Catalogs provide metadata for databases, tables, views, partitions, and functions, and the information needed to access data stored in external systems. By abstracting metadata management behind a unified API, catalogs let Table API programs and SQL queries reference objects by name without hard-coding connection details in every query.
Flink always starts with a default in-memory catalog named default_catalog and a default database named default_database. Any objects you create without specifying a catalog are placed there.
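For example, a table created without qualification lands in the default catalog and database, and can always be referenced by its fully-qualified three-part name (the table name here is illustrative):

```sql
-- created in the current catalog/database (the defaults, unless changed)
CREATE TABLE orders (id BIGINT) WITH ('connector' = 'datagen');

-- equivalent fully-qualified reference
SELECT * FROM default_catalog.default_database.orders;
```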
Catalog types
GenericInMemoryCatalog
GenericInMemoryCatalog is the default catalog. All objects live only for the duration of the session. It is case-sensitive, unlike Hive Metastore.
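Additional in-memory catalogs can be created alongside the default one. A minimal DDL sketch, assuming the built-in generic_in_memory type identifier:

```sql
CREATE CATALOG my_catalog WITH (
  'type' = 'generic_in_memory'
);
```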
JdbcCatalog
JdbcCatalog connects Flink to a relational database over JDBC. Postgres and MySQL are the two supported implementations. Existing tables in the database are made visible to Flink automatically; no Flink DDL is required to register them.
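A JdbcCatalog can also be registered in SQL. A sketch for Postgres, where the host, database name, and credentials are placeholders:

```sql
CREATE CATALOG my_jdbc WITH (
  'type' = 'jdbc',
  'default-database' = 'mydb',
  'username' = 'flink',
  'password' = 'secret',
  'base-url' = 'jdbc:postgresql://localhost:5432'
);
```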
HiveCatalog
HiveCatalog serves two purposes:
- Persistent storage for Flink metadata (tables, views, UDFs) that survives session restarts.
- Hive integration: reading and writing existing Hive metadata so Flink queries can access Hive tables transparently.
The Hive Metastore stores all object names in lowercase. This differs from GenericInMemoryCatalog, which is case-sensitive.
User-defined catalogs
You can implement a custom catalog by providing an implementation of the Catalog interface and a companion CatalogFactory. The factory is discovered via Java SPI: register its fully-qualified class name in META-INF/services/org.apache.flink.table.factories.Factory. The identifier returned by the factory must match the 'type' property in the CREATE CATALOG DDL.
Since Flink 1.16, user-defined catalogs should load classes through CatalogFactory.Context#getClassLoader() rather than Thread.currentThread().getContextClassLoader() to avoid ClassNotFoundException.
Registering a catalog
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.inStreamingMode()
);
// create and register a HiveCatalog
HiveCatalog catalog = new HiveCatalog(
"myhive", // catalog name
null, // default database (uses "default")
"/path/to/hive-conf"
);
tableEnv.registerCatalog("myhive", catalog);
// switch to the new catalog
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import HiveCatalog
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
catalog = HiveCatalog("myhive", None, "/path/to/hive-conf")
t_env.register_catalog("myhive", catalog)
t_env.use_catalog("myhive")
t_env.use_database("default")
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-conf-dir' = '/path/to/hive-conf'
);
USE CATALOG myhive;
USE `default`;
Creating tables in a catalog
Java (DDL)
Java (API)
Python
// switch to the target catalog
tableEnv.useCatalog("myhive");
// create the database before making it the current one
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS mydb");
tableEnv.useDatabase("mydb");
// create a Kafka-backed table
tableEnv.executeSql(
"CREATE TABLE user_events (" +
" user_id BIGINT," +
" action STRING," +
" event_time TIMESTAMP(3)," +
" WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user-events'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
System.out.println(java.util.Arrays.toString(tableEnv.listTables()));
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.inStreamingMode()
);
HiveCatalog catalog = new HiveCatalog("myhive", null, "/path/to/hive-conf");
tableEnv.registerCatalog("myhive", catalog);
// create a database programmatically
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new java.util.HashMap<>(), ""), false);
// define schema
Schema schema = Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
// create a catalog table using a TableDescriptor
tableEnv.createTable(
"myhive.mydb.mytable",
TableDescriptor.forConnector("kafka")
.schema(schema)
.option("topic", "my-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.option("format", "json")
.build()
);
// list tables in the database
catalog.listTables("mydb").forEach(System.out::println);
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
catalog = HiveCatalog("myhive", None, "/path/to/hive-conf")
t_env.register_catalog("myhive", catalog)
# create a database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database, False)
# create a table
schema = Schema.new_builder() \
.column("name", DataTypes.STRING()) \
.column("age", DataTypes.INT()) \
.build()
t_env.create_table(
"myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
.option("topic", "my-topic")
.option("properties.bootstrap.servers", "localhost:9092")
.option("format", "json")
.build()
)
Catalog API reference
Database operations
// create
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), false);
// drop
catalog.dropDatabase("mydb", false);
// get metadata
CatalogDatabase db = catalog.getDatabase("mydb");
// check existence
boolean exists = catalog.databaseExists("mydb");
// list all databases
List<String> dbs = catalog.listDatabases();
Table operations
// create
catalog.createTable(
new ObjectPath("mydb", "mytable"),
CatalogTable.newBuilder()
.schema(schema)
.options(options)
.build(),
false
);
// drop
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// rename
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
// check existence
boolean exists = catalog.tableExists(new ObjectPath("mydb", "mytable"));
// list
List<String> tables = catalog.listTables("mydb");
Function operations
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
// register a Java UDF in the catalog
catalog.createFunction(
new ObjectPath("mydb", "my_udf"),
new CatalogFunctionImpl("com.example.MyScalarFunction", FunctionLanguage.JAVA),
false
);
// list functions
List<String> functions = catalog.listFunctions("mydb");
Switching catalogs and databases
Flink resolves table names relative to the current catalog and database. You can always use a fully-qualified three-part name to reference objects in any catalog:
// switch context
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("analytics");
// fully-qualified reference to a table in another catalog
tableEnv.from("other_catalog.other_db.some_table");
// list
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();
USE CATALOG myhive;
USE analytics;
SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
-- fully-qualified query
SELECT * FROM other_catalog.other_db.some_table;
Catalog Store: persisting catalog configurations
A CatalogStore saves catalog configurations so they can be restored when a session restarts. Flink ships two built-in implementations:
- GenericInMemoryCatalogStore (default): configurations are held in memory only and are lost when the session ends.
- FileCatalogStore: configurations are written to a directory on a local or remote filesystem, one file per catalog.
import org.apache.flink.table.catalog.FileCatalogStore;
CatalogStore catalogStore = new FileCatalogStore("file:///var/flink/catalog-store");
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inBatchMode()
.withCatalogStore(catalogStore)
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Alternatively, configure the catalog store in conf/config.yaml (the same options apply to the SQL Gateway):
table.catalog-store.kind: file
table.catalog-store.file.path: file:///var/flink/catalog-store
Custom catalog implementation
Implement Catalog and CatalogFactory to integrate Flink with a proprietary metadata store.
Register the factory in META-INF/services/org.apache.flink.table.factories.Factory inside your JAR, then reference it in DDL:
CREATE CATALOG my_custom_catalog WITH (
'type' = 'my-catalog-type',
'endpoint' = 'https://metadata.example.com'
);
The factory's factoryIdentifier() method must return 'my-catalog-type' for this DDL to resolve.
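For reference, the SPI registration file named above is a plain text file listing one fully-qualified factory class per line ('#' comments are permitted; the class name here is a placeholder):

```
# META-INF/services/org.apache.flink.table.factories.Factory
com.example.MyCatalogFactory
```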
Supporting time travel
If your catalog stores historical versions of tables, implement getTable(ObjectPath, long timestamp) to enable the FOR SYSTEM_TIME AS OF syntax:
public class MyVersionedCatalog implements Catalog {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
throws TableNotExistException {
Schema schema = buildSchemaAt(timestamp);
Map<String, String> options = buildOptionsAt(timestamp);
return CatalogTable.newBuilder()
.schema(schema)
.options(options)
.snapshot(timestamp)
.build();
}
}
Users can then query historical data:
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-01 00:00:00';