Apache ORC is a columnar binary storage format designed for high-throughput reads and strong compression. Flink’s ORC format is compatible with Apache Hive.
The ORC format acts as a serialization schema when writing (table sinks) and as a deserialization schema when reading (table sources).
Dependency
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>${flink.version}</version>
</dependency>
For use with the SQL Client, use the bundled fat JAR instead:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-orc</artifactId>
  <version>${flink.version}</version>
</dependency>
Usage with Table API / SQL
Filesystem source and sink
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = '/tmp/user_behavior',
'format' = 'orc'
)
With Snappy compression:
CREATE TABLE compressed_logs (
log_id BIGINT,
message STRING,
level STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = '/data/logs/',
'format' = 'orc',
'orc.compress' = 'SNAPPY'
)
Streaming insert
INSERT INTO user_behavior
SELECT
user_id, item_id, category_id, behavior, ts,
DATE_FORMAT(ts, 'yyyy-MM-dd') AS dt
FROM kafka_source;
ORC is a bulk-encoded format. Files are finalized on each Flink checkpoint. Tune your checkpoint interval to balance file size and latency.
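For example, in a Table API program the interval can be set on the table environment's configuration. This is a minimal sketch; the string-based setter assumes a recent Flink release, and the interval value is illustrative:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Roughly one new ORC part file per sink subtask (and partition) per checkpoint:
// shorter intervals mean fresher data, longer intervals mean fewer, larger files.
tEnv.getConfig().set("execution.checkpointing.interval", "1 min");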
Format options
| Option | Required | Default | Description |
|---|---|---|---|
| format | Yes | (none) | Format identifier; must be 'orc'. |
The ORC format also supports any ORC table property passed directly as a format option:
| ORC property | Common values | Description |
|---|---|---|
| orc.compress | NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD | Compression codec. The default is ZLIB. |
| orc.compress.size | (integer) | Compression chunk size in bytes. |
| orc.stripe.size | (integer) | Size of ORC stripes in bytes. |
| orc.row.index.stride | (integer) | Number of rows between row index entries. |
| orc.bloom.filter.columns | (comma-separated column names) | Columns for which to create Bloom filter indexes. |
CREATE TABLE orc_table (
id BIGINT,
name STRING,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = '/data/orc/',
'format' = 'orc',
'orc.compress' = 'ZSTD',
'orc.bloom.filter.columns' = 'id'
)
Data type mapping
ORC format type mapping is compatible with Apache Hive.
| Flink Data Type | ORC physical type | ORC logical type |
|---|---|---|
| CHAR | bytes | CHAR |
| VARCHAR | bytes | VARCHAR |
| STRING | bytes | STRING |
| BOOLEAN | long | BOOLEAN |
| BYTES / BINARY / VARBINARY | bytes | BINARY |
| DECIMAL | decimal | DECIMAL |
| TINYINT | long | BYTE |
| SMALLINT | long | SHORT |
| INT | long | INT |
| BIGINT | long | LONG |
| FLOAT | double | FLOAT |
| DOUBLE | double | DOUBLE |
| DATE | long | DATE |
| TIMESTAMP | timestamp | TIMESTAMP |
| ARRAY | — | LIST |
| MAP | — | MAP |
| ROW | — | STRUCT |
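The last three rows map Flink's composite types to ORC's LIST, MAP, and STRUCT types, which ORC expresses in a text schema syntax; the same syntax appears as the schema string in the DataStream examples below. A small illustration with hypothetical column names:
import org.apache.orc.TypeDescription;

// A Flink ROW<id BIGINT, tags ARRAY<STRING>, attrs MAP<STRING, STRING>>
// corresponds to the following ORC struct schema.
TypeDescription schema = TypeDescription.fromString(
        "struct<id:bigint,tags:array<string>,attrs:map<string,string>>");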
Usage with DataStream API
To write ORC files from a DataStream job, implement a Vectorizer that converts your type to ORC’s VectorizedRowBatch and use OrcBulkWriterFactory with FileSink:
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class EventVectorizer extends Vectorizer<Event> implements Serializable {

    public EventVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector idCol = (BytesColumnVector) batch.cols[0];
        LongColumnVector tsCol = (LongColumnVector) batch.cols[1];
        BytesColumnVector valueCol = (BytesColumnVector) batch.cols[2];
        // Each call appends one row to the current batch.
        int row = batch.size++;
        idCol.setVal(row, element.getId().getBytes(StandardCharsets.UTF_8));
        tsCol.vector[row] = element.getTimestamp();
        valueCol.setVal(row, element.getValue().getBytes(StandardCharsets.UTF_8));
    }
}
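The Event type above is not part of Flink; the examples assume a simple POJO along these lines:
// Hypothetical POJO matching the struct<id:string,ts:bigint,value:string> schema
// used in these snippets.
public class Event implements java.io.Serializable {

    private String id;
    private long timestamp;
    private String value;

    public Event() {}

    public Event(String id, long timestamp, String value) {
        this.id = id;
        this.timestamp = timestamp;
        this.value = value;
    }

    public String getId() { return id; }
    public long getTimestamp() { return timestamp; }
    public String getValue() { return value; }
}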
Create the sink:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;

String schema = "struct<id:string,ts:bigint,value:string>";
DataStream<Event> stream = ...;

OrcBulkWriterFactory<Event> writerFactory =
        new OrcBulkWriterFactory<>(new EventVectorizer(schema));

FileSink<Event> sink = FileSink
        .forBulkFormat(new Path("/output/orc/"), writerFactory)
        .build();

stream.sinkTo(sink);
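Since the bulk writer only finalizes part files on checkpoints, the job needs checkpointing enabled; part-file naming can optionally be controlled on the sink builder. A minimal sketch with illustrative values:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// In-progress ORC part files are rolled and committed on each checkpoint.
env.enableCheckpointing(60_000L); // 60 s

// Optional: control the part-file prefix and suffix.
FileSink<Event> namedSink = FileSink
        .forBulkFormat(new Path("/output/orc/"), writerFactory)
        .withOutputFileConfig(
                OutputFileConfig.builder()
                        .withPartPrefix("events")
                        .withPartSuffix(".orc")
                        .build())
        .build();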
To configure Hadoop settings and ORC writer properties:
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

String schema = "struct<id:string,ts:bigint,value:string>";

Configuration hadoopConf = new Configuration();
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "67108864"); // 64 MB

OrcBulkWriterFactory<Event> writerFactory =
        new OrcBulkWriterFactory<>(new EventVectorizer(schema), writerProps, hadoopConf);
You can attach key-value metadata to ORC files from within the vectorize method:
@Override
public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
    // ... write columns as shown above ...
    this.addUserMetadata("producer", ByteBuffer.wrap("flink-job".getBytes(StandardCharsets.UTF_8)));
    this.addUserMetadata("schema-version", ByteBuffer.wrap("v2".getBytes(StandardCharsets.UTF_8)));
}
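The metadata can later be read back with the plain ORC reader API, independent of Flink; a sketch (the part-file name is illustrative):
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

Reader reader = OrcFile.createReader(
        new Path("/output/orc/part-0-0.orc"),   // illustrative part-file name
        OrcFile.readerOptions(new Configuration()));

// Print every user metadata entry written by the vectorizer.
for (String key : reader.getMetadataKeys()) {
    String value = StandardCharsets.UTF_8.decode(reader.getMetadataValue(key)).toString();
    System.out.println(key + " = " + value);
}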