User-defined functions (UDFs) let you extend Flink SQL and the Table API with custom logic that cannot be expressed using built-in functions. You implement them in Java, Scala, or Python and register them with the TableEnvironment.
Flink supports the following UDF kinds:
| Kind | Base class | Input | Output |
|---|---|---|---|
| Scalar function | ScalarFunction | One or more values per row | One value per row |
| Async scalar function | AsyncScalarFunction | One or more values per row | One value per row (async) |
| Table function | TableFunction<T> | One or more values per row | Zero or more rows |
| Async table function | AsyncTableFunction<T> | One or more values per row | Zero or more rows (async) |
| Aggregate function | AggregateFunction<T, ACC> | Values from multiple rows | One value per group |
| Table aggregate function | TableAggregateFunction<T, ACC> | Values from multiple rows | Zero or more rows per group |
Registering functions
```java
// register by class (recommended)
tableEnv.createTemporarySystemFunction("my_func", MyScalarFunc.class);

// register by instance (use when the function has constructor parameters)
tableEnv.createTemporarySystemFunction("my_func", new MyScalarFunc(true));

// register as a catalog function (persisted in the current catalog/database)
tableEnv.createFunction("my_catalog_func", MyScalarFunc.class);
```
A function registered with createTemporarySystemFunction is available everywhere in the session. A function registered with createFunction is stored in the current catalog and database and survives session restarts (when using a persistent catalog).
Scalar functions
A scalar function maps zero, one, or multiple input values to a single output value. Implement one or more public eval(...) methods. The method signature determines the input and output types.
Basic example
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public static class HashFunction extends ScalarFunction {
  // takes any type, returns INT
  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return o.hashCode();
  }
}
```

```java
import static org.apache.flink.table.api.Expressions.*;

TableEnvironment env = TableEnvironment.create(...);

// inline call (no registration needed)
env.from("MyTable").select(call(HashFunction.class, $("myField")));

// register, then call in SQL
env.createTemporarySystemFunction("HashFunction", HashFunction.class);
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable").execute().print();
```
Overloaded eval methods
Flink dispatches to the correct overload based on argument types:
```java
import org.apache.flink.table.functions.ScalarFunction;

public static class SumFunction extends ScalarFunction {
  public Integer eval(Integer a, Integer b) {
    return a + b;
  }

  public Integer eval(String a, String b) {
    return Integer.valueOf(a) + Integer.valueOf(b);
  }

  public Integer eval(Double... d) {
    double result = 0;
    for (double v : d) result += v;
    return (int) result;
  }
}
```
Parameterized functions
Pass constructor parameters when registering by instance:
```java
import org.apache.flink.table.functions.ScalarFunction;

public static class SubstringFunction extends ScalarFunction {
  private final boolean endInclusive;

  public SubstringFunction(boolean endInclusive) {
    this.endInclusive = endInclusive;
  }

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, endInclusive ? end + 1 : end);
  }
}
```

```java
env.createTemporarySystemFunction("substring_incl", new SubstringFunction(true));
```
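The constructor-parameter pattern is easy to exercise without a cluster. A plain-Python sketch of the same slicing logic (the class here is an illustrative analogue for unit testing, not PyFlink API):

```python
# Plain-Python analogue of the parameterized SubstringFunction above;
# lets the end-inclusive slicing logic be unit-tested without Flink.
class SubstringFunction:
    def __init__(self, end_inclusive: bool):
        self.end_inclusive = end_inclusive

    def eval(self, s: str, begin: int, end: int) -> str:
        return s[begin:end + 1] if self.end_inclusive else s[begin:end]

print(SubstringFunction(end_inclusive=True).eval("flink", 0, 2))   # fli
print(SubstringFunction(end_inclusive=False).eval("flink", 0, 2))  # fl
```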
Python scalar function
```python
from pyflink.table.udf import udf
from pyflink.table.types import DataTypes

@udf(result_type=DataTypes.STRING())
def to_upper(s: str) -> str:
    return s.upper() if s else None

t_env.create_temporary_system_function("to_upper", to_upper)
t_env.sql_query("SELECT to_upper(name) FROM users").execute().print()
```
Async scalar functions
Use AsyncScalarFunction when the function needs to make network calls (database lookups, REST requests). Async execution overlaps waiting time across multiple concurrent requests, dramatically increasing throughput.
```java
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public static class EnrichmentFunction extends AsyncScalarFunction {
  private transient ExecutorService executor;

  @Override
  public void open(FunctionContext context) {
    executor = Executors.newFixedThreadPool(10);
  }

  @Override
  public void close() {
    // shut the pool down so worker threads do not leak
    executor.shutdown();
  }

  // the first argument is always CompletableFuture<ReturnType>
  public void eval(CompletableFuture<String> future, Integer id) {
    executor.execute(() -> {
      // simulate an external lookup
      try { Thread.sleep(50); } catch (InterruptedException e) {}
      switch (id) {
        case 1: future.complete("Alice"); break;
        case 2: future.complete("Bob"); break;
        default: future.completeExceptionally(
            new IllegalArgumentException("Unknown id: " + id));
      }
    });
  }
}
```

```java
env.getConfig().set("table.exec.async-scalar.max-concurrent-operations", "50");
env.getConfig().set("table.exec.async-scalar.timeout", "30s");

env.createTemporarySystemFunction("enrich", EnrichmentFunction.class);
env.sqlQuery("SELECT enrich(user_id) AS user_name FROM events").execute().print();
```
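The future-based pattern itself is independent of Flink. A plain-Python sketch using concurrent.futures (the lookup table and names are hypothetical) shows how handing each request to a worker thread lets the waits overlap instead of running back-to-back:

```python
import concurrent.futures as cf
import time

# Plain-Python sketch of the async-lookup pattern: each call returns
# a future that completes on a worker thread, so latency on the
# external service overlaps across concurrent requests.
_NAMES = {1: "Alice", 2: "Bob"}  # stand-in for the external system

def lookup(executor, user_id):
    def work():
        time.sleep(0.05)  # simulate external-service latency
        if user_id in _NAMES:
            return _NAMES[user_id]
        raise KeyError(f"Unknown id: {user_id}")
    return executor.submit(work)

with cf.ThreadPoolExecutor(max_workers=10) as pool:
    futures = [lookup(pool, i) for i in (1, 2)]
    print([f.result() for f in futures])  # ['Alice', 'Bob']
```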
Table functions
A table function (UDTF) maps one input row to zero, one, or many output rows. Implement eval(...) and call collect(...) to emit each output row.
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
  public void eval(String str) {
    for (String s : str.split(" ")) {
      collect(Row.of(s, s.length()));
    }
  }
}
```

```java
TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// inner join: rows with no split results are dropped
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"
).execute().print();

// left join: unmatched outer rows are preserved with NULL for the UDTF columns
env.sqlQuery(
    "SELECT myField, word, length " +
    "FROM MyTable " +
    "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"
).execute().print();
```
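The inner-versus-left distinction can be made concrete with a plain-Python simulation (the data and helper are illustrative; split plays the role of the UDTF, producing zero or more output rows per input row):

```python
# Plain-Python simulation of the two join flavors above.
def split(s):
    return [(w, len(w)) for w in s.split(" ") if w]

rows = ["hello world", ""]  # the empty row produces no split results

# inner join: rows with no UDTF output disappear
inner = [(r, w, n) for r in rows for (w, n) in split(r)]

# left join: unmatched rows survive with None for the UDTF columns
left = [
    (r, w, n)
    for r in rows
    for (w, n) in (split(r) or [(None, None)])
]

print(inner)  # [('hello world', 'hello', 5), ('hello world', 'world', 5)]
print(left)   # the inner rows plus ('', None, None)
```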
Python table function
```python
from pyflink.table.udf import udtf
from pyflink.table.types import DataTypes
from pyflink.common import Row

@udtf(result_types=[DataTypes.STRING(), DataTypes.INT()])
def split(s: str):
    for word in s.split(" "):
        yield Row(word, len(word))

t_env.create_temporary_system_function("split", split)

# alias the UDTF output columns so they can be referenced by name
t_env.sql_query(
    "SELECT myField, word, length "
    "FROM MyTable, LATERAL TABLE(split(myField)) AS T(word, length)"
).execute().print()
```
Aggregate functions
Aggregate functions (UDAGGs) map values from multiple rows to a single scalar result. The function maintains an accumulator that is updated row by row:
- createAccumulator() — creates the initial (empty) accumulator.
- accumulate(acc, value...) — called for each input row.
- getValue(acc) — returns the final result after all rows are processed.
- retract(acc, value...) — optional; required for streaming queries whose input produces retractions, such as aggregations over OVER windows.
- merge(acc, Iterable<ACC>) — optional; required for many bounded (batch) aggregations and for session and hop window aggregations.
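The lifecycle above can be walked through in plain Python (no Flink involved; this mirrors the weighted-average logic of the Java example that follows, including its integer division):

```python
# Plain-Python walk-through of the accumulator lifecycle.
def create_accumulator():
    return {"sum": 0, "count": 0}

def accumulate(acc, value, weight):
    acc["sum"] += value * weight
    acc["count"] += weight

def retract(acc, value, weight):
    acc["sum"] -= value * weight
    acc["count"] -= weight

def merge(acc, others):
    for a in others:
        acc["sum"] += a["sum"]
        acc["count"] += a["count"]

def get_value(acc):
    return None if acc["count"] == 0 else acc["sum"] // acc["count"]

acc1, acc2 = create_accumulator(), create_accumulator()
accumulate(acc1, 10, 2)   # price 10, quantity 2
accumulate(acc2, 40, 1)   # price 40, quantity 1
merge(acc1, [acc2])
print(get_value(acc1))    # (10*2 + 40*1) // 3 = 20
```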
```java
import org.apache.flink.table.functions.AggregateFunction;

public static class WeightedAvgAccumulator {
  public long sum = 0;
  public int count = 0;
}

public static class WeightedAvg
    extends AggregateFunction<Long, WeightedAvgAccumulator> {

  @Override
  public WeightedAvgAccumulator createAccumulator() {
    return new WeightedAvgAccumulator();
  }

  @Override
  public Long getValue(WeightedAvgAccumulator acc) {
    return acc.count == 0 ? null : acc.sum / acc.count;
  }

  public void accumulate(WeightedAvgAccumulator acc, Long value, Integer weight) {
    acc.sum += value * weight;
    acc.count += weight;
  }

  public void retract(WeightedAvgAccumulator acc, Long value, Integer weight) {
    acc.sum -= value * weight;
    acc.count -= weight;
  }

  public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
    for (WeightedAvgAccumulator a : it) {
      acc.sum += a.sum;
      acc.count += a.count;
    }
  }
}
```

```java
TableEnvironment env = TableEnvironment.create(...);
env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
env.sqlQuery(
    "SELECT category, WeightedAvg(price, quantity) AS avg_price " +
    "FROM Products GROUP BY category"
).execute().print();
```
Type inference with annotations
Flink infers input and output types from the Java method signature using reflection. Use annotations when the default inference is insufficient.
@DataTypeHint
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import java.math.BigDecimal;
import java.time.Instant;

public static class OverloadedFunction extends ScalarFunction {
  // simple case: no hint needed for Long
  public Long eval(long a, long b) {
    return a + b;
  }

  // specify precision and scale for DECIMAL
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a + b);
  }

  // return a complex ROW type
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }
}
```
@FunctionHint
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// globally declare the output type for all eval() overloads
@FunctionHint(output = @DataTypeHint("ROW<word STRING, count INT>"))
public static class WordCountFunction extends TableFunction<Row> {
  public void eval(String sentence) {
    for (String w : sentence.split(" ")) {
      collect(Row.of(w, 1));
    }
  }

  public void eval() {
    collect(Row.of("<empty>", 0));
  }
}
```
Named parameters with @ArgumentHint
Annotate parameters with @ArgumentHint to support named-parameter call syntax in SQL:
```java
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public static class FormatterFunction extends ScalarFunction {
  public String eval(
      @ArgumentHint(name = "input", isOptional = false, type = @DataTypeHint("STRING")) String s,
      @ArgumentHint(name = "padding", isOptional = true, type = @DataTypeHint("INT")) Integer pad) {
    return pad == null ? s : String.format("%" + pad + "s", s);
  }
}
```
Calling with named parameters in SQL:
```sql
SELECT FormatterFunction(input => myField, padding => 10) FROM MyTable;

-- optional parameter omitted:
SELECT FormatterFunction(input => myField) FROM MyTable;
```
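The call-site behavior maps closely onto Python keyword arguments with a default. A plain-Python analogue of the optional-padding logic (illustrative only, not PyFlink API):

```python
# Plain-Python analogue of a named, optional parameter: padding may be
# omitted, in which case the input passes through unchanged.
def formatter(input, padding=None):
    return input if padding is None else format(input, f">{padding}")

print(repr(formatter("hi", padding=10)))  # '        hi' (right-aligned, width 10)
print(repr(formatter("hi")))              # 'hi'
```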
Runtime context and open/close lifecycle
Access job parameters, metrics, and distributed cache files via FunctionContext:
```java
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public static class HashCodeFunction extends ScalarFunction {
  private int factor;

  @Override
  public void open(FunctionContext context) throws Exception {
    // read a job parameter set via env.getConfig().addJobParameter(...)
    factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "31"));
  }

  public int eval(String s) {
    return s.hashCode() * factor;
  }

  @Override
  public void close() {
    // release resources (connections, thread pools, etc.)
  }
}
```

```java
env.getConfig().addJobParameter("hashcode_factor", "31");
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
env.sqlQuery("SELECT hashCode(name) FROM users").execute().print();
```
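The lifecycle contract is simple to sketch outside Flink: read configuration once in open(), do per-row work in eval(), free resources in close(). A plain-Python sketch where a dict stands in for the FunctionContext job parameters (names illustrative):

```python
# Plain-Python sketch of the open/eval/close lifecycle.
class HashCodeFunction:
    def open(self, job_parameters):
        # configuration is read once, not per row
        self.factor = int(job_parameters.get("hashcode_factor", "31"))

    def eval(self, s):
        return hash(s) * self.factor

    def close(self):
        pass  # release connections, thread pools, etc.

f = HashCodeFunction()
f.open({"hashcode_factor": "7"})
print(f.eval("alice") == hash("alice") * 7)  # True
f.close()
```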
Function resolution order
When multiple functions share the same name, Flink resolves them in this order:
- Temporary system functions
- System (built-in) functions
- Temporary catalog functions in the current catalog and database
- Catalog functions in the current catalog and database
Use a fully-qualified name (catalog.database.function) for precise reference.
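The resolution order amounts to a first-match lookup across the four registries. A plain-Python sketch (registry contents are hypothetical):

```python
# Plain-Python sketch of the resolution order: the first registry
# that contains the name wins.
def resolve(name, temp_system, system, temp_catalog, catalog):
    for registry in (temp_system, system, temp_catalog, catalog):
        if name in registry:
            return registry[name]
    raise KeyError(f"No function named {name!r}")

# a temporary system function shadows a built-in of the same name
print(resolve("upper", {"upper": "my override"}, {"upper": "built-in"}, {}, {}))
```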