
Catalogs provide metadata for databases, tables, views, partitions, and functions, and the information needed to access data stored in external systems. By abstracting metadata management behind a unified API, catalogs let Table API programs and SQL queries reference objects by name without hard-coding connection details in every query. Flink always starts with a default in-memory catalog named default_catalog and a default database named default_database. Any objects you create without specifying a catalog are placed there.
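You can verify the initial context from SQL; a fresh session reports the built-in defaults:

```sql
-- show the session's current catalog and database
SHOW CURRENT CATALOG;   -- default_catalog
SHOW CURRENT DATABASE;  -- default_database
```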

Catalog types

GenericInMemoryCatalog

GenericInMemoryCatalog is the default catalog. All objects live only for the duration of the session. It is case-sensitive, unlike Hive Metastore.

JdbcCatalog

JdbcCatalog connects Flink to a relational database over JDBC. Two implementations are provided: PostgresCatalog and MySqlCatalog. Tables in the connected database are automatically visible as Flink tables—no manual DDL required.
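As a sketch, a Postgres-backed JdbcCatalog can be registered from SQL; the host, credentials, and database name below are placeholders, and the JDBC connector plus the Postgres driver must be on the classpath:

```sql
CREATE CATALOG my_pg WITH (
    'type' = 'jdbc',
    'default-database' = 'mydb',
    'username' = 'postgres',
    'password' = 'secret',
    'base-url' = 'jdbc:postgresql://localhost:5432'
);
```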

HiveCatalog

HiveCatalog serves two purposes:
  1. Persistent storage for Flink metadata (tables, views, UDFs) that survives session restarts.
  2. Hive integration: reading and writing existing Hive metadata so Flink queries can access Hive tables transparently.
The Hive Metastore stores all object names in lowercase. This differs from GenericInMemoryCatalog, which is case-sensitive.
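A HiveCatalog can likewise be created in SQL; 'hive-conf-dir' must point at a directory containing hive-site.xml (the path here is a placeholder):

```sql
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/path/to/hive-conf'
);

USE CATALOG myhive;
```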

User-defined catalogs

You can implement custom catalogs by implementing the Catalog interface and a companion CatalogFactory. The factory is discovered via Java SPI: register the fully-qualified class name in META-INF/services/org.apache.flink.table.factories.Factory. The type identifier in the factory must match the type property in CREATE CATALOG DDL.
Since Flink 1.16, user-defined catalogs should load classes through CatalogFactory.Context#getClassLoader() rather than Thread.currentThread().getContextClassLoader() to avoid ClassNotFoundException.

Registering a catalog

import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inStreamingMode()
);

// create and register a HiveCatalog
HiveCatalog catalog = new HiveCatalog(
    "myhive",          // catalog name
    null,              // default database (uses "default")
    "/path/to/hive-conf"
);
tableEnv.registerCatalog("myhive", catalog);

// switch to the new catalog
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");

Creating tables in a catalog

// switch to the target catalog first
tableEnv.useCatalog("myhive");

// create the database, then make it the current one
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS mydb");
tableEnv.useDatabase("mydb");

// create a Kafka-backed table
tableEnv.executeSql(
    "CREATE TABLE user_events (" +
    "  user_id BIGINT," +
    "  action STRING," +
    "  event_time TIMESTAMP(3)," +
    "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user-events'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

System.out.println(java.util.Arrays.toString(tableEnv.listTables()));

Catalog API reference

Database operations

// create
catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), false);

// drop
catalog.dropDatabase("mydb", false);

// get metadata
CatalogDatabase db = catalog.getDatabase("mydb");

// check existence
boolean exists = catalog.databaseExists("mydb");

// list all databases
List<String> dbs = catalog.listDatabases();

Table operations

// create
catalog.createTable(
    new ObjectPath("mydb", "mytable"),
    CatalogTable.newBuilder()
        .schema(schema)
        .options(options)
        .build(),
    false
);

// drop
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// rename
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);

// check existence
boolean exists = catalog.tableExists(new ObjectPath("mydb", "mytable"));

// list
List<String> tables = catalog.listTables("mydb");

Function operations

import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;

// register a Java UDF in the catalog
catalog.createFunction(
    new ObjectPath("mydb", "my_udf"),
    new CatalogFunctionImpl("com.example.MyScalarFunction", FunctionLanguage.JAVA),
    false
);

// list functions
List<String> functions = catalog.listFunctions("mydb");
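The equivalent registration in SQL, assuming com.example.MyScalarFunction is on the classpath:

```sql
CREATE FUNCTION mydb.my_udf
    AS 'com.example.MyScalarFunction'
    LANGUAGE JAVA;
```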

Switching catalogs and databases

Flink resolves table names relative to the current catalog and database. You can always use a fully-qualified three-part name to reference objects in any catalog:
// switch context
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("analytics");

// fully-qualified reference to a table in another catalog
tableEnv.from("other_catalog.other_db.some_table");

// list
String[] catalogs = tableEnv.listCatalogs();
String[] databases = tableEnv.listDatabases();
String[] tables = tableEnv.listTables();
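The same context switching is available as SQL statements, which is convenient in the SQL client:

```sql
USE CATALOG myhive;
USE analytics;

-- a fully-qualified reference works regardless of the current context
SELECT * FROM other_catalog.other_db.some_table;

SHOW CATALOGS;
SHOW DATABASES;
SHOW TABLES;
```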

Catalog Store: persisting catalog configurations

A CatalogStore saves catalog configurations so they can be restored when a session restarts. Flink ships two built-in implementations:
  • GenericInMemoryCatalogStore (default): configurations are in-memory only.
  • FileCatalogStore: configurations are written to a directory on a local or remote filesystem, one file per catalog.
import org.apache.flink.table.catalog.FileCatalogStore;

CatalogStore catalogStore = new FileCatalogStore("file:///var/flink/catalog-store");

EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .inBatchMode()
    .withCatalogStore(catalogStore)
    .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);
Alternatively, configure the catalog store in conf/config.yaml (also honored by the SQL Gateway):
table.catalog-store.kind: file
table.catalog-store.file.path: file:///var/flink/catalog-store

Custom catalog implementation

Implement Catalog and CatalogFactory to integrate Flink with a proprietary metadata store. Register the factory in META-INF/services/org.apache.flink.table.factories.Factory inside your JAR, then reference it in DDL:
CREATE CATALOG my_custom_catalog WITH (
    'type' = 'my-catalog-type',
    'endpoint' = 'https://metadata.example.com'
);
The factory's factoryIdentifier() method must return 'my-catalog-type'.
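A minimal factory sketch follows; MyCustomCatalog is a hypothetical Catalog implementation taking the catalog name and endpoint, and the factory resolves options through FactoryUtil so they are validated against the declared option set:

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class MyCatalogFactory implements CatalogFactory {

    // matches the 'endpoint' option in the DDL above
    private static final ConfigOption<String> ENDPOINT =
            ConfigOptions.key("endpoint").stringType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "my-catalog-type"; // matches 'type' in CREATE CATALOG
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(ENDPOINT);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public Catalog createCatalog(Context context) {
        final FactoryUtil.CatalogFactoryHelper helper =
                FactoryUtil.createCatalogFactoryHelper(this, context);
        helper.validate();
        // MyCustomCatalog is a hypothetical Catalog implementation;
        // classes should be loaded via context.getClassLoader(), as noted above
        return new MyCustomCatalog(
                context.getName(),
                helper.getOptions().get(ENDPOINT));
    }
}
```

Packaged with a META-INF/services/org.apache.flink.table.factories.Factory file naming this class, the factory is discovered automatically at CREATE CATALOG time.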

Supporting time travel

If your catalog stores historical versions of tables, implement getTable(ObjectPath, long timestamp) to enable the FOR SYSTEM_TIME AS OF syntax:
public class MyVersionedCatalog implements Catalog {

    // ... other Catalog methods omitted for brevity ...

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException, CatalogException {
        // reconstruct the table definition as it existed at the given timestamp
        Schema schema = buildSchemaAt(timestamp);
        Map<String, String> options = buildOptionsAt(timestamp);
        return CatalogTable.newBuilder()
                .schema(schema)
                .options(options)
                .snapshot(timestamp)
                .build();
    }
}
Users can then query historical data:
SELECT * FROM my_table FOR SYSTEM_TIME AS OF TIMESTAMP '2024-01-01 00:00:00';