Skip to content

Commit 0e8a42a

Browse files
committed
[FLINK-37914][table] Add built-in OBJECT_UPDATE function
1 parent 71f6717 commit 0e8a42a

File tree

13 files changed

+794
-6
lines changed

13 files changed

+794
-6
lines changed

docs/data/sql_functions.yml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,6 +1219,35 @@ valueconstruction:
12191219
- table: NUMERIC.rows
12201220
description: Creates a NUMERIC interval of rows (commonly used in window creation).
12211221

1222+
valuemodification:
1223+
- sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
1224+
table: OBJECT.objectUpdate(key, value [, key, value , ...])
1225+
description: |
1226+
Updates existing fields in a structured object by providing key-value pairs.
1227+
1228+
This function takes a structured object and updates specified fields with new values.
1229+
The keys must be string literals that correspond to existing fields in the structured type.
1230+
If a key does not exist in the input object, an exception will be thrown.
1231+
If the value type is not compatible with the corresponding structured field type,
1232+
an exception will also be thrown.
1233+
1234+
The function expects alternating key-value pairs where keys are field names (non-null strings)
1235+
and values are the new values for those fields. At least one key-value pair must be provided.
1236+
The total number of arguments must be odd (object + pairs of key-value arguments).
1237+
1238+
The result type is the same structured type as the input, with the specified fields
1239+
updated to their new values.
1240+
1241+
```sql
1242+
-- Update the 'name' field of a user object
1243+
OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice')
1244+
-- Returns: User{name='Alice', age=14})
1245+
1246+
-- Update multiple fields
1247+
OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice', 'age', 30)
1248+
-- Returns: User{name='Alice', age=30})
1249+
```
1250+
12221251
valueaccess:
12231252
- sql: tableName.compositeType.field
12241253
table: |

docs/data/sql_functions_zh.yml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,35 @@ valueconstruction:
13011301
- table: NUMERIC.rows
13021302
description: 创建一个 NUMERIC 行间隔(通常用于窗口创建)。
13031303

1304+
valuemodification:
1305+
- sql: OBJECT_UPDATE(object, key, value [, key, value , ...])
1306+
table: OBJECT.objectUpdate(key, value [, key, value , ...])
1307+
description: |
1308+
Updates existing fields in a structured object by providing key-value pairs.
1309+
1310+
This function takes a structured object and updates specified fields with new values.
1311+
The keys must be string literals that correspond to existing fields in the structured type.
1312+
If a key does not exist in the input object, an exception will be thrown.
1313+
If the value type is not compatible with the corresponding structured field type,
1314+
an exception will also be thrown.
1315+
1316+
The function expects alternating key-value pairs where keys are field names (non-null strings)
1317+
and values are the new values for those fields. At least one key-value pair must be provided.
1318+
The total number of arguments must be odd (object + pairs of key-value arguments).
1319+
1320+
The result type is the same structured type as the input, with the specified fields
1321+
updated to their new values.
1322+
1323+
```sql
1324+
-- Update the 'name' field of a user object
1325+
OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice')
1326+
-- Returns: User{name='Alice', age=14})
1327+
1328+
-- Update multiple fields
1329+
OBJECT_UPDATE(OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 14), 'name', 'Alice', 'age', 30)
1330+
-- Returns: User{name='Alice', age=30})
1331+
```
1332+
13041333
valueaccess:
13051334
- sql: tableName.compositeType.field
13061335
table: |

flink-python/docs/reference/pyflink.table/expressions.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,13 @@ JSON functions
324324
Expression.json_query
325325
Expression.json_quote
326326
Expression.json_unquote
327+
328+
value modification functions
329+
----------------------------
330+
331+
.. currentmodule:: pyflink.table.expression
332+
333+
.. autosummary::
334+
335+
:toctree: api/
336+
Expression.object_update

flink-python/pyflink/table/expression.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2193,6 +2193,54 @@ def json_unquote(self) -> 'Expression':
21932193
"""
21942194
return _unary_op("jsonUnquote")(self)
21952195

2196+
# ---------------------------- value modification functions -----------------------------
2197+
2198+
def object_update(self, *kv) -> "Expression":
2199+
"""
2200+
Updates existing fields in a structured object by providing key-value pairs.
2201+
2202+
This function takes a structured object and updates specified fields with new values.
2203+
The keys must be string literals that correspond to existing fields in the structured type.
2204+
If a key does not exist in the input object, an exception will be thrown.
2205+
If the value type is not compatible with the corresponding structured field type,
2206+
an exception will also be thrown.
2207+
2208+
The function expects alternating key-value pairs where keys are field names (non-null strings)
2209+
and values are the new values for those fields. At least one key-value pair must be provided.
2210+
The total number of arguments must be odd (object + pairs of key-value arguments).
2211+
2212+
The result type is the same structured type as the input, with the specified fields
2213+
updated to their new values.
2214+
2215+
Example:
2216+
::
2217+
2218+
>>> # Update the 'name' field of a user object
2219+
>>> user_obj.object_update("name", "Alice")
2220+
>>> # Returns an updated user object with 'name' set to "Alice"
2221+
>>>
2222+
>>> # Update multiple fields
2223+
>>> user_obj.object_update("name", "Alice", "age", 30)
2224+
>>> # Returns an updated user object with 'name' set to "Alice" and 'age' set to 30
2225+
2226+
The result type is the same structured type as the input, with the specified
2227+
fields updated to their new values.
2228+
2229+
:param kv: key-value pairs where even-indexed elements are field names
2230+
(strings) and odd-indexed elements are the new values for those
2231+
fields
2232+
:return: expression representing the updated structured type with modified
2233+
field values
2234+
"""
2235+
gateway = get_gateway()
2236+
ApiExpressionUtils = (
2237+
gateway.jvm.org.apache.flink.table.expressions.ApiExpressionUtils
2238+
)
2239+
exprs = [
2240+
ApiExpressionUtils.objectToExpression(_get_java_expression(e)) for e in kv
2241+
]
2242+
return _binary_op("objectUpdate")(self, to_jarray(gateway.jvm.Object, exprs))
2243+
21962244

21972245
# add the docs
21982246
_make_math_log_doc()

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@
148148
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.NOT;
149149
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.NOT_BETWEEN;
150150
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.NOT_EQUALS;
151+
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.OBJECT_UPDATE;
151152
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.OR;
152153
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ORDER_ASC;
153154
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ORDER_DESC;
@@ -2488,4 +2489,48 @@ public OutType percentile(InType percentage, InType frequency) {
24882489
objectToExpression(percentage),
24892490
objectToExpression(frequency)));
24902491
}
2492+
2493+
/**
2494+
* Updates existing fields in a structured object by providing key-value pairs.
2495+
*
2496+
* <p>This function takes a structured object and updates specified fields with new values. The
2497+
* keys must be string literals that correspond to existing fields in the structured type. If a
2498+
* key does not exist in the input object, an exception will be thrown. If the value type is not
2499+
* compatible with the corresponding structured field type, an exception will also be thrown.
2500+
*
2501+
* <p>The function expects alternating key-value pairs where keys are field names (non-null
2502+
* strings) and values are the new values for those fields. At least one key-value pair must be
2503+
* provided.
2504+
*
2505+
* <p>Example usage:
2506+
*
2507+
* <pre>{@code
2508+
* // Create a structured object representing a user
2509+
* User userObject = objectOf("com.example.User", "name", "Bob", "age", 25);
2510+
*
2511+
* // Update the 'name' field of a user object
2512+
* User updatedUser1 = userObject.objectUpdate("name", "Alice")
2513+
*
2514+
* // Update multiple fields
2515+
* User updatedUser2 = userObject.objectUpdate("name", "Alice", "age", 30)
2516+
*
2517+
* }</pre>
2518+
*
2519+
* <p>The result type is the same structured type as the input, with the specified fields
2520+
* updated to their new values.
2521+
*
2522+
* @param kv key-value pairs where even-indexed elements are field names (strings) and
2523+
* odd-indexed elements are the new values for those fields
2524+
* @return expression representing a new structured object with updated field values
2525+
* @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_UPDATE
2526+
*/
2527+
public OutType objectUpdate(InType... kv) {
2528+
final Expression[] expressions =
2529+
Stream.concat(
2530+
Stream.of(toExpr()),
2531+
Stream.of(kv).map(ApiExpressionUtils::objectToExpression))
2532+
.toArray(Expression[]::new);
2533+
return toApiSpecificExpression(
2534+
ApiExpressionUtils.unresolvedCall(OBJECT_UPDATE, expressions));
2535+
}
24912536
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1628,6 +1628,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
16281628
"org.apache.flink.table.runtime.functions.scalar.ObjectOfFunction")
16291629
.build();
16301630

1631+
public static final BuiltInFunctionDefinition OBJECT_UPDATE =
1632+
BuiltInFunctionDefinition.newBuilder()
1633+
.name("OBJECT_UPDATE")
1634+
.kind(SCALAR)
1635+
.inputTypeStrategy(SpecificInputTypeStrategies.OBJECT_UPDATE)
1636+
.outputTypeStrategy(SpecificTypeStrategies.OBJECT_UPDATE)
1637+
.runtimeClass(
1638+
"org.apache.flink.table.runtime.functions.scalar.ObjectUpdateFunction")
1639+
.build();
1640+
16311641
// --------------------------------------------------------------------------------------------
16321642
// Math functions
16331643
// --------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)