Understanding Apache Fluss
This is a data system internals blog post. So if you enjoyed my table formats internals blog posts, or my writing on Apache Kafka internals or Apache BookKeeper internals, you might enjoy this one. But beware, it's long and detailed. Also note that I work for Confluent, which also runs Apache Flink but neither runs nor contributes to Apache Fluss. However, this post aims to be a faithful and objective description of Fluss.

Apache Fluss is a table storage engine for Flink being developed by Alibaba in collaboration with Ververica. To write this blog post, I reverse engineered a high-level architecture by reading the Fluss code from the main branch (and running tests) in August 2025. This follows the same approach I took when writing about Kafka, Pulsar, BookKeeper, and the table formats (Iceberg, Delta, Hudi and Paimon), as the code is always the true source of information. Unlike the rest, I have not had time to formally verify Fluss in TLA+ or Fizzbee, though I did not notice any obvious issues that are not already logged in a GitHub issue.

Let's get started. We'll start with some high-level discussion in the Fluss Overview section, then get into the internals in the Fluss Cluster Core Architecture and Fluss Lakehouse Architecture sections.

In simplest terms, Apache Fluss has been designed as a disaggregated table storage engine for Apache Flink. It runs as a distributed cluster of tablet and coordinator servers, though much of the logic is housed in client-side modules in Flink.

Fig 1. Fluss architecture components.

Fluss provides three main features: low-latency table storage, in the form of append-only tables known as Log Tables and keyed, mutable tables known as Primary Key Tables (PK tables) that emit changelog streams; tiering to lakehouse table storage, currently to Apache Paimon (Apache Iceberg coming soon); and client-side abstractions for unifying low-latency and historical table storage.

Fig 2. Fluss logical model.

This post uses terminology from A Conceptual Model for Storage Unification, including terms such as internal tiering, shared tiering, materialization, and client/server-side stitching. This post also uses the term "real-time" mostly to distinguish between lower-latency hot data and colder, historical data.

Last year I did a 3-part deep dive into Apache Paimon (a lakehouse table format), which was born as a table storage engine for Flink (originally named Flink Table Store). Paimon advertises itself today as “A lake format that enables building a Realtime Lakehouse Architecture”. Now Apache Fluss is being developed, also with the aim of being the table storage engine for Flink. Both projects (Paimon and Fluss) offer append-only tables and primary key tables with changelogs. So why do we need another table storage solution for Flink?

The answer is a combination of efficiency and latency. Paimon was specifically designed to be good at streaming ingestion (compared to Iceberg/Delta); however, it is still relatively slow and costly for real-time data. Paimon is also not particularly efficient at generating changelogs, which is one of Flink's primary use cases. This is not a criticism of Paimon, only a reality of building decentralized table stores directly on object storage. Fluss in many ways is the realization that an object-store-only table format is not enough for real-time data.

Apache Fluss is a table storage service initially designed to replace Paimon, or largely sit in front of it, offering lower-latency table storage and changelogs. In the background, Fluss offloads data to Paimon. Fluss can also rely on its own internal tiering without any offloading to Paimon at all. Having two tiering mechanisms can sometimes be confusing, but we'll dig into that later.

Fig 3. Fluss and Paimon forming real-time/historical table storage for Flink.

With the introduction of a fast tier (real-time) and a larger, slower tier (historical), Flink needs a way to stitch these data stores together. Fluss provides client-side modules that expose a simple API to Flink but, under the hood, do the work of figuring out where the cut-off between real-time and historical data lies and stitching the two together.

One of Flink's main roles is that of a changelog engine (based on changes to a materialized view). A stateful Flink job consumes one or more input streams, maintains a private materialized view (MV), and emits changes to that materialized view as a changelog (such as a Kafka topic).

Fig 4. Flink as a changelog engine.

However, the state maintained by Flink can grow large, placing pressure on Flink state management, such as its checkpointing and recovery. A Flink job can become cumbersome and unreliable if its state grows too large. Flink 2.0 has introduced disaggregated state storage (also contributed by Alibaba), which aims to solve, or at least mitigate, the large state problem by offloading the state to an object store. In this way Flink 2.0 should be able to better support Flink jobs with large state. In the VLDB paper Disaggregated State Management in Apache Flink® 2.0, the authors stated that “we observe up to 94% reduction in checkpoint duration, up to 49× faster recovery after failures or a rescaling operation, and up to 50% cost savings.”

Paimon and Fluss offer a different approach to the large state problem. Instead of only offloading state, they expose the materialized data itself as a shared table that can be accessed by other jobs. This turns what was previously private job state into a public resource, enabling new patterns such as lookup joins.

Fig 5. Flink table storage project evolution.

Paimon was the first table storage engine for Flink. It provides append-only tables, primary key tables and changelogs for primary key tables.
If you are interested in learning about Paimon internals, I did a pretty detailed deep dive and even formally verified the protocol. While Paimon is no doubt one of the best table formats for streaming ingest, it still has limits. One of the main limitations is how it supports changelogs, which are the responsibility of the Paimon writer (or compactor). Maintaining a changelog may require lookups against the target Paimon table (slow) and caching results (placing memory pressure on Flink). Alternatives include generating changelogs based on full compactions, which add latency, can combine changes (losing some change events) and significantly impact compaction performance. In short, Paimon didn't nail the changelog engine job and remains higher latency than regular databases and event streams.

Fluss provides these very same primitives (append-only table, primary key table and changelog), but optimized for lower latency and better efficiency than Paimon for small real-time writes. Fluss stores the primary key table across a set of tablet servers, with RocksDB as the on-server storage engine. Fluss solves Paimon's changelog issues by efficiently calculating the changes based on the existing data stored in RocksDB. This is more efficient than Paimon's lookups, and both higher fidelity and more efficient than Paimon's compaction-based changelog generation.

Whether Fluss is a better changelog (and state management) engine than Flink 2.0 with disaggregated state storage remains to be seen, but one really nice thing about a Fluss Primary Key table (and a Paimon PK table, for that matter) is that it turns the formerly private MV state of a Flink job into a shared resource for other Flink jobs. This enables lookup joins, where previously another Flink job would have needed to consume the changelog.

Fig 6. Fluss PK Table offloads MV/changelog state management from Flink and provides a shared data primitive for other Flink jobs, with lookup (lookup join) support.

Append-only tables are based on an adapted copy of the Apache Kafka log replication and controller code. Where Fluss diverges is that Fluss is a table storage engine: it exposes a table API, whereas Kafka gives you untyped byte streams where schemas are optional. Kafka is extremely permissive and unopinionated about the data it replicates. One of the key features missing from Kafka, if you want to use it as an append-only table storage engine, is the ability to selectively read a subset of the columns or a subset of the rows, as you can with a regular database. Fluss adds this capability by enforcing tabular schemas and serializing records in a columnar format (Arrow IPC). This allows clients to include projections (and in the future, filters too) in their read requests, which get pushed down to the Fluss storage layer. With enforced tabular schemas and columnar storage, Kafka log replication can be made into a simple append-only table storage solution.

Fig 7. The Fluss client serializes record batches into columnar Arrow IPC batches, which are appended to segment files by the tablet servers. On the other end, the client that reads the batches converts the columnar Arrow data back into records.

While Fluss storage servers can push projections down to the file-system read level, this doesn't apply to tiered segment data, where pruning columns from many small batches would be prohibitively expensive.

As we'll see in this deep dive, Fluss doesn't just build on Kafka for append-only tables (log tables) but also for the changelogs of primary key tables and as the durability mechanism for PK Tables.

With the addition of Fluss as a lower-latency table storage engine, we now have the problem of stitching together real-time and historical data.
As I described in my Conceptual Model for Storage Unification, it comes down to two main points: API abstraction, meaning stitching together the different physical storage into one logical model; and physical storage management (tiering, materialization, lifecycle management).

Fig 8. Fluss real-time and historical data.

Fluss places the stitching and conversion logic client-side, with the clients interacting with the Fluss coordinators to discover the necessary metadata. The physical storage management is split up between a trifecta of: Fluss storage servers (known as tablet servers), Fluss coordinators (based on the ZooKeeper-based Kafka controller), and Fluss clients (where the conversion logic is housed).

Most storage management involves tiering and backup. Internal tiering is performed by the storage servers themselves, much like tiered storage in Kafka. Internal tiering allows the size of Fluss-cluster-hosted log tables and PK table changelogs to exceed the storage drive capacity of the storage servers. RocksDB state is not tiered and must fit on disk. RocksDB snapshots are taken periodically and stored in object storage, both for recovery and as a source of historical reads for clients (but this is not tiering). Lakehouse tiering is run as a Flink job, using Fluss libraries for reading from Fluss storage and writing to Paimon. Fluss coordinators are responsible for managing tiering state and assigning tiering work to tiering jobs.

Schema evolution, a critical part of storage management, is on the roadmap, so Fluss may not be ready for most production use cases quite yet.

With that introduction, let's dive into the internals to see how Fluss works. We'll start by focusing on the Fluss cluster core architecture and then expand to include the lakehouse integration. Fluss has three main components: tablet servers, which form the real-time storage component; coordinator servers, which are similar to KRaft in Kafka, storing not only general metadata but also acting as a coordination layer; and Fluss clients, which present a read/write API to Flink and do some of the unification work to meld historical and real-time data. The clients interact with both the Fluss coordinators and the Fluss tablet servers, as well as reading directly from object storage.

Fig 9. Fluss core architecture (without lakehouse or Flink).

A Fluss Log Table is divided into two logical levels that define how data is sharded. At the top level, the table is divided into partitions, which are logical groupings of data by partition columns, similar to partitions in Paimon (and other table formats). A partition column could be a date column, or a country, etc. Within each partition, data is further subdivided into table buckets, which are the units of parallelism for writes and reads; each table bucket is an independent append-only stream and corresponds to a Flink source split (discussed later). This is also how Paimon divides the data within a partition, aligning the two logical models closely.

Fig 10. Log table sharding scheme.

Each table bucket is physically stored as a replicated log tablet by the Fluss cluster. A log tablet is the equivalent of a Kafka topic partition, and its code is based on an adapted copy of Kafka and its replication protocol.

When a Fluss client appends to a Log Table, it must identify the right table partition and table bucket for any given record. Table partition selection is based on the partition column(s), whereas bucket selection follows an approach similar to Kafka producers, using schemes such as round-robin, sticky or hashes of a bucket key.

Fig 11. Table partition and table bucket selection when writing. Fluss clients write to table buckets by using the partition column(s) to choose the table partition, then a bucket scheme, such as round-robin, sticky or hash-based (like Kafka producers choosing partitions), to choose the bucket.

Each log tablet is a replicated append-only log of records, built on the Kafka replication protocol.
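To make the routing step concrete, here is a minimal Python sketch of how a writer might map a record to a table partition and a table bucket. It is not the Fluss client code: the function and class names are hypothetical, the "/" separator for multi-column partition names is illustrative, and CRC32 stands in for whatever hash Fluss actually applies to the bucket key. The sticky variant is also simplified to move to the next bucket once a batch fills.

```python
import zlib
from itertools import count

# Hypothetical sketch of client-side routing; names are illustrative and
# CRC32 stands in for whatever hash Fluss actually uses.


def choose_partition(record: dict, partition_cols: list) -> str:
    """Pick the table partition from the record's partition column values."""
    return "/".join(str(record[c]) for c in partition_cols)


def hash_bucket(record: dict, bucket_key: list, num_buckets: int) -> int:
    """Hash-based: records with the same bucket key always land in the same bucket."""
    key = "|".join(str(record[c]) for c in bucket_key).encode("utf-8")
    return zlib.crc32(key) % num_buckets


class RoundRobinBucketer:
    """Round-robin: cycle through buckets one record at a time."""

    def __init__(self, num_buckets: int):
        self.num_buckets = num_buckets
        self._seq = count()

    def next_bucket(self) -> int:
        return next(self._seq) % self.num_buckets


class StickyBucketer:
    """Sticky: keep filling one bucket's batch, then move to the next bucket."""

    def __init__(self, num_buckets: int, batch_size: int):
        self.num_buckets = num_buckets
        self.batch_size = batch_size
        self._bucket = 0
        self._filled = 0

    def next_bucket(self) -> int:
        if self._filled == self.batch_size:  # current batch is full
            self._bucket = (self._bucket + 1) % self.num_buckets
            self._filled = 0
        self._filled += 1
        return self._bucket


record = {"dt": "2025-08-01", "country": "ES", "user_id": 42}
print(choose_partition(record, ["dt"]))                 # 2025-08-01
print(hash_bucket(record, ["user_id"], 4) in range(4))  # True
```

Hash-based routing keeps all records with the same bucket key in one bucket, which preserves their relative order, whereas round-robin and sticky routing trade that ordering for an even spread (sticky also produces fuller batches). Each resulting table bucket is then stored as its own replicated log tablet.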
It is the equivalent of a Kafka topic partition. So by default, each log tablet has three replicas, with one leader and two followers.

Fig 12. Log Tablet replication is based on Kafka partition replication.

It has the concept of the high watermark and the ISR, though the recent work on hardening the protocol against simultaneous correlated failures is not included (as it is based on the old ZooKeeper controller, not KRaft).

Fluss optionally stores log table batches in a columnar format to allow for projection pushdown to the filesystem level, as well as to obtain the other benefits of Arrow. Fluss clients accumulate records into batches and then serialize each batch into Arrow vectors, using Arrow IPC. Log tablet replicas append these Arrow IPC record batches to log segment files on disk (as-is). These Arrow record batches are self-describing, with metadata that allows the file reader to read only the requested columns from disk (when a projection is included in a fetch request).

Projection pushdown consists of a client including a column projection in its fetch requests, and that projection getting pushed all the way down to the file storage level, which prunes unwanted columns while reading data from disk. This avoids sending unwanted columns over the network, but may come with additional storage IO overhead if the columns of the projection are widely fragmented across a file.

Storing log table data as a sequence of concatenated Arrow IPC record batches is quite different from using a single Parquet file as a segment file. A segment file with 100 concatenated Arrow IPC record batches stores each batch as its own self-contained columnar block, so reading a single column across the file requires touching every batch's metadata and buffers, whereas a Parquet file lays out each column contiguously within a row group, allowing direct, bulk column reads with projection pushdown.
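The cost difference between the two layouts can be approximated with a back-of-the-envelope model. The sketch below counts the separate file regions a reader has to touch to project some columns out of one segment file. It is an assumption-laden simplification, not a measurement of Fluss: it treats each projected column as a single buffer per batch (ignoring Arrow's validity and offset buffers) and treats a Parquet-like file as a footer plus one column chunk per projected column per row group.

```python
from dataclasses import dataclass

# Back-of-the-envelope model (an assumption, not measured Fluss behaviour):
# count the separate file regions a reader must touch to project columns.


@dataclass
class SegmentFile:
    num_batches: int  # Arrow IPC record batches concatenated in the segment
    num_columns: int  # columns in the table schema


def arrow_ipc_regions(seg: SegmentFile, projected: int) -> int:
    """Each batch is self-contained: read its metadata, then one buffer
    region per projected column (ignoring validity/offset sub-buffers)."""
    assert 0 < projected <= seg.num_columns
    return seg.num_batches * (1 + projected)


def parquet_like_regions(seg: SegmentFile, projected: int, row_groups: int = 1) -> int:
    """Column-contiguous layout: read the footer once, then one contiguous
    column chunk per projected column per row group."""
    assert 0 < projected <= seg.num_columns
    return 1 + row_groups * projected


seg = SegmentFile(num_batches=100, num_columns=20)
print(arrow_ipc_regions(seg, projected=1))     # 200
print(parquet_like_regions(seg, projected=1))  # 2
```

With 100 batches per segment, projecting a single column means touching 200 regions in the concatenated layout versus 2 in a single-row-group, column-contiguous file. Projection still shrinks the bytes read, but the reader of a concatenated Arrow IPC segment touches every batch in the segment.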
This adds some file IO overhead compared to Parquet and makes pruning columns in tiered log segments impractical and expensive. But serializing batches into Parquet files on the client is also not a great choice, so this approach of Arrow IPC files is a middle ground. It may be possible in the future to use compaction to rewrite segment files into Parquet for more optimal columnar access to tiered segment data.

On the consumption side, Fluss clients convert the Arrow vectors back into rows. This columnar storage is entirely abstracted away from application code, which is record (row) based.

Each log record has a change type, which can be used by Flink and Paimon in streaming jobs/streaming ingestion: +A (append only), +I (insert), -U (update before), +U (update after) and -D (delete). For Log Tables, the Fluss client assigns each record the +A change type. The remaining change types are used for the changelogs of PK tables, which are explained in the PK Table section.

Fluss has two types of tiering: internal tiering (akin to traditional tiered storage), which is covered in this section, and lakehouse tiering, which is covered in the next major section, Fluss Lakehouse Architecture.

Tablet servers internally tier log tablet segment files to object storage, which matches the general approach of Kafka. The difference is that when a Fluss client fetches from an offset that has been tiered, the log tablet returns only the metadata of a set of tiered log segments, so that the Fluss client can download those segments itself. This offloads work from the Fluss servers during catch-up reads.

Fig 13. Log tablet with internal segment tiering and replication.

Fluss made the decision to place on the client some of the logic for melding local data on tablet server disks with internally tiered data on object storage. This diverges from Kafka, whose API does not support this, which places the burden of downloading tiered segments on the Kafka brokers.

The actual tiering work is performed as follows.
Each tablet server has a RemoteLogManager, which is responsible for tiering segments to remote storage via log tiering tasks. The RemoteLogManager can only trigger tiering tasks for log tablets whose leader replicas are on this server. Each tiering task works as follows.

A task is scoped to a single log tablet, and identifies the local log segment files to upload (and the remote ones to expire, based on TTL). It uploads the target segment files to object storage. It then commits them by creating a new manifest file with the metadata of all current remote segments, writing the manifest file to object storage, and sending a CommitRemoteLogManifestRequest to the coordinator, which contains the path to the manifest file in remote storage (the coordinator does not store the manifest itself). Once committed, expired segment files are deleted from object storage. Asynchronously, the coordinator notifies the log tablet replica via a NotifyRemoteLogOffsetsRequest, so the replica knows the offset range that is tiered (so it knows when to serve tiered metadata to the client, and when to read from disk) and which local segments can be deleted.

As I mentioned earlier, because clients download tiered segment files, the network IO benefits of columnar storage are limited to the hottest data stored on Fluss tablet servers. Even if a client doing a full table scan only needs one column, it must still download entire log segment files. There is no way around this except to use a different columnar storage format than Arrow IPC with N record batches per segment file.

It's worth understanding a little about Flink's DataStream Source API in order to understand how Flink reads from Fluss.

Splits: Flink distributes the work of reading into splits, which are independent chunks of input (e.g. a file or a topic partition).

Split enumeration: A split enumerator runs on the JobManager and discovers source inputs, generating corresponding splits and assigning them to reader tasks.

Readers: Each TaskManager runs a source reader, which reads the sources described by its assigned splits and emits records to downstream tasks.

This design cleanly separates discovery and coordination (the enumerator) from data reading (the readers), while keeping splits small and resumable for fault tolerance.

When Flink uses Fluss as a source, a Fluss split enumerator runs on the JobManager to discover and assign splits (which describe table buckets). Each TaskManager hosts a source reader, which uses a split reader to fetch records from its assigned table bucket splits and emit them downstream.

Fig 14. Fluss integration of Log Table via Flink Source API (without lakehouse integration).

In this way, a Log Table source parallelizes the reading of all table buckets of the table, emitting the records to the next operators in the DAG.

While log tablets are built on an adapted version of Kafka replication, there are some notable differences:

Fluss uses two levels of sharding: table partitions, via partition column(s), and multiple table buckets per partition.

Obligatory tabular schemas: A Fluss table must have a flat table schema with primitive types (structs, arrays and maps are on the roadmap).

Columnar storage: Allowing for projection pushdown (which also depends on a schema).

Tiered storage (internal tiering): Clients download tiered segments (tablet servers only serve tiered segment metadata to clients).

Fluss has no automatic consumer group protocol: This role is performed by Flink assigning splits to readers.

A primary key table is also organized into two logical levels of partitions and table buckets. Clients write data to specific partitions and buckets as described in the log tablet section. However, the Primary Key table has a different API, as it is a mutable table:

Writes: Upserts and deletes to the table via a PutKV API.

Lookups and prefix lookups against the table.

Changelog scans (which can involve a hybrid read of KV snapshot files plus the changelog, discussed later).
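The semantics of this API can be modeled in a few lines. The sketch below is a hypothetical in-memory stand-in (the PkTableModel class is not part of Fluss): a null value in a put deletes the key, and keys are kept sorted, much as RocksDB orders its keys, so that a prefix lookup becomes a short range scan starting at the first key greater than or equal to the prefix.

```python
import bisect
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, ...]


class PkTableModel:
    """Hypothetical in-memory model of PK table semantics; not the Fluss client API.
    Keys are kept sorted (as RocksDB keeps keys ordered) so prefix lookups
    become a range scan."""

    def __init__(self) -> None:
        self._keys: List[Key] = []  # sorted primary keys
        self._rows: Dict[Key, dict] = {}

    def put(self, key: Key, value: Optional[dict]) -> None:
        """PutKV semantics: a non-null value upserts, a null value deletes."""
        if value is None:
            if key in self._rows:
                del self._rows[key]
                self._keys.pop(bisect.bisect_left(self._keys, key))
            return
        if key not in self._rows:
            bisect.insort(self._keys, key)
        self._rows[key] = value

    def lookup(self, key: Key) -> Optional[dict]:
        return self._rows.get(key)

    def prefix_lookup(self, prefix: Key) -> List[dict]:
        """Return all rows whose key starts with `prefix`, in key order."""
        out = []
        i = bisect.bisect_left(self._keys, prefix)  # first key >= prefix
        while i < len(self._keys) and self._keys[i][: len(prefix)] == prefix:
            out.append(self._rows[self._keys[i]])
            i += 1
        return out


t = PkTableModel()
t.put(("user1", "order1"), {"amount": 10})
t.put(("user1", "order2"), {"amount": 25})
t.put(("user2", "order1"), {"amount": 5})
print(len(t.prefix_lookup(("user1",))))  # 2
t.put(("user1", "order1"), None)         # null value => delete
print(t.lookup(("user1", "order1")))     # None
```

Keeping keys sorted is what makes prefix lookups cheap: the scan stops at the first key that no longer shares the prefix.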
Each table bucket of a PK table is backed by a KV Tablet, which emits changes to a child log tablet. KV Tablet state is composed of a RocksDB table for storing the keyed table state, and a Log Tablet for storing the changelog. This inner log tablet also acts as a write-ahead log (WAL), as described in the Writing to a KV Tablet subsection.

Fig 15. A KV tablet and its replicated child log tablet for the changelog.

Unlike log tablets, KV tablets do not store data in Arrow format, as their data is stored in RocksDB. However, the child log tablet uses Arrow as normal. There are a few other notable differences from Log Tablets in how data is written, tiered and read, which are covered in the next subsections.

The PutKV API accepts a KV batch, which contains a set of key-value records. When the value is not null, Fluss treats the record as an upsert, and when the value is null, it treats it as a delete. The KV tablet performs the write as follows. For each record in the batch, it performs a read against the RocksDB table to determine the change to be emitted to the changelog. A change record could be: a DELETE record with the old row, if the write contains a null value; an UPDATE_BEFORE record with the original record and an UPDATE_AFTER record with the new record, if a record exists and the write has a non-null value; or an INSERT with the new record, if no record exists yet. Both the new changelog records and the RocksDB write are buffered in memory for now. Next, it appends all buffered changelog records (as an Arrow IPC record batch) to the child log tablet and waits for the batch to get committed (based on the Kafka replication protocol). Once the change records are committed, it performs the buffered RocksDB writes. The new records overwrite the existing records by default, but read about merge engines and partial updates in the next subsection.

Durability: A log tablet is like a Kafka partition; it is replicated across multiple servers, with one leader and multiple followers.
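The ordering of this write path matters more than its details, so here is a minimal Python sketch of it. The store dictionary stands in for RocksDB, and the append_and_commit callback stands in for appending to the replicated child log tablet and waiting for the commit; both names are hypothetical. Two behaviours are assumptions rather than confirmed Fluss behaviour: a delete of a missing key emits nothing, and reads within a batch consult that batch's buffered writes, so that a key written twice in one batch still produces a consistent changelog.

```python
from typing import Callable, Dict, List, Optional, Tuple

Change = Tuple[str, dict]  # (change type, row)


def derive_changes(old: Optional[dict], new: Optional[dict]) -> List[Change]:
    """Turn one write into changelog records, given the current row (if any)."""
    if new is None:  # null value => delete
        # Assumption: deleting a missing key emits nothing.
        return [("-D", old)] if old is not None else []
    if old is None:
        return [("+I", new)]
    return [("-U", old), ("+U", new)]


class KvTabletModel:
    """Hypothetical sketch of the KV tablet write path: changelog first, state second."""

    def __init__(self, append_and_commit: Callable[[List[Change]], bool]):
        self.store: Dict[str, dict] = {}            # stands in for RocksDB
        self.append_and_commit = append_and_commit  # stands in for the child log tablet

    def put_kv_batch(self, batch: List[Tuple[str, Optional[dict]]]) -> List[Change]:
        pending: Dict[str, Optional[dict]] = {}  # buffered RocksDB writes
        changes: List[Change] = []
        for key, new in batch:
            # Later writes in the same batch see earlier buffered ones (assumption).
            old = pending[key] if key in pending else self.store.get(key)
            changes.extend(derive_changes(old, new))
            pending[key] = new
        # 1) Append the changelog (the WAL) and wait for it to be committed.
        if not self.append_and_commit(changes):
            raise RuntimeError("changelog not committed; RocksDB state left untouched")
        # 2) Only then apply the buffered writes to the local store.
        for key, new in pending.items():
            if new is None:
                self.store.pop(key, None)
            else:
                self.store[key] = new
        return changes


log: List[Change] = []


def replicate(changes: List[Change]) -> bool:
    log.extend(changes)  # pretend the in-sync replicas acknowledged the batch
    return True


tablet = KvTabletModel(replicate)
tablet.put_kv_batch([("k1", {"v": 1})])
tablet.put_kv_batch([("k1", {"v": 2}), ("k2", {"v": 9}), ("k1", None)])
print([c for c, _ in log])  # ['+I', '-U', '+U', '+I', '-D']
print(tablet.store)         # {'k2': {'v': 9}}
```

If the changelog append is not committed, the model raises and leaves the local store untouched: changes always reach the replicated changelog before the local store is modified.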
The KV tablet itself is unreplicated, though it does regularly upload RocksDB snapshot files to object storage. Therefore, the changelog acts as a replicated write-ahead log (WAL) for the KV tablet. If the server disk were to die, the state could be recovered by downloading the latest KV tablet snapshot file and replaying the changelog from the offset following the snapshot (the last offset is stored with the snapshot).

Interestingly, the KV tablet leader has no followers, and moves with the log tablet leader. When a leader election occurs in the log tablet, the KV tablet leader changes with it. The new KV tablet leader (on the tablet server of the new log tablet leader) must download the latest RocksDB snapshot file and replay the changelog to recover the state of the former KV leader. This means that large RocksDB state could be an issue for availability, due to the large amount of state that must be downloaded and replayed on each leader election. This design may change in the future.

By default, PK tables have no merge engine. The new row overwrites the old row (via the DefaultRowMerger class), as described in the last subsection. But Fluss supports using the merge types FIRST_ROW and VERSIONED in PK tables. Each merge operation has an old row and a new row (either of which could be null). The merge types are: None, where the new row replaces the old row; FIRST_ROW, which keeps the old row unless it is null, in which case it takes the new row; and VERSIONED, which takes whichever row has the highest version (the new row's version is supplied by the client). The Flink source for Fluss can use FIRST_ROW, but VERSIONED doesn't seem to be used anywhere yet.

Partial update: An update does not need to include all columns, allowing for partial updates. Partial updates cannot be combined with merge types. If a write only contains a subset of the columns, the PartialUpdateRowMerger class is used instead of the DefaultRowMerger class.
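The merge behaviours described above map naturally onto small functions over an old row and a new row. The sketch below models rows as Python dictionaries; the function names are hypothetical, and for VERSIONED the handling of null rows and equal versions is an assumption, since only "the highest version wins" is described.

```python
from typing import Optional

Row = Optional[dict]


def merge_default(old: Row, new: Row) -> Row:
    """No merge engine: the new row (or a delete, if null) replaces the old row."""
    return new


def merge_first_row(old: Row, new: Row) -> Row:
    """FIRST_ROW: keep the old row unless it is null."""
    return old if old is not None else new


def merge_versioned(old: Row, new: Row, version_col: str = "version") -> Row:
    """VERSIONED: keep whichever row has the higher version.
    Assumptions: a null side yields the other row, and ties keep the old row."""
    if old is None or new is None:
        return new if old is None else old
    return new if new[version_col] > old[version_col] else old


def merge_partial(old: Row, new: dict) -> dict:
    """Partial update: column by column, take the new value if the write
    contains that column, otherwise keep the old value."""
    merged = dict(old or {})
    merged.update(new)  # `new` holds only the columns being updated
    return merged


print(merge_first_row({"v": 1}, {"v": 2}))                      # {'v': 1}
print(merge_versioned({"version": 5}, {"version": 3}))          # {'version': 5}
print(merge_partial({"id": 1, "a": "x", "b": "y"}, {"b": "z"}))  # {'id': 1, 'a': 'x', 'b': 'z'}
```

The merge_partial function approximates what the PartialUpdateRowMerger class does in Fluss.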
The PartialUpdateRowMerger goes column by column, taking the column from the new row if it exists, or else from the old row, to create a new merged row.

The keyed state in RocksDB cannot be tiered; it must fit entirely on disk. Snapshots are made and stored in object storage, but they serve backup/recovery and historical reads rather than tiering. Therefore, the key cardinality should not be excessive. The changelog is a log tablet, and is tiered as described in the log table tiering subsection.

Fig 16. KV Tablet (and child Log Tablet) read, write, tiering and replication.

A Fluss client can send lookup and prefix lookup requests to the KV tablet leader, which get translated into RocksDB lookups and prefix lookups. It is also possible to scan the changelog, and there are different options for the starting offset:

Earliest offset: Read the entire change stream from start to finish, assuming infinite retention of the log tablet.

Latest offset: Read only the latest changes from the log tablet (but miss historical data).

Full: Bootstrap from a RocksDB table snapshot, then switch over to reading from the log tablet.

When using the full mode, the process works as follows. The Fluss client contacts the coordinator server to find out the KV snapshot files, along with the changelog offset they correspond to. The Fluss client downloads the RocksDB snapshot files and initializes a RocksDB instance based on the files. It then iterates over the records of the RocksDB table, treating each record as an insert (+I) row kind. Finally, it switches to reading from the log tablet directly, starting from the next offset after the snapshot. This follows the same logic as with a log tablet, where the client may not receive actual batches, but metadata of tiered log segments.

Of course, this usually happens within the Flink Source API architecture of split enumeration and split readers.

Fig 17. Fluss integration with the Flink Source API. The differences between the log table and primary key table sources are highlighted in green.

While Flink provides the HybridSource abstraction, which allows for reading from a bounded source until completion and then switching to an unbounded source, Fluss has chosen to implement this within the split abstraction itself. For table buckets that are PK-based, the split enumerator also requests the metadata of the KV snapshots from the coordinator, and creates hybrid splits which contain the snapshot and log tablet metadata. For hybrid splits, each split reader first loads the bounded snapshot into a RocksDB instance and processes the records, then switches to the unbounded log tablet (the changelog).

Schema evolution is on the roadmap.

Fluss coordinators play a key role in the following functions: regular cluster metadata; internal tiering metadata and coordination; lakehouse tiering coordination (see next section); and serving metadata directly to clients (for client-side stitching).

Next we'll see how the Fluss core architecture is extended to include the lakehouse. Before we begin, let's define some terms to make the discussion more precise. These terms only exist in this post to make explaining things easier (the Fluss project does not define these abstractions):

Logical table (log table, primary key table): Partitioned and bucketed.

Fluss table: Served by a Fluss cluster. Stored on disk and in internally tiered segment/snapshot files.

Lakehouse table: A Paimon table (Iceberg coming soon), fed by lakehouse tiering.

Fig 18. Logical table maps onto a Fluss Table (hosted by Fluss cluster) and, optionally, a Lakehouse Table.

Fluss is a set of client-side and server-side components, with lakehouse integration being predominantly client-side. We can break up lakehouse integration into the following pieces (which are somewhat interrelated):

Lakehouse tiering: Copying from Fluss tables to Lakehouse tables.

Storage unification: Unifying historical lakehouse data with real-time Fluss tablet server data.

A logical table could map to only a Fluss table, or it could map to a combination of a Fluss table and a Lakehouse table. It is possible for the Fluss table and the Lakehouse table to overlap (and therefore duplicate data). As with internal tiering, only Log Tables and the changelogs of Primary Key Tables can be tiered to a lakehouse. Paimon itself converts the changelog stream back into a primary key table.

Right now Apache Paimon is the main open table format (OTF) that is supported, due to its superior stream ingestion support, but Apache Iceberg integration is on the way.

Lakehouse tiering is driven by one or more Flink jobs that use client-side Fluss libraries to: learn which tables must be tiered (via the Fluss coordinators); do the tiering (read from the Fluss table, write to the Lakehouse table); and notify the coordinators of tiering task completion or failure, along with the lakehouse snapshot metadata and how it maps to Fluss table offset metadata.

Fig 19. Flink-based lakehouse tiering components.

A table tiering task is initiated by a lakehouse tiering process (run in Flink) sending a heartbeat to the coordinator, telling it that it needs work. The coordinator provides it with a table that needs tiering, including the metadata of the current lakehouse snapshot, with the log offset of every table bucket that the lakehouse snapshot corresponds to. The tiering task will start reading each table bucket from these offsets.

Fig 20. Internal topology of the lakehouse tiering Flink job.

The tiering read process reads (log scan) from the Fluss table and is essentially the same as described in the Log Table and Primary Key Table sections of the Fluss Core Architecture section. For log tables, the table buckets are assigned as splits to the various source readers, which use the Fluss client to read the log tablet of each table bucket.
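The offset bookkeeping that keeps tiering from skipping or duplicating data, and that lets clients find the cut-over point, can be sketched as follows. This is a hypothetical single-table model, not the Fluss coordinator or tiering job: offsets are list indexes, a Python list stands in for the lakehouse table, and the model keeps the full Fluss log even though in practice older Fluss data may already have expired.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical model of ONE logical table. Offsets are list indexes, so
# logs[b][i] is the record at offset i of table bucket b (an assumption).


@dataclass
class TieringCoordinatorModel:
    tiered_offsets: Dict[int, int] = field(default_factory=dict)  # bucket -> next offset to tier
    lake_snapshot_id: int = 0

    def assign(self, buckets) -> Dict[int, int]:
        """Hand a tiering task the offset each bucket should resume from."""
        return {b: self.tiered_offsets.get(b, 0) for b in buckets}

    def commit(self, snapshot_id: int, end_offsets: Dict[int, int]) -> None:
        """Record a committed lake snapshot and the offsets it covers (all or nothing)."""
        for b, end in end_offsets.items():
            if end < self.tiered_offsets.get(b, 0):
                raise ValueError(f"bucket {b}: tiered offset cannot move backwards")
        self.tiered_offsets.update(end_offsets)
        self.lake_snapshot_id = snapshot_id


def run_tiering(coord, logs: Dict[int, List[str]], lake: Dict[int, List[str]]) -> None:
    ends = {}
    for b, start in coord.assign(logs.keys()).items():
        batch = logs[b][start:]                 # read the bucket from its last tiered offset
        lake.setdefault(b, [])[start:] = batch  # write; overwrites uncommitted leftovers
        ends[b] = start + len(batch)
    coord.commit(coord.lake_snapshot_id + 1, ends)  # lake commit + offset update


def stitched_read(coord, bucket: int, logs, lake) -> List[str]:
    """Client-side stitching: lake data up to the cut-over offset, Fluss data after it."""
    cut = coord.tiered_offsets.get(bucket, 0)
    return lake.get(bucket, [])[:cut] + logs[bucket][cut:]


logs = {0: ["a", "b"], 1: ["c"]}
lake: Dict[int, List[str]] = {}
coord = TieringCoordinatorModel()
run_tiering(coord, logs, lake)
logs[0].append("d")                         # new real-time write, not yet tiered
print(stitched_read(coord, 0, logs, lake))  # ['a', 'b', 'd']
```

Because the coordinator only advances a bucket's offset after the lake commit, a retried tiering run resumes from the last committed offset and overwrites any uncommitted leftovers. In the real system, the read side of each tiering task goes through the Fluss client rather than a Python list.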
Under the hood, the Fluss client fetches from tablet servers (the leader replica of each log tablet), which may return data payloads or tiered log segment metadata to the client. For primary key tables, the tiering job scans the changelog, generating hybrid splits and assigning them to the readers. The Fluss client in each reader will start by downloading the KV snapshot files (RocksDB) and iterate over the RocksDB table, and then will switch to fetching from the changelog log tablet. The tiering write process involves writing the records as data files to the lakehouse (using Paimon/Iceberg libraries). The specifics are not important here, but the write phase requires an atomic commit, and so newly written but not committed files are not part of the table yet. The tiering commit process involves the Paimon commit but also updates the Fluss coordinator with the committed lakehouse snapshot metadata and the last tiered offset of each table bucket of the Fluss table. Basically, we need coordination to ensure that tiering doesn’t skip or duplicate data, and that clients can know the switch over point from lakehouse to Fluss cluster. The coordinators in turn notify the tablet servers of the lakehouse offset of each table bucket, which is a key part of the storage unification story, as we’ll discuss soon. The Flink topology itself is pretty standard for writing to a lakehouse, with multiple reader tasks emitting records for multiple Paimon writer tasks, with a single Paimon committer task that commits the writes serially to avoid commit conflicts. There is one writer per Paimon bucket, and it seems plausible that the Paimon table partitioning and bucketing will match Fluss (though I didn’t get around to confirming that). Evolving Fluss and Paimon tables in unison will be required, but schema evolution is not implemented yet. Each logical table has a simple state machine that governs when it can be tiered. Fig 21. 
Table tiering job state transitions, managed by Fluss coordinators. The key to storage unification in Fluss is the compatibility of the different storage formats, together with the client’s work of stitching data from different storage tiers and formats together transparently behind a simple API. This stitching work is shared between the Fluss client and the Fluss-Flink module. Fig 22. Fluss uses client-side stitching at two layers of the client-side abstractions. So far I’ve avoided tackling the question of whether lakehouse tiering is a form of internal tiering or of shared tiering. Fluss makes for an interesting case study. It uses two types of tiering: Tiering of Fluss storage (log segments) to object storage, performed by tablet servers. This is classic internal tiering. Tiering of Fluss storage to the lakehouse (Paimon), performed by Flink using Fluss client-side modules and coordinated by the Fluss coordinators. In my first pass, I equated lakehouse tiering with shared tiering as described in my post A Conceptual Model for Storage Unification. In that post, I expressed reservations about the risks and complexity of shared tiering in general. These reservations come from the complexity of using a secondary storage format to store the majority of the primary system’s data, and from the performance problems caused by not being able to optimize secondary storage for both the primary and secondary systems. But I soon realized that Fluss lakehouse tiering is another form of internal tiering, as both storage tiers serve the same analytical workload (and the same compute engine). A Fluss cluster is akin to a durable distributed cache in front of Paimon, where Flink reads/writes hot data from/to this durable distributed cache and reads cold data from Paimon. Together, both storage types form the primary storage. The Paimon table only needs to support analytical workloads and thus can make full use of the various table organization options in Paimon without penalizing another system.
Additionally, the logical data model of the Fluss API is extremely close to both Flink and Paimon (by design), which reduces the costs and risks associated with format conversion. Fig 22. Fluss cluster storage and Paimon storage form the primary storage for the primary system (Flink). This is very different from a Kafka architecture that might tier to a lakehouse format. Kafka serves event streams, largely in event-driven architectures, whereas Paimon serves as an analytics table. The two workloads are completely different (sequential vs analytical). Having Kafka place the majority of its data in a lakehouse table would present many of the issues of shared tiering discussed in my storage unification blog post, namely conversion risk and performance constraints due to using the same storage for completely different access patterns. Likewise, Kafka is unopinionated regarding payloads: many are schemaless, while others use Avro, Protobuf, JSON, or JSONSchema. Payloads can be arbitrarily nested in any of these formats, complicating bidirectional conversion between lakehouse formats and Kafka payloads. As I noted in the last subsection, lakehouse tiering is a form of internal tiering, as the lakehouse tier serves the same workload and compute engine. Of course, the Paimon table could be read by other systems too, so there is still room for some conflicting requirements around data organization, but far less than between a purely sequential workload (like Kafka) and an analytical workload. However, curiously, Fluss now has two types of internal tiering: Tiering of log segment files (as-is). This is the direct-access form of tiering discussed in my conceptual model post. Tiering of log data (in its logical form), which is the API-access form of tiering. The question is: why does Fluss need both? And why are their lifecycles not linked? Fluss table storage is expired based on a TTL, and each lakehouse format provides TTL functionality too.
But data is not expired from Fluss storage once it has been copied to the lakehouse. Here are some thoughts: First of all, I am sure linking the lifecycles of Fluss table and lakehouse table data will eventually be implemented, as it looks pretty trivial. It might be more reliable to keep internal log segment tiering, despite also using lakehouse tiering, in case the lakehouse becomes unavailable for a period. For example, without internal tiering acting as an efficient spill-over safety mechanism, the write availability of Fluss would be tightly linked to the availability of the lakehouse catalog. Without lifecycle-linked tiers, lakehouse tiering actually has more in common with materialization (data copy, not move). This is not a bad thing; there are cases where it might be advantageous to keep the lifecycle of tiered log segments separate from that of the lakehouse. For example, some Fluss clients may wish to consume Log Tables in a purely sequential fashion and might benefit from reading tiered log segments rather than a Paimon table. Fluss also places Kafka API support on the roadmap, which makes a compelling case for maintaining internal tiering and lakehouse tiering as separately lifecycle-managed processes (as this avoids the issues of conversion and performance optimization constraints). Kafka clients could be served from internally tiered log segments, and Flink could continue to merge both Fluss table and lakehouse table data, thereby avoiding the pitfalls of shared tiering (as the lakehouse still only has to serve one master). This is another case where lakehouse tiering starts looking more like materialization, when using Fluss as a Kafka-compatible system. Having two types of tiering can be confusing, but it also allows users to configure Fluss to support different workloads. If Fluss is only a lakehouse real-time layer, then it might make sense to only use lakehouse tiering.
However, if Fluss needs to support different access patterns for the same data, then it makes sense to use both (using the materialization approach). Apache Fluss can be seen as a realization that Apache Paimon, while well-suited to streaming ingestion and materialized views, is not sufficient as the sole table storage engine for Flink. Paimon remains a strong option for large-scale, object-store-backed tables, but it falls short when low-latency stream processing requires efficient changelogs and high-throughput, small writes to table storage. Fluss is designed to fill this gap. It provides the low-latency tier with append-only and primary key tables, supports efficient changelog generation, and integrates with Paimon through tiering. Client-side modules in Flink then stitch these tiers together, giving a unified view of real-time and historical data. Fluss aims to broaden its support to include other analytics engines such as Apache Spark and other table formats such as Iceberg, making it a more generic real-time layer for lakehouses. It should probably eventually be seen as an extension to Paimon (et al.) more than a table storage engine made specifically for Flink. What stands out about Fluss is how it shifts between tiering and materialization depending on its role. As a real-time layer in front of a lakehouse, it functions more like a tiering system, since both the Fluss cluster and the lakehouse serve the same analytical workload. If it eventually extends to support the Kafka API as an event streaming system, it would resemble a materialization approach instead. Fluss still faces several adoption challenges. Schema evolution is not yet supported, and lifecycle management remains limited to simple TTL-based policies rather than being tied to lakehouse tiering progress. Its replication-based design also inherits the same networking cost concerns that Kafka faces in cloud environments.
Flink 2.0’s disaggregated state storage solves the large state problem without networking costs, but the state remains private to the Flink job. Fluss has placed direct-to-object-storage writes on its roadmap, and so should eventually address this problem for workloads with high networking costs. Finally, given that Fluss borrows heavily from Kafka for its log tablet storage, it raises questions for the Apache Kafka community as well. Features such as columnar storage, projection pushdown, and stronger integration of schemas are all central to how Fluss turns a (sharded) log into an append-only table. While Kafka has traditionally remained unopinionated about payloads, there are benefits to adding schema-aware storage. It may be worth considering whether some of these ideas have a place in Kafka’s future too.
API abstraction. Stitching together the different physical storage into one logical model. Physical storage management (tiering, materialization, lifecycle management).
Fluss storage servers (known as tablet servers). Fluss coordinators (based on the ZooKeeper Kafka controller). Fluss clients (where conversion logic is housed).
Internal tiering is performed by the storage servers themselves, much like tiered storage in Kafka. Internal tiering allows the size of Fluss-cluster-hosted log tables and PK table changelogs to exceed the storage drive capacity of the storage servers. RocksDB state is not tiered and must fit on disk. RocksDB snapshots are taken periodically and stored in object storage, both for recovery and as a source of historical reads for clients (but this is not tiering).
Lakehouse tiering is run as a Flink job, using Fluss libraries for reading from Fluss storage and writing to Paimon. Fluss coordinators are responsible for managing tiering state and assigning tiering work to tiering jobs.
Tablet servers, which form the real-time storage component. Coordinator servers, which are similar to KRaft in Kafka, not only storing general metadata but also acting as a coordination layer.
At the top level, the table is divided into partitions, which are logical groupings of data by partition columns, similar to partitions in Paimon (and other table formats). A partition column could be a date, a country, etc. Within each partition, data is further subdivided into table buckets, which are the units of parallelism for writes and reads; each table bucket is an independent append-only stream and corresponds to a Flink source split (discussed later). This is also how Paimon divides the data within a partition, aligning the two logical models closely.
+I, Insert. +U, Update After. -U, Update Before.
Internal Tiering (akin to traditional tiered storage), which is covered in this section. Lakehouse Tiering, which is covered in the next major section, Fluss Lakehouse Architecture.
A task is scoped to a single log tablet and identifies local log segment files to upload (and remote ones to expire based on TTL). It uploads the target segment files to object storage. It commits them by: Creating a new manifest file with the metadata of all current remote segments. Writing the manifest file to object storage. Sending a CommitRemoteLogManifestRequest to the coordinator, which contains the path to the manifest file in remote storage (the coordinator does not store the manifest itself). Once committed, expired segment files are deleted in object storage.
Asynchronously, the coordinator notifies the log tablet replica via a NotifyRemoteLogOffsetsRequest so the replica knows: The offset range that is tiered (so it knows when to serve tiered metadata to the client, and when to read from disk). What local segments can be deleted.
Splits: Flink divides the work of reading into splits, which are independent chunks of input (e.g. a file or a topic partition). Split enumeration: A split enumerator runs on the JobManager, discovers source inputs, generates corresponding splits, and assigns them to reader tasks. Readers: Each TaskManager runs a source reader, which reads the sources described by its assigned splits and emits records to downstream tasks.
Fluss uses two levels of sharding: Table partitions, via one or more partition columns. Multiple table buckets per partition. Obligatory tabular schemas: A Fluss table must have a flat table schema with primitive types (structs, arrays, and maps are on the roadmap). Columnar storage: Allowing for projection pushdown (which also depends on a schema). Tiered storage (internal tiering): Clients download tiered segments (tablet servers only serve tiered segment metadata to clients). Fluss has no automatic consumer group protocol. This role is performed by Flink assigning splits to readers.
Writes: Upserts and Deletes to the table via a PutKV API. Lookups and Prefix Lookups against the table. Changelog scans (which can involve a hybrid read of KV snapshot files plus changelog, discussed later).
A RocksDB table for storing the keyed table state. A Log Tablet for storing the changelog. This inner log tablet also acts as a write-ahead log (WAL), as described in the Writing to a KV Tablet subsection.
For each record in the batch: Perform a read against the RocksDB table to determine the change to be emitted to the changelog. A change record could be: a DELETE record with the old row, if the write contains a null value; an UPDATE_BEFORE record with the original record and an UPDATE_AFTER record with the new record, if a record exists and the write has a non-null value; or an INSERT with the new record, if no record exists. Buffer both the new changelog records and the RocksDB write in memory for now. Append all buffered changelog records (as an Arrow IPC record batch) to the child log tablet and wait for the batch to be committed (based on the Kafka replication protocol). Once the change records are committed, perform the buffered RocksDB writes. The new records overwrite the existing records by default, but read about merge engines and partial updates in the next subsection.
None: New row replaces old row. FIRST_ROW: Keep the old row, unless it is null, in which case take the new row. VERSIONED: Take whichever row has the highest version (the new row’s version is supplied by the client).
Earliest offset: Read the entire change stream from start to finish, assuming infinite retention of the log tablet. Latest offset: Read only the latest changes from the log tablet (missing historical data). Full: Bootstrap from a RocksDB snapshot of the table, then switch over to reading from the log tablet.
The Fluss client contacts the coordinator server to find out the KV snapshot files, along with the changelog offset they correspond to. The Fluss client downloads the RocksDB snapshot files and initializes a RocksDB instance from them. The Fluss client iterates over the records of the RocksDB table, treating each record as an insert (+I) row kind. The Fluss client then switches to reading from the log tablet directly, starting from the next offset after the snapshot. This follows the same logic as with a log tablet, where the client may receive metadata of tiered log segments rather than actual batches.
For table buckets that are PK-based, the split enumerator also requests the metadata of the KV snapshots from the coordinator. It creates hybrid splits, which contain the snapshot and log tablet metadata.
For hybrid splits, each split reader first loads the bounded snapshot into a RocksDB instance and processes the records, then switches to the unbounded log tablet (the changelog).
Regular cluster metadata. Internal tiering metadata and coordination. Lakehouse tiering coordination (see next section). Serving metadata directly to clients (for client-side stitching).
Logical table (log table, primary key table). Partitioned and bucketed. Fluss table: Served by a Fluss cluster. Stored on disk and in internally tiered segments/snapshot files. Lakehouse table: A Paimon table (Iceberg coming soon), fed by Lakehouse tiering. Lakehouse tiering: Copying from Fluss tables to Lakehouse tables.
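The internal tiering task steps listed above (upload, write a manifest, commit the manifest path to the coordinator, then delete expired files, followed by the offset notification to the replica) can be modeled as follows. This is a hypothetical, single-process sketch: a dict stands in for object storage, the commit is a field assignment rather than a CommitRemoteLogManifestRequest, and the class, function, and path names are illustrative, not taken from the Fluss codebase.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Segment:
    base_offset: int   # first offset in the segment
    end_offset: int    # one past the last offset in the segment
    created_at: float  # used for TTL-based expiry of remote segments


@dataclass
class ToyTabletTiering:
    """Toy model of internal tiering for one log tablet (not Fluss code)."""
    ttl: float
    object_store: dict = field(default_factory=dict)  # path -> contents
    manifest_path: Optional[str] = None  # what the coordinator records on commit
    remote: List[Segment] = field(default_factory=list)
    tiered_end: int = 0  # one past the highest offset uploaded so far
    version: int = 0

    def run_task(self, sealed_local: List[Segment], now: float) -> Tuple[int, int]:
        """One round: upload, write manifest, commit, then delete expired files.

        sealed_local must be sorted by base_offset. Returns the remote offset
        range that the coordinator would notify the replica about.
        """
        to_upload = [s for s in sealed_local if s.base_offset >= self.tiered_end]
        expired = [s for s in self.remote if now - s.created_at > self.ttl]
        for s in to_upload:
            self.object_store[f"seg-{s.base_offset}"] = f"records {s.base_offset}..{s.end_offset}"
        current = [s for s in self.remote if s not in expired] + to_upload
        # New manifest listing every current remote segment, written to object storage.
        self.version += 1
        path = f"manifest-{self.version}.json"
        self.object_store[path] = json.dumps([[s.base_offset, s.end_offset] for s in current])
        # Commit: the coordinator keeps only the manifest path, not the manifest.
        self.manifest_path, self.remote = path, current
        if to_upload:
            self.tiered_end = to_upload[-1].end_offset
        # Only after the commit are expired segment files removed.
        for s in expired:
            del self.object_store[f"seg-{s.base_offset}"]
        if not current:
            return (self.tiered_end, self.tiered_end)
        return (current[0].base_offset, current[-1].end_offset)


def route_fetch(offset: int, local_start: int, remote_range: Tuple[int, int]) -> str:
    """Replica-side choice once it knows the tiered offset range."""
    start, end = remote_range
    if offset >= local_start:
        return "local"            # read from local segment files on disk
    if start <= offset < end:
        return "tiered metadata"  # the client downloads the segment itself
    return "out of range"
```

In this ordering, a crash part-way through a round leaves at worst some orphaned files in object storage; the committed manifest never points at a segment that has already been deleted.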
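The KV tablet write path described above (read the current value, derive change records, replicate the changelog, and only then apply the state change) can be sketched in a few lines. This is a simplified model, not the Fluss implementation: a dict stands in for RocksDB, a list stands in for the replicated log tablet, replication always succeeds, the default overwrite behavior is used (no merge engine), and emitting nothing when deleting a key that does not exist is my assumption. Row kinds use Flink's notation, with -D for DELETE.

```python
from dataclasses import dataclass


@dataclass
class ChangeRecord:
    kind: str  # Flink row kinds: "+I", "-U", "+U", "-D"
    key: str
    row: dict


class ToyKvTablet:
    """Toy KV tablet: a dict stands in for RocksDB, a list for the changelog."""

    def __init__(self) -> None:
        self.kv = {}         # committed keyed state ("RocksDB")
        self.changelog = []  # committed change records ("log tablet")

    def _append_and_wait(self, batch: list) -> None:
        # Stand-in for appending the batch to the log tablet and waiting
        # until replication commits it; here it always succeeds.
        self.changelog.extend(batch)

    def put_batch(self, writes: list) -> list:
        """Apply (key, row) writes, where row=None means delete."""
        changes, buffered = [], {}
        for key, new_row in writes:
            # Read the current value, including earlier writes in this batch.
            old_row = buffered[key] if key in buffered else self.kv.get(key)
            if new_row is None:
                if old_row is not None:  # assumption: deleting a missing key emits nothing
                    changes.append(ChangeRecord("-D", key, old_row))
            elif old_row is not None:
                changes.append(ChangeRecord("-U", key, old_row))
                changes.append(ChangeRecord("+U", key, new_row))
            else:
                changes.append(ChangeRecord("+I", key, new_row))
            buffered[key] = new_row  # buffer the KV write; do not apply it yet
        # The changelog is committed first; only then is the state updated.
        self._append_and_wait(changes)
        for key, row in buffered.items():
            if row is None:
                self.kv.pop(key, None)
            else:
                self.kv[key] = row
        return changes
```

Because the changelog is committed before the state is touched, the log tablet plays the WAL role: state lost after a commit could in principle be rebuilt by replaying the changelog.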
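The three merge behaviors listed above (None, FIRST_ROW, VERSIONED) can be expressed as a single function. The function and its version_field parameter are hypothetical, the string "NONE" stands for the default behavior, and the tie-breaking rule for VERSIONED is an assumption, since the text does not say what happens when versions are equal.

```python
from typing import Optional


def merge_rows(engine: str, old_row: Optional[dict], new_row: dict,
               version_field: str = "version") -> dict:
    """Pick the row a KV tablet keeps for a key, per merge engine.

    "NONE" stands for the default behavior; version_field is a hypothetical
    name for the client-supplied version column used by VERSIONED.
    """
    if old_row is None:
        return new_row  # no existing row, so every engine keeps the new one
    if engine == "NONE":
        return new_row  # new row replaces old row
    if engine == "FIRST_ROW":
        return old_row  # the first row written for the key wins
    if engine == "VERSIONED":
        # Highest version wins; keeping the old row on a tie is an assumption.
        if new_row[version_field] > old_row[version_field]:
            return new_row
        return old_row
    raise ValueError(f"unknown merge engine: {engine}")
```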
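The split enumeration and hybrid read described above can be sketched as follows: log-table buckets become plain log splits, while PK-table buckets become hybrid splits carrying snapshot metadata and the changelog offset to resume from. The split types, the round-robin assignment, starting log splits at offset 0, and the "last offset covered by the snapshot" convention are all assumptions for illustration; real Fluss splits carry more metadata, and a real reader seeks to the offset rather than filtering.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Optional, Tuple, Union


@dataclass(frozen=True)
class LogSplit:
    bucket: int
    start_offset: int


@dataclass(frozen=True)
class HybridSplit:
    bucket: int
    snapshot_id: int
    log_start_offset: int  # first changelog offset not covered by the snapshot


Split = Union[LogSplit, HybridSplit]


def enumerate_splits(num_buckets: int,
                     kv_snapshots: Optional[Dict[int, Tuple[int, int]]],
                     num_readers: int) -> Dict[int, List[Split]]:
    """One split per table bucket, assigned round-robin to readers.

    kv_snapshots maps bucket -> (snapshot_id, last changelog offset the
    snapshot covers) for a PK table, or is None for a log table.
    """
    assignment: Dict[int, List[Split]] = {r: [] for r in range(num_readers)}
    for bucket in range(num_buckets):
        if kv_snapshots is None:
            split: Split = LogSplit(bucket, start_offset=0)
        else:
            snapshot_id, covered = kv_snapshots[bucket]
            split = HybridSplit(bucket, snapshot_id, log_start_offset=covered + 1)
        assignment[bucket % num_readers].append(split)
    return assignment


def read_hybrid(split: HybridSplit, snapshot_rows: Dict[str, dict],
                changelog: List[Tuple[int, str, dict]]) -> Iterator[Tuple[str, dict]]:
    """Bounded phase: snapshot rows as inserts. Unbounded phase: the changelog."""
    for row in snapshot_rows.values():
        yield ("+I", row)
    for offset, kind, row in changelog:
        if offset >= split.log_start_offset:  # skip what the snapshot already holds
            yield (kind, row)
```

The same two-phase read is what the Full changelog scan mode does from the client side: snapshot rows surface as +I records, then the changelog continues from the next offset.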
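At its core, client-side stitching for one table bucket is an ordered union split at the lakehouse offset that the coordinators track and pass on to tablet servers. The sketch below is hypothetical (the function name and treating the offset as exclusive are assumptions), and it ignores the API and format conversion work that the Fluss client and Fluss-Flink module actually do.

```python
from typing import Iterable, Iterator, Tuple


def stitched_bucket_read(lakehouse_rows: Iterable[dict],
                         lakehouse_end_offset: int,
                         fluss_log: Iterable[Tuple[int, dict]]) -> Iterator[dict]:
    """Read one table bucket as a single logical stream.

    lakehouse_rows: rows of the bucket from the committed lakehouse snapshot,
        assumed to cover every Fluss offset below lakehouse_end_offset.
    fluss_log: (offset, row) records still held by the Fluss cluster; they
        may overlap the lakehouse, since the two lifecycles are not linked.
    """
    # Historical part, served by the lakehouse table.
    yield from lakehouse_rows
    # Real-time part: switch to Fluss at the switchover offset and skip
    # anything the lakehouse snapshot already contains.
    for offset, row in fluss_log:
        if offset >= lakehouse_end_offset:
            yield row
```

A whole-table read would apply the same logic per bucket, using the per-bucket offsets recorded at each tiering commit.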
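The lifecycle question discussed earlier (whether Fluss data should expire once it has been copied to the lakehouse) comes down to a single retention rule. The sketch contrasts the current copy behavior, where only the TTL matters, with one hypothetical lifecycle-linked move policy; the names are illustrative, and the linked policy shown is only one of several possible designs (it never drops data the lakehouse does not yet hold).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TieredSegment:
    end_offset: int    # one past the last offset in the segment
    created_at: float  # seconds


def deletable(seg: TieredSegment, now: float, ttl: float,
              lakehouse_offset: int, linked: bool) -> bool:
    """Decide whether Fluss may drop a segment of table data."""
    if not linked:
        # Copy semantics: expiry is driven by TTL alone, even if the data
        # was already copied to the lakehouse.
        return now - seg.created_at > ttl
    # Hypothetical move semantics: drop data once the lakehouse fully
    # covers it, and never before, regardless of TTL.
    return seg.end_offset <= lakehouse_offset
```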