Latest Posts (20 found)

Demystifying Determinism in Durable Execution

Determinism is a key concept to understand when writing code with durable execution frameworks such as Temporal, Restate, DBOS, and Resonate. If you read the docs, you’ll see that some parts of your code must be deterministic while other parts do not have to be. This can be confusing to a developer new to these frameworks. This post explains why determinism is important, where it is needed, and where it is not. Hopefully you’ll come away with a better mental model that makes things less confusing. We can break down this discussion into:

- Recovery through re-execution.
- Separation of control flow from side effects.
- Determinism in control flow.
- Idempotency and duplication tolerance in side effects.

This post uses the terms “control flow” and “side effect”, but there is no agreed-upon vocabulary across the frameworks. Temporal uses “workflow” and “activity” respectively. Restate uses terms such as “handler”, “action” and “durable step”. Each framework uses different vocabulary and has a different architecture behind it. There isn’t a single overarching concept that covers everything, but the one outlined in this post provides a simple way to think about determinism requirements in a framework-agnostic way.

Durable execution takes a function that performs some side effects, such as writing to a database, making an API call, or sending an email, and makes it reliable via recovery (which in turn depends on durability). For example, consider a function with three side effects:

- Step 1: make a db call.
- Step 2: make an API call.
- Step 3: send an email.

If step 2 fails (despite in situ retries) then we might leave the system in an inconsistent state (the db call was made but not the API call). In durable execution, recovery consists of executing the function again from the top, using the results of previously run side effects where they exist. For example, we don’t just execute the db call again; we reuse the result from the first function execution and skip that step.
This becomes equivalent to jumping to the first unexecuted step and resuming from there.

Fig 1. A function is retried, using the results of the prior partial execution where available.

So, durable execution ensures that a function can progress to completion via recovery, which is a retry of the function from the top. Resuming from where we left off involves executing the code again, but using stored results where possible. In my Coordinated Progress model, this is the combination of a reliable trigger and progressable work.

A function is a mix of control flow and side effects. The control flow itself may include state, with branches (if/then/else) and loops executing based on that state. The control flow decides which side effects to execute based on this branching and looping.

Fig 2. Control flow and side effects

In Temporal, the bad_login function would be a workflow and block_account and send_warning_email would be activities. The workflow and activity work is separated into explicit workflow and activity tasks, possibly run on different workers. Other frameworks simply treat this as a function and wrap each side effect to make it durable. I could get into durable promises and continuations, but that is a topic I will cover in a future post.

So let’s look at another example. First we retrieve a customer record, then we check whether we’re inside the promo end date; if so, we charge the card with a 10% discount, else we charge the full amount. Finally we send a receipt email. This example introduces a bug that we’ll cover in the next section.

Fig 3. process_order function as a mix of control flow (green) and side effects (grey)

Durable execution treats the control flow differently from the side effects, as we’ll see in sections 3 and 4. Determinism is required in the control flow because durable execution re-executes code for recovery.
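The recovery mechanism described above (re-execute from the top, reusing journaled side-effect results) can be sketched in a few lines. This is a toy model with a hypothetical step API, not any framework’s real one:

```python
class DurableRun:
    """Minimal sketch of durable execution recovery: results of completed
    side effects are persisted in a journal keyed by step name."""

    def __init__(self, journal=None):
        # journal: step name -> result stored from a prior execution
        self.journal = journal if journal is not None else {}

    def step(self, name, side_effect):
        # Reuse the stored result if this step already completed in a
        # prior execution; otherwise run it and record the result.
        if name in self.journal:
            return self.journal[name]
        result = side_effect()
        self.journal[name] = result
        return result

def process(run):
    a = run.step("db_call", lambda: "db-ok")
    b = run.step("api_call", lambda: "api-ok")
    c = run.step("send_email", lambda: "email-sent")
    return [a, b, c]

# Suppose the first execution failed after step 1. On the retry, the
# journaled result is reused and execution resumes from step 2.
journal = {"db_call": "db-ok"}          # persisted from the partial run
print(process(DurableRun(journal)))     # ['db-ok', 'api-ok', 'email-sent']
```

On the retry, only the unjournaled steps run, which is what makes a retry from the top equivalent to resuming from the first unexecuted step.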
While any stored results of side effects from prior executions are reused, the control flow is executed in full. Let’s look at an example.

Fig 4. Double charge bug because of a non-deterministic if/else

In the first execution, the current time is within the promo date, so the then-branch is executed, charging the card with the discount. However, on the second invocation, the current time is after the promo end date, causing the else-branch to execute, double charging the customer.

Fig 5. A non-deterministic control flow causes a different branch to execute during the function retry.

This is fixed by making now() deterministic: turn it into a durable step whose result is recorded. The second time it is executed, it returns the same datetime (it becomes deterministic). The various SDKs provide deterministic dates, random numbers and UUIDs out of the box.

Another fun example is making the decision based on the customer record retrieved from the database. In this variant, the decision is made based on the loyalty points the customer currently has. Do you see the problem? If the send-email side effect fails, then the function is retried. However, the points value of the order was deducted from the customer in the last execution, so in execution 2 the customer no longer has enough loyalty points! Therefore the else-branch is executed, charging their credit card. Another double payment bug.

We must remember that the durable function is not an atomic transaction. It could be considered a transaction with guarantees around making progress, but not one atomic change across systems. We can fix this new double charge bug by ensuring that the same customer record is returned on each execution. We do that by treating the customer record retrieval as a durable step whose result will be recorded.

Fig 6. Make the customer retrieval deterministic if the control flow depends on it.
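Both fixes follow the same pattern: wrap the non-deterministic read in a durable step so every replay sees the recorded value. A minimal sketch with a hypothetical step API (not any framework’s real one):

```python
import datetime

class DurableRun:
    """Toy durable-step journal, for illustration only."""
    def __init__(self, journal=None):
        self.journal = journal if journal is not None else {}

    def step(self, name, fn):
        # Record the result of the first execution; replay it thereafter.
        if name not in self.journal:
            self.journal[name] = fn()
        return self.journal[name]

PROMO_END = datetime.datetime(2025, 6, 1)  # assumed promo end date

def process_order(run, log):
    # now() is wrapped in a durable step, so every retry sees the same
    # timestamp and therefore takes the same branch.
    now = run.step("now", datetime.datetime.now)
    if now < PROMO_END:
        run.step("charge", lambda: log.append("charged with 10% discount"))
    else:
        run.step("charge", lambda: log.append("charged full amount"))
    run.step("email", lambda: log.append("receipt sent"))

# The first execution recorded now() just before the promo ended, then
# failed. On the retry the wall clock may be past the promo end, but the
# journaled timestamp is replayed, so the same branch runs: no double charge.
journal = {"now": datetime.datetime(2025, 5, 31)}
log = []
process_order(DurableRun(journal), log)
print(log)  # ['charged with 10% discount', 'receipt sent']
```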
Re-execution of the control flow requires determinism: it must execute based on the same decision state every single time, and it must also pass the same arguments to side effect code every single time. However, side effects themselves do not need to be deterministic; they only require idempotency or duplication tolerance.

Durable execution re-executes the control flow as many times as is needed for the function to make progress to completion. However, it typically avoids executing the same side effects again if they were previously completed. The result of each side effect is durably stored by the framework, and a replay only needs the stored result. Therefore side effects do not need to be deterministic, and often that would be undesirable anyway. A db query that retrieves the current number of orders or the current address of a customer may return a different result every time. That’s a good thing, because the number of orders might change, and an address might change. If the control flow depends on the number of orders, or the current address, then we must ensure that the control flow is always given the same answer. This is achieved by storing the result of the first execution and using that result for every replay (making the control flow deterministic).

Now to idempotency. What if a side effect does complete, but a failure of some kind causes the result to not be stored by the framework? The durable execution framework will replay the function, see no stored result, and execute the side effect again. For this reason we want side effects to either be idempotent or otherwise tolerate running more than once. For example, we might decide that sending the same email again is ok; the cost of reliable idempotency might not be worth it. On the other hand, a credit card payment most definitely should be idempotent.

Some frameworks, namely Temporal, make the separation of control flow from side effects explicit.
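Before looking at how the frameworks separate the two, here is the idempotency-key pattern in miniature. The gateway, its API, and the key scheme are all hypothetical, but real payment APIs commonly work this way:

```python
class PaymentGateway:
    """Hypothetical payment gateway that deduplicates charges by
    idempotency key."""

    def __init__(self):
        self.processed = {}  # idempotency key -> original charge result

    def charge(self, idempotency_key, amount):
        # A repeated request with the same key returns the original
        # result instead of charging the card a second time.
        if idempotency_key not in self.processed:
            self.processed[idempotency_key] = {"charged": amount}
        return self.processed[idempotency_key]

gateway = PaymentGateway()
# The control flow derives a stable key (e.g. from the durable function's
# id plus the step name), so a re-executed side effect cannot double charge.
key = "order-1234/charge"
first = gateway.charge(key, 100)
retry = gateway.charge(key, 100)  # side effect ran again after a lost result
print(first == retry, len(gateway.processed))  # True 1
```

Note that the key must be stable across retries, which is exactly what deterministic control flow gives us: the same key is passed to the side effect on every execution.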
In the Temporal programming model, the workflow definition is the control flow and each activity is a side effect (or some other non-deterministic operation). Other frameworks, such as Resonate and Restate, are based on functions which can call other functions, resulting in a tree of function calls. Each function in this tree has a portion of control flow and side effects (either executed locally or via a call to another function).

Fig 7. A tree of function calls, with control-flow in each function.

The same need for determinism applies to the control flow in each of these functions. This is guaranteed by ensuring the same inputs and by replacing non-deterministic operations (such as date/times, random numbers, ids, retrieved objects) with deterministic ones.

Our mental model is built on separating a durable function into control flow and side effects. Some frameworks explicitly separate the two (like Temporal) while others are more focused on composable functions. The need for determinism in control flow is a by-product of recovery being based on retries of the function. If we could magically reach into the function, to the exact line to resume from, reconstructing the local state and executing from there, we wouldn’t need deterministic control flow code. But that isn’t how it works. The function is executed again from the top, and it had better make the same decisions again, or else you might end up with weird behaviors, inconsistencies or even double charging your customers.

The side effects absolutely can and should be non-deterministic, which is fine because they should generally only be executed once, even if the function itself is executed many times. For those failure cases where the result is not durably stored, we rely on idempotency or duplication tolerance.

This is a pretty generalized model. There are a number of nuances and differences across the frameworks.
Some of the examples in this post would actually result in a non-determinism error in Temporal, due to how it records event history and expects a matching replay. The developer must learn the peculiarities of each framework. Hopefully this post provides a general overview of determinism in the context of durable execution.

Jack Vanlightly, 1 week ago

Have your Iceberg Cubed, Not Sorted: Meet Qbeast, the OTree Spatial Index

In today’s post I want to walk through a fascinating indexing technique for data lakehouses which flips the role of the index in open table formats like Apache Iceberg and Delta Lake. We are going to turn the tables on two key points:

- Indexes are primarily for reads. Indexes are usually framed as read optimizations paid for by write overhead: they make read queries fast, but inserts and updates slower. That isn’t the full story, as indexes also support writes (such as faster updates and deletes in primary key tables), but the dominant mental model is that indexing serves reads while writes pay the bill.
- OTFs don’t use tree-based indexes. Open-table format indexes are data-skipping indexes scoped to data files, or even blocks within data files. They are a loose collection of column statistics and Bloom filters.

Qbeast, a start-up with a presence here in Barcelona where I live, is reimagining indexes for open table formats, showing that neither assumption has to be true. Let’s dive in.

A few weeks ago I wrote Beyond Indexes: How Open Table Formats Optimize Query Performance, which describes how the open table formats don’t use monolithic tree-based indexes as RDBMSs do; instead, they optimize performance via effective pruning, which in turn is boosted by a data layout that matches the most important queries. The open table formats give us two logical levers for optimizing layout:

- Partitioning
- Sort order

Together, these form what is often called clustering: the way a table physically organizes data for efficient scanning by clustering similar data together.

Partitioning is the first major clustering lever in Iceberg and Delta tables. It divides a table into logical groups based on one or more columns so that rows with the same partition key values are stored together. This creates data locality, allowing the engine to quickly identify which partitions match a query filter (e.g., WHERE EventDate = '2025-10-01') and skip the rest.
That process, called partition pruning, avoids scanning irrelevant data and greatly speeds up queries.

Within partitions, we can sort the data using a sort order. We can use one or more columns (including transforms of columns) as the sort order, which determines the order of rows in data files, and even across data files after compaction work (within a given partition). The Iceberg spec allows you to specify multiple columns as a lexicographical sort order, and Delta goes further by supporting Z-order. However, Spark can also compact Iceberg using Z-order (it’s just not in the spec).

Let’s take an example of rows with indexed columns x and y, where x has the domain a-d and y has the domain 1-4, producing 16 (x, y) pairs: (a, 1), (a, 2) ... (d, 4). When you sort a dataset lexicographically by multiple columns, the data system arranges the rows first by x, and then by y within each x group. That works fine if most queries filter heavily on the first column, but it doesn’t take into account how the data relates across both dimensions. Two records that are close together in (x, y) space might end up far apart in the files if their x values differ slightly.

Fig 1. Lexicographical order of two dimensions, which follows the “sort by x then by y” order.

Z-ordering improves multidimensional sorting by weaving the bits of all indexed columns together into a single scalar value. Sorting by this scalar value produces a Z-shaped curve which fills the dimensional space (hence Z-order being what is known as a space-filling curve). The result is an ordering where items that are close in N-dimensional space remain close in the 1-D key space as well. As a result, it reduces I/O for multi-column range filters and is ideal when queries commonly span multiple dimensions rather than a single dominant one. If you always query on a leading column, then a lexicographical sort order is likely better.

Fig 2.
Z-order uses bit mixing to produce a single scalar sort key that determines the order, resembling a z-shaped space-filling curve.

But there are some problems with this clustering strategy based on partitioning + sorting:

- Partition granularity. The partition key must be chosen carefully: too many partitions lead to many small files, which can hurt performance instead of helping it.
- Imbalanced partitions. Your data may be skewed, leading to imbalanced partition sizes. Some might be very small while others might be very large, which is inefficient and can lead to uneven performance.
- Changing distributions. The shape of your data may change, making your chosen partitioning strategy less effective over time.
- Drift. Your tables are constantly drifting away from the optimum clustering layout as new data arrives. Compaction is constantly working to cluster recent data. Global clustering is expensive, so clustering is usually performed on subsets of the data.

What if we could use a data layout strategy that was flexible and adaptive (solving pain points 1, 2 and 3) and didn’t constantly drift as new data arrived (solving pain point 4)? Enter Qbeast and the OTree multidimensional indexing approach, which came out of research at the Barcelona Supercomputing Center. Qbeast has been on my radar because one of the founders is Flavio Junqueira, a distributed systems researcher behind both Apache ZooKeeper and Apache BookKeeper (both of which have played large roles in my career).

The OTree brings to open table formats a global tree index that defines the table’s structure and layout. In some ways, the OTree could be thought of as a distant relative of the clustered index in the RDBMS world, as they both define the table layout. However, the OTree is a lightweight structure that does not try to organize individual rows. The OTree index approaches table layout as an adaptive spatial structure.
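Before diving into the OTree, here is what Z-order’s bit mixing looks like in code. This is a simplified two-dimensional sketch for small integer coordinates, not any engine’s actual implementation:

```python
def z_order_key(x, y, bits=4):
    """Interleave the bits of two small non-negative integers into a
    single scalar Morton (Z-order) key."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # x bits -> even positions
        key |= ((y >> i) & 1) << (2 * i + 1)  # y bits -> odd positions
    return key

# Sorting (x, y) pairs by their Morton key traces the Z-shaped curve:
points = [(x, y) for x in range(4) for y in range(4)]
points.sort(key=lambda p: z_order_key(*p))
print(points[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

Sorting by this key keeps neighbors in (x, y) space close in the one-dimensional key space. With that baseline in mind, let’s see how the OTree organizes the space instead.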
Instead of dividing data according to fixed partition keys or grouping it according to sort orders, the OTree organizes the dataset into hypercubes that subdivide automatically as the data distribution demands. Each (hyper)cube represents a region in multi-dimensional space defined by the indexed columns.

Fig 3. A table indexed on three columns leads to a 3-dimensional (normalized) space (more on that later). In this figure, the original cube has subdivided into 8 subcubes.

A cube divides along all indexed dimensions simultaneously, creating 2ᵈ smaller cubes, where d is the number of dimensions (i.e., the number of indexed columns). For example:

- With 2 indexed columns, each division produces 4 subcubes (2×2 grid).
- With 3 indexed columns, each division produces 8 subcubes (2×2×2).
- With 4 indexed columns, each division produces 16 subcubes (2×2×2×2).

Fig 4. A cube subdivides into 8 subcubes (in a 3-dimensional space) corresponding to three indexed columns.

Using 3-dimensional space is taxing on the mind and on diagrams, so I’ll use examples based on two indexed columns, which leads to an easier-to-visualize 2-dimensional space. The number of dimensions corresponds to the number of indexed columns: if we index our products table by price and rating, then we have a two-dimensional space.

Qbeast maps each row to a point in multidimensional space by normalizing the values of the indexed columns into the [0, 1] range, preserving their relative order so that nearby data in each dimension remains close together in space.

Fig 5. Two-dimensional space with normalized domains

For example, if we index columns price and rating, a row with (price=100, rating=4.2) might map to coordinates (0.10, 0.84) in the [0, 1] space (of each dimension), while another with (price=120, rating=4.3) becomes (0.12, 0.86). Because both rows are close in their normalized coordinates, they occupy nearby positions in the multidimensional space, thereby preserving the natural proximity of their original values.
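The normalization step can be as simple as min-max scaling per dimension. The domain bounds below (price 0-1000, rating 0-5) are assumptions for illustration, chosen so the coordinates match the example above:

```python
def normalize(value, lo, hi):
    """Min-max normalize a column value into [0, 1], preserving order."""
    return (value - lo) / (hi - lo)

# Assumed domains: price in [0, 1000], rating in [0, 5]
row_a = (normalize(100, 0, 1000), normalize(4.2, 0, 5))
row_b = (normalize(120, 0, 1000), normalize(4.3, 0, 5))
print(row_a, row_b)  # approximately (0.1, 0.84) and (0.12, 0.86)
```

Because the scaling is monotonic, order is preserved per dimension, so nearby values stay nearby in the normalized space.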
This is really important because spatial locality should reflect value locality within the data domain, else range scans won’t be very useful. This is precisely what the Z-order mapping function tries to do as well (by bit mixing). The difference is that a space-filling curve (like Z-order or Hilbert) takes multi-dimensional coordinates (x, y, z) and projects them onto a one-dimensional ordering, whereas Qbeast preserves the ordering per dimension.

A cube is one subdivision of the multidimensional space. At first, all data falls into a single cube representing the full range of values (0-1 in each dimension). As new data arrives and the cube reaches a predetermined size, it generates subcubes, each covering a more specific region of the domain. This cube division continues, producing finer and finer cubes. The result is a layout that mirrors the actual distribution of the data. Skewed data that clusters around a tight set of values ends up in dense regions of space, located in finer and finer cubes, while sparse regions remain coarse.

Fig 6. Cubes adaptively subdivide recursively based on multidimensional spatial density.

In figure 6 above, we get the following set of splits:

- The root cube is created.
- The root cube divides in half along both dimensions, creating four subcubes (0, 1, 2, 3).
- Subcube 3 fills up and divides into subcubes (30, 31, 32, 33).
- Subcube 30 fills up and divides into subcubes (300, 301, 302, 303).

Now it’s time to map this spatial representation to the tree. Because of how the cubes subdivide into two halves along each dimension, a cube id (such as 301) encodes its position and normalized domain bounds (along each dimension). This multidimensional space, divided up adaptively into multiple levels of subcubes, is represented by a tree.

Fig 7. The OTree representation of the cubes.

We can visualize the progress of a single root cube to the final set of cubes as follows.

Fig 8. The OTree over time.
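Because every split halves each dimension, a cube id by itself determines the cube’s normalized bounds, and the cube covering a point at any depth can be computed directly. Here is a 2-D sketch of that idea (digits 0-3 name the quadrants; this is illustrative, not Qbeast’s actual id encoding):

```python
def cube_bounds(cube_id):
    """Derive a 2-D cube's normalized (x, y) bounds from its id alone.
    Each digit picks a quadrant: bit 0 selects the x half, bit 1 the y half."""
    x_lo, y_lo, size = 0.0, 0.0, 1.0
    for digit in cube_id:
        q = int(digit)
        size /= 2
        x_lo += (q & 1) * size
        y_lo += ((q >> 1) & 1) * size
    return (x_lo, x_lo + size), (y_lo, y_lo + size)

def cube_for_point(x, y, depth):
    """Assign a normalized point to the cube id covering it at a depth."""
    cube_id = ""
    x_lo, y_lo, size = 0.0, 0.0, 1.0
    for _ in range(depth):
        size /= 2
        qx = 1 if x >= x_lo + size else 0
        qy = 1 if y >= y_lo + size else 0
        cube_id += str(qx + 2 * qy)
        x_lo += qx * size
        y_lo += qy * size
    return cube_id

print(cube_bounds("3"))           # ((0.5, 1.0), (0.5, 1.0))
print(cube_for_point(0.6, 0.9, 2))
```

This is one reason the tree can stay so small: positions and bounds are implied by the ids, so they never need to be stored.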
Next let’s look at how this translates to Apache Iceberg and its data files. Up to this point we’ve been talking about cubes, multidimensional space, and trees in abstract terms. But let’s ground ourselves and see how all this maps onto an Iceberg or Delta table. The OTree governs layout, but Iceberg/Delta remains the source of truth for the canonical set of data files and their metadata. Writers (such as a Spark job for ingest) consult the OTree, but readers (such as a Spark analytics job) only read Iceberg/Delta metadata. This separation allows the index to be invisible to all engines (Spark, Flink, Trino etc), requiring no special integration.

Each node of the OTree corresponds to a cube, which in turn contains one or more blocks, where each block points to a data file (such as a Parquet file).

Fig 9. Each node of the OTree contains one or more blocks, where each block points to a data file (such as Parquet). In this example, the root cube reached capacity with three files and split along 2 dimensions.

Notice that the data does not exist only in leaf nodes, but in all nodes of the tree. The deeper into the tree you go, the narrower the value range (across the dimensions) that each node represents. Any given data point may exist in any node from the root down to the lowest leaf that covers the data point.

Fig 10. The data point maps onto the nodes: root, 3, 30 and 302. A query whose filter predicates cover this point may end up reading each of these files (it depends on the column stats).

As I said in my previous blog post on OTF performance, the Iceberg column statistics reflect the layout of the data. We want narrow column stats for effective pruning, which means producing a data layout with data locality. The OTree provides that method of obtaining data locality according to one or more indexed columns (the dimensions of our multidimensional space). But readers carry on using the standard column statistics and Bloom filters as usual.
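To see why this layout prunes well, consider which cubes a range filter can touch. The sketch below works on cube bounds directly; real readers get the same effect through column stats that mirror those bounds. The cube ids and query values here are hypothetical:

```python
def overlaps(cube_bounds, query_ranges):
    # A cube can contain matching rows only if its region intersects the
    # query's range on every indexed dimension.
    return all(c_lo < q_hi and c_hi > q_lo
               for (c_lo, c_hi), (q_lo, q_hi) in zip(cube_bounds, query_ranges))

# Hypothetical cubes over a normalized (price, rating) space:
cubes = {
    "root": ((0.0, 1.0), (0.0, 1.0)),
    "3":    ((0.5, 1.0), (0.5, 1.0)),
    "30":   ((0.5, 0.75), (0.5, 0.75)),
}
query = ((0.8, 0.9), (0.8, 0.9))  # narrow range filter on both columns
hits = [cid for cid, bounds in cubes.items() if overlaps(bounds, query)]
print(hits)  # ['root', '3'] -- cube 30 is pruned
```

Note that the root still matches: data lives at every level of the tree, so upper cubes with wide bounds may always need to be checked, as the post describes.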
So, the OTree index governs the table’s layout, but it doesn’t replace Iceberg or Delta’s metadata or data files. The two systems coexist:

- The OTree index describes how the data should be organized: which regions exist, their spatial boundaries, and which data points fall into each.
- Iceberg/Delta’s metadata remains the authoritative catalog of what files exist and their stats.

In Iceberg, the OTree index is stored as a Puffin file which is referenced in the Iceberg metadata (so the OTree is committed as part of the Iceberg commit). Each commit may result in a new version of the OTree.

Fig 11. A very simplified representation of four Iceberg commits which add one Parquet file per commit. The root cube splits in the 3rd snapshot, writing to one subcube, and another subcube in snapshot 4.

In Delta Lake, the OTree metadata is included within the tag metadata of each operation in the Delta log (as depicted below).

Fig 12. A very simplified representation of the Delta log with four add_files operations. Each added file is mapped to a cube id (where the tree structure is encoded into the cube ids).

So although the OTree introduces a tree-shaped spatial index, the underlying Iceberg/Delta table remains standard (additional metadata fields are added, which do not break existing engines). Query engines simply ignore the OTree when they perform reads. Writers (optionally) and table maintenance jobs (obligatorily) do need to know about the OTree, as we want the layout to be governed by an adaptive index rather than static partitioning logic. Ideally, writers will use the OTree index so that the index covers the whole dataset (ensuring locality is maintained from the very first moment data is written to the table). However, that requires the writer, such as Apache Spark, to use the Qbeast module when performing writes. Table maintenance jobs must use the module in order to apply the spatial layout to the Iceberg data files.
Although the OTree governs the layout of the entire table, the OTree itself is just lightweight metadata that describes the parent-child relationships (encoded in the cube ids) and, for each cube, the element count and the min/max weights. I won’t go into the detail of weights, but they are an additional feature designed to enhance data distribution across the nodes. The normalized dimension bounds of each cube are established by the position of the cube in the tree, so there is no need to store them. Because of this, even a table with billions of rows can be represented by an OTree containing just a few thousand small metadata entries, typically amounting to a few megabytes in total. The tree is therefore cheap to store, fast to read, and easy to keep in memory, while still providing a view of the data’s spatial layout.

It’s helpful to see all of this on a spectrum. On the left, the classic B-tree clustered index: a strict, key-ordered global tree index that dictates exactly where every row lives. While great for selective OLTP workloads, it is far too rigid and expensive when the dataset grows and the queries become broad (reading millions of rows). On the right, we have Iceberg/Delta’s approach: lightweight metadata describing the canonical set of files (without ordering), with a declared clustering strategy (partitioning and optional sort order) from which the table is constantly drifting, requiring maintenance to bound that drift. In the middle sits the OTree: a global tree index, but without the fine-grained rigidity of the B-tree. Instead of ordering individual rows, it divides the data space into coarse, adaptive regions that subdivide and merge as the distribution demands. This keeps it incredibly light while still determining where data should live. Dense data is located in narrow cubes and sparse data in wide cubes. The layout is self-correcting as data distribution changes, avoiding imbalanced partitions.
It’s fun to see the inversion of the role of the index: using it to shape the table as it is written, so that the layout remains close to optimal, making the existing read-time optimizations of Iceberg and Delta more effective. The OTree is there behind the scenes, and query engines that read from the tables have no idea that it exists. There is a lot more to Qbeast than what I’ve covered here; there are additional mechanisms for ensuring even data distribution and making sampling efficient via random weights, but that’s too detailed for this post.

The takeaway for me, I suppose, is that there are always more innovative ways of doing things, and we’re still early in the open table format / lakehouse game. There are plenty more innovations to come at all levels, from file formats and data organization to query engines.

Jack Vanlightly 3 weeks ago

How Would You Like Your Iceberg Sir? Stream or Batch Ordered?

Today I want to talk about stream analytics, batch analytics and Apache Iceberg. Stream and batch analytics work differently, and while both can be built on top of Iceberg, their differences can create a tug-of-war over the Iceberg table itself. In this post I am going to use two real-world systems, Apache Fluss (streaming tabular storage) and Confluent Tableflow (Kafka-to-Iceberg), as a case study of these tensions between stream and batch analytics.

- Apache Fluss uses zero-copy tiering to Iceberg. Recent data is stored on Fluss servers (using the Kafka replication protocol for high availability and durability) but is then moved to Iceberg for long-term storage. This results in one copy of the data.
- Confluent Kora and Tableflow use internal topic tiering and Iceberg materialization, copying Kafka topic data to Iceberg, such that we have two copies (one in Kora, one in Iceberg).

This post will explain why the two have chosen different approaches and why both are totally sane, defensible decisions.

First we should understand the concepts of stream-order and batch-order. A streaming Flink job typically assumes its sources come with stream-order. For example, a simple SELECT * Flink query assumes the source is (loosely) temporally ordered, as if it were a live stream. It might be historical data, such as starting at the earliest offset of a Kafka topic, but it is still loaded in temporal order. Windows and temporal joins also depend on the source being stream-ordered to some degree, to avoid needing large/infinite window sizes which blow up the state.

A Spark batch job typically hopes that the data layout of the Iceberg table is batch-ordered (say, partitioned and sorted by business values like region, customer, etc.), allowing it to efficiently prune data files that are not relevant and to minimize costly shuffles.

If Flink is just reading a Kafka topic from start to end, it’s nothing special.
But we can also get fancy by reading from two data sources: one historical and one real-time. The idea is that we can unify historical data from Iceberg (or another table format) and real-time data from some kind of event stream. We call the reading from the historical source bootstrapping. Streaming bootstrap refers to running a continuous query that reads historical data first and then seamlessly switches to live streaming input.

In order to switch from the historical to the real-time source, we need to make that switch at a specific offset. The notion of a “last tiered offset” is a correctness boundary that ensures the bootstrap and the live stream blend seamlessly, without duplication or gaps. This offset can be mapped to an Iceberg snapshot.

Fig 1. Bootstrap a streaming Flink job from historical data, then switch to real-time.

However, if the historical Iceberg data is laid out in batch-order (partitioned and sorted by business values like region, customer, etc.) then the bootstrap portion of a SELECT * will appear completely out-of-order relative to stream-order. This breaks the expectations of the user, who wants to see data in the order it arrived (i.e., stream-order), not a seemingly random one. We could sort the data from batch-order back to stream-order in the Flink source before it reaches the Flink operator level, but this can get really inefficient.

Fig 2. Sort batch-ordered historical data in the Flink source task.

If the table has been partitioned by region and sorted by customer, but we want to sort it by the time it arrived (such as by timestamp or Kafka offset), this will require a huge amount of work and data shuffling (in a large table). The result is not only a very expensive bootstrap, but also a very slow one (after all, we expect fast results from a streaming query). So we hit a wall: Flink wants data ordered temporally for efficient streaming bootstrap.
Batch workloads want data ordered by value (e.g., by columns) for effective pruning and scan efficiency. These two data layouts are orthogonal. Temporal order preserves ingest locality; value order preserves query locality. You can't have both in a single physical layout. Fluss is a streaming tabular storage layer built for real-time analytics which can serve as the real-time data layer for lakehouse architectures. I recently did a comprehensive deep dive into Apache Fluss, diving right into the internals, if you are interested. Apache Fluss takes a clear stance. It's designed as a streaming storage layer for data lakehouses, so it optimizes Iceberg for streaming bootstrap efficiency. It does this by maintaining stream-order in the Iceberg table. Fig 3. Fluss stores real-time and historical data in stream-order. Internally, Fluss uses its own offset (akin to the Kafka offset) as the Iceberg sort order. This ensures that when Flink reads from Iceberg, it sees a temporally ordered sequence. The Flink source can literally stream data from Iceberg without a costly data shuffle. Let's take a look at a Fluss log table. A log table can define: Optional partitioning keys (based on one or more columns); without them, a table is one large partition. The number of buckets per partition; the bucket is the smallest logical subdivision of a Fluss partition. An optional bucketing key for hash-bucketing; without one, rows are assigned to buckets randomly or round-robin. The partitioning and buckets are both converted to an Iceberg partition spec. Fig 4. An example of the Iceberg partition spec and sort order. Within each of these Iceberg partitions, the sort order is the Fluss offset. For example, we could partition by a date field, then spread the data randomly across the buckets within each partition. Fig 5. The partitions of an Iceberg table visualized. Inside Flink, the source will generate one "split" per table bucket, routing them by bucket id to split readers. 
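The bootstrap handoff described earlier, reading the historical source up to the last tiered offset and then switching to the live stream, can be sketched in a few lines of illustrative Python (the source shapes here are assumptions for the sketch, not any framework's real API):

```python
def bootstrap_then_stream(historical, live, last_tiered_offset):
    """Yield records from the historical source up to the boundary offset,
    then switch to the live stream, with no gaps or duplicates.
    `historical` and `live` are iterables of (offset, value) pairs."""
    for offset, value in historical:
        if offset > last_tiered_offset:
            break
        yield offset, value
    for offset, value in live:
        if offset <= last_tiered_offset:
            continue  # already served by the historical read
        yield offset, value

historical = [(0, "a"), (1, "b"), (2, "c")]
live = [(2, "c"), (3, "d"), (4, "e")]  # the live tail overlaps the tiered data
merged = list(bootstrap_then_stream(historical, live, last_tiered_offset=2))
print(merged)  # [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]
```

The reason this is trivial for a stream-ordered historical source, and expensive for a batch-ordered one, is that the loop above assumes the historical iterator already yields records in offset order.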
Due to the offset sort order, each Parquet file should contain contiguous blocks of offsets after compaction. Therefore each split reader naturally reads Iceberg data in offset order until it switches to the Fluss servers for real-time data (also in offset order). Fig 6. Flink source bootstrapping from Iceberg, visualized. Once the lake splits have been read, the readers start reading from the Fluss servers for real-time data. This is great for Flink streaming bootstrap (it is just scanning the data files as a cheap sequential scan). Primary key tables are similar but have additional limitations on the partitioning and bucketing keys (as they must be subsets of the primary key). A primary key, such as device_id, is not a good partition column as it's too fine-grained, leading us to use an unpartitioned table. Fig 7. Unpartitioned primary key table with 6 buckets. If we want Iceberg partitioning, we'll need to add another column (such as a date) to the primary key and then use the date column as the partitioning key (and device_id as a bucket key for hash-bucketing). This makes device_id non-unique on its own though. In short, Fluss is a streaming storage abstraction for tabular data in lakehouses and stores both real-time and historical data in stream-order. This layout is designed for streaming Flink jobs. But if you have a Spark job trying to query that same Iceberg table, pruning is almost useless as the table does not use a batch-optimized layout. Fluss may well decide to support custom Iceberg partitioning and sorting (batch-order) in the future, but it will then face the same challenges of supporting streaming bootstrap from batch-ordered Iceberg. Confluent's Tableflow (the Kafka-to-Iceberg materialization layer) took the opposite approach. It stores two copies of the data: one stream-ordered and one optionally batch-ordered. Kafka/Kora internally tiers log segments to object storage, which is a historical data source in stream-order (good for streaming bootstrap). 
The Iceberg table is a copy, which allows for either stream-order or batch-order; it's up to the customer. Custom partitioning and sort orders are not yet available at the time of writing, but they are coming. Fig 8. Tableflow continuously materializes a copy of a Kafka topic as an Iceberg table. I already wrote about why I think zero-copy Iceberg tiering is a bad fit for Kafka specifically. Much of that also applies to Kora, which is why Tableflow is a separate distributed component from the Kora brokers. So if we're going to materialize a copy of the data for analytics, we have the freedom to allow customers to optimize their tables for their use case, which is often batch-based analytics. Fig 9. Copy 1 (original): Kora maintains stream-ordered live and historical Kafka data. Copy 2 (derived): Tableflow continuously materializes Kafka topics as Iceberg tables. If the Iceberg table is also stored in stream-order then Flink could do an Iceberg streaming bootstrap and then switch to Kafka. This is not available right now in Confluent, but it could be built. There are also improvements that could be made to the historical data stored by Kora/Kafka, such as using a columnar format for log segments (something that Fluss does today). Either way, the materialization design provides the flexibility to execute a streaming bootstrap using a stream-ordered historical data source, while allowing the customer to optimize the Iceberg table according to their needs. Batch jobs want value locality (data clustered by common predicates), aka batch-order. Streaming jobs want temporal locality (data ordered by ingestion), aka stream-order. With a single Iceberg table, once you commit to one, the other becomes inefficient. Given this constraint, we can understand the two different approaches: Fluss chose stream-order in its Iceberg tables to support stream analytics constraints and avoid a second copy of the data. 
That's a valid design decision: after all, Fluss is a streaming tabular storage layer for real-time analytics that fronts the lakehouse. But it does mean giving up the ability to use Iceberg's layout levers of partitioning and sorting to tune batch query performance. Confluent chose stream-order in Kora plus one optionally batch-ordered Iceberg copy (via Tableflow materialization), letting the customer decide the optimum Iceberg layout. That's also a valid design decision, as Confluent wants to connect systems of all kinds, be they real-time or not. Flexibility to handle diverse systems and diverse customer requirements wins out. But it does require a second copy of the data (causing higher storage costs). As the saying goes, the opposite of a good idea can be a good idea. It all depends on what you are building and what you want to prioritize. The only losing move is pretending you can have both (stream-optimized and batch-optimized workloads) in one Iceberg table without a cost. Once you factor in the compute cost of using one format for both workloads, the storage savings disappear. If you really need both, build two physical views and keep them in sync. Some related blog posts that are relevant to this one: Beyond Indexes: How Open Table Formats Optimize Query Performance. Why I'm not a fan of zero-copy Apache Kafka-Apache Iceberg. Understanding Apache Fluss.

Jack Vanlightly, 1 month ago

A Fork in the Road: Deciding Kafka’s Diskless Future

“The Kafka community is currently seeing an unprecedented situation with three KIPs (KIP-1150, KIP-1176, KIP-1183) simultaneously addressing the same challenge of high replication costs when running Kafka across multiple cloud availability zones.” — Luke Chen, The Path Forward for Saving Cross-AZ Replication Costs KIPs. At the time of writing, the Kafka project finds itself at a fork in the road, where choosing the right path forward for implementing S3 topics has implications for the long-term success of the project. Not just the next couple of years, but the next decade. Open-source projects live and die by these big decisions and, as a community, we need to make sure we take the right one. This post explains the competing KIPs, but goes further and asks bigger questions about the future direction of Kafka. Before comparing proposals, we should step back and ask what kind of system we want Kafka to become. Kafka now faces two almost opposing forces. One force is stabilizing: the on-prem deployments and the low-latency workloads that depend on local disks and replication. Kafka must continue to serve those use cases. The other force is disrupting: the elastic, cloud-native workloads that favor stateless compute and shared object storage. Relaxed-latency workloads such as analytics have seen a shift in system design, with durability increasingly delegated to shared object storage, freeing the compute layer to be stateless, elastic, and disposable. Many systems now scale by adding stateless workers rather than rebalancing stateful nodes. In a stateless compute design, the bottleneck shifts from data replication to metadata coordination. Once durability moves to shared storage, sequencing and metadata consistency become the new limits of scalability. That brings us to the current moment, with three competing KIPs defining how to integrate object storage directly into Kafka. 
While we evaluate these KIPs, it's important to consider the motivations for building direct-to-S3 topics. Cross-AZ charges are typically what's on people's minds, but it's a mistake to think of S3 simply as a cheaper disk or a networking cheat. The shift is also architectural, providing an opportunity to achieve operational benefits such as elastic stateless compute. The devil is in the details: how each KIP enables Kafka to leverage object storage while retaining Kafka's soul and what made it successful in the first place. With that in mind, while three KIPs have been submitted, it comes down to two different paths: Revolutionary: Choose a direct-to-S3 topic design that maximizes the benefits of an object-storage architecture, with greater elasticity and lower operational complexity. However, in doing so, we may increase the implementation cost and possibly the long-term code maintenance burden too, by maintaining two very different topic models in the same project (leader-based replication and direct-to-S3). Evolutionary: Shoot for an evolutionary design that makes use of existing components to reduce the need for large refactoring or duplication of logic. However, by coupling to the existing architecture, we forfeit the extra benefits of object storage, focusing primarily on networking cost savings (in AWS and GCP). Through this coupling, we also run the risk of achieving the opposite: harder-to-maintain code, by bending and contorting a second workload into an architecture optimized for something else. In this post I will explain the two paths in this forked road, how the various KIPs map onto those paths, and invite the whole community to think through what they want for Apache Kafka for the next decade. Note that I do not include KIP-1183 as it looks dead in the water, and not a serious contender. The KIP proposes AutoMQ's storage abstractions without the accompanying implementation. 
Somewhat cynically, this seems to benefit AutoMQ were it ever adopted, while leaving the community to rewrite the entire storage subsystem again. If you want a quick summary of the three KIPs (including KIP-1183), you can read Luke Chen's The Path Forward for Saving Cross-AZ Replication Costs KIPs or Anton Borisov's summary of the three KIPs. This post is structured as follows: The term "Diskless" vs "Direct-to-S3". The Common Parts: some approaches are shared across multiple implementations and proposals. Revolutionary: KIPs and real-world implementations. Evolutionary: Slack's KIP-1176. The Hybrid: balancing revolution with evolution. Deciding Kafka's future. I used the term "diskless" in the title as that is the current hype word. But it is clear that not all designs are actually diskless in the same spirit as "serverless". Serverless implies that users no longer need to consider or manage servers at all, not that there are no servers. In the world of open source, where you run stuff yourself, diskless would have to mean literally "no disks", else you will be configuring disks as part of your deployment. But all the KIPs (in their current state) depend on disks to some extent, even KIP-1150, which was proposed as diskless. In most cases, disk behavior continues to influence performance and therefore correct disk provisioning will be important. So I'm not a fan of "diskless"; I prefer "direct-to-S3", which encompasses all designs that treat S3 (and other object stores) as the only source of durability. The main commonality between all direct-to-S3 Kafka implementations and design proposals is the uploading of objects that combine the data of multiple topics. The reasons are two-fold: Avoiding the small file problem. Most designs are leaderless for producer traffic, allowing any server to receive writes for any topic. To avoid uploading a multitude of tiny files, servers accumulate batches in a buffer until ready for upload. 
Before upload, the buffer is sorted by topic id and partition, to make compaction and some reads more efficient by ensuring that data of the same topic and partition sits in contiguous byte ranges. Pricing. The pricing of many (but not all) cloud object storage services penalizes excessive requests, so it can be cheaper to roll up whatever data has been received in the last X milliseconds and upload it with a single request. In the leader-based model, the leader determines the order of batches in a topic partition. But in the leaderless model, multiple brokers could be simultaneously receiving produce batches of the same topic partition, so how do we order those batches? We need a way of establishing a single order for each partition, and we typically use the word "sequencing" to describe that process. Usually there is a central component that does the sequencing and metadata storage, but some designs manage the sequencing in other ways. WarpStream was the first to demonstrate that you could hack the metadata step of initiating a producer to provide it with broker information that aligns the producer with a zone-local broker for the topics it is interested in. The Kafka client is leader-oriented, so we just pass it a zone-local broker and tell the client "this is the leader". This is how all the leaderless designs ensure producers write to zone-local brokers. It's not pretty, and we should create a future KIP to avoid the need for this kind of hack. Consumer zone-alignment heavily depends on the particular design, but two broad approaches exist: Leaderless: the same metadata manipulation used for producer alignment, or KIP-392 (fetch-from-follower), which can be used in a leaderless context. Leader-based: zone-aware consumer group assignment as detailed in KIP-881: Rack-aware Partition Assignment for Kafka Consumers. 
The idea is to use consumer-to-partition assignment to ensure consumers are only assigned zone-local partitions (where the partition leader is located). KIP-392 (fetch-from-follower), which is effective for designs that have followers (which isn't always the case). Given that almost all designs upload combined objects, we need a way to make those mixed objects more read-optimized. This is typically done through compaction, where combined objects are ultimately separated into per-topic or even per-partition objects. Compaction could be one-shot or go through multiple rounds. The "revolutionary" path draws a new boundary inside Kafka by separating what can be stateless from what must remain stateful. Direct-to-S3 traffic is handled by a lightweight, elastic layer of brokers that simply serve producers and consumers. The direct-to-S3 coordination (sequencing/metadata) is incorporated into the stateful side of regular brokers, where coordinators, classic topics and KRaft live. I cover three designs in the "revolutionary" section: WarpStream (as a reference, a kind of yardstick to compare against). KIP-1150 revision 1. Aiven Inkless (a Kafka fork). Before we look at the KIPs that describe possible futures for Apache Kafka, let's look at a system that was designed from scratch with both cross-AZ cost savings and elasticity (from object storage) as its core design principles. WarpStream was unconstrained by an existing stateful architecture, and with this freedom, it divided itself into: Leaderless, stateless and diskless agents that handle Kafka clients, as well as compaction/cleaning work. A coordination layer: a central metadata store for sequencing, metadata storage and housekeeping coordination. Fig WS-A. The WarpStream stateless/stateful split architecture. As per the Common Parts section, the zone-alignment and sorted combined object upload strategies are employed. Fig WS-B. The WarpStream write path. 
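The combined-object upload described in the Common Parts section can be sketched as follows (a hedged Python sketch; the buffer shape and names are illustrative, not any system's actual code). Batches are accumulated per topic-partition, sorted by topic and partition, concatenated into one object payload, and the resulting byte ranges become the metadata handed to the sequencer:

```python
from collections import defaultdict

def build_shared_object(buffered_batches):
    """Sort buffered produce batches by (topic, partition) so each
    partition's data lands in a contiguous byte range of the object,
    and record those ranges as metadata for the sequencer."""
    payload = bytearray()
    ranges = {}
    for (topic, partition), batches in sorted(buffered_batches.items()):
        start = len(payload)
        for batch in batches:
            payload.extend(batch)
        ranges[(topic, partition)] = (start, len(payload))
    return bytes(payload), ranges

buf = defaultdict(list)
buf[("orders", 1)].append(b"o1")
buf[("clicks", 0)].append(b"c0")
buf[("orders", 0)].append(b"o0")

obj, ranges = build_shared_object(buf)
print(obj)     # b'c0o0o1' -- partitions contiguous and sorted by (topic, partition)
print(ranges)  # {('clicks', 0): (0, 2), ('orders', 0): (2, 4), ('orders', 1): (4, 6)}
```

The contiguous byte ranges are what make later compaction and partition-scoped range reads cheap: a reader only fetches the slice it needs rather than the whole mixed object.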
On the consumer side, which again is leaderless and zone-local, a per-zone shared cache is implemented (which WarpStream dubbed distributed mmap). Within a zone, this shared cache assigns each agent a portion of the partition space. When a consumer fetch arrives, an agent will download the object byte range itself if it is responsible for that partition, else it will ask the responsible agent to do so on its behalf. That way, we ensure that multiple agents per zone are not independently downloading the same data, thus reducing S3 costs. Fig WS-C. Shared per-zone read cache to reduce S3 throughput and request costs. WarpStream implements agent roles (proxy, job) and agent groups to separate the work of handling producer and consumer traffic from background jobs such as compaction, allowing for independent scaling of each workload. The proxy role (producer/consumer Kafka client traffic) can be further divided into proxy-producer and proxy-consumer. Agents can be deployed as dedicated agent groups, which allows for further separation of workloads. This is useful for avoiding noisy neighbor issues, running different groups in different VPCs, and scaling different workloads that hit the same topics. For example, you could use one proxy group for microservices, and a separate proxy-consumer group for an analytics workload. Fig WS-D. Agent roles and groups can be used for shaping traffic, independent scaling and deploying different workloads into separate VPCs. Being a pure direct-to-S3 system allowed WarpStream to choose a design that clearly separates traffic-serving work into stateless agents and coordination logic into one central metadata service. The traffic-serving layer is highly elastic, with relatively simple agents that require no disks at all. Different workloads benefit from independent and flexible scaling via the agent roles and groups. 
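The per-zone shared cache relies on every agent deterministically agreeing on which agent owns which slice of the partition space. A minimal sketch of such an ownership function (illustrative only; the real assignment logic is more sophisticated and membership-aware):

```python
import zlib

def responsible_agent(topic, partition, zone_agents):
    """Deterministically map each partition to one agent in the zone, so only
    that agent downloads (and caches) the relevant S3 byte ranges. Every agent
    computes the same answer, so a fetch arriving at a non-owner can simply be
    proxied to the owner instead of hitting S3 a second time."""
    key = f"{topic}-{partition}".encode()
    return zone_agents[zlib.crc32(key) % len(zone_agents)]

agents = ["agent-a", "agent-b", "agent-c"]
owner = responsible_agent("orders", 7, agents)
# Deterministic: every agent in the zone agrees on the owner.
assert all(responsible_agent("orders", 7, agents) == owner for _ in range(3))
print(owner in agents)  # True
```

A simple modulo mapping like this reshuffles ownership when the agent list changes; a production system would likely use consistent hashing or an assignment table to limit cache churn during scaling.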
The point of contention is the metadata service, which needs to be carefully managed and scaled to handle the read/write metadata volume of the stateless agents. Confluent Freight Clusters follow a largely similar design principle of splitting stateless brokers from central coordination. I will write about the Freight design sometime soon. Apache Kafka is a stateful distributed system, but next we'll see how KIP-1150 could fulfill much of the same capabilities as WarpStream. KIP-1150 has continued to evolve since it was first proposed, undergoing two subsequent major revisions between the KIP itself and the mailing list discussion. This section describes the first version of the KIP, created in April 2025. KIP-1150 revision 1 uses a leaderless architecture where any broker can serve Kafka producers and consumers of any topic partition. Batch Coordinators (replicated state machines like the Group and Transaction Coordinators) handle coordination (object sequencing and metadata storage). Brokers accumulate Kafka batches and upload shared objects to S3 (known as Shared Log Segment Objects, or SLSOs). Metadata that maps these blocks of Kafka batches to SLSOs (known as Batch Coordinates) is then sent to a Batch Coordinator (BC) that sequences the batches (assigning offsets) to provide a global order of those batches per partition. The BC acts as sequencer and batch coordinate database for later lookups, allowing the read path to retrieve batches from SLSOs. The BCs will also apply idempotent and transactional producer logic (not yet finalized). Fig KIP-1150-rev1-A. Leaderless brokers upload shared objects, then commit and sequence them via the Batch Coordinator. The Batch Coordinator (BC) is the stateful component akin to WarpStream's metadata service. Kafka has other coordinators such as the Group Coordinator (for the consumer group protocol), the Transaction Coordinator (for reliable 2PC) and the Share Coordinator (for queues, aka share groups). 
The coordinator concept, for centralizing some kind of coordination, has a long history in Kafka. The BC is the source of truth about uploaded objects, direct-to-S3 topic partitions, and committed batches. This is the component that contains the most complexity, with challenges around scaling, failovers and reliability, as well as logic for idempotency, transactions, object compaction coordination and so on. A Batch Coordinator has the following roles: Sequencing. Chooses the total ordering for writes, assigning offsets without gaps or duplicates. Metadata storage. Stores all metadata that maps partition offset ranges to S3 object byte ranges. Serving lookup requests. Serving requests for log offsets and for batch coordinates (S3 object metadata). Partition CRUD operations. Serving requests for atomic operations (creating partitions, deleting topics, records, etc.). Data expiration. Managing data expiry and soft deletion, and coordinating physical object deletion (performed by brokers). The Group and Transaction Coordinators use internal Kafka topics to durably store their state and rely on the KRaft Controller for leader election (coordinators are highly available with failovers). This KIP does not specify whether the Batch Coordinator will be backed by a topic, or use some other option such as a Raft state machine (based on the KRaft state machine code). It also proposes that it could be pluggable. For my part, I would prefer all future coordinators to be KRaft state machines, as the implementation is rock solid and can be used like a library to build arbitrary state machines inside Kafka brokers. When a producer starts, it sends a Metadata request to learn which brokers are the leaders of each topic partition it cares about. The Metadata response contains a zone-local broker. The producer sends all Produce requests to this broker. The receiving broker accumulates batches of all partitions and uploads a shared log segment object (SLSO) to S3. 
The broker commits the SLSO by sending the metadata of the SLSO (known as the batch coordinates) to the Batch Coordinator. The coordinates include the S3 object metadata and the byte ranges of each partition in the object. The Batch Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the broker. The broker sends all associated acknowledgements to the producers. Brokers can parallelize uploading SLSOs to S3, but commit them serially to the Batch Coordinator. When batches are first written, a broker does not assign offsets as would happen with a regular topic, as this is done after the data is written, when it is sequenced and indexed by the Batch Coordinator. In other words, the batches stored in S3 have no offsets stored within the payloads, as would normally be the case. On consumption, the broker must inject the offsets into the Kafka batches, using metadata from the Batch Coordinator. The consumer sends a Fetch request to the broker. The broker checks its cache and, on a miss, queries the Batch Coordinator for the relevant batch coordinates. The broker downloads the data from object storage. The broker injects the computed offsets and timestamps into the batches (the offsets being part of the batch coordinates). The broker constructs and sends the Fetch response to the consumer. Fig KIP-1150-rev1-B. The consume path of KIP-1150 (ignoring any caching logic here). The KIP notes that broker roles could be supported in the future. I believe that KIP-1150 revision 1 starts becoming really powerful with roles akin to WarpStream's. That way we can separate out direct-to-S3 topic serving traffic and object compaction work onto proxy brokers, which become the elastic serving and background job layer. Batch Coordinators would remain hosted on standard brokers, which are already stateful. 
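The sequencing step above, where offsets are assigned by the coordinator only after upload and injected into batches at consume time, can be sketched as a toy model (a hypothetical sketch of the idea, not the KIP's actual interface):

```python
class BatchCoordinator:
    """Toy model of KIP-1150-style sequencing: assign contiguous offsets to
    committed batches and store the coordinates (object id, byte range,
    base offset) per partition for later lookups on the read path."""

    def __init__(self):
        self.next_offset = {}   # partition -> next offset to assign
        self.coordinates = {}   # partition -> [(object_id, byte_range, base_offset, count)]

    def commit(self, object_id, batch_coords):
        """batch_coords: list of (partition, byte_range, record_count)."""
        assigned = []
        for partition, byte_range, record_count in batch_coords:
            base = self.next_offset.get(partition, 0)
            self.next_offset[partition] = base + record_count
            self.coordinates.setdefault(partition, []).append(
                (object_id, byte_range, base, record_count))
            assigned.append((partition, base))
        return assigned  # brokers use the base offsets to acknowledge producers

bc = BatchCoordinator()
# Two brokers commit SLSOs containing data for the same partition, serially.
print(bc.commit("slso-1", [("orders-0", (0, 128), 10)]))  # [('orders-0', 0)]
print(bc.commit("slso-2", [("orders-0", (0, 64), 5)]))    # [('orders-0', 10)]
```

Because offsets only exist in the coordinator's metadata, a broker serving a fetch must stitch them back into the batch payloads it downloaded from S3, which is the offset-injection step described in the consume path.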
With broker roles, we can see how Kafka could implement its own WarpStream-like architecture, which makes full use of disaggregated storage to enable better elasticity. Fig KIP-1150-rev1-C. Broker roles would bring the stateless/stateful separation that would unlock elasticity in the high-throughput workloads of many direct-to-S3 deployments. Things like supporting idempotency, transactions, caching and object compaction are left to be decided later. While taxing to design, these things look doable within the basic framework. But as I already mentioned in this post, this will be costly in development effort and may also come with a long-term code maintenance overhead if complex parts such as transactions are maintained twice. It may also be possible to refactor rather than do wholesale rewrites. Inkless is Aiven's direct-to-S3 fork of Apache Kafka. The Inkless design shares the combined object upload and metadata manipulation approaches that all the designs across the board are using. It is also leaderless for direct-to-S3 topics. Inkless is firmly in the revolutionary camp. If it were to implement broker roles, it would make this Kafka fork much closer to a WarpStream-like implementation (albeit with some issues concerning the coordination component, as we'll see further down). While Inkless is described as a KIP-1150 implementation, we'll see that it actually diverges significantly from KIP-1150, especially the later revisions (covered later). Inkless eschewed the Batch Coordinator of KIP-1150 in favor of a Postgres instance, with coordination being executed through table-valued functions (TVFs) and row locking where needed. Fig Inkless-A. The write path. On the read side, each broker must discover what batches exist by querying Postgres (again using a TVF), which returns the next set of batch coordinates as well as the high watermark. Now that the broker knows where the next batches are located, it requests those batches via a read-through cache. 
On a cache miss, it fetches the byte ranges from the relevant objects in S3. Fig Inkless-B. The read path. Inkless bears some resemblance to KIP-1150 revision 1, except that the difficult coordination bits are delegated to Postgres. Postgres does all the sequencing and metadata storage, as well as the coordination for compaction and file cleanup. For example, compaction is coordinated via Postgres with a TVF that is periodically called by each broker; it finds a set of files which together exceed a size threshold and places them into a merge work order (tables file_merge_work_items and file_merge_work_item_files) that the broker claims. Once the merge is carried out, the original files are marked for deletion (which is another job that can be claimed by a broker). Fig Inkless-C. Direct-to-S3 traffic uses a leaderless broker architecture with coordination owned by Postgres. Inkless doesn't implement transactions, and I don't think Postgres could take on the role of transaction coordinator, as the coordinator does more than sequencing and storage. Inkless will likely have to implement some kind of coordinator for that. The Postgres data model is based on the following tables: logs. Stores the Log Start Offset and High Watermark of each topic partition, with a primary key of topic_id and partition. files. Lists all the objects that host the topic partition data. batches. Maps Kafka batches to byte ranges in files. producer_state. All the producer state needed for idempotency. Some other tables for housekeeping, and the merge work items. Fig Inkless-D. Postgres data model. The Commit File TVF, which sequences and stores the batch coordinates, works as follows: A broker opens a transaction and submits a table as an argument, containing the batch coordinates of the multiple topics uploaded in the combined file. 
The TVF logic creates a temporary table (logs_tmp) and fills it via a SELECT on the logs table with the FOR UPDATE clause, which obtains a row lock on each topic partition row in the logs table that matches the list of partitions being submitted. This ensures that other brokers competing to add batches to the same partition(s) queue up behind this transaction. This is a critical barrier that avoids inconsistency. These locks are held until the transaction commits or aborts. Next, inside a loop, partition by partition, the TVF: Updates the producer state. Updates the high watermark of the partition (a row in the logs table). Inserts the batch coordinates into the batches table (sequencing and storing them). Finally, it commits the transaction. Apache Kafka would not accept a Postgres dependency of course, and KIP-1150 has not proposed centralizing coordination in Postgres either. But the KIP has suggested that the Batch Coordinator be pluggable, which might leave it open to using Postgres as a backing implementation. As a former database performance specialist, the Postgres locking does concern me a bit. It blocks on the logs table rows scoped to the topic id and partition. An ORDER BY prevents deadlocks, but given that the row locks are held until the transaction commits, I imagine that with enough contention, it could cause a convoy effect of blocking. This blocking is fair, that is to say, First Come First Served (FCFS) for each individual row. For example, with 3 transactions: T1 locks rows 11-15. T2 wants to lock rows 6-11, but only manages 6-10 as it blocks on row 11. Meanwhile T3 wants to lock rows 1-6, but only manages 1-5 as it blocks on row 6. We now have a dependency chain where T1 blocks T2 and T2 blocks T3. Once T1 commits, the others get unblocked, but under sustained load this kind of locking and blocking can quickly cascade, such that once contention starts, it rapidly expands. 
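The blocking chain in that example can be modelled with a toy FCFS lock simulation (illustrative Python, not Postgres's actual lock manager). Each transaction acquires its rows in sorted order and stops as soon as it hits a row held by someone else:

```python
def blocking_chain(transactions):
    """Toy model of FCFS row locks: each transaction takes rows in sorted order
    and queues behind whoever already holds one of its rows. Returns the
    (waiter, holder) edges of the resulting dependency chain."""
    held = {}    # row -> transaction currently holding its lock
    edges = []
    for txn, rows in transactions:
        for row in sorted(rows):
            if row in held and held[row] != txn:
                edges.append((txn, held[row]))  # txn waits on the holder
                break                           # and acquires no further rows
            held[row] = txn
    return edges

# The example from the text: T1 locks 11-15, T2 wants 6-11, T3 wants 1-6.
txns = [("T1", range(11, 16)), ("T2", range(6, 12)), ("T3", range(1, 7))]
print(blocking_chain(txns))  # [('T2', 'T1'), ('T3', 'T2')]
```

The chain shows why the effect compounds: T3 is stalled by T2 even though it shares no rows with T1, so one slow commit can transitively stall transactions that never touch its partitions.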
This contention is sensitive to the number of concurrent transactions and the number of partitions per commit. A common pattern with this kind of locking is that up until a certain transaction throughput everything is fine, but at the first hint of contention, the whole thing slows to a crawl. Contention breeds more contention. I would therefore caution against the use of Postgres as a Batch Coordinator implementation. The following is a very high-level look at Slack’s KIP-1176 , in the interests of keeping this post from getting too detailed. There are three key points to this KIP’s design: Maintain leader-based topic partitions (producers continue to write to leaders), but replace the Kafka replication protocol with a per-broker S3-based write-ahead log (WAL). Try to preserve existing partition replica code for idempotency and transactions. Reuse existing tiered storage for long-term S3 data management. Fig Slack-KIP-1176-A. Leader-based architecture retained, replication replaced by an S3 WAL. Tiered storage manages long-term data. The basic idea is to preserve the leader-based architecture of Kafka, with each leader replica continuing to write to an active local log segment file, which it rotates periodically. A per-broker write-ahead log (WAL) replaces replication. A WAL Combiner component in the Kafka broker progressively (and aggressively) tiers portions of the local active log segment files (without closing them), combining them into multi-topic objects uploaded to S3. Once a Kafka batch has been written to the WAL, the broker can send an acknowledgment to its producer. This active log segment tiering does not change how log segments are rotated. Once an active log segment is rotated out (by closing it and creating a new active log segment file), it can be tiered by the existing tiered storage component for the long term. Fig Slack-KIP-1176-B. 
Produce batches are written to the page cache as usual, but active log segment files are aggressively tiered to S3 (possibly Express One Zone) in combined log segment files. The WAL acts as write-optimized S3 storage and the existing tiered storage uploads closed log segment files for long-term storage. Once all data of a given WAL object has been tiered, it can be deleted. The WAL only becomes necessary during topic partition leader failovers, where the new leader replica bootstraps itself from the WAL. Alternatively, each topic partition can have one or more followers which actively reconstruct local log segments from the WAL, providing a faster failover. The general principle is to keep as much of Kafka unchanged as possible, only changing from the Kafka replication protocol to an S3 per-broker WAL. The priority is to avoid the need for heavy rework or reimplementation of logic such as idempotency, transactions and share groups integration. But it gives up elasticity and the additional architectural benefits that come from building on disaggregated storage. Having said all of the above, there are a lot of missing or hacky details that currently detract from the evolutionary goal. There is a lot of hand-waving when it comes to correctness too. It is not clear that this KIP will be able to deliver a low-disruption evolutionary design that is also correct, highly available and durable. Discussion in the mailing list is ongoing. Luke Chen remarked: “ the current availability story is weak… It’s not clear if the effort is still small once details on correctness, cost, cleanness are figured out. ”, and I have to agree. The second revision of KIP-1150 replaces the future object compaction logic by delegating long-term storage management to the existing tiered storage abstraction (like Slack’s KIP-1176). The idea is to: Remove Batch Coordinators from the read path. 
Avoid separate object compaction logic by delegating long-term storage management to tiered storage (which already exists).  Rebuild per-partition log segments from combined objects in order to: Submit them for long-term tiering (works as a form of object compaction too). Serve consumer fetch requests. The entire write path becomes a three-stage process: Stage 1 – Produce path, synchronous . Uploads multi-topic WAL Segments to S3 and sequences the batches, acknowledging to producers once committed. This is unchanged except SLSOs are now called WAL Segments. Stage 2 – Per-partition log segment file construction, asynchronous . Each broker is assigned a subset of topic partitions. The brokers download the WAL segment byte ranges that host their assigned partitions and append to on-disk per-partition log segment files. Stage 3 – Tiered storage, asynchronous . The tiered storage architecture tiers the locally cached topic partition log segment files as normal. Stage 1 – The produce path The produce path is the same, but SLSOs are now called WAL Segments. Fig KIP-1150-rev2-A. Write path stage 1 (the synchronous part). Leaderless brokers upload multi-topic WAL Segments, then commit and sequence them via the Batch Coordinator. Stage 2 – Local per-partition segment caching The second stage is preparation for both: Stage 3, segment tiering (tiered storage). The read pathway for tailing consumers. Fig KIP-1150-rev2-B. Stage 2 of the write path, where assigned brokers download WAL segments and append to local log segments. Each broker is assigned a subset of topic partitions. Each broker polls the BC to learn of new WAL segments. Each WAL segment that hosts any of the broker’s assigned topic partitions will be downloaded (at least the byte range of its assigned partitions). Once the download completes, the broker will inject record offsets as determined by the batch coordinator, and append the finalized batch to a local (per topic partition) log segment on-disk.  
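The stage-2 loop above (poll the coordinator, download the assigned byte ranges, inject offsets, append locally) can be sketched as follows. All the names and shapes here (WalSegment, the ranges mapping, etc.) are hypothetical; KIP-1150 revision 2 does not specify this interface, and the actual S3 download is elided.

```python
# A minimal sketch of a stage-2 broker processing one WAL segment: it keeps
# only the byte ranges of its assigned partitions, injects the
# coordinator-assigned offsets, and appends to local per-partition segments.
from dataclasses import dataclass

@dataclass
class Batch:
    topic_partition: str
    base_offset: int   # assigned by the Batch Coordinator at commit time
    records: list

@dataclass
class WalSegment:
    object_key: str
    ranges: dict       # topic_partition -> (byte_range, committed batches)

local_segments = {}    # topic_partition -> list of (base_offset, records)

def process_wal_segment(segment, assigned):
    """Append the assigned partitions' finalized batches to local segments."""
    for tp, (byte_range, batches) in segment.ranges.items():
        if tp not in assigned:
            continue   # skip partitions owned by other brokers
        # (a real broker would download only `byte_range` from S3 here)
        for batch in batches:
            local_segments.setdefault(tp, []).append(
                (batch.base_offset, batch.records))

seg = WalSegment("wal-0001", {
    "orders-0": ((0, 512), [Batch("orders-0", 100, ["a", "b"])]),
    "clicks-3": ((512, 900), [Batch("clicks-3", 7, ["x"])]),
})
process_wal_segment(seg, assigned={"orders-0"})
print(local_segments)  # {'orders-0': [(100, ['a', 'b'])]}
```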
At this point, a log segment file looks like a classic topic partition segment file. The difference is that they are not a source of durability, only a source for tiering and consumption. WAL segments remain in object storage until all batches of a segment have been tiered via tiered storage. Then WAL segments can be deleted. Stage 3 – Tiered storage Tiered storage continues to work as it does today (KIP-405), based on local log segments. It hopefully knows nothing of the Direct-to-S3 components and logic. Tiered segment metadata is stored in KRaft, which also allows WAL segment deletion to be handled outside the scope of tiered storage. Fig KIP-1150-rev2-C. Tiered storage works as-is, based on local log segment files. Data is consumed from S3 topics from either: The local segments on-disk, populated from stage 2.  Tiered log segments (traditional tiered storage read pathway). End-to-end latency of any given batch is therefore based on: Produce batch added to buffer. WAL Segment containing that batch written to S3. Batch coordinates submitted to the Batch Coordinator for sequencing. Producer request acknowledged. Tail, untiered (fast path for tailing consumers) -> Replica downloads WAL Segment slice. Replica appends the batch to a local (per topic partition) log segment. Replica serves a consumer fetch request from the local log segment. Tiered (slow path for lagging consumers) -> Remote Log Manager downloads tiered log segment. Replica serves a consumer fetch request from the downloaded log segment. Avoiding excessive reads to S3 will be important in stage 2  (when a broker is downloading WAL segment files for its assigned topic partitions). This KIP should standardize how topic partitions are laid out inside every WAL segment and perform partition-broker assignments based on that same order: Pick a single global topic partition order (based on a permutation of the topic partition ids). 
Partition that order into contiguous slices, giving one slice per broker as its assignment (a broker may get multiple topic partitions, but they must be adjacent in the global order). Lay out every WAL segment in that same global order. That way, each broker’s partition assignment will occupy one contiguous block per WAL Segment, so each read broker needs only one byte-range read per WAL segment object (possibly empty if none of its partitions appear in that object). This reduces the number of range reads per broker when reconstructing local log segments. By integrating with tiered storage and reconstructing log segments, revision 2 moves to a more diskful design, where disks form a step in the write path to long-term storage. It is also more stateful and sticky than revision 1, given that each topic partition is assigned to a specific broker for log segment reconstruction, tiering and consumer serving. Revision 2 remains leaderless for producers, but leaderful for consumers. Therefore, to avoid cross-AZ consumer traffic, it will rely on KIP-881: Rack-aware Partition Assignment for Kafka Consumers to ensure zone-local consumer assignment. This makes revision 2 a hybrid. By delegating responsibility to tiered storage, more of the direct-to-S3 workload must be handled by stateful brokers. It is less able to benefit from the elasticity of disaggregated storage. But it reuses more of existing Kafka. The third revision, which at the time of writing is a loose proposal in the mailing list, ditches the Batch Coordinator (BC) altogether. Most of the complexity of KIP-1150 centers around Batch Coordinator efficiency, failovers and scaling, as well as idempotency, transactions and share groups logic. Revision 3 proposes to replace the BCs with “classic” topic partitions. The classic topic partition leader replicas will do the work of sequencing and storing the batch coordinates of their own data. 
The data itself would live in SLSOs (rev1)/WAL Segments (rev2) and ultimately, as tiered log segments (tiered storage). To make this clear: if as a user you create the topic Orders with one partition, then an actual topic Orders will be created with one partition. However, this will only be used for the sequencing of the Orders data and the storage of its metadata. The benefit of this approach is that all the idempotency and transaction logic can be reused in these “classic-ish” topic partitions. There will be code changes, but fewer than with Batch Coordinators. All the existing tooling for moving partitions around, failovers etc. works the same way as for classic topics. So replication continues to exist, but only for the metadata. Fig KIP-1150-rev3-A. Replicated partitions continue to exist, acting as per-partition sequencers and metadata stores (replacing Batch Coordinators). One wrinkle this adds is that there is no central place to manage the clean-up of WAL Segments. Therefore a WAL File Manager component would have responsibility for background clean-up of those WAL segment files. It would periodically check the status of tiering to discover when a WAL Segment can be deleted. Fig KIP-1150-rev3-B. The WAL File Manager is responsible for WAL Segment clean-up. The motivation behind this change to remove Batch Coordinators is to simplify implementation by reusing existing Kafka code paths (for idempotence, transactions, etc.).  However, it also opens up a whole new set of challenges which must be discussed and debated, and it is not clear that this third revision solves the complexity problem. Revision 3 now depends on the existing classic topics, with leader-follower replication. It moves a little further again towards the evolutionary path. It is curious to see Aiven productionizing its Kafka fork “Inkless”, which falls under the “revolutionary” umbrella, while pushing towards a more evolutionary stateful/sticky design in these later revisions of KIP-1150.  
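The WAL File Manager's clean-up rule described above (a WAL segment may only be deleted once every batch it hosts has been tiered) can be sketched as follows. The index and tiering-status shapes here are invented for illustration; the KIP does not specify them.

```python
# Sketch of the WAL segment clean-up check: a segment is deletable once all
# of the batches it hosts appear in the set of tiered batches.

def deletable_segments(wal_index, tiered_batches):
    """wal_index: segment key -> set of batch ids hosted in that segment.
    Returns the segments whose batches have all been tiered."""
    return [seg for seg, batches in wal_index.items()
            if all(b in tiered_batches for b in batches)]

wal_index = {"wal-01": {"b1", "b2"}, "wal-02": {"b3", "b4"}}
tiered = {"b1", "b2", "b3"}               # b4 has not been tiered yet
print(deletable_segments(wal_index, tiered))  # ['wal-01']
```

A background job would run this check periodically and issue the physical S3 deletes for whatever it returns.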
Apache Kafka is approaching a decision point with long-term implications for its architecture and identity. The ongoing discussions around KIP-1150 revisions 1-3 and KIP-1176 are nominally framed around replication cost reduction, but the underlying issue is broader: how should Kafka evolve in a world increasingly shaped by disaggregated storage and elastic compute? At its core, the choice comes down to two paths. The evolutionary path seeks to fit S3 topics into Kafka’s existing leader-follower framework, reusing current abstractions such as tiered storage to minimize disruption to the codebase. The revolutionary path instead prioritizes the benefits of building directly on object storage. By delegating to shared object storage, Kafka can support an S3 topic serving layer which is stateless, elastic, and disposable. Scaling comes from adding and removing stateless workers rather than rebalancing stateful nodes, while maintaining Kafka’s existing workloads with classic leader-follower topics. While the intentions and goals of the KIPs clearly fall on a continuum from revolutionary to evolutionary, the reality in the mailing list discussions makes everything much less clear. The devil is in the details, and as the discussion advances, the arguments for “simplicity through reuse” start to strain. The reuse strategy is a retrofitting strategy which, ironically, could make the codebase harder to maintain in the long term. Kafka’s existing model is deeply rooted in leader-follower replication, with much of its core logic built around that assumption. Retrofitting direct-to-S3 into this model forces some “unnatural” design choices, choices that would not be made if designing a cloud-native solution from scratch. My own view aligns with the more revolutionary path in the form of KIP-1150 revision 1 . It doesn’t simply reduce cross-AZ costs, but fully embraces the architectural benefits of building on object storage. 
With additional broker roles and groups, Kafka could ultimately achieve a similar elasticity to WarpStream (and Confluent Freight Clusters).  The approach demands more upfront engineering effort and may increase long-term maintenance complexity, but it avoids tight coupling to the existing leader-follower architecture. Much depends on what kind of refactoring is possible to avoid the duplication of idempotency, transactions and share group logic. I believe the benefits justify the upfront cost and will help keep Kafka relevant in the decade ahead.  In theory, both directions are defensible; ultimately it comes down to the specifics of each KIP. The details really matter. Goals define direction, but it’s the engineering details that determine the system’s actual properties. We know the revolutionary path involves big changes, but the evolutionary path comes with equally large challenges, where retrofitting may ultimately be more costly while simultaneously delivering less. The committers who maintain Kafka are cautious about large refactorings and code duplication, but are equally wary of hacks and complex code serving two needs. We need to let the discussions play out. My aim with this post has been to take a step back from "how to implement direct-to-S3 topics in Kafka", and think more about what we want Kafka to be. The KIPs represent the how, the engineering choices. Framed that way, I believe it is easier for the wider community to understand the KIPs, the stakes and the eventual decision of the committers, whichever way they ultimately decide to go. Revolutionary : Choose a direct-to-S3 topic design that maximizes the benefits of an object-storage architecture, with greater elasticity and lower operational complexity. However, in doing so, we may increase the implementation cost and possibly the long-term maintenance burden by maintaining two very different topic models in the same project (leader-based replication and direct-to-S3). 
Evolutionary : Shoot for an evolutionary design that makes use of existing components to reduce the need for large refactoring or duplication of logic. However, by coupling to the existing architecture, we forfeit the extra benefits of object storage, focusing primarily on networking cost savings (in AWS and GCP). Through this coupling, we also run the risk of achieving the opposite: harder-to-maintain code by bending and contorting a second workload into an architecture optimized for something else.

Jack Vanlightly, 1 month ago

Why I’m not a fan of zero-copy Apache Kafka-Apache Iceberg

Over the past few months, I’ve seen a growing number of posts on social media promoting the idea of a “zero-copy” integration between Apache Kafka and Apache Iceberg. The idea is that Kafka topics could live directly as Iceberg tables. On the surface it sounds efficient: one copy of the data, unified access for both streaming and analytics. But from a systems point of view, I think this is the wrong direction for the Apache Kafka project. In this post, I’ll explain why.  Zero-copy is a bit of a marketing buzzword. I prefer the term shared tiering . The idea behind shared tiering is that Apache Kafka tiers colder data to an Apache Iceberg table instead of tiering closed log segment files. It’s called shared tiering because the tiered data serves both Kafka and data analytics workloads.  The idea has been popularized recently in the Kafka world by Aiven, with a tiered storage plugin for Apache Kafka that adds Iceberg table tiering to the tiered storage abstraction inside Kafka brokers. But before we understand shared tiering we should understand the difference between tiering and materialization: Tiering is about moving data from one storage tier (and possibly storage format) to another, such that both tiers are readable by the source system and data is only durably stored in one tier. While the system may use caches, only one tier is the source of truth for any given data item. Usually it’s about moving data to a cheaper long-term storage tier.  Materialization is about making data of a primary system available to a secondary system , by copying data from the primary storage system (and format) to the secondary storage system, such that both data copies are maintained (albeit with different formats). The second copy is not readable from the source system as its purpose is to feed another data system. Copying data to a lakehouse for access by various analytics engines is a prime example. Fig 1. 
Tiering vs materialization There are two types of tiering: Internal Tiering is data tiering where only the primary data system can access the various storage tiers. For example, Kafka tiered storage is internal tiering. These internal storage tiers form the primary storage as a whole. Shared Tiering is data tiering where one or more data tiers is shared between multiple systems. The result is a tiering-materialization hybrid, serving both purposes. Tiering to a lakehouse is an example of shared tiering. Fig 2. Shared tiering makes the long-term storage a shared storage layer for two or more systems. There are two broad approaches to populating an Iceberg table from a Kafka topic: Internal Tiering + Materialization Shared Tiering Option 1: Internal Tiering + Materialization where Kafka continues to use traditional tiered storage of log segment files (internal tiering) and also materializes the topic as an Iceberg table (such as via Kafka Connect). Catch-up Kafka consumers will be served from tiered Kafka log segments, whereas compute engines such as Spark will use the Iceberg table. Fig 3. Internal Tiering + Materialization Option 2: Shared Tiering (zero-copy) where Kafka tiers topic data to Iceberg directly. The Iceberg table will serve both Kafka brokers (for catch-up Kafka consumers) and analytics engines such as Spark and Flink. Fig 4. Kafka tiers data directly to Iceberg. Iceberg is shared between Kafka and analytics. On the surface, shared tiering sounds attractive: one copy of the data, lower storage costs, no need to keep two copies of the data in sync. But the reality is more complicated.  Zero-copy shared tiering appears efficient at first glance as it eliminates duplication between Kafka and the data lake.  However, rather than simply taking one cost away, it shifts that cost from storage to compute. 
By tiering directly to Iceberg, brokers take on substantial compute overhead as they must both construct columnar Parquet files from log segment files (instead of simply copying them to object storage), and they must download Parquet files and convert them back into log segment files to serve lagging consumers. Richie Artoul of WarpStream blogged about why tiered storage is such a bad place to put Iceberg tiering work: “ First off, generating parquet files is expensive. Like really expensive. Compared to copying a log segment from the local disk to object storage, it uses at least an order of magnitude more CPU cycles and significant amounts of memory. That would be fine if this operation were running on a random stateless compute node, but it’s not, it’s running on one of the incredibly important Kafka brokers that is the leader for some of the topic-partitions in your cluster. This is the worst possible place to perform computationally expensive operations like generating parquet files. ” – Richard Artoul, The Case for an Iceberg-Native Database: Why Spark Jobs and Zero-Copy Kafka Won’t Cut It Richard is pretty sceptical of tiered storage in general, a sentiment which I don’t share so much, but where we agree is that Parquet file writing is far more expensive than log segment uploads. Things can get worse (for Kafka) when we consider optimizing the Iceberg table for analytics queries. I recently wrote about how open table formats optimize query performance. “ Ultimately, in open table formats, layout is king. Here, performance comes from layout (partitioning, sorting, and compaction) which determines how efficiently engines can skip data… Since the OTF table data exists only once, its data layout must reflect its dominant queries. You can’t sort or cluster the table twice without making a copy of it. ” – Beyond Indexes: How Open Table Formats Optimize Query Performance Layout, layout, layout. Performance comes from pruning, pruning efficiency comes from layout. 
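To make the pruning argument concrete, here is a toy model of file-level min/max statistics, the mechanism open table formats rely on for skipping files. The file names and stats are invented; with an offset-ordered layout, offset ranges cluster tightly per file while EventType values smear across every file.

```python
# Toy file pruning via min/max column statistics: keep only files whose
# [min, max] range for a column overlaps the queried [lo, hi] range.

def prune(files, column, lo, hi):
    return [f["name"] for f in files
            if not (f["stats"][column][1] < lo or f["stats"][column][0] > hi)]

# Offset-ordered layout: tight offset ranges, but every file contains a
# random mixture of event types, so the etype stats span the whole domain.
files = [
    {"name": "f1.parquet", "stats": {"offset": (0, 999),     "etype": ("click", "view")}},
    {"name": "f2.parquet", "stats": {"offset": (1000, 1999), "etype": ("click", "view")}},
    {"name": "f3.parquet", "stats": {"offset": (2000, 2999), "etype": ("click", "view")}},
]
print(prune(files, "offset", 1200, 1300))       # ['f2.parquet'] - one file for Kafka
print(prune(files, "etype", "click", "click"))  # all 3 files - no pruning for analytics
```

Flip the layout (partition by EventType) and the situation inverts: the etype query prunes well while an offset-range read has to touch nearly every file, which is exactly the tension described in the next section.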
So what layout should we choose for our shared Iceberg tables? To serve Kafka consumers efficiently, the data must be laid out in offset order. Each Parquet file would contain contiguous ranges of offsets of one or more topic partitions. This is ideal for Kafka as it needs only to fetch individual Parquet files or even row groups within files, in order to reconstruct a log segment to serve a lagging consumer. Best case is a 1:1 mapping from log-segment to Parquet file; otherwise read amplification grows quickly. Fig 5. Column statistics and Iceberg partitions clearly tell Kafka where a specific topic partition offset range lives. But these pruning statistics are not useful for analytics queries. However, for the analytics query, there is no useful pruning information in this layout, leading to large table scans. Queries like WHERE EventType = 'click' must read every file, since each one contains a random mixture of event types. Min/max column statistics on columns such as EventType are meaningless for predicate pushdown, so you lose all the performance advantages of Iceberg’s table abstraction. An analytics-optimized layout, by contrast, partitions and sorts data by business dimensions, such as EventType, Region and EventTime. Files are rewritten or merged to tighten column statistics and make pruning effective. That makes queries efficient: scans touch only a handful of partitions, and within each partition, min/max stats allow skipping large chunks of data. Fig 6. Column statistics and partitions clearly tell analytics queries that want to slice and dice by EventType and Region how to prune Parquet files. But these pruning statistics are not useful for Kafka at all, which must do a costly table scan (from a Kafka broker) to find a specific offset range to rebuild a log segment. With this analytics optimized layout, a lagging Kafka consumer is going to cause a Kafka broker to have a really bad day. Offset order no longer maps to file order. 
To fetch an offset range, the broker may have to fetch dozens of Parquet files (as the offset range is spread across files) due to analytics workload driven choices for partition scheme and sort orders. The read path becomes highly fragmented, resulting in massive read amplification. All of this downloading, scanning and segment reconstruction is happening in the Kafka broker. That’s a lot more work for Kafka brokers and it all costs a lot of CPU and memory. Normally I would say: given two equally important workloads, optimize for the workload that is less predictable, in this case, the lakehouse. The Kafka workload is sequential and therefore predictable, so we can use tricks such as read-aheads. Analytics workloads are far less predictable, slicing and dicing the data in different ways, so optimize for that.  But this is simply not practical for Apache Kafka. We can’t ask it to reconstruct sequential log segments from Parquet files that have been completely reorganized into a different data distribution. The best we could do is some half-way house, firmly leaning towards Kafka’s needs. Basically, partitioning by ingestion time (hour or date) so we preserve the sequential nature of the data and hope that most analytical queries only touch recent data. The alternative is to use the Kafka Iceberg table as a staging table for incrementally populating a cleaner (silver) table, but then haven’t we just made a copy anyway? In practice, the additional compute and I/O overhead can easily offset the storage savings, leading to a less efficient and less predictable performance profile overall. How we optimize the Iceberg table is critical to the efficiency of both the Kafka brokers, but also the analytics workload. Without separation of concerns, each workload is trapped by the other.  The first issue is that the schema of the Kafka topic may not even be suitable for the Iceberg table, for example, a CDC stream with before and after payloads. 
When we materialize these streams as tables, we don’t materialize them in their raw format; we apply the change events to materialize the current state of the data. But shared tiering requires that we write everything, every column, every header field and so on. For CDC, we’d then do some kind of MERGE operation from this topic table into a useful table, which reintroduces a copy. But it is evolution that concerns me more. When materializing Kafka data into an Iceberg table for long-term storage, schema evolution becomes a challenge. A Kafka topic’s schema often changes over time as new fields are added and others are renamed or deprecated, but the historical data still exists in the old shape. A Kafka topic partition is an immutable log where events remain in their original form until expired. If you want to retain all records forever in Iceberg, you need a strategy that allows old and new events to coexist in a consistent, queryable way. We need to avoid cases where either queries fail on missing fields or historical data becomes inaccessible for Kafka. Let’s look at two approaches to schema evolution of tables that are tiered Kafka topics. One common solution is to build an uber-schema, which is essentially the union of all fields that have ever existed in the Kafka topic schema. In this model, the Iceberg table schema grows as new Kafka fields are introduced, with each new field added as a nullable column. Old records simply leave these columns null, while newer events populate them. Deprecated fields remain in the schema but stay null for modern data. This approach preserves history in its original form, keeps ingestion pipelines simple, and leverages Iceberg’s built-in schema evolution capabilities. The trade-off is that the schema can accumulate many rarely used columns, and analysts must know which columns are meaningful for which time ranges. Views can help, performing the necessary coalesces and the like to turn a messy set of columns into something cleaner.
But it would require careful maintenance. The alternative is to periodically migrate old data forward into the latest schema. Instead of carrying every historical field forever, old records are rewritten so that they conform to the current schema. Missing values may be backfilled with defaults, derived from other fields, or simply set to null. Columns that are no longer used are dropped. This limits the messiness of the table’s schema over time, turning it from a shameful mess of nullable columns into something that looks reasonable. While this strategy produces a tidier schema and simplifies querying, it can be complex to implement. No vendor that I have seen supports this approach, leaving it as a job for their customers, who know their data best. From the lakehouse point of view, we’d prefer to use the migrate-forward approach over the long term, but use the uber-schema in the shorter term. Once we know that no more producers are using an old schema, we can migrate the rows of that schema forward. But that migration can cause a loss of fidelity for the Kafka workload. What gets written to Kafka may not be what gets read back. This could be a good thing or a bad thing. Good if you want to go back and reshape old events to more closely match the newest schema. Really bad for some compliance and auditing purposes; imagine telling your stakeholders/auditors that we can’t guarantee the data that is read from Kafka will be the same as was written to Kafka! The needs of SQL-writing analysts and the needs of Kafka conflict. Kafka wants fidelity (and doesn’t care about column creep) and Iceberg wants clean tables. In some cases we might have a reprieve: if Kafka clients only need to consume 7 or 30 days of data, then we can use the uber-schema method for the period that covers Kafka, and the migrate-forward method for the rest. But we are still coupling the needs of Kafka clients with lakehouse clients.
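As a rough sketch of the uber-schema approach (field names hypothetical), the table schema is simply the union of every schema version ever seen, with later additions landing as nullable columns:

```python
# Sketch: an uber-schema is the union of all fields across schema versions.
# Fields absent from a given record's version are simply left null.

def uber_schema(versions):
    """Merge schema versions; first-seen type wins, all fields accumulate."""
    merged = {}
    for version in versions:
        for field, ftype in version.items():
            merged.setdefault(field, ftype)
    return merged

v1 = {"user_id": "long", "event": "string", "region": "string"}
v2 = {"user_id": "long", "event": "string", "country": "string"}  # 'region' renamed

print(uber_schema([v1, v2]))
# {'user_id': 'long', 'event': 'string', 'region': 'string', 'country': 'string'}
# Old rows leave 'country' null; new rows leave 'region' null.
```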
If Kafka only needs 7 days of data and Iceberg is storing years’ worth, then why do we care about data duplication anyway? Zero-copy may reduce data duplication but does not necessarily reduce cost. It shifts cost from storage to compute and operational overhead. When Kafka brokers handle Iceberg files directly, they assume responsibilities that are both CPU and I/O intensive: file format conversion, table maintenance, and reconstruction of ordered log segments. The result is higher broker load and less predictable performance. Any storage savings are easily offset by the increased compute requirements. On the subject of data duplication, there are reasons to believe that Kafka Iceberg tables may degenerate into staging tables, due to the unoptimized layout and proliferation of columns over time as schemas change. Such usage as staging tables eliminates the duplication argument. But even so, there is already plenty of duplication going on in the lakehouse. We already have copies. We have bronze then silver tables. We have all kinds of staging tables already. How much impact does the duplication avoidance of shared tiering actually make? Also consider: With a good materializer, we can write to silver tables directly rather than using bronze tables as tiered Kafka data (especially in the CDC case). A good materializer can also reduce data duplication. Usually, Kafka topic retention is only a few days, or up to a month, so the scope for duplication is limited to the Kafka retention period. Bidirectional fidelity is a real concern when converting between formats like Avro and formats like Parquet, which have different type systems and evolution rules. Storing the original Kafka bytes as a column in the Iceberg table is a legitimate approach as an insurance against conversion issues (but involves a copy). The deeper problem with zero-copy tiering is its erosion of boundaries and the resulting coupling.
When Kafka uses Iceberg tables as its primary storage layer, that boundary disappears. Who is responsible for the Iceberg table? If Kafka doesn’t take on ownership, then Kafka becomes vulnerable to the decisions of whoever owns and maintains the Iceberg table. After all, the Iceberg table will host the vast majority of Kafka’s data! Who’s on call if tiered data can’t be read? Kafka engineers? Lakehouse engineers? Will they cooperate well? The mistake of the zero-copy approach is that it assumes Kafka/Iceberg unification requires a single physical representation of the data. But unification is better served as logical and semantic unification when the workloads differ so much. A logical unification of Kafka and Iceberg allows the storage of each workload to remain separate and optimized. Physical unification removes flexibility by adding coupling (the traps). Once Kafka and Iceberg share the same storage, each system constrains the other’s optimization and evolution. The result is a single, entangled system that must manage two storage formats and be tuned for two conflicting workloads, making both less reliable and harder to operate. This entanglement also leads to Kafka becoming a data lakehouse manager, which I believe is a mistake for the project. Once Kafka stores its data in Iceberg, it must take ownership of that data. I don’t agree with that direction. There are both open-source and SaaS data lakehouse platforms which include table maintenance, as well as access and security controls. Let lakehouses do lakehouse management, and leave Kafka to do what it does best. Materialization isn’t perfect, but it decouples concerns and avoids forcing Kafka to become a lakehouse. The benefits of maintaining storage decoupling are: Lakehouse performance optimization doesn’t penalize the Kafka workload or vice-versa. Kafka can use a native log-centric tiering mechanism that does not overly burden brokers.
Schema evolution of the lakehouse does not impact Kafka topics, providing the lakehouse with more flexibility leading to cleaner tables. There is less risk and overhead associated with bidirectional conversion between Kafka and Iceberg and back again. 100% fidelity is a concern to all system builders working in this space. Materialized data can be freely projected and transformed, as it does not form the actual Kafka data. Kafka does not depend on the materialized data at all. Apache Kafka doesn’t need to perform lakehouse management itself. The drawbacks are: we need two data copies but hopefully I’ve argued why that isn’t the biggest concern here. Duplication is already a fact of life in modern data platforms. Tools such as Kafka Connect and Apache Flink already exist for materializing Kafka data in secondary systems. They move data across boundaries in a controlled, one-way flow, preserving clear ownership and interfaces on each side. Modern lakehouse platforms provide managed tables with ingestion APIs (such as Snowpipe Streaming, Databricks Unity Catalog REST API, and others). Kafka Connect can work really well with these ingestion APIs. Flink also has sinks for Iceberg, Delta, Hudi and Paimon. We don’t need Kafka brokers to do this work and do maintenance on top of that. Unifying operational and analytical systems doesn’t mean merging their physical storage into one copy. It’s about logical and semantic unification, achieved through consistent schemas and reliable data movement, not shared files. It is tempting to put everything into one cluster, but separation of concerns is what keeps complex systems comprehensible, performant and resilient. UPDATE: I’ve been asked a few times about Confluent Tableflow and how it aligns with this discussion. I didn't include Tableflow as it is a proprietary component of Confluent Cloud, whereas this post is focused on Apache Kafka. 
But for what it’s worth, Tableflow falls under the materialization approach, acting both as a sophisticated Iceberg/Delta materializer and as a table maintenance service. It is operated as a separate component (separate from the Kora brokers). In the future it will support more sophisticated data layout optimization for analytics workloads. It does not perform zero-copy for many of the reasons why zero-copy isn’t the right choice for Apache Kafka. Nor does it execute within brokers, being a separate service that is independently scalable. Tiering is about moving data from one storage tier (and possibly storage format) to another, such that both tiers are readable by the source system and data is only durably stored in one tier. While the system may use caches, only one tier is the source of truth for any given data item. Usually it’s about moving data to a cheaper long-term storage tier. Materialization is about making data of a primary system available to a secondary system, by copying data from the primary storage system (and format) to the secondary storage system, such that both data copies are maintained (albeit with different formats). The second copy is not readable from the source system as its purpose is to feed another data system. Copying data to a lakehouse for access by various analytics engines is a prime example. Internal Tiering is data tiering where only the primary data system can access the various storage tiers. For example, Kafka tiered storage is internal tiering. These internal storage tiers form the primary storage as a whole. Shared Tiering is data tiering where one or more data tiers are shared between multiple systems. The result is a tiering-materialization hybrid, serving both purposes. Tiering to a lakehouse is an example of shared tiering.

Jack Vanlightly 1 month ago

Beyond Indexes: How Open Table Formats Optimize Query Performance

My career in data started as a SQL Server performance specialist, which meant I was deep into the nuances of indexes, locking and blocking, execution plan analysis and query design. These days I’m more in the world of open table formats such as Apache Iceberg. Having learned the internals of both transactional and analytical database systems, I find the use of the word “index” interesting as it means very different things in different systems. I see the term “index” used loosely when discussing open table format performance, both in their current designs and in speculation about future features that might make it into their specs. But what actually counts as an index in this world? Some formats, like Apache Hudi, do maintain record-level indexes, such as primary-key-to-filegroup maps, that enable upserts and deletes to be directed efficiently to the right filegroup in order to support primary key tables. But they don’t help accelerate read performance across arbitrary predicates like the secondary indexes we rely on in OLTP databases. Traditional secondary indexes (like the B-trees used in relational databases) don’t exist in Iceberg, Delta Lake, or even Hudi. But why? Can't we solve some performance issues if we just added secondary indexes to the Iceberg spec? The short answer is: “no, and it's complicated”. There are real and practical reasons why the answer isn’t just "we haven't gotten around to it yet." The aim of this post is to understand: Why we can’t just add more secondary indexes to speed up analytical queries like we can often do in RDBMS databases. How the analytics workload drives the design of the table format in terms of core structure and auxiliary files (and how that differs from the RDBMS). One core thing I’m going to try to convince you of in this post is that whether your table is a traditional RDBMS or Iceberg, read performance comes down to reducing the amount of IO performed, which in turn comes down to: How we organize the data itself.
The use of auxiliary data structures over that data organization. The key point is that the workload determines how you organize that data and what kind of auxiliary data structures are used. We’ll start by understanding indexing in the RDBMS and then switch over and contrast that to how open table formats such as Iceberg work. At its core, an index speeds up queries by reducing the amount of IO required to execute the query. If a query needs one row but has to scan the entire table, then that will be really slow if the table is large. An index in an RDBMS is a type of B-tree and provides two main access patterns: a seek, which traverses the tree looking for a specific row, or a scan, which iterates over the whole tree or a range of it. We can broadly categorize indexes into two types in the traditional RDBMS: Clustered index. The index and the data aren’t separate, they are clustered together. The table is the clustered index. Like books on a library shelf arranged alphabetically by author. The ordering is the collection itself; finding “Tolkien” means walking directly to the T section, because the data is stored in that order. Non-clustered index. The index is a separate (smaller) structure that points to the actual table rows. A secondary index is an example of this. Like in the library there may be a computer that allows you to search for books, and which tells you the shelf locations. Then we have table statistics such as estimated cardinalities and histograms which, as we’ll see further down, become very useful for the query optimizer to select an efficient execution plan. Let’s look at these three things: the clustered index, non-clustered index and column statistics. I could add heap-based tables and a bunch of other things but let’s try and be brief. Let’s imagine we have a User table, with UserId as the primary key, with multiple columns including FirstName, LastName, Country and Nationality.
The table is a clustered (B-tree) index, which is sorted by UserId. Fig 1. The clustered index with a primary key of UserId as an auto-incrementing int. If you query the table by the primary key, the database engine can do a B-tree traversal and rapidly find the location of the desired row. For example, if I run SELECT * FROM User WHERE UserId = 100; then the database will do a clustered index seek, which means it traverses the clustered index by key, quickly finding the page which hosts the desired row. When rows are inserted and deleted, they must be added to the B-tree which may require new data pages, page splits and page merges when data pages become full or too empty. Fragmentation can occur over time such that the B-tree data pages are not laid out sequentially on disk, so index rebuilds and defragmentation can be needed. All this tree management is expensive but the benefits are worth it because in an OLTP workload, queries are overwhelmingly about finding or updating a single row, or a very small set of rows, quickly. A B-tree sorted by the primary key makes this possible: the cost of a lookup is only O(log n) regardless of table size, and once the leaf page is reached, the row is right there. That means whether the table has a thousand rows or a hundred million, a clustered index seek for UserId = 18764389 takes just a handful of page reads. The same applies to joins: if another table has a foreign key reference to UserId, the database can seek into the clustered index for each match, making primary-key joins fast and predictable. Inserts and deletes may cause page splits or merges, but the tree structure ensures the data remains in key order, so seeks stay efficient. So far so good, but what about when you want to query the Users table by country or last name? That leads us to secondary (non-clustered) indexes. My application also needs to be able to execute queries with predicates based on Country and LastName. 
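As a loose illustration of why seeks stay cheap as the table grows, a sorted list plus binary search can stand in for B-tree traversal (a sketch of the access-pattern cost, not of how any database implements it):

```python
# Sketch: seek vs scan cost on a sorted clustering key.
import bisect

user_ids = list(range(0, 1_000_000, 2))  # 500k rows, sorted by UserId

def seek(keys, target):
    """Binary search: the moral equivalent of a clustered index seek, O(log n)."""
    i = bisect.bisect_left(keys, target)
    return i if i < len(keys) and keys[i] == target else None

def scan(keys, target):
    """Linear scan: the moral equivalent of a full table scan, O(n)."""
    for i, key in enumerate(keys):
        if key == target:
            return i
    return None

assert seek(user_ids, 18764) == scan(user_ids, 18764) == 9382
# The seek inspects ~log2(500_000) ≈ 19 entries; the scan inspects 9,383.
```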
A where clause on either of these two columns would result in a full table scan (clustered index scan) to find all rows which match that predicate. The reason is that the B-tree intermediate nodes simply contain no information about the Country and LastName columns. To cater for queries that do not use the primary key, we could use one or more secondary indexes, which are separate B-trees which map specific non-PK columns to rows in the clustered index. In SQL Server, a non-clustered index that points to a clustered index stores the indexed column(s) value and clustering key, aka, the primary key of the match. I could create one secondary index on the column ‘LastName’ and another on ‘Country’. Fig 2. Secondary indexes map columns to the clustering key (primary key) so the query engine can perform a clustered index seek. Note that in small tables, secondary indexes are not used as it's quicker to scan the table. My SELECT * FROM Users WHERE LastName = ‘Martinez’ will use a lookup operation, which consists of: a seek on the secondary index B-tree to obtain the clustering key (PK) of each matching Martinez row, then a seek on the clustered index (the table itself) using that clustering key to retrieve the full row. Selectivity is important here. If the secondary index scan returns a small number of rows, this random IO of multiple index seeks is worth it, but if the scan returns a large number of rows, it might actually be quicker to perform a clustered index scan (full table scan). Imagine if the table had 1 million rows and the distribution of the Country column matched the figure above, that is to say roughly 800,000 rows had ‘Spain’ in the Country column. 800,000 lookups would be far less efficient than one table scan. The selectivity threshold at which secondary index lookups beat a clustered index scan also depends on whether the database is running on an HDD or an SSD, with SSDs tolerating more random IO than an HDD.
It is up to the query optimizer to decide. Secondary indexes are ideal for high-selectivity queries, but also for queries that can be serviced by the secondary index alone. For example, if I only wanted to perform a count aggregation on Country, then the database could scan the Country secondary index, counting the number of rows with each country and never need to touch the much larger clustered index. We’ve successfully reduced the amount of IO to service the query and therefore sped it up. But we can also add “covering columns” to secondary indexes. Imagine the query SELECT Nationality FROM User WHERE Country = ‘Spain’. If the user table is really wide and really large, then we can speed up the query by adding the Nationality column to the Country secondary index as a covering column. The query optimizer will see that the Country secondary index includes everything it needs and decides not to touch the clustered index at all. Covering columns make the index larger, which is bad, but can remove the need to seek the clustered index, which is good. These secondary indexes trade read performance for space, write cost and maintenance overhead. Each write must update the secondary indexes synchronously, and sometimes these indexes get fragmented and need rebuilding. For an OLTP database, this cost can be worth it, and the right secondary index or indexes on a table can help support diverse queries on the same table. Just because I created a secondary index doesn’t mean the query optimizer will use it. Likewise, just because a secondary index exists doesn’t mean the query optimizer should choose it either. I have seen SQL Server get hammered because of incorrect secondary index usage. The optimizer believed the secondary index would help so it chose the “lookup” strategy, but it turned out that there were tens of thousands of matching rows (instead of the handful predicted), leading to tens of thousands of individual clustered index seeks.
This can turn into millions of seeks in queries if the Users table exists in several joins. Table statistics, such as cardinalities and histograms, are very useful here to help the optimizer decide which strategy to use. For example, if every row contains ‘Spain’ for country, the cardinality estimate will tell the optimizer not to use the secondary index for a SELECT * FROM Users WHERE Country = ‘Spain’ query as the selectivity is not low enough. Best to do a table scan. Histograms are useful when the distribution is skewed, such that some values have large numbers of rows while others have relatively few. There is, of course, far more to write about this topic, way more nuances on indexing, plus alternatives such as heap-based tables, other trees such as LSM trees, and alternative data models such as documents and graphs. But for the purpose of this post, we keep our mental model to the following: Workload: optimized for point lookups, short ranges, joins of a small number of rows, and frequent writes/updates. Queries often touch a handful of rows, not millions. Data organization: tables are row-based; each page holds complete rows, making single-row access efficient. Clustered index: the table itself is a B-tree sorted by the primary key; lookups and joins by key are fast and predictable (O(log n)). Secondary (non-clustered) index: separate B-trees that map other columns to rows in the clustered index. Useful for high-selectivity predicates and covering indexes, but costly to maintain. Statistics: cardinalities and histograms guide the optimizer to choose between using an index or scanning the table. Trade-off: indexes cut read I/O but add write and maintenance overhead. In OLTP this is worth it, since selective access dominates. Perhaps the main point for this post is that secondary indexes allow for one table to support diverse queries. This is something that we cannot easily say of the open table formats as we’ll see next.
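The optimizer's statistics-driven choice between a lookup strategy and a table scan can be caricatured as a simple cost comparison (the cost constants and histogram below are made up purely for illustration):

```python
# Sketch: choosing a plan from a histogram-based row estimate.
LOOKUP_COST_PER_ROW = 4.0   # random IO: secondary index seek + clustered index seek
SCAN_COST_PER_ROW = 0.01    # sequential IO, amortized across the whole scan

def choose_plan(histogram, total_rows, value):
    est_rows = histogram.get(value, 1)          # estimated matching rows
    lookup_cost = est_rows * LOOKUP_COST_PER_ROW
    scan_cost = total_rows * SCAN_COST_PER_ROW
    return "index lookup" if lookup_cost < scan_cost else "table scan"

country_histogram = {"Spain": 800_000, "Iceland": 120}

print(choose_plan(country_histogram, 1_000_000, "Iceland"))  # index lookup
print(choose_plan(country_histogram, 1_000_000, "Spain"))    # table scan
```

When the row estimate is badly wrong, the chosen plan is too, which is exactly the "hammered by lookups" failure mode described above.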
Analytical systems have a completely different workload from OLTP. Instead of reading a handful of rows or updating individual records, queries in the warehouse or lakehouse typically scan millions or even billions of rows, aggregating or joining them to answer analytical questions. Where row-based storage in a sorted B-tree structure makes sense for OLTP, it is not efficient for analytical workloads. So instead, data is stored in a columnar format in large contiguous blocks with a looser global structure. Secondary indexes aren’t efficient either. Jumping from index to row for millions of matches would already be massively inefficient on a file system, but given OTFs are hosted on object storage, secondary indexing becomes even less useful. So the analytical workload makes row-based B-trees and secondary indexes the wrong tool for the job. Columnar systems flipped the model: store data in columns rather than rows, grouped into large contiguous blocks. Instead of reducing IO via B-tree traversal by primary key, or pointer-chasing through secondary indexes, IO reduction comes from effective data skipping during table scans. The columnar storage allows for whole columns to be skipped and, depending on a few other factors, entire files can be skipped as well. Most data skipping happens during the planning phase, where the execution engine decides which files need to be scanned, and is often referred to as pruning. There are different types of pruning, as well as different levels of pruning effectiveness, which all depend on how the data is organized, what metadata exists and any other auxiliary search optimization data structures that might exist. Let’s look at the main ingredients for effective pruning: the table structure itself and the auxiliary structures. Let’s focus on Iceberg for now to keep ourselves grounded in a specific table format design.
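Planning-phase pruning can be sketched as a simple containment check against per-file column statistics (file names and stats hypothetical):

```python
# Sketch: skip any file whose min/max range cannot contain the predicate value.

def prune(files, column, value):
    return [f["path"] for f in files
            if f["stats"][column]["min"] <= value <= f["stats"][column]["max"]]

# Data clustered by Country: each file covers a narrow slice of the values.
clustered = [
    {"path": "a.parquet", "stats": {"Country": {"min": "France", "max": "Germany"}}},
    {"path": "b.parquet", "stats": {"Country": {"min": "Spain",  "max": "Sweden"}}},
]

# Countries mixed randomly: every file's range spans most of the domain.
mixed = [
    {"path": "c.parquet", "stats": {"Country": {"min": "Argentina", "max": "Zimbabwe"}}},
    {"path": "d.parquet", "stats": {"Country": {"min": "Australia", "max": "Vietnam"}}},
]

print(prune(clustered, "Country", "Spain"))  # ['b.parquet'] — one file scanned
print(prune(mixed, "Country", "Spain"))      # nothing can be skipped
```

The same check prunes well when data is clustered by the filtered column and prunes nothing when it isn't, which is the central theme of the rest of this section.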
I did an in-depth description of Iceberg internals based on v2 which is still valid today, if you want to dive in deep. To keep things simple, we’ll look at the organization of an Iceberg table at one point in time. Fig 3. An Iceberg table is queried based on a single snapshot, which forms a tree of metadata and data files. An Iceberg table also forms a tree, but it looks very different from the sorted B-tree of a clustered index. At the root is the metadata file, which records the current snapshot. That snapshot points to a manifest list, which in turn points to many manifest files, each of which references a set of data files. The data files are immutable Parquet (or ORC) files. A clustered index is inherently ordered by its key, but an Iceberg table’s data files have no intrinsic order, they’re just a catalogued set of Parquet files. Given this unordered set of files managed by metadata, performance comes from effective pruning. Effective pruning comes down to data locality that aligns with your queries. If I query for rows with Country = ‘Spain’, but the Spanish data is spread across all the files, then the query will have to scan every file. But if all the Spanish data is grouped into one small subset of the entire data set, then the query only has to read that subset, speeding it up. We can split the discussion of effective pruning into: The physical layout of the data. How well the data locality and sorting of the table aligns with its queries. Also the size and quantity of the data files. Metadata available to the query planner. The metadata and auxiliary data structures used by the query planner (and even during the execution phase in some cases) to prune files and even blocks within files. An Iceberg table achieves data locality through multiple layers of grouping, both across files and within files (via Parquet or ORC): Partitioning: groups related rows together across a subset of data files, so queries can target only the relevant partitions.
Sorting: orders rows within each partition so that values that are logically close are also stored physically close together within the same file or even the same row group inside a file. Partitioning is the first major grouping lever available. It determines how rows are organized into data files, dividing the table into logical partitions based on one or more columns, such as a date (or month/year), a region, etc. All rows sharing the same partition key values are written into the same directory or group of files. This creates data locality such that when a query includes a filter on the partition column (for example, WHERE EventDate = '2025-10-01'), the engine can identify exactly which partitions contain relevant data and ignore the rest. This process is known as partition pruning, and it allows the engine to skip scanning large portions of the dataset entirely. The more closely the partitioning aligns with typical query filters, the more effective pruning becomes. Great care must be taken with the choice of partition key as the number of partitions impacts the number of data files (with many small files causing a performance issue). Within each partition, the data files have no inherent order by themselves. This is where sort order becomes important. Sorting defines how rows are organized within each partition, typically by one or more columns that are frequently used in filter predicates or range queries. For example, if queries often filter by EventTime or Country, sorting by those columns ensures that rows with similar values are stored close together. Sorting improves data locality both within files and across files: Within files, sorting determines how rows are organized as data is written, ensuring that similar values are clustered together. This groups similar data together (according to sort key) in Parquet row groups and makes row group level pruning more effective. Sort order can influence grouping across data files during compaction.
When files are written in a consistent sort order, each file tends to represent a narrow slice of the sorted key space. For example, the rows of Spain might slot into a single file or a handful of files when Country is a sort key. Fig 4. A table with three partitions by Country and sort order by Surname. Sort order is preserved, but less effective on newly written small files. Compactions can sort rows of a large group of files into fewer larger files with ordering preserved across them. Partitioning is a strong, structural form of grouping that is enforced at all times. Once data is written into partitions, those boundaries are fixed and guaranteed, making partition pruning a reliable and predictable optimization. Sort order, on the other hand, is a more dynamic process that becomes most effective after compaction. Each write operation typically sorts its own batch of data before creating new files, but this sorting happens in isolation. Across multiple writes, even if all use the same declared sort order, there’s no partition-wide reordering of existing data. As a result, files within a partition may each be internally sorted but not well ordered relative to one another. Compaction resolves this by rewriting many files into fewer, larger ones while enforcing a consistent sort order across them. Only then does the sort order become a powerful performance optimization for data skipping. Delta Lake has similar features, with partitioning and sorting. Delta supports Z-ordering, which is a multidimensional clustering technique. Instead of sorting by a single column, Z-order interleaves the bits of multiple column values (for example, Country, Nationality, and EventDate) into a single composite key that preserves spatial locality. This means that rows with similar combinations of those column values are likely to be stored close together in a file, even if no single column is globally sorted. 
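Bit interleaving, the core of Z-ordering, can be sketched with a toy two-column version (real implementations first map column values of any type to fixed-width integers):

```python
# Sketch: interleave the bits of two values into one composite Z-order key.

def z_order_key(x: int, y: int, bits: int = 16) -> int:
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)      # x occupies the even bit positions
        key |= ((y >> i) & 1) << (2 * i + 1)  # y occupies the odd bit positions
    return key

# Points close in both dimensions get nearby keys and so sort near each other:
print(z_order_key(3, 3))  # 15
print(z_order_key(2, 3))  # 14
# A point far away in just one dimension jumps far in key space:
print(z_order_key(3, 8))  # 133
```

Sorting rows by such a composite key keeps rows with similar combinations of the interleaved columns physically adjacent, which is why Z-order helps multi-dimensional filters.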
Z-ordering is particularly effective for queries that filter on multiple dimensions simultaneously. How rows are organized across files and within files impacts pruning massively. Next let's look at how metadata and auxiliary data structures help the query planner make use of that organization. Just like in the RDBMS, column statistics are critically important, however, for the open table format, they are even more important. Column statistics are the primary method that query engines use to prune data files and row groups within files. In Iceberg we have two sources of column statistics: Metadata files : Manifest files list a set of data files along with per-column min/max values. Query engines can use these column stats to skip entire files during the planning phase based on the filter predicates. Data files : A Parquet file is divided into row groups, each containing a few thousand to millions of rows stored column by column. For every column in a row group, Parquet records min/max statistics, and query engines can use these to skip entire row groups when the filter predicate falls outside the recorded range. Fig 4. Parquet splits up the data into one or more row groups, and within each row group, each column is stored contiguously. Min/max column statistics are a mirror of how the data is laid out . When data is clustered/sorted by a column, that column’s statistics reflect that structure: each file or row group will capture a narrow slice of the column’s value range. For example, if data is sorted by Country, one file might record a min/max of ["France"–"Germany"] while another covers ["Spain"–"Sweden"]. These tight ranges make pruning highly effective. But when data isn’t organized by that column and all countries are mixed randomly across files, the statistics show it. Each file’s min/max range becomes broad, often spanning most of the column’s domain. 
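Here is a toy sketch of min/max pruning that contrasts the two layouts just described. The country ranges are invented; the point is that tight ranges let the planner skip row groups while broad ranges force a scan of everything:

```python
# Toy row-group pruning on min/max statistics. Each tuple records the
# (min, max) of the Country column for one row group; a group can be
# skipped when the filter value falls outside its range.

sorted_layout = [("France", "Germany"), ("Italy", "Norway"), ("Spain", "Sweden")]
random_layout = [("Albania", "Zimbabwe"), ("Austria", "Yemen"), ("Belgium", "Zambia")]

def groups_to_scan(stats, value):
    """Indexes of row groups whose [min, max] range could contain the value."""
    return [i for i, (lo, hi) in enumerate(stats) if lo <= value <= hi]

# Sorted data: tight ranges, one group scanned.
assert groups_to_scan(sorted_layout, "Spain") == [2]
# Randomly distributed data: broad ranges, every group must be scanned.
assert groups_to_scan(random_layout, "Spain") == [0, 1, 2]
```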
The result of such broad ranges is poor selectivity and less effective pruning, because the query planner can’t confidently skip any files. Simply put, column statistics are a low cost tool for the query planner to understand the data layout and plan accordingly. There are other auxiliary structures that enhance pruning and search efficiency, beyond min/max statistics. Bloom filters are one such mechanism. A Bloom filter is a compact probabilistic data structure that can quickly test whether a value might exist in a data file (or row group). If the Bloom filter says “no,” the engine can skip reading that section entirely; if it says “maybe,” the data must still be scanned. Iceberg and Delta can store Bloom filters at the file or row-group level, providing an additional layer of pruning beyond simple min/max range checks. Bloom filters are especially useful on columns that are not part of the sort order (where min/max stats are less effective) or for exact-match predicates. Some systems also maintain custom or proprietary sidecar files that act as lightweight search/data skipping structures. These can include: More fine-grained statistics files, such as histograms. Table level Bloom filters (Hudi). Primary-key-to-file index to support primary key tables (Hudi, Paimon). Upserts and deletes can be directed to specific files via the index. Other techniques involve creating optimized materialized projections, known as materialized views . These are precomputed datasets derived from base tables, often filtered, aggregated, or reorganized to match common query patterns. The idea is to store data in the exact shape queries need, so that the engine can answer those queries without repeatedly scanning or aggregating the raw data. They effectively trade storage and maintenance cost for speed, just like secondary indexes do in the RDBMS.
Some engines implement these projections transparently, using them automatically when a query can be satisfied by a precomputed result, just like an RDBMS will choose a secondary index if the planner decides it is more efficient. This can be much more powerful than just a materialized view as a separate table. One example is Dremio’s Reflections , though there may already be other instances of this in the wild among the data platforms. Fig 5. Materialized views help support diverse queries that the base table, with its one layout, cannot. Making such materialized views transparent to the user, by allowing the query planner to use the MV instead of the base table is a powerful approach, akin to the use of secondary indexes. Open table formats like Iceberg, Delta, and Hudi store data in immutable, columnar files, optimized for large analytical scans. Query performance depends on data skipping (pruning), which is the ability to avoid reading irrelevant files or row groups based on metadata. Pruning effectiveness depends on data layout. Data layout levers: Partitioning provides strong physical grouping across files, enabling efficient partition pruning when filters match partition keys. Sorting improves data locality within partitions, tightening column value ranges and enhancing row-group-level pruning. Compaction consolidates small files and enforces consistent sort order, making pruning more effective (and reducing the small file cost that partitioning can sometimes introduce). Z-ordering (Delta) and Liquid Clustering (Databricks) extend sorting to multi-dimensional and adaptive clustering strategies. Column statistics in Iceberg manifest files and Parquet row groups drive pruning by recording min/max values per column. The statistics reflect the physical layout. Bloom filters add another layer of pruning, especially for unsorted columns and exact match predicates.  
Some systems maintain sidecar indexes such as histograms or primary-key-to-file maps for faster lookups (e.g., Hudi, Paimon). Materialized views and precomputed projections further accelerate queries by storing data in the shape of common query patterns (e.g., Dremio Reflections). These require some data duplication and data maintenance, and are the closest equivalent (in spirit) to the secondary index of an RDBMS. The physical layout of the individual table isn’t the whole story. Data modeling plays just as critical a role. In a star schema, queries follow highly predictable patterns: they filter on small dimension tables and join to a large fact table on foreign keys. That layout makes diverse analytical queries fast without the need for secondary indexes. The dimensions are small enough to be read in full (and often cached), so indexing them provides no real gain. Fig 6. The central fact table contains information such as sales, with foreign keys to the dimension tables and facts such as sale amount, quantity, etc. Dimensions may include products, customers, vendors, dates and so on. A product dimension will have a ProductId and a number of columns describing aspects of the product such as manufacturer, SKU, etc. In an Iceberg-based star schema, partitioning should follow coarse-grained dimensions like time, such as days(sale_ts) or months(sale_ts) or by region, rather than high-cardinality surrogate IDs which would fragment the table too much. However, Iceberg’s bucket partition transform can still tame high-cardinality surrogate IDs by hashing them into a bounded number of buckets, such as bucket(64, customer_id) . Sorting within partitions by the most selective keys like customer_id, product_id, or sale_ts makes Iceberg’s column statistics far more effective for skipping data in the large fact table. The goal is to make the table’s physical structure mirror the star schema’s logical access paths, so Iceberg’s metadata and statistics work efficiently.
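As a rough illustration of these transforms, here is a sketch of days()- and bucket(n, ...)-style functions. Note this is not Iceberg's actual implementation: the spec defines bucket as a Murmur3 hash modulo n, so the stdlib SHA-256 below is only a stand-in:

```python
import hashlib
from datetime import datetime, timezone

def days(ts: datetime) -> str:
    """days(sale_ts): a coarse, time-based partition value."""
    return ts.date().isoformat()

def bucket(n: int, value) -> int:
    """bucket(n, customer_id): hash a high-cardinality key into n buckets.
    (Iceberg specifies Murmur3 here; SHA-256 is an illustrative stand-in.)"""
    digest = hashlib.sha256(str(value).encode()).digest()
    return int.from_bytes(digest[:4], "big") % n

# A row's partition value under days(sale_ts) + bucket(64, customer_id).
sale_ts = datetime(2025, 10, 1, 14, 30, tzinfo=timezone.utc)
partition_value = (days(sale_ts), bucket(64, 123456))
```

The key property is that the bucket is deterministic: every row with the same customer_id always lands in the same bucket, keeping the number of partitions bounded at n.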
The star-schema can serve queries on various predicates efficiently, without needing ad hoc indexes for every possible predicate. The trade-off is the complexity and cost of transforming data into this star schema (or other scheme). If you squint, the clustered index of a relational database and the Parquet and metadata layers of an open table format such as Iceberg share a common goal: minimize the amount of data scanned . Both rely on structure and layout to guide access; the difference is that instead of maintaining B-tree structures, OTFs lean on looser data layout and lightweight metadata to guide search (pruning being a search optimization). This makes sense for analytical workloads where queries often scan millions of rows: streaming large contiguous blocks is cheap, but random pointer chasing and maintaining massive B-trees is not. Secondary indexes, so valuable in OLTP, add little to data warehousing. Analytical queries don’t pluck out individual rows; they aggregate, filter, and join across millions. Ultimately, in open table formats, layout is king. Here, performance comes from layout (partitioning, sorting, and compaction) which determines how efficiently engines can skip data, while modeling, such as star or snowflake schemas, makes analytical queries more efficient for slicing and dicing. Unlike a B-tree, order in Iceberg or Delta is incidental, shaped by ongoing maintenance. Tables drift out of shape as their layout diverges from their queries, and keeping them in tune is what truly preserves performance. Since the OTF table data exists only once, its data layout must reflect its dominant queries. You can’t sort or cluster the table twice without making a copy of it. But it turns out that copying, in the form of materialized views, is a valuable strategy for supporting diverse queries over the same table, as exemplified by Dremio Reflections. These make the same cost trade-offs as secondary indexes: space and maintenance for read speed.
Looking ahead, the open table formats will likely evolve by formalizing more performance-oriented metadata. The core specs may expand to include more diverse standardized sidecar files, such as richer statistics, so engines can make more effective pruning decisions. But secondary data structures like materialized views are probably destined to remain platform-specific. They’re not metadata; they’re alternate physical representations of data itself, with their own lifecycles and maintenance processes. The open table formats focus on representing one canonical table layout clearly and portably. Everything beyond that, such as automated projections and adaptive clustering, is where engines will continue to differentiate. In that sense, the future of the OTFs lies not in embedding more sophisticated logic, but in providing the hooks for smarter systems to build on top of them. What, then, is an “index” in this world? The closest thing we have is a Bloom filter, with column statistics and secondary materialized views serving the same spirit of guiding the query planner toward efficiency. Traditional secondary indexes are nowhere to be found, and calling an optimized Iceberg table an index (in the clustered index sense) stretches the term too far. But language tends to follow utility, and for lack of a better term, “index” will probably keep standing in for the entire ecosystem of metadata, statistics, and other pruning structures that make these systems more efficient. In the future I’d love to dive into how Hybrid Transactional Analytical Processing (HTAP) systems deal with the fundamentally different nature of transactional and analytical queries. But that is for a different post. Thanks for reading. ps, if you want to learn more about table format internals, I’ve written extensively on it: The ultimate guide to table format internals - all my writing so far Why we can’t just add more secondary indexes to speed up analytical queries like we can often do in RDBMS databases.
How the analytics workload drives the design of the table format in terms of core structure and auxiliary files (and how that differs from the RDBMS).  How we organize the data itself. The use of auxiliary data structures over that data organization. Clustered index . The index and the data aren’t separate, they are clustered together. The table is the clustered index. Like books on a library shelf arranged alphabetically by author. The ordering is the collection itself; finding “Tolkien” means walking directly to the T section, because the data is stored in that order.  Non-clustered index . The index is a separate (smaller) structure that points to the actual table rows. A secondary index is an example of this. Like in the library there may be a computer that allows you to search for books, and which tells you the shelf locations. A seek on the secondary index B-tree to obtain the clustering key (PK) of the matching Martinez row. Then a seek on the clustered index (the table itself) using the clustering key (PK) to retrieve the full row.  Workload : optimized for point lookups, short ranges, joins of a small number of rows, and frequent writes/updates. Queries often touch a handful of rows, not millions. Data organization : tables are row-based; each page holds complete rows, making single-row access efficient. Clustered index : the table itself is a B-tree sorted by the primary key; lookups and joins by key are fast and predictable (O(log n)). Secondary (non-clustered) index : separate B-trees that map other columns to rows in the clustered index. Useful for high-selectivity predicates and covering indexes, but costly to maintain. Statistics : cardinalities and histograms guide the optimizer to choose between using an index or scanning the table. Trade-off : indexes cut read I/O but add write and maintenance overhead. In OLTP this is worth it, since selective access dominates. The physical layout of the data .
How well the data locality and sorting of the table aligns with its queries. Also the size and quantity of the data files. Metadata available to the query planner . The metadata and auxiliary data structures used by the query planner (and even during the execution phase in some cases) to prune files and even blocks within files.

Jack Vanlightly 2 months ago

Understanding Apache Fluss

This is a data system internals blog post. So if you enjoyed my table formats internals blog posts , or writing on Apache Kafka internals or Apache BookKeeper internals , you might enjoy this one. But beware, it’s long and detailed. Also note that I work for Confluent, which also runs Apache Flink but neither runs nor contributes to Apache Fluss. However, this post aims to be a faithful and objective description of Fluss. Apache Fluss is a table storage engine for Flink being developed by Alibaba in collaboration with Ververica. To write this blog post, I reverse engineered a high level architecture by reading the Fluss code from the main branch (and running tests), in August 2025. This follows the same approach as my writing about Kafka, Pulsar, BookKeeper, and the table formats (Iceberg, Delta, Hudi and Paimon), as the code is always the true source of information. Unlike the rest, I have not had time to formally verify Fluss in TLA+ or Fizzbee, though I did not notice any obvious issues that are not already logged in a GitHub issue. Let’s get started. We’ll start with some high level discussion in the Fluss Overview section, then get into the internals in the Fluss Cluster Core Architecture and Fluss Lakehouse Architecture sections. In simplest terms, Apache Fluss has been designed as a disaggregated table storage engine for Apache Flink. It runs as a distributed cluster of tablet and coordinator servers, though much of the logic is housed in client-side modules in Flink. Fig 1. Fluss architecture components Fluss provides three main features: Low-latency table storage Append-only tables known as Log Tables. Keyed, mutable tables, known as Primary Key Tables (PK tables), that emit changelog streams. Tiering to lakehouse table storage Currently to Apache Paimon (Apache Iceberg coming soon). Client-side abstractions for unifying low-latency and historical table storage. Fig 2.
Fluss logical model This post uses terminology from A Conceptual Model for Storage Unification , including terms such as internal tiering, shared tiering, materialization, and client/server-side stitching. This post also uses the term “real-time” mostly to distinguish between lower-latency hot data and colder, historical data. Last year I did a 3-part deep dive into Apache Paimon (a lakehouse table format), which was born as a table storage engine for Flink (originally named Flink Table Store). Paimon advertises itself today as “ A lake format that enables building a Realtime Lakehouse Architecture ”. Now Apache Fluss is being developed, also with the aim of being the table storage engine for Flink. Both projects (Paimon and Fluss) offer append-only tables and primary key tables with changelogs. So why do we need another table storage solution for Flink?  The answer is a combination of efficiency and latency. Paimon was specifically designed to be good at streaming ingestion (compared to Iceberg/Delta); however, it is still relatively slow and costly for real-time data. Paimon is also not particularly efficient at generating changelogs, which is one of Flink’s primary use cases. This is not a criticism of Paimon, only a reality of building decentralized table stores directly on object storage. Fluss in many ways is the realization that an object-store-only table format is not enough for real-time data. Apache Fluss is a table storage service initially designed to replace or sit largely in front of Paimon, offering lower latency table storage and changelogs. In the background, Fluss offloads data to Paimon. Fluss can also rely on its own internal tiering without any offloading to Paimon at all. Having two tiering mechanisms can sometimes be confusing, but we’ll dig into that later. Fig 3.
Fluss and Paimon forming real-time/historical table storage for Flink With the introduction of a fast tier (real-time) and a larger slower tier (historical), Flink needs a way to stitch these data stores together. Fluss provides client-side modules that provide a simple API to Flink, but which under the hood, do the work of figuring out where the cut-off exists between real-time and historical data, and stitching them together. One of Flink’s main roles is that of a changelog engine (based on changes to a materialized view). A stateful Flink job consumes one or more input streams, maintains a private materialized view (MV), and emits changes to that materialized view as a changelog (such as a Kafka topic). Fig 4. Flink as a changelog engine However, the state maintained by Flink can grow large, placing pressure on Flink state management, such as its checkpointing and recovery. A Flink job can become cumbersome and unreliable if its state grows too large. Flink 2.0 has introduced disaggregated state storage (also contributed by Alibaba) which aims to solve or at least mitigate the large state problem by offloading the state to an object store. In this way Flink 2.0 should be able to better support large state Flink jobs. In the VLDB paper Disaggregated State Management in Apache Flink® 2.0 , the authors stated that “ we observe up to 94% reduction in checkpoint duration, up to 49× faster recovery after failures or a rescaling operation, and up to 50% cost savings. ” Paimon and Fluss offer a different approach to the large state problem. Instead of only offloading state, they expose the materialized data itself as a shared table that can be accessed by other jobs. This turns what was previously private job state into a public resource, enabling new patterns such as lookup joins. Fig 5. Flink table storage project evolution Paimon was the first table storage engine for Flink. It provides append-only tables, primary key tables and changelogs for primary key tables. 
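To make "primary key tables with changelogs" concrete, here is a minimal sketch of changelog semantics, with a plain Python dict standing in for the table's key-value state. The +I/-U/+U tags are Flink's standard change types (insert, update-before, update-after):

```python
# Minimal sketch of primary-key changelog semantics. A dict stands in
# for the table's key-value state. An upsert of a new key emits +I; an
# upsert of an existing key emits -U (the old row image) followed by
# +U (the new row image).

state = {}

def upsert(key, row):
    if key in state:
        changes = [("-U", state[key]), ("+U", row)]
    else:
        changes = [("+I", row)]
    state[key] = row
    return changes

first = upsert("user-1", {"name": "Ana", "country": "ES"})
second = upsert("user-1", {"name": "Ana", "country": "PT"})
```

Note that emitting the -U (old row image) requires looking up the key's previous value, which is why where that state lives, and how cheaply it can be read, matters so much for changelog generation.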
If you are interested in learning about Paimon internals then I did a pretty detailed deep dive and even formally verified the protocol . While Paimon is no doubt one of the best table formats for streaming ingest, it still has limits. One of the main limitations is how it supports changelogs, which is a concern of the Paimon writer (or compactor). Maintaining a changelog may require lookups against the target Paimon table (slow) and caching results (placing memory pressure on Flink). Alternatives include generating changelogs based on full compactions, which add latency, can combine changes (losing some change events) and significantly impact compaction performance. In short, Paimon didn’t nail the changelog engine job and remains higher latency than regular databases and event streams. Fluss provides these very same primitives (append-only table, primary key table and changelog), but optimized for lower latency and better efficiency than Paimon (for small real-time writes). Fluss stores the primary key table across a set of tablet servers with RocksDB as the on-server storage engine. Fluss solves Paimon’s changelog issues by efficiently calculating the changes based on existing data stored in RocksDB. This is more efficient than Paimon’s lookups and higher fidelity (and more efficient) than Paimon’s compaction based changelog generation. Whether Fluss is a better changelog (and state management) engine than Flink 2.0 with disaggregated state storage remains to be seen, but one really nice thing about a Fluss Primary Key table (and Paimon PK table for that matter) is that it turns the formerly private MV state of a Flink job into a shared resource for other Flink jobs. This provides support for lookup joins, where previously another Flink job would need to consume the changelog. Fig 6.
Fluss PK Table offloads MV/changelog state management from Flink and provides a shared data primitive for other Flink jobs, with lookup (lookup join) support Append-only tables are based on an adapted copy of the Apache Kafka log replication and controller code. Where Fluss diverges is that Fluss is a table storage engine exposing a table API, whereas Kafka gives you untyped byte streams where schemas are optional. Kafka is extremely permissive and unopinionated about the data it replicates. One of the key features missing from Kafka, if you want to use it as an append-only table storage engine, is being able to selectively read a subset of the columns or a subset of the rows as you can with a regular database. Fluss adds this capability by enforcing tabular schemas and serializing records in a columnar format (Arrow IPC). This allows clients to include projections (and in the future, filters too) in their read requests, which get pushed down to the Fluss storage layer. With enforced tabular schemas and columnar storage, Kafka log replication can be made into a simple append-only table storage solution. Fig 7. The Fluss client serializes record batches into columnar Arrow IPC batches which are appended to segment files by the tablet servers. On the other end, the client that reads the batches converts the columnar Arrow into records again. While Fluss storage servers can push down projections to the file system reading level, this doesn’t apply to tiered segment data which would make column pruning on small batches prohibitively expensive. As we’ll see in this deep dive, Fluss doesn’t just build on Kafka for append-only tables (log tables) but also for the changelogs of primary key tables and as the durability mechanism for PK Tables. With the addition of Fluss as a lower latency table storage engine, we now have the problem of stitching together real-time and historical data.
As I described in my Conceptual Model for Storage Unification , it comes down to two main points: API abstraction . Stitching together the different physical storage into one logical model. Physical storage management (tiering, materialization, lifecycle management). Fig 8. Fluss real-time and historical data Fluss places the stitching and conversion logic client-side, interacting with the Fluss coordinators to discover the necessary metadata.  The physical storage management is split up between a trifecta of: Fluss storage servers (known as Tablet servers) Fluss coordinators (based on the ZooKeeper Kafka controller) Fluss clients (where conversion logic is housed) Most storage management involves tiering and backup: Internal tiering is performed by storage servers themselves, much like tiered storage in Kafka. Internal tiering allows for the size of Fluss-cluster-hosted log tables and PK table changelogs to exceed the storage drive capacity of the storage servers. RocksDB state is not tiered and must fit on disk. RocksDB snapshots are taken periodically and stored in object storage, for both recovery and a source of historical reads for clients (but this is not tiering). Lakehouse tiering is run as a Flink job, using Fluss libraries for reading from Fluss storage and writing to Paimon. Fluss coordinators are responsible for managing tiering state and assigning tiering work to tiering jobs. Schema evolution, a critical part of storage management, is on the roadmap, so Fluss may not be ready for most production use cases quite yet. With that introduction, let’s dive into the internals to see how Fluss works. We’ll start by focusing on the Fluss cluster core architecture and then expand to include the lakehouse integration. Fluss has three main components: Tablet servers , which form the real-time storage component. Coordinator servers , which are similar to KRaft in Kafka, storing not only general metadata but acting as a coordination layer.
Fluss clients, which present a read/write API to Flink, and do some of the unification work to meld historical and real-time data. The clients interact with both the Fluss coordinators and Fluss tablet servers, as well as reading directly from object storage. Fig 9. Fluss core architecture (without lakehouse or Flink) A Fluss Log Table is divided into two logical levels that define how data is sharded:  At the top level, the table is divided into partitions , which are logical groupings of data by partition columns, similar to partitions in Paimon (and other table formats). A partition column could be a date column, or country etc. Within each partition, data is further subdivided into table buckets , which are the units of parallelism for writes and reads; each table bucket is an independent append-only stream and corresponds to a Flink source split (discussed later). This is also how Paimon divides the data within a partition, aligning the two logical models closely. Fig 10. Log table sharding scheme. Each table bucket is physically stored as a replicated log tablet by the Fluss cluster . A log tablet is the equivalent of a Kafka topic partition, and its code is based on an adapted copy of Kafka and its replication protocol.  When a Fluss client appends to a Log Table, it must identify the right table partition and table bucket for any given record. Table partition selection is based on the partition column(s), but bucket selection is based on a similar approach to Kafka producers, using schemes such as round-robin, sticky or hashes of a bucket key. Fig 11. Table partition and table bucket selection when writing. Fluss clients write to table buckets according to the partition columns(s) to choose the table partition, then a bucket scheme, such as round-robin, sticky, hash-based (like Kafka producers choosing partitions). Each log tablet is a replicated append-only log of records, built on the Kafka replication protocol. 
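The bucket selection schemes just described might be sketched as follows (illustrative only; Fluss's actual hash function and defaults will differ):

```python
import hashlib
import itertools

# Sketch of writer-side bucket selection. Hash-based selection keeps all
# records with the same bucket key in one bucket; keyless records are
# spread round-robin across buckets. Illustrative, not Fluss's real code.

NUM_BUCKETS = 4
_round_robin = itertools.cycle(range(NUM_BUCKETS))

def select_bucket(bucket_key=None):
    if bucket_key is None:
        return next(_round_robin)            # spread keyless writes evenly
    digest = hashlib.sha256(str(bucket_key).encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_BUCKETS

# Same key -> same bucket, always; keyless writes rotate through buckets.
stable = select_bucket("customer-42")
rotation = [select_bucket() for _ in range(NUM_BUCKETS)]
```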
A log tablet is the equivalent of a Kafka topic partition. So by default, each log tablet has three replicas, with one leader and two followers. Fig 12. Log Tablet replication is based on Kafka partition replication. It has the concept of the high watermark and the ISR, though the recent work of hardening the protocol against simultaneous correlated failures is not included (as it is based on the old ZooKeeper controller, not KRaft). Fluss optionally stores log table batches in a columnar format to allow for projection pushdown to the filesystem level, as well as obtaining the other benefits of Arrow. Fluss clients accumulate records into batches and then serialize each batch into Arrow vectors, using Arrow IPC . Log tablet replicas append these Arrow IPC record batches to log segment files on disk (as-is). These Arrow record batches are self-describing, with metadata to allow the file reader to be able to read only the requested columns from disk (when a projection is included in a fetch request). Projection pushdown consists of a client including a column projection in its fetch requests, and that projection getting pushed all the way down to the file storage level, which prunes unwanted columns while reading data from disk. This avoids network IO, but may come with additional storage IO overhead if columns of the projection are widely fragmented across a file. Storing log table data as a sequence of concatenated Arrow IPC record batches is quite different from using a single Parquet file as a segment file. A segment file with 100 concatenated Arrow IPC record batches stores each batch as its own self-contained columnar block, so reading a single column across the file requires touching every batch’s metadata and buffers, whereas a Parquet file lays out columns contiguously across the entire file, allowing direct, bulk column reads with projection pushdown.
This adds some file IO overhead compared to Parquet and makes pruning columns in tiered log segments impractical and expensive. But serializing batches into Parquet files on the client is also not a great choice, so this approach of Arrow IPC files is a middle ground. It may be possible in the future to use compaction to rewrite segment files into Parquet for more optimal columnar access of tiered segment data. On the consumption side, Fluss clients convert the Arrow vectors back into rows. This columnar storage is entirely abstracted away from application code, which is record (row) based. Each log record has a change type, which can be used by Flink and Paimon in streaming jobs/streaming ingestion: +A, Append +I, Insert +U, Update After -U, Update Before -D, Delete For Log Tables, the Fluss client assigns each record the +A change type. The remaining change types are used for the changelogs of PK tables, which is explained in the PK Table section. Fluss has two types of tiering: Internal Tiering (akin to traditional tiered storage), which is covered in this section. Lakehouse Tiering, which is covered in the next major section, Fluss Lakehouse Architecture. Tablet servers internally tier log tablet segment files to object storage, which matches the general approach of Kafka. The difference is that when a Fluss client fetches from an offset that is tiered, the log tablet returns only the metadata of a set of tiered log segments, so the Fluss client can download those segments itself. This offloads work from Fluss servers during catch-up reads. Fig 13. Log tablet with internal segment tiering and replication. Fluss made the decision to place some of the logic on the client for melding local data on tablet server disks with internally tiered data on object storage. This diverges from the Kafka API, which does not support this, thus placing the burden of downloading tiered segments on the Kafka brokers. The actual tiering work is performed as follows.
Each tablet server has a RemoteLogManager, which is responsible for tiering segments to remote storage via log tiering tasks. This RemoteLogManager can only trigger tiering tasks for log tablets that have their leader replicas on this server. Each tiering task works as follows: A task is scoped to a single log tablet, and identifies local log segment files to upload (and remote ones to expire based on TTL). It uploads the target segment files to object storage. It commits them by: Creating a new manifest file with the metadata of all current remote segments. Writing the manifest file to object storage. Sending a CommitRemoteLogManifestRequest to the coordinator, which contains the path to the manifest file in remote storage (the coordinator does not store the manifest itself). Once committed, expired segment files are deleted in object storage. Asynchronously, the coordinator notifies the log tablet replica via a NotifyRemoteLogOffsetsRequest so the replica knows: The offset range that is tiered (so it knows when to serve tiered metadata to the client, and when to read from disk). What local segments can be deleted. As I mentioned earlier, because clients download tiered segment files, the network IO benefits of columnar storage are limited to the hottest data stored on Fluss tablet servers. Even if a client doing a full table scan only needs one column, it must still download entire log segment files. There is no way around this except to use a different columnar storage format than Arrow IPC with N record batches per segment file. It’s worth understanding a little about Flink’s DataStream Source API, in order to understand how Flink reads from Fluss. Splits: Flink distributes the work of reading into splits, which are independent chunks of input (e.g. a file, topic partition). Split enumeration: A split enumerator runs on the JobManager; it discovers source inputs, generates corresponding splits, and assigns them to reader tasks.
Readers: Each TaskManager runs a source reader, which reads the sources described by its assigned splits and emits records to downstream tasks. This design cleanly separates discovery & coordination (enumerator) from data reading (readers), while keeping splits small and resumable for fault tolerance. When Flink uses Fluss as a source, a Fluss split enumerator runs on the JobManager to discover and assign splits (which describe table buckets). Each TaskManager hosts a source reader, which uses a split reader to fetch records from its assigned table bucket splits and emit them downstream. Fig 14. Fluss integration of Log Table via Flink Source API (without lakehouse integration). In this way, a Log Table source parallelizes the reading of all table buckets of the table, emitting the records to the next operators in the DAG. While log tablets are built on an adapted version of Kafka replication, there are some notable differences: Fluss uses two levels of sharding: Table partitions, via partition column(s). Multiple table buckets per partition. Obligatory tabular schemas: A Fluss table must have a flat table schema with primitive types (structs, arrays, maps on the roadmap). Columnar storage: Allowing for projection pushdown (which also depends on a schema). Tiered storage (internal tiering): Clients download tiered segments (tablet servers only serve tiered segment metadata to clients). Fluss has no automatic consumer group protocol. This role is performed by Flink assigning splits to readers. A primary key table is also organized into two logical levels of partitions and table buckets. Clients write data to specific partitions and buckets as described in the log tablet section. However, the Primary Key table has a different API as it is a mutable table: Writes: Upserts and Deletes to the table via a PutKV API. Lookups and Prefix Lookups against the table.
Changelog scans (which can involve a hybrid read of KV snapshot files plus changelog, discussed later). Each table bucket of a PK table is backed by a KV Tablet, which emits changes to a child log tablet. KV Tablet state is composed of: A RocksDB table for storing the keyed table state. A Log Tablet for storing the changelog. This inner log tablet also acts as a write-ahead log (WAL), as described in the Writing to a KV Tablet subsection. Fig 15. A KV tablet and its replicated child log tablet for the changelog. Unlike log tablets, KV tablets do not store data in Arrow format, as data is stored in RocksDB. However, the child log tablet uses Arrow as normal. There are a few other notable differences to Log Tablets in how data is written, tiered and read, which are covered in the next subsections. The PutKV API accepts a KV batch, which contains a set of key-value records. When the value is not null, Fluss treats this as an upsert, and when the value is null, it treats it as a delete. The KV tablet performs the write as follows: For each record in the batch: Perform a read against the RocksDB table to determine the change to be emitted to the changelog. A change record could be: a DELETE record with the old row, if the write contains a null value.
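The write sequence above can be sketched in Python (the function names and the in-memory dict/list stand-ins for RocksDB and the changelog are illustrative, not Fluss's actual classes):

```python
# Sketch of the KV tablet write path: read the old value, derive
# change records, commit them to the changelog (the WAL), and only
# then apply the buffered writes to the KV store.

def derive_changes(kv_store, key, new_value):
    """Return the changelog records for one upsert/delete."""
    old_value = kv_store.get(key)
    if new_value is None:
        # Delete: emit the old row (if any) as a -D change.
        return [("-D", old_value)] if old_value is not None else []
    if old_value is not None:
        # Update: emit the old row as -U and the new row as +U.
        return [("-U", old_value), ("+U", new_value)]
    # Insert: no existing row for this key.
    return [("+I", new_value)]

def apply_kv_batch(kv_store, changelog, batch):
    """Buffer changes for a batch, commit the WAL, then apply.

    Note: this sketch derives every change against the pre-batch state;
    in Fluss the changelog append also waits for the batch to be
    committed by the Kafka-style replication protocol.
    """
    buffered_changes, buffered_writes = [], []
    for key, value in batch:
        buffered_changes.extend(derive_changes(kv_store, key, value))
        buffered_writes.append((key, value))
    changelog.extend(buffered_changes)   # step 2: commit the WAL
    for key, value in buffered_writes:   # step 3: apply to the KV store
        if value is None:
            kv_store.pop(key, None)
        else:
            kv_store[key] = value
```

Running three batches against the same key (insert, update, delete) produces the +I, then -U/+U, then -D changes described above.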
Durability: A log tablet is like a Kafka partition: it is replicated across multiple servers, with one leader and multiple followers. The KV tablet itself is unreplicated, though it does regularly upload RocksDB snapshot files to object storage. Therefore, the changelog acts as a replicated write-ahead log (WAL) for the KV tablet. If the server disk were to die, the state could be recovered by downloading the latest KV tablet snapshot file and replaying the changelog from the corresponding next offset (the last offset is stored with the snapshot). Interestingly, the KV tablet leader has no followers, and moves with the log tablet leader. When a leader election occurs in the log tablet, the KV tablet leader changes with it. The new KV tablet leader (on the tablet server of the new log tablet leader) must download the latest RocksDB snapshot file and replay the changelog to recover the state of the former KV leader. This means that large RocksDB state could be an issue for availability, due to the large amount of state needing to be downloaded and replayed on each leader election. This design may change in the future. By default, PK tables have no merge engine. The new row overwrites the old row (via the DefaultRowMerger class) as described in the last subsection. But Fluss supports using the merge types FIRST_ROW and VERSIONED in PK tables. Each merge operation has an old row and a new row (either of which could be null). Merge types: None: New row replaces old row. FIRST_ROW: Keep the old row, unless it is null, in which case take the new row. VERSIONED: Take whichever row has the highest version (the new row version is supplied by the client). The Flink source for Fluss can use FIRST_ROW, but VERSIONED doesn’t seem to be used anywhere yet. Partial update: An update does not need to include all columns, allowing for partial updates. Partial updates cannot be combined with merge types.
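The three merge behaviours listed above can be expressed as simple functions (a Python sketch; Fluss implements these as Java row mergers, and the `version_of` accessor for the client-supplied version column is illustrative):

```python
# Sketch of the three merge behaviours for PK tables. Rows are plain
# dicts; either argument may be None.

def merge_default(old_row, new_row):
    # No merge engine: the new row always replaces the old one.
    return new_row

def merge_first_row(old_row, new_row):
    # FIRST_ROW: keep the first row ever written for the key.
    return old_row if old_row is not None else new_row

def merge_versioned(old_row, new_row, version_of):
    # VERSIONED: keep whichever row carries the highest version.
    # Ties keep the old row in this sketch (the exact tie behaviour
    # is not described above).
    if old_row is None:
        return new_row
    return new_row if version_of(new_row) > version_of(old_row) else old_row
```

For example, with `version_of = lambda r: r["ts"]`, a late-arriving row with a lower `ts` than the stored row is discarded by `merge_versioned`, while `merge_first_row` ignores every write after the first.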
If a write only contains a subset of the columns, the PartialUpdateRowMerger class is used instead of the DefaultRowMerger class. It goes column by column, taking each column value from the new row if present, otherwise from the old row, to create a merged row. The keyed state in RocksDB cannot be tiered; it must fit entirely on disk. Snapshots are made and stored in object storage, but this is for backup/recovery and historical reads, not tiering. Therefore, the key cardinality should not be excessive. The changelog is a log tablet, and is tiered as described in the log table tiering subsection. Fig 16. KV Tablet (and child Log Tablet) read, write, tiering and replication. A Fluss client can send lookup and prefix lookup requests to the KV tablet leader, which get translated into RocksDB lookups and prefix lookups. It is also possible to scan the changelog, and there are different options for the starting offset: Earliest offset: Read the entire change stream from start to finish, assuming infinite retention of the log tablet. Latest offset: Read only the latest changes from the log tablet (but will miss historical data). Full: Bootstrap from a table RocksDB snapshot, then switch over to reading from the log tablet. When using the full mode, the process works as follows: The Fluss client contacts the coordinator server to find out the KV snapshot files, along with the changelog offset they correspond to. The Fluss client downloads the RocksDB snapshot files and initializes a RocksDB instance based on the files. The Fluss client iterates over the records of the RocksDB table, treating each record as an insert (+I) row kind. The Fluss client switches to reading from the log tablet directly, starting from the next offset after the snapshot. This follows the same logic as with a log tablet, where the client may not receive actual batches, but metadata of tiered log segments.
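The full-mode bootstrap described above can be sketched as a generator (plain Python structures stand in for the downloaded RocksDB snapshot and the log tablet):

```python
# Sketch of a "full" changelog scan: bootstrap from a KV snapshot,
# emitting every snapshot record as an insert (+I), then continue
# from the changelog at the snapshot's next offset.

def full_scan(snapshot_rows, snapshot_end_offset, changelog):
    """Yield (change_type, row) pairs for a full hybrid read.

    snapshot_rows       -- rows recovered from the downloaded KV snapshot
    snapshot_end_offset -- first changelog offset NOT covered by the snapshot
    changelog           -- list of (change_type, row), index == offset
    """
    # Phase 1: the bounded snapshot, every record as an insert.
    for row in snapshot_rows:
        yield ("+I", row)
    # Phase 2: switch to the unbounded changelog at the next offset.
    for change in changelog[snapshot_end_offset:]:
        yield change
```

The offset handoff is the important part: starting the changelog phase at exactly `snapshot_end_offset` is what prevents both missed and replayed changes.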
Of course, this is usually within the Flink source API architecture of split enumeration and split readers. Fig 17. Fluss integration with the Flink Source API. The differences between the log table and primary key table sources are highlighted in green. While Flink provides the HybridSource abstraction, which allows for reading from a bounded source until completion and then switching to an unbounded source, Fluss has chosen to implement this within the Split abstraction itself: For table buckets that are PK-based, the split enumerator also requests the metadata of the KV snapshots from the coordinator. It creates hybrid splits, which contain the snapshot and log tablet metadata. For hybrid splits, each split reader first loads the bounded snapshot into a RocksDB instance and processes the records, then switches to the unbounded log tablet (the changelog). Schema evolution is listed under the roadmap. Fluss coordinators play a key role in the following functions: Regular cluster metadata Internal tiering metadata and coordination Lakehouse tiering coordination (see next section) Serving metadata directly to clients (for client-side stitching) Next we’ll see how the Fluss core architecture is extended to include the lakehouse. Before we begin, let’s define some terms to make the discussion more precise. These are terms that only exist in this post to make explaining things easier (the Fluss project does not define these abstractions): Logical table (log table, primary key table). Partitioned and bucketed. Fluss table. Served by a Fluss cluster. Stored as on-disk and internally tiered segments/snapshot files. Lakehouse table. A Paimon table (Iceberg coming soon), fed by Lakehouse tiering. Fig 18. Logical table maps onto a Fluss Table (hosted by the Fluss cluster) and, optionally, a Lakehouse Table. Fluss is a set of client-side and server-side components, with lakehouse integration being predominantly client-side.
We can break up lakehouse integration into the following pieces (which are somewhat interrelated): Lakehouse tiering: Copying from Fluss tables to Lakehouse tables. Storage unification: Unifying historical lakehouse data with real-time Fluss tablet server data. A logical table could map to only a Fluss table, or it could map to a combination of a Fluss table and a Lakehouse table. It is possible for there to be overlap (and therefore duplication) between the Fluss and Lakehouse tables. As with internal tiering, only Log Tables and the changelogs of Primary Key Tables can be tiered to a lakehouse. Paimon itself converts the changelog stream back into a primary key table. Right now Apache Paimon is the main open table format (OTF) that is supported, due to its superior stream ingestion support, but Apache Iceberg integration is on the way. Lakehouse tiering is driven by one or more Flink jobs that use client-side Fluss libraries to: Learn which tables must be tiered (via the Fluss coordinators). Do the tiering (read from the Fluss table, write to the Lakehouse table). Notify the coordinators of tiering task completion or failure, along with the lakehouse snapshot metadata and how it maps to Fluss table offsets. Fig 19. Flink-based lakehouse tiering components. A table tiering task is initiated by a lakehouse tiering process (run in Flink) sending a heartbeat to the coordinator, telling it that it needs work. The coordinator will provide it with a table that needs tiering, including metadata about the current lakehouse snapshot and the log offset of every table bucket that the lakehouse snapshot corresponds to. The tiering task will start reading each table bucket from these offsets. Fig 20. Internal topology of the lakehouse tiering Flink job. The tiering read process reads (log scan) from the Fluss table and is essentially the same as described in the Log Table and Primary Key Table parts of the Fluss Core Architecture section.
For log tables, the table buckets are assigned as splits to the various source readers, which use the Fluss client to read the log tablet of each table bucket. Under the hood, the Fluss client fetches from tablet servers (the leader replica of each log tablet), which may return data payloads or tiered log segment metadata to the client. For primary key tables, the tiering job scans the changelog, generating hybrid splits and assigning them to the readers. The Fluss client in each reader will start by downloading the KV snapshot files (RocksDB) and iterating over the RocksDB table, then switch to fetching from the changelog log tablet. The tiering write process involves writing the records as data files to the lakehouse (using Paimon/Iceberg libraries). The specifics are not important here, but the write phase requires an atomic commit, and so newly written but not committed files are not part of the table yet. The tiering commit process performs the Paimon commit and also updates the Fluss coordinator with the committed lakehouse snapshot metadata and the last tiered offset of each table bucket of the Fluss table. Basically, we need coordination to ensure that tiering doesn’t skip or duplicate data, and that clients can know the switch-over point from lakehouse to Fluss cluster. The Flink topology itself is pretty standard for writing to a lakehouse, with multiple reader tasks emitting records for multiple Paimon writer tasks, and a single Paimon committer task that commits the writes serially to avoid commit conflicts. There is one writer per Paimon bucket, and it seems plausible that the Paimon table partitioning and bucketing will match Fluss (though I didn’t get around to confirming that).
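The commit handshake described above (Paimon commit plus coordinator update) can be sketched as follows (a hypothetical in-memory coordinator; Fluss does this via RPCs and per-table state, so names like `commit_tiering_round` are illustrative):

```python
# Sketch of the tiering commit coordination: after each committed
# lakehouse snapshot, the coordinator records the last tiered offset
# per table bucket, which is what lets the next tiering round resume
# without skipping or duplicating data.

def commit_tiering_round(coordinator_state, table, snapshot_id, bucket_end_offsets):
    """Record a committed snapshot and its per-bucket end offsets."""
    state = coordinator_state.setdefault(table, {"snapshot": None, "offsets": {}})
    state["snapshot"] = snapshot_id
    state["offsets"].update(bucket_end_offsets)
    return state

def next_read_offsets(coordinator_state, table, buckets):
    """Offsets the next tiering task should start reading from
    (buckets never tiered before start at offset 0)."""
    offsets = coordinator_state.get(table, {}).get("offsets", {})
    return {b: offsets.get(b, 0) for b in buckets}
```

The same per-bucket offsets also mark the lakehouse/Fluss switch-over point that the coordinators propagate to tablet servers for storage unification.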
Evolving Fluss and Paimon tables in unison will be required, but schema evolution is not implemented yet. Each logical table has a simple state machine that governs when it can be tiered. Fig 21. Table tiering job state transitions, managed by Fluss coordinators. The key to storage unification in Fluss is the compatibility of the different storage formats and the work of the client to stitch data from the different storage tiers and formats together transparently under a simple API. This job of stitching the different storage together is shared across the Fluss client and the Fluss-Flink module. Fig 22. Fluss uses client-side stitching at two layers of the client-side abstractions. So far I’ve avoided tackling this question. Fluss makes for an interesting case study. It uses two types of tiering: Tiering of Fluss storage (log segments) to object storage, performed by tablet servers. This is classic internal tiering. Tiering of Fluss storage to the lakehouse (Paimon), performed by Flink using Fluss client-side modules and coordinated by Fluss Coordinators. In my first pass I equated lakehouse tiering to shared tiering as described in my post A Conceptual Model for Storage Unification. In that post, I expressed reservations about the risks and complexity of shared tiering in general. My reservation comes from the complexity of using a secondary storage format to store the majority of the primary system’s data, and the performance problems that come from not being able to optimize secondary storage for both primary and secondary systems. But I soon realized that Fluss lakehouse tiering is another form of internal tiering, as both storage tiers serve the same analytical workload (and the same compute engine). A Fluss cluster is akin to a durable distributed cache in front of Paimon, where Flink reads/writes hot data from/to this durable distributed cache and reads cold data from Paimon. Together, both storage types form the primary storage.
The Paimon table only needs to support analytical workloads and thus can make full use of the various table organization options in Paimon without penalizing another system. Additionally, the logical data model of the Fluss API is extremely close to both Flink and Paimon (by design), which reduces the costs and risk associated with the conversion of formats. Fig 23. Fluss cluster storage and Paimon storage form the primary storage for the primary system (Flink). This is very different to a Kafka architecture that might tier to a lakehouse format. Kafka serves event streams largely in event-driven architectures, and Paimon serves as an analytics table. The two workloads are completely different (sequential vs analytical). Having Kafka place the majority of its data in a lakehouse table would present many of the issues of shared tiering discussed in my storage unification blog post, namely conversion risk and performance constraints due to using the same storage for completely different access patterns. Likewise, Kafka is unopinionated regarding payloads, with many being schemaless, or using any one of Avro, Protobuf, JSON and JSONSchema. Payloads can be arbitrarily nested in any of the above formats, complicating the bidirectional conversions between lakehouse formats and Kafka payloads. As I noted in the last subsection, lakehouse tiering is a form of internal tiering, as the lakehouse tier serves the same workload and compute engine. Of course, the Paimon table could be read by other systems too, so there is still room for some conflicting requirements around data organization, but far less than between a purely sequential workload (like Kafka) and an analytical workload. However, curiously, Fluss now has two types of internal tiering: Tiering of log segment files (as-is). This is the direct-access form of tiering discussed in my conceptual model post. Tiering of log data (in its logical form), which is the API-access form of tiering.
The question is: why does Fluss need both? And why are they not lifecycle-linked? Fluss table storage is expired based on a TTL and each lakehouse format provides TTL functionality too. But data is not expired in Fluss storage once it has been copied to the lakehouse. Here are some thoughts: First of all, I am sure linking the lifecycles of Fluss table and lakehouse table data will eventually be implemented, as it looks pretty trivial. It might be more reliable to keep internal log segment tiering, despite also using lakehouse tiering, should the lakehouse become unavailable for a period. For example, the write availability of Fluss would be tightly linked to the availability of the lakehouse catalog without internal tiering acting as an efficient spill-over safety mechanism. Without lifecycle-linked tiers, lakehouse tiering actually has more in common with materialization (data copied, not moved). This is not a bad thing; there are cases where it might be advantageous to keep the lifecycle of tiered log segments separate from that of the lakehouse. For example, some Fluss clients may wish to consume Log Tables in a purely sequential fashion and might benefit from reading tiered log segments rather than a Paimon table. Fluss also places Kafka API support on the roadmap, which makes maintaining internal tiering and lakehouse tiering as separately lifecycle-managed processes compelling (as it avoids the issues of conversion and performance optimization constraints). Kafka clients could be served from internally tiered log segments and Flink could continue to merge both Fluss table and lakehouse table data, thereby avoiding the pitfalls of shared tiering (as the lakehouse still must only support one master). This is another case where lakehouse tiering starts looking more like materialization when using Fluss as a Kafka-compatible system. It can be confusing having two types of tiering, but it also allows users to configure Fluss to support different workloads.
If Fluss is only a lakehouse real-time layer, then it might make sense to only use lakehouse tiering. However, if Fluss needs to support different access patterns for the same data, then it makes sense to use both (using the materialization approach). Apache Fluss can be seen as a realization that Apache Paimon, while well-suited to streaming ingestion and materialized views, is not sufficient as the sole table storage engine for Flink. Paimon remains a strong option for large-scale, object-store-backed tables, but it falls short when low-latency stream processing requires efficient changelogs and high-throughput, small writes to table storage. Fluss is designed to fill this gap. It provides the low-latency tier with append-only and primary key tables, supports efficient changelog generation, and integrates with Paimon through tiering. Client-side modules in Flink then stitch these tiers together, giving a unified view of real-time and historical data. Fluss aims to broaden its support to include other analytics engines such as Apache Spark and other table formats such as Iceberg, making it a more generic real-time layer for lakehouses. It should probably eventually be seen as an extension to Paimon (et al) more than a table storage engine built specifically for Flink. What stands out about Fluss is how it shifts between tiering and materialization depending on its role. As a real-time layer in front of a lakehouse, it functions more like a tiering system, since both the Fluss cluster and the lakehouse serve the same analytical workload. If it eventually extends to support the Kafka API as an event streaming system, it would resemble a materialization approach instead. Fluss still faces several adoption challenges. Schema evolution is not yet supported, and lifecycle management remains limited to simple TTL-based policies rather than being tied to lakehouse tiering progress.
Its replication-based design also inherits the same networking cost concerns that Kafka faces in cloud environments. Flink 2.0 disaggregated state storage solves the large state problem without networking costs, but the state remains private to the Flink job. Fluss has placed direct-to-object-storage writes on its roadmap and so should eventually address this concern for workloads with high networking costs. Finally, given that Fluss copies heavily from Kafka for its log tablet storage, it raises questions for the Apache Kafka community as well. Features such as columnar storage, projection pushdown, and stronger integration of schemas are all central to how Fluss turns a (sharded) log into an append-only table. While Kafka has traditionally remained unopinionated about payloads, there are benefits to adding schema-aware storage. It may be worth considering whether some of these ideas have a place in Kafka’s future too. Low-latency table storage Append-only tables known as Log Tables. Keyed, mutable tables, known as Primary Key Tables (PK tables), that emit changelog streams. Tiering to lakehouse table storage Currently to Apache Paimon (Apache Iceberg coming soon). Client-side abstractions for unifying low-latency and historical table storage. API abstraction. Stitching together the different physical storage into one logical model. Physical storage management (tiering, materialization, lifecycle management). Fluss storage servers (known as Tablet servers) Fluss coordinators (based on the ZooKeeper Kafka controller) Fluss clients (where conversion logic is housed) Internal tiering is performed by storage servers themselves, much like tiered storage in Kafka. Internal tiering allows for the size of Fluss-cluster-hosted log tables and PK table changelogs to exceed the storage drive capacity of the storage servers. RocksDB state is not tiered and must fit on disk.
RocksDB snapshots are taken periodically and stored in object storage, for both recovery and a source of historical reads for clients (but this is not tiering). Lakehouse tiering is run as a Flink job, using Fluss libraries for reading from Fluss storage and writing to Paimon. Fluss coordinators are responsible for managing tiering state and assigning tiering work to tiering jobs. Tablet servers , which form the real-time storage component. Coordinator servers , which are similar to KRaft in Kafka, storing not only general metadata but acting as a coordination layer. At the top level, the table is divided into partitions , which are logical groupings of data by partition columns, similar to partitions in Paimon (and other table formats). A partition column could be a date column, or country etc. Within each partition, data is further subdivided into table buckets , which are the units of parallelism for writes and reads; each table bucket is an independent append-only stream and corresponds to a Flink source split (discussed later). This is also how Paimon divides the data within a partition, aligning the two logical models closely. +I, Insert  +U, Update After -U, Update Before Internal Tiering (akin to traditional tiered storage), which is covered in this section. Lakehouse Tiering , which is covered in the next major section, Fluss Lakehouse Architecture. A task is scoped to a single log tablet, and identifies local log segment files to upload (and remote ones to expire based on TTL). It uploads the target segment files to object storage. It commits them by: Creating a new manifest file with the metadata of all current remote segments. Writes the manifest file to object storage. Sends a CommitRemoteLogManifestRequest to the coordinator, which contains the path to the manifest file in remote storage (the coordinator does not store the manifest itself). Once committed, expired segment files are deleted in object storage. 
Asynchronously, the coordinator notifies the log tablet replica via a NotifyRemoteLogOffsetsRequest so the replica knows: The offset range that is tiered (so it knows when to serve tiered metadata to the client, and when to read from disk). What local segments can be deleted. Splits : Flink distributes the work of reading into splits which are independent chunks of input (e.g. a file, topic partition).  Split enumeration : A split enumerator runs on the JobManager and discovers source inputs, generating corresponding splits, and assigning them to reader tasks. Readers : Each TaskManager runs a source reader, which reads the sources described by its assigned splits and emits records to downstream tasks. Fluss uses two levels of sharding :  Table partitions, via a partition column(s). Multiple table buckets per partition. Obligatory tabular schemas : A Fluss table must have a flat table schema with primitive types (structs, arrays, maps on the roadmap). Columnar storage: Allowing for projection pushdown (which also depends on a schema). Tiered storage (internal tiering) : Clients download tiered segments (Tablet servers only serve tiered segment metadata to clients). Fluss has no automatic consumer group protocol . This role is performed by Flink assigning splits to readers. Writes:  Upserts and Deletes to the table via a PutKV API. Lookups and Prefix Lookups against the table. Changelog scans (which can involve a hybrid read of KV snapshot files plus changelog, discussed later). A RocksDB table for storing the keyed table state. A Log Tablet for storing the changelog. This inner log tablet also acts as a write-ahead log (WAL), as described in the Writing to a KV Tablet subsection. For each record in the batch: Perform a read against the RocksDB table to determine the change to be emitted to the changelog.  A change record could be: a DELETE record with the old row, if the write contains a null value. 
an UPDATE_BEFORE record with the original record and an UPDATE_AFTER record with the new record, if a record exists and the write has a non-null value. an INSERT with the new record, if no existing record exists. Buffer both the new changelog records and the RocksDB write in memory for now. Append all buffered changelog records (as an Arrow IPC record batch) to the child log tablet and wait for the batch to get committed (based on Kafka replication protocol). Once the change records are committed, perform the buffered RocksDB writes. The new records overwrite the existing records by default, but read about merge engines and partial updates in the next subsection. None: New row replaces old row. FIRST_ROW: Keep the old row, unless is it null, then take the new row.  VERSIONED: Take whichever row has the highest version (new row version is supplied by the client). Earliest offset : Read the entire change stream from start to finish, assuming infinite retention of the log tablet. Latest offset : Read only the latest changes from the log tablet (but will miss historical data). Full : Bootstrap from a table RocksDB snapshot then switch over to reading from the log tablet. The Fluss client contacts the coordinator server to find out the KV snapshot files, along with the changelog offset they correspond to. Fluss client downloads the RocksDB snapshot files and initializes a RocksDB instance based on the files. Fluss client iterates over the records of the RocksDB table, treating each record as an insert (+I) row kind. Fluss client switches to reading from the log tablet directly, starting from the next offset after the snapshot. This follows the same logic as with a log tablet, where the client may not receive actual batches, but metadata of tiered log segments. For table buckets that are PK-based, the split enumerator also requests the metadata of the KV snapshots from the coordinator. It creates hybrid splits which contain the snapshot and log tablet metadata. 
For hybrid splits, each split reader first loads the bounded snapshot into a RocksDB instance and processes the records, then switches to the unbounded log tablet (the changelog).

- Regular cluster metadata
- Internal tiering metadata and coordination
- Lakehouse tiering coordination (see next section)
- Serving metadata directly to clients (for client-side stitching)

- Logical table (log table, primary key table). Partitioned and bucketed.
- Fluss table. Served by a Fluss cluster. Stored on disk and in internally tiered segments/snapshot files.
- Lakehouse table. A Paimon table (Iceberg coming soon), fed by lakehouse tiering.

- Lakehouse tiering: Copying from Fluss tables to lakehouse tables.
- Storage unification: Unifying historical lakehouse data with real-time Fluss tablet server data.

- Learn what tables must be tiered (via the Fluss coordinators).
- Do the tiering (read from the Fluss table, write to the lakehouse table).
- Notify the coordinators of tiering task completion or failure, along with metadata regarding lakehouse snapshots and how they map to Fluss table offsets.

- Tiering of Fluss storage (log segments) to object storage, performed by tablet servers. This is classic internal tiering.
- Tiering of Fluss storage to the lakehouse (Paimon), performed by Flink using Fluss client-side modules and coordinated by Fluss coordinators.

- Tiering of log segment files (as-is). This is the direct-access form of tiering discussed in my conceptual model post.
- Tiering of log data (in its logical form), which is the API-access form of tiering.

First of all, I am sure linking the lifecycles of Fluss table and lakehouse table data will eventually be implemented, as it looks pretty trivial. It might be more reliable to keep internal log segment tiering, despite also using lakehouse tiering, should the lakehouse become unavailable for a period.
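The hybrid split read described above can be sketched as a two-phase generator. This is an illustrative model only: `read_hybrid_split` and its parameters are hypothetical, not the actual Fluss/Flink reader API.

```python
# Illustrative sketch of a hybrid split read: process the bounded KV
# snapshot first (every row treated as an insert), then switch to the
# unbounded changelog from the snapshot's end offset.

def read_hybrid_split(snapshot_rows, changelog, snapshot_end_offset):
    """Yield (kind, row) records: snapshot rows as +I, then log records."""
    # Phase 1: the bounded snapshot, each record treated as an insert.
    for row in snapshot_rows:
        yield ("+I", row)
    # Phase 2: the log tablet, starting from the offset after the snapshot.
    for offset, record in enumerate(changelog):
        if offset >= snapshot_end_offset:
            yield record
```

The key invariant is the switchover offset: the snapshot is a compacted view of the changelog up to `snapshot_end_offset`, so the reader must resume the log exactly there to avoid gaps or duplicates.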
For example, the write availability of Fluss would be tightly linked to the availability of the lakehouse catalog without internal tiering acting as an efficient spill-over safety mechanism. Without lifecycle-linked tiers, lakehouse tiering actually has more in common with materialization (data copy, not move). This is not a bad thing; there are cases where it might be advantageous to keep the lifecycle of tiered log segments separate from that of the lakehouse. For example, some Fluss clients may wish to consume Log Tables in a purely sequential fashion and might benefit from reading tiered log segments rather than a Paimon table. Fluss also places Kafka API support on the roadmap, which makes maintaining internal tiering and lakehouse tiering as separately lifecycle-managed processes compelling (as it avoids the issues of conversion and performance optimization constraints). Kafka clients could be served from internally tiered log segments and Flink could continue to merge both Fluss table and lakehouse table data, thereby avoiding the pitfalls of shared tiering (as the lakehouse still only has to support one master). This is another case where lakehouse tiering starts looking more like materialization when using Fluss as a Kafka-compatible system.

Jack Vanlightly 3 months ago

A Conceptual Model for Storage Unification

Object storage is taking over more of the data stack, but low-latency systems still need separate hot-data storage. Storage unification is about presenting these heterogeneous storage systems and formats as one coherent resource. Not one storage system and storage format to rule them all, but virtualizing them into a single logical view. The primary use case for this unification is stitching real-time and historical data together under one abstraction. We see such unification in various data systems:
- Tiered storage in event streaming systems such as Apache Kafka and Pulsar
- HTAP databases such as SingleStore and TiDB
- Real-time analytics databases such as Apache Pinot, Druid and Clickhouse

The next frontier in this unification is lakehouses, where real-time data is combined with historical lakehouse data. Over time we will see greater and greater lakehouse integration with lower-latency data systems. In this post, I create a high-level conceptual framework for understanding the different building blocks that data systems can use for storage unification, and what kinds of trade-offs are involved. I’ll cover seven key considerations when evaluating design approaches. I’m doing this because I want to talk in the future about how different real-world systems do storage unification, and I want to use a common set of terms that I will define in this post. From my opening paragraph, the word “virtualizing” may jump out at you, and that is where we’ll start. I posit that the primary concept behind storage unification is virtualization. Virtualization in software refers to the creation of an abstraction layer that separates logical resources from their physical implementation. The abstraction may allow one physical resource to appear as multiple logical resources, or multiple physical resources to appear as a single logical resource. We could use either the term storage virtualization or data virtualization, though personally I find the difference too nuanced for this post.
I will use the term data virtualization. A virtualization layer can present a simple, unified API that stitches together different physical storage systems and formats behind the scenes. For example, the data may exist across filesystems in a row-based format and object storage in a columnar format, but the application layer sees one unified logical model. Data virtualization is the combination of:
- Frontend abstraction: Stitching together the different physical storage into one logical model.
- Backend work: Physical storage management (tiering, materialization, lifecycle management).

Let’s dig into each in some more detail. How data is written and managed across these different storage mediums and formats is a key part of the data virtualization puzzle. This management includes:
- Data organization / format
- Data tiering
- Data materialization
- Data lifecycle

“Data organization” is about how data is optimized for specific access patterns. Data could be stored in a row-based format, a graph-based format, a columnar format, and so on. Sometimes we might choose to store the same data in multiple formats in order to efficiently serve different query semantics (lookup, graph, analytics, changelogs). That is, we balance trade-offs between access semantics and the cost of writes, the cost of storage and the cost of reads. Trade-off optimization is a key part of system design. “Tiering” is about moving data from one storage tier (and possibly storage format) to another, such that both tiers are readable by the source system and data is only durably stored in one tier. While the system may use caches, only one tier is the source of truth for any given data item. Usually, storage cost is the main driver. In the next subsection, I’ll describe how there are two types of tiering: Internal vs Shared.
“Materialization” is about making data of a primary system available to a secondary system, by copying data from the primary storage system (and format) to the secondary storage system, such that both data copies are maintained (albeit with different formats). The second copy is not readable from the source system, as its purpose is to feed another data system. Copying data to a lakehouse for access by various analytics engines is a prime example. “Data lifecycle management” governs concerns such as data lifetime and data compatibility. Tiering implies lifecycle-linked storage tiers where the canonical data is deleted in the source tier once copied to the new tier (move semantics). Materialization implies much weaker lifecycle management, with no corresponding deletion after copy. Data can be stored with or without copies, in different storage systems and different formats, but the logical schema of the data may evolve over time. Therefore compatibility is a major lifecycle management concern, not only across storage formats but also across time. We can classify tiering into two types:
- Internal Tiering is data tiering where only the primary data system (or its clients) can access the various storage tiers. For example, Kafka tiered storage is internal tiering. These internal storage tiers as a whole form the primary storage.
- Shared Tiering is data tiering where one or more data tiers is shared between multiple systems. The result is a tiering-materialization hybrid, serving both purposes. Tiering to a lakehouse is an example of shared tiering.

Internal tiering is the classic tiering that we all know today. The emergence of lakehouse tiering is relatively new and is a form of shared tiering. But with sharing comes shared responsibility. Once tiered data is in shared storage, it serves as the canonical data source for multiple systems, which adds an extra layer of discipline, control and coordination.
Shared tiering is a kind of hybrid between internal tiering and materialization. It serves two purposes:
- Tiering: Store historical data in cheaper storage.
- Materialization: Make the data available to a secondary system, using the secondary system’s data format.

A key aspect of shared tiering is the need for bidirectional lossless conversion between “primary protocol + format” and “secondary protocol + format”. Getting this part right is critical, as the majority of the primary system’s data will be stored in the secondary system’s format. The main driver for shared tiering is avoiding data duplication to minimize storage costs. But while storage costs can be lowered, shared tiering also comes with some challenges due to it serving two purposes:
- Lifecycle management. Unlike materialization, the data written to the secondary system remains tied to the primary. They are lifecycle-linked, and therefore data management should remain with the primary system. Imagine shared tiering where a Kafka topic operated by a Kafka vendor tiers to a Delta table managed by Databricks. Who controls the retention policy of the Kafka topic? How do we ensure that table maintenance by Databricks doesn’t break the ability of the tiering logic to read back historical data?
- Schema management. Schema evolution of the primary format (such as Avro or Protobuf) may not match the schema evolution of the secondary format (such as Apache Iceberg). Therefore, great care must be taken when converting from the primary to the secondary format, and back again. With multiple primary formats and multiple secondary formats, plus changing schemas over time, the work of ensuring long-term bidirectional compatibility should not be underestimated.
- Exposing data. Tiering requires bidirectional lossless conversion between the primary and secondary formats. When tiering to an Iceberg table, we must include all the metadata of the source system, headers, and all data fields.
If there is sensitive data that we don’t want written to the secondary system, then materialization might be better.
- Fidelity. Great care must be taken to ensure that the bidirectional lossless conversion between formats does not lose fidelity.
- Security/encryption. End-to-end encryption will of course make shared tiering impossible. But there may be other encryption-related challenges, such as exposing encryption metadata in the secondary system.
- Performance overhead. There may be a large conversion cost when serving lagging or catch-up consumers from secondary storage, due to the conversion from the secondary format back to the primary.
- Performance optimization. The shared tier serves two masters, and here data organization can really make the difference in terms of performance and efficiency. But what can benefit the secondary can penalize the primary, and vice versa. An example might be Z-order compaction in a lakehouse table, where files may be reorganized (rewritten), changing data locality such that data is grouped into data files according to common predicates; for example, compacting by date and by country (when where clauses and joins frequently use those columns). While this reorganization can vastly improve lakehouse performance, it can make reads by the primary system fantastically inefficient compared to internal tiering.
- Risk. Once data is tiered, it becomes the one canonical source. Should any silent conversion issue occur when translating from the primary format to the secondary format, then there is no recourse. Taking Kafka as an example, topics can contain Avro, Protobuf and JSON, among other formats. The secondary format could be an open table format such as Iceberg, Delta, Hudi or Paimon. Data files could be Avro, Parquet or ORC. With so many combinations of primary->secondary->primary format conversions, there is some unavoidable additional risk associated with shared tiering.
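The fidelity and risk challenges above come down to whether the primary→secondary→primary round trip is truly lossless. A property-style check makes the idea concrete; the converters here are hypothetical stand-ins for real Kafka-to-table-format logic, and the duplicate-header case shows how fidelity can be silently lost.

```python
# Sketch of a round-trip fidelity check for shared tiering conversion.
# to_secondary/from_secondary are hypothetical stand-ins for the real
# primary-format -> table-format -> primary-format conversion logic.

def to_secondary(record):
    # e.g. record -> table row: headers flattened into a map column.
    # Note: a map column cannot represent duplicate header keys.
    return {"key": record["key"], "value": record["value"],
            "headers": dict(record["headers"]), "ts": record["timestamp"]}

def from_secondary(row):
    # Convert a table row back into the primary record shape.
    return {"key": row["key"], "value": row["value"],
            "headers": list(row["headers"].items()), "timestamp": row["ts"]}

def check_round_trip(record):
    """True if converting to the secondary format and back loses nothing."""
    return from_secondary(to_secondary(record)) == record
```

A record with duplicate header keys (which Kafka permits) fails this check, because the map column collapses them. It is exactly this kind of silent loss that makes the "no recourse" risk above so serious once the tiered copy is the only copy.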
Internal tiering is comparatively simple: you tier data in its primary format. In the case of Kafka, you tier log segment files directly. While zero-copy shared tiering sounds attractive, there are practical implications that must be considered. I am sure that shared tiering can be made to work, but with great care and only if the secondary storage remains under the ownership and maintenance of the primary system. Note: The risks of fidelity and compatibility issues can be mitigated by storing the original bytes alongside the converted columns, but this introduces data duplication, which many proponents of shared tiering are advocating against. I wrote about data platform composability, enabled by the open table formats. The main premise is that we can surface the same physical tabular data in two platforms. One platform acts as the primary, writing to the table and performing table management. Secondary platforms surface that table to their own users as a read-only table. That way, we can compose data platforms while ensuring table ownership and management responsibility remain clear. This same approach works for shared tiering. The primary system should have full ownership (and management responsibility) of the lakehouse-tiered data, making it a purely read-only resource for secondary systems. In my opinion, this is the only sane way to do shared lakehouse tiering, but it adds the burden of lakehouse management to the primary system. Where does the work of stitching different storage sources live? Should it exist server-side, or is it better as a client-side abstraction? One option is to place that stitching and conversion logic server-side, on a cluster of nodes that serve the API requests from clients. This is the current choice of:
- Kafka-API-compatible systems. Kafka clients expect brokers to serve a seamless byte stream from a single logical log, so Kafka brokers do the work of stitching disk-bound log segments with object-store-bound log segments.
- HTAP and real-time analytics databases.

Another option is to place that stitching logic client-side, inside the client library. Through some kind of signalling or coordination mechanism, the client must know how to stitch data from two or more storage systems, possibly using different storage formats. The client must house the data conversion logic to surface the data in the logical model. If this is a database API, then the client will also have to perform the query logic, and hopefully is able to push down some of the work to the various storage sources. Client-side stitching can make sense if the client sits above two separate high-level APIs, such as a stream processor above Kafka and a lakehouse. But it’s also possible to place stitching client-side within a single data system protocol. The benefit of client-side stitching is that we unburden the storage cluster from this work. For example, it would be possible to make Kafka clients download remote log segments instead of the Kafka brokers being responsible, freeing brokers from the load generated by lagging consumers. On the downside, putting the stitching and conversion client-side makes clients more complicated and can make concepts such as storage format evolution and compatibility more difficult. A lot depends on what kind of control the operator has over the clients. If clients are tightly controlled and kept in sync with the storage systems and their evolution, then client-side stitching might be feasible. If, however, clients are not managed carefully and many different client versions are in production, this can complicate the long-term evolution of data and storage. I’ve seen many issues from customers running multiple Kafka client versions against the same cluster, often very old versions due to constraints in the customer’s environment.
Placing the stitching work server-side, either directly in the primary cluster or in a proxy layer, removes the client complexity and evolution headaches, but at the cost of extra load and complexity server-side. Arguably, we might prefer the complexity to live server-side, where it can be better controlled. Materialization and tiering have some similarities. They both involve copying data from one storage location to another, and possibly from one format to another. Where they diverge is that tiering implies a tight data lifecycle between storage tiers and materialization does not. For tiering, we need a job that:
1. Learns the current tiering state from the metadata store.
2. Reads the data in the source tier.
3. Potentially converts it to the format of the destination tier (such as from a row-based format to a columnar format).
4. Writes it to the destination tier.
5. Updates the associated metadata in the metadata service.
6. Finally, deletes the data in the source tier.

That job could be the responsibility of a primary cluster (that serves reads and writes) or a secondary component whose only responsibility is to perform the tiering. If a secondary component is used, it might be able to directly access the data from its storage location, or may access the data through a higher-level API. The same choices exist for materialization. Which is best? It’s all context-dependent. In open-source projects, we typically see all this data management work centralized, hence Kafka brokers taking on this responsibility. In managed (serverless) cloud systems, data management is usually separated into a dedicated data management component, for better scaling and separation of concerns. Tiering involves reads and writes to both tiers, whereas materialization requires read access to the primary’s data and write access to the secondary. What kind of access to primary storage and secondary storage is used? Next we’ll look at direct vs API access for reads and writes to both tiers.
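The six tiering-job steps above can be sketched as a single pass over an offset-keyed source tier. Everything here (the metadata dict, the tier dicts, the converter) is a hypothetical stand-in for real metadata services and storage systems.

```python
# Minimal sketch of a tiering job: learn state, read, convert, write,
# commit metadata, then delete from the source tier. All names are
# illustrative stand-ins, not a real system's API.

def run_tiering_job(metadata, source_tier, dest_tier, convert):
    # 1. Learn the current tiering state from the metadata store.
    last_tiered = metadata.get("tiered_up_to", 0)
    # 2. Read the tierable data from the source tier.
    tierable = {k: v for k, v in source_tier.items() if k >= last_tiered}
    if not tierable:
        return last_tiered
    # 3. Convert to the destination format (e.g. row-based -> columnar).
    converted = {k: convert(v) for k, v in tierable.items()}
    # 4. Write to the destination tier.
    dest_tier.update(converted)
    # 5. Commit the work by updating the metadata service.
    metadata["tiered_up_to"] = max(tierable) + 1
    # 6. Finally, delete the data from the source tier.
    for k in tierable:
        del source_tier[k]
    return metadata["tiered_up_to"]
```

Note the ordering: metadata is committed before the source data is deleted, so a crash between steps 5 and 6 leaves duplicated (but recoverable) data rather than lost data.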
We have two types of access:
- Direct-access: The tiering/materialization process (whether it be integrated or external) directly reads the primary storage data files. Example: Kafka brokers read local log segments from the filesystem and write them to the second tier (object storage).
- API-access: The tiering/materialization process uses the primary’s API to read the primary’s data. Example: Tiering/materialization could be the responsibility of a separate component, which reads data via the Kafka API and writes it to the second tier.

Direct access might be more efficient than API access, but it might also be less reliable if the primary performs maintenance operations that change the files while tiering is running. It may be necessary to add coordination to overcome such conflicts. For both materialization and shared tiering, we must decide how to write data to the secondary system. In the case of a lakehouse, we would likely do it via a lakehouse API such as an Iceberg library (API-access). Shared tiering must also be able to read back tiered data from the secondary system. A key consideration is that the primary must maintain a mapping of its logical and/or physical model onto the secondary storage. For example, mapping a Kafka topic partition offset range to a given tiered file. The primary needs this in order to be able to download the right tiered files to serve historical reads. But this mapping could also map to the logical model of the secondary system, such as row identifiers of an Iceberg table. Let’s look at some example strategies for Iceberg shared tiering:
- API-Access. The primary uses the Iceberg library to write tiered data and to read tiered data back again. It maintains a mapping of the primary logical model to the secondary logical model. Example: Kafka uses the Iceberg library to tier log segments, maps topic partition offset ranges to Iceberg table row identifier ranges.
Uses the Iceberg library to read tiered data using predicate clauses.
- Hybrid-Access (Write via API, Read via Direct). The primary uses the Iceberg library to tier data but keeps track of the data files (Parquet) it has written, with a mapping of the primary logical model to secondary file storage. To serve historical reads, the primary knows which Parquet files to download directly, rather than using the Iceberg library, which may be less efficient.

The direct-access strategy could be problematic for shared tiering, as it bypasses the secondary system’s API and abstractions (violating encapsulation, leading to potential reliability issues). The biggest issue in the case of lakehouse tiering is that table maintenance might reorganize files and delete the files tracked by the primary. API-access might be preferable unless secondary maintenance can be modified to preserve the original Parquet files (causing data duplication) or have maintenance update the primary on the changes it has made so it can make the necessary mapping changes (adding a coordination component to table maintenance). Another consideration is that if a custom approach is used, where, for example, additional custom metadata files are maintained side-by-side with the Iceberg files, then Iceberg table maintenance cannot be used and maintenance itself must be a custom job of the primary. We ideally want one canonical source where the data lifecycle is managed. Whether stitching and conversion is done client-side or server-side, we need a metadata/coordination service to give out the necessary metadata that translates the logical data model of the primary to its physical location and layout. Tiering jobs, whether run as part of a primary cluster or as a separate service, must base their tiering work on the metadata maintained in this central metadata service.
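The logical-to-physical mapping discussed above (e.g. a topic partition offset range mapped to a tiered file) can be sketched as a small sorted index. The class and its method names are hypothetical; real systems keep this mapping in their remote log metadata.

```python
# Hypothetical sketch of the mapping a primary keeps between its logical
# model (offset ranges) and secondary storage (tiered objects), plus a
# lookup to find which tiered object serves a given offset.
import bisect

class TieredSegmentIndex:
    def __init__(self):
        self._base_offsets = []  # sorted base offsets of tiered segments
        self._entries = []       # parallel list of (base, end, object_path)

    def add_segment(self, base_offset, end_offset, object_path):
        i = bisect.bisect_left(self._base_offsets, base_offset)
        self._base_offsets.insert(i, base_offset)
        self._entries.insert(i, (base_offset, end_offset, object_path))

    def locate(self, offset):
        """Return the tiered object holding `offset`, or None if not tiered."""
        i = bisect.bisect_right(self._base_offsets, offset) - 1
        if i < 0:
            return None
        base, end, path = self._entries[i]
        return path if base <= offset <= end else None
```

This is the piece of state that secondary table maintenance can invalidate under hybrid-access: if compaction rewrites the files, every `object_path` in the index becomes stale unless maintenance coordinates with the primary.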
Tiering jobs learn of the current tiering state, inspect what new tierable data exists, do the tiering, and then commit that work by updating the metadata service again (and deleting the source data). In some cases, the metadata service could even be a well-known location in object storage, with some kind of snapshot or root manifest file (and an associated protocol for correctness). When client-side stitching is performed, clients must somehow learn the different storage locations of the data they need. There are two main patterns here:
1. The clients directly ask the metadata service for this information, and then request the data from whichever storage tier it exists on.
2. The client simply sends reads to a primary cluster, which will serve either the data (if stored on the local filesystem) or metadata (if stored on a separate storage tier).

The second case requires that the primary cluster know the metadata of tiered data in order to respond with metadata instead of data. This may be readily available if the tiering job runs on the cluster itself. It is also possible for the cluster to be notified of metadata updates by the metadata component. What governs the long-term compatibility of data across different storage services and storage formats? Is there a canonical logical schema from which all other secondary schemas are derived? Or are primary and secondary schemas managed separately somehow? How are they kept in sync? What manages the logical schema, and how does physical storage remain compatible with it? If direct-access is used to read shared tiered data and maintenance operations periodically reorganize storage, how does the metadata maintained by the primary stay in sync with secondary storage? Again, this comes down to coordination between metadata services, the tiering/materialization jobs, maintenance jobs, catalogs, and whichever system is stitching the different data sources together (client or server-side).
Many abstractions and components may be in play. Lakehouse formats provide excellent schema evolution features, but these need to be governed tightly with the source system, which may have different schema evolution rules and limitations. When shared tiering is used, the only sane choice is for the shared tiered data to be managed by the primary system, with read-only access for the secondary systems. If we want to expose the primary’s data in a secondary system, should we use shared tiering, or materialization (presumably with internal tiering)? This is an interesting and multi-faceted question. We should consider two principal factors:
- Where the stitching/conversion logic lives (client or server).
- The pros/cons of shared tiering vs the pros/cons of materialization.

When the stitching is client-side, tiering vs materialization may not make a difference. Materialization also requires metadata to be maintained regarding the latest position of the materialization job. A client armed with this metadata can stitch primary and secondary data together as a single logical resource. We might be using Flink and want to combine real-time Kafka data with historical lakehouse data. Flink sits above the high-level APIs of the two different storage systems. Whether the Kafka and lakehouse data are tightly lifecycle-linked with tiering, or more loosely with materialization, is largely unimportant to Flink. It only needs to know the switchover point from batch to streaming. If we want to provide the data in lakehouse format so Spark jobs can slice and dice the data, then either shared tiering or materialization is an option. Shared tiering might be preferable if reducing storage cost (by avoiding data duplication) is the primary concern. However, other factors are also at play, as explained earlier in The challenges of shared tiering.
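The switchover point mentioned above can be illustrated with a tiny stitched read: consume the historical (lakehouse) side up to the switchover offset, then continue from the stream. Both readers are hypothetical stand-ins for real batch and streaming source APIs.

```python
# Sketch of batch-to-streaming stitching around a single switchover
# offset, as a stream processor sitting above two storage APIs might do.
# read_lakehouse/read_stream are illustrative stand-ins.

def unified_read(read_lakehouse, read_stream, switchover_offset):
    """Yield historical records, then live records from the switchover point."""
    # Bounded phase: everything below the switchover offset comes from
    # the historical (lakehouse) side.
    for record in read_lakehouse(upper_bound=switchover_offset):
        yield record
    # Unbounded phase: continue from the stream at the same offset, so
    # no records are missed or duplicated across the seam.
    for record in read_stream(from_offset=switchover_offset):
        yield record
```

Whether the two sides are lifecycle-linked (tiering) or independent copies (materialization), this seam is all the stitching layer needs: a consistent offset where one side ends and the other begins.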
Materialization might be preferable if:
- The primary and secondary systems have completely different access patterns, such that maintaining two copies of the data, each in its respective format, is best. The secondary can organize the data for its own performance, and the primary uses internal tiering, maintaining its own optimized copy.
- The primary does not want to own the burden of long-term management of the secondary storage.
- The primary does not have control over the secondary storage (to the point where it cannot fully manage its lifecycle).
- Performance- and reliability-conscious folks prefer to avoid the inherent risks associated with shared tiering, in terms of conversion logic over multiple schemas over time, performance constraints due to data organization limitations, etc.
- The secondary only really needs a derived dataset. For example, the lakehouse just wants a primary-key table rather than an append-only stream, so the materializer performs key-based upserts and deletes as part of the materialization process.

Data duplication avoidance is certainly a key consideration, but by no means always the most important. The subject of storage unification (aka data virtualization) is a large and nuanced one. You can choose to place the virtualization layer predominantly client-side or server-side, each with their pros and cons. Data tiering and data materialization are both valid options, and can even be combined. Just because the primary system chooses to materialize data in a secondary system does not remove the benefits of internally tiering its own data. Tiering can come in the form of Internal Tiering or Shared Tiering, where shared tiering is a kind of hybrid that serves both primary and secondary systems. Shared tiering links a single storage layer to both primary and secondary systems, each with its own query patterns, performance needs, and logical data model.
This has advantages, such as reducing data duplication, but it also means lifecycle policies, schema changes, and format evolution must be coordinated (and battle-tested) so that the underlying storage remains compatible with both primary and secondary systems. With clear ownership by the primary system and disciplined management, these challenges can be manageable. Without them, shared tiering becomes a liability rather than an advantage. While on paper materialization may seem like more work, as two different systems must remain consistent, the opposite is more likely to be true. By keeping the canonical data a private concern of the primary data system, it frees the primary from potentially complex and frictionful compatibility work, juggling competing concerns and different storage technologies with potentially diverging future evolution. I would like to underline that making consistent copies of data is a long-standing and well-understood problem in data systems. The urge to simply remove all data copies is understandable, as storage cost is a factor. But there are so many other, often more important, factors involved, such as performance constraints, reliability, lifecycle management complexity and so on. But if reducing storage at-rest cost is the main concern, then shared tiering, with its additional complexity, may be worth it. I hope this post has been food for thought. With this conceptual framework, I will be writing in the near future about how various systems in the data infra space perform storage unification work.
Data organization / format Data tiering Data materialization Data lifecycle Internal Tiering is data tiering where only the primary data system (or its clients) can access the various storage tiers. For example, Kafka tiered storage is internal tiering. These internal storage tiers as a whole form the primary storage . Shared Tiering is data tiering where one or more data tiers is shared between multiple systems. The result is a tiering-materialization hybrid, serving both purposes. Tiering to a lakehouse is an example of shared tiering. Tiering : Store historical data in a cheaper storage. Materialization : Make the data available to a secondary system, using the secondary system’s data format. Lifecycle management . Unlike materialization, the data written to the secondary system remains tied to the primary. They are lifecycle-linked and therefore data management should remain with the primary system. Imagine shared tiering where a Kafka topic operated by a Kafka vendor tiers to a Delta table managed by Databricks. Who controls the retention policy of the Kafka topic? How do we ensure that table maintenance by Databricks doesn’t break the ability of the tiering logic to read back historical data? Schema management . Schema evolution of the primary format (such as Avro or Protobuf) may not match the schema evolution of the secondary format (such as Apache Iceberg). Therefore, great care must be taken when converting from the primary to the secondary format, and back again. With multiple primary formats and multiple secondary formats, plus changing schemas over time, the work of ensuring long-term bidirectional compatibility should not be underestimated. Exposing data . Tiering requires bidirectional lossless conversion between the primary and secondary formats. When tiering to an Iceberg table, we must include all the metadata of the source system, headers, and all data fields. 
If there is sensitive data that we don’t want written to the secondary system, then materialization might be better.

Fidelity. Great care must be taken to ensure that the bidirectional lossless conversion between formats does not lose fidelity.

Security/encryption. End-to-end encryption will of course make shared tiering impossible. But there may be other encryption-related challenges, such as exposing encryption metadata in the secondary system.

Performance overhead. There may be a large conversion cost when serving lagging or catch-up consumers from secondary storage, due to the conversion from the secondary format to the primary.

Performance optimization. The shared tier serves two masters, and here data organization can really make the difference in terms of performance and efficiency. But what can benefit the secondary can penalize the primary, and vice versa. An example is Z-order compaction in a lakehouse table, where files may be reorganized (rewritten), changing data locality such that data is grouped into data files according to common predicates, for example, compacting by date and by country (when where-clauses and joins frequently use those columns). While this reorganization can vastly improve lakehouse performance, it can make reads by the primary system fantastically inefficient compared to internal tiering.

Risk. Once data is tiered, it becomes the one canonical source. Should any silent conversion issue occur when translating from the primary format to the secondary format, there is no recourse. Taking Kafka as an example, topics can contain Avro, Protobuf, and JSON, among other formats. The secondary format could be an open table format such as Iceberg, Delta, Hudi or Paimon. Data files could be Avro, Parquet or ORC. With so many combinations of primary->secondary->primary format conversions, there is some unavoidable additional risk associated with shared tiering.
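The fidelity and risk concerns above suggest a defensive pattern: verify that a primary->secondary conversion round-trips losslessly before the tiered copy is allowed to become canonical. Below is a minimal sketch of that idea; the record fields, row shape, and function names are all illustrative assumptions, not any real system's API.

```python
# Illustrative guard against silent fidelity loss when converting a
# primary-format record (e.g. a Kafka record) to a secondary format
# (e.g. a table row) and back. All names here are assumptions.

from dataclasses import dataclass

@dataclass(frozen=True)
class PrimaryRecord:
    key: bytes
    value: bytes
    timestamp_ms: int
    headers: tuple = ()  # metadata must survive the round trip too

def to_secondary(record: PrimaryRecord) -> dict:
    """Convert to the secondary system's row representation."""
    return {
        "key": record.key,
        "value": record.value,
        "timestamp_ms": record.timestamp_ms,
        "headers": list(record.headers),
    }

def from_secondary(row: dict) -> PrimaryRecord:
    """Convert a secondary row back into the primary format."""
    return PrimaryRecord(
        key=row["key"],
        value=row["value"],
        timestamp_ms=row["timestamp_ms"],
        headers=tuple(row["headers"]),
    )

def tier_record(record: PrimaryRecord) -> dict:
    """Convert, but refuse to tier if the conversion is not lossless."""
    row = to_secondary(record)
    if from_secondary(row) != record:
        raise ValueError("lossy conversion: refusing to tier record")
    return row
```

Because the tiered copy becomes the only copy, a cheap round-trip check at write time is one of the few places this class of silent corruption can still be caught.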
Internal tiering is comparatively simple: you tier data in its primary format. In the case of Kafka, you tier log segment files directly.

Kafka API compatible systems. Kafka clients expect brokers to serve a seamless byte stream from a single logical log, so Kafka brokers do the work of stitching disk-bound log segments with object-store-bound log segments.

HTAP and real-time analytics databases. A typical tiering process:

1. Learns the current tiering state from the metadata store.
2. Reads the data in the source tier.
3. Potentially converts it to the format of the destination tier (such as from a row-based format to a columnar format).
4. Writes it to the destination tier.
5. Updates the associated metadata in the metadata service.
6. Finally, deletes the data in the source tier.

The tiering/materialization process can access the primary's data in two ways:

- Direct-access: the tiering/materialization process (whether integrated or external) directly reads the primary storage data files. Example: Kafka brokers read local log segments from the filesystem and write them to the second tier (object storage).
- API-access: the tiering/materialization process uses the primary’s API to read the primary’s data. Example: tiering/materialization could be the responsibility of a separate component, which reads data via the Kafka API and writes it to the second tier.

When tiering to a table format such as Iceberg, the primary can access the shared tier in different ways:

- API-Access: the primary uses the Iceberg library to write tiered data and to read tiered data back again. It maintains a mapping of the primary logical model to the secondary logical model. Example: Kafka uses the Iceberg library to tier log segments, maps topic partition offset ranges to Iceberg table row identifier ranges, and uses the Iceberg library to read tiered data using predicate clauses.
- Hybrid-Access (write via API, read via direct): the primary uses the Iceberg library to tier data but keeps track of the data files (Parquet) it has written, with a mapping of the primary logical model to secondary file storage.
To serve historical reads, the primary knows which Parquet files to download directly, rather than using the Iceberg library, which may be less efficient.

On the read path, there are two client models: the clients directly ask the metadata service for this information, and then request the data from whichever storage tier it exists on; or the client simply sends reads to a primary cluster, which serves either the data (if stored on the local filesystem) or metadata (if stored on a separate storage tier).

When studying how any given system performs storage unification, some useful questions are:

- What governs the long-term compatibility of data across different storage services and storage formats?
- Is there a canonical logical schema from which all secondary schemas are derived? Or are primary and secondary schemas managed separately somehow? How are they kept in sync?
- What manages the logical schema, and how does physical storage remain compatible with it?
- If direct-access is used to read shared tiered data and maintenance operations periodically reorganize storage, how does the metadata maintained by the primary stay in sync with secondary storage?
- Where does the stitching/conversion logic live (client or server)?
- What are the pros/cons of shared tiering versus the pros/cons of materialization?

Materialization tends to be the better choice when:

- The primary and secondary systems have completely different access patterns, such that maintaining two copies of the data, in their respective formats, is best. The secondary can organize the data for its own performance, and the primary uses internal tiering, maintaining its own optimized copy.
- The primary does not want to own the burden of long-term management of the secondary storage.
- The primary does not have control over the secondary storage (to the point where it cannot fully manage its lifecycle).
- Performance- and reliability-conscious folks prefer to avoid the inherent risks associated with shared tiering, in terms of conversion logic over multiple schemas over time, performance constraints due to data organization limitations, etc.
- The secondary only really needs a derived dataset.
For example, the lakehouse just wants a primary key table rather than an append-only stream, so the materializer performs key-based upserts and deletes as part of the materialization process.
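The generic tier-migration sequence described earlier (learn state from the metadata store, read the source tier, convert, write the destination tier, update metadata, then delete) can be sketched as below. The `Tier` and `MetadataStore` interfaces are illustrative assumptions, not any particular system's API.

```python
# Illustrative sketch of one tier-migration step, following the sequence
# in the post. In-memory dicts stand in for real storage services.

class Tier:
    def __init__(self):
        self.objects = {}
    def read(self, obj_id):
        return self.objects[obj_id]
    def write(self, obj_id, data):
        self.objects[obj_id] = data
    def delete(self, obj_id):
        del self.objects[obj_id]

class MetadataStore:
    def __init__(self):
        self.location = {}  # obj_id -> tier name
    def tier_of(self, obj_id):
        return self.location[obj_id]
    def move(self, obj_id, tier_name):
        self.location[obj_id] = tier_name

def migrate(obj_id, metadata, source, dest, convert=lambda d: d):
    # 1. Learn the current tiering state from the metadata store.
    assert metadata.tier_of(obj_id) == "source"
    # 2. Read the data in the source tier.
    data = source.read(obj_id)
    # 3. Potentially convert it to the destination tier's format.
    converted = convert(data)
    # 4. Write it to the destination tier.
    dest.write(obj_id, converted)
    # 5. Update the metadata service *before* deleting, so readers are
    #    never pointed at data that no longer exists.
    metadata.move(obj_id, "dest")
    # 6. Finally, delete the data in the source tier.
    source.delete(obj_id)
```

The ordering is the important part: metadata is updated before the source copy is deleted, so a crash mid-migration leaves at most a duplicate, never a gap.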

Jack Vanlightly 4 months ago

Remediation: What happens after AI goes wrong?

If you’re following the world of AI right now, no doubt you saw Jason Lemkin’s post on social media reporting how Replit’s AI deleted his production database, despite being told not to touch anything at all due to a code freeze. After deleting his database, the AI even advised him that a rollback would be impossible and the data was gone forever. Luckily, he went against that advice, performed the rollback, and got his data back. Then, a few days later, I stumbled on another case, this time of the Gemini CLI agent deleting a user’s files (the post is now deleted). He was just playing around, kicking the tires, but the series of events that took place is illuminating. These incidents showed AI agents making mistakes, but they also showed agents failing to recover. In both cases, the AI not only broke something, but it couldn't fix it. That’s why I’m writing about remediation, and how it needs to be a first-class concern in AI agent implementations. While kicking the tires, the user told the agent to rename the current working directory and move the files into a new sub-directory. The agent proceeded to execute commands that led to data loss:

1. It attempted to create a new sub-directory, but while the command failed, the agent thought it had succeeded.
2. The agent then performed move commands for each file to the new directory (which didn’t exist), thus deleting each file one by one.
3. The agent performed a list command on the source directory, and on seeing no files, it declared that this initial stage of the work was completed.

At this point, the user saw that everything was gone and asked Gemini to revert the operation. In his words, “This is where Gemini's hallucinations collided with the file system's actual state…. Now completely disoriented, Gemini found itself in an impossible situation: it possessed detailed memories of files and folders residing in a location that the operating system insisted had never existed.
Its responses became increasingly desperate, a cascade of apologies and frantic attempts to reconcile its internal state with external reality.” When I wrote “AI Agents in 2025”, I wrote about the reliability challenges of AI agents, and I was struck by how each of these incidents mapped onto the list I made: I listed 18 challenges split among 4 categories—Effective Planning, Accurate Tool Selection and Usage, Reasoning and Decision-Making, and Failure Modes in Execution—and that was just a start. In that list, I mention remediation challenges under Failure Modes in Execution, but I think remediation warrants its own category (which we’ll discuss further down). But there’s also a missing item from the Failure Modes in Execution category: AI agent internal model drift. From the agent’s perspective, it had executed a series of commands that were all successful, resulting in the files existing in a new directory and leaving the original directory empty. But it was totally out of touch with reality. When an agent can get out of sync with reality, can we trust it to remediate its own actions? After reading the transcript, I am not sure I would want such an agent even to attempt a remediation, lest it create more damage in the process. By the end of this short sequence, the very foundations were swept away, leaving the AI confused and apologetic. The question remains: how can an AI agent know that it is drifting from the real state of the world? When it drifts, it may perform actions that are inappropriate for the context, and likewise, may be unable to recover. This brings me to the blog post title, “Remediation: What happens after AI goes wrong?”, because some AI agents have a tremendous capacity for damage. Some agents perform actions in the real world, leading to real-world consequences. But even agents that are only sophisticated information gatherers and processors can cause damage.
Agent-derived insights can be wrong, leading to incorrect real-world actions by downstream software systems and humans. Insights can be biased against minorities, or the real world can be mischaracterized by omission or incompleteness. The results might be structurally correct and pass validation checks, but the qualitative content could be woefully and tragically inaccurate or skewed. Evals help here, but what happens when things go wrong in production despite your best efforts? We should move beyond thinking only about error prevention to thinking about recovery and damage control. A defining characteristic of a safe AI agent is remediation, which extends beyond the AI agent itself. If I were to implement an AI agent, I would be asking some questions:

- Given the available tools and permissions, what kind of damage could be done? For info gatherer/processor agents, what harm could result from qualitatively bad results?
- For each potentially damaging action, can it be undone? Can we limit the scope of a potentially destructive action? Can qualitatively bad data be retracted?
- What controls or safeguards need to be put in place to ensure there is recourse should a destructive action be taken?
- How do we detect destructive/damaging actions in automated flows?
- Who or what should do the remediation? The agent? Or should those steps exist outside of the agent?
- Can we insert humans or evaluator agents into the loop before or after actions are taken, or insights are acted upon?

Some real-world actions may not have an undo button. Once an email is sent to the wrong recipient and read by that recipient, it can’t be unread. Some failures might cause cascading damage: corrupted data triggering downstream systems, financial systems making incorrect trades (reminiscent of the Knight Capital incident in 2012), or even physical systems being harmed (e.g., robotics or IoT). Others can be remediated if the necessary control mechanisms are put in place beforehand.
Once a file is permanently deleted from a local disk with no backup, it’s gone. If an AI agent issues a cloud infrastructure command to delete a production server with live customer data when no snapshot or backup exists, that loss may be irrecoverable. There are many approaches to establishing the foundations of remediation, and we can look to data systems, such as databases and file systems, for inspiration. Some proven patterns include:

- Journaling: log every operation before applying it, allowing for undo operations or at least forensic analysis to guide manual recovery.
- Immutable versioned data: enables rollback by reverting to a previous version.
- Append-only logs: prevent bad data from overwriting good data; errors can be "retracted" by publishing corrected events.
- Secure read-only backups: ensure there's a fallback if things go wrong.

Remediation may not just depend on carrying out the actions in reverse; without controls, there may be nothing left to reverse. We may also need humans to perform the remediation steps themselves in some cases. Is there enough information in the logs or journal to be able to do that? Are there backups or versioned/immutable data that a human can utilize as part of the remediation process? It’s not hard to imagine a future where job titles exist to clean up after AI and assess AI risks to an organization (AI Remediation Specialist, Autonomous Systems Safety Engineer, Autonomous Systems Risk Officer, etc.). AI agents can cause catastrophic damage when their inner representation diverges from reality. The Gemini case shows how an agent's hallucination about successful commands led to cascading file deletions. Different types of AI agents have vastly different damage potential. Some can take real-world actions with permanent consequences; others just provide bad information, but both can cause harm. Remediation deserves its own category when evaluating AI safety.
Beyond just preventing errors, we need to think systematically about recovery and damage control. Current AI agents may not be reliable stewards of remediation steps, because the same cognitive failures that cause problems also corrupt their ability to fix those problems. Effective remediation may require external systems and controls to be implemented, rather than relying on agent self-repair: human oversight, monitoring systems, and separate recovery mechanisms that don't depend on the agent's corrupted state. The incidents with Replit and Gemini reveal core challenges about destructive and damaging behaviors and the recovery or damage control options in place. The question isn't whether AI agents will make mistakes. They will. The question is whether we're building the guard rails and escape hatches to cope when those inevitable mistakes happen. Remediation is a core pillar of any AI agent project where the potential for harm exists.

Jack Vanlightly 4 months ago

The Cost of Being Wrong

A recent LinkedIn post by Nick Lebesis caught my attention with its brutal take on the difference between good startup founders and coward startup founders. I recommend you read the entire thing to fully understand the context, but I’ve pasted the part that most resonated with me below: "Real founders? They make the wrong decision at 9am. Fix it by noon. Ship by 5. Coward founders are still scheduling the kickoff meeting. Your job isn't to be liked. Your job is to be clear. Wrong but decisive beats right but timid... every single time. Committees don't build companies. Convictions do." It's harsh, but there's truth here that extends well beyond startups into how we approach technical decision-making in software development, even in large organizations. I am a deep thinker, which can make me an over-thinker and sometimes an obsessive thinker; that definitely has its pros, but it also comes with some cons. I've been guilty of agonizing over architecture choices in the past, weighing endless options against endless possible outcomes. Different tech stacks, varying team skills, shifting requirements. Decision paralysis is real. But recognising I’m an overthinker is also the antidote. Others might be reluctant to make decisive decisions for fear of losing credibility if things don’t go as planned, equating a decision that leads to failure with being a bad leader, and so on. There are many possible reasons why a startup founder might be unable to be decisive. This got me thinking about how different fields handle uncertainty and risk. What I've realized is that the cost structure of mistakes fundamentally shapes how entire disciplines approach new ideas and decision-making. George Bernard Shaw observed that "all great truths begin as blasphemies." Arthur Schopenhauer noted that truth is first ridiculed, then violently opposed, and finally accepted. Max Planck went further with: "Science progresses one funeral at a time." You get the point.
In science, new ideas face fierce resistance, and for good reason. The laws of physics aren't changing month to month, and they are incredibly complex to unravel. When you're trying to reconcile theoretical physics with experimental observations, or understand how our immune system interacts with our nervous system, you're working with incredibly complex systems that take decades of painstaking research to understand. The cost of taking the wrong research path can set entire fields back years or decades. The decades Alzheimer's research spent focused on amyloid proteins, potentially a major tragedy, come to mind, as does the misguided multi-decadal emphasis on dietary cholesterol over sugar intake. Science manages this risk by applying extreme skepticism to new ideas and requiring high bars of proof. Sometimes this gets pathological, but the underlying logic is sound: when mistakes are expensive, you move carefully. Traditional engineering occupies the middle ground between scientific rigor and what we see in software. Engineering advances through a mix of scientific progress and real-world lessons learned. Safety acts as the primary brake on speed. When engineering fails, the consequences are immediate and physical: bridge collapses, plane crashes, playground equipment injuring children, toy designs creating choking hazards, and avoidable deaths from car accidents. Civil engineers can't A/B test bridge designs with real traffic. Toy manufacturers must ensure button eyes stay attached before products reach store shelves. Even something as simple as designing a child's high chair requires considering tip-over risks, structural integrity, and material safety, knowing that a design flaw could harm a toddler. This creates measured deliberation in how the field approaches uncertainty and risk. But what happens when the cost of failure drops dramatically? In software development, we see the opposite phenomenon entirely.
We love jumping on new ideas, new frameworks, methodologies, databases, architectural patterns. Each new thing comes with enthusiasm and some believers in its promise of solving our current and emerging challenges. Unlike physical engineering where practitioners must work within established, proven methods, software exists in a realm where individual developers experiment freely, teams adopt bleeding-edge technologies, and "move fast and break things" became a celebrated mantra. This freedom to innovate at the individual level is unthinkable in structural engineering. A structural engineer can't just invent new load-bearing calculations. It would be flagged during construction, result in serious professional consequences, and even if theoretically superior, would need years of testing and regulatory approval before acceptance. But a software developer can test different load balancing algorithms on live traffic, experiment with database query optimizations, migrate from monolithic architectures to microservices, switch from REST APIs to GraphQL, or move from SQL to NoSQL databases and back again.  Why do we get this freedom? Because the cost of failure is so much lower. The freedom to experiment increases as the cost of failure decreases.  Some software really is life critical, and so comes with a high cost of failure. But most software is not like that. Generally, we have remarkable freedom and room for creativity in software, which draws many of us to the field in the first place. We should lean into this, instead of importing risk-aversion patterns and mindsets from higher-stakes domains. An imperfect architectural decision won't collapse a bridge or harm a child. Rather than agonizing over decisions like they're irreversible, we should make them expecting some iteration and course correction. We make strategic technical decisions and often have to make strategic technical course corrections.  
So if you sometimes struggle with decision making, as I myself have done, just remember that software decisions are not irreversible; they do not have to be perfect. Don’t be like the founder in that LinkedIn post. Not every technical decision carries the weight of a bridge design. Let’s take advantage of our ability to fail forward quickly. I’m repeating this as a kind of mantra, as much for myself as free advice for others.

Jack Vanlightly 4 months ago

Responsibility Boundaries in the Coordinated Progress model

Building on my previous work on the Coordinated Progress model, this post examines how reliable triggers not only initiate work but also establish responsibility boundaries. Where a reliable trigger exists, a new boundary is created in which that trigger becomes responsible for ensuring the eventual execution of the sub-graph of work downstream of it. The boundaries can even layer and nest, especially in orchestrated systems that overlay finer-grained boundaries. Failures happen, and distributed business service architectures need ways of recovering from those failures so they can continue to make progress. In the Coordinated Progress model, this reliable progress is defined as the combination of:

- Reliable Triggers, to ensure that work is initiated in a way that survives failure. Should the work fail before completion, it can be retriggered, with all the initial state of the first invocation.
- Progressable Work, to ensure that subsequent invocations do not result in inconsistent state or duplicate state. Work either progresses incrementally, by durably logging progress so that after being re-triggered it can resume from where it left off; or all work is re-executed in full, but is idempotent.

Fig 1. Reliable progress is made via reliable triggers and progressable work.

Not all units of work have their own reliable trigger, but reliable progress needs a reliable trigger somewhere, even if that's a human clicking a button. That human, in its role as button clicker, acts as the root of a potentially large responsibility boundary encompassing multiple services. How a service uses reliable triggers to ensure reliability of itself and downstream work is the subject of this post. Let’s examine how a given service can utilize reliable triggers to ensure that a downstream service completes reliably. As a developer, it’s not enough to ensure your own service is reliable; you also need to think about whether your downstream invocations are truly reliable.
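The two progressable-work styles described earlier, incremental progress via durable logging, and full re-execution with idempotency, can be sketched minimally. The durable log is modeled as a dict for brevity, and all names are illustrative assumptions rather than any framework's API.

```python
# Sketch of the two progressable-work styles. In a real system, the
# "log" would be a durable store written to before acknowledging each
# step, not an in-memory dict.

def run_with_checkpoints(steps, progress_log):
    # Style 1: incremental progress. Each completed step is durably
    # logged, so a retriggered invocation skips straight past it.
    for name, step in steps:
        if name in progress_log:
            continue                    # already completed previously
        progress_log[name] = step()     # durable write in a real system

def upsert_order(db, order_id, amount):
    # Style 2: full re-execution with idempotency. The write is keyed
    # on order_id, so re-running the whole function converges to the
    # same final state instead of creating duplicate orders.
    db[order_id] = {"order_id": order_id, "amount": amount}
```

Either style makes retriggering safe; the choice is between paying for durable progress writes (style 1) or re-running everything and relying on idempotent effects (style 2).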
How can a given service ensure the reliable execution of its own work, and of its downstream dependencies? The answer is by using reliable triggers and progressable work. But who places the trigger? The figure below outlines three patterns in which a caller reliably invokes a callee. In each case, the callee fails the first two times but succeeds on the third attempt.

Fig 2. Different ways of relying on reliable triggers to make downstream work reliable.

Let’s look at the different trigger placement types for any specific caller->callee pairing:

- Caller Placement. The caller delegates responsibility for invoking the callee service to a middleware, such as a queue, topic, or reliable RPC. Each acts as a reliable trigger for the callee service to perform its work. Once the middleware has acknowledged receipt of the message/RPC, the caller context is free to proceed, or if there’s nothing left to do, to terminate. This is ideal for triggering asynchronous work.
- Callee Placement. The downstream service (the callee) registers its invocation durably, then immediately responds to the caller with a success response. The callee can then proceed (having durably logged the intent to do the work), carrying out the work asynchronously. Should it fail before completing its work, the durably stored invocation state is used to retrigger the work. This is ideal for ensuring asynchronous work, triggered by unreliable RPC, is reliably executed.
- Upstream Placement. The caller->callee edge is unreliable, such as an RPC. Essentially, there is no actual placement of a trigger; instead, the caller relies solely on its own reliable trigger (or even an upstream reliable trigger if it has none itself) to ensure that the unreliable caller-to-callee invocation is eventually successful. Ideal for synchronous work, where we assume there is a reliable trigger upstream (even if that is a human).
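Callee Placement can be sketched as follows: the callee durably registers the invocation, acknowledges immediately, and performs the work asynchronously; the stored record is the reliable trigger used to retry after a crash. The class and method names, and the dict standing in for durable storage, are illustrative assumptions.

```python
# Sketch of Callee Placement. The pending-invocation store would be a
# durable database or log in a real system, not an in-memory dict.

import uuid

class CalleeService:
    def __init__(self):
        self.pending = {}  # invocation_id -> payload (durable in reality)

    def handle_request(self, payload):
        # 1. Durably log the intent to do the work.
        invocation_id = str(uuid.uuid4())
        self.pending[invocation_id] = payload
        # 2. Ack immediately; the caller context is now released.
        return {"status": "accepted", "invocation_id": invocation_id}

    def run_pending(self, do_work):
        # 3. Execute asynchronously, or re-execute after a crash, from
        #    the durably stored invocation state (the reliable trigger).
        for invocation_id, payload in list(self.pending.items()):
            do_work(payload)
            del self.pending[invocation_id]  # completed: clear trigger
```

After a crash, `run_pending` simply runs again over whatever is still stored, which is exactly the retrigger behavior the pattern requires (assuming `do_work` itself is progressable).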
In the case of Caller Placement, a middleware sits between the caller and the callee. It is the caller's responsibility to set a reliable trigger via this middleware. In the case of Callee Placement, there is no middleware between the caller and the callee, and the caller must assume the callee takes responsibility for itself. As we saw above, a node in the graph can perform a handoff of responsibility to a downstream node via a reliable trigger, or it can depend on its own trigger or even an upstream trigger. The point is that somewhere, a reliable trigger must exist. Let’s take the following example of 6 nodes forming a synchronous graph of execution. If a node is hollow, it has no reliable trigger; if it is opaque, it does have a reliable trigger.

Fig 3. Synchronous execution graph without a reliable trigger.

Node A (a function in a microservice) kicks off the work. But this work is not reliably triggered; if it fails, there is nothing and no one to retrigger it. Furthermore, none of the downstream steps are reliably triggered either. There is no responsibility boundary at all; any failure amongst these 6 nodes will leave the work partially completed, most likely causing a globally inconsistent state. Now, let’s place a reliable trigger for Node A. It could just be a human that clicked a button and waits to confirm the operation completes successfully, ready to try again if needed. Or it could be triggered from some other durable source, such as a queue, topic, or database.

Fig 4. Synchronous execution graph with a root reliable trigger, constituting a responsibility boundary.

Things just got a lot better from a reliability perspective; we now have a responsibility boundary wrapping this entire graph. Should any of the nodes in this boundary fail, the system re-executes the entire operation, ensuring data integrity and task completion.
While potentially resource-intensive, this prevents leaving the system in a partially executed or corrupted state (as long as each node implements a progressable work pattern). We might be able to leave things here, having clearly defined a responsibility boundary should failures occur. However, what if Nodes D and F are long-running and are therefore performed asynchronously? When D and F receive a request, they immediately acknowledge it by sending a success response, then proceed to execute the work asynchronously. If the work then fails, we have a problem. This single responsibility boundary is predicated on upstream nodes detecting when downstream nodes fail. But with asynchronous work, this predicate is false. We can address this by having Node B send Node D a message over a queue or topic, and the same for Node C and F. Now D and F exist in their own responsibility boundaries. Should either one fail before completion, they do not depend on Node A to retrigger the whole graph of execution.

Fig 5. Synchronous and asynchronous work in the execution graph is separated into different responsibility boundaries.

Nodes A, B, C, and E form a synchronous flow of execution. Synchronous flows don’t benefit from balkanized responsibility boundaries. Typically, synchronous work involves a single responsibility boundary, where the root caller is the reliable trigger. But it might be acceptable to make this entire flow asynchronous.

Fig 6. The entire execution graph is executed asynchronously, with each node (service) existing in its own boundary.

Now, all inter-node communication happens over queues or topics. Each node is wrapped in its own responsibility boundary, meaning failures only require retriggering one node in the graph. It turns out that asynchronous two-way communication can be viewed as a special case of one-way communication (specifically, two one-way invocations).
Viewed this way, we see that the callee happens to invoke a handler of the caller service (with the final response) as part of its asynchronous work.

Fig 7. The same execution graph from fig 6, where each two-way communication acts like two one-way communications.

The asynchronous response is received by a handler in the caller, and this handler context is a separate responsibility boundary from that of the initial calling context. Request/response over queues and topics requires wiring event handlers and correlation IDs, which can be more complex than just writing the synchronous version from figure 4. Durable execution engines (DEEs) are emerging as a way to build these kinds of flows using simpler procedural code. So far, we’ve come from the mindset of individual services forming a graph of execution, where it’s all about collaboration between services. When we use queues and topics to drive asynchronous work, this collaboration is event-driven. Each unit of work is triggered by a message on a queue or topic. Durable execution works differently. In durable execution, it’s not so much about collaboration between services, but a boss telling each subordinate service what to do. It is polling-driven: the DEE polls (triggers and retriggers) each actor when it fails, serving cached values where prior executions have already made incremental progress to that point.

Fig 8. How a Durable Execution Engine (DEE) drives work, with a polling and caching approach.

In the above figure, we have Function X (Fx) and Function Y (Fy). Fx is invoked (somehow) and it immediately registers its invocation with the DEE (caller trigger placement). Fx performs three blocks of progressable work, using the DEE to register the results of each code block. Code block B requests that Function Y (Fy) do some work and send a response.
With the DEE also mediating this communication, it looks something like this:

1. Fx is triggered, and it registers its invocation with the DEE (caller trigger placement).
2. Fx reaches code block B and instructs the DEE to execute Fy and return a response. Fx waits.
3. The DEE invokes Fy, and either the DEE places a reliable trigger here, or Fy registers its invocation to place the trigger.
4. Fy completes the work, providing the DEE with its response.
5. Fx, which has been waiting, receives the response from the DEE, and it proceeds to code block C and terminates.

This all appears to be very synchronous in nature, but because the invocations are registered with the DEE (reliable triggers), progress is logged via the DEE, and communication is mediated by the DEE, synchronous-looking code can actually be extremely long-running and make progress despite multiple failures.

For example, if Fx fails while waiting for a response:

1. The DEE will retrigger Fx.
2. Fx will reuse the cached result of code block A.
3. Fx will request the work of block B again, and if Fy has completed, the DEE can serve Fx the cached response immediately.
4. Fx then executes block C and terminates.

Likewise, if Fy fails in block B3, the DEE retriggers Fy, and Fy can use any cached results of blocks B1 and B2. In fact, both can fail repeatedly, and the DEE acts as the poller, reliably invoking the functions and providing the results of previously executed blocks, until the whole thing is finally completed.

The DEE is orchestrating everything, ensuring forward progress by polling each service repeatedly until a flow of work is complete. What does this do to our concept of responsibility boundaries? On one hand, Fx and Fy have their own reliable triggers (the DEE). But at the same time, the DEE ensures the entire downstream graph of Fx is reliably executed. Therefore, we can think of an orchestrated flow as one outer responsibility boundary that overlays finer-grained internal boundaries.
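The register-and-cache mechanics described above can be reduced to a small sketch. This is an illustrative stand-in for a DEE, not any particular framework's API: a JSON file plays the role of the DEE's durable log, and a hypothetical `step` helper serves cached results for blocks that already completed.

```python
import json
import os

class DurableContext:
    """Illustrative stand-in for a DEE: completed step results are
    persisted to a log, so a retriggered execution skips those steps
    and reuses their cached results."""

    def __init__(self, log_path):
        self.log_path = log_path
        self.cache = {}
        if os.path.exists(log_path):
            with open(log_path) as f:
                self.cache = json.load(f)   # results of prior executions

    def step(self, name, fn):
        if name in self.cache:
            return self.cache[name]         # served from cache, fn is skipped
        result = fn()                       # perform the block's work
        self.cache[name] = result
        with open(self.log_path, "w") as f:
            json.dump(self.cache, f)        # durably record progress
        return result

def fx(ctx):
    a = ctx.step("A", lambda: "result-a")
    b = ctx.step("B", lambda: "result-b")   # stands in for asking the DEE to run Fy
    return ctx.step("C", lambda: f"{a}+{b}")
```

If the process dies after block B, retriggering `fx` with a fresh `DurableContext` over the same log replays A and B from the cache and only executes C.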
Each micro-boundary acts as a local fault domain, so that, should a failure occur, the DEE only needs to retrigger a local part of the wider workflow.

Fig 9. Orchestrated flows consist of an outer responsibility boundary that wraps nested boundaries based on reliable triggers.

This might seem overly theoretical, but it actually aligns with the discussion of choreography versus orchestration in Coordinated Progress. Choreography is about highly decoupled microservices that independently react to their input events as they arrive. There is no blocking or waiting, and all consumers operate independently of any upstream producers or subsequent downstream consumers. This lines up with each node in the graph operating in its own responsibility boundary.

Orchestration is about centralized logic driving a procedural workflow (if-this-then-that). The orchestrator keeps track of which parts of the workflow have been completed, which are in progress, and which have yet to be started. It keeps track of the commands sent to the subordinate services as well as the responses from those services. It operates inside a single outer responsibility boundary.

In choreographed, event-driven architectures, each node in the execution graph has its own reliable trigger, and so each node owns its own recovery. This creates fine-grained responsibility boundaries, which can offer strong decoupling and smaller fault domains. Failures are contained; retries are local. However, these boundaries may not line up with business workflow boundaries. A failure in a small boundary may leave the broader business workflow partially complete and harder to reason about.

In contrast, orchestrated workflows create coarse-grained responsibility boundaries that typically align with business workflows. A durable orchestrator owns retries, state, and the responsibility for the entire workflow's success.
It can also utilize finer-grained responsibility boundaries, allowing it to retrigger only small subgraphs of the entire workflow graph. This simplifies the developer's mental model and makes recovery of complex processes more predictable.

But you can't wrap an entire architecture in one responsibility boundary. As the boundary grows, so does the frequency of change, making coordination and releases increasingly painful. What should be loosely connected services become tightly coupled through shared state, shared versions, and shared failure modes. This defeats the point of distributed systems: flexibility, independence, and isolation. This leads me back to the idea of mixing choreography with orchestration for a best-of-both-worlds approach.

Reliability isn't free; someone must always be responsible for ensuring work is eventually completed. A reliable trigger marks the point where that responsibility begins or ends, shaping how systems recover from failure and continue making progress. Clear responsibility boundaries make these obligations explicit, providing answers when failures occur: where to look, who owns recovery, and how far the impact spreads. Without them, distributed systems remain fragile and unpredictable.

Aligning responsibility boundaries with business process boundaries is often beneficial, but those boundaries can only grow so large before they become a liability. In reality, business processes are naturally divided into specific domains; organizations don't run giant processes that encompass the entire business. Instead, processes are focused on particular areas, triggered by specific events, and may in turn trigger other processes. Responsibility boundaries can be aligned to these business boundaries, using a mix of choreography and orchestration.

To recap the key concepts:

Reliable Triggers ensure that work is initiated in a way that survives failure. Should the work fail before completion, it can be retriggered, with all the initial state of the first invocation.

Progressable Work ensures that subsequent invocations do not result in inconsistent or duplicate state. Work either progresses incrementally by durably logging progress (so that after being retriggered, it can resume from where it left off), or is re-executed in full but is idempotent.

Caller Placement. In this placement model, the caller delegates responsibility for invoking the callee service to a middleware, such as a queue, topic, or reliable RPC. Each acts as a reliable trigger for the callee service to perform its work. Once the middleware has acknowledged receipt of the message/RPC, the caller context is free to proceed, or if there's nothing left to do, to terminate. This is ideal for triggering asynchronous work.

Callee Placement. In this placement model, the downstream service (the callee) registers its invocation durably, then immediately responds to the caller with a success response. The callee can then proceed (having durably logged the intent to do the work), carrying out the work asynchronously. Should it fail before completing its work, the durably stored invocation state is used to retrigger the work. This is ideal for ensuring that asynchronous work, triggered by unreliable RPC, is reliably executed.

Upstream Placement. In this placement model, the caller->callee edge is unreliable, such as an RPC. Essentially, there is no actual placement of a trigger; instead, the caller relies solely on its own reliable trigger (or even an upstream reliable trigger if it has none itself) to ensure that the unreliable caller-to-callee invocation is eventually successful. This is ideal for synchronous work, where we assume there is a reliable trigger upstream (even if that is a human).
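The callee-placement model described above can be sketched in a few lines. This is a toy illustration: an in-memory queue stands in for the durable invocation log, the handler records the intent and confirms immediately, and a background worker carries out the work.

```python
import queue
import threading

pending = queue.Queue()   # stand-in for a durable invocation log
results = {}

def handle_request(request_id, payload):
    """Callee placement: durably log the intent, then confirm immediately."""
    pending.put((request_id, payload))
    return {"status": 200, "accepted": request_id}   # a confirmation, not a result

def worker():
    """Carries out the logged work asynchronously; in a real system, a
    crash here would be recovered by re-reading the durable log."""
    while True:
        request_id, payload = pending.get()
        results[request_id] = payload.upper()   # the actual (long-running) work
        pending.task_done()

threading.Thread(target=worker, daemon=True).start()
```

The caller receives the 200 before the work is done; responsibility for completion now sits inside the callee's own boundary.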

Jack Vanlightly 5 months ago

Coordinated Progress – Part 4 – A Loose Decision Framework

In this last post, we'll review the mental framework and think about how it can translate to a decision framework. Microservices, functions, stream processors and AI agents represent nodes in our graph. An incoming edge represents a trigger of work in the node, and the node must do the work reliably. I have been using the term reliable progress, but I might have used durable execution if that term hadn't already been used to define a specific type of tool.

In this light, stream processors like Flink and Kafka Streams are particularly interesting. They've had durable execution built in from the start. The model assumes that processing logic is applied to an ordered, durable log of events. Progress is explicitly tracked through durable checkpoints or changelogs. Failures are expected and routinely recovered from. Many of the promises of durable execution are simply native assumptions in the stream processing world. You can think of stream processors as always-on orchestrators for data-in-motion:

- The durable trigger is the event log itself.
- The progressable work is defined through operators, keyed state, and fault-tolerant state backends.

This makes stream processing a powerful, production-tested example of how durable execution/reliable progress can be built into the foundation of an execution model.

Microservices and functions have not historically had a durable execution/reliable progress foundation built into their execution model (except maybe for some actor frameworks). Reliable triggers exist in the form of queues and event streams, but progressable work has been limited to idempotency. The Durable Execution category aims to close this reliable progress gap for microservices and functions. It can provide an additional reliable trigger in the form of Reliable RPC, and provides progressable work by durably persisting all progress, with resume functionality.

The question then comes down to preferences and constraints:

- What's your coding style preference?
- What's your coupling tolerance?
- What infrastructure dependencies can you tolerate?

Stream processors like Flink and Kafka Streams use a continuous dataflow programming model, with durability "batteries included". You likely already have a dependency on an event streaming system like Kafka. Flink is an additional runtime to manage, while Kafka Streams is just a library but still comes with some operational challenges related to state management.

Imperative microservices and functions offer familiar procedural programming but typically lack native durability mechanisms. To achieve reliable progress, you can:

- Couple reliable triggers (like events from queues/topics) with manual progressability patterns such as idempotency and spreading logic across response handlers.
- Or, add a durable runtime in the form of a Durable Execution Engine for automatic state persistence and resumability (another infrastructure dependency).

Building reliable distributed systems requires thoughtful choices about coordination and progress. Architecture decisions are often about managing complexity in systems that span teams, services, and failure domains. We can simplify the decision-making process with a mental framework based on a graph of nodes, edges, and workflows. The reliability of distributed work in this graph comes from coordinated reliable progress (reliable triggers, progressable work, choreography and orchestration). To summarize our model, the graph is composed of:

- Nodes: microservice functions, FaaS functions, stream processing jobs and AI agents.
- Edges: RPC, queues, and event streams. These vary widely in semantics (some are ephemeral, others durable), which affects reliability. There are direct and indirect edges.
- Workflows: sub-graphs (or connected components, as in graph theory) of the graph.

Coordination strategies shape how workflows are built and maintained:

- Choreography (reactive, event-driven) provides high decoupling and flexibility.
- Orchestration (centralized, procedural) offers greater clarity and observability.

Reliable progress hinges on two core concepts:

- Durable Triggers: work must be initiated in a way that survives failure (e.g., Kafka, queues, reliable RPC).
- Progressable Work: once started, work must be able to make progress under adverse conditions via replayability, using patterns such as idempotency, atomicity, or the ability to resume from saved state.

Coupling is an ever-present property that must be balanced against other needs and constraints. Using this graph model, we can apply a loose decision framework:

- Ask whether an edge should be a reliable trigger, and if so, what form serves your reliability and coupling requirements? Is it a direct or indirect edge?
- Ask whether a node needs progressable work capabilities, and whether idempotency, transactions, or durable state persistence best fits your context. What programming style are you comfortable with? What infrastructure dependencies are you willing to take on?
- Consider the coordination trade-offs: does this workflow need choreography's flexibility or orchestration's clarity? Is there a core workflow in here, with only direct edges, that can be spun out? Is the graph highly connected and dynamic?
- Consider the programming model. Do you prefer procedural code, or does a more continuous dataflow programming model suit you, or the problem being solved?
- Also consider the complexity of the code written by developers, as well as the total complexity of the system. Durable execution can make some code simpler to write, but it's also another middleware to support, with failure modes of its own, just like a distributed queue or event stream.

Your needs will be specific and complex, with a number of constraints. But you can use this mental framework to define your own more rigorous decision framework and make more informed and balanced architecture decisions.
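The stream-processing model described above, where the durable log is the trigger and checkpoints track progress, can be reduced to a toy sketch. Here a list stands in for the durable log and a dict for the checkpoint store; this is an illustration of the idea, not how Flink or Kafka Streams are actually implemented.

```python
def process_stream(log, apply_event, checkpoint):
    """Resume from the last committed offset; commit after each event.
    Recovery after a crash replays from the checkpoint, giving
    at-least-once processing of the durable log."""
    offset = checkpoint.get("offset", 0)
    while offset < len(log):
        apply_event(log[offset])        # the operator logic
        offset += 1
        checkpoint["offset"] = offset   # durably commit progress

log = ["e1", "e2", "e3", "e4"]          # the event log is the reliable trigger
seen = []
checkpoint = {"offset": 2}              # simulate a restart: e1 and e2 already done
process_stream(log, seen.append, checkpoint)
```

After the run, only e3 and e4 are applied: the log plus the checkpoint gave us resumable, progressable work for free.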
I hope this graph model has been useful for you, no matter your preferences regarding coordination, communication and programming styles. A theme I had not expected when I started writing this analysis was the unifying thread of durability. Durability is behind the idea that distributed work should not vanish or halt when something fails. Whether in communication (reliable triggers), in execution (progressable work), or in system design (trees and logs), durability underpins coordination and recoverability. Durability isn't just about data, but about progress too. It's a foundational property that can be built into functions, microservices, and stream processors alike; the only question is in what form.

Coordinated Progress series links:

- Seeing the system: The Graph
- Making Progress Reliable
- Coupling, Synchrony and Complexity
- A Loose Decision Framework

Jack Vanlightly 5 months ago

Coordinated Progress – Part 3 – Coupling, Synchrony and Complexity

In part 2, we built a mental framework using a graph of nodes and edges to represent distributed work. Workflows are subgraphs coordinated via choreography or orchestration. Reliability, in this model, means reliable progress: the result of reliable triggers and progressable work. In part 3 we refine this graph model in terms of different types of coupling between nodes, and how edges can be synchronous or asynchronous. Let's set the scene with an example, then dissect that example with the concepts of coupling and communication styles.

Let's say we have these services:

- Inventory Service (checks/reserves stock)
- Payment Service (processes payment)
- Shipping Service (arranges delivery)
- Notification Service (sends confirmations)

Each service reacts independently to events, and compensations are also event-driven.

Failure handling: if a payment fails, the Payment Service publishes PaymentFailed. The Inventory Service listens for this and releases the reserved stock. The Shipping Service ignores the order since it never sees both required events.

Key traits in action:

- Decoupled: the Payment Service doesn't know or care about the Inventory Service.
- Temporal decoupling: if the Shipping Service is down, events wait in the queue.
- Reliable progress: durable events act as reliable triggers of work.
- Emergent workflow: it is easy to add new services (like Fraud Detection) that react to existing events.

Alternatively, a central Order Orchestrator manages the entire flow. Key traits in action:

- Centralized control: it is easy to see the entire workflow in one place.
- Clear compensation: if payment fails, the orchestrator explicitly releases stock.
- Reliable progress: a DEE ensures the workflow resumes exactly where it left off after crashes.
- Tight coupling: the orchestrator must know about all participating services.

Now consider adding a fraud check.

Choreography: add a Fraud Service that listens to OrderPlaced and publishes FraudCheckPassed/Failed. Update the Payment Service to wait for fraud clearance. No changes to other services.
Orchestration: modify the central orchestrator code to add the fraud check step. Deploy the new version carefully to handle in-flight workflows (versioning is one serious challenge for orchestration-based workflows).

The fraud check is probably connected by a direct edge, i.e., it is necessary for the order process to complete. Now let's add some tangential actions, such as updating a CRM, a reporting service, and logging the order in an auditing service. Each gets added one at a time over a period of weeks.

Choreography: no changes to existing services. Make the CRM, reporting service and auditing service listen for the order_placed and order_cancelled events.

Orchestration: modify the central orchestrator code to add the call to the CRM and deploy. Followed by the financial reporting service, then the auditing service.

Now consider debugging an issue with an order.

Choreography: check logs across multiple services, correlate by order ID, reconstruct the event flow.

Orchestration: look at the orchestrator's execution history.

As you can see from this limited example, there is a wealth of pros and cons to consider. Now let's use this example, along with additional concepts around coupling and communication styles, to refine our mental framework further.

Coupling is a key consideration when designing service-to-service communication. It comes in different forms, most notably design-time coupling and runtime coupling. Design-time coupling refers to how much one service (a node in our graph) must know about another in order to interact. RPC introduces strong design-time coupling: the caller must know the callee's interface, expected request structure, response shape, and often its semantic behavior (such as throttling and latency profile). Even changes to the internal implementation of the callee can break the caller if not carefully abstracted. Code that triggers work via RPC must be changed every time a work dependency changes, is added, or is removed.
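The choreographed compensation flow from the running e-commerce example can be illustrated with a toy in-process event bus. The service and event names follow the example; the bus itself is a simplified stand-in for a real broker, so there is no durability here, only the decoupled reaction pattern.

```python
from collections import defaultdict

# Toy event bus: each service subscribes to the events it cares about;
# producers don't know who consumes (low design-time coupling).
handlers = defaultdict(list)

def subscribe(event_type, handler):
    handlers[event_type].append(handler)

def publish(event_type, payload):
    for handler in handlers[event_type]:
        handler(payload)

reserved = set()

def inventory_on_order_placed(order):
    reserved.add(order["id"])               # reserve stock
    publish("StockReserved", order)

def payment_on_stock_reserved(order):
    if order.get("card_ok", True):
        publish("PaymentProcessed", order)
    else:
        publish("PaymentFailed", order)     # triggers compensation

def inventory_on_payment_failed(order):
    reserved.discard(order["id"])           # compensation: release the stock

subscribe("OrderPlaced", inventory_on_order_placed)
subscribe("StockReserved", payment_on_stock_reserved)
subscribe("PaymentFailed", inventory_on_payment_failed)
```

Note that no service calls another directly: adding a Fraud Service would mean one new `subscribe` call, with no changes to the existing handlers.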
Events also introduce design-time coupling, primarily around shared schemas. However, they also reduce coupling in other forms. For example, in a choreographed architecture, producers don’t need to know who consumes the event or how it’s processed. This allows new consumers to be added later without changes to the producer. Services evolve more independently, as long as schema evolution practices (e.g. schema versioning, compatibility guarantees) are respected. For example, the service emitting the PaymentProcessed event doesn’t need to know whether shipping, analytics, or notification systems will consume it, or even whether those systems exist yet. This contrasts with orchestration, which may have to know about these services and have to be updated when new dependencies are added (like in our e-commerce example). Runtime coupling is about whether services need to be up and available at the same time. RPC is tightly runtime-coupled: if service A calls service B synchronously, B must be up, fast, and reliable. Chains of RPC calls can create fragile failure modes, where one slow or unavailable service can stall or crash an entire request path.  As Leslie Lamport famously said: “A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable.” — Leslie Lamport He must have been thinking about RPC call chains. But request/response implemented over queues can suffer from similar runtime coupling, if the timing expectation is the same as RPC. We’ll get into timing expectations in the next section. In contrast, publish-subscribe using events is free from runtime coupling between services. A service can emit an event and move on, trusting that any consumers (if they exist) will eventually process it when ready.  Both RPC and events come with coupling trade-offs. Events reduce direct entanglement between services, especially at runtime, while still requiring careful attention to schema contracts. 
RPC is simpler in some use cases, but creates tighter coupling that can make systems more brittle over time. My colleague Gunnar Morling wrote a nice piece, Synchrony Budget, that is relevant here.

Communication between services isn't just synchronous vs asynchronous; it exists on a continuum of timing/response expectations. I will differentiate here between a response and a confirmation.

I use the term confirmation for an immediate reply of "I've got this, trust me to get it done". A service that carries out the work asynchronously will still respond with a 200 to confirm to the caller that it received the request successfully. Kafka will send a producer acknowledgement once the batch has been committed.

I use the term response for a reply (that might come later, and possibly even through a different communication medium) that contains data associated with the requested work now done. Responses sit on a continuum. At one end is the immediate response, where the caller needs an answer now to proceed. At the other end is no response at all, where the sender simply informs the system of something that happened. In between lies relaxed response timing, where the result is needed eventually, but not right away.

RPC is typically on the left. It's used when the caller needs to block and wait for the result, with a short latency requirement. One example is "reserve inventory before payment" in a synchronous checkout flow. This tight timing requirement makes the caller innately dependent on the callee's availability and latency, making an RPC a valid choice.

Events sit on the right. They're used when a service simply emits a fact (e.g. "order placed"), without expecting a reply. Consumers handle the event independently and asynchronously.

Relaxed timing responses live in the middle. Some use asynchronous RPC (like invoking a function and getting a callback later, e.g. via a webhook or polling), and some use queue-based patterns.
Asynchronous RPC is still runtime-coupled, whereas queues act as the fault-tolerant middleware that will eventually deliver the request to the destination. While RPC and events are often presented as opposites, real systems use hybrids. RPC can be made asynchronous. Events can be used for request/response with correlation IDs. But starting with this continuum helps frame the key trade-offs.

We defined Reliable RPC previously as being "delivered" by a fault-tolerant middleware, such as a Durable Execution Engine (DEE). This gives RPC some queue-like properties. For one, it reduces runtime coupling between the sender and receiver. If the receiver is down when the sender sends it, no worries: the RPC will eventually get delivered to the receiver, just like an event/command on a queue. Using the graph terminology from parts 1 and 2, it has turned an ephemeral edge into a reliable one (a reliable trigger). In fact, I tend to think of Reliable RPC as another implementation of point-to-point queues, which are either one-way or request/response. Of course, this is most useful in relaxed timing scenarios.

But what about resumability? If a Reliable RPC is like a synchronous RPC but with reliable delivery (of both request and response), then what about the sender's context, and resuming once the result has been received? The sender is keeping a bunch of variables and context in its stack memory, and if the host fails, then it cannot resume once the Reliable RPC has completed.

Let's take the example of a two-step process:

1. Make a call to reserve stock.
2. Make a call to the payment service.

This is a relaxed timing scenario: the user has been told the order is in progress. Let's say that the first call is made and the stock gets reserved, but the calling context dies before the response is received. How does this work get resumed?
Asynchronous RPC and request/response-over-queues address resumability in the above scenario in the following ways:

1. The response is not directly received by the calling context, but via an event handler receiving a response from a queue or a webhook RPC. So it doesn't matter that the original context is dead. It could be a completely different instance of the application that receives the response.
2. The state necessary for moving on to step 2 is either contained in the response, or a correlation ID is used in the request and the response, and the necessary state was written to a data store using the correlation ID as the key. So the receiver can retrieve the necessary state.
3. The response handler then calls the payment service.

The downside of this approach is that the code is spread across response handlers, making it more complex to write, read and debug. With queues or webhooks, the response arrives out-of-band, and correlation IDs must be managed manually.

Some DEEs eliminate this complexity by letting you write code as if it were synchronous. The framework persists intermediate state and retriggers the function for it to make progress. The DEE acts as the reliable delivery middleware for requests and responses (like a queue), but also abstracts away all the response handler machinery. All the code exists in one function, and if the function fails, the DEE invokes it again. Crucially, the code is written using the DEE SDK, and the SDK silently retrieves the state of the function's progress (including prior responses) so that the code essentially resumes from where it left off (by using persisted state to skip prior work).

Determinism is required for this strategy to work. For example, if the function starts by generating a random UUID as the identifier for the work (such as an order), then a second invocation would generate a different UUID. This is a problem if half the steps have already been carried out using a different UUID.
To cover those needs, the SDK will provide a deterministic UUID (basically, it durably stores the result of the first UUID generation), datetime, integer, etc.

Reliable RPC coupled with resumability simplifies the code (no callbacks, response handlers, correlation IDs, etc.). All the code can exist in one function, yet it can resume from a different application instance after failure. With a reductionist mindset, we could say that Reliable RPC + Resumability is a convenience solution to avoid needing to build complex asynchronous response handling. But this is not a trivial aspect at all. It can make the developer's life easier and make for more readable code.

Bringing this back to the graph model, not all edges are created equal. Direct edges represent dependencies where the workflow cannot proceed without some kind of response. In the e-commerce flow, the order cannot be completed without reserving inventory and processing the payment first. These are blocking dependencies, though the timing requirements could be short (ideal for RPC) or long (best suited to reliable forms of communication such as a queue, event stream or Reliable RPC).

When a workflow has blocking dependencies, the coupling between steps already exists at the business logic level. The question becomes whether to make this coupling explicit through orchestration or implicit through event choreography. The coupling cost of orchestration may be worth paying because these services already have a runtime dependency; they need each other to function. Making this explicit through orchestration may reduce overall system complexity compared to managing the same coordination through distributed event flows.

However, even for a core workflow of direct edges, the decoupled nature of choreography might win out. It may simply not make sense for one piece of a wider workflow to be modeled as an orchestrated workflow, with different technologies, support issues, versioning strategies and deployment procedures.
Also, team autonomy and team boundaries may cross such a workflow, such that the decoupling of choreography is still best. Indirect edges represent actions that are operationally or even strategically important but don't block the immediate workflow from completing successfully. The reliable indirect edge ensures that important work eventually gets carried out. In our e-commerce example, CRM updates, financial reporting, and audit logging might be business-critical or legally required, but the customer's order can be fulfilled even if these systems are temporarily unavailable.

Indirect edges will also often cross more granular organizational boundaries, where the cost of inter-team coordination is higher. The order processing logic is likely focused on a small number of teams in the software development org, whereas financial and auditing services/software systems are likely managed by a different set of teams, potentially under different management hierarchies. For indirect edges, events match this natural business decoupling because:

- Separate concerns are decoupled: if CRM updates, financial reporting, and audit logging are separate business concerns, they shouldn't be embedded in order processing logic. Every time a new system needs to know about orders (fraud detection, analytics, customer success tools), the core orchestrator would need modification, deployment, and versioning.
- It reduces coordination overhead: teams owning peripheral systems can independently subscribe to order events without requiring changes from the order processing team.
- It prevents scope creep: the core workflow stays focused on its essential purpose rather than accumulating tangential responsibilities.

The principle is that orchestration should be limited to direct-edge subgraphs, i.e., the minimal set of services that must coordinate to complete the core business function. Everything else should use choreography to preserve the business-level decoupling that already exists.
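Tying the earlier resumability and determinism discussion together, the two-step checkout could be sketched DEE-style. This is a framework-agnostic illustration, not any specific SDK's API: a hypothetical `step` helper replays durably recorded results, and the order ID is generated through a recorded step so every re-invocation sees the same UUID.

```python
import uuid

class Workflow:
    """Toy replay log: a step's first result is recorded; re-invocations
    replay the recorded value instead of re-executing the step."""

    def __init__(self, log):
        self.log = log   # stand-in for the DEE's durable state

    def step(self, name, fn):
        if name not in self.log:
            self.log[name] = fn()
        return self.log[name]

    def deterministic_uuid(self, name):
        # durably record the first generated UUID; replays return the same value
        return self.step(name, lambda: str(uuid.uuid4()))

def checkout(wf, side_effects):
    order_id = wf.deterministic_uuid("order-id")
    wf.step("reserve-stock", lambda: side_effects.append(("reserve", order_id)))
    wf.step("take-payment", lambda: side_effects.append(("pay", order_id)))
    return order_id
```

If the function crashes between the two calls, a retriggered invocation over the same log reuses the recorded UUID and the reserve-stock result, and only the payment step runs: no callbacks, response handlers, or correlation IDs in the application code.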
In Part 4, we’ll finish the series with some last reflections and a loose decision framework.

Coordinated Progress series links:

- Seeing the system: The Graph
- Making Progress Reliable
- Coupling, Synchrony and Complexity
- A Loose Decision Framework

- Inventory Service (checks/reserves stock)
- Payment Service (processes payment)
- Shipping Service (arranges delivery)
- Notification Service (sends confirmations)

- Decoupled: Payment Service doesn't know or care about Inventory Service.
- Temporal decoupling: If Shipping Service is down, events wait in the queue.
- Reliable progress: Durable events act as reliable triggers of work.
- Emergent workflow: Easy to add new services (like Fraud Detection) that react to existing events.

- Centralized control: Easy to see the entire workflow in one place.
- Clear compensation: If payment fails, the orchestrator explicitly releases stock.
- Reliable progress: DEE ensures the workflow resumes exactly where it left off after crashes.
- Tight coupling: Orchestrator must know about all participating services.

- Choreography: Add the Fraud Service that listens to OrderPlaced, publishes FraudCheckPassed/Failed. Update Payment Service to wait for fraud clearance. No changes to other services.
- Orchestration: Modify the central orchestrator code to add the fraud check step. Deploy the new version carefully to handle in-flight workflows (versioning is one serious challenge for orchestration-based workflows).

- Choreography: No changes to existing services. Make the CRM, reporting service and auditing service listen for the order_placed and order_cancelled events.
- Orchestration: Modify the central orchestrator code to add the call to the CRM and deploy. Followed by the financial reporting service, then the auditing service.

- Choreography: Check logs across multiple services, correlate by order ID, reconstruct the event flow.
- Orchestration: Look at the orchestrator's execution history.

Make a call to reserve stock. Make a call to the payment service.
The response is not directly received by the calling context, but via an event handler receiving a response from a queue or a webhook RPC. So it doesn’t matter that the original context is dead. It could be a completely different instance of the application that receives the response. The state necessary for moving onto step 2 is either contained in the response, or a correlation id is used in the request and the response, and the necessary state was written to a data store, using the correlation id as the key. So the receiver can retrieve the necessary state. The response handler then calls the payment service.
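The correlation-id pattern described above can be sketched in a few lines. This is a stand-in illustration, assuming a durable state store and asynchronous request/response channels; none of these names belong to a real framework.

```python
# Sketch of the correlation-id pattern: the request stores the state
# needed for the next step under a correlation id, so whichever
# application instance receives the response can resume the workflow.
# The store and callables are stand-ins, not a real framework API.

state_store = {}   # stands in for a durable data store

def start_step_1(order_id, amount, send_request):
    correlation_id = f"corr-{order_id}"
    # Persist what step 2 will need BEFORE issuing the request.
    state_store[correlation_id] = {"order_id": order_id, "amount": amount}
    send_request("reserve_stock", correlation_id)

def on_stock_reserved(correlation_id, call_payment_service):
    # Possibly a different instance: recover state by correlation id.
    state = state_store[correlation_id]
    call_payment_service(state["order_id"], state["amount"])

sent, charged = [], []
start_step_1("o-42", 100, lambda cmd, cid: sent.append((cmd, cid)))
# ...the original instance may die here; any instance can handle the
# response, because the needed state lives in the durable store:
on_stock_reserved("corr-o-42", lambda oid, amt: charged.append((oid, amt)))
assert charged == [("o-42", 100)]
```

This is exactly the bookkeeping (callbacks, correlation keys, state writes) that Reliable RPC plus resumability lets you avoid writing by hand.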

Jack Vanlightly 5 months ago

Coordinated Progress – Part 2 – Making Progress Reliable

In part 1, we described distributed computation as a graph and constrained the graph for this analysis to microservices, functions, stream processing jobs and AI agents as nodes, and RPC, queues, and topics as the edges.

For a workflow to be reliable, it must be able to make progress despite failures and other adverse conditions. Progress typically depends on durability at the node and edge levels. Reliable progress relies on a reliable trigger of the work and the work being “progressable”.

Reliable Progress = Reliable Trigger + Progressable Work

Progressable Work is any unit of work that can make incremental, eventually consistent progress even in the face of failures. Typically, a reliable trigger causes a repeated attempt at performing the work, which may include one or more of the following patterns:

- Implements idempotency (so repeated attempts don’t cause duplication or corruption).
- Durably logs partial progress, so it can resume from where it left off (thereby avoiding duplication or corruption).

Work that is atomic (via a transaction) is also helpful for consistency, though idempotency is also required to avoid duplication. Transactions may not be an option, and so reliable eventual consistency must be implemented instead (via progressable work). Progressable work is work that is safe to retrigger, rewind, or resume.

But what re-triggers the work? This is where source state durability is required. The thing that triggers the work, and its associated state, must also be reliable and therefore durable. The classic reliable trigger is the message queue.

So, summarizing, if we look at any given node in the graph, the reliability stems from:

- A Reliable Trigger. This will require a form of durability.
- Progressable Work. Either:
  - No controls, as duplication or inconsistency doesn’t matter.
  - Idempotency of tasks, so a re-triggering avoids duplication/inconsistency.
  - Transactions. Usually only available (or desired) from a single data system, such as a database.
    (This still relies on idempotency to avoid duplication.)
  - Durable logging of work progress, with the ability to resume where it left off (by caching intermediate results).
  - Some mix of the above.

A stream processing job, such as Flink or Kafka Streams, ensures reliable progress by durably logging its progress via state persistence/changelogs (Progressable Work) and relying on a durable data source (Reliable Trigger) in the form of a Kafka topic. A reliable function will have a Reliable Trigger and implement Progressable Work. A reliable trigger for a function/microservice will be an RPC or a message (event/command) from a queue or topic.

Queues and topics are highly available, fault-tolerant, durable middlewares (and therefore great as reliable triggers). A queue durably stores messages (events or commands) until deleted. A message can trigger a function, and the message is only deleted once the function has successfully run. A message on a topic is basically the same, except that messages can be replayed, adding an additional layer of durability. Note that some queues can also do replay.

RPC is not innately reliable; it depends on the caller being available to maintain state in memory and reissue the RPC if a retry is needed. In typical microservices/functions, the function instance is not fault-tolerant, so it cannot do that. For RPC to become innately reliable, it must also be “delivered” via a highly available, fault-tolerant, durable middleware. This is one of the roles of the Durable Execution Engines (DEE), and we’ll refer to RPCs mediated by these engines as Reliable RPC.

Progressable work, as already described, has two main options: idempotency or durable logging of work progress (and transactions can play a role for consistency).

AI agents will become more deeply embedded in distributed systems as the category gains traction, with agents taking actions, calling APIs, and generating decisions. The need for reliable progress applies here too.
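The queue-as-reliable-trigger plus idempotency combination described above can be sketched as follows. The in-memory queue and id set are stand-ins for durable middleware and a durable idempotency store; the specific names are illustrative.

```python
# Sketch of Reliable Trigger + Progressable Work with a queue:
# the message is deleted only after the handler succeeds, so a crash
# re-triggers the work; an idempotency check makes the retry safe.
# Queue and store are in-memory stand-ins for durable middleware.
from collections import deque

queue = deque()                 # stands in for a durable queue
processed_ids = set()           # idempotency keys already handled
results = []

def handle(message):
    if message["id"] in processed_ids:
        return                  # duplicate delivery: safe no-op
    results.append(message["payload"])
    processed_ids.add(message["id"])

def consume_one():
    message = queue[0]          # peek, don't delete yet
    handle(message)             # may raise: message stays queued
    queue.popleft()             # delete only after success

queue.append({"id": "m-1", "payload": "charge order o-1"})
consume_one()
# A redelivery of the same message does not duplicate the work:
queue.append({"id": "m-1", "payload": "charge order o-1"})
consume_one()
assert results == ["charge order o-1"]
```

The delete-after-success ordering is what makes the trigger reliable; the idempotency key is what makes the resulting re-trigger safe.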
Like any other node in a distributed workflow, an AI agent’s actions must be triggered reliably and either complete successfully or fail in a way that can be handled. Many durable execution start-ups are now focusing on AI agents as a core use case. The Flink AI Agent sub-project (see FLIP-531) has also been spurred by the need for agents to progress reliably through a series of actions. Its approach is to treat AI inference and decision-making as part of the dataflow, with exactly-once semantics, checkpointing, and state management taken care of, in part, by Flink’s existing progressable work infrastructure.

So far, we have described distributed computation as a graph and constrained the graph for this analysis to microservices, functions, stream processing jobs, and AI agents as nodes, and RPC, queues, and topics as the edges. Secondly, we have identified reliable progress as the combination of Reliable Triggers and Progressable Work (where durability plays a key role).

Reliable Progress = Reliable Triggers + Progressable Work

Now, we will examine work that spans multiple nodes of the graph, using the term “workflow”. A critical aspect of workflow is coordination. A workflow forms a graph of steps, where each node or edge in the graph can fail in unique ways. A failure may be simple, such as a service being unavailable to invoke, or complex, such as a partial failure where only half of the action was taken, leaving the other half undone (and inconsistent).

If we imagine a continuum, at each extreme we find a different strategy for coordinating such a workflow:

- Use a decentralized, event-driven, choreography-based architecture.
- Use a centralized, RPC-heavy, procedural orchestrator.

Key traits: Decentralized coordination. Reactive autonomy. Coordinated by asynchronous events. Decoupled services.

Event-driven choreography supports workflow in multiple ways:

- Events act as reliable triggers.
  Only once an event has been fully processed will it become unreadable (by an offset advancing in Kafka or an event being deleted from a queue). If a failure occurs during the processing of an event, then the event remains to be consumed again for a retry.
- Asynchronous: Consuming services do not need to be available at the same time as publishing services, decoupling the two temporally, increasing reliability.
- Events can be replayed. Even processed events can be replayed if necessary (given the event streaming service supports that).
- Events trigger rollbacks and compensations (sagas). If an error occurs, a service can emit a failure event that other services can subscribe to, in order to perform rollbacks and compensation actions. Kafka transactions support advancing the offset on the input topic and publishing an error event as an atomic action.
- Long-running workflows. The workflow can pause and resume implicitly, based on the timing of published events. However, said timing is not typically controlled by the event streaming service.

Reliable event consumers must implement Progressable Work in some way, with idempotency being a common one. While I list decoupling as the main pro, its impact cannot be overstated.

- Highly decoupled services (a very big deal)
  - Independent service evolution and deployment (design-time decoupling).
  - Limited blast radius during failures (runtime decoupling).
  - Downstream services that execute subsequent workflow steps do not need to be available when the current step executes (temporal decoupling).
- Highly adaptable architecture
  - Scales naturally with system growth. New consumers can react to events without requiring changes to existing publishers.
  - Flexible composition. Workflows can evolve organically as new consumers are added, enabling emergent behavior.

- Reasoning about complex flows
  - Hard to reason about for complicated workflows or sprawling event flows.
  It can be challenging to find where a choreographed workflow starts and where it ends, particularly as it crosses team boundaries. Ownership of the workflow is distributed to the participating teams.
- Monitoring and debugging
  - It can be difficult to debug workflows that cross many different boundaries.
  - Monitoring workflows is challenging due to their decentralized and evolvable nature.
- Consistency challenges
  - Compensations can be harder to reason about as logic is spread across consumers, making it sometimes challenging to verify full undo/compensation without strong observability.
  - Non-deterministic execution: more likely to see race conditions (such as receiving OrderModified before OrderCreated).
  - No in-built Progressable Work tooling, only reliable triggers.
- Developer training
  - Developers must learn event modelling.
  - Developers may need to learn how to use Kafka or a queue reliably (or use a high-level framework).

Key traits: Centralized coordination. Procedural control. RPC triggers subordinate microservices/functions. Well-defined scope.

“Orchestration engines” refers to DEEs such as Temporal, Restate, DBOS, Resonate, LittleHorse, etc, which all have slightly different models (which are out of scope for this document). An orchestrated workflow is a centrally defined control flow that coordinates a sequence of steps across services. It behaves like a procedural program.

- Procedural code. The orchestrated workflow is written like regular code (if-then-else).
- Progressable Work via durable logging of work progress. State is persisted so it can survive restarts, crashes, or timeouts, and can resume based on timers. This is key for completing sagas.
- Reliable Triggers via reliable RPC (if the framework supports that) or events (can integrate with event streams).
- Centralized Control Flow. Unlike choreography, where each service reacts to events, orchestration has one logical owner for the process: the orchestrator.
- Explicit Logic for Branching, Looping, and Waiting.
  This may use regular code constructs, such as in Function-based Workflow, or may use a framework SDK for these Ifs and Loops in Graph-based Workflow.
- Long-running workflows. An orchestrated workflow can pause, and then resume based on first-class triggers or timers.

An orchestrated workflow is like a function, and therefore it needs its own Reliable Trigger, and the orchestration code must implement Progressable Work. A reliable trigger of the workflow itself could be:

- An event or command delivered via a queue or topic.
- A Reliable RPC mediated via a Durable Execution Engine.

Progressable work could be implemented entirely via idempotent steps (though this may not be practical as a general rule, as idempotency can be hard to implement). Therefore, the durable logging of work progression (by a Durable Execution Engine) can add value.

Centralized control flow makes it:

- Simpler to reason about.
- Easier to monitor and debug.

Reliable Triggers and Progressable Work are built into DEE frameworks. Reliable RPCs can function without temporal coupling (via DEE). Compensation actions are clearly linked to failures and made reliable.

- Challenging to version. It can be hard to update workflows while supporting existing workflow executions. Long-running workflows could conceivably have multiple versions running concurrently.
- The orchestration (or DEE) service is another infrastructure dependency.
- Orchestration code belongs to one team, but that team must coordinate with teams of the subordinate microservices.
- Orchestration can lead to tighter coupling without discipline. This can conflict with microservices autonomy and bounded context independence. Greater design-time coupling leads to more versioning as flows change.
- Developer training: Developers must learn the programming model of the specific DEE (all are a bit different).
  Developers must learn about deterministic workflow execution, step vs workflow boundaries, and avoiding anti-patterns such as God workflows which control everything.

In Part 1, I described how not all edges in a workflow graph are equal. Some are direct dependencies: essential steps that must succeed for the business goal to be achieved. For example, the edge from the Order Service to the Payment Service during checkout is part of the core execution path. If it fails, the workflow fails.

Other edges are indirect. These represent auxiliary actions triggered by the workflow, such as updating a CRM, reporting service or auditing. While important, they are not critical to completing the core task itself. Often these just need to be reliable, but are triggered in a decoupled and one-way fashion.

In orchestration, these distinctions matter. A well-designed orchestrator should focus only on the minimal set of steps required to drive the business outcome (the direct edges). Incorporating indirect actions directly into the orchestrator increases coupling, inflates the workflow definition, and introduces more reasons to redeploy or version the orchestrator when non-essential concerns change.

Choreography, by contrast, treats direct and indirect edges the same. Events flow outward, and any number of services can react. There is no centralized control, and thus no enforced boundary around what "belongs" to a given workflow. This can be both a strength (such as encouraging extensibility) and a weakness, the main weakness being that it is harder to reason about what constitutes the workflow's critical path.

Choreography and orchestration are both essential patterns for coordinating distributed workflows, but they offer different properties. What they share is the need for durability in order to provide reliability.

Orchestration looks promising for mission-critical core workflows because of its superior understandability, observability, and debuggability.
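One reason centralized control flow is easy to reason about is that the saga's compensation logic lives in one place. The sketch below illustrates that idea with invented stand-in services; it is not how any specific DEE structures its code.

```python
# Sketch of a centrally orchestrated workflow with explicit
# compensation: the orchestrator knows every step and, on failure,
# runs the compensations for the steps already completed.
# All services and names here are illustrative stand-ins.

class PaymentDeclined(Exception):
    pass

def run_order_workflow(order_id, inventory, payments, shipping):
    completed = []  # compensations for steps done so far
    try:
        inventory.reserve(order_id)
        completed.append(lambda: inventory.release(order_id))
        payments.charge(order_id)
        completed.append(lambda: payments.refund(order_id))
        shipping.arrange(order_id)
        return "completed"
    except PaymentDeclined:
        # Explicit, centrally visible compensation (saga rollback).
        for compensate in reversed(completed):
            compensate()
        return "compensated"

class Recorder:
    # Records every call; optionally fails one named operation.
    def __init__(self, fail_on=None):
        self.calls, self.fail_on = [], fail_on
    def __getattr__(self, name):
        def method(order_id):
            if name == self.fail_on:
                raise PaymentDeclined()
            self.calls.append((name, order_id))
        return method

inv, pay, ship = Recorder(), Recorder(fail_on="charge"), Recorder()
assert run_order_workflow("o-9", inv, pay, ship) == "compensated"
# The reserved stock was explicitly released by the orchestrator:
assert ("release", "o-9") in inv.calls
```

Contrast this with choreography, where the same rollback would be spread across failure-event subscribers in several services.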
With a centralized control flow, durable state, and explicit compensation handling, orchestration frameworks make core workflows easier to understand, monitor, and debug. The orchestration engine provides Reliable Trigger and Progressable Work support. But such orchestration should be limited to islands of core workflows, connected by indirect edges in the form of events.

Choreography, on the other hand, is indispensable for decoupling systems, allowing services to react to events without tight coupling or centralized control. Design-time decoupling enables teams to build, deploy, and evolve services independently, reducing coordination overhead and supporting faster iteration. Runtime decoupling minimizes blast radius by isolating failures: one service can fail or degrade without directly affecting others. Temporal decoupling allows producers and consumers to operate on different schedules, enabling long-running workflows, asynchronous retries, and increased resilience to transient outages. Together, these forms of decoupling promote architectural flexibility and team autonomy. Events act as Reliable Triggers, and the event consumers must decide how to implement Progressable Work themselves.

In practice, many operational estates could benefit from a hybrid coordination model. There are two types of hybrid:

- Mixing choreography and orchestration across the graph. As I already described, orchestration should not control the entire execution flow of a system. Unlike choreography, which can span an entire system due to its decoupled nature, orchestration should focus on well-defined processes. These orchestrated workflows can still integrate with the broader choreographed system by emitting events, responding to events, and acting as reliable islands of control within a larger event-driven architecture.
- Using orchestration via events (or event-mediated orchestration).
  In this hybrid model, the orchestration code does not use RPC to invoke subordinate microservices, but sends commands via queues or topics. Subordinate microservices use the events as Reliable Triggers, implement their own Progressable Work, and send responses to another queue/topic. These responses trigger the next procedural set of work. In this model, Reliable Triggers are handled by queues/topics, and Progressable Work is either done via idempotency or durable logging of work progress. This can avoid the need for a full DEE, but might require custom durable logging.

Now that we have the mental model in place, Part 3 will refine the model further, with concepts such as coupling and synchrony.

Coordinated Progress series links:

- Seeing the system: The Graph
- Making Progress Reliable
- Coupling, Synchrony and Complexity
- A Loose Decision Framework
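The event-mediated orchestration hybrid can be sketched as follows. The queues, state dictionary, and function names are in-memory stand-ins invented for illustration; a real system would use durable queues/topics and a durable progress store.

```python
# Sketch of event-mediated orchestration: the orchestrator sends
# commands over queues instead of making RPCs, and advances the
# workflow as response messages arrive. Everything here is an
# illustrative stand-in, not a specific framework's API.
from collections import deque

commands = deque()              # command queue to subordinate services
responses = deque()             # response queue back to the orchestrator
workflow_state = {}             # durable progress log, keyed by order id

def orchestrator_start(order_id):
    workflow_state[order_id] = "reserving_stock"
    commands.append({"cmd": "reserve_stock", "order_id": order_id})

def orchestrator_on_response(response):
    # Any orchestrator instance can pick this up: progress is durable.
    order_id = response["order_id"]
    if workflow_state[order_id] == "reserving_stock":
        workflow_state[order_id] = "taking_payment"
        commands.append({"cmd": "take_payment", "order_id": order_id})
    elif workflow_state[order_id] == "taking_payment":
        workflow_state[order_id] = "completed"

def worker_process_next():
    command = commands.popleft()
    # A subordinate service does its work, then replies on the
    # response queue (its own Progressable Work is elided here).
    responses.append({"done": command["cmd"], "order_id": command["order_id"]})

orchestrator_start("o-7")
worker_process_next()
orchestrator_on_response(responses.popleft())
worker_process_next()
orchestrator_on_response(responses.popleft())
assert workflow_state["o-7"] == "completed"
```

Because both the commands and the progress log are durable in a real deployment, a crashed orchestrator instance loses nothing: the next response simply resumes the state machine.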

Jack Vanlightly 5 months ago

Coordinated Progress – Part 1 – Seeing the System: The Graph

At some point, we’ve all sat in an architecture meeting where someone asks, “Should this be an event? An RPC? A queue?”, or “How do we tie this process together across our microservices? Should it be event-driven? Maybe a workflow orchestration?” Cue a flurry of opinions, whiteboard arrows, and vague references to sagas.

Now that I work for a streaming data infra vendor, I get asked: “How do event-driven architecture, stream processing, orchestration, and the new durable execution category relate to one another?”

These are deceptively broad questions, touching everything from architectural principles to practical trade-offs. To be honest, I had an instinctual understanding of how they fit together, but I’d never written it down. Coordinated Progress is a 4-part series describing how I see it, my mental framework, and hopefully it will be useful and understandable to you.

I anchor the mental framework in the context of workflow, using the term in a broad sense to mean any distributed work that spans multiple services (such as a checkout flow, a booking process, or a loan application pipeline). Many people use the term “saga” to describe long-running workflows that span multiple services and require coordination and compensation. This analysis uses the more general term workflow to capture a broader class of distributed work.

Modern systems are no longer built as monoliths; they are sprawling graphs of computation, stitched together by APIs, queues, and streams; implemented across microservices, functions, stream processors, and AI agents. Complex workflows cross service boundaries, requiring coordination that must be both reliable and understandable, as well as flexible and adaptable. Within these graphs, the concepts of coordination and reliable progress are critically important.

Coordination strategies shape how workflows are built and maintained:

- Choreography (reactive, event-driven) provides high decoupling and flexibility.
- Orchestration (centralized, procedural) offers greater clarity and observability.

But not all edges in the graph are equal. Some edges are direct, defining the critical path of a workflow where failure means failure. Others are indirect, triggering auxiliary actions in adjacent or even far away services. A good mental model distinguishes between the two: orchestration should focus on direct edges, while choreography handles both naturally.

Reliable progress hinges on two core concepts:

- Durable Triggers: Work must be initiated in a way that survives failure (e.g., Kafka, queues, reliable RPC).
- Progressable Work: Once started, work must be able to make progress under adverse conditions via replayability, using patterns such as idempotency, atomicity, or the ability to resume from saved state.

While stream processors (e.g. Flink, Kafka Streams) and event-driven systems based on queues and topics (e.g. Kafka, RabbitMQ) have durability built in, imperative code typically does not. Durable Execution Engines (DEEs), such as Temporal, Restate, DBOS, Resonate, and LittleHorse (among many others), aim to fill that gap in the world of imperative functions. They provide varying tooling and language support for adding durable triggers and progressable work to imperative, procedural code.

This analysis constructs a conceptual framework for understanding both coordination and reliable progress in modern distributed architectures composed of microservices, functions, stream processing, and AI agents, including new building blocks made available by DEE frameworks.

Neo: The Graph.
Morpheus: Do you want to know what it is?
Morpheus: The Graph is everywhere. It is all around us. Even now, in this very server room. You can see it when you look at your IDE or when you design your Flink topology. You can feel it when you work on microservices... when you handle incidents... when you deploy.

At every level of abstraction, computation reveals itself as a graph.
A function in a microservice contains a control flow graph (comprising branches, loops, and conditionals) that describes its execution logic. A Flink job is explicitly a directed graph of operators and stateful nodes connected by streams. A workflow that ties together multiple services, whether via orchestration or choreography, is also a graph, one that represents dependencies, event flows, or command sequences.

Coordination plays a critical role in this graph of graphs and is also present at every layer. Within a single Flink job, it is Flink itself that coordinates work across multiple task managers. Within a microservice, the executable code acts as a linear or concurrent coordination of multiple steps that may invoke other services or data systems. However, it is the coordination required for workflows across multiple systems that presents the largest challenge. Distributed coordination across heterogeneous systems, programming models and environments is a strategic concern for organizations, with far-reaching consequences.

This graph, made up of workflows spanning multiple systems, can make discussing topics of programming models and coordination methods confusing. For example, one consumer in an event-driven architecture may execute its code procedurally in an imperative style, but play the role of a node in a reactive architecture. A workflow can be triggered by an event, which acts as a reliable trigger for retries, or it might be triggered by an ephemeral RPC.

The types of nodes and edges of the graph matter. While everything is a graph (even code), for this analysis, let’s confine things so that:

- Nodes are microservice functions, FaaS functions, stream processing jobs and AI agents.
- Edges are RPC, queues, event streams. These vary widely in semantics; some are ephemeral, others durable, which affects reliability.
- Workflows are sub-graphs (or connected components, as in graph theory) of the Graph.

The Graph. Nodes connected by edges.
Workflows as sub-graphs. What constitutes a workflow is debatable, especially with the broad meaning I’m using in this analysis. But I like to think of workflows in terms of their edges, which can be direct or indirect. Edges also have other properties, such as request/response vs one-way, and synchronous or asynchronous, but for now we’ll keep the model simple and think about whether edges are direct or indirect. Direct edges trigger work that is central to the goal being performed. Take the example of an Order Placed workflow: there might be a set of microservices to handle the payment, reservation of stock, and initiation of shipment preparation, which are all directly tied to the order. These are all connected by direct edges and form the core order placed workflow. Indirect edges trigger tangential, auxiliary work, such as notifying the CRM for customer management, a finance/reporting system or auditing service for compliance in the order workflow. An indirect edge could even trigger a secondary core workflow, such as a just-in-time inventory process. Whether an edge is direct or indirect will influence what kind of communication medium is chosen (more on that in parts 2 and 3). Workflows require coordination, whether that coordination is just a dance between independent services or something more centralized and controlled. There are two main coordination strategies: choreography and orchestration. Choreography: Event-driven workflow (reactive). Highly decoupled microservices that independently react to their input events as they arrive. There is no blocking or waiting, and all consumers operate independently of any upstream producers or subsequent downstream consumers. Coordination via publish-subscribe semantics. The entire impact of an upstream event can spread far, and is dynamic over time. The boundaries of any given workflow within that wider event flow can be soft and hard to define (but with low coupling).
Orchestration: Procedural workflow (if-this-then-that). Logic is centralized in some kind of orchestrator (even a microservice or function), issuing commands and awaiting responses from subordinate worker microservices. The orchestrator keeps track of which parts of the workflow have been completed, which are in progress, and which have yet to be started. It keeps track of the commands sent to the subordinate services as well as the responses from those services. Coordination via procedural orchestration semantics. The entire impact of an upstream workflow can spread far, as individual nodes can still emit events. The boundaries of a given workflow are clearly encoded in the orchestration code, albeit at the cost of increased coupling. We’ll cover choreography and orchestration in more detail in part 2. Stream processing frameworks like Apache Flink and Kafka Streams can be thought of as microservices with a configurable blend of continuous dataflow programming model and reactive event handling, designed to transform and react to streams of events in real time. Like microservices, stream processors form logical graphs of computation, using branching, joining, and aggregation to process data. However, their programming model is more constrained, being optimized for data-centric transformations of event streams rather than complex control flow or handling individual requests on demand. In the context of workflows and sagas, stream processors fit naturally into event-driven choreography as nodes in the event graph, not only performing transformations or enrichments, but also taking on roles that overlap with those traditionally handled by microservices, including stateful business logic and triggering downstream effects. Just as modern microservices are decomposed into bounded contexts following domain-driven design principles, so too should stream processors be scoped narrowly.
Embedding an entire business workflow (e.g., shopping cart checkout, payment, shipping, fulfillment) into a single Flink or Kafka Streams job is generally discouraged. Instead, stream processors work best as individual nodes in a choreographed system, each independently reacting to events. Beyond participating as choreographed actors, stream processors play two valuable roles in saga and workflow architectures: Real-time triggers for workflows: Detecting event patterns (e.g., "user added to cart but didn’t check out in 1 hour") and emitting signals to start or branch workflows. Aggregated state for decisions: Continuously computing derived state (e.g., fraud scores, user behavior patterns) that orchestrators or services can query to guide workflow logic. In summary, stream processing can replace traditional microservices in choreographed workflows and enhance orchestrated workflows with real-time insights, triggers, and data transformations. However, one stream processing job would rarely include an entire workflow, just as a microservice would not. In distributed systems, durability is not just about data but about progress. A workflow that performs critical operations must either complete or fail in a controlled, recoverable way. Durable coordination ensures that steps don’t vanish into the void after a crash or network fault. No matter the execution model (procedural, event-driven, or dataflow), durability is the mechanism that transforms ephemeral logic into reliable systems. Choreography in the form of event-driven architectures (EDA) offers durability by default. Events are stored durably in a queue or log (e.g., Kafka), enabling reactive systems to recover from crashes, replay history, and trigger retries. Each service reacts independently, and progress is tracked implicitly in the event stream. In this model, the event log acts as both coordination medium and source of truth, encoding the causal structure of the system’s behavior.
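A miniature sketch of that model, with purely illustrative names (not a real Kafka API): events persist in an append-only log, and a consumer's progress is just a committed offset it can resume from after a crash.

```python
# Illustrative sketch: the event log is the source of truth, and consumer
# progress is tracked as an offset into that log. A restarted consumer
# resumes from its last committed offset and replays only what it missed.

class EventLog:
    """Durable, append-only record of events."""
    def __init__(self):
        self.events = []

    def append(self, event):
        self.events.append(event)

    def read_from(self, offset):
        return self.events[offset:]

class Consumer:
    """Reacts to events; its only durable state is the committed offset."""
    def __init__(self, log, committed_offset=0):
        self.log = log
        self.committed_offset = committed_offset
        self.processed = []

    def poll(self):
        for event in self.log.read_from(self.committed_offset):
            self.processed.append(event)  # the "reaction" (side effect)
            self.committed_offset += 1    # progress, implicit in log position
```

If the consumer crashes after committing offset 2, a replacement constructed with `committed_offset=2` picks up only the remaining events; nothing is lost and nothing before the committed offset is re-processed.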
Imperative code, by contrast, lacks built-in durability. A service running procedural logic (e.g., "do A, then B, then C") typically stores its state in memory and relies on external systems to persist selected state. When a crash occurs mid-execution, everything in the call stack is lost unless explicitly saved. This gap gave rise to the Durable Execution product category, which brings event-log-like durability to imperative workflows. Durable Execution Engines (such as Temporal, Restate, DBOS) persist the workflow’s progress, key variables, intermediate results, responses from other services, and so on, allowing it to be retried, resuming exactly where it left off. Durable Execution Engines are, in effect, the Kafka of imperative coordination (aka coordination via orchestration). Durability is also foundational in stream processing. Frameworks like Apache Flink and Kafka Streams include native durability through state persistence mechanisms (such as checkpointing, changelogs, and recovery logs), ensuring that event transformations and stateful aggregations survive failures. While the paradigm is data-centric and continuous, the core concept is the same: progress is recorded durably so that computation can continue reliably. Ultimately, everything is also a log. Whether it's a sequence of domain events, a durable workflow history, or a changelog backing a stream processor, the underlying idea is the same: encode system activity as a durable, append-only record of what has happened (and possibly what should happen next). Making durability a first-class concern allows systems to be: Recoverable after crashes. Observable through replayable history. Reliable across asynchronous boundaries. Composable across execution models. This perspective is a start, but we need to break this down into more precise terms by creating a simple model for thinking about reliable execution and coordinated progress. Let’s do this in part 2.
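The replay mechanism described above can be sketched in a few lines (illustrative only; `step`, `history`, and the workflow names are hypothetical, not any engine's actual API): completed steps return their recorded results instead of re-executing, so a retried workflow resumes where it left off.

```python
# Durable-execution-style recovery sketch: workflow code re-runs from the
# top after a crash, but steps whose results are already in the durable
# history are replayed from the record rather than executed again.

history = {}       # durable record: step name -> saved result
side_effects = []  # tracks which side effects actually executed

def step(name, fn):
    if name in history:
        return history[name]  # replay: reuse the recorded result
    result = fn()
    history[name] = result    # persist the result before moving on
    return result

class Crash(Exception):
    """Simulates the process dying mid-workflow."""

def order_workflow(fail_before=None):
    a = step("db_call", lambda: side_effects.append("db") or "row-42")
    if fail_before == "api_call":
        raise Crash("process died before step 2")
    b = step("api_call", lambda: side_effects.append("api") or "api-ok")
    c = step("send_email", lambda: side_effects.append("email") or "sent")
    return a, b, c
```

A first attempt that crashes before the API call leaves only the db result in history; the retry replays that result and runs the remaining two steps, so no side effect executes twice.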
Coordinated Progress series links:
Seeing the system: The Graph
Making Progress Reliable
Coupling, Synchrony and Complexity
A Loose Decision Framework

Jack Vanlightly 8 months ago

Log Replication Disaggregation Survey - Apache Pulsar and BookKeeper

In this latest post of the disaggregated log replication survey, we’re going to look at the Apache BookKeeper Replication Protocol and how it is used by Apache Pulsar to form topic partitions. Raft blends the roles and responsibilities into one monolithic protocol, MultiPaxos separates the monolithic protocol into separate roles, and Apache Kafka separates the protocol and roles into control-plane/data-plane. How do Pulsar and BookKeeper divide and conquer the duties of log replication? Let’s find out. There are so many systems out there, too many for me to list without this becoming a huge research project. So I’m going to stick to systems I have directly been involved with, or have a decent level of knowledge about already. Feel free to comment on social media about interesting systems that I haven’t included. I have classified a few ways of breaking apart a monolithic replication protocol such as Raft. The classifications are: (A) Disaggregated roles/participants. The participants form clearly delineated roles that can be run as separate processes. The protocol itself may still be converged (control plane and data plane intertwined) but exercised by disaggregated components. (B) Disaggregated protocol that separates the control plane from the data plane. The protocol itself is broken up into control plane and data plane duties, with clean and limited interaction between them. (C) Segmented logs. Logs are formed from a chain of logical log segments. (D) Pointer-based logs. Separates data from ordering. (E) Separating ordering from IO. Avoid the need for a leader by allowing all nodes to write, but coordinate via a sequencer component. (F) Leaderless proxies. Abstracts the specifics of the replicated log protocol from clients, via another disaggregated abstraction layer of leaderless proxies. Disaggregation categories covered in this post: (A) Disaggregated roles/participants. (B) Disaggregated protocol that separates the control plane from the data plane. (C) Segmented logs.
BookKeeper is a low-level distributed log service and perhaps its best-known usage is as the log storage sub-system of Apache Pulsar (a higher-level distributed log service akin to Apache Kafka). Pulsar and BookKeeper align quite well with the Virtual Consensus abstractions. Virtual Consensus is a set of abstractions for building a segmented log. Let’s review those abstractions, then we’ll look at BookKeeper, and how Pulsar uses BookKeeper with these abstractions in mind. In Virtual Consensus we have two abstractions, the VirtualLog, and the Loglet. The Loglet is the log segment abstraction that can be chained to form segmented logs. Fig 1. The shared log, formed by a chain of loglets. The Loglet must implement fast failure-free ordering alone. That means it doesn't need to handle all failures; if it encounters a problem it can throw up its hands and notify the VirtualLog. This means the Loglet doesn’t need to tolerate all types of faults; it only needs to be a simple, fast data plane. No leader elections or reconfigurations are required in the Loglet abstraction. I wrote about failure-free ordering vs fault-tolerant consensus in a post that goes into this in more detail. If a Loglet does become degraded, the VirtualLog seals the Loglet (to prevent further writes), and appends a new Loglet (known as log reconfiguration). The idea is that each Loglet represents a period of steady state where everything was going well. Fig 2. Reconfiguration moves the system from one steady state configuration to another in response to failures, policy triggers or other factors such as load balancing. Through that lens, in Virtual Consensus, the Loglet forms the data plane, and the VirtualLog acts as the control plane. Fig 3. Depicts a quorum replication Loglet implementation. Loglet 3 experiences a failed server, notifies the VirtualLog which seals Loglet 3 and then appends Loglet 4 with a new set of storage servers (without the failed one).
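The seal-and-append cycle above can be sketched in miniature (an illustrative toy, not the actual Virtual Consensus interfaces): the Loglet only appends in the failure-free case, while the VirtualLog chains loglets and handles reconfiguration.

```python
class Loglet:
    """A simple, failure-free data plane: append-only until sealed."""
    def __init__(self):
        self.entries = []
        self.sealed = False

    def append(self, data):
        if self.sealed:
            raise RuntimeError("loglet is sealed")
        self.entries.append(data)
        return len(self.entries) - 1  # position within this loglet

class VirtualLog:
    """Control plane: chains loglets into one shared log address space."""
    def __init__(self):
        self.chain = [Loglet()]

    def append(self, data):
        entry_id = self.chain[-1].append(data)
        return (len(self.chain) - 1, entry_id)  # composite position

    def read(self, position):
        loglet_idx, entry_id = position
        return self.chain[loglet_idx].entries[entry_id]

    def reconfigure(self):
        # On failure (or a policy trigger): seal the active loglet so no
        # more writes can land in it, then chain a fresh one.
        self.chain[-1].sealed = True
        self.chain.append(Loglet())
```

Positions are composite (loglet index, entry id), giving the shared log a single address space across the chain; a failed append on the active Loglet would, in the real abstraction, be reported upward so the VirtualLog can run `reconfigure`.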
Virtual Consensus separates the planes using abstractions. However, Virtual Consensus also has roles (that run as separate processes): Client (drives data plane and control plane) Separation of logic within the client through modularity. Log reconfiguration is generic, but relies on a Loglet module for sealing and creating new Loglets. It also relies on a metadata store for consensus. Reads and writes are routed to specific Loglets via a Loglet module. Loglet storage (data plane storage) Metastore (control plane storage) Fig 4. The three disaggregated roles of Virtual Consensus (client, Loglet storage, metastore). Pulsar/BookKeeper roles and abstractions line up with Virtual Consensus as follows: Abstractions: Loglet: BookKeeper is a log segment storage service. Log segments are known as ledgers. The Loglet abstraction aligns well with the BookKeeper ledger. VirtualLog: Pulsar has a module called ManagedLedger that equates to the VirtualLog. Its job is to create, chain, and seal ledgers to form topic partitions. Reads and writes go through the ManagedLedger, which directs the reads and writes to the appropriate ledger. Client: The Pulsar broker code has a topic partition class that equates to the Virtual Consensus client as a whole. Loglet module: The BookKeeper client equates to the Loglet module. Loglet storage: BookKeeper servers, known as bookies, store the ledger data. Fig 5. Pulsar brokers, using BookKeeper as its log storage sub-system. All components depend on a common metadata store. The responsibilities of the ManagedLedger module line up well with the VirtualLog: Providing a single address space for the topic partition. This address space includes the Ledger Id and the Entry Id as a composite address id (or position). Acts as a router for append and read commands, forwarding the command to the appropriate ledger. Writes always go to the active ledger. Chaining ledgers to form an ordered log.
Reconfiguring the log under failure scenarios for fault-tolerance and high availability. The ManagedLedger module only requires a versioned register for consistency, in case multiple brokers try to chain new ledgers onto the same topic partition. The ManagedLedger module depends on an external leader election capability that ensures that each partition is only served by a single broker at a time. If two brokers both try to serve the same topic partition at the same time, they will duel each other by closing each other's active ledgers and trying to append new ones. The versioned register is the means to maintain consistency under such a broker battle. The data plane logic is more nuanced, and we’ll look at that next. A BookKeeper ledger is a completely independent log segment as far as BookKeeper knows. BookKeeper is only concerned with: Creating ledgers. Reading/writing entries to ledgers. Sealing ledgers (also known as closing). It knows nothing of chaining. It is firmly a Loglet implementation. The BookKeeper replication protocol is formed by the following roles, which are typically disaggregated: BookKeeper client (Loglet module): Drives the ledger replication protocol. Reading/writing from remote storage servers (bookies). Fencing ledgers. Updating ledger metadata. Bookies (Loglet storage): Simple storage servers, with a fencing capability. Leaderless, with no replication between bookies during normal write operations. Replication is performed by the client doing quorum writes. Metadata store: Versioned registers for storing ledger metadata. Live bookie tracking (via ZK sessions). Fig 6. The BookKeeper roles of client, bookie and metadata store. A ledger is a log segment as already covered. But each ledger is also, itself, a sequence of log segments! BookKeeper uses the term fragment for the ledger segment. A ledger is formed by one or more fragments, where each fragment is a bounded log.
Therefore each ledger is a log of logs, and therefore a Pulsar topic partition is a log of logs of logs! Fig 7. Depicts the metadata that represents a Pulsar topic partition, and the final ledger, which has three fragments. The range of the final ledger and final fragment is not set in metadata yet, as it is still open and accepting writes. Generically, to form a chain of log segments, only the last segment should be writable, and all prior ones must be sealed to guarantee that someone can’t write into the middle of the parent log. A ledger is sealable to facilitate this segment chaining. Likewise, within a ledger, writes only go to the final fragment. So the Pulsar topic partition is segmented, and so is the BookKeeper ledger. Both qualify for category C. Let’s get into the details of the life of a ledger, from creation, reading/writing to sealing (known as closing). When a ledger is created, there are a number of things that need to be decided: The set of bookies that will store the ledger. The number of bookies that must acknowledge a write for it to be considered committed. Three ledger parameters determine these two: Ensemble size (E): The number of bookies that the ledger will be distributed across. Write Quorum (WQ): The number of bookies each entry will be written to (like replication factor in Kafka). Ack Quorum (AQ): The number of acknowledgments (from bookies) for an entry to be committed. Typically, people use E=3, WQ=3, AQ=2. Ensemble size is interesting. If we used E=6, WQ=3, then the client would stripe its quorum writes (of 3 bookies per entry) across 6 bookies. But this is rarely used in practice as it results in more random IO with the current bookie design. When a BK client creates a ledger, based on the above parameters, it chooses the ledger ensemble (the set of bookies that will store the entries) for the first fragment.
It creates the metadata for that ledger, writing it to a versioned register (such as a ZooKeeper znode). The client can then begin to perform quorum writes to the ensemble of bookies. Fig 8. Ledger metadata. It’s helpful here to look at the Paxos roles of Proposer and Acceptor. BookKeeper clients act as Proposers in the Paxos protocol, initiating write operations by proposing entries to be stored in ledgers. The client that creates a ledger becomes the ledger’s Distinguished Proposer (or leader), ensuring a strict sequential ordering of entries. Bookies function as Acceptors, receiving, validating, and persisting the proposed entries from clients. The client’s role as Distinguished Proposer/Leader is not validated by bookies (acceptors). It is simply ensured by clients only ever performing regular writes to the ledgers they have opened themselves. Fig 9. The client that opens a ledger is its distinguished proposer, also known as a leader. The bookie ensemble are the acceptors. Therefore we can consider the BookKeeper replication protocol a leader-based protocol, where the client is the leader. This is why Pulsar ensures that only one broker at a time can act as the owner of a topic partition. While we can nicely line up Paxos roles to BookKeeper, the rest diverges in a few ways. In theory, the Loglet implementation does not need to handle all types of fault; it can do failure-free ordering until something bad happens. Then the VirtualLog can kick in to do a reconfiguration. Interestingly, a BookKeeper ledger has these Loglet properties externally, but also, internally, has some VirtualLog properties: If a BookKeeper client encounters a problem that it cannot solve (such as there not being enough functioning bookies to make progress), it closes the ledger and kicks the can to the VirtualLog layer above it. Very Loglet-like.
If a write to a bookie fails, but there are other functioning bookies available, the BK client can reconfigure the ledger by appending a new fragment, with a different ensemble of bookies. Very VirtualLog-like. Fig 10. A bookie fails, causing an ensemble change. Externally, this ledger reconfiguration (known as an ensemble change) is completely abstracted. To close a ledger, the leader client needs to update the ledger metadata, setting the ledger status to closed and setting the last entry id of the ledger. A closed ledger is bounded; it has a start entry id (0) and an end entry id. If it is the leader client closing the ledger, then it knows the last committed entry (as it is the leader) and the metadata update is simple. Fig 11. Closing a ledger, by the leader client, is a metadata op, setting the status and last entry id of the ledger. BookKeeper itself has no leader election at all. It can get confusing here: Pulsar chooses a leader broker per partition because BookKeeper uses a single-writer protocol. So Pulsar has a form of leader election, but BookKeeper is a layer down from that, leaving the leader election to the application layer above it. The rule is that only the BK client that creates a ledger will write to it. One ledger corresponds to one steady-state with a stable BK client. If a client dies, leaving an open ledger, then another client must seal/close the ledger on behalf of the dead client. This type of ledger closing is known as ledger recovery. This other client is the recovery client, and to close the ledger it must discover the ledger tail in order to update the ledger metadata. We can’t ask the leader client for this information, as it may be dead. The only way to discover the ledger tail is to interrogate the bookies. The recovery client performs ledger recovery in three phases: Fencing. A quorum of the bookie ensemble of the last fragment is fenced.
Fencing is scoped to the ledger, so the recovery client sends a Fence Request with the ledger id to each bookie of the ensemble. Once a ledger is fenced on a bookie, that bookie will not accept any further regular writes for that ledger. Recovery reads and writes. The recovery client learns of the ledger tail entry by performing quorum reads. The client also performs recovery writes (a form of read repair) to increase the redundancy of the ledger tail. Recovery writes are allowed on fenced ledgers. Metadata update. The recovery client updates the ledger metadata as above (setting last entry id and status). The update is a conditional put, which provides optimistic concurrency. Fig 12. Ledger recovery. The fencing prevents a split-brain scenario where: The leader client is actually still alive and continuing to make progress on the ledger. A second client is performing recovery, and closes the ledger. Or even multiple clients competing to close the ledger. For example, if a leader commits entry 100, but another client already closed the ledger at entry 50, then we have effectively lost 50 entries. Recovery is designed to make that impossible. This is a good time to talk about Flexible Paxos. Ledger recovery uses a quorum size that corresponds to the Ack Quorum (AQ). To understand this we can look to Flexible Paxos. While traditional Paxos requires majority quorums in both phases, Flexible Paxos allows for adjustable quorum sizes as long as any two quorums intersect. This can be applied to MultiPaxos by ensuring that all phase 1 (election) quorums intersect with all phase 2 (log entry) quorums. For example, with a cluster size of 5: With a Phase 1 quorum of two (a minority), the Phase 2 quorum is set to four, as this guarantees overlap. With a Phase 1 quorum of four, the Phase 2 quorum is set to two, as this guarantees overlap. In other words: Phase 1 Quorum + Phase 2 Quorum > Cluster Size.
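That intersection rule is easy to check exhaustively for small clusters; a tiny illustrative sketch (not tied to any real implementation):

```python
# Flexible Paxos sketch: every Phase 1 quorum must intersect every Phase 2
# quorum. The sufficient condition is p1 + p2 > cluster_size, which we can
# verify by brute force for small clusters.
from itertools import combinations

def flexible_paxos_ok(cluster_size, p1, p2):
    return p1 + p2 > cluster_size

def quorums_intersect(cluster_size, p1, p2):
    """Brute-force: do all p1-sized and all p2-sized quorums overlap?"""
    nodes = range(cluster_size)
    return all(set(a) & set(b)
               for a in combinations(nodes, p1)
               for b in combinations(nodes, p2))
```

For a cluster of 5, (2, 4) and (4, 2) both satisfy the rule, matching the examples above, while (2, 3) does not (two disjoint quorums exist).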
In BookKeeper the Phase 1 (election) quorum corresponds to the recovery quorum (RQ). For example: With WQ=5, AQ=4, then RQ=2. That is, ledger recovery will only succeed if it can fence and then read from two of the bookies of the last fragment ensemble. With WQ=5, AQ=2, then RQ=4. A mistake I’ve seen with operating BookKeeper has been to set AQ=1, without realizing that causes RQ=WQ, which makes a ledger unrecoverable after a single bookie loss. BookKeeper itself qualifies for categories A, B and C. Category A (disaggregated roles). This part has been clear. BookKeeper clients (proposers) are separate processes to bookies (acceptors). Learners are an application layer role, so BookKeeper doesn’t really include learners. Category B (control plane/data plane). At a macro-level, each ledger is a steady state of the data plane. But internally, each ledger can reconfigure itself (given enough functional bookies to reconfigure with). Given this ledger reconfiguration ability, the ledger also has a control-plane/data-plane split. The ledger data plane involves the client performing reads/writes to a fragment’s ensemble of bookies. If errors occur, the client performs an ensemble change (a metadata op to an external consensus service), and starts quorum writing to the ensemble of the new fragment. Category C (segmented log). Each ledger is a segmented log (a log of fragments). Regarding fragments and ensemble changes, again, we can look at the following figure. Each ledger fragment is a steady state, an ensemble change (metadata op) is the reconfiguration, and the next fragment is the new steady state. It is the same role driving both (BK client), but backed by different components. Fig 12. Each fragment is a steady state configuration, an ensemble change is a reconfiguration that leads to a new steady-state fragment. You can configure a BK client to use this reconfiguration logic, or to simply close the ledger immediately on an error.
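A heavily simplified sketch of the recovery mechanics above (illustrative names only; single ledger, single fragment, no striping): the recovery quorum follows RQ = WQ - AQ + 1, so fencing and reading RQ bookies is guaranteed to observe any entry that reached AQ acknowledgments.

```python
def recovery_quorum(wq, aq):
    # Any RQ-sized read must intersect every AQ-sized ack quorum of the WQ
    # bookies an entry was written to: RQ = WQ - AQ + 1.
    return wq - aq + 1

class Bookie:
    """Stores entries for one ledger; rejects regular writes once fenced."""
    def __init__(self):
        self.entries = {}
        self.fenced = False

    def write(self, entry_id, data, recovery=False):
        if self.fenced and not recovery:
            raise PermissionError("ledger is fenced")
        self.entries[entry_id] = data

    def fence(self):
        self.fenced = True
        return max(self.entries, default=-1)  # highest entry this bookie holds

def recover(ensemble, wq, aq):
    rq = recovery_quorum(wq, aq)
    # Phase 1: fence a recovery quorum and learn their highest entry ids.
    last_entry = max(b.fence() for b in ensemble[:rq])
    # Phase 2: recovery writes (read repair) restore tail redundancy.
    source = next(b for b in ensemble if last_entry in b.entries)
    for b in ensemble:
        if last_entry not in b.entries:
            b.write(last_entry, source.entries[last_entry], recovery=True)
    # Phase 3 (elided here): conditionally update the ledger metadata with
    # status=CLOSED and last_entry as the ledger's end.
    return last_entry
```

Note the AQ=1 pitfall in this arithmetic: `recovery_quorum(3, 1)` is 3, so recovery needs a response from every bookie of the fragment, and one permanently lost bookie leaves the ledger unrecoverable.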
Some may ask, if Pulsar acts as the VirtualLog, why also have BookKeeper do reconfigurations? It actually makes the BookKeeper client code more complex! The main reason that I know of is that it is a performance optimization. When an ensemble change occurs (usually only one bookie is swapped out), there are a bunch of uncommitted entries that the client is managing. Some of these entries have been successfully written to bookies that also exist in the new fragment. So the client only has to write these uncommitted entries to the swapped-in bookie, and everything continues. If we were to close the ledger, we’d have to fail all uncommitted entries, and the ManagedLedger would have to rewrite them to a new ledger. Ensemble changes avoid this fail-and-rewrite procedure that would likely cause a latency blip. So it does complicate the client code, but it's probably worth it (and the ManagedLedger has no idea about this reconfiguration). Pulsar with BookKeeper is more disaggregated than Apache Kafka in many ways, and less in others. How it’s more disaggregated: In the data plane, we separate the proposers from the acceptors (whereas Kafka partition replicas play both roles). The log is segmented, allowing it to be spread across a pool of storage servers (whereas in Kafka it is logically one log, though the prefix can be tiered). The control plane logic is separated from consensus (as it delegates consensus to an external service). Interestingly, Kafka moved away from this as it made broker reboots too slow for high partition counts (as a broker had to load so much metadata from the external store on start-up). The Kafka controller moved to the Raft state-machine model. How it’s less disaggregated: In both Pulsar and BookKeeper, the two planes are largely separated except that they are driven by the same component. Kafka, by contrast, has no single role that drives both the control plane and data plane; they are more separated.
In Kafka, the partition replicas drive the data plane and the Controller drives the partition replication control plane. Partition replicas just keep going until they are told to change configuration by the Controller (acting as control plane). It’s not as simple as saying more disaggregated = better. Having one component driving the data and control plane is not inherently worse than having them more separated, as long as that driving component is more of a delegator than a monolithic implementation. Likewise, delegating more or less of the consensus logic to an external service is also not inherently better or worse. It all depends. Next, we’ll look at how we can avoid centralizing all writes through a leader. The majority of the log replication systems are leader-based. In the next post of this series we’ll look at a design that separates IO from ordering.
Client (drives data plane and control plane) Separation of logic within the client through modularity. Log reconfiguration is generic, but relies on a Loglet module for sealing and creating new Loglets. It also relies on a metadata store for consensus. Reads and writes are routed to specific Loglets via a Loglet module. Loglet storage (data plane storage) Metastore (control plane storage) Abstractions: Loglet : BookKeeper is a log segment storage service. Log segments are known as ledgers . The Loglet abstraction aligns well with the BookKeeper ledger. VirtualLog : Pulsar has a module called ManagedLedger that equates to the VirtualLog. It’s job is to create, chain, seal ledgers to form topic partitions. Reads and writes go through the ManagedLedger which directs the reads and writes to the appropriate ledger. Client : The Pulsar broker code has a topic partition class that equates to the Virtual Consensus client as a whole. Loglet module : The BookKeeper client equates to the Loglet module. Loglet storage : BookKeeper servers, known as bookies, store the ledger data. Providing a single address space for the topic partition. This address space includes the Ledger Id and the Entry Id as a composite address id (or position). Acts as a router for append and read commands, forwarding the command to the appropriate ledger. Writes always go to the active ledger. Chaining ledgers to form an ordered log. Reconfiguring the log under failure scenarios for fault-tolerance and high availability. Creating ledgers. Reading/writing entries to ledgers. Sealing ledgers (also known as closing). BookKeeper client (Loglet module):  Drives the ledger replication protocol. Reading/writing from remote storage servers (bookies). Fencing ledgers. Updating ledger metadata. Bookies (Loglet storage): Simple storage servers, with a fencing capability. Leaderless, with no replication between bookies during normal write operations. Replication is performed by the client doing quorum writes. 
Metadata store:
- Versioned registers for storing ledger metadata.
- Live bookie tracking (via ZK sessions).

Each ledger's configuration specifies the set of bookies that will store the ledger and the number of bookies that must acknowledge a write for it to be considered committed:
- Ensemble size (E): The number of bookies that the ledger will be distributed across.
- Write Quorum (WQ): The number of bookies each entry will be written to (like replication factor in Kafka).
- Ack Quorum (AQ): The number of acknowledgments (from bookies) for an entry to be committed.

If a BookKeeper client encounters a problem that it cannot solve (such as there not being enough functioning bookies to make progress), it closes the ledger and kicks the can to the VirtualLog layer above it. Very Loglet-like. If a write to a bookie fails, but there are other functioning bookies available, the BK client can reconfigure the ledger by appending a new fragment, with a different ensemble of bookies. Very VirtualLog-like.

Ledger recovery consists of:
- Fencing. A quorum of the bookie ensemble of the last fragment is fenced. Fencing is scoped to the ledger, so the recovery client sends a Fence Request with the ledger id to each bookie of the ensemble. Once a ledger is fenced on a bookie, that bookie will not accept any further regular writes for that ledger.
- Recovery reads and writes. The recovery client learns of the ledger tail entry by performing quorum reads. The client also performs recovery writes (a form of read repair) to increase the redundancy of the ledger tail. Recovery writes are allowed on fenced ledgers.
- Metadata update. The recovery client updates the ledger metadata as above (setting last entry id and status). The update is a conditional put, which provides optimistic concurrency.

This procedure must remain safe even when: the leader client is actually still alive and continuing to make progress on the ledger; a second client is performing recovery and closes the ledger; or even multiple clients are competing to close the ledger.
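The bookie-side fencing rule described above can be sketched as follows. This is an illustrative sketch only, with hypothetical class and method names; it is not BookKeeper's actual API.

```python
# Hypothetical sketch of the bookie-side fencing rule (not BookKeeper code).
class Bookie:
    def __init__(self):
        self.fenced_ledgers = set()   # ledger ids that have been fenced
        self.entries = {}             # (ledger_id, entry_id) -> payload

    def fence(self, ledger_id):
        """A recovery client fences the ledger; regular writes are now rejected."""
        self.fenced_ledgers.add(ledger_id)

    def add_entry(self, ledger_id, entry_id, payload, recovery=False):
        # Regular writes to a fenced ledger fail, which blocks a stale leader
        # client. Recovery writes (read repair) are still allowed.
        if ledger_id in self.fenced_ledgers and not recovery:
            raise RuntimeError("ledger fenced")
        self.entries[(ledger_id, entry_id)] = payload

bookie = Bookie()
bookie.add_entry(1, 0, b"a")                 # regular write succeeds
bookie.fence(1)                              # recovery client fences ledger 1
bookie.add_entry(1, 1, b"b", recovery=True)  # recovery write still allowed
try:
    bookie.add_entry(1, 2, b"c")             # stale leader's write is rejected
except RuntimeError as e:
    print(e)                                 # -> ledger fenced
```

The key point the sketch captures is that fencing lives inline with the data path: the bookie needs no knowledge of which client is the "real" leader, it only enforces that one steady state has ended.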
With a Phase 1 quorum of two (a minority), the Phase 2 quorum is set to four, as this guarantees overlap. With a Phase 1 quorum of four, the Phase 2 quorum is set to two, as this again guarantees overlap. The same logic applies to recovery: with WQ=5, AQ=4, then RQ=2. That is, ledger recovery will only succeed if it can fence and then read from two of the bookies of the last fragment ensemble. With WQ=5, AQ=2, then RQ=4.

Category A (disaggregated roles). This part is clear: BookKeeper clients (proposers) are separate processes from bookies (acceptors). Learners are an application-layer role, so BookKeeper doesn’t really include learners.

Category B (control plane/data plane). At a macro level, each ledger is a steady-state unit of the data plane. But internally, each ledger can reconfigure itself (given enough functional bookies to reconfigure with). Given this ledger reconfiguration ability, the ledger also has a control-plane/data-plane split. The ledger data plane involves the client performing reads/writes to a fragment’s ensemble of bookies. If errors occur, the client performs an ensemble change (a metadata op to an external consensus service), and starts quorum writing to the ensemble of the new fragment.

Category C (segmented log). Each ledger is a segmented log (a log of fragments).

How it’s more disaggregated:
- In the data plane, the proposers are separated from the acceptors (whereas Kafka partition replicas play both roles).
- The log is segmented, allowing it to be spread across a pool of storage servers (whereas in Kafka it is logically one log, though the prefix can be tiered).
- The control plane logic is separated from consensus (as it delegates consensus to an external service). Interestingly, Kafka moved away from this approach as it made broker reboots too slow for high partition counts (a broker had to load so much metadata from the external store on start-up). The Kafka controller moved to the Raft state-machine model.
How it’s less disaggregated:
- In both Pulsar and BookKeeper, the two planes are largely separated, except that they are driven by the same component. In Kafka, by contrast, no single role drives both planes, so the planes are more separated: the partition replicas drive the data plane and the Controller drives the partition replication control plane. Partition replicas just keep going until they are told to change configuration by the Controller (acting as the control plane).
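The quorum overlap arithmetic above can be sketched as a quick check. The overlap rule is my own restatement of what the examples imply, not BookKeeper source: any ack quorum and recovery quorum must intersect in at least one bookie, so AQ + RQ > WQ, hence RQ = WQ - AQ + 1.

```python
# Overlap rule implied by the examples above: AQ + RQ > WQ.
def recovery_quorum(wq, aq):
    """Smallest recovery quorum guaranteed to see every committed entry."""
    return wq - aq + 1

assert recovery_quorum(5, 4) == 2   # WQ=5, AQ=4 -> fence/read 2 bookies
assert recovery_quorum(5, 2) == 4   # WQ=5, AQ=2 -> fence/read 4 bookies

# The same overlap rule applied to the Phase 1 / Phase 2 example (n = 5):
n = 5
for p1 in (2, 4):
    p2 = n - p1 + 1                 # smallest Phase 2 quorum that overlaps
    assert p1 + p2 > n              # the two quorums must intersect
    print(p1, p2)                   # -> "2 4" then "4 2"
```

Note the trade-off this exposes: a smaller ack quorum gives lower write latency, but makes recovery require responses from more bookies.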

Jack Vanlightly 9 months ago

Log Replication Disaggregation Survey - Kafka Replication Protocol

In this post, we’re going to look at the Kafka Replication Protocol and how it separates control plane and data plane responsibilities. It’s worth noting there are other systems that separate concerns in a similar way, with RabbitMQ Streams being one that I am aware of. There are so many systems out there, too many for me to list without this becoming a huge research project. So I’m going to stick to systems I have directly been involved with, or have a decent level of knowledge about already. Feel free to comment on social media about interesting systems that I haven’t included.

I have classified a few ways of breaking apart a monolithic replication protocol such as Raft. The classifications are:
- (A) Disaggregated roles/participants. The participants form clearly delineated roles that can be run as separate processes. The protocol itself may still be converged (control plane and data plane intertwined) but exercised by disaggregated components.
- (B) Disaggregated protocol that separates the control plane from the data plane. The protocol itself is broken up into control plane and data plane duties, with clean and limited interaction between them.
- (C) Segmented logs. Logs are formed from a chain of logical log segments.
- (D) Pointer-based logs. Separates data from ordering.
- (E) Separating ordering from IO. Avoids the need for a leader by allowing all nodes to write, but coordinates via a sequencer component.
- (F) Leaderless proxies. Abstracts the specifics of the replicated log protocol from clients, via another disaggregated abstraction layer of leaderless proxies.

This protocol is concerned with topic partition replication. Disaggregation categories:
- (A) Disaggregated roles/participants
- (B) Disaggregated protocol that separates the control plane from the data plane.

In my classification, B kind of implies A. It just takes it further, or perhaps is one specific way of doing (A).
I have kept them separate as I think it’s worth exploring how roles can be disaggregated in a converged protocol (A), but also how roles can be disaggregated such that the protocol itself has a clear split between control and data planes (B). Before we look at the Kafka Replication Protocol, I’d like to review what the data plane and control plane are within the context of a log replication protocol. My post, Steady on! Separating Failure-Free Ordering from Fault-Tolerant Consensus, also touches on this subject.

The data plane is typically the logic of performing replication in a stable, failure-free state (that I will call steady-state). Nothing bad is happening; the cluster is making progress in this steady-state. There might be minor failures, such as a transient connectivity issue between nodes, but nothing that requires a change to the configuration of the cluster. The data plane can be simple, as only failure-free ordering needs to be implemented (see the Steady On! post for more on that).

The control plane is the logic that makes changes to the steady-state configuration, either reacting to failure or to commands from some kind of operator. A failure of some kind might be preventing the steady-state configuration from making progress. In a leader-follower system, it might mean the leader has become unreachable, requiring a leader election to elect a new leader to ensure continued availability. It could mean the permanent failure of a server in the cluster, requiring a replacement. In either case, without some kind of change, the system is either unavailable or degraded in some way. Other reasons can be to migrate workloads to avoid contention on the underlying hardware. In this case, a move may actually be the addition of a new member and the removal of an existing member. Whatever the case, the control plane takes the system from one steady-state configuration to another. Fig 1.
Reconfiguration moves the system from one steady-state configuration to another in response to failures, policy triggers or other factors such as load balancing.

In Raft, reconfiguration specifically refers to membership changes, but I will use the term reconfiguration in a broader sense to include any change to the configuration/topology of the replication cluster (including who the leader is). Who the leader is, is not a minor detail. In Raft, reconfiguration and data replication are mixed into the same underlying log, exercised by the same nodes, and with a tight dependence between them for correctness. The same goes for MultiPaxos, which is why I use the term converged protocol for both.

Separating the control plane from the data plane requires loose coupling. We want there to be little to no runtime dependency between the data plane and the control plane. In other words, if the control plane goes down, we want the data plane to continue to function. In such a scenario, the data plane should only cease to function if it exits a failure-free state, requiring intervention from the control plane. This is a general principle applied in many systems with control planes, such as Kubernetes. If the Kubernetes controller goes offline, we still want pods to function. Kafka, with KRaft, achieves this decoupling.

The protocol is split into the following roles:
- Controller (control plane)
- Partition replicas (data plane): leader and followers

Fig 2. The roles in the Kafka Replication Protocol.

The above is the logical model of the roles within the scope of a single partition. In reality, one Controller interacts with multiple partitions. To avoid confusion, let’s differentiate between a cluster control plane and the partition replication control plane:
- Cluster control plane: Changing security settings, configurations, adding/removing topics etc.
- Partition replication (aka Kafka Replication Protocol) control plane: Partition replica leader election, partition replica set membership changes, etc.

In Apache Kafka, both types of control plane are the responsibility of the KRaft Controller, a Raft-based replicated state machine (RSM). The fact that the KRaft Controller uses Raft is irrelevant to this discussion. The Kafka Replication Protocol only cares about the Controller role (not its implementation). We can say that the Controller state machine is important to the Kafka Replication Protocol, not the method of consensus the state machine uses. In the next sections we’ll cover the responsibilities of each role and the communications between them.

The unit of replication in the Kafka Replication Protocol is the topic partition, which is a log. The logical model consists of a leader replica and multiple follower replicas. All produce requests are sent to the leader replica. Follower replicas fetch (pull) records from the leader for redundancy, and so that one of them can take on leadership should the current leader go offline or be removed. Physical replication is actually performed at the broker-pair level: each fetch request between a broker pair covers all partitions that have a follower replica on the fetching broker and a leader replica on the destination broker.

Fig 3. We usually talk and think about replication using the logical model, but the work is really carried out at the broker level using multiple fetch sessions for each broker pair.

A partition has a steady-state configuration of:
- A replica set (the membership).
- A leader replica.
- Zero or more follower replicas.
- A leader epoch.
- A partition epoch.

Any change to the above configuration of a partition causes the partition epoch to be incremented.
So, we can consider the period of a given partition epoch to be the fixed-topology, failure-free steady state where the partition is making progress by itself. The leader epoch gets incremented on a leader change. All this configuration is determined by the control plane. Partition replicas are unable to change the leader themselves or perform any kind of reconfiguration. As far as the leader is concerned, there is a fixed membership. As far as the followers are concerned, there is a fixed leader.

The only things that the data plane must implement are the sending of, and responding to, fetch requests, piggybacking some key info in the fetches, such as:
- Key offsets such as the high watermark (commit index) and log stable offset (required for transactions).
- The leader epoch (required for fencing stale followers and leaders).

Fencing is often a key component of replication protocols, making the data plane consensus correct. Fencing basically enforces the completion of one steady-state configuration, ensuring it cannot make progress once another has been created. Split-brain is basically two steady-state configurations both making progress simultaneously. Fencing is a mechanism that exists inline with the data plane to prevent that. In Kafka, the fencing is based on the partition and leader epochs.

The Controller has a number of duties:
- Broker failure detection.
- Partition replica leader changes. I won’t use the term election, as it is not based on voting. The Controller is the benevolent dictator.
- Partition replica membership changes.

The Controller does not know anything about the partition high watermarks, stable offsets, log end offsets, or any other data plane responsibilities. There is two-way communication between the control plane and data plane:
- Controller->Broker (partition replicas): Broker metadata log learners. The Controller performs partition configuration changes (basically, it performs metadata ops). Each broker acts as a KRaft learner.
When a broker learns of a partition metadata op for a local partition, it passes that op to the partition to act on. The op may require the broker to create a local replica first, then pass on the metadata to it. Or even delete a local partition.
- Broker (leader replicas)->Controller: Leader notifications. Partition replica leaders notify the Controller when a follower replica becomes in-sync or falls out-of-sync. This allows the Controller to maintain the In-Sync Replicas (ISR) set for each topic partition. The ISR is basically a leader candidate pool for the Controller to pick from (one per partition).

Fig 4. The control plane ←→ data plane communication.

Failure detection is handled via a heartbeat mechanism between brokers and the Controller, as well as some extra machinery such as unclean-failure detection on broker start-up. A leader change for a topic partition consists of the Controller picking any replica in the ISR. This is a metadata op, applied to the KRaft log, and therefore the replicas learn of this change via their host broker acting as a KRaft log learner. This way, replicas learn of their role and the partition's current configuration.

Replica set membership changes are relatively simple and also just a carefully controlled series of metadata ops. In general, the process follows a grow->catch-up->shrink approach. For example, when replacing one replica with another, it does the following:
- Grow phase: The Controller adds a new replica to the partition replica set (a metadata op), and all replicas learn of this change. The broker of the added replica creates the local replica, which starts fetching from the leader.
- Catch-up phase: The Controller waits to be notified that the new replica has caught up to the leader.
- Shrink phase: The Controller receives the notification that the new replica has caught up. It removes the original replica from the replica set (a metadata op). All replicas learn of the final partition state via the KRaft log.
The broker of the removed replica then deletes all local state for that partition.

Membership changes can be arbitrary: add one, remove one, replace one, or even replace the whole set with a new set. The logic in the Controller is the same. It starts with growth, then once enough of the final replica set is caught up, it moves to the shrink phase, then finally marks the change complete. The data plane has no concept of membership changes at all; it just does what it does, given the configuration handed to it by the Controller.

Fig 5. Membership changes are executed as a linear series of metadata ops, ensuring a minimum ISR size for safety.

Separating the control plane from the data plane certainly has some benefits:
- Each plane is more straightforward than one combined plane. The separation creates clear boundaries that make the system more maintainable and evolvable. The data plane must only implement failure-free ordering (with some key correctness mechanisms like epoch-based fencing). The control plane must implement fault-tolerant consensus (via SMR), but the state machine logic can be a single-threaded event loop, acting on a linearized stream of notifications and commands.
- There are fewer brain-breaking interleavings of actions to contend with. With Raft, for example, there are a few tricky areas, such as how a leader may not actually be a member of the cluster as it is being removed, but has to stick around, as a non-member leader, in order to commit the change that removes it. Stuff like that can just hurt your brain. By separating things out into separate roles, we make the logic simpler and easier to understand.
- The two planes can be evolved separately. One example of this is KIP-966, which adds an additional recovery mechanism to the protocol to handle unclean/abrupt shutdowns with relaxed fsync modes. The data plane basically didn’t change at all. Likewise, when optimizations were made to the broker-to-broker fetching, it did not affect any control plane logic whatsoever.
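As an illustration of the epoch-based fencing described earlier, here is a minimal sketch. The class and method names are hypothetical, not Kafka's actual request-handling code; it only shows the shape of the rule: requests carrying a stale leader epoch are rejected.

```python
# Illustrative sketch of leader-epoch fencing (hypothetical, not Kafka code).
class PartitionLeader:
    def __init__(self, leader_epoch, partition_epoch):
        self.leader_epoch = leader_epoch        # bumped on each leader change
        self.partition_epoch = partition_epoch  # bumped on any config change
        self.log = []

    def handle_fetch(self, follower_leader_epoch):
        # A follower carrying a stale leader epoch is fenced: it must refresh
        # its metadata (via the KRaft log) before it can fetch again.
        if follower_leader_epoch < self.leader_epoch:
            return "FENCED_LEADER_EPOCH"
        return self.log

leader = PartitionLeader(leader_epoch=7, partition_epoch=12)
leader.log.append(b"record-0")
print(leader.handle_fetch(follower_leader_epoch=7))  # in-sync follower reads the log
print(leader.handle_fetch(follower_leader_epoch=6))  # -> FENCED_LEADER_EPOCH
```

This is why the epochs are described as making the data plane "consensus correct": the epoch check is the inline mechanism that prevents two steady-state configurations from both making progress.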
I’ve only scratched the surface of the Kafka Replication Protocol. If you want to understand it in excruciating detail, then check out:
- My Kafka Replication Protocol description (aims for a similar level of understandability as the Raft paper).
- My TLA+ specification, which is admittedly quite big and complex but is faithful to pretty much the whole protocol, including reconfiguration.

There's still room for further disaggregation. If we were to map Kafka onto the Paxos roles, we would see that the data plane and control plane use separate sets of each role. The data plane co-locates all three roles into the partition replica. Likewise KRaft, which underpins the KRaft Controller, does the same. We’ve disaggregated the protocol into control and data planes, but each plane is converged in terms of the Paxos roles. We can definitely go a lot further in terms of disaggregation: separate the Paxos roles in the data plane, do away with the distinguished proposer altogether, and segment the log, among other things.
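The broker-pair batching described earlier (logical per-partition fetching carried out physically as one fetch session per broker pair) can be sketched as a simple grouping step. The topic and broker names here are hypothetical example data, not anything from a real cluster.

```python
# Sketch of broker-pair fetch batching: broker 1 hosts follower replicas,
# and groups its fetches by the leader's broker. Hypothetical data.
from collections import defaultdict

# (partition, leader_broker) pairs for partitions with a follower on broker 1.
follower_partitions_on_b1 = [
    ("topicA-0", 2), ("topicA-1", 3), ("topicB-0", 2), ("topicB-1", 2),
]

# Group by destination (leader) broker: one fetch request per broker pair.
fetch_sessions = defaultdict(list)
for partition, leader_broker in follower_partitions_on_b1:
    fetch_sessions[leader_broker].append(partition)

for leader_broker, partitions in sorted(fetch_sessions.items()):
    print(f"broker 1 -> broker {leader_broker}: fetch {partitions}")
# -> broker 1 -> broker 2: fetch ['topicA-0', 'topicB-0', 'topicB-1']
# -> broker 1 -> broker 3: fetch ['topicA-1']
```

The payoff is that the number of fetch sessions scales with the number of broker pairs rather than the number of partitions.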

Jack Vanlightly 9 months ago

Log Replication Disaggregation Survey - Neon and MultiPaxos

Over the next series of posts, we'll explore how various real-world systems and some academic papers have implemented log replication with some form of disaggregation. In this first post we’ll look at MultiPaxos. There are no doubt many real-world implementations of MultiPaxos out there, but I want to focus on Neon’s architecture, as it is illustrative of the benefits of thinking in terms of logical abstractions and responsibilities when designing complex systems. There are so many systems out there, too many for me to list without this becoming a huge research project. So I’m going to stick to systems I have directly been involved with, or have a decent level of knowledge about already. Feel free to comment on social media about interesting systems that I haven’t included.

I have classified a few ways of breaking apart a monolithic replication protocol such as Raft. The classifications are:
- (A) Disaggregated roles/participants. The participants form clearly delineated roles that can be run as separate processes. The protocol itself may still be converged (control plane and data plane intertwined) but exercised by disaggregated components.
- (B) Disaggregated protocol that separates the control plane from the data plane. The protocol itself is broken up into control plane and data plane duties, with clean and limited interaction between them.
- (C) Segmented logs. Logs are formed from a chain of logical log segments.
- (D) Pointer-based logs. Separates data from ordering.
- (E) Separating ordering from IO. Avoids the need for a leader by allowing all nodes to write, but coordinates via a sequencer component.
- (F) Leaderless proxies. Abstracts the specifics of the replicated log protocol from clients, via another disaggregated abstraction layer of leaderless proxies.

Disaggregation categories: (A) Disaggregated roles/participants.

Neon is a superb example of disaggregating the replication protocol roles/participants. I wrote about Neon in November 2023.
Neon is a serverless Postgres service that runs Postgres instances with a modified storage engine. Rather than durably storing data on local disks, all write transactions go through a remote, distributed write-ahead log (WAL) service based on MultiPaxos. Neon separates durable storage from Postgres instances. Each Postgres instance stores soft state (not required to be durable), and durable storage consists of:
- A remote distributed write-ahead log (WAL) for high-performance durable writes. This WAL acts as a replicated short-term durable storage component.
- An object store for long-term durable storage, with a set of Pageservers acting as a serving layer.

Fig 1. The main components of the Neon architecture.

The Safekeeper protocol employs a MultiPaxos-based approach to ensure data consistency: only one Postgres primary can perform writes at a time, data is redundantly stored across Safekeepers, primary failovers do not lose data, and so on. The following is an excerpt from that post (it perfectly explains why a disaggregated Paxos fits their needs).

Rather than adopt Raft, Neon has chosen a Paxos implementation for its WAL service. Paxos defines the roles of Proposer, Acceptor, and Learner. Each role is responsible for a different part of the protocol and there are no rules regarding where the different roles run.
- Proposers. A proposer simply proposes a value to be written. In Multi-Paxos, one proposer at a time is elected as the leader who proposes a sequence of values. This leader is also known as a Distinguished Proposer.
- Acceptors. Acceptors store the proposed values, and values are committed once accepted by a majority.
- Learners. A learner learns of committed values from the acceptors.

With Multi-Paxos, one leadership term consists of a Prepare phase where a Proposer is elected as the Distinguished Proposer by the Acceptors.
Then, the second (steady-state) phase is the Accept phase, where the leader proposes a sequence of values to the Acceptors, who must store the proposed values. Learners learn of the committed values from the acceptors. Implementations can choose to have these processes running on different machines in a disaggregated way or have a single process act as all three roles. The latter is precisely what Raft does. The Raft leader is the Distinguished Proposer; the leader and followers are all Acceptors, and the state machine on each member that sits atop this replication layer acts as a Learner. Coming back to Neon, it chose Paxos because of the ability to disaggregate the roles. If a Postgres database fails, a new one must be spun up to take its place. But what happens if the first database node is actually still operating? Now we have two primaries and what is known as split-brain. Split-brain leads to data inconsistency which we really want to avoid. What we need is a way of ensuring that only one primary can write to the WAL service at a time. We can’t prevent two primaries from existing, but we can ensure that only one can make progress while the other remains blocked. Paxos solves this problem. Each Postgres database is a proposer, each Safekeeper is an acceptor and the Pageservers are learners. Before a Neon Postgres database can write WAL records, it must get elected by the Safekeepers as the leader (distinguished proposer). Once elected, it is free to write (or propose) a sequence of WAL records to the Safekeepers. Once a majority of Safekeepers have acknowledged a WAL record, the database treats that record as committed. Fig 2. Neon components mapping onto MultiPaxos roles. Pageservers learn of the committed WAL records in their role as Paxos learners. WAL records are replayed over the latest image file, and those files are eventually uploaded to S3. 
The Pageservers then communicate with the Safekeepers about the index of the last applied WAL record, allowing Safekeepers to safely garbage collect that data. (Excerpt end.)

There is a lot more to the Safekeeper protocol if you are interested, and it shares some commonality with how Aurora handles primary failover. But this survey is more concerned with how systems get disaggregated, so I’ll leave it there.

The brilliance of Paxos lies in how it formalizes consensus into distinct roles: Proposers, Acceptors, and Learners. This fundamental separation of responsibilities creates a flexible building block that engineers can compose in creative ways to solve real-world problems. Neon demonstrates this power perfectly: instead of being constrained to a traditional cluster of identical nodes all running the same code, they were able to weave consensus from heterogeneous components. Postgres instances act as Proposers, specializing in generating new values; Safekeepers focus solely on their Acceptor role of durably storing and voting on proposals; and Pageservers operate as Learners, consuming the committed log. This composition allows each component to be optimized for its specific role: Postgres for query processing, Safekeepers for durability, and Pageservers for log consumption and storage management.

I like this example because it breaks you free from assuming that a replication protocol implementation must take the form of a dedicated cluster with a rigid deployment model. That might be the right choice, or it might not, but making the choice consciously (not in ignorance) is the key. In Neon’s case, Paxos' role separation enabled Neon to build a consensus protocol that maps elegantly to their serverless architecture's natural boundaries. For me it is the perfect example of why abstractions in design matter.
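The acceptor-side rule that blocks a stale primary, as described in the excerpt above, can be sketched as follows. This is a minimal illustrative sketch of the MultiPaxos promise/accept mechanism, with hypothetical names; it is not Neon's Safekeeper code.

```python
# Minimal sketch of the acceptor rule that blocks a stale primary
# (illustrative MultiPaxos mechanics, not Neon's actual implementation).
class Safekeeper:
    def __init__(self):
        self.promised_term = 0
        self.wal = []

    def prepare(self, term):
        # Phase 1: promise to reject proposals from any lower term.
        if term > self.promised_term:
            self.promised_term = term
            return True
        return False

    def accept(self, term, record):
        # Phase 2: a proposer holding a stale term cannot make progress.
        if term < self.promised_term:
            return False
        self.wal.append(record)
        return True

sk = Safekeeper()
assert sk.prepare(term=1)                       # first primary elected
assert sk.accept(term=1, record=b"wal-0")       # writes WAL records
assert sk.prepare(term=2)                       # new primary takes over
assert not sk.accept(term=1, record=b"wal-1")   # old primary is blocked
assert sk.accept(term=2, record=b"wal-1")       # new primary proceeds
```

Run against a majority of Safekeepers, this is exactly the property the post describes: two primaries may exist, but only one can make progress.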

Jack Vanlightly 9 months ago

Towards composable data platforms

This post contains a mix of technology insight with some Confluent strategy commentary. Technology changes can be sudden (like generative AI) or slower juggernauts that kick off a slow chain reaction that takes years to play out. We could place object storage and its enablement of disaggregated architectures in that latter category. The open table formats, such as Apache Iceberg, Delta Lake, and Apache Hudi, form part of this chain reaction, but things aren’t stopping there. I’ve written extensively about the open table formats (OTFs). In my original Tableflow post, I wrote that shared tables were one of the major trends, enabled by the OTFs. But why is it that OTFs make for a good sharing primitive? I have been focused mainly on the separation of compute and storage: OTFs allow for a headless architecture where different platforms can bring their own compute to the same data. This is all true. But we can also view OTFs as enabling a kind of virtualization. In this post, I will start by explaining my take on OTFs and virtualization. Finally, I’ll bring it back to Confluent, the Confluent/Databricks partnership, and the future of composable data platforms. Virtualization in software refers to the creation of an abstraction layer that separates logical resources from their physical implementation. The abstraction may allow one physical resource to appear as multiple logical resources, or multiple physical resources to appear as a single logical resource. At every layer of computing infrastructure - from storage arrays that pool physical disks into flexible logical volumes, to network overlays that create programmable topologies independent of physical switches, to device virtualization that allows hardware components to be shared and standardized - virtualization provides a powerful separation of concerns. This abstraction layer lets resources be dynamically allocated, shared, and managed with greater efficiency.
There are many types of hardware and software virtualization, and even data can be virtualized. The term data virtualization has been around for at least two decades, mainly in the data integration space. In the data analytics world, data virtualization has been a compute abstraction over distributed data sources. Presto was the first open-source analytics project I am aware of to use the term. That type of data virtualization had limited impact and adoption because it was too high up the stack. You had to use Presto/Trino, which is just one hammer (and often a good one!) in a large toolbox. The OTFs introduce a new abstraction layer that can be used to virtualize table storage. The key is that this layer allows for the separation of data from metadata and shared storage from compute. Through metadata, one table can appear in two data platforms, without data copying. To avoid overloading the term data virtualization any further, I will use the term Table Virtualization. Fig 1. Table virtualization, enabled by shared storage and open table formats. Table virtualization manifests in several key ways:
- Single Source of Truth. An OTF is a protocol, an agreed-upon method of organizing data into data files, metadata files, index files, and directories. This is the single source of truth: the physical data exists in only one location. All logical representations interact with this single source, and changes made through any logical representation are reflected in all others.
- Neutral ground. By leveraging Cloud Service Provider (CSP) object storage, the CSP becomes the neutral ground where different vendors can interoperate without concerning themselves with responsibility for complex storage management.
- Metadata-driven collaboration. Catalogs form the top layer; they are pure metadata. Tables can be federated across multiple catalogs, exposed as the platform sees fit.
- Ownership separated from presentation. One platform may ultimately control the cloud account where the data of a given table is stored, or it could even be a customer’s account. One platform may act as the primary catalog for a table. This can be abstracted from users, who only see tables.
- Standardization. Collaboration occurs because of standardization. Both parties embrace a common open standard, and multiple compute/query engines can speak the same language.
While attempts to virtualize data within compute engines never gained widespread adoption, data virtualization has found its natural home at the storage layer, where it can serve the diverse ecosystem of query engines and data platforms. Thinking about OTFs in terms of table virtualization can be helpful in the context of collaboration between data platforms. Being able to surface the same table in two platforms, as if it were native in both, is powerful. With that capability, there is no need to move tabular data between platforms, only to expose key tables that can feed the other platform. Tables are not typically data-sharing primitives. Tables are used as a long-term store of data, used for a specific purpose by a single system. Table storage platforms face the opposite dynamic to streaming platforms - they accumulate these valuable data assets, which are more valuable in combination with other data assets, creating strong gravitational pulls. This data gravity easily leads to resistance to sharing. It’s easy to raise the walls and focus on getting data in, and not necessarily making it easy to share. But customers often don’t want one data platform. A large portion of Snowflake and Databricks customers actually run workloads on both platforms. At the same time, customers also don’t want a swarm of platforms. The Modern Data Stack (MDS) failed to sustain itself. Few wanted to compose a data architecture from 10-15 different vendors.
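Returning to the mechanics for a moment, the core of table virtualization described above can be sketched in a few lines. This is a hypothetical toy model (the names and paths are invented, not any real catalog API): two platform catalogs hold only name-to-metadata pointers, and both resolve to the same physical table in shared storage.

```python
# Toy model of table virtualization: one physical table, two logical views.

# Shared object storage holds the single source of truth: the table's
# metadata, which in turn lists the data files.
shared_storage = {
    "s3://bucket/orders/metadata/v2.json": {
        "schema": ["order_id", "amount"],
        "data_files": ["s3://bucket/orders/data/f1.parquet"],
    }
}

# Each platform's catalog is pure metadata: a table name -> metadata pointer.
platform_a_catalog = {"sales.orders": "s3://bucket/orders/metadata/v2.json"}
platform_b_catalog = {"ext.orders":   "s3://bucket/orders/metadata/v2.json"}

def resolve(catalog, name):
    """Follow the catalog's pointer down to the shared table metadata."""
    return shared_storage[catalog[name]]

# Both platforms resolve to the very same object: no data was copied.
same_table = resolve(platform_a_catalog, "sales.orders") is resolve(
    platform_b_catalog, "ext.orders")
```

The point of the sketch is that the catalogs carry no data at all; "sharing" a table is just registering one more pointer.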
People want to choose a small number of trusted vendors, and they want them to work together without a lot of toil and headaches. Table virtualization provides a bridge across the natural tension between data gravity and the need for ecosystem collaboration. It can elegantly reduce this conflict by allowing the physical data to remain within a gravity well while enabling multiple logical access points (as virtualized tables) in other platforms. This approach works within the constraint that many platforms want to maintain control over their valuable data assets, while still enabling the collaborative ecosystem that modern data architectures require. In effect, table virtualization transforms "gravitational" data assets into a composable resource, where historically, data sharing might otherwise have been hard or impractical. Data gravity is real, and pretending it doesn’t exist is like trying to swim against the current. We need data gravity platforms to interoperate, and table virtualization looks like the ideal solution. One table can be exposed in two data platforms, but typically, one platform owns the data. The owner platform considers this a native table, and the other platform may choose to consider it an “external” table. It doesn’t have to be this way, but this may be the common case. This brings us to the question of ownership and access:
- Can both platforms write to the same table?
- Who maintains the catalog? Iceberg requires a catalog for consistent reads and writes. We can’t have two primary catalogs, or else the table will become inconsistent.
- How do vendors handle support issues for shared tables?
Sharing is two-way:
- Out -> An owner platform exposes table metadata for other platforms.
- In -> A platform allows users to add “external” tables from other platforms.
Databricks allows for “out” sharing via its Delta Sharing protocol. Snowflake allows for “in” sharing via external Iceberg tables. Fig 2. It takes two platforms to tango.
We are seeing a common pattern regarding ownership and access among the big players. BigQuery and Snowflake, so far, operate with the rule that external tables are read-only, and native tables are exposed to other platforms as read-only. In other words, if you want to write to a Snowflake table, you do it directly via Snowflake APIs, and Snowflake won’t write to another platform’s table. Not all vendors need to choose this approach, but it is understandable why BigQuery and Snowflake would operate this way. For one, letting other platforms write to your own OTF tables, which you have a support team behind, could be risky and costly. Likewise, writing to an external table, without any control over the running of that table, could also be risky. Does the other platform even expose a catalog for external writers? Platforms that operate at scale want stability and control. Also, don’t forget data gravity. Supporting outward and inward writes dilutes data gravity, which is a real concern to tabular data platforms. This read-only limitation is not actually so bad. We don’t usually need two platforms to write to the same table. Instead, we can allow different platforms to maintain derived data sets based on each other’s shared data. The virtualized table is a sharing primitive for one platform to share its insights with another platform. This way, we can have bidirectional flows, or even graphs, of data enrichment and derived data sets between platforms. Fig 3. Native and external tables, still a powerful model for collaboration. So far, I’m describing how analytics platforms can be composed to make it easier for customers on multiple platforms (such as Snowflake and BigQuery, or Snowflake and Databricks) to compose a data architecture. But this only encompasses the analytical estate. What about the operational estate, where the data-sharing primitive is the event stream? How can we compose the data assets of the operational and analytical estates?
This is what Tableflow has been developed for: the bidirectional materialization of streams as (virtual) tables and (virtual) tables as streams. Fig 4. Stream/table materialization working in combination with table virtualization. This stream-to-table materialization uses table virtualization as the sharing mechanism between the streaming and analytics platforms. It gives us some elementary building blocks for better data composability across an entire organization. Recently, I advocated for shifting our mindset away from seeing the world as two silos connected by ELT jobs where data is extracted and then landed, towards seeing the world as a graph of derived data sets. Fig 5. From the ELT mindset (extract from left side to deposit on right side), towards the graph mindset (a graph of derived data sets). In this model, we primarily focus on how to compose data. It becomes less about how we extract and move data and more about how we want to compose different types of data, across different platforms and different forms (such as streams and tables). This is an attractive idea. If we can pull it off, we can open up data like never before. There are a few barriers to achieving this goal, some rooted in Conway’s law, and some technological. What I am describing in this post are the technological means of achieving this vision. As I warned at the beginning of the post, I do want to talk about Confluent. Last week the CEOs of Confluent and Databricks announced their strategic partnership, centered around building a bidirectional data flow between both platforms. First, Confluent topics will appear directly as Delta tables in Databricks, using a tight integration between Confluent Tableflow and Databricks Unity Catalog. Later, Delta tables will be exposed directly in Confluent as Kafka topics, likely as change streams of mutable tables.
Confluent and Databricks will provide a unified product experience while remaining separate vendors, each focusing on their core strengths. It’s actually quite elegant; there are few examples of two platforms so well suited to collaborating in such a cohesive and coherent manner. What makes this possible is precisely what this blog post has been explaining: stream-to-table materialization and table virtualization. Fig 6. Planned bidirectional flow between Confluent and Databricks. Kafka topics exposed as Delta tables and vice versa. With Confluent’s Tableflow and Databricks Unity Catalog, the analytics plane just gets tabular data (as Delta tables) from the operational plane appearing in the Unity Catalog. The operational plane will get Kafka topics of analytics-derived data appearing in the Stream Catalog. Confluent and Databricks are two great examples of platforms that are complementary and should be composable. This partnership enables composability, with each platform focusing on its core strengths and values, benefiting joint customers who just want them to work together coherently with as little BS work as possible to make it happen. Stream-to-table materialization and table virtualization represent a fundamental shift in how we think about data integration and interoperability. Streams and virtualized tables are the two data-sharing primitives that make composable data architectures possible. Streams do what they've always done best - connecting systems of record with real-time events. But virtualized tables are the missing piece, turning tabular data that wants to stay put in gravity wells into a composable resource that can be shared between platforms. Together, they give us the building blocks to start fitting data platforms together like Lego bricks, rather than being glued together with ETL.

Jack Vanlightly 9 months ago

How to disaggregate a log replication protocol

This post continues my series looking at log replication protocols, within the context of state-machine replication (SMR) or just when the log itself is the product (such as Kafka). So far I’ve been looking at Virtual Consensus, but now I’m going to widen the view to look at how log replication protocols can be disaggregated in general (there are many ways). In the next post, I’ll do a survey of log replication systems in terms of the types of disaggregation described in this post. Prior posts: An Introduction to Virtual Consensus in Delos Steady on! Separating Failure-Free Ordering from Fault-Tolerant Consensus As many people have said before, everything is a log. Replicated logs often serve as the foundational mechanism for achieving consensus in distributed data systems by providing an ordered, consistent sequence of operations that all servers can agree upon. This allows each server to maintain identical state despite operating independently across a network. The most well-known form of this approach is State Machine Replication (SMR). In state machine replication, distributed servers maintain consistency by applying the same sequence of operations from a replicated log in the same order, operating as deterministic state machines. Each state machine server applies the operations from the log to maintain local state, such as an embedded database (such as RocksDB or Pebble), or a queue, or any other data representation. The most famous converged SMR design is Raft. For a long time, Raft has been the default safe choice for teams wanting to implement SMR. This happened largely because Raft included important things like leader elections, reconfigurations, and snapshotting in the protocol. While Paxos came onto the scene first, it initially left it to each team to invent some of these additional pieces themselves. The first version of Paxos is known as single decree Paxos, because it only covers one round of consensus to agree on a single value. 
MultiPaxos came later to optimize Paxos for replicating a sequence of values. In the meantime, Raft was published. It was a clearly described, all-in-one (and the kitchen sink) protocol that left little room for interpretation (which was helpful). Teams still tweak Raft to fit their needs, but the core design usually remains the same. We describe Raft as a unified, or converged, protocol because it combines the data and control planes into one tightly coupled protocol:
- Each Raft server implements the whole protocol.
- Each server participates as a peer in consensus, replication, and storage.
- Replication cannot be separated from consensus; they are woven together.
- The replicated log mixes data plane and control plane entries.
Using the terminology of my last post, it mixes failure-free ordering and fault-tolerant consensus into one protocol. In other words, the data plane is inextricably linked to the control plane. As I explained in my Virtual Consensus in Delos post, this approach has some drawbacks:
- High coupling: Because Raft combines control plane concerns (leader elections, reconfiguration) with data plane concerns (replication, with ordering), evolving the data plane cannot be done without also impacting the control plane. The control plane is even integrated into the log itself, with a mix of data and control entries. Changes to one plane necessitate changes to the other.
- More complexity: The protocol is large, with many operations that can interleave in complex ways.
- Less flexibility: We cannot do things like independently scale consensus and replication, or use erasure coding for the data plane instead of replication, as two examples (I’m unconvinced by CRaft).
Raft also has a monolithic log. That is, the log is a single logical sequence of entries. So we can classify Raft as a unified protocol over a monolithic log. Sorry to pick on Raft again, but everyone knows Raft, so it’s the best case study for a converged protocol.
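To illustrate the convergence point, here is a toy model (not Raft itself, and deliberately simplified) of what a converged log looks like: a single sequence that interleaves data plane and control plane entries, so every consumer of the log must understand both kinds.

```python
# Toy model of a converged log: data and control entries share one sequence.

log = [
    {"type": "data",   "payload": "set x=1"},
    {"type": "config", "payload": "add-server node-4"},   # control entry in-band
    {"type": "data",   "payload": "set y=2"},
]

state = {}                                    # the state machine's local state
members = {"node-1", "node-2", "node-3"}      # cluster membership

for entry in log:
    if entry["type"] == "data":
        # Data plane: apply the operation to the state machine.
        key, value = entry["payload"].split()[1].split("=")
        state[key] = value
    else:
        # Control plane: mutate cluster membership, in the same log.
        members.add(entry["payload"].split()[1])
```

Evolving either plane means touching this shared sequence and every consumer of it, which is the coupling the post describes.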
The question is, how can we disaggregate a monolithic log replication protocol such as Raft? There are various ways of breaking apart this monolithic approach. These are the ones I can think of, though there may be more:
- (A) Disaggregated participants. The participants form clearly delineated roles that can be run as separate processes. The protocol itself may still be converged (control plane and data plane intertwined) but exercised by disaggregated components.
- (B) Disaggregated protocol that separates the control plane from the data plane. The protocol itself is broken up into control plane and data plane duties, with clean and limited interaction between them.
- (C) Segmented logs. Logs are formed from a chain of logical log segments.
- (D) Pointer-based logs. Separates data from ordering.
- (E) Separating ordering from IO. Avoid the need for a leader by allowing all nodes to write, but coordinate via a sequencer component.
- (F) Leaderless proxies. Abstracts the specifics of the replicated log protocol from clients, via another disaggregated abstraction layer of leaderless proxies.
I’ve labeled these A-F as I’ll be referring back to them in a survey of log replication protocols. It’s possible to combine multiple together. Paxos made a fundamental contribution to distributed consensus by formalizing the responsibilities of reaching consensus and acting on the agreed values into distinct roles. Paxos separates the consensus protocol into proposers who drive consensus by proposing values to acceptors, acceptors who form the quorum necessary for reaching agreement (consensus), and learners who need to know the decided values. This creates a clear framework that allows system designers to reason about each role's responsibilities independently while ensuring their interaction maintains safety and liveness properties.
The formalization of these roles has influenced the design of practical systems and protocols for decades, even when they don't strictly adhere to the original Paxos model. Its importance cannot be overstated. The original Paxos is known as single-decree Paxos as it is an algorithm for agreeing on a single value. MultiPaxos (or multi-decree Paxos) came later to allow a group of servers to agree on a sequence of values (aka a log). MultiPaxos made optimizations for a replicated log algorithm, such as stable leadership (distinguished proposer), reconfiguration, and so on. Fig 1. The Paxos roles, seen here in the context of MultiPaxos. Note that the number of servers in each role is not limited to this depiction. MultiPaxos can be organized just like Raft, having the Proposers, Acceptors and Learners co-located on equal peer servers. However, you can also separate all three roles onto different servers. Because all participants of MultiPaxos exercise both the data and control planes (and mix both into the same log), MultiPaxos is an example of a converged protocol, but with potentially disaggregated participants. Where Raft and MultiPaxos mix the control plane and data plane into one unified protocol, others separate the protocol itself so that there is loose coupling between the work of replicating log data entries and the other work, such as leader elections, membership changes, and so on. The two planes must interact, and how loosely coupled they are will impact the overall complexity of the broader protocol.
- Data plane: Log ordering, replication and storage.
- Control plane: Failure detection, leader election, and membership changes.
Fig 2. The protocol is split into control plane and data plane. In the next post we’ll look at some real-world systems that separate concerns into control and data planes. A segmented log is a log made up of a chain of log segments, where each log segment is itself a log.
Typically, the main log has a virtual sequential address space (of offsets) that maps onto the sequence of log segments (and their sequential address space of offsets). Writes go to the end segment, known as the active segment. All segments prior to the active segment must be sealed (no longer able to accept writes). Reads are directed to a given segment based on the mapping of virtual address to log segment. Fig 3. The log is formed from a chain of logical log segments. Each log segment is independent of the others. They do not form a doubly linked list where each segment has pointers to navigate forward and backward. The log segment chain is stored as a sequence in metadata (requiring consensus). Breaking up the logical, continuous log into logical segments can provide a number of benefits, such as enhanced write availability, less data movement, and a cleaner abstraction for managing background work such as tiering and garbage collection. In some cases, each log segment represents a period of steady state (data plane), where each new segment represents a historical control plane intervention to route around some error or respond to load distribution conditions. Replicated logs have a sequential virtual address space that maps onto a physical address space where the complete log entries can be physically read from. Clients deal with logical addresses and storage servers map those logical addresses to physical addresses. However, we can add another layer of indirection by having the physical log entries only contain metadata about the log entry, rather than the data payload itself. The metadata includes a pointer (or data reference) to where the data can be read from (another logical address, typically in a flat address space of something like object storage). 
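A minimal sketch of this pointer indirection, with hypothetical names and a dict standing in for object storage: the ordered log holds only tiny pointer records, while payloads live in a flat, key-addressed store.

```python
# Toy pointer-based log: ordering lives in the log, data lives elsewhere.
import uuid

object_store = {}   # flat address space: opaque key -> payload
log = []            # sequential address space: list of pointer records

def append(payload: bytes) -> int:
    key = str(uuid.uuid4())              # flat, unordered address for the data
    object_store[key] = payload          # write the payload first
    log.append({"key": key, "size": len(payload)})  # then commit the pointer
    return len(log) - 1                  # the log position assigns the order

def read(offset: int) -> bytes:
    pointer = log[offset]                # hop 1: sequential address -> pointer
    return object_store[pointer["key"]]  # hop 2: pointer -> payload

off0 = append(b"first")
off1 = append(b"second")
read(0)  # returns b"first", at the cost of two lookups per read
```

Only the pointer records need coordination for ordering; each payload can be written and managed independently, which is the scalability trade described below.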
The pointer-based log is a log of entry metadata; the log’s role is to apply the strict ordering guarantees and map the sequential addressing scheme of the log onto a flat, key-based address space such as file/object storage. Fig 4. A log can store complete log entries, or can store only log entry metadata, pointing to another storage service that hosts the data payload. WarpStream calls this “separating data from metadata”, though I find the word metadata too vague, plus the practice actually creates extra metadata in order to make the separation work. I prefer to think of it simply in terms of separating storage from ordering. Fig 5. Data is separated from ordering. By placing data payloads in the flat address space and the relatively tiny data references in the sequential address space, we can obtain the benefits of sequential addressing for ordering along with the scalability of flat address spaces for data storage. The key insight is that a sequential address space requires coordination to maintain order, so minimizing its size is beneficial (coordination can be costly). At the same time, flat address spaces allow each piece of data to be managed independently, making the storage layer simpler and more flexible. This independence enables better horizontal scaling and parallel operations. Of course, this type of scalability also comes with some costs:
- The flat address space loses the benefits of large sequential reads/writes that are possible when laying out the sequential log data in a sequential organization on disk. As I have previously written, even modern SSDs prefer sequential writes.
- Higher latencies, as reads and writes require two operations, and flat address space storage may not provide the same level of sequential read/write performance. Object storage also has higher latency.
This separation of storage from ordering is not a universally better approach. It all depends on the log workload. Most log replication protocols are leader-based.
The leader proposes the entries to be replicated, determining the order of entries in the log. All clients must direct their writes to this leader, which naturally creates a scalability bottleneck. However, we can replace the idea of a leader with that of a sequencer. The sequencer only has to hand out 64-bit integers corresponding to positions in the log (not perform any other duty). A set of leaderless servers can perform the majority of the work, handling client requests and performing the IO, with ordering coming from the sequencer-sourced sequence numbers. A mapping of virtual log positions to storage is required, so that:
- Servers know where to write the entries that correspond to the obtained sequence numbers.
- Servers know where to read an entry from, for a given sequence number.
One final wrinkle with this sequencer approach is that a server can obtain a sequence number but never perform the write, leaving holes in the log. A mechanism for filling in these holes or skipping them is required. Fig 6. Leaderless servers perform the IO and ordering is achieved by a simple sequencer component. There are various strategies for dealing with the failure of the sequencer. I’ll cover this subject in the follow-up survey. Enter another layer of indirection. Leaderless proxies abstract many aspects of the log protocol from clients, including having to learn which server is the leader. Clients simply connect to a stable endpoint to publish to and consume from the log. Fig 7. Leaderless proxies can hide leader-based log replication protocols. This can be combined with different types of logs, from the standard inline continuous log to a pointer-based log, where proxies use object storage for payloads and then perform writes to the log with the address of the payload. Likewise, consumer proxies consume the log and download the payloads before passing them to consumer clients. Fig 8. Data and ordering separated. There are no doubt other ways of disaggregating things.
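The sequencer idea from (E) above can be sketched in a few lines. This is a toy, single-process model with invented names: a counter stands in for the sequencer, any "server" performs its own IO against shared storage, and a server that crashes after taking a sequence number leaves a hole that readers must fill or skip.

```python
# Toy sketch of separating ordering (sequencer) from IO (leaderless servers).
import itertools

sequencer = itertools.count()          # hands out positions, does nothing else
storage = {}                           # position -> entry, written by any server

def server_write(entry, crash_before_io=False):
    pos = next(sequencer)              # cheap coordination: just an integer
    if crash_before_io:
        return pos                     # position taken, but the write never lands
    storage[pos] = entry               # leaderless IO: any server may do this
    return pos

def read_log():
    # A naive skip-or-fill policy: surface holes explicitly to the reader.
    end = max(storage) + 1 if storage else 0
    return [storage.get(p, "<hole>") for p in range(end)]

server_write("a")
server_write("b", crash_before_io=True)   # position 1 becomes a hole
server_write("c")
read_log()  # returns ["a", "<hole>", "c"]
```

Real systems need a durable hole-filling protocol (and a plan for sequencer failover) rather than this naive read-time marker, but the division of labor is the same.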
In the next post, we’ll survey the log replication systems out there in terms of the A-F disaggregation classifications. It is theoretically possible for a system to implement all of them!
