How do I install PyFlink?
Install PyFlink with pip:
pip install apache-flink
PyFlink requires Java 11 or 17 and Python 3.9–3.12. Verify your versions before installing:
java -version     # must be 11.x or 17.x
python --version  # must be 3.9–3.12
If you need Arrow-based (pandas) UDFs, also install:
pip install pyarrow pandas
Which Python versions are supported?
PyFlink supports Python 3.9, 3.10, 3.11, and 3.12. Python 2 and Python 3.8 or earlier are not supported. The Python version used by your UDF workers must match the version used to install PyFlink. If you use a virtual environment or add_python_archive(), configure the interpreter path explicitly.
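A minimal sketch of pointing Flink at a specific interpreter via the `python.executable` and `python.client.executable` configuration options (the paths shown are illustrative; adjust them to your environment):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Interpreter the UDF workers use on the cluster (e.g. inside an
# archive shipped with add_python_archive()):
t_env.get_config().set("python.executable", "venv.zip/venv/bin/python")

# Interpreter used on the client side when compiling the job:
t_env.get_config().set("python.client.executable", "python3")
```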
The PyFlink version is tied to the Flink release (e.g., 2.0.0). Make sure your connector JARs match this version.
How do I wait for jobs to finish when running in mini-cluster mode?
When you run a Table API job in a Flink mini-cluster (local Python process), call .wait() after execute_insert():
# Wait for the job to complete (required in mini-cluster mode)
table.execute_insert("my_sink").wait()
Without .wait(), the local Python process may exit before the job finishes producing output. On a remote cluster, omit .wait(); the call returns immediately after job submission, and the cluster runs the job asynchronously:
# On a remote cluster, do NOT call .wait()
table.execute_insert("my_sink")
For the DataStream API, always call env.execute() to trigger execution. In remote mode, it returns after submission.
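A minimal DataStream sketch illustrating the pattern (the job name and pipeline are placeholders; running it requires a working PyFlink installation):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Toy pipeline: double each element and print to the task manager logs.
env.from_collection([1, 2, 3]).map(lambda x: x * 2).print()

# Triggers execution. Blocks until completion in mini-cluster mode;
# returns after submission when targeting a remote cluster.
env.execute("my-job")
```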
Why does my UDF fail with ModuleNotFoundError on the cluster?
Your local Python environment has the library, but the task managers do not. Use one of these approaches to distribute it:
Option 1: requirements file
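A sketch of the requirements-file approach, assuming a requirements.txt listing your dependencies sits next to your job script:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Ship the listed packages to every task manager; they are installed
# into the workers' Python environment before the UDFs run.
env.set_python_requirements(requirements_file_path="requirements.txt")
```

Alternatively, add_python_file() ships individual .py files and add_python_archive() ships a whole zipped virtual environment; which one fits depends on whether the dependency is pure Python and how large it is.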
Can Python UDFs process data in vectorized batches?
Yes. For best performance, use Arrow-based (vectorized) UDFs so that Flink passes entire batches of rows as pandas.Series objects rather than individual values:
import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def normalize(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()
Requires pyarrow and pandas to be installed:
pip install pyarrow pandas
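Because a pandas UDF's body is ordinary pandas code, you can sanity-check the logic locally before wiring it into Flink. A standalone sketch, no Flink required (the sample values are arbitrary):

```python
import pandas as pd

def normalize(s: pd.Series) -> pd.Series:
    # Same body as the UDF above: zero-mean, unit-variance scaling.
    return (s - s.mean()) / s.std()

out = normalize(pd.Series([1.0, 2.0, 3.0, 4.0]))
print(round(out.mean(), 10))  # 0.0
```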
Why is my Python UDF slow?
Python UDFs have overhead because data must cross the JVM–Python boundary. Common solutions:
Use Arrow-based UDFs (func_type="pandas") to process batches instead of individual rows. This is the single most impactful optimization.
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def fast_udf(s: pd.Series) -> pd.Series:
    return s * 2.0
Increase bundle size to reduce JVM–Python round trips:
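A sketch of tuning the bundle options via configuration (the values shown are illustrative, not recommendations; the defaults are often fine to start with):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Max number of elements buffered per bundle before crossing the
# JVM–Python boundary:
t_env.get_config().set("python.fn-execution.bundle.size", "100000")

# Max time in milliseconds a bundle may buffer before being flushed:
t_env.get_config().set("python.fn-execution.bundle.time", "1000")
```

Larger bundles amortize the serialization round trip at the cost of higher latency and memory use per worker.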