The JDBC connector enables Flink to read from and write to relational databases using the standard JDBC interface. It supports bounded scan sources, lookup joins for enrichment, and streaming and batch sinks. The connector is maintained in a separate repository: apache/flink-connector-jdbc.
## Dependency

The JDBC connector is externalized from the main Flink repository. Add the flink-connector-jdbc dependency plus a JDBC driver for your database (for example MySQL, PostgreSQL, or Oracle) to your project.
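For Maven builds, the dependencies look roughly like the following sketch; the version numbers are illustrative and should be matched to your Flink release:

```xml
<!-- JDBC connector; pick the release built for your Flink version (version illustrative). -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.2.0-1.19</version>
</dependency>
<!-- A JDBC driver for your database, e.g. PostgreSQL (version illustrative). -->
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.7.3</version>
</dependency>
```

The driver is not bundled with the connector, so it must be added explicitly.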
## Source table
The JDBC source performs a bounded scan of a table or a custom query. Set the `scan.partition.*` options for parallel reads, or supply a custom SQL statement with the `query` option.
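A minimal source definition might look like this sketch; the database, table, and credentials are illustrative:

```sql
-- Bounded scan source over an existing database table (all names illustrative).
CREATE TABLE orders (
  id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',
  'table-name' = 'orders',
  'username' = 'flink',
  'password' = 'secret'
);
```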
## Parallel scan with partitioning
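A sketch of a partitioned scan, assuming a numeric `id` column; the table name and bounds are illustrative:

```sql
-- Split the scan into 4 partitions over the numeric id column.
CREATE TABLE orders (
  id BIGINT,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',
  'table-name' = 'orders',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '4',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '1000000'
);
```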
For large tables, enable parallel reads by partitioning the scan. All four `scan.partition.*` options must be specified together.

## Sink table
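A sketch of an upsert sink, with illustrative table and column names:

```sql
-- The primary key (NOT ENFORCED) switches the sink to upsert mode,
-- so repeated writes for the same customer_id overwrite the row.
CREATE TABLE order_totals (
  customer_id BIGINT,
  total DECIMAL(10, 2),
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',
  'table-name' = 'order_totals',
  'username' = 'flink',
  'password' = 'secret'
);
```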
The JDBC sink appends or upserts rows into a database table: it runs in upsert mode when the DDL declares a primary key, and in append mode otherwise.

## Lookup join
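A sketch of a lookup join; the `datagen` stream table and all names are illustrative:

```sql
-- Stream table with a processing-time attribute (required for the temporal join).
CREATE TABLE clicks (
  user_id BIGINT,
  url STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- JDBC dimension table used for lookups, with a partial cache.
CREATE TABLE users (
  user_id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/shop',
  'table-name' = 'users',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10min'
);

-- Enrich each click with the user's name as of the record's processing time.
SELECT c.url, u.name
FROM clicks AS c
JOIN users FOR SYSTEM_TIME AS OF c.proc_time AS u
  ON c.user_id = u.user_id;
```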
The JDBC connector supports lookup joins for enriching streaming data with database lookups. For temporal lookup joins, the stream table must have a `proctime` attribute defined; the `FOR SYSTEM_TIME AS OF` syntax ensures each lookup uses the dimension table's state at the time the record arrived.

## Key connector options
| Option | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | — | Must be `'jdbc'`. |
| `url` | Yes | — | JDBC connection URL, e.g. `jdbc:postgresql://host:5432/db`. |
| `table-name` | Yes (if `query` not set) | — | Name of the table to read from or write to. |
| `driver` | No | auto-detected | JDBC driver class name. |
| `username` | No | — | Database username. |
| `password` | No | — | Database password. |
| `query` | No | — | Custom SQL query for the source scan. Cannot be used together with `table-name` for sources. |
| `connection.max-retry-timeout` | No | 60 s | Maximum timeout for retrying failed connections. |
| `scan.partition.column` | No | — | Column used to partition the scan. Must be a numeric, date, or timestamp type. |
| `scan.partition.num` | No | — | Number of partitions to split the scan into. |
| `scan.partition.lower-bound` | No | — | Lower bound of the partition column. |
| `scan.partition.upper-bound` | No | — | Upper bound of the partition column. |
| `scan.fetch-size` | No | 0 (unlimited) | Number of rows to fetch per round trip to the database. |
| `scan.auto-commit` | No | true | Whether the JDBC connection uses auto-commit. |
| `lookup.cache` | No | NONE | Caching strategy: `NONE`, `PARTIAL`, or `FULL`. |
| `lookup.partial-cache.max-rows` | No | — | Maximum number of rows in the partial cache. |
| `lookup.partial-cache.expire-after-write` | No | — | TTL for cache entries. |
| `lookup.partial-cache.cache-missing-key` | No | true | Whether to cache lookups that returned no rows. |
| `lookup.max-retries` | No | 3 | Maximum number of retries for failed lookups. |
| `sink.buffer-flush.max-rows` | No | 100 | Maximum number of rows to buffer before flushing to the database. Set to 0 to disable buffering. |
| `sink.buffer-flush.interval` | No | 1 s | Maximum time to buffer rows before flushing. Set to 0 to flush only when the buffer is full. |
| `sink.max-retries` | No | 3 | Maximum number of retries for failed sink writes. |
| `sink.parallelism` | No | — | Parallelism of the JDBC sink operator. |
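Several of the sink options above can be combined for throughput tuning; a sketch with illustrative names and values:

```sql
-- Illustrative sink tuning: flush every 500 rows or 2 s, retry up to 5 times.
CREATE TABLE audit_log (
  id BIGINT,
  msg STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/app',
  'table-name' = 'audit_log',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '5',
  'sink.parallelism' = '2'
);
```

Larger buffers amortize round trips at the cost of latency; the interval bounds how long a partially filled buffer can wait.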

