
Video + notes on upgrading a Datasette plugin for the latest 1.0 alpha, with help from uv and OpenAI Codex CLI

I'm upgrading various plugins for compatibility with the new Datasette 1.0a20 alpha release and I decided to record a video of the process. This post accompanies that video with detailed additional notes.

I picked a very simple plugin to illustrate the upgrade process (possibly too simple). datasette-checkbox adds just one feature to Datasette: if you are viewing a table with boolean columns (detected as integer columns with boolean-style names) and your current user has permission to update rows in that table, it adds an inline checkbox UI that looks like this:

I built the first version with the help of Claude back in August 2024 - details in this issue comment. Most of the implementation is JavaScript that makes calls to Datasette 1.0's JSON write API. The Python code just checks that the user has the necessary permissions before including the extra JavaScript.

The first step in upgrading any plugin is to run its tests against the latest Datasette version. Thankfully uv makes it easy to run code in scratch virtual environments that include the different code versions you want to test against. I have a test utility (short for "test against development Datasette") which I use for that purpose. I can run it in any plugin directory and it will run the existing plugin tests against whatever version of Datasette I have checked out in my development directory. You can see the full implementation of that utility (and its companion described below) in this TIL.

I started by running it in the datasette-checkbox directory, and got my first failure... but it wasn't due to permissions, it was because the plugin's Datasette dependency was pinned to a specific mismatched version. I fixed this by updating that pin and ran the tests again... and they passed! Which was a problem, because I was expecting permission-related failures. It turns out when I first wrote the plugin I was lazy with the tests - they weren't actually confirming that the table page loaded without errors.

I needed to actually run the code myself to see the expected bug. First I created myself a demo database using sqlite-utils create-table, then I ran it with Datasette against the plugin's code. Sure enough, visiting the table page produced a 500 error about a missing method. The next step was to update the test to also trigger this error, and now the test fails as expected.

At this point I could have manually fixed the plugin itself - which would likely have been faster given the small size of the fix - but instead I demonstrated a bash one-liner I've been using to apply these kinds of changes automatically. It runs OpenAI Codex in non-interactive mode - it will loop until it has finished the prompt you give it. I tell it to consult the subset of the Datasette upgrade documentation that talks about Datasette permissions and then get the plugin's tests to pass. This is an example of what I call designing agentic loops - I gave Codex the tools it needed and a clear goal and let it get to work on my behalf.

The remainder of the video covers finishing up the work - testing the fix manually, committing my work, then shipping a 0.1a4 release to PyPI using the pattern described in this TIL. Finally, I demonstrated that the shipped plugin worked in a fresh environment by using uv to install and run a fresh Datasette instance with a fresh copy of the new alpha plugin. It's a neat way of confirming that freshly released software works as expected.
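For illustration, here is roughly the kind of assertion that was missing from the test suite. This is a hedged sketch rather than the plugin's actual test: the table schema and actor are invented, and it assumes pytest-asyncio is installed.

```python
# Hedged sketch of the missing "table page renders without a 500" assertion.
# Table name, schema and actor are made up; this is not the plugin's real test.
import pytest
from datasette.app import Datasette


@pytest.mark.asyncio
async def test_table_page_renders_for_privileged_actor():
    ds = Datasette(memory=True)
    await ds.invoke_startup()
    db = ds.add_memory_database("demo")
    await db.execute_write(
        "create table items (id integer primary key, is_done integer)"
    )
    # Simulate a signed-in actor using Datasette's signed ds_actor cookie
    cookies = {"ds_actor": ds.sign({"a": {"id": "root"}}, "actor")}
    response = await ds.client.get("/demo/items", cookies=cookies)
    # The plugin's permission check runs while this page renders,
    # so the 500 described above would surface right here.
    assert response.status_code == 200
```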
This video was shot in a single take using Descript, with no rehearsal and perilously little preparation in advance. I recorded through my AirPods and applied the "Studio Sound" filter to clean up the audio. I pasted in a closing slide from my previous video and exported it locally at 1080p, then uploaded it to YouTube. Something I learned from the Software Carpentry instructor training course is that making mistakes in front of an audience is actively helpful - it helps them see a realistic version of how software development works and they can learn from watching you recover. I see this as a great excuse for not editing out all of my mistakes! I'm trying to build new habits around video content that let me produce useful videos while minimizing the amount of time I spend on production. I plan to iterate more on the format as I get more comfortable with the process. I'm hoping I can find the right balance between production time and value to viewers.


How Would You Like Your Iceberg Sir? Stream or Batch Ordered?

Today I want to talk about stream analytics, batch analytics and Apache Iceberg. Stream and batch analytics work differently, but both can be built on top of Iceberg, and due to their differences there can be a tug-of-war over the Iceberg table itself. In this post I am going to use two real-world systems, Apache Fluss (streaming tabular storage) and Confluent Tableflow (Kafka-to-Iceberg), as a case study for these tensions between stream and batch analytics.

Apache Fluss uses zero-copy tiering to Iceberg. Recent data is stored on Fluss servers (using the Kafka replication protocol for high availability and durability) but is then moved to Iceberg for long-term storage. This results in one copy of the data. Confluent Kora and Tableflow use internal topic tiering and Iceberg materialization, copying Kafka topic data to Iceberg, such that we have two copies (one in Kora, one in Iceberg). This post will explain why both have chosen different approaches and why both are totally sane, defensible decisions.

First we should understand the concepts of stream-order and batch-order. A streaming Flink job typically assumes its sources come with stream-order. For example, a simple SELECT * Flink query assumes the source is (loosely) temporally ordered, as if it were a live stream. It might be historical data, such as starting at the earliest offset of a Kafka topic, but it is still loaded in a temporal order. Windows and temporal joins also depend on the source being stream-ordered to some degree, to avoid needing large/infinite window sizes which blow up the state. A Spark batch job typically hopes that the data layout of the Iceberg table is batch-ordered (say, partitioned and sorted by business values like region, customer, etc.), thus allowing it to efficiently prune data files that are not relevant, and to minimize costly shuffles.

If Flink is just reading a Kafka topic from start to end, it’s nothing special. But we can also get fancy by reading from two data sources: one historical and one real-time. The idea is that we can unify historical data from Iceberg (or another table format) and real-time data from some kind of event stream. We call reading from the historical source bootstrapping. Streaming bootstrap refers to running a continuous query that reads historical data first and then seamlessly switches to live streaming input. To switch from the historical to the real-time source, we need to make that switch at a specific offset. The notion of a “last tiered offset” is a correctness boundary that ensures that the bootstrap and the live stream blend seamlessly without duplication or gaps. This offset can be mapped to an Iceberg snapshot.

Fig 1. Bootstrap a streaming Flink job from historical then switch to real-time.

However, if the historical Iceberg data is laid out with a batch-order (partitioned and sorted by business values like region, customer, etc.) then the bootstrap portion of a SELECT * will appear completely out-of-order relative to stream-order. This breaks the expectations of the user, who wants to see data in the order it arrived (i.e., stream-order), not a seemingly random one. We could sort the data from batch-order back to stream-order in the Flink source before it reaches the Flink operator level, but this can get really inefficient.

Fig 2. Sort batch-ordered historical data in the Flink source task.
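To make the bootstrap-then-switch idea concrete, here is a deliberately simplified Python sketch (not Flink or Fluss code): the historical source is consumed up to the last tiered offset, then the live source takes over from the next offset, so there are no gaps or duplicates.

```python
# Simplified illustration of streaming bootstrap (not Flink/Fluss code).
# Both sources yield (offset, record) pairs; the handover happens at a single
# agreed offset, which in practice maps to an Iceberg snapshot.

def bootstrap_then_stream(historical_source, live_source, last_tiered_offset):
    # Phase 1: bootstrap from the historical store, in stream-order,
    # up to and including the last tiered offset.
    for offset, record in historical_source:
        if offset > last_tiered_offset:
            break
        yield offset, record

    # Phase 2: switch to the live stream, starting right after the boundary.
    for offset, record in live_source(start_offset=last_tiered_offset + 1):
        yield offset, record

# Toy wiring. If the historical data were batch-ordered instead, phase 1 would
# first need an expensive re-sort back into offset order - the tension above.
history = [(i, f"old-{i}") for i in range(5)]
live = lambda start_offset: ((i, f"new-{i}") for i in range(start_offset, start_offset + 3))
offsets = [o for o, _ in bootstrap_then_stream(history, live, last_tiered_offset=2)]
assert offsets == [0, 1, 2, 3, 4, 5]
```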
If the table has been partitioned by region and sorted by customer, but we want to sort it by the time it arrived (such as by timestamp or Kafka offset), this will require a huge amount of work and data shuffling (in a large table). The result is not only a very expensive bootstrap, but also a very slow one (after all, we expect fast results with a streaming query). So we hit a wall: Flink wants data ordered temporally for efficient streaming bootstrap, while batch workloads want data ordered by value (e.g., columns) for effective pruning and scan efficiency. These two data layouts are orthogonal. Temporal order preserves ingest locality; value order preserves query locality. You can’t have both in a single physical layout.

Fluss is a streaming tabular storage layer built for real-time analytics which can serve as the real-time data layer for lakehouse architectures. I did a comprehensive deep dive into Apache Fluss recently, going right into the internals, if you are interested. Apache Fluss takes a clear stance. It’s designed as a streaming storage layer for data lakehouses, so it optimizes Iceberg for streaming bootstrap efficiency. It does this by maintaining stream-order in the Iceberg table.

Fig 3. Fluss stores real-time and historical data in stream-order.

Internally, Fluss uses its own offset (akin to the Kafka offset) as the Iceberg sort order. This ensures that when Flink reads from Iceberg, it sees a temporally ordered sequence. The Flink source can literally stream data from Iceberg without a costly data shuffle.

Let’s take a look at a Fluss log table. A log table can define:
- Optional partitioning keys (based on one or more columns). Without them, a table is one large partition.
- The number of buckets per partition. The bucket is the smallest logical subdivision of a Fluss partition.
- An optional bucketing key for hash-bucketing. Otherwise rows are assigned to buckets randomly or round-robin.
The partitioning and buckets are both converted to an Iceberg partition spec.

Fig 4. An example of the Iceberg partition spec and sort order

Within each of these Iceberg partitions, the sort order is the Fluss offset. For example, we could partition by a date field, then spread the data randomly across the buckets within each partition.

Fig 5. The partitions of an Iceberg table visualized.

Inside Flink, the source will generate one “split” per table bucket, routing them by bucket id to split readers. Due to the offset sort order, each Parquet file should contain contiguous blocks of offsets after compaction. Therefore each split reader naturally reads Iceberg data in offset order until it switches to the Fluss servers for real-time data (also in offset order).

Fig 6. Flink source bootstraps from Iceberg visualized

Once the lake splits have been read, the readers start reading from the Fluss servers for real-time data. This is great for Flink streaming bootstrap (it is just scanning the data files as a cheap sequential scan). Primary key tables are similar but have additional limitations on the partitioning and bucketing keys (as they must be subsets of the primary key). A primary key, such as device_id, is not a good partition column as it’s too fine-grained, leading us to use an unpartitioned table.

Fig 7. Unpartitioned primary key table with 6 buckets.

If we want Iceberg partitioning, we’ll need to add another column (such as a date) to the primary key and then use the date column for the partitioning key (and device_id as a bucket key for hash-bucketing). This makes the device_id non-unique though.
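As a toy illustration of that layout (not Fluss’s actual hashing or an Iceberg API), the sketch below maps each row to a (partition, bucket) pair, with the offset acting as the sort key inside each partition:

```python
# Toy illustration of a date-partitioned, hash-bucketed layout with an
# offset sort order. Column names, bucket count and hashing are made up.
import hashlib

NUM_BUCKETS = 6

def bucket_for(key: str) -> int:
    # Stable hash so the same device_id always lands in the same bucket
    return int.from_bytes(hashlib.md5(key.encode()).digest()[:4], "big") % NUM_BUCKETS

def partition_of(row: dict) -> tuple:
    # (event_date, bucket) plays the role of the Iceberg partition;
    # files inside it are sorted by offset, giving stream-order scans.
    return (row["event_date"], bucket_for(row["device_id"]))

rows = [
    {"event_date": "2025-11-01", "device_id": "d-1", "offset": 101},
    {"event_date": "2025-11-01", "device_id": "d-2", "offset": 102},
    {"event_date": "2025-11-02", "device_id": "d-1", "offset": 103},
]
for row in sorted(rows, key=lambda r: (partition_of(r), r["offset"])):
    print(partition_of(row), row["offset"])
```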
In short, Fluss is a streaming storage abstraction for tabular data in lakehouses and stores both real-time and historical data in stream-order. This layout is designed for streaming Flink jobs. But if you have a Spark job trying to query that same Iceberg table, pruning is almost useless as it does not use a batch-optimized layout. Fluss may well decide to support Iceberg custom partitioning and sorting (batch-order) in the future, but it will then face the same challenges of supporting streaming bootstrap from batch-ordered Iceberg.

Confluent’s Tableflow (the Kafka-to-Iceberg materialization layer) took the opposite approach. It stores two copies of the data: one stream-ordered and one optionally batch-ordered. Kafka/Kora internally tiers log segments to object storage, which is a historical data source in stream-order (good for streaming bootstrap). Iceberg is a copy, which allows for either stream-order or batch-order; it’s up to the customer. Custom partitioning and sort order is not yet available at the time of writing, but it’s coming.

Fig 8. Tableflow continuously materializes a copy of a Kafka topic as an Iceberg table.

I already wrote why I think zero-copy Iceberg tiering is a bad fit for Kafka specifically. Much of that also applies to Kora, which is why Tableflow is a separate distributed component from Kora brokers. So if we’re going to materialize a copy of the data for analytics, we have the freedom to allow customers to optimize their tables for their use case, which is often batch-based analytics.

Fig 9. Copy 1 (original): Kora maintains stream-ordered live and historical Kafka data. Copy 2 (derived): Tableflow continuously materializes Kafka topics as Iceberg tables.

If the Iceberg table is also stored in stream-order then Flink could do an Iceberg streaming bootstrap and then switch to Kafka. This is not available right now in Confluent, but it could be built. There are also improvements that could be made to historical data stored by Kora/Kafka, such as using a columnar format for log segments (something that Fluss does today). Either way, the materialization design provides the flexibility to execute a streaming bootstrap using a stream-order historical data source, allowing the customer to optimize the Iceberg table according to their needs.

Batch jobs want value locality (data clustered by common predicates), aka batch-order. Streaming jobs want temporal locality (data ordered by ingestion), aka stream-order. With a single Iceberg table, once you commit to one, the other becomes inefficient. Given this constraint, we can understand the two different approaches:

Fluss chose stream-order in its Iceberg tables to support stream analytics constraints and avoid a second copy of the data. That’s a valid design decision; after all, Fluss is a streaming tabular storage layer for real-time analytics that fronts the lakehouse. But it does mean giving up the ability to use Iceberg’s layout levers of partitioning and sorting to tune batch query performance.

Confluent chose stream-order in Kora and one optionally batch-ordered Iceberg copy (via Tableflow materialization), letting the customer decide the optimum Iceberg layout. That’s also a valid design decision, as Confluent wants to connect systems of all kinds, be they real-time or not. Flexibility to handle diverse systems and diverse customer requirements wins out. But it does require a second copy of the data (causing higher storage costs).

As the saying goes, the opposite of a good idea can be a good idea.
It all depends on what you are building and what you want to prioritize. The only losing move is pretending you can have both (stream-optimized and batch-optimized workloads) in one Iceberg table without a cost. Once you factor in the compute cost of using one format for both workloads, the storage savings disappear. If you really need both, build two physical views and keep them in sync.

Some related blog posts that are relevant to this one:
- Beyond Indexes: How Open Table Formats Optimize Query Performance
- Why I’m not a fan of zero-copy Apache Kafka-Apache Iceberg
- Understanding Apache Fluss

The Coder Cafe 3 days ago

Build Your Own Key-Value Storage Engine—Week 1

Curious how leading engineers tackle extreme scale challenges with data-intensive applications? Join Monster Scale Summit (free + virtual). It’s hosted by ScyllaDB, the monstrously fast and scalable database.

Agenda
Week 0: Introduction
Week 1: In-Memory Store

Welcome to week 1 of Build Your Own Key-Value Storage Engine! Let’s start by making sure what you’re about to build in this series makes complete sense: what’s a storage engine? A storage engine is the part of a database that actually stores, indexes, and retrieves data, whether on disk or in memory. Think of the database as the restaurant, and the storage engine as the kitchen that decides how food is prepared and stored. Some databases let you choose the storage engine. For example, MySQL uses InnoDB by default (based on B+-trees). Through plugins, you can switch to RocksDB, which is based on LSM trees.

This week, you will build an in-memory storage engine and the first version of the validation client that you will reuse throughout the series.

💬 If you want to share your progress, discuss solutions, or collaborate with other coders, join the community Discord server: Join the Discord

Assumptions:
- Keys are lowercase ASCII strings.
- Values are ASCII strings.
NOTE: Assumptions persist for the rest of the series unless explicitly discarded.

PUT endpoint: The request body contains the value. If the key exists, update its value and return success. If the key doesn’t exist, create it and return success. Keep all data in memory.

GET endpoint: If the key exists, return 200 OK with the value in the body. If the key does not exist, return 404.

Implement a client to validate your server (a hedged starting-point sketch for the server appears at the end of this post):
- Read the testing scenario from this file: put.txt.
- Run an HTTP request for each line: send the PUT or GET request it describes and confirm that the expected response is returned. If not, something is wrong with your implementation.
- Each request must be executed sequentially, one line at a time; otherwise, out-of-order responses may fail the client’s assertions.

If you want to generate an input file with a different number of lines, you can use this Go generator: it takes the format to generate and the number of lines as arguments. At this stage, you need a file in the put format, for example with one million lines.

Add basic metrics for latency:
- Record start and end time for each request.
- Keep a small histogram of latencies in milliseconds.
- At the end, print summary statistics (for example, the median and tail percentiles).
This work is optional as there is no latency target in this series. However, it can be an interesting point of comparison across weeks to see how your changes affect latency.

That’s it for this week! You have built a simple storage engine that keeps everything in memory. In two weeks, we will level up. You will delve into a data structure widely used in key-value databases: LSM trees.

Missing direction in your tech career? At The Coder Cafe, we serve timeless concepts with your coffee to help you master the fundamentals. Written by a Google SWE and trusted by thousands of readers, we support your growth as an engineer, one coffee at a time. ❤️ If you enjoyed this post, please hit the like button.
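Here is the hedged starting-point sketch referenced above: a minimal in-memory key-value server in Python using only the standard library. The /keys/<key> route shape and the port are assumptions, since the post leaves the API details up to you.

```python
# Minimal week-1 in-memory key-value server (standard library only).
# The /keys/<key> route and port 8080 are assumptions, not part of the post.
from http.server import BaseHTTPRequestHandler, HTTPServer

store = {}  # everything lives in memory this week


class KVHandler(BaseHTTPRequestHandler):
    def _key(self):
        parts = self.path.strip("/").split("/")
        return parts[1] if len(parts) == 2 and parts[0] == "keys" else None

    def do_PUT(self):
        key = self._key()
        if key is None:
            self.send_response(400)
            self.end_headers()
            return
        length = int(self.headers.get("Content-Length", 0))
        store[key] = self.rfile.read(length).decode("ascii")  # create or update
        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        key = self._key()
        if key is not None and key in store:
            body = store[key].encode("ascii")
            self.send_response(200)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)  # key does not exist
            self.end_headers()


if __name__ == "__main__":
    HTTPServer(("127.0.0.1", 8080), KVHandler).serve_forever()
```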

Simon Willison 3 days ago

A new SQL-powered permissions system in Datasette 1.0a20

Datasette 1.0a20 is out with the biggest breaking API change on the road to 1.0, improving how Datasette's permissions system works by migrating permission logic to SQL running in SQLite. This release involved 163 commits, with 10,660 additions and 1,825 deletions, most of which was written with the help of Claude Code.

Datasette's permissions system exists to answer the following question: Is this actor allowed to perform this action, optionally against this particular resource? An actor is usually a user, but might also be an automation operating via the Datasette API. An action is a thing they need to do - things like view-table, execute-sql, insert-row. A resource is the subject of the action - the database you are executing SQL against, the table you want to insert a row into.

Datasette's default configuration is public but read-only: anyone can view databases and tables or execute read-only SQL queries but no-one can modify data. Datasette plugins can enable all sorts of additional ways to interact with databases, many of which need to be protected by a form of authentication. Datasette 1.0 also includes a write API, which needs a way to configure who can insert, update, and delete rows or create new tables. Actors can be authenticated in a number of different ways provided by plugins using the actor_from_request() plugin hook. datasette-auth-passwords, datasette-auth-github and datasette-auth-existing-cookies are examples of authentication plugins.

The previous implementation included a design flaw common to permissions systems of this nature: each permission check involved a function call which would delegate to one or more plugins and return a True/False result. This works well for single checks, but has a significant problem: what if you need to show the user a list of things they can access, for example the tables they can view? I want Datasette to be able to handle potentially thousands of tables - tables in SQLite are cheap! I don't want to have to run 1,000+ permission checks just to show the user a list of tables. Since Datasette is built on top of SQLite we already have a powerful mechanism to help solve this problem. SQLite is really good at filtering large numbers of records.

The biggest change in the new release is that I've replaced the previous plugin hook - which let a plugin determine if an actor could perform an action against a resource - with a new permission_resources_sql(actor, action) plugin hook. Instead of returning a True/False result, this new hook returns a SQL query producing rules that help determine the resources the current actor can execute the specified action against. The example in the documentation grants the actor with ID "alice" permission to view the "sales" table in the "accounting" database - a rough sketch of that kind of hook is shown below. The returned query should always produce four columns: a parent, a child, an allow flag (1 or 0), and a reason string for debugging. When you ask Datasette to list the resources an actor can access for a specific action, it will combine the SQL returned by all installed plugins into a single query that joins against the internal catalog tables and efficiently lists all the resources the actor can access. This query can then be limited or paginated to avoid loading too many results at once.
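To make the shape of the new hook concrete, here is a heavily hedged sketch of an implementation along those lines. The return value is shown as a plain SQL string; the exact contract of the real 1.0a20 hook (its return type, parameter handling and any additional arguments) may differ, so treat this as an illustration of the "rules as SQL, not True/False" idea rather than as documentation.

```python
from datasette import hookimpl


# Hedged sketch, not the documentation's example: grant actor "alice"
# permission to view the accounting/sales table. The real hook may expect a
# richer return object and support SQL parameters; a bare string is used here
# purely to illustrate returning rules as SQL.
@hookimpl
def permission_resources_sql(actor, action):
    if action != "view-table" or not actor or actor.get("id") != "alice":
        return None  # no opinion: let other plugins and the defaults decide
    # Four columns: parent (database), child (table), allow (1 or 0), reason
    return """
    select
      'accounting' as parent,
      'sales' as child,
      1 as allow,
      'alice can view accounting/sales' as reason
    """
```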
Datasette has several additional requirements that make the permissions system more complicated. Datasette permissions can optionally act against a two-level hierarchy. You can grant a user the ability to insert-row against a specific table, or every table in a specific database, or every table in every database in that Datasette instance. Some actions can apply at the table level, others at the database level and others only make sense globally - enabling a new feature that isn't tied to tables or databases, for example. Datasette currently has ten default actions but plugins that add additional features can register new actions to better participate in the permission system.

Datasette's permission system has a mechanism to veto permission checks - a plugin can return a deny for a specific permission check which will override any allows. This needs to be hierarchy-aware - a deny at the database level can be outvoted by an allow at the table level.

Finally, Datasette includes a mechanism for applying additional restrictions to a request. This was introduced for Datasette's API - it allows a user to create an API token that can act on their behalf but is only allowed to perform a subset of their capabilities - just reading from two specific tables, for example. Restrictions are described in more detail in the documentation.

That's a lot of different moving parts for the new implementation to cover. Since permissions are critical to the security of a Datasette deployment it's vital that they are as easy to understand and debug as possible. The new alpha adds several new debugging tools, including this page that shows the full list of resources matching a specific action for the current user: And this page listing the rules that apply to that question - since different plugins may return different rules which get combined together:

This screenshot illustrates two of Datasette's built-in rules: there is a default allow for read-only operations such as view-table (which can be overridden by plugins) and another rule that says the root user can do anything (provided Datasette was started with the --root option). Those rules are defined in the datasette/default_permissions.py Python module.

There's one question that the new system cannot answer: provide a full list of actors who can perform this action against this resource. It's not possible to provide this globally for Datasette because Datasette doesn't have a way to track what "actors" exist in the system. SSO plugins such as datasette-auth-github mean a new authenticated GitHub user might show up at any time, with the ability to perform actions despite the Datasette system never having encountered that particular username before. API tokens and actor restrictions come into play here as well. A user might create a signed API token that can perform a subset of actions on their behalf - the existence of that token can't be predicted by the permissions system. This is a notable omission, but it's also quite common in other systems. AWS cannot provide a list of all actors who have permission to access a specific S3 bucket, for example - presumably for similar reasons.

Datasette's plugin ecosystem is the reason I'm paying so much attention to ensuring Datasette 1.0 has a stable API. I don't want plugin authors to need to chase breaking changes once that 1.0 release is out. The Datasette upgrade guide includes detailed notes on upgrades that are needed between the 0.x and 1.0 alpha releases. I've added an extensive section about the permissions changes to that document. I've also been experimenting with dumping those instructions directly into coding agent tools - Claude Code and Codex CLI - to have them upgrade existing plugins for me.
This has been working extremely well. I've even had Claude Code update those notes itself with things it learned during an upgrade process! This is greatly helped by the fact that every single Datasette plugin has an automated test suite that demonstrates the core functionality works as expected. Coding agents can use those tests to verify that their changes have had the desired effect.

I've also been leaning heavily on uv to help with the upgrade process. I wrote myself two new helper scripts to help test the new plugins; their implementations can be found in this TIL. Some of my plugin upgrades have become a one-liner that runs OpenAI Codex CLI with a prompt, without entering interactive mode. There are still a bunch more to go - there's a list in this tracking issue - but I expect to have the plugins I maintain all upgraded pretty quickly now that I have a solid process in place.

This change to Datasette core is by far the most ambitious piece of work I've ever attempted using a coding agent. Last year I agreed with the prevailing opinion that LLM assistance was much more useful for greenfield coding tasks than working on existing codebases. The amount you could usefully get done was greatly limited by the need to fit the entire codebase into the model's context window. Coding agents have entirely changed that calculation. Claude Code and Codex CLI still have relatively limited token windows - albeit larger than last year - but their ability to search through the codebase, read extra files on demand and "reason" about the code they are working with has made them vastly more capable. I no longer see codebase size as a limiting factor for how useful they can be.

I've also spent enough time with Claude Sonnet 4.5 to build a weird level of trust in it. I can usually predict exactly what changes it will make for a prompt. If I tell it "extract this code into a separate function" or "update every instance of this pattern" I know it's likely to get it right. For something like permission code I still review everything it does, often by watching it as it works since it displays diffs in the UI. I also pay extremely close attention to the tests it's writing. Datasette 1.0a19 already had 1,439 tests, many of which exercised the existing permission system. 1.0a20 increases that to 1,583 tests. I feel very good about that, especially since most of the existing tests continued to pass without modification.

I built several different proof-of-concept implementations of SQL permissions before settling on the final design. My research/sqlite-permissions-poc project was the one that finally convinced me of a viable approach. That one started as a free-ranging conversation with Claude, at the end of which I told it to generate a specification which I then fed into GPT-5 to implement. You can see that specification at the end of the README. I later fed the POC itself into Claude Code and had it implement the first version of the new Datasette system based on that previous experiment. This is admittedly a very weird way of working, but it helped me finally break through on a problem that I'd been struggling with for months.

Now that the new alpha is out my focus is upgrading the existing plugin ecosystem to use it, and supporting other plugin authors who are doing the same. The new permissions system unlocks some key improvements to Datasette Cloud concerning finely-grained permissions for larger teams, so I'll be integrating the new alpha there this week.
This is the single biggest backwards-incompatible change required before Datasette 1.0. I plan to apply the lessons I learned from this project to the other, less intimidating changes. I'm hoping this can result in a final 1.0 release before the end of the year!

Some miscellaneous tips I picked up along the way:

My first helper script ("test against datasette dev") runs a plugin's existing test suite against the current development version of Datasette checked out on my machine, passing extra options through to pytest as needed. Its companion ("run against datasette dev") runs the latest development version of the datasette command with the plugin installed. When working on anything relating to plugins it's vital to have at least a few real plugins that you upgrade in lock-step with the core changes. Those two shortcuts were invaluable for productively working on those plugins while I made changes to core.

Coding agents make experiments much cheaper. I threw away so much code on the way to the final implementation, which was psychologically easier because the cost to create that code in the first place was so low.

Tests, tests, tests. This project would have been impossible without that existing test suite. The additional tests we built along the way give me confidence that the new system is as robust as I need it to be.

Claude writes good commit messages now! I finally gave in and let it write these - previously I've been determined to write them myself. It's a big time saver to be able to say "write a tasteful commit message for these changes". Claude is also great at breaking up changes into smaller commits. It can also productively rewrite history to make it easier to follow, especially useful if you're still working in a branch.

A really great way to review Claude's changes is with the GitHub PR interface. You can attach comments to individual lines of code and then later prompt Claude to address the review comments on that PR. This is a very quick way to apply little nitpick changes - rename this function, refactor this repeated code, add types here etc.

The code I write with LLMs is higher quality code. I usually find myself making constant trade-offs while coding: this function would be neater if I extracted this helper, it would be nice to have inline documentation here, changing this would be good but would break a dozen tests... for each of those I have to determine if the additional time is worth the benefit. Claude can apply changes so much faster than me that these calculations have changed - almost any improvement is worth applying, no matter how trivial, because the time cost is so low.

Internal tools are cheap now. The new debugging interfaces were mostly written by Claude and are significantly nicer to use and look at than the hacky versions I would have knocked out myself, if I had even taken the extra time to build them.

That trick with a Markdown file full of upgrade instructions works astonishingly well - it's the same basic idea as Claude Skills.
I maintain over 100 Datasette plugins now and I expect I'll be automating all sorts of minor upgrades in the future using this technique.


High-Performance Query Processing with NVMe Arrays: Spilling without Killing Performance

High-Performance Query Processing with NVMe Arrays: Spilling without Killing Performance
Maximilian Kuschewski, Jana Giceva, Thomas Neumann, and Viktor Leis. SIGMOD'25

In database vernacular, spilling is the process of writing intermediate data to disk in order to evaluate a query with a finite amount of main memory. It goes without saying that database folks are control freaks (sorry, performance conscious) and don’t want to rely on generic OS paging mechanisms to handle working sets which are larger than main memory. This tidbit about Snowflake is fascinating: Only 5% of analytical queries in Snowflake’s 2018 workload trace spill data, but those 5% contribute 45% of the overall CPU time and 29% of the total execution time [77].

One fundamental problem in this area is that it is very hard to predict up-front exactly how much working memory will be needed to efficiently execute a query. Databases have to estimate based on statistics of the relations used in a query, or assume no spilling will be needed, and then gracefully fall back to spilling if necessary. The two operators which this paper deals with are joins and aggregations. Both involve key columns, and the cardinality of the key columns is critical in determining if spilling is necessary.

One obvious spilling mechanism is to use a partitioning approach for joins and aggregations. I’ve described partitioned joins in summaries of these papers: Efficiently Processing Joins and Grouped Aggregations on GPUs. The reason why partitioning nicely solves the problem is that the working set requirements for both steps of a partitioned join are modest. The partitioning step only requires a small amount of memory per partition (e.g., accumulate 64KiB per partition before appending partitioned tuples to on-disk storage). The join step only needs enough working memory to join a single partition. Section 4.1 of the paper claims that partitioning slows down TPC-H queries by 2-5x. My spider sense is tingling, but let’s take this as an axiom for now.

Here is the premise of the paper: partitioning is better for queries that must spill, but worse for queries that can be completed without spilling. What is an efficient and simple design given that a database cannot perfectly predict up-front if it will need to spill?

Prior work along similar lines introduced the hybrid hash join. A hybrid hash join partitions the left (build) input of the join and dynamically decides what percentage of build partitions must be spilled. A hash table is built containing all non-spilled build partitions. Next the right (probe) input to the join is processed. For each probe tuple, the database determines if the associated partition was spilled. If the partition was spilled, then the probe tuple is spilled. If not, then the probe tuple is processed immediately via a lookup in the hash table. Finally, all spilled partitions are processed. The downside of this approach is that it always partitions the build side, even when that is unnecessary.
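As a rough illustration of the hybrid hash join logic described above, here is a hedged Python sketch. It is not the paper's implementation: the partition count, spill decisions and storage layer are simplified stand-ins.

```python
# Hedged sketch of hybrid hash join (not the paper's implementation).
# Partition count, spill decisions and the storage layer are stand-ins.
from collections import defaultdict

NUM_PARTITIONS = 8

def partition_of(key):
    return hash(key) % NUM_PARTITIONS

def hybrid_hash_join(build_rows, probe_rows, spilled_partitions, spill):
    """build_rows/probe_rows: iterables of (key, payload) pairs.
    spilled_partitions: set of build partition ids that were spilled to disk.
    spill(side, pid, row): callback that appends a row to per-partition storage."""
    hash_table = defaultdict(list)

    # Build side: keep in-memory partitions in one hash table, spill the rest.
    for key, payload in build_rows:
        pid = partition_of(key)
        if pid in spilled_partitions:
            spill("build", pid, (key, payload))
        else:
            hash_table[key].append(payload)

    # Probe side: rows whose build partition was spilled are spilled too;
    # everything else is joined immediately via a hash lookup.
    for key, payload in probe_rows:
        pid = partition_of(key)
        if pid in spilled_partitions:
            spill("probe", pid, (key, payload))
        else:
            for build_payload in hash_table.get(key, []):
                yield (key, build_payload, payload)

    # Spilled build/probe partition pairs are then joined one pair at a time,
    # bounding the working memory to a single partition (not shown here).

# Toy usage: partition 3 was spilled, so build key 11 lands in `out`
# instead of the in-memory hash table.
out = []
matches = list(hybrid_hash_join(
    build_rows=[(1, "b1"), (11, "b2")],
    probe_rows=[(1, "p1"), (2, "p2")],
    spilled_partitions={3},
    spill=lambda side, pid, row: out.append((side, pid, row)),
))
print(matches)  # [(1, 'b1', 'p1')]
```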
This paper proposes a join implementation that only pays the cost of partitioning when spilling is required. It is a two-phase process. In the materialization phase, the build table is scanned (and pushed-down filters are applied). The resulting tuples are stored in a list of pages. At first, the system optimistically assumes that no spilling is necessary and appends each tuple to the current page. If a memory limit is reached, then partitioning is enabled. Each tuple processed after that point is partitioned, and per-partition lists of pages are allocated. If a further memory limit is reached, then some partitions are spilled to disk.

Next, the build and probe phase executes in a manner similar to hybrid hash join. However, there is a fast path for the case where no spilling occurred. In this case, the tuples produced by the materialization phase are inserted into one large hash table, and then the probe tuples are streamed, with one hash table lookup per tuple. If partitioning (but no spilling) occurred, then the hash table inserts will have high locality (assuming a chained hash table). If spilling did occur, then the build and probe phase operates like a hybrid hash join, spilling probe tuples if and only if the associated build partition was spilled. The paper isn’t clear on what happens to non-partitioned build tuples once the system decides to start spilling build partitions. My assumption is that in this case, probe tuples must probe both the spilled build tuples and these build tuples that were processed before partitioning was enabled.

The implementation of aggregation described in this paper follows a similar two-phase approach. The materialization phase performs pre-aggregation: the system starts by aggregating into an in-memory hash table. If the hash table grows too large, then partitioning kicks in. Tuples from in-memory hash tables are evicted into per-partition page lists, and subsequent tuples are directly stored in these per-partition page lists. These per-partition pages can be spilled if further memory limits are reached. The subsequent phase then performs any necessary aggregation. If no eviction occurred, then this phase has no work to do.

The paper describes an adaptive compression algorithm that improves spilling performance. Some interesting numbers from the paper:
- The number of CPU cycles per input byte for executing an in-memory TPC-H query ranges from 3.3 to 60.3.
- The number of CPU cycles required to write and read a byte to/from SSD is 11.1.
The adaptive nature of this scheme is driven by the fact that queries are diverse, and compressors have knobs which trade off speed for compression ratio, as illustrated by Fig. 3:

Source: https://dl.acm.org/doi/10.1145/3698813

When spilling occurs, the system dynamically balances CPU usage and IO bandwidth by adjusting these knobs. Fig. 5 shows in-memory throughput while Fig. 6 shows throughput when spilling occurs:

Source: https://dl.acm.org/doi/10.1145/3698813

Dangling Pointers

The design of the unified hash join hinges on the fact that partitioning is a bad idea for the in-memory case. This is in contrast to papers describing in-memory partitioning join implementations on other types of chips like SPID-Join and Efficiently Processing Joins and Grouped Aggregations on GPUs. I imagine there is a lot of literature about this topic that I haven’t read. Leave a comment with your experience.

Armin Ronacher 5 days ago

Absurd Workflows: Durable Execution With Just Postgres

It’s probably no surprise to you that we’re building agents somewhere. Everybody does it. Building a good agent, however, brings back some of the historic challenges involving durable execution. Entirely unsurprisingly, a lot of people are now building durable execution systems. Many of these, however, are incredibly complex and require you to sign up for another third-party service. I generally try to avoid bringing in extra complexity if I can avoid it, so I wanted to see how far I can go with just Postgres. To this end, I wrote Absurd 1 , a tiny SQL-only library with a very thin SDK to enable durable workflows on top of just Postgres - no extension needed.

Durable execution (or durable workflows) is a way to run long-lived, reliable functions that can survive crashes, restarts, and network failures without losing state or duplicating work. Durable execution can be thought of as the combination of a queue system and a state store that remembers the most recently seen execution state. Because Postgres is excellent at queues (thanks to FOR UPDATE SKIP LOCKED), you can use it for the queue (e.g., with pgmq). And because it’s a database, you can also use it to store the state.

The state is important. With durable execution, instead of running your logic in memory, the goal is to decompose a task into smaller pieces (step functions) and record every step and decision. When the process stops (whether it fails, intentionally suspends, or a machine dies) the engine can replay those events to restore the exact state and continue where it left off, as if nothing happened.

Absurd at its core is a single SQL file which needs to be applied to a database of your choice. That SQL file’s goal is to move the complexity of SDKs into the database. SDKs then make the system convenient by abstracting the low-level operations in a way that leverages the ergonomics of the language you are working with. The system is very simple: a task dispatches onto a given queue from where a worker picks it up to work on. Tasks are subdivided into steps, which are executed in sequence by the worker. Tasks can be suspended or fail, and when that happens, they execute again (a run). The result of a step is stored in the database (a checkpoint). To avoid repeating work, checkpoints are automatically loaded from the state storage in Postgres again. Additionally, tasks can sleep or suspend for events and wait until they are emitted. Events are cached, which means they are race-free.

What is the relationship of agents with workflows? Normally, workflows are DAGs defined by a human ahead of time. AI agents, on the other hand, define their own adventure as they go. That means they are basically a workflow with mostly a single step that iterates over changing state until it determines that it has completed. Absurd enables this by automatically counting up steps if they are repeated: you end up with a single task that has just a single step function. The return value is the changed state, but the current state is passed in as an argument. Every time the step function is executed, the data is looked up first from the checkpoint store, and each repeated execution gets its own numbered checkpoint. Each state only stores the new messages it generated, not the entire message history. If a step fails, the task fails and will be retried. And because of checkpoint storage, if you crash in step 5, the first 4 steps will be loaded automatically from the store. Steps are never retried, only tasks.
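To make the checkpoint-replay idea concrete, here is a minimal Python sketch of the general technique. This is not the Absurd SDK or its API; it just shows the pattern of loading a stored step result instead of re-running the step, with repeated step names counted up.

```python
# Minimal checkpoint-replay sketch (generic pattern, not Absurd's SDK).
# A real system would keep checkpoints in Postgres; a dict stands in here.
import json

class TaskRun:
    def __init__(self, checkpoint_store):
        self.checkpoints = checkpoint_store  # e.g. rows keyed by (task, step key)
        self.counters = {}

    def step(self, name, fn, *args):
        # Repeated step names are counted up: "iterate#1", "iterate#2", ...
        n = self.counters.get(name, 0) + 1
        self.counters[name] = n
        key = f"{name}#{n}"
        if key in self.checkpoints:              # replay: reuse the stored result
            return json.loads(self.checkpoints[key])
        result = fn(*args)                       # first run: execute and checkpoint
        self.checkpoints[key] = json.dumps(result)
        return result

# If the process crashes after the fourth iteration, re-running the task with
# the same store replays iterations 1-4 from checkpoints and resumes at 5.
store = {}
run = TaskRun(store)
state = {"messages": []}
for _ in range(3):
    state = run.step("iterate", lambda s: {"messages": s["messages"] + ["tick"]}, state)
print(state["messages"])  # ['tick', 'tick', 'tick']
```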
How do you kick it off? Simply enqueue it. And like Temporal and other solutions, you can yield if you want: a task can sleep and come back to a problem in 7 days, or suspend to wait for an event which someone else emits. Really, that’s it. There is really not much to it. It’s just a queue and a state store - that’s all you need. There is no compiler plugin and no separate service or whole runtime integration. Just Postgres.

That’s not to throw shade on these other solutions; they are great. But not every problem necessarily needs to scale to that level of complexity, and you can get quite far with much less. Particularly if you want to build software that other people should be able to self-host, that might be quite appealing.

It’s named Absurd because durable workflows are absurdly simple, but have been overcomplicated in recent years. ↩


Low-Latency Transaction Scheduling via Userspace Interrupts: Why Wait or Yield When You Can Preempt?

Low-Latency Transaction Scheduling via Userspace Interrupts: Why Wait or Yield When You Can Preempt?
Kaisong Huang, Jiatang Zhou, Zhuoyue Zhao, Dong Xie, and Tianzheng Wang. SIGMOD'25

Say you are a database, and your job is to execute two kinds of queries (both from different TPC benchmarks): high-priority New-Order queries from TPC-C (OLTP) and low-priority Q2 queries from TPC-H (OLAP). Congratulations, you are a hybrid transaction/analytical processing (HTAP) database! You would like OLTP transactions to experience low tail latency, while OLAP transactions run at high throughput. How can transactions be scheduled to achieve these goals?

A Non-Preemptive FIFO policy runs transactions to completion in the order they came. This is easy but has a high tail latency for OLTP transactions that have to sit in line behind a long queue of OLAP transactions. A cooperative policy involves yield points at specific places in the database code. At each yield point, an OLAP transaction can realize that an OLTP transaction is waiting and yield the CPU. It is a hassle to insert yield points, and it is hard to find the Goldilocks frequency. Checking for high-priority transactions too often adds overhead, but checking infrequently increases tail latency. A preemptive policy allows high-priority OLTP transactions to borrow a CPU core as soon as possible. In the past, the only practical way to do this involved OS context switching, which is expensive. Fig. 2 illustrates these policies:

Source: https://dl.acm.org/doi/abs/10.1145/3725319

Enter userspace interrupts. These allow preemption without OS kernel involvement. Section 4.2 of the paper makes it clear that it isn’t totally easy to implement userspace context switching on top of userspace interrupts. An idiomatic use case for userspace interrupts is for an interrupt handler to quickly save some data and then return back to the code that was running. The context switch case is not idiomatic. For each CPU core, two pthreads are created, and there are two stacks. Say the CPU core is running a low-priority (OLAP) transaction and a userspace interrupt is delivered to the core. The userspace interrupt handler is invoked, which mucks around with the CPU registers (including the stack pointer), and then returns. But it doesn’t return to the code that was running the low-priority transaction, it returns to code which runs the high-priority (OLTP) transaction. Once the high-priority transaction finishes, it calls a voluntary context switch function, which again mucks around with CPU registers and the stack pointer in just the correct manner so that it returns back to the code running the low-priority transaction.

There are some nitty-gritty details to get this working correctly. Tricky cases have to be handled such as:
- A userspace interrupt occurring in the middle of the context switch function
- Support for database code which uses thread-local storage (e.g., the thread_local modifier in C++)
- Avoiding deadlocks associated with a userspace interrupt occurring while a database lock is acquired

As seen with xUI, while userspace interrupts are cheap, they still incur a cost. This paper proposes firing a single interrupt to execute a batch of high-priority transactions. Section 5 also describes a starvation avoidance mechanism to ensure that low-priority transactions eventually finish. Note that when a low-priority transaction is preempted, it is not automatically aborted.
The paper assumes the underlying database uses multi-versioning and optimistic concurrency control.

Fig. 10 has the headline results, comparing FIFO scheduling, a cooperative configuration where the OLAP query can occasionally yield the CPU core, and the preemptive approach described in this paper. Tail latencies for OLTP queries are significantly reduced, while performance of OLAP queries does not change much.

Source: https://dl.acm.org/doi/abs/10.1145/3725319

Dangling Pointers

It would be nice to see a comparison against a version which uses traditional OS thread synchronization rather than userspace interrupts. The details of userspace context switching are tricky and seem orthogonal to databases. A library or OS functionality which provides a robust implementation seems like a useful thing to exist. The paper doesn’t mention what happens if the work associated with a single query is parallelized across multiple CPU cores. I imagine this complicates the scheduling policy.

The Coder Cafe 1 week ago

Build Your Own Key-Value Storage Engine

Welcome to The Coding Corner! This is our new section at The Coder Cafe, where we build real-world systems together, one step at a time. Next week, we will launch the first post series: Build Your Own Key-Value Storage Engine. Are you interested in understanding how key-value databases work? Tackling challenges like durability, partitioning, and compaction? Exploring data structures like LSM trees, Bloom filters, and tries? Then this series is for you.

Build Your Own Key-Value Storage Engine focuses on the storage engine itself; we will stay single-node. Topics such as replication and consensus are out of scope. Yet, if this format works, we may cover them in a future series. The structure of each post will be as follows:
- Introduction: The theory for what you are about to build that week.
- Your tasks: A list of tasks to complete the week’s challenges. Note that you can complete the series in any programming language you want.
- Further notes: Additional perspective on how things work in real systems.
If you’re not going to implement things yourself but are interested in databases, you may still want to read sections 1 and 3 at least. A new post in the series will be released every other week.

Last but not least, I’m delighted to share that this series was written in collaboration with ScyllaDB. They reviewed the content for accuracy and shared practical context from real systems, providing a clearer view of how production databases behave and the problems they solve. Huge thanks to Felipe Cardeneti Mendes and ScyllaDB. By the way, they host a free virtual conference called Monster Scale Summit, and the content is always excellent. If you care about scaling challenges, it’s absolutely worth registering! Also, if you’re interested in giving a talk, the CFP closes in two days.

Curious how leading engineers tackle extreme scale challenges with data-intensive applications? Join Monster Scale Summit (free + virtual). It’s hosted by ScyllaDB, the monstrously fast and scalable database.

On a personal note, this has been the most time-consuming project I have done for The Coder Cafe. I really hope you will enjoy it! See you this Friday for a special post for Halloween and next Wednesday for the first post of the series.

Missing direction in your tech career? At The Coder Cafe, we serve timeless concepts with your coffee to help you master the fundamentals. Written by a Google SWE and trusted by thousands of readers, we support your growth as an engineer, one coffee at a time. ❤️ If you enjoyed this post, please hit the like button.
Further notes : Additional perspective on how things work in real systems.

Alex Jacobs 1 week ago

The Case Against pgvector

If you've spent any time in the vector search space over the past year, you've probably read blog posts explaining why pgvector is the obvious choice for your vector database needs. The argument goes something like this: you already have Postgres, vector embeddings are just another data type, why add complexity with a dedicated vector database when you can keep everything in one place? It's a compelling story. And like most of the AI influencer bullshit that fills my timeline, it glosses over the inconvenient details.

I'm not here to tell you pgvector is bad. It's not. It's a useful extension that brings vector similarity search to Postgres. But after spending some time trying to build a production system on top of it, I've learned that the gap between "works in a demo" and "scales in production" is… significant.

What bothers me most: the majority of content about pgvector reads like it was written by someone who spun up a local Postgres instance, inserted 10,000 vectors, ran a few queries, and called it a day. The posts are optimistic, the benchmarks are clean, and the conclusions are confident. They're also missing about 80% of what you actually need to know. I've read through dozens of these posts:

- Understanding Vector Search and HNSW Index with pgvector
- HNSW Indexes with Postgres and pgvector
- Understand Indexes in pgvector
- External Indexing for pgvector
- Exploring Postgres pgvector HNSW Index Storage
- pgvector v0.5.0: Faster semantic search with HNSW indexes
- Early Look at HNSW Performance with pgvector
- Vector Indexes in Postgres using pgvector: IVFFlat vs HNSW
- Vector Database Basics: HNSW Index
- PostgreSQL Vector Indexing with HNSW

They all cover the same ground: here's how to install pgvector, here's how to create a vector column, here's a simple similarity search query. Some of them even mention that you should probably add an index. What they don't tell you is what happens when you actually try to run this in production.

Let's start with indexes, because this is where the tradeoffs start. pgvector gives you two index types: IVFFlat and HNSW. The blog posts will tell you that HNSW is newer and generally better, which is… technically true but deeply unhelpful.

IVFFlat (Inverted File with Flat quantization) partitions your vector space into clusters. During search, it identifies the nearest clusters and only searches within those. (Image source: IVFFlat or HNSW index for similarity search? by Simeon Emanuilov)

Pros:

- Lower memory footprint during index creation
- Reasonable query performance for many use cases
- Index creation is faster than HNSW

Cons:

- Requires you to specify the number of lists (clusters) upfront
- That number significantly impacts both recall and query performance
- The commonly recommended formula ( ) is a starting point at best
- Recall can be… disappointing depending on your data distribution
- New vectors get assigned to existing clusters, but clusters don't rebalance without a full rebuild

HNSW (Hierarchical Navigable Small World) builds a multi-layer graph structure for search. (Image source: IVFFlat or HNSW index for similarity search? by Simeon Emanuilov)

Pros:

- Better recall than IVFFlat for most datasets
- More consistent query performance
- Scales well to larger datasets

Cons:

- Significantly higher memory requirements during index builds
- Index creation is slow—painfully slow for large datasets
- The memory requirements aren't theoretical; they are real, and they'll take down your database if you're not careful

None of the blogs mention that building an HNSW index on a few million vectors can consume 10+ GB of RAM or more (depending on your vector dimensions and dataset size). On your production database. While it's running. For potentially hours.
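For reference, the "happy path" those posts walk through looks roughly like the sketch below. It assumes psycopg2, an illustrative items table, and 1536-dimension embeddings; none of that is prescribed by pgvector itself.

```python
# Minimal pgvector "happy path" sketch: extension, table, HNSW index, one query.
# Table name, dimension, and connection string are illustrative assumptions.
import psycopg2

conn = psycopg2.connect("dbname=demo")  # assumed local database
cur = conn.cursor()

cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute("""
    CREATE TABLE IF NOT EXISTS items (
        id        bigserial PRIMARY KEY,
        content   text,
        embedding vector(1536)   -- dimension must match your embedding model
    );
""")
# The step most posts at least mention: an HNSW index using cosine distance.
cur.execute("""
    CREATE INDEX IF NOT EXISTS items_embedding_idx
    ON items USING hnsw (embedding vector_cosine_ops);
""")
conn.commit()

# A simple similarity query: top 10 nearest neighbours by cosine distance (<=>).
query_embedding = [0.0] * 1536  # placeholder; normally produced by your embedding model
literal = "[" + ",".join(str(x) for x in query_embedding) + "]"
cur.execute(
    "SELECT id, content FROM items ORDER BY embedding <=> %s::vector LIMIT 10;",
    (literal,),
)
rows = cur.fetchall()
```

On 10,000 rows this works, returns quickly, and makes for a tidy blog post. Everything that follows is about what this snippet hides.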
In a typical application, you want newly uploaded data to be searchable immediately. User uploads a document, you generate embeddings, insert them into your database, and they should be available in search results. Simple, right?
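The ingest path itself really is that simple, which is why the demos look so convincing. A minimal sketch, assuming the items table from above and a placeholder embed() function standing in for whatever embedding model you call:

```python
# Sketch of the "user uploads a document" path: embed the text, insert the row.
# embed() is a placeholder for a real embedding model or API, and the items
# table matches the earlier sketch; neither is defined by pgvector itself.

def embed(text: str) -> list:
    # Placeholder: a real implementation would call an embedding model here.
    return [0.0] * 1536

def index_document(cur, content: str) -> int:
    vector = embed(content)
    literal = "[" + ",".join(str(x) for x in vector) + "]"  # pgvector's text format
    cur.execute(
        "INSERT INTO items (content, embedding) VALUES (%s, %s::vector) RETURNING id;",
        (content, literal),
    )
    return cur.fetchone()[0]
```

The insert is the easy part. The question is what the index does with it.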
When you insert new vectors into a table with an index, one of two things happens:

- IVFFlat: The new vectors are inserted into the appropriate clusters based on the existing structure. This works, but it means your cluster distribution gets increasingly suboptimal over time. The solution is to rebuild the index periodically. Which means downtime, or maintaining a separate index and doing an atomic swap, or accepting degraded search quality.
- HNSW: New vectors are added to the graph structure. This is better than IVFFlat, but it's not free. Each insertion requires updating the graph, which means memory allocation, graph traversals, and potential lock contention.

Neither of these is a deal-breaker in isolation. But here's what happens in practice: you're inserting vectors continuously throughout the day. Each insertion is individually cheap, but the aggregate load adds up. Your database is now handling your normal transactional workload, analytical queries, AND maintaining graph structures in memory for vector search.

Let's say you're building a document search system. Users upload PDFs, you extract text, generate embeddings, and insert them. The user expects to immediately search for that document. Here's what actually happens:

- With no index: The insert is fast, the document is immediately available, but your searches do a full sequential scan. This works fine for a few thousand documents. At a few hundred thousand? Your searches start taking seconds. Millions? Good luck.
- With IVFFlat: The insert is still relatively fast. The vector gets assigned to a cluster. But whoops, a problem. Those initial cluster assignments were based on the data distribution when you built the index. As you add more data, especially if it's not uniformly distributed, some clusters get overloaded. Your search quality degrades. You rebuild the index periodically to fix this, but during the rebuild (which can take hours for large datasets), what do you do with new inserts? Queue them? Write to a separate unindexed table and merge later?
- With HNSW: The graph gets updated on each insert through incremental insertion, which sounds great. But updating an HNSW graph isn't free—you're traversing the graph to find the right place to insert the new node and updating connections. Each insert acquires locks on the graph structure. Under heavy write load, this becomes a bottleneck. And if your write rate is high enough, you start seeing lock contention that slows down both writes and reads.

Here's the real nightmare: you're not just storing vectors. You have metadata—document titles, timestamps, user IDs, categories, etc. That metadata lives in other tables (or other columns in the same table). You need that metadata and the vectors to stay in sync. In a normal Postgres table, this is easy—transactions handle it. But when you're dealing with index builds that take hours, keeping everything consistent gets complicated.

For IVFFlat, periodic rebuilds are basically required to maintain search quality. For HNSW, you might need to rebuild if you want to tune parameters or if performance has degraded. The problem is that index builds are memory-intensive operations, and Postgres doesn't have a great way to throttle them. You're essentially asking your production database to allocate multiple (possibly dozens of) gigabytes of RAM for an operation that might take hours, while continuing to serve queries.

You end up with strategies like:

- Write to a staging table, build the index offline, then swap it in (but now you have a window where searches miss new data)
- Maintain two indexes and write to both (double the memory, double the update cost)
- Build indexes on replicas and promote them
- Accept eventual consistency (users upload documents that aren't searchable for N minutes)
- Provision significantly more RAM than your "working set" would suggest

None of these are "wrong" exactly. But they're all workarounds for the fact that pgvector wasn't really designed for high-velocity real-time ingestion.
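For illustration, here is roughly what the "build a replacement index and swap it in" variant of the first strategy could look like. This is a sketch only: it assumes Postgres's CREATE INDEX CONCURRENTLY works for your pgvector index, and the index and table names are made up.

```python
# Sketch: rebuild an HNSW index without blocking writes, then swap names.
# Index/table names are illustrative. This still costs a lot of RAM and time;
# it just avoids holding a long exclusive lock on the table while it runs.
import psycopg2

conn = psycopg2.connect("dbname=demo")
conn.autocommit = True  # CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block
cur = conn.cursor()

# 1. Build the replacement index alongside the old one (potentially hours).
cur.execute("""
    CREATE INDEX CONCURRENTLY items_embedding_idx_new
    ON items USING hnsw (embedding vector_cosine_ops);
""")

# 2. Swap: drop the old index and take over its name.
cur.execute("DROP INDEX CONCURRENTLY IF EXISTS items_embedding_idx;")
cur.execute("ALTER INDEX items_embedding_idx_new RENAME TO items_embedding_idx;")
```

Note what this does not solve: the memory spike still happens on the same instance, and you still have to decide what search quality looks like while the new index is being built.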
Okay but let's say you solve your index and insert problems. Now you have a document search system with millions of vectors. Documents have metadata—maybe they're marked as , , or . A user searches for something, and you only want to return published documents. Simple enough.

But now you have a problem: should Postgres filter on status first (pre-filter) or do the vector search first and then filter (post-filter)? This seems like an implementation detail. It's not. It's the difference between queries that take 50ms and queries that take 5 seconds. It's also the difference between returning the most relevant results and… not.

Pre-filter works great when the filter is highly selective (1,000 docs out of 10M). It works terribly when the filter isn't selective—you're still searching millions of vectors. Post-filter works when your filter is permissive. Here's where it breaks: imagine you ask for 10 results with . pgvector finds the 10 nearest neighbors, then applies your filter. Only 3 of those 10 are published. You get 3 results back, even though there might be hundreds of relevant published documents slightly further away in the embedding space. The user searched, got 3 mediocre results, and has no idea they're missing way better matches that didn't make it into the initial k=10 search.

You can work around this by fetching more vectors (say, ) and then filtering, but now:

- You're doing way more distance calculations than needed
- You still don't know if 100 is enough
- Your query performance suffers
- You're guessing at the right oversampling factor

With pre-filter, you avoid this problem, but you get the performance problems I mentioned. Pick your poison.

Now add another dimension: you're filtering by user_id AND category AND date_range. What's the right strategy now?

- Apply all filters first, then search? (Pre-filter)
- Search first, then apply all filters? (Post-filter)
- Apply some filters first, search, then apply remaining filters? (Hybrid)
- Which filters should you apply in which order?

The planner will look at table statistics, index selectivity, and estimated row counts and come up with a plan. That plan will probably be wrong, or at least suboptimal, because the planner's cost model wasn't built for vector similarity search.

And it gets worse: you're inserting new vectors throughout the day. Your index statistics are outdated. The plans get increasingly suboptimal until you ANALYZE the table. But ANALYZE on a large table with millions of rows takes time and resources. And it doesn't really understand vector data distribution in a meaningful way—it can tell you how many rows match , but not how clustered those vectors are in the embedding space, which is what actually matters for search performance.

You end up with hacks: query rewriting for different user types, partitioning your data into separate tables, CTE optimization fences to force the planner's hand, or just fetching way more results than needed and filtering in application code. None of these are sustainable at scale.
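To make the tradeoff concrete, here are the two query shapes side by side as a sketch. The status column, table name, and the k=100 oversampling factor are illustrative assumptions, and which plan Postgres actually chooses depends on your data and statistics.

```python
# Two hedged query shapes for "nearest published documents".
# Pass the query embedding as the single parameter, e.g. cur.execute(POST_FILTER, (literal,)).

# Post-filter with oversampling: take 100 nearest neighbours, then keep published ones.
# You are guessing that 100 candidates are enough to yield 10 published results.
POST_FILTER = """
    SELECT id, content
    FROM (
        SELECT id, content, status
        FROM items
        ORDER BY embedding <=> %s::vector
        LIMIT 100
    ) candidates
    WHERE status = 'published'
    LIMIT 10;
"""

# Pre-filter: logically restrict to published rows, then rank by distance.
# Great when the filter is selective; painful when it leaves millions of rows to scan.
PRE_FILTER = """
    SELECT id, content
    FROM items
    WHERE status = 'published'
    ORDER BY embedding <=> %s::vector
    LIMIT 10;
"""
```

Neither string is wrong; the problem is that you have to pick one per query pattern and keep re-checking that choice as the data shifts.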
Dedicated vector databases have solved this. They understand the cost model of filtered vector search and make intelligent decisions:

- Adaptive strategies: Some databases dynamically choose pre-filter or post-filter based on estimated selectivity
- Configurable modes: Others let you specify the strategy explicitly when you know your data distribution
- Specialized indexes: Some build indexes that support efficient filtered search (like filtered HNSW)
- Query optimization: They track statistics specific to vector operations and optimize accordingly

OpenSearch's k-NN plugin, for example, lets you specify pre-filter or post-filter behavior. Pinecone automatically handles filter selectivity. Weaviate has optimizations for common filter patterns. With pgvector, you get to build all of this yourself. Or live with suboptimal queries. Or hire a Postgres expert to spend weeks tuning your query patterns.

Oh, and if you want hybrid search—combining vector similarity with traditional full-text search—you get to build that yourself too. Postgres has excellent full-text search capabilities. pgvector has excellent vector search capabilities. Combining them in a meaningful way? That's on you. You need to:

- Decide how to weight vector similarity vs. text relevance
- Normalize scores from two different scoring systems
- Tune the balance for your use case
- Probably implement Reciprocal Rank Fusion or something similar

Again, not impossible. Just another thing that many dedicated vector databases provide out of the box.

Timescale has released pgvectorscale, which addresses some of these issues. It adds:

- StreamingDiskANN, a new search backend that's more memory-efficient
- Better support for incremental index builds
- Improved filtering performance

This is great! It's also an admission that pgvector out of the box isn't sufficient for production use cases. pgvectorscale is still relatively new, and adopting it means adding another dependency, another extension, another thing to manage and upgrade. For some teams, that's fine. For others, it's just more evidence that maybe the "keep it simple, use Postgres" argument isn't as simple as it seemed.

Oh, and if you're running on RDS, pgvectorscale isn't available. AWS doesn't support it. So enjoy managing your own Postgres instance if you want these improvements, or just… keep dealing with the limitations of vanilla pgvector. The "just use Postgres" simplicity keeps getting simpler.

I get the appeal of pgvector. Consolidating your stack is good. Reducing operational complexity is good. Not having to manage another database is good. But here's what I've learned: for most teams, especially small teams, dedicated vector databases are actually simpler. With a managed vector database (Pinecone, Weaviate, Turbopuffer, etc.), you typically get:

- Intelligent query planning for filtered searches
- Hybrid search built in
- Real-time indexing without memory spikes
- Horizontal scaling without complexity
- Monitoring and observability designed for vector workloads

Yes, it's another service to pay for. But compare:

- The cost of a managed vector database for your workload
- vs. the cost of over-provisioning your Postgres instance to handle index builds
- vs. the engineering time to tune queries and manage index rebuilds
- vs. the opportunity cost of not building features because you're fighting your database

Turbopuffer starts at $64/month with generous limits. For a lot of teams, the managed service is actually cheaper.

pgvector is an impressive piece of technology. It brings vector search to Postgres in a way that's technically sound and genuinely useful for many applications. But it's not a panacea. Understand the tradeoffs. If you're building a production vector search system:

- Index management is hard. Rebuilds are memory-intensive, time-consuming, and disruptive. Plan for this from day one.
- Query planning matters. Filtered vector search is a different beast than traditional queries, and Postgres's planner wasn't built for this.
- Real-time indexing has costs. Either in memory, in search quality, or in engineering time to manage it.
- The blog posts are lying to you (by omission). They're showing you the happy path and ignoring the operational reality.
- Managed offerings exist for a reason. There's a reason that Pinecone, Weaviate, Qdrant, and others exist and are thriving. Vector search at scale has unique challenges that general-purpose databases weren't designed to handle.

The question isn't "should I use pgvector?" It's "am I willing to take on the operational complexity of running vector search in Postgres?" For some teams, the answer is yes. You have database expertise, you need the tight integration, you're willing to invest the time. For many teams—maybe most teams—the answer is probably no. Use a tool designed for the job. Your future self will thank you.


Fast and Scalable Data Transfer Across Data Systems

Fast and Scalable Data Transfer Across Data Systems. Haralampos Gavriilidis, Kaustubh Beedkar, Matthias Boehm, and Volker Markl. SIGMOD'25.

We live in exciting times: unimaginably large language models getting better each day, and a constant stream of amazing demos. And yet, efficiently transferring a table between heterogeneous systems is an open research problem! An example from the paper involves transferring data from PostgreSQL to pandas. Optimizing this transfer time is important and non-trivial.

The paper describes a system named XDBC. XDBC software runs on both the source and the destination data management systems (DMS), as illustrated by Fig. 4. (Source: https://dl.acm.org/doi/10.1145/3725294)

The XDBC client/server processes are organized as a pipeline. Data parallelism within a stage is exploited by assigning 1 or more workers (e.g., cores) to each stage. There are a lot of knobs which can affect end-to-end throughput:

- Number of workers assigned to each task
- Data interchange format (row-major, column-major, Arrow)
- Compression (zstd, snappy, lzo, lz4)

Section 4.1 of the paper claims the search space is so large that brute force search will not work, so a heuristic algorithm is used. The heuristic algorithm assumes accurate performance models which can estimate the performance of each pipeline stage given a specific configuration. This model is based on real-world single-core performance measurements, and Gustafson's law to estimate multi-core scaling.

The algorithm starts by assigning 1 worker to each pipeline stage (in both the client and server). An iterative process then locates the pipeline stage which is estimated to be the slowest and assigns additional workers to it until it is no longer the bottleneck. This process continues until no more improvement can be found, due to one of the following reasons:

- All available CPU cores have been assigned
- Network bandwidth is the bottleneck

If the process ends with more CPU cores available, then a hard-coded algorithm determines the best compression algorithm given the number of cores remaining. The data interchange format is determined based on which formats the source and destination DMSs support, and which compression algorithm was chosen.
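The paper describes the optimizer at a high level, so the following is only a rough sketch of that greedy loop as summarized above. The stage names, single-core rates, and near-linear scaling model are stand-ins, not the paper's actual model or code.

```python
# Rough sketch of the greedy worker-assignment heuristic described above.
# Stage names, single-core rates, and the scaling model are illustrative stand-ins.

SINGLE_CORE_RATE = {   # estimated MB/s with one worker per stage (made-up numbers)
    "read": 400, "deserialize": 250, "compress": 150,
    "decompress": 200, "write": 350,
}
NETWORK_LIMIT = 300    # MB/s; the network is a stage we cannot add workers to
TOTAL_CORES = 16

def stage_rate(stage: str, workers: int) -> float:
    """Estimated stage throughput; assume near-linear scaling for the sketch."""
    return SINGLE_CORE_RATE[stage] * workers

def plan_workers() -> dict:
    workers = {stage: 1 for stage in SINGLE_CORE_RATE}   # start with 1 worker each
    while sum(workers.values()) < TOTAL_CORES:
        # Estimated end-to-end throughput is capped by the slowest stage.
        bottleneck = min(workers, key=lambda s: stage_rate(s, workers[s]))
        if stage_rate(bottleneck, workers[bottleneck]) >= NETWORK_LIMIT:
            break                                        # network-bound: more cores won't help
        workers[bottleneck] += 1                         # give the bottleneck one more core
    # Any leftover cores would then inform the compression choice, as in the paper.
    return workers

print(plan_workers())
```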
The XDBC optimizer has a lot of similarities with the Alkali optimizer. Here are some differences:

- Alkali does not require tasks to be executed on separate cores. For example, Alkali would allow a single core to execute both the and pipeline stages.
- Alkali uses an SMT solver to determine the number of cores to assign to each stage.
- The Alkali performance model explicitly takes into account inter-core bandwidth requirements.
- Alkali doesn't deal with compression.

Fig. 7(a) shows results from the motivating example (PostgreSQL→Pandas). Fig. 7(b) compares XDBC vs built-in Pandas functions to read CSV data over HTTP. connector-x is a more specialized library which supports reading data into Python programs specifically. (Source: https://dl.acm.org/doi/10.1145/3725294)

Dangling Pointers: There are many search spaces which are too large for brute force. Special-case heuristic algorithms are one fallback, but as the Alkali paper shows, there are other approaches (e.g., LP solvers, ILP solvers, SMT solvers, machine learning models). It would be great to see cross-cutting studies comparing heuristics to other approaches.

Phil Eaton 2 weeks ago

Transaction pooling for Postgres with pgcat

This is an external post of mine.

Jack Vanlightly 2 weeks ago

A Fork in the Road: Deciding Kafka’s Diskless Future

“ The Kafka community is currently seeing an unprecedented situation with three KIPs ( KIP-1150 , KIP-1176 , KIP-1183) simultaneously addressing the same challenge of high replication costs when running Kafka across multiple cloud availability zones. ” — Luke Chen, The Path Forward for Saving Cross-AZ Replication Costs KIPs At the time of writing the Kafka project finds itself at a fork in the road where choosing the right path forward for implementing S3 topics has implications for the long-term success of the project. Not just the next couple of years, but the next decade. Open-source projects live and die by these big decisions and as a community, we need to make sure we take the right one. This post explains the competing KIPs, but goes further and asks bigger questions about the future direction of Kafka. Before comparing proposals, we should step back and ask what kind of system we want Kafka to become. Kafka now faces two almost opposing forces. One force is stabilizing: the on-prem deployments and the low latency workloads that depend on local disks and replication. Kafka must continue to serve those use cases. The other force is disrupting: the elastic, cloud-native workloads that favor stateless compute and shared object storage. Relaxed-latency workloads such as analytics have seen a shift in system design with durability increasingly delegated to shared object storage, freeing the compute layer to be stateless, elastic, and disposable. Many systems now scale by adding stateless workers rather than rebalancing stateful nodes. In a stateless compute design, the bottleneck shifts from data replication to metadata coordination. Once durability moves to shared storage, sequencing and metadata consistency become the new limits of scalability. That brings us to the current moment, with three competing KIPs defining how to integrate object storage directly into Kafka. While we evaluate these KIPs, it’s important to consider the motivations for building direct-to-S3 topics. Cross-AZ charges are typically what are on people’s minds, but it’s a mistake to think of S3 simply as a cheaper disk or a networking cheat. The shift is also architectural, providing us an opportunity to achieve those operational benefits such as elastic stateless compute.  The devil is in the details: how each KIP enables Kafka to leverage object storage while also retaining Kafka’s soul and what made it successful in the first place.  With that in mind, while three KIPs have been submitted, it comes down to two different paths: Revolutionary : Choose a direct-to-S3 topic design that maximizes the benefits of an object-storage architecture, with greater elasticity and lower operational complexity. However, in doing so, we may increase the implementation cost and possibly the long-term code maintenance too by maintaining two very different topic-models in the same project (leader-based replication and direct-to-S3). Evolutionary : Shoot for an evolutionary design that makes use of existing components to reduce the need for large refactoring or duplication of logic. However, by coupling to the existing architecture, we forfeit the extra benefits of object storage, focusing primarily on networking cost savings (in AWS and GCP). Through this coupling, we also run the risk of achieving the opposite: harder to maintain code by bending and contorting a second workload into an architecture optimized for something else. 
In this post I will explain the two paths in this forked road, how the various KIPs map onto those paths, and invite the whole community to think through what they want for Apache Kafka for the next decade. Note that I do not include KIP-1183 as it looks dead in the water, and not a serious contender. The KIP proposes AutoMQ’s storage abstractions without the accompanying implementation. Which perhaps cynically, seems to benefit AutoMQ were it ever adopted, leaving the community to rewrite the entire storage subsystem again. If you want a quick summary of the three KIPs (including KIP-1183), you can read Luke Chen’s The Path Forward for Saving Cross-AZ Replication Costs KIPs or Anton Borisov’s summary of the three KIPs .  This post is structured as follows: The term “Diskless” vs “Direct-to-S3” The Common Parts. Some approaches are shared across multiple implementations and proposals. Revolutionary: KIPs and real-world implementations Evolutionary: Slack’s KIP-1176 The Hybrid: balancing revolution with evolution Deciding Kafka’s future I used the term “diskless” in the title as that is the current hype word. But it is clear that not all designs are actually diskless in the same spirit as “serverless”. Serverless implies that users no longer need to consider or manage servers at all, not that there are no servers.  In the world of open-source, where you run stuff yourself, diskless would have to mean literally “no disks”, else you will be configuring disks as part of your deployment. But all the KIPs (in their current state) depend on disks to some extent, even KIP-1150 which was proposed as diskless. In most cases, disk behavior continues to influence performance and therefore correct disk provisioning will be important. So I’m not a fan of “diskless”, I prefer “direct-to-S3”, which encompasses all designs that treat S3 (and other object stores) as the only source of durability. The main commonality between all Direct-to-S3 Kafka implementations and design proposals is the uploading of objects that combine the data of multiple topics. The reasons are two-fold: Avoiding the small file problem . Most designs are leaderless for producer traffic, allowing for any server to receive writes to any topic. To avoid uploading a multitude of tiny files, servers accumulate batches in a buffer until ready for upload. Before upload, the buffer is sorted by topic id and partition, to make compaction and some reads more efficient by ensuring that data of the same topic and same partition are in contiguous byte ranges. Pricing . The pricing of many (but not all) cloud object storage services penalize excessive requests, so it can be cheaper to roll-up whatever data has been received in the last X milliseconds and upload it with a single request. In the leader-based model, the leader determines the order of batches in a topic partition. But in the leaderless model, multiple brokers could be simultaneously receiving produce batches of the same topic partition, so how do we order those batches? We need a way of establishing a single order for each partition and we typically use the word “sequencing” to describe that process. Usually there is a central component that does the sequencing and metadata storage, but some designs manage the sequencing in other ways. WarpStream was the first to demonstrate that you could hack the metadata step of initiating a producer to provide it with broker information that would align the producer with a zone-local broker for the topics it is interested in. 
The Kafka client is leader-oriented, so we just pass it a zone-local broker and tell the client “this is the leader”. This is how all the leaderless designs ensure producers write to zone-local brokers. It’s not pretty, and we should make a future KIP to avoid the need for this kind of hack. Consumer zone-alignment heavily depends on the particular design, but two broad approaches exist: Leaderless: The same way that producer alignment works via metadata manipulation or using KIP-392 (fetch from follower) which can be used in a leaderless context. Leader-based:  Zone-aware consumer group assignment as detailed in KIP-881: Rack-aware Partition Assignment for Kafka Consumers . The idea is to use consumer-to-partition assignment to ensure consumers are only assigned zone-local partitions (where the partition leader is located). KIP-392 (fetch-from-follower) , which is effective for designs that have followers (which isn’t always the case). Given almost all designs upload combined objects, we need a way to make those mixed objects more read optimized. This is typically done through compaction, where combined objects are ultimately separated into per-topic or even per-partition objects. Compaction could be one-shot or go through multiple rounds. The “revolutionary” path draws a new boundary inside Kafka by separating what can be stateless from what must remain stateful. Direct-to-S3 traffic is handled by a lightweight, elastic layer of brokers that simply serve producers and consumers. The direct-to-S3 coordination (sequencing/metadata) is incorporated into the stateful side of regular brokers where coordinators, classic topics and KRaft live. I cover three designs in the “revolutionary” section: WarpStream (as a reference, a kind of yardstick to compare against) KIP-1150 revision 1 Aiven Inkless (a Kafka-fork) Before we look at the KIPs that describe possible futures for Apache Kafka, let’s look at a system that was designed from scratch with both cross-AZ cost savings and elasticity (from object storage) as its core design principles. WarpStream was unconstrained by an existing stateful architecture, and with this freedom, it divided itself into: Leaderless, stateless and diskless agents that handle Kafka clients, as well as compaction/cleaning work.  Coordination layer : A central metadata store for sequencing, metadata storage and housekeeping coordination. Fig WS-A. The WarpStream stateless/stateful split architecture. As per the Common Parts section, the zone alignment and sorted combined object upload strategies are employed. Fig WS-B. The WarpStream write path. On the consumer side, which again is leaderless and zone-local, a per-zone shared cache is implemented (which WarpStream dubbed distributed mmap). Within a zone, this shared cache assigns each agent a portion of the partition-space. When a consumer fetch arrives, an agent will download the object byte range itself if it is responsible for that partition, else it will ask the responsible agent to do that on its behalf. That way, we ensure that multiple agents per zone are not independently downloading the same data, thus reducing S3 costs. Fig WS-C. Shared per-zone read cache to reduce S3 throughput and request costs. WarpStream implements agent roles (proxy, job) and agent groups to separate the work of handling producer, consumer traffic from background jobs such as compaction, allowing for independent scaling for each workload. 
The proxy role (producer/consumer Kafka client traffic) can be further divided into proxy-producer and proxy-consumer.  Agents can be deployed as dedicated agent groups, which allows for further separation of workloads. This is useful for avoiding noisy neighbor issues, running different groups in different VPCs and scaling different workloads that hit the same topics. For example, you could use one proxy group for microservices, and a separate proxy-consumer group for an analytics workload. Fig WS-D. Agent roles and groups can be used for shaping traffic, independent scaling and deploying different workloads into separate VPCs. Being a pure Direct-to-S3 system allowed WarpStream to choose a design that clearly separates traffic serving work into stateless agents and the coordination logic into one central metadata service. The traffic serving layer is highly elastic, with relatively simple agents that require no disks at all. Different workloads benefit from independent and flexible scaling via the agent roles and groups. The point of contention is the metadata service, which needs to be carefully managed and scaled to handle the read/write metadata volume of the stateless agents. Confluent Freight Clusters follow a largely similar design principle of splitting stateless brokers from central coordination. I will write about the Freight design sometime soon in the future. Apache Kafka is a stateful distributed system, but next we’ll see how KIP-1150 could fulfill much of the same capabilities as WarpStream. KIP-1150 has continued to evolve since it was first proposed, undergoing two subsequent major revisions between the KIP itself and mailing list discussion . This section describes the first version of the KIP, created in April 2025. KIP-1150 revision 1 uses a leaderless architecture where any broker can serve Kafka producers and consumers of any topic partition. Batch Coordinators (replicated state machines like Group and Transaction Coordinators), handle coordination (object sequencing and metadata storage). Brokers accumulate Kafka batches and upload shared objects to S3 (known as Shared Log Segment Objects, or SLSO). Metadata that maps these blocks of Kafka batches to SLSOs (known as Batch Coordinates) is then sent to a Batch Coordinator (BC) that sequences the batches (assigning offsets) to provide a global order of those batches per partition. The BC acts as sequencer and batch coordinate database for later lookups, allowing for the read path to retrieve batches from SLSOs. The BCs will also apply idempotent and transactional producer logic (not yet finalized). Fig KIP-1150-rev1-A. Leaderless brokers upload shared objects, then commit and sequence them via the Batch Coordinator. The Batch Coordinator (BC) is the stateful component akin to WarpStream’s metadata service. Kafka has other coordinators such as the Group Coordinator (for the consumer group protocol), the Transaction Coordinator (for reliable 2PC) and Share Coordinator (for queues aka share groups). The coordinator concept, for centralizing some kind of coordination, has a long history in Kafka. The BC is the source of truth about uploaded objects, Direct-to-S3 topic partitions, and committed batches. This is the component that contains the most complexity, with challenges around scaling, failovers, reliability as well as logic for idempotency, transactions, object compaction coordination and so on. A Batch Coordinator has the following roles: Sequencing . 
Chooses the total ordering for writes, assigning offsets without gaps or duplicates.
Metadata storage. Stores all metadata that maps partition offset ranges to S3 object byte ranges.
Serving lookup requests. Serving requests for log offsets, and requests for batch coordinates (S3 object metadata).
Partition CRUD operations. Serving requests for atomic operations (creating partitions, deleting topics, records, etc.).
Data expiration. Managing data expiry and soft deletion, and coordinating physical object deletion (performed by brokers).

The Group and Transaction Coordinators use internal Kafka topics to durably store their state and rely on the KRaft Controller for leader election (coordinators are highly available with failovers). This KIP does not specify whether the Batch Coordinator will be backed by a topic, or use some other option such as a Raft state machine (based on KRaft state machine code). It also proposes that it could be pluggable. For my part, I would prefer all future coordinators to be KRaft state machines, as the implementation is rock solid and can be used like a library to build arbitrary state machines inside Kafka brokers.

The produce path works as follows:

1. When a producer starts, it sends a Metadata request to learn which brokers are the leaders of each topic partition it cares about. The Metadata response contains a zone-local broker.
2. The producer sends all Produce requests to this broker.
3. The receiving broker accumulates batches of all partitions and uploads a shared log segment object (SLSO) to S3.
4. The broker commits the SLSO by sending the metadata of the SLSO (known as the batch coordinates) to the Batch Coordinator. The coordinates include the S3 object metadata and the byte ranges of each partition in the object.
5. The Batch Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the broker.
6. The broker sends all associated acknowledgements to the producers.

Brokers can parallelize the uploading of SLSOs to S3, but commit them serially to the Batch Coordinator. When batches are first written, a broker does not assign offsets as would happen with a regular topic, as this is done after the data is written, when it is sequenced and indexed by the Batch Coordinator. In other words, the batches stored in S3 have no offsets stored within the payloads as is normally the case. On consumption, the broker must inject the offsets into the Kafka batches, using metadata from the Batch Coordinator.

The consume path works as follows:

1. The consumer sends a Fetch request to the broker.
2. The broker checks its cache and, on a miss, queries the Batch Coordinator for the relevant batch coordinates.
3. The broker downloads the data from object storage.
4. The broker injects the computed offsets and timestamps into the batches (the offsets being part of the batch coordinates).
5. The broker constructs and sends the Fetch response to the consumer.

Fig KIP-1150-rev1-B. The consume path of KIP-1150 (ignoring any caching logic here).
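As a thought experiment, the broker-side produce path described above boils down to something like the following sketch. These are not the KIP's interfaces; the buffer, coordinate format, and the s3_put/bc_commit callables are hypothetical stand-ins.

```python
# Hedged sketch of the KIP-1150 rev1 produce path: accumulate batches, upload one
# shared log segment object (SLSO), then commit its batch coordinates to the
# Batch Coordinator, which assigns offsets. All names are hypothetical.
from dataclasses import dataclass, field

@dataclass
class PendingBatch:
    topic_id: str
    partition: int
    payload: bytes

@dataclass
class SlsoBuffer:
    batches: list = field(default_factory=list)

    def add(self, batch: PendingBatch) -> None:
        self.batches.append(batch)

    def to_object(self):
        # Sort by (topic, partition) so each partition occupies a contiguous byte range.
        ordered = sorted(self.batches, key=lambda b: (b.topic_id, b.partition))
        blob, coords, offset = b"", [], 0
        for b in ordered:
            coords.append({"topic": b.topic_id, "partition": b.partition,
                           "byte_start": offset, "byte_len": len(b.payload)})
            blob += b.payload
            offset += len(b.payload)
        return blob, coords

def flush(buffer: SlsoBuffer, s3_put, bc_commit):
    """s3_put and bc_commit stand in for the S3 client and the coordinator RPC."""
    blob, coords = buffer.to_object()
    object_key = s3_put(blob)                 # 1. upload the SLSO to object storage
    assigned = bc_commit(object_key, coords)  # 2. the BC sequences batches and assigns offsets
    return assigned                           # 3. use the returned offsets to ack producers
```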
The KIP notes that broker roles could be supported in the future. I believe that KIP-1150 revision 1 starts becoming really powerful with roles akin to WarpStream. That way we can separate out direct-to-S3 topic serving traffic and object compaction work on proxy brokers, which become the elastic serving and background job layer. Batch Coordinators would remain hosted on standard brokers, which are already stateful.

With broker roles, we can see how Kafka could implement its own WarpStream-like architecture, which makes full use of disaggregated storage to enable better elasticity.

Fig KIP-1150-rev1-C. Broker roles would bring the stateless/stateful separation that would unlock elasticity in the high throughput workload of many Direct-to-S3 deployments.

Things like supporting idempotency, transactions, caching and object compaction are left to be decided later. While taxing to design, these things look doable within the basic framework of the design. But as I already mentioned in this post, this will be costly in effort to develop and may also come with a long-term code maintenance overhead if complex parts such as transactions are maintained twice. It may also be possible to refactor rather than do wholesale rewrites.

Inkless is Aiven's direct-to-S3 fork of Apache Kafka. The Inkless design shares the combined object upload part and metadata manipulation that all designs across the board are using. It is also leaderless for direct-to-S3 topics. Inkless is firmly in the revolutionary camp. If it were to implement broker roles, it would make this Kafka fork much closer to a WarpStream-like implementation (albeit with some issues concerning the coordination component, as we'll see further down). While Inkless is described as a KIP-1150 implementation, we'll see that it actually diverges significantly from KIP-1150, especially the later revisions (covered later).

Inkless eschewed the Batch Coordinator of KIP-1150 in favor of a Postgres instance, with coordination being executed through Table Valued Functions (TVF) and row locking where needed.

Fig Inkless-A. The write path.

On the read side, each broker must discover what batches exist by querying Postgres (using a TVF again), which returns the next set of batch coordinates as well as the high watermark. Now that the broker knows where the next batches are located, it requests those batches via a read-through cache. On a cache miss, it fetches the byte ranges from the relevant objects in S3.

Fig Inkless-B. The read path.

Inkless bears some resemblance to KIP-1150 revision 1, except the difficult coordination bits are delegated to Postgres. Postgres does all the sequencing and metadata storage, as well as coordination for compaction and file cleanup. For example, compaction is coordinated via Postgres, with a TVF that is periodically called by each broker, which finds a set of files which together exceed a size threshold and places them into a merge work order (tables file_merge_work_items and file_merge_work_item_files) that the broker claims. Once carried out, the original files are marked for deletion (which is another job that can be claimed by a broker).

Fig Inkless-C. Direct-to-S3 traffic uses a leaderless broker architecture with coordination owned by Postgres.

Inkless doesn't implement transactions, and I don't think Postgres could take on the role of transaction coordinator, as the coordinator does more than sequencing and storage. Inkless will likely have to implement some kind of coordinator for that.

The Postgres data model is based on the following tables:

- logs. Stores the Log Start Offset and High Watermark of each topic partition, with the primary key of topic_id and partition.
- files. Lists all the objects that host the topic partition data.
- batches. Maps Kafka batches to byte ranges in files.
- producer_state. All the producer state needed for idempotency.
- Some other tables for housekeeping, and the merge work items.

Fig Inkless-D. The Postgres data model.
The Commit File TVF, which sequences and stores the batch coordinates, works as follows:

1. A broker opens a transaction and submits a table as an argument, containing the batch coordinates of the multiple topics uploaded in the combined file.
2. The TVF logic creates a temporary table (logs_tmp) and fills it via a SELECT on the logs table, with the FOR UPDATE clause, which obtains a row lock on each topic partition row in the logs table that matches the list of partitions being submitted. This ensures that other brokers that are competing to add batches to the same partition(s) queue up behind this transaction. This is a critical barrier that avoids inconsistency. These locks are held until the transaction commits or aborts.
3. Next, inside a loop, partition-by-partition, the TVF:
   - Updates the producer state.
   - Updates the high watermark of the partition (a row in the logs table).
   - Inserts the batch coordinates into the batches table (sequencing and storing them).
4. Commits the transaction.

Apache Kafka would not accept a Postgres dependency of course, and KIP-1150 has not proposed centralizing coordination in Postgres either. But the KIP has suggested that the Batch Coordinator be pluggable, which might leave it open for using Postgres as a backing implementation.

As a former database performance specialist, the Postgres locking does concern me a bit. It blocks on the logs table rows scoped to the topic id and partition. An ORDER BY prevents deadlocks, but given the row locks are maintained until the transaction commits, I imagine that given enough contention, it could cause a convoy effect of blocking. This blocking is fair, that is to say, First Come First Serve (FCFS) for each individual row.

For example, with 3 transactions: T1 locks rows 11–15. T2 wants to lock rows 6–11, but only manages 6–10 as it blocks on row 11. Meanwhile T3 wants to lock rows 1–6, but only manages 1–5 as it blocks on row 6. We now have a dependency tree where T1 blocks T2 and T2 blocks T3. Once T1 commits, the others get unblocked, but under sustained load, this kind of locking and blocking can quickly cascade, such that once contention starts, it rapidly expands. This contention is sensitive to the number of concurrent transactions and the number of partitions per commit. A common pattern with this kind of locking is that up until a certain transaction throughput everything is fine, but at the first hint of contention, the whole thing slows to a crawl. Contention breeds more contention. I would therefore caution against the use of Postgres as a Batch Coordinator implementation.
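To illustrate the locking pattern being discussed (and not Inkless's actual TVF), a stripped-down commit function with the same FOR UPDATE barrier might look like this. The table layout, column names, and the omission of producer-state handling are all simplifications.

```python
# Simplified sketch of a commit step using per-partition row locks in Postgres.
# Lock the logs rows in a deterministic order, then advance watermarks and
# insert batch coordinates inside one transaction. Schema and keys are assumed.
import psycopg2

def commit_file(conn, object_key: str, coords: list) -> None:
    # coords: dicts with topic_id, partition, byte_start, byte_len, record_count (assumed shape)
    partitions = sorted({(c["topic_id"], c["partition"]) for c in coords})
    with conn:                      # transaction scope: commit on success, rollback on error
        with conn.cursor() as cur:
            for topic_id, partition in partitions:
                # Deterministic lock order across transactions helps avoid deadlocks;
                # competing brokers queue up behind these row locks until commit/abort.
                cur.execute(
                    "SELECT high_watermark FROM logs "
                    "WHERE topic_id = %s AND partition = %s FOR UPDATE;",
                    (topic_id, partition),
                )
                hwm = cur.fetchone()[0]
                for c in (x for x in coords
                          if (x["topic_id"], x["partition"]) == (topic_id, partition)):
                    cur.execute(
                        "INSERT INTO batches (topic_id, partition, base_offset, "
                        "object_key, byte_start, byte_len) VALUES (%s, %s, %s, %s, %s, %s);",
                        (topic_id, partition, hwm, object_key,
                         c["byte_start"], c["byte_len"]),
                    )
                    hwm += c["record_count"]     # assign offsets by advancing the watermark
                cur.execute(
                    "UPDATE logs SET high_watermark = %s "
                    "WHERE topic_id = %s AND partition = %s;",
                    (hwm, topic_id, partition),
                )
```

Even in this toy form you can see the issue: every hot partition becomes a single row that every concurrent commit must queue behind.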
The following is a very high-level look at Slack's KIP-1176, in the interests of keeping this post from getting too detailed. There are three key points to this KIP's design:

- Maintain leader-based topic partitions (producers continue to write to leaders), but replace the Kafka replication protocol with a per-broker S3-based write-ahead log (WAL).
- Try to preserve existing partition replica code for idempotency and transactions.
- Reuse existing tiered storage for long-term S3 data management.

Fig Slack-KIP-1176-A. Leader-based architecture retained, replication replaced by an S3 WAL. Tiered storage manages long-term data.

The basic idea is to preserve the leader-based architecture of Kafka, with each leader replica continuing to write to an active local log segment file, which it rotates periodically. A per-broker write-ahead log (WAL) replaces replication.

A WAL Combiner component in the Kafka broker progressively (and aggressively) tiers portions of the local active log segment files (without closing them), combining them into multi-topic objects uploaded to S3. Once a Kafka batch has been written to the WAL, the broker can send an acknowledgment to its producer. This active log segment tiering does not change how log segments are rotated. Once an active log segment is rotated out (by closing it and creating a new active log segment file), it can be tiered by the existing tiered storage component, for the long term.

Fig Slack-KIP-1176-B. Produce batches are written to the page cache as usual, but active log segment files are aggressively tiered to S3 (possibly Express One Zone) in combined log segment files. The WAL acts as write-optimized S3 storage and the existing tiered storage uploads closed log segment files for long-term storage.

Once all data of a given WAL object has been tiered, it can be deleted. The WAL only becomes necessary during topic partition leader failovers, where the new leader replica bootstraps itself from the WAL. Alternatively, each topic partition can have one or more followers which actively reconstruct local log segments from the WAL, providing a faster failover.

The general principle is to keep as much of Kafka unchanged as possible, only changing from the Kafka replication protocol to an S3 per-broker WAL. The priority is to avoid the need for heavy rework or reimplementation of logic such as idempotency, transactions and share groups integration. But it gives up elasticity and the additional architectural benefits that come from building on disaggregated storage.

Having said all of the above, there are a lot of missing or hacky details that currently detract from the evolutionary goal. There is a lot of hand-waving when it comes to correctness too. It is not clear that this KIP will be able to deliver a low-disruption evolutionary design that is also correct, highly available and durable. Discussion in the mailing list is ongoing. Luke Chen remarked: "the current availability story is weak… It's not clear if the effort is still small once details on correctness, cost, cleanness are figured out.", and I have to agree.

The second revision of KIP-1150 replaces the future object compaction logic by delegating long-term storage management to the existing tiered storage abstraction (like Slack's KIP-1176). The idea is to:

- Remove Batch Coordinators from the read path.
- Avoid separate object compaction logic by delegating long-term storage management to tiered storage (which already exists).
- Rebuild per-partition log segments from combined objects in order to submit them for long-term tiering (which works as a form of object compaction too) and to serve consumer fetch requests.

The entire write path becomes a three-stage process:

- Stage 1 – Produce path, synchronous. Uploads multi-topic WAL Segments to S3 and sequences the batches, acknowledging to producers once committed. This is unchanged except SLSOs are now called WAL Segments.
- Stage 2 – Per-partition log segment file construction, asynchronous. Each broker is assigned a subset of topic partitions. The brokers download WAL segment byte ranges that host these assigned partitions and append to on-disk per-partition log segment files.
- Stage 3 – Tiered storage, asynchronous. The tiered storage architecture tiers the locally cached topic partition log segment files as normal.

Stage 1 – The produce path

The produce path is the same, but SLSOs are now called WAL Segments.
Fig KIP-1150-rev2-A. Write path stage 1 (the synchronous part). Leaderless brokers upload multi-topic WAL Segments, then commit and sequence them via the Batch Coordinator.

Stage 2 – Local per-partition segment caching

The second stage is preparation for both:

- Stage 3, segment tiering (tiered storage).
- The read pathway for tailing consumers.

Fig KIP-1150-rev2-B. Stage 2 of the write path, where assigned brokers download WAL segments and append to local log segments.

Each broker is assigned a subset of topic partitions. Each broker polls the BC to learn of new WAL segments. Each WAL segment that hosts any of the broker's assigned topic partitions will be downloaded (at least the byte range of its assigned partitions). Once the download completes, the broker will inject record offsets as determined by the batch coordinator, and append the finalized batch to a local (per topic partition) log segment on disk.

At this point, a log segment file looks like a classic topic partition segment file. The difference is that they are not a source of durability, only a source for tiering and consumption. WAL segments remain in object storage until all batches of a segment have been tiered via tiered storage. Then WAL segments can be deleted.

Stage 3 – Tiered storage

Tiered storage continues to work as it does today (KIP-405), based on local log segments. It hopefully knows nothing of the Direct-to-S3 components and logic. Tiered segment metadata is stored in KRaft, which allows for WAL segment deletion to be handled outside of the scope of tiered storage also.

Fig KIP-1150-rev2-C. Tiered storage works as-is, based on local log segment files.

Data is consumed from S3 topics from either:

- The local segments on disk, populated from stage 2.
- Tiered log segments (the traditional tiered storage read pathway).

End-to-end latency of any given batch is therefore based on:

- Produce batch added to buffer.
- WAL Segment containing that batch written to S3.
- Batch coordinates submitted to the Batch Coordinator for sequencing.
- Producer request acknowledged.

Then, tail, untiered (the fast path for tailing consumers):

- Replica downloads the WAL Segment slice.
- Replica appends the batch to a local (per topic partition) log segment.
- Replica serves a consumer fetch request from the local log segment.

Or tiered (the slow path for lagging consumers):

- Remote Log Manager downloads the tiered log segment.
- Replica serves a consumer fetch request from the downloaded log segment.

Avoiding excessive reads to S3 will be important in stage 2 (when a broker is downloading WAL segment files for its assigned topic partitions). This KIP should standardize how topic partitions are laid out inside every WAL segment and perform partition-broker assignments based on that same order:

- Pick a single global topic partition order (based on a permutation of a topic partition id).
- Partition that order into contiguous slices, giving one slice per broker as its assignment (a broker may get multiple topic partitions, but they must be adjacent in the global order).
- Lay out every WAL segment in that same global order.

That way, each broker's partition assignment will occupy one contiguous block per WAL Segment, so each read broker needs only one byte-range read per WAL segment object (possibly empty if none of its partitions appear in that object). This reduces the number of range reads per broker when reconstructing local log segments.
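A sketch of that layout idea under the stated assumptions: a single fixed global order, contiguous per-broker slices, and one byte-range read per WAL segment. The data structures here are mine, not the KIP's.

```python
# Sketch: a global topic-partition order, contiguous per-broker slices, and the
# single byte range each broker needs from a WAL segment laid out in that order.
# Partition IDs, broker count, and the layout records are illustrative.

def global_order(partitions: list) -> list:
    """One fixed permutation of all topic partitions, shared by writers and readers."""
    return sorted(partitions)                      # any stable, agreed-upon order works

def assign_slices(order: list, brokers: int) -> dict:
    """Split the global order into contiguous slices, one per broker."""
    size = -(-len(order) // brokers)               # ceiling division
    return {b: order[b * size:(b + 1) * size] for b in range(brokers)}

def byte_range_for(broker_slice: list, layout: dict):
    """Given a WAL segment layout {partition: (start, length)} written in global order,
    a broker's contiguous slice maps to at most one contiguous byte range."""
    present = [layout[p] for p in broker_slice if p in layout]
    if not present:
        return None                                # none of this broker's partitions are in the object
    start = min(s for s, _ in present)
    end = max(s + length for s, length in present)
    return (start, end - start)

# Example: 2 brokers, a segment containing three partitions laid out in global order.
order = global_order([("orders", 0), ("orders", 1), ("payments", 0), ("payments", 1)])
slices = assign_slices(order, brokers=2)
layout = {("orders", 0): (0, 100), ("orders", 1): (100, 50), ("payments", 0): (150, 200)}
print(byte_range_for(slices[0], layout))           # broker 0 reads one contiguous range
```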
By integrating with tiered storage and reconstructing log segments, revision 2 moves to a more diskful design, where disks form a step in the write path to long term storage. It is also more stateful and sticky than revision 1, given that each topic partition is assigned a specific broker for log segment reconstruction, tiering and consumer serving. Revision 2 remains leaderless for producers, but leaderful for consumers. Therefore to avoid cross-AZ traffic for consumer traffic, it will rely on KIP-881: Rack-aware Partition Assignment for Kafka Consumers to ensure zone-local consumer assignment. This makes revision 2 a hybrid. By delegating responsibility to tiered storage, more of the direct-to-S3 workload must be handled by stateful brokers. It is less able to benefit from the elasticity of disaggregated storage. But it reuses more of existing Kafka. The third revision, which at the time of writing is a loose proposal in the mailing list, ditches the Batch Coordinator (BC) altogether. Most of the complexity of KIP-1150 centers around Batch Coordinator efficiency, failovers, scaling as well as idempotency, transactions and share groups logic. Revision 3 proposes to replace the BCs with “classic” topic partitions. The classic topic partition leader replicas will do the work of sequencing and storing the batch coordinates of their own data. The data itself would live in SLSOs (rev1)/WAL Segments (rev2) and ultimately, as tiered log segments (tiered storage). To make this clear, if as a user you create the topic Orders with one partition, then an actual topic Orders will be created with one partition. However, this will only be used for the sequencing of the Orders data and the storage of its metadata. The benefit of this approach is that all the idempotency and transaction logic can be reused in these “classic-ish” topic partitions. There will be code changes but less than Batch Coordinators. All the existing tooling of moving partitions around, failovers etc works the same way as classic topics. So replication continues to exist, but only for the metadata. Fig KIP-1150-rev3-A. Replicated partitions continue to exist, acting as per-partition sequencers and metadata stores (replacing batch coordinators). One wrinkle this adds is that there is no central place to manage the clean-up of WAL Segments. Therefore a WAL File Manager component would have responsibility for background cleanup of those WAL segment files. It would periodically check the status of tiering to discover when a WAL Segment can get deleted. Fig KIP-1150-rev3-B. The WAL File Manager is responsible for WAL Segment clean up The motivation behind this change to remove Batch Coordinators is to simplify implementation by reusing existing Kafka code paths (for idempotence, transactions, etc.).  However, it also opens up a whole new set of challenges which must be discussed and debated, and it is not clear this third revision solves the complexity. Revision 3 now depends on the existing classic topics, with leader-follower replication. It moves a little further again towards the evolutionary path. It is curious to see Aiven productionizing its Kafka fork “Inkless”, which falls under the “revolutionary” umbrella, while pushing towards a more evolutionary stateful/sticky design in these later revisions of KIP-1150.  Apache Kafka is approaching a decision point with long-term implications for its architecture and identity. 
The ongoing discussions around KIP-1150 revisions 1-3 and KIP-1176 are nominally framed around replication cost reduction, but the underlying issue is broader: how should Kafka evolve in a world increasingly shaped by disaggregated storage and elastic compute?

At its core, the choice comes down to two paths. The evolutionary path seeks to fit S3 topics into Kafka's existing leader-follower framework, reusing current abstractions such as tiered storage to minimize disruption to the codebase. The revolutionary path instead prioritizes the benefits of building directly on object storage. By delegating to shared object storage, Kafka can support an S3 topic serving layer which is stateless, elastic, and disposable, where scaling comes from adding and removing stateless workers rather than rebalancing stateful nodes, while Kafka's existing workloads are maintained with classic leader-follower topics.

While the intentions and goals of the KIPs clearly fall on a continuum from revolutionary to evolutionary, the reality in the mailing list discussions makes everything much less clear. The devil is in the details, and as the discussion advances, the arguments of "simplicity through reuse" start to strain. The reuse strategy is a retrofitting strategy which ironically could actually make the codebase harder to maintain in the long term. Kafka's existing model is deeply rooted in leader-follower replication, with much of its core logic built around that assumption. Retrofitting direct-to-S3 into this model forces some "unnatural" design choices, choices that would not be made otherwise (if designing a cloud-native solution).

My own view aligns with the more revolutionary path in the form of KIP-1150 revision 1. It doesn't simply reduce cross-AZ costs, but fully embraces the architectural benefits of building on object storage. With additional broker roles and groups, Kafka could ultimately achieve a similar elasticity to WarpStream (and Confluent Freight Clusters). The approach demands more upfront engineering effort and may increase long-term maintenance complexity, but avoids tight coupling to the existing leader-follower architecture. Much depends on what kind of refactoring is possible to avoid the duplication of idempotency, transactions and share group logic. I believe the benefits justify the upfront cost and will help keep Kafka relevant in the decade ahead.

In theory, both directions are defensible; ultimately it comes down to the specifics of each KIP. The details really matter. Goals define direction, but it's the engineering details that determine the system's actual properties. We know the revolutionary path involves big changes, but the evolutionary path comes with equally large challenges, where retrofitting may ultimately be more costly while simultaneously delivering less. The committers who maintain Kafka are cautious about large refactorings and code duplication, but are equally wary of hacks and complex code serving two needs. We need to let the discussions play out.

My aim with this post has been to take a step back from "how to implement direct-to-S3 topics in Kafka", and think more about what we want Kafka to be. The KIPs represent the how, the engineering choices. Framed that way, I believe it is easier for the wider community to understand the KIPs, the stakes and the eventual decision of the committers, whichever way they ultimately decide to go.
Revolutionary : Choose a direct-to-S3 topic design that maximizes the benefits of an object-storage architecture, with greater elasticity and lower operational complexity. However, in doing so, we may increase the implementation cost and possibly the long-term code maintenance too by maintaining two very different topic models in the same project (leader-based replication and direct-to-S3). Evolutionary : Shoot for an evolutionary design that makes use of existing components to reduce the need for large refactoring or duplication of logic. However, by coupling to the existing architecture, we forfeit the extra benefits of object storage, focusing primarily on networking cost savings (in AWS and GCP). Through this coupling, we also run the risk of achieving the opposite: harder-to-maintain code, by bending and contorting a second workload into an architecture optimized for something else. The term “Diskless” vs “Direct-to-S3” The Common Parts. Some approaches are shared across multiple implementations and proposals. Revolutionary: KIPs and real-world implementations Evolutionary: Slack’s KIP-1176 The Hybrid: balancing revolution with evolution Deciding Kafka’s future Avoiding the small file problem . Most designs are leaderless for producer traffic, allowing any server to receive writes to any topic. To avoid uploading a multitude of tiny files, servers accumulate batches in a buffer until ready for upload. Before upload, the buffer is sorted by topic id and partition, to make compaction and some reads more efficient by ensuring that data of the same topic and same partition are in contiguous byte ranges. Pricing . The pricing of many (but not all) cloud object storage services penalizes excessive requests, so it can be cheaper to roll up whatever data has been received in the last X milliseconds and upload it with a single request. Leaderless: The same way that producer alignment works, via metadata manipulation, or using KIP-392 (fetch from follower), which can also be used in a leaderless context. Leader-based: Zone-aware consumer group assignment as detailed in KIP-881: Rack-aware Partition Assignment for Kafka Consumers . The idea is to use consumer-to-partition assignment to ensure consumers are only assigned zone-local partitions (where the partition leader is located). KIP-392 (fetch-from-follower) , which is effective for designs that have followers (which isn’t always the case). WarpStream (as a reference, a kind of yardstick to compare against) KIP-1150 revision 1 Aiven Inkless (a Kafka fork) Leaderless, stateless and diskless agents that handle Kafka clients, as well as compaction/cleaning work. Coordination layer : A central metadata store for sequencing, metadata storage and housekeeping coordination. Sequencing . Chooses the total ordering for writes, assigning offsets without gaps or duplicates. Metadata storage . Stores all metadata that maps partition offset ranges to S3 object byte ranges. Serving lookup requests . Serving requests for log offsets. Serving requests for batch coordinates (S3 object metadata). Partition CRUD operations . Serving requests for atomic operations (creating partitions, deleting topics, records, etc.) Data expiration . Managing data expiry and soft deletion. Coordinating physical object deletion (performed by brokers). When a producer starts, it sends a Metadata request to learn which brokers are the leaders of each topic partition it cares about. The Metadata response contains a zone-local broker.
The producer sends all Produce requests to this broker. The receiving broker accumulates batches of all partitions and uploads a shared log segment object (SLSO) to S3. The broker commits the SLSO by sending the metadata of the SLSO (the metadata is known as the batch coordinates) to the Batch Coordinator. The coordinates include the S3 object metadata and the byte ranges of each partition in the object. The Batch Coordinator assigns offsets to the written batches, persists the batch coordinates, and responds to the broker. The broker sends all associated acknowledgements to the producers. The consumer sends a Fetch request to the broker. The broker checks its cache and, on a miss, queries the Batch Coordinator for the relevant batch coordinates. The broker downloads the data from object storage. The broker injects the computed offsets and timestamps into the batches (the offsets being part of the batch coordinates). The broker constructs and sends the Fetch response to the Consumer. logs . Stores the Log Start Offset and High Watermark of each topic partition, with the primary key of topic_id and partition. files . Lists all the objects that host the topic partition data. batches . Maps Kafka batches to byte ranges in files. producer_state . All the producer state needed for idempotency. There are some other tables for housekeeping and for the merge work items. A broker opens a transaction and submits a table as an argument, containing the batch coordinates of the multiple topics uploaded in the combined file. The TVF logic creates a temporary table (logs_tmp) and fills it via a SELECT on the logs table, with a FOR UPDATE clause which obtains a row lock on each topic partition row in the logs table that matches the list of partitions being submitted. This ensures that other brokers that are competing to add batches to the same partition(s) queue up behind this transaction. This is a critical barrier that avoids inconsistency. These locks are held until the transaction commits or aborts. Next, inside a loop, partition by partition, the TVF: Updates the producer state. Updates the high watermark of the partition (a row in the logs table). Inserts the batch coordinates into the batches table (sequencing and storing them). Commits the transaction. Maintain leader-based topic partitions (producers continue to write to leaders), but replace the Kafka replication protocol with a per-broker S3-based write-ahead log (WAL). Try to preserve existing partition replica code for idempotency and transactions. Reuse existing tiered storage for long-term S3 data management. Remove Batch Coordinators from the read path. Avoid separate object compaction logic by delegating long-term storage management to tiered storage (which already exists). Rebuild per-partition log segments from combined objects in order to: Submit them for long-term tiering (which works as a form of object compaction too). Serve consumer fetch requests. Stage 1 – Produce path, synchronous . Uploads multi-topic WAL Segments to S3 and sequences the batches, acknowledging to producers once committed. This is unchanged except SLSOs are now called WAL Segments. Stage 2 – Per-partition log segment file construction, asynchronous . Each broker is assigned a subset of topic partitions. The brokers download the WAL segment byte ranges that host these assigned partitions and append them to on-disk per-partition log segment files. Stage 3 – Tiered storage, asynchronous .
The tiered storage architecture tiers the locally cached topic partition log segment files as normal. Stage 3, segment tiering (tiered storage). The read pathway for tailing consumers The local segments on-disk, populated from stage 2. Tiered log segments (traditional tiered storage read pathway). Produce batch added to buffer. WAL Segment containing that batch written to S3. Batch coordinates submitted to the Batch Coordinator for sequencing. Producer request acknowledged. Tail, untiered (fast path for tailing consumers) -> Replica downloads WAL Segment slice. Replica appends the batch to a local (per topic partition) log segment. Replica serves a consumer fetch request from the local log segment. Tiered (slow path for lagging consumers) -> Remote Log Manager downloads the tiered log segment. Replica serves a consumer fetch request from the downloaded log segment. Pick a single global topic partition order (based on a permutation of the topic partition id). Partition that order into contiguous slices, giving one slice per broker as its assignment (a broker may get multiple topic partitions, but they must be adjacent in the global order). Lay out every WAL segment in that same global order.
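To make the Batch Coordinator commit path described above a little more concrete, the following is a hypothetical, PostgreSQL-flavoured sketch. The table names (logs, batches), column names, and values are illustrative rather than taken from KIP-1150 or Inkless, and the producer_state/idempotency bookkeeping is omitted.

```sql
-- Hypothetical sketch of a Batch Coordinator commit; names and values are illustrative.
BEGIN;

-- The broker stages the batch coordinates of the uploaded WAL segment / SLSO.
CREATE TEMP TABLE submitted (
    topic_id     bigint,
    partition_id int,
    record_count int,
    object_key   text,
    byte_start   bigint,
    byte_end     bigint
) ON COMMIT DROP;
INSERT INTO submitted VALUES
    (101, 0, 500, 's3://wal/0001', 0,     65535),
    (101, 1, 250, 's3://wal/0001', 65536, 131071);

-- Lock the per-partition rows so competing brokers queue up behind this commit.
SELECT l.topic_id, l.partition_id
FROM logs l JOIN submitted s USING (topic_id, partition_id)
FOR UPDATE OF l;

-- Sequence the batches: base offsets are taken from the current high watermark.
INSERT INTO batches (topic_id, partition_id, base_offset, record_count, object_key, byte_start, byte_end)
SELECT l.topic_id, l.partition_id, l.high_watermark, s.record_count, s.object_key, s.byte_start, s.byte_end
FROM logs l JOIN submitted s USING (topic_id, partition_id);

-- Advance each partition's high watermark.
UPDATE logs l
SET high_watermark = l.high_watermark + s.record_count
FROM submitted s
WHERE l.topic_id = s.topic_id AND l.partition_id = s.partition_id;

COMMIT;  -- releases the row locks; the broker can now acknowledge the producers
```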

0 views
Marc Brooker 2 weeks ago

Fixing UUIDv7 (for database use-cases)

How do I even balance a V7? RFC9562 defines UUID Version 7. This has made a lot of people very angry and been widely regarded as a bad move 1 . More seriously, UUIDv7 has received a lot of criticism, despite seemingly achieving what it set out to do. The legitimate criticism seems to be on a few points. V7 UUIDs: Before thinking about how we might fix these issues, let’s understand why folks are drawn to UUIDv7. Most of the use-cases I see are related to increasing database insert performance. To quote the RFC: Time-ordered monotonic UUIDs benefit from greater database-index locality because the new values are near each other in the index. As a result, objects are more easily clustered together for better performance. The real-world differences in this approach of index locality versus random data inserts can be one order of magnitude or more. This effect is very real. Random DB keys like UUIDv4 destroy spatial locality ( as I’ve written about before ), making database caches less effective, almost always reducing insert performance, and reducing query performance where queries have substantial temporal locality. The slight upside to this is that they also avoid hot spotting in distributed or sharded architectures. Can we both have good insert performance and avoid the downsides of UUIDv7? Yes, I believe we can. Let’s keep the overall format from the RFC: Instead of the field being the unmodified , let’s replace it with: , where is a keyed hash function (more on that below), is a parameter that trades off locality and leading entropy, and is an arbitrary identifier for some unit of infrastructure (e.g. a database cluster id, customer id, region ID). The choice of is a trade off between ID spread and database cache locality. For most installations and query patterns, I’d expect to restore full insert performance. The function is a special keyed hash (such as an HMAC) of , keyed with , which is then XORed with the . That means that IDs are stable (and UUIDv7-like) for $2^N$ milliseconds. This means that all the pages they pull into the DB cache stay stable for $2^N$ milliseconds, where they can be hit over and over. After $2^N$ milliseconds, we need a new set of pages (unless they can all fit in cache). The use of an arbitrary along with a cryptographic hash function allows providers to choose the radius that they issue UUIDs over. An empty would produce a single global stream of UUIDs. A fine-grained per-client UUID would produce a per-client stream of UUIDs, at the cost of some locality. Per-server, per-cluster, per-AZ, and other scopes for s add flexibility to trade off between the locality advantages and disadvantages of UUIDs. Depending on how you read the RFC, this may be allowed in the letter of the law. It does say: Implementations MAY alter the actual timestamp. But it does seem to clearly violate the spirit. Still, I think this is a UUID format that avoids a lot of the downsides of UUIDv7, while keeping most of the database performance benefits. As for whether you should use UUID entropy for security, that’s a different topic. Leak information (namely the server timestamp). Are a bad choice for cases where security or operational requirements require UUIDs that are hard to guess, because they have less entropy. Introduce correlated behavior between datacenters, regions, and installations of applications, increasing the probability of triggering bugs across failure boundaries. Are hard to present in UIs, because the format doesn’t work, because of deterministic first digits.
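The exact field construction is elided in the excerpt above, so the following is only one plausible reading of it: the 48-bit leading field becomes a keyed hash of (scope identifier, timestamp right-shifted by N), XORed with the millisecond timestamp, rather than the raw timestamp itself. This is a hypothetical PostgreSQL sketch using pgcrypto; the scope id 'cluster-17', the key, and N = 16 are made-up parameters, not values from the post.

```sql
-- Hypothetical sketch only, not the post's exact formula.
-- Assumes pgcrypto, a scope id of 'cluster-17', a secret key, and N = 16
-- (so the hash input changes every 2^16 ms, roughly 65 seconds).
CREATE EXTENSION IF NOT EXISTS pgcrypto;

WITH params AS (
    SELECT (extract(epoch FROM clock_timestamp()) * 1000)::bigint AS t_ms,
           16                        AS n,
           'cluster-17'::text        AS scope_id,
           'secret-key'::bytea       AS k
),
hashed AS (
    -- First 6 bytes (48 bits) of HMAC(scope_id || floor(t / 2^N), k)
    SELECT t_ms,
           ('x' || encode(substring(
               hmac(convert_to(scope_id || ':' || (t_ms >> n)::text, 'UTF8'), k, 'sha256')
               from 1 for 6), 'hex'))::bit(48)::bigint AS h48
    FROM params
)
-- The leading 48 bits of the UUID: hash XOR timestamp instead of the raw timestamp.
SELECT lpad(to_hex((h48 # t_ms) & ((1::bigint << 48) - 1)), 12, '0') AS leading_48_bits
FROM hashed;
```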
Not really, but I couldn’t resist the Guide reference. You should re-read the Guide.

0 views
Emil Privér 2 weeks ago

We Re-Built Our Integration Service Using Postgres and Go

Our integration service connects our platform to external systems. Earlier this year, we reached a scaling limit at 40 integrations and rebuilt it from the ground up. The service handles three primary responsibilities: sending data to external systems, managing job queues, and prioritizing work based on criticality. The original implementation functioned but had architectural constraints that prevented horizontal scaling. We use microservices because different components have conflicting requirements. The management API handles complex business logic with normalized schemas—separate tables for translations and categories. The public API optimizes for read performance under load, using denormalized data by adding translations directly into category tables and handling filtering in Go. A monolithic architecture would require compromising performance in one area to accommodate the other. The integration service currently processes millions of events daily, with volume increasing as we onboard new customers. This post describes our implementation of a queue system using PostgreSQL and Go, focusing on design decisions and technical trade-offs. The first implementation used GCP Pub/Sub, a topic-to-many-subscription service where messages are replicated across multiple queues. This architecture introduced several scalability issues. The integration service maintained a database for integration configurations but lacked ownership of its operational data. This violated a distributed systems principle: services should own their data rather than depend on other services for it. This dependency forced our management service to serialize complete payloads into the queue. Updating a single attribute on a sub-object required sending the entire parent object with all nested sub-objects, metadata, and relationships. Different external APIs have varying data requirements—some need individual sub-objects while others require complete hierarchies. For clients with records containing 300-500 sub-objects, this resulted in significant message size inflation. GCP charges by message size rather than count, making large messages substantially more expensive than smaller ones. GCP’s WebSocket delivery requires clients to buffer messages internally. With 40 integrations running separate consumers with filters, traffic spikes created memory pressure: This prevented horizontal scaling and limited us to vertical scaling approaches. External APIs enforce varying rate limits. Our in-memory rate limiter tracked requests per integration but prevented horizontal scaling since state couldn’t be shared across instances without risking rate limit violations. By early 2025, these issues had compounded: excessive message sizes increasing costs, memory bloat requiring oversized containers, vertical-only scaling, high operational expenses, rate limiting preventing horizontal scale, and lack of data independence. The system couldn’t accommodate our growth trajectory. A complete rebuild was necessary. The v2 design addressed specific limitations: Additional improvements: The standard approach involves the producer computing payloads and sending them to the queue for consumer processing. We used this in v1 but rejected it for v2. Customers frequently make multiple rapid changes to the same record—updating a title, then a price, then a description. Each change triggers an event. Instead of sending three separate updates, we consolidate changes into a single update. We implemented a in the jobs table. 
Multiple updates to the same record within a short time window are deduplicated into a single job, reducing load on both our system and recipient systems. We chose PostgreSQL as our queue backend for several reasons: Often, we think we need something bigger like Apache Kafka when a relational database like PostgreSQL is sufficient for our requirements. The jobs table structure: Each job tracks: Postgres-backed queues require careful indexing. We use partial indexes (with WHERE clauses) only for actively queried states: , , , and . We don’t index or states. These statuses contain the majority of jobs in the table and aren’t needed in the job processing flow. Indexing them would just pull more data into memory that the processing flow never uses. Jobs are ordered by for FIFO processing, with priority queue overrides when applicable. Jobs follow a defined lifecycle: Timestamp fields serve observability purposes, measuring job duration and identifying bottlenecks. For jobs, retry timing is calculated using exponential backoff. The worker system requirements: We evaluated two approaches: maintaining in-memory queues with multiple goroutines using for and select to fetch jobs, or having goroutines fetch data from the database and iterate over the results. We chose the database iteration approach for its simplicity. pgxpool handles connection pooling, eliminating the need for channel-based in-memory queues. Each worker runs in a separate goroutine, using a ticker to poll for jobs every second. Before processing, workers check for shutdown signals ( or channel). When shutdown is initiated, workers stop accepting new jobs and mark in-flight jobs as . This prevents stalled jobs from blocking integration queues. Checking shutdown signals between jobs ensures clean shutdowns. During shutdown, we create a fresh context with for retrying jobs. This prevents database write failures when the main context is canceled. The query implements fair scheduling to prevent high-volume integrations from monopolizing workers: Query breakdown: Step 1: Identify busy integrations. This CTE identifies integrations with 50+ concurrent processing jobs. Step 2: Select jobs with priority ordering. Jobs are selected from integrations not in the busy list. Priority updates are ordered first, followed by FIFO ordering. locks selected rows to the current transaction, preventing duplicate processing by concurrent workers. Step 3: Update job status. Selected jobs are updated to status with a recorded start time. This ensures fair resource allocation across integrations. Job timeouts are critical for queue health. In the initial release, we reused the global context for job processing. When jobs hung waiting for slow external APIs, they couldn’t be marked completed or failed due to context lifecycle coupling. Jobs accumulated in state indefinitely. The solution: context separation. The global context controls worker lifecycle. Each job receives its own context with a timeout. Timed-out jobs are marked , allowing queue progression. This also enables database writes during shutdown using a fresh context, even when the global context is canceled. Failed jobs require retry logic with appropriate timing. Immediate retries against failing external APIs are counterproductive. We implement exponential backoff: instant first retry, 10 seconds for the second, 30 seconds for the third, up to 30 minutes. The field drives backoff calculation. After 10 attempts, jobs are marked .
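Before moving on to error handling: the fair-scheduling query itself is not reproduced in this excerpt, so here is a hypothetical sketch of the three steps described above. Table, column, and status names are assumed, as are the LIMIT and the SKIP LOCKED clause (the post only says the selected rows are locked).

```sql
-- Hypothetical sketch of the fair-scheduling claim query; names are assumed.
WITH busy_integrations AS (
    -- Step 1: integrations that already have 50+ jobs in flight
    SELECT integration_id
    FROM jobs
    WHERE status = 'processing'
    GROUP BY integration_id
    HAVING count(*) >= 50
),
next_jobs AS (
    -- Step 2: pick jobs from non-busy integrations, priority first, then FIFO
    SELECT id
    FROM jobs
    WHERE status = 'pending'
      AND integration_id NOT IN (SELECT integration_id FROM busy_integrations)
    ORDER BY priority DESC, created_at
    LIMIT 10
    FOR UPDATE SKIP LOCKED      -- lock the claimed rows; concurrent workers skip them
)
-- Step 3: mark the claimed jobs as processing and hand them to the worker
UPDATE jobs
SET status = 'processing', started_at = now()
WHERE id IN (SELECT id FROM next_jobs)
RETURNING *;
```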
Error types guide retry behavior: This allows each integration to decide how to handle errors based on the external API’s response. For example, a 400 Bad Request might be a permanent validation failure (NonRetryableError), while a 503 Service Unavailable is transient and should retry (RetryableError). The integration implementation determines the appropriate error type for each scenario. Jobs occasionally become stuck in state due to worker panics, database connection failures, or unexpected container termination. A cron job runs every minute, identifying jobs in state beyond the expected duration. These jobs are moved to with incremented retry counts, treating them as standard failures. This ensures queue progression despite unexpected failures. Rate limiting across multiple containers was v2’s most complex challenge. V1’s in-memory rate limiter worked for single containers but couldn’t share state across instances. While Redis was an option, we already had PostgreSQL with sufficient performance. The solution: a table tracking request counts per integration per second: Before external API requests, we increment the counter for the integration’s current time window (rounded to the second). PostgreSQL returns the new count. If the count exceeds the limit, we sleep 250ms and retry. If under the limit, we proceed. This works because all containers share the database as the source of truth for rate limiting. Occasionally, jobs are rate-limited during heavy load due to the gap between count checking and request sending. These jobs retry immediately. The occurrence rate is acceptable. Hope you enjoyed this article and learned something new. This system has worked really well so far, and we’ve had only a few minor issues that we fixed quickly. I will update this article over time. Mass updates generate large objects per record Objects are duplicated for each configured integration Copies buffer across 5-10 consumer instances Infrastructure requires 2GB RAM and 2 cores to handle spikes, despite needing only 512MB and 1 core during normal operation Horizontal scaling - Enable scaling across multiple containers Distributed rate limiting - Coordinate rate limits across instances Data ownership - Store operational data within the service Delta updates - Send only changed data rather than complete records Fair scheduling - Prevent single integrations from monopolizing resources Priority queuing - Process critical updates before lower-priority changes Self-service re-sync - Enable customers to re-sync catalogs independently Visibility - Provide APIs for customers to monitor sent data and queue status Performance - PostgreSQL is fast enough for our use case. We don’t need sub-second message delivery. Simplicity - Using a managed PostgreSQL instance on GCP is significantly simpler than introducing new infrastructure. Familiarity - Most developers understand SQL, reducing onboarding time. Existing infrastructure - We already use PostgreSQL for our data, eliminating the need for additional systems. 
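The rate-limit table is not shown in this excerpt either; a minimal sketch of the shared counter described above might look like the following. The table name, column names, and window granularity are assumptions.

```sql
-- Hypothetical sketch of the shared rate-limit counter; names are assumed.
CREATE TABLE IF NOT EXISTS rate_limits (
    integration_id bigint      NOT NULL,
    window_start   timestamptz NOT NULL,   -- the current second (time rounded down)
    request_count  int         NOT NULL DEFAULT 0,
    PRIMARY KEY (integration_id, window_start)
);

-- Atomically bump the counter for this integration's current one-second window and
-- return the new count; the caller sleeps ~250ms and retries if it exceeds the limit.
INSERT INTO rate_limits (integration_id, window_start, request_count)
VALUES (42, date_trunc('second', now()), 1)
ON CONFLICT (integration_id, window_start)
DO UPDATE SET request_count = rate_limits.request_count + 1
RETURNING request_count;
```

Because every container goes through the same table, the database remains the single source of truth for the per-second count, which is what makes the limit hold across horizontally scaled workers.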
- Links logs across services - Specifies the action (e.g., ) - Records failure details - Tracks current workflow state - Counts retry attempts - Schedules next retry , , - Provides metrics for observability - Links to specific integrations - Identifies the platform - Contains job data - Prevents duplicate execution Created → Initial state: Picked up → Transitions to Success → Becomes , records Failed (10 retries) → Becomes , records Failed (retries remaining) → Becomes , increments , calculates Parallel worker execution Horizontal scaling across containers Graceful shutdowns without job loss Distributed rate limit enforcement—we need to respect rate limits no matter how many containers we run - Permanent failures (e.g., validation errors). No retry. - Transient failures (e.g., 500 Internal Server Error). Retry with backoff. - Retry limit reached. Mark failed.
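Pulling the job fields listed above and the partial-index advice together, a hypothetical version of the jobs table could look like this. Column names, status values, and index choices are assumptions, not the original schema.

```sql
-- Hypothetical sketch of the jobs table and partial indexes; names are assumed.
CREATE TABLE IF NOT EXISTS jobs (
    id              bigserial   PRIMARY KEY,
    integration_id  bigint      NOT NULL,
    external_system text        NOT NULL,          -- identifies the platform
    action          text        NOT NULL,          -- the action, e.g. 'update_product'
    payload         jsonb       NOT NULL,          -- job data (the delta, not the full record)
    dedup_key       text        NOT NULL,          -- prevents duplicate execution
    status          text        NOT NULL DEFAULT 'pending',
    priority        int         NOT NULL DEFAULT 0,
    retry_count     int         NOT NULL DEFAULT 0,
    next_retry_at   timestamptz,                   -- schedules the next retry
    error_message   text,                          -- records failure details
    trace_id        text,                          -- links logs across services
    created_at      timestamptz NOT NULL DEFAULT now(),
    started_at      timestamptz,
    completed_at    timestamptz
);

-- Partial indexes only for states the processing flow actually queries;
-- completed/failed rows (the bulk of the table) stay out of these indexes.
CREATE INDEX IF NOT EXISTS jobs_pending_idx
    ON jobs (integration_id, priority DESC, created_at)
    WHERE status = 'pending';
CREATE INDEX IF NOT EXISTS jobs_processing_idx
    ON jobs (integration_id, started_at)
    WHERE status = 'processing';
CREATE INDEX IF NOT EXISTS jobs_retrying_idx
    ON jobs (next_retry_at)
    WHERE status = 'retrying';
```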

0 views
Jack Vanlightly 3 weeks ago

Why I’m not a fan of zero-copy Apache Kafka-Apache Iceberg

Over the past few months, I’ve seen a growing number of posts on social media promoting the idea of a “zero-copy” integration between Apache Kafka and Apache Iceberg. The idea is that Kafka topics could live directly as Iceberg tables. On the surface it sounds efficient: one copy of the data, unified access for both streaming and analytics. But from a systems point of view, I think this is the wrong direction for the Apache Kafka project. In this post, I’ll explain why.  Zero-copy is a bit of a marketing buzzword. I prefer the term shared tiering . The idea behind shared tiering is that Apache Kafka tiers colder data to an Apache Iceberg table instead of tiering closed log segment files. It’s called shared tiering because the tiered data serves both Kafka and data analytics workloads.  The idea has been popularized recently in the Kafka world by Aiven, with a tiered storage plugin for Apache Kafka that adds Iceberg table tiering to the tiered storage abstraction inside Kafka brokers. But before we understand shared tiering we should understand the difference between tiering and materialization: Tiering is about moving data from one storage tier (and possibly storage format) to another, such that both tiers are readable by the source system and data is only durably stored in one tier. While the system may use caches, only one tier is the source of truth for any given data item. Usually it’s about moving data to a cheaper long-term storage tier.  Materialization is about making data of a primary system available to a secondary system , by copying data from the primary storage system (and format) to the secondary storage system, such that both data copies are maintained (albeit with different formats). The second copy is not readable from the source system as its purpose is to feed another data system. Copying data to a lakehouse for access by various analytics engines is a prime example. Fig 1. Tiering vs materialization There are two types of tiering: Internal Tiering is data tiering where only the primary data system can access the various storage tiers. For example, Kafka tiered storage is internal tiering. These internal storage tiers form the primary storage as a whole. Shared Tiering is data tiering where one or more data tiers is shared between multiple systems. The result is a tiering-materialization hybrid, serving both purposes. Tiering to a lakehouse is an example of shared tiering. Fig 2. Shared tiering makes the long-term storage a shared storage layer for two or more systems. There are two broad approaches to populating an Iceberg table from a Kafka topic: Internal Tiering + Materialization Shared Tiering Option 1: Internal Tiering + Materialization where Kafka continues to use traditional tiered storage of log segment files (internal tiering) and also materializes the topic as an Iceberg table (such as via Kafka Connect). Catch-up Kafka consumers will be served from tiered Kafka log segments, whereas compute engines such as Spark will use the Iceberg table. Fig 3. Internal Tiering + Materialization Option 2: Shared Tiering (zero-copy) where Kafka tiers topic data to Iceberg directly. The Iceberg table will serve both Kafka brokers (for catch-up Kafka consumers) and analytics engines such as Spark and Flink. Fig 4. Kafka tiers data directly to Iceberg. Iceberg is shared between Kafka and analytics. On the surface, shared tiering sounds attractive: one copy of the data, lower storage costs, no need to keep two copies of the data in sync. But the reality is more complicated.  
Zero-copy shared tiering appears efficient at first glance as it eliminates duplication between Kafka and the data lake.  However, rather than simply taking one cost away, it shifts that cost from storage to compute. By tiering directly to Iceberg, brokers take on substantial compute overhead as they must both construct columnar Parquet files from log segment files (instead of simply copying them to object storage), and they must download Parquet files and convert them back into log segment files to serve lagging consumers. Richie Artoul of WarpStream blogged about why tiered storage is such a bad place to put Iceberg tiering work: “ First off, generating parquet files is expensive. Like really expensive. Compared to copying a log segment from the local disk to object storage, it uses at least an order of magnitude more CPU cycles and significant amounts of memory. That would be fine if this operation were running on a random stateless compute node, but it’s not, it’s running on one of the incredibly important Kafka brokers that is the leader for some of the topic-partitions in your cluster. This is the worst possible place to perform computationally expensive operations like generating parquet files. ” – Richard Artoul, The Case for an Iceberg-Native Database: Why Spark Jobs and Zero-Copy Kafka Won’t Cut It Richard is pretty sceptical of tiered storage in general, a sentiment which I don’t share so much, but where we agree is that Parquet file writing is far more expensive than log segment uploads. Things can get worse (for Kafka) when we consider optimizing the Iceberg table for analytics queries. I recently wrote about how open table formats optimize query performance. “ Ultimately, in open table formats, layout is king. Here, performance comes from layout (partitioning, sorting, and compaction) which determines how efficiently engines can skip data… Since the OTF table data exists only once, its data layout must reflect its dominant queries. You can’t sort or cluster the table twice without making a copy of it. ” – Beyond Indexes: How Open Table Formats Optimize Query Performance Layout, layout, layout. Performance comes from pruning, pruning efficiency comes from layout. So what layout should we choose for our shared Iceberg tables? To serve Kafka consumers efficiently, the data must be laid out in offset order. Each Parquet file would contain contiguous ranges of offsets of one or more topic partitions. This is ideal for Kafka as it needs only to fetch individual Parquet files or even row groups within files, in order to reconstruct a log segment to serve a lagging consumer. Best case is a 1:1 mapping from log-segment to Parquet file; otherwise read amplification grows quickly. Fig 5. Column statistics and Iceberg partitions clearly tell Kafka where a specific topic partition offset range lives. But these pruning statistics are not useful for analytics queries. However, for the analytics query, there is no useful pruning information in this layout, leading to large table scans. Queries like WHERE EventType = 'click' must read every file, since each one contains a random mixture of event types. Min/max column statistics on columns such as EventType are meaningless for predicate pushdown, so you lose all the performance advantages of Iceberg’s table abstraction. An analytics-optimized layout, by contrast, partitions and sorts data by business dimensions, such as EventType, Region and EventTime. Files are rewritten or merged to tighten column statistics and make pruning effective. 
That makes queries efficient: scans touch only a handful of partitions, and within each partition, min/max stats allow skipping large chunks of data. Fig 6. Column statistics and partitions clearly tell analytics queries that want to slice and dice by EventType and Region how to prune Parquet files. But these pruning statistics are not useful for Kafka at all, which must do a costly table scan (from a Kafka broker) to find a specific offset range to rebuild a log segment. With this analytics-optimized layout, a lagging Kafka consumer is going to cause a Kafka broker to have a really bad day. Offset order no longer maps to file order. To fetch an offset range, the broker may have to fetch dozens of Parquet files (as the offset range is spread across files) due to analytics-workload-driven choices of partition scheme and sort order. The read path becomes highly fragmented, resulting in massive read amplification. All of this downloading, scanning and segment reconstruction is happening in the Kafka broker. That’s a lot more work for Kafka brokers and it all costs a lot of CPU and memory. Normally I would say: given two equally important workloads, optimize for the workload that is less predictable, in this case, the lakehouse. The Kafka workload is sequential and therefore predictable, so we can use tricks such as read-aheads. Analytics workloads are far less predictable, slicing and dicing the data in different ways, so optimize for that. But this is simply not practical for Apache Kafka. We can’t ask it to reconstruct sequential log segments from Parquet files that have been completely reorganized into a different data distribution. The best we could do is some half-way house, firmly leaning towards Kafka’s needs. Basically, partitioning by ingestion time (hour or date) so we preserve the sequential nature of the data and hope that most analytical queries only touch recent data. The alternative is to use the Kafka Iceberg table as a staging table for incrementally populating a cleaner (silver) table, but then haven’t we just made a copy anyway? In practice, the additional compute and I/O overhead can easily offset the storage savings, leading to a less efficient and less predictable performance profile overall. How we optimize the Iceberg table is critical to the efficiency of both the Kafka brokers and the analytics workload. Without separation of concerns, each workload is trapped by the other. The first issue is that the schema of the Kafka topic may not even be suitable for the Iceberg table, for example, a CDC stream with before and after payloads. When we materialize these streams as tables, we don’t materialize them in their raw format; we use the information to materialize how the data ended up. But shared tiering requires that we write everything, every column, every header field and so on. For CDC, we’d then do some kind of MERGE operation from this topic table into a useful table, which reintroduces a copy. But it is evolution that concerns me more. When materializing Kafka data into an Iceberg table for long-term storage, schema evolution becomes a challenge. A Kafka topic’s schema often changes over time as new fields are added and others are renamed or deprecated, but the historical data still exists in the old shape. A Kafka topic partition is an immutable log where events remain in their original form until expired. If you want to retain all records forever in Iceberg, you need a strategy that allows old and new events to coexist in a consistent, queryable way.
We need to avoid cases where either queries fail on missing fields or historical data becomes inaccessible for Kafka. Let’s look at two approaches to schema evolution of tables that are tiered Kafka topics. One common solution is to build an uber-schema , which is essentially the union of all fields that have ever existed in the Kafka topic schema. In this model, the Iceberg table schema grows as new Kafka fields are introduced, with each new field added as a nullable column. Old records simply leave these columns null, while newer events populate them. Deprecated fields remain in the schema but stay null for modern data. This approach preserves history in its original form, keeps ingestion pipelines simple, and leverages Iceberg’s built-in schema evolution capabilities. The trade-off is that the schema can accumulate many rarely used columns, and analysts must know which columns are meaningful for which time ranges. Views can help, performing the necessary coalesces and the like to turn a messy set of columns into something cleaner. But it would require careful maintenance. The alternative is to periodically migrate old data forward into the latest schema. Instead of carrying every historical field forever, old records are rewritten so that they conform to the current schema. Missing values may be backfilled with defaults, derived from other fields, or simply set to null. No-longer-used columns are dropped. This limits the messiness of the table’s schema over time, turning it from a shameful mess of nullable columns into something that looks reasonable. While this strategy produces a tidier schema and simplifies querying, it can be complex to implement. No vendor that I have seen supports this approach, leaving it as a job for their customers, who know their data best. From the lakehouse point of view, we’d prefer to use the migrate-forward approach over the long term, but use the uber-schema in the shorter term. Once we know that no more producers are using an old schema, we can migrate the rows of that schema forward. But that migration can cause a loss of fidelity for the Kafka workload. What gets written to Kafka may not be what gets read back. This could be a good thing or a bad thing. Good if you want to go back and reshape old events to more closely match the newest schema. Really bad for some compliance and auditing purposes; imagine telling your stakeholders/auditors that we can’t guarantee the data that is read from Kafka will be the same as was written to Kafka! The needs of SQL-writing analysts and the needs of Kafka conflict. Kafka wants fidelity (and doesn’t care about column creep) and Iceberg wants clean tables. In some cases we might have a reprieve: if Kafka clients only need to consume 7 or 30 days of data, then we can use the uber-schema method for the period that covers Kafka, and use the migrate-forward method for the rest. But we are still coupling the needs of Kafka clients with lakehouse clients. If Kafka only needs 7 days of data and Iceberg is storing years’ worth, then why do we care about data duplication anyway?
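As a concrete illustration of the uber-schema approach, here is a hypothetical Spark SQL sketch against an Iceberg table. The database, table, and column names are invented for the example, and the view is only one way to paper over the accumulated columns.

```sql
-- Hypothetical Spark SQL on an Iceberg table; names are invented.

-- A new field starts appearing in the Kafka topic: add it as a nullable column.
-- Historical rows simply leave it NULL.
ALTER TABLE lake.orders_events ADD COLUMN discount_code string;

-- A field was renamed from customer_ref to customer_id at some point in the topic's
-- history: keep both columns, and give analysts a cleaner view that bridges them.
CREATE OR REPLACE VIEW lake.orders_clean AS
SELECT order_id,
       coalesce(customer_id, customer_ref) AS customer_id,   -- bridge old and new shapes
       discount_code,
       event_time
FROM lake.orders_events;
```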
Any storage savings are easily offset by the increased compute requirements. On the subject of data duplication, there are reasons to believe that Kafka Iceberg tables may degenerate into staging tables, due to the unoptimized layout and proliferation of columns over time as schemas change. Such usage as staging tables eliminates the duplication argument. But even so, there is already plenty of duplication going on in the lakehouse. We already have copies. We have bronze then silver tables. We have all kinds of staging tables already. How much impact does the duplication avoidance of shared tiering actually make? Also consider: With a good materializer, we can write to silver tables directly rather than using bronze tables as tiered Kafka data (especially in the CDC case). A good materializer can also reduce data duplication. Usually, Kafka topic retention is only a few days, or up to a month, so the scope for duplication is limited to the Kafka retention period. Bidirectional fidelity is a real concern when converting between formats like Avro and formats like Parquet, which have different type systems and evolution rules. Storing the original Kafka bytes as a column in the Iceberg table is a legitimate approach as an insurance against conversion issues (but involves a copy). The deeper problem with zero-copy tiering is its erosion of boundaries and the resulting coupling. When Kafka uses Iceberg tables as its primary storage layer, that boundary disappears. Who is responsible for the Iceberg table? If Kafka doesn’t take on ownership, then Kafka becomes vulnerable to the decisions of whoever owns and maintains the Iceberg table. After all, the Iceberg table will host the vast majority of Kafka’s data! Who’s on call if tiered data can’t be read? Kafka engineers? Lakehouse engineers? Will they cooperate well? The mistake of this zero-copy thing is that it assumes that Kafka/Iceberg unification requires a single physical representation of the data. But unification is better served as logical and semantic unification when the workloads differ so much. A logical unification of Kafka and Iceberg allows for the storage of each workload to remain separate and optimized. Physical unification removes flexibility by adding coupling (the traps). Once Kafka and Iceberg share the same storage, each system constrains the other’s optimization and evolution. The result is a single, entangled system that must manage two storage formats and be tuned for two conflicting workloads, making both less reliable and harder to operate.  This entanglement also leads to Kafka becoming a data lakehouse manager, which I believe is a mistake for the project. Once Kafka stores its data in Iceberg, it must take ownership of that data. I don’t agree with that direction. There are both open-source and SaaS data lakehouse platforms which include table maintenance, as well as access and security controls. Let lakehouses do lakehouse management, and leave Kafka to do what it does best. Materialization isn’t perfect, but it decouples concerns and avoids forcing Kafka to become a lakehouse. The benefits of maintaining storage decoupling are: Lakehouse performance optimization doesn’t penalize the Kafka workload or vice-versa.  Kafka can use a native log-centric tiering mechanism that does not overly burden brokers. Schema evolution of the lakehouse does not impact Kafka topics, providing the lakehouse with more flexibility leading to cleaner tables. 
There is less risk and overhead associated with bidirectional conversion between Kafka and Iceberg and back again. 100% fidelity is a concern to all system builders working in this space. Materialized data can be freely projected and transformed, as it does not form the actual Kafka data. Kafka does not depend on the materialized data at all. Apache Kafka doesn’t need to perform lakehouse management itself. The drawbacks are that we need two data copies, but hopefully I’ve argued why that isn’t the biggest concern here. Duplication is already a fact of life in modern data platforms. Tools such as Kafka Connect and Apache Flink already exist for materializing Kafka data in secondary systems. They move data across boundaries in a controlled, one-way flow, preserving clear ownership and interfaces on each side. Modern lakehouse platforms provide managed tables with ingestion APIs (such as Snowpipe Streaming, Databricks Unity Catalog REST API, and others). Kafka Connect can work really well with these ingestion APIs. Flink also has sinks for Iceberg, Delta, Hudi and Paimon. We don’t need Kafka brokers to do this work and do maintenance on top of that. Unifying operational and analytical systems doesn’t mean merging their physical storage into one copy. It’s about logical and semantic unification, achieved through consistent schemas and reliable data movement, not shared files. It is tempting to put everything into one cluster, but separation of concerns is what keeps complex systems comprehensible, performant and resilient. UPDATE: I’ve been asked a few times about Confluent Tableflow and how it aligns with this discussion. I didn't include Tableflow as it is a proprietary component of Confluent Cloud, whereas this post is focused on Apache Kafka. But for what it’s worth, Tableflow falls under the materialization approach, acting both as a sophisticated Iceberg/Delta materializer and as a table maintenance service. It is operated as a separate component (separate from the Kora brokers). In the future it will support more sophisticated data layout optimization for analytics workloads. It does not perform zero-copy for many of the reasons why zero-copy isn’t the right choice for Apache Kafka. Nor does it execute within brokers, being a separate service that is independently scalable.

0 views

Arrows to Arrows, Categories to Queries

I’ve had a little time off of work as of late, and been spending it in characteristically unwise ways. In particular, I’ve written a little programming language that compiles to SQL . I call it catlang . That’s not to say that I’ve written a new query language. It’s a programming language, whose compiler spits out one giant statement. When you run that query in postgres, you get the output of your program. Why have I done this? Because I needed a funny compilation target to test out the actual features of the language, namely that its intermediary language is a bunch of abstract category theory nonsense. Which I’ll get to. But I’m sure you first want to see this bad boy in action. Behold, the function that returns 100 regardless of what input you give it. But it does it with the equivalent of a while loop: If you’re familiar with arrow notation , you’ll notice the above looks kinda like one big block. This is not a coincidence (because nothing is a coincidence). I figured if I were to go through all of this work, we might as well get a working arrow desugarer out of the mix. But I digress; that’s a story for another time. Anyway, what’s going on here is we have an arrow , which takes a single argument . We then loop, starting from the value of . Inside the loop, we now have a new variable , which we do some voodoo on to compute —the current value of the loop variable. Then we subtract 100 from , and take the absolute value. The function here is a bit odd; it returns if the input was negative, and otherwise. Then we branch on the output of , where and have been renamed and respectively. If was less than zero, we find ourselves in the case, where we add 1 to and wrap the whole thing in —which the loop interprets as “loop again with this new value.” Otherwise, was non-negative, and so we can return directly. Is it roundabout? You bet! The obtuseness here is not directly a feature; I was just looking for conceptually simple things I could do which would be easy to desugar into category-theoretical stuff. Which brings us to the intermediary language. After desugaring the source syntax for above, we’re left with this IL representation: We’ll discuss all of this momentarily, but for now, just let your eyes glaze over the pretty unicode. The underlying idea here is that each of these remaining symbols has very simple and specific algebraic semantics. For example, means “do and pipe the result into .” By giving a transformation from this categorical IL into other domains, it becomes trivial to compile catlang to all sorts of weird compilation targets. Like SQL. You’re probably wondering what the generated SQL looks like. Take a peek if you dare. It’s not pretty, but rather amazingly, running the above query in postgres 17 will in fact return a single row with a single column whose value is 100. And you’d better believe it does it by actually looping its way up to 100. If you don’t believe me, make the following change: which will instead return a row for each step of the iteration. There are some obvious optimizations I could make to the generated SQL, but it didn’t seem worth my time, since that’s not the interesting part of the project. Let’s take some time to discuss the underlying category theory here. I am by no means an expert, but what I have learned after a decade of bashing my head against this stuff is that a little goes a long way. For our intents and purposes, we have types, and arrows (functions) between types.
We always have the identity “do nothing arrow” : and we can compose arrows by lining up one end to another: 1 Unlike Haskell (or really any programming language, for that matter), we DO NOT have the notion of function application. That is, there is no arrow: You can only compose arrows, you can’t apply them. That’s why we call these things “arrows” rather than “functions.” There are a bundle of arrows for working with product types. The two projection functions correspond to and , taking individual components out of pairs: How do we get things into pairs in the first place? We can use the “fork” operation, which takes two arrows computing and , and generates a new arrow which generates a pair of : If you’re coming from a Haskell background, it’s tempting to think of this operation merely as the pair constructor. But you’ll notice from the type of the computation that there can be no data dependency between and , thus we are free to parallelize each side of the pair. In category theory, the distinction between left and right sides of an arrow is rather arbitrary. This gives rise to a notion called duality where we can flip the arrows around, and get cool new behavior. If we dualize all of our product machinery, we get the coproduct machinery, where a coproduct of and is “either or , but definitely not both nor neither.” Swapping the arrow direction of and , and replacing with gives us the following injections: and the following “join” operation for eliminating coproducts: Again, coming from Haskell this is just the standard function. It corresponds to a branch between one of two cases. As you can see, with just these eight operations, we already have a tremendous amount of expressivity. We can express data dependencies via and branching via . With we automatically encode opportunities for parallelism, and gain the ability to build complicated data structures, with and allowing us to get the information back out of the data structures. You’ll notice in the IL that there are no variable names anywhere to be found. The desugaring of the source language builds a stack (via the pattern), and replaces subsequent variable lookups with a series of projections on the stack to find the value again. On one hand, this makes the categorical IL rather hard to read, but it makes it very easy to re-target! Many domains do have a notion of grouping, but don’t have a native notion of naming. For example, in an electronic circuit, I can have a ribbon of 32 wires which represents an . If I have another ribbon of 32 wires, I can trivially route both ribbons into a 64-wire ribbon corresponding to a pair of . By eliminating names before we get to the IL, it means no compiler backend ever needs to deal with names. They can just work on a stack representation, and are free to special-case optimize series of projections if they are able to. Of particular interest to this discussion is how we desugar loops in catlang. The underlying primitive is : which magically turns an arrow on s into an arrow without the eithers. We obviously must run that arrow on eithers. If that function returns , then we’re happy and we can just output that. But if the function returns , we have no choice but to pass it back into the eithered arrow. In Haskell, cochoice is implemented as: which, as you can see, will loop until finally returns a . What’s neat about this formulation of a loop is that we can statically differentiate between our first and subsequent passes through the loop body.
The first time through is , while for all other times it is . We don’t take advantage of it in the original program, but how many times have you written loop code that needs to initialize something its first time through? So that’s the underlying theory behind the IL. How can we compile this to SQL now? As alluded to before, we simply need to give SQL implementations for each of the operations in the intermediary language. As a simple example, compiles to , where is the input of the arrow. The hardest part here was working out a data representation. It seems obvious to encode each element of a product as a new column, but what do we do about coproducts? After much thought, I decided to flatten out the coproducts. So, for example, the type: would be represented as three columns: with the constraint that exactly one of or would be at any given point in time. With this hammered out, almost everything else is pretty trivial. Composition corresponds to a nested query. Forks are s which concatenate the columns of each sub-query. Joins are s, where we add a clause to enforce we’re looking at the correct coproduct constructor. Cochoice is the only really tricky thing, but it corresponds to a recursive CTE . Generating a recursive CTE table for the computation isn’t too hard, but getting the final value out of it was surprisingly tricky. The semantics of SQL tables is that they are multisets and come with an arbitrary greatest element. Which is to say, you need an column structured in a relevant way in order to query the final result. Due to some quirks in what postgres accepts, and in how I structured my queries, it was prohibitively hard to insert a “how many times have I looped” column and order by that. So instead I cheated and added a column which looks at the processor clock and ordered by that. This is clearly a hack, and presumably will cause problems if I ever add some primitives which generate more than one row, but again, this is just for fun and who cares. Send me a pull request if you’re offended by my chicanery! I’ve run out of vacation time to work on this project, so I’m probably not going to get around to the meta-circular stupidity I was planning. The compiler still needs a few string-crunching primitives (which are easy to add), but then it would be simple to write a little brainfuck interpreter in catlang. Which I could then compile to SQL. Now we’ve got a brainfuck interpreter running in postgres. Of course, this has been done by hand before, but to my knowledge, never via compilation. There exist C to brainfuck compilers. And postgres is written in C. So in a move that would make Xzibit proud, we could run postgres in postgres. And of course, it would be fun to run brainfuck in brainfuck. That’d be a cool catlang backend if someone wanted to contribute such a thing. I am not the first person to do anything like this. The source language of catlang is heavily inspired by Haskell’s arrow syntax , which in turn is essentially a desugaring algorithm for Arrows . Arrows are slightly the wrong abstraction because they require an operation —which requires you to be able to embed Haskell functions in your category, something which is almost never possible. Unfortunately, arrow syntax in Haskell desugars down to for almost everything it does, which in turn makes arrow notation effectively useless. In an ideal world, everything I described in this blog post would be a tiny little Haskell library, with arrow notation doing the heavy lifting.
But that is just not the world we live in. Nor am I the first person to notice that there are categorical semantics behind programming languages. I don’t actually know whom to cite on this one, but it is well-established folklore that the lambda calculus corresponds to cartesian-closed categories . The “closed” part of “cartesian-closed” means we have an operation , but everyone and their dog has implemented the lambda calculus, so I thought it would be fun to see how far we can get without it. This is not a limitation on catlang’s Turing completeness (since gives us everything we need). I’ve been thinking about writing a category-first programming language for the better part of a decade, ever since I read Compiling to Categories . That paper takes Haskell and desugars it back down to categories. I stole many of the tricks here from that paper. Anyway. All of the code is available on github if you’re interested in taking a look. The repo isn’t up to my usual coding standards, for which you have my apologies. Of note is the template-haskell backend which can spit out Haskell code, meaning it wouldn’t be very hard to make a quasiquoter to compile catlang into what Haskell’s arrow desugaring ought to be. If there’s enough clamor for such a thing, I’ll see about turning this part into a library. When looking at the types of arrows in this essay, we make the distinction that are arrows that we can write in catlang, while exist in the metatheory. ↩︎
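The generated SQL is omitted above, but the cochoice-as-recursive-CTE idea is easy to picture. This is a hand-written sketch of the "loop up to 100" example in PostgreSQL, not catlang's actual output (which also flattens coproducts into columns and orders by a clock column); the starting input of 0 is assumed.

```sql
-- Hand-written illustration, not catlang's generated SQL.
-- Each row of the CTE is one pass through the loop body; is_right = true means
-- the body returned Right (stop), false means Left (loop again).
WITH RECURSIVE steps (iteration, x, is_right) AS (
    SELECT 0, 0, false                  -- enter the loop with an assumed input of 0
    UNION ALL
    SELECT iteration + 1,
           x + 1,                       -- the loop body: count up towards 100
           (x + 1) >= 100               -- reached 100: the body returns Right
    FROM steps
    WHERE NOT is_right                  -- only Left results are fed back in
)
SELECT x                                -- the final answer: 100
FROM steps
WHERE is_right
ORDER BY iteration DESC                 -- stand-in for the post's order-by-the-clock hack
LIMIT 1;
```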


Optimizing Datalog for the GPU

Optimizing Datalog for the GPU
Yihao Sun, Ahmedur Rahman Shovon, Thomas Gilray, Sidharth Kumar, and Kristopher Micinski. ASPLOS'25.

Datalog source code comprises a set of relations and a set of rules. A relation can be explicitly defined with a set of tuples. A running example in the paper is to define a graph with a relation named :

A relation can also be implicitly defined with a set of rules. The paper uses the relation as an example:

Rule 1 states that two vertices ( and ) are part of the same generation if they both share a common ancestor ( ) and they are not actually the same vertex ( ). Rule 2 states that two vertices ( and ) are part of the same generation if they have ancestors ( and ) from the same generation.

“Running a Datalog program” entails evaluating all rules until a fixed point is reached (no more tuples are added). One key idea to internalize is that evaluating a Datalog rule is equivalent to performing a SQL join. For example, rule 1 is equivalent to joining the relation with itself, using as the join key and as a filter (a SQL rendering of this appears at the end of this post).

Semi-naïve Evaluation is an algorithm for performing these joins until convergence, while not wasting too much effort on redundant work. The tuples in a relation are put into three buckets:

- : holds tuples that were discovered on the current iteration
- : holds tuples which were added in the previous iteration
- : holds all tuples that have been found in any iteration

For a join involving two relations ( and ), is computed as the union of the result of 3 joins:

- joined with
- joined with
- joined with

The key fact for performance is that is never joined with . More details on Semi-naïve Evaluation can be found in these notes.

This paper introduces the hash-indexed sorted array for storing relations while executing Semi-naïve Evaluation on a GPU. It seems to me like this data structure would work well on other chips too. Fig. 2 illustrates the data structure (join keys are in red).

Source: https://dl.acm.org/doi/10.1145/3669940.3707274

The data array holds the actual tuple data. It is densely packed in row-major order. The sorted index array holds pointers into the data array (one pointer per tuple). These pointers are lexicographically sorted (join keys take higher priority in the sort). The hash table is an open-addressed hash table which maps a hash of the join keys to the first element in the sorted index array that contains those join keys.

A join of relations A and can be implemented with the following pseudo-code:

Memory accesses when probing through the sorted index array are coherent. Memory accesses when accessing the data array are coherent up to the number of elements in a tuple.

Table 3 compares the results from this paper (GPULog) against a state-of-the-art CPU implementation (Soufflé). HIP represents GPULog ported to AMD’s HIP runtime and then run on the same Nvidia GPU.

Source: https://dl.acm.org/doi/10.1145/3669940.3707274

Dangling Pointers

The data structure and algorithms described by this paper seem generic; it would be interesting to see them run on other chips (FPGA, DPU, CPU, HPC cluster). I would guess most of GPULog is bound by memory bandwidth, not compute. I wonder if there are Datalog-specific algorithms to reduce the bandwidth-to-compute ratio.
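To make the “a Datalog rule is a SQL join” observation concrete, here is a rough SQL sketch of rule 1 as a self-join, plus a semi-naïve step for rule 2. The relation and table names (edge(parent, child) for the graph, sg for same-generation, and sg_delta/sg_full/sg_new for the semi-naïve buckets) are my own illustrative guesses, not the paper’s actual identifiers:

```sql
-- Rule 1 as a SQL self-join (illustrative schema, not the paper's):
-- x and y are in the same generation if they share a common ancestor
-- and are not the same vertex.
SELECT a.child AS x, b.child AS y
FROM edge AS a
JOIN edge AS b ON a.parent = b.parent   -- the shared ancestor is the join key
WHERE a.child <> b.child;               -- the "not the same vertex" filter

-- A semi-naïve step for rule 2: because edge never changes, each
-- iteration only needs to join edge against the previous iteration's
-- delta of sg, so the full relation is never joined with itself.
INSERT INTO sg_new (x, y)
SELECT e1.child, e2.child
FROM sg_delta AS d                      -- (d.x, d.y) already known to be same-generation
JOIN edge AS e1 ON e1.parent = d.x      -- d.x is an ancestor of the new x (e1.child)
JOIN edge AS e2 ON e2.parent = d.y      -- d.y is an ancestor of the new y (e2.child)
EXCEPT
SELECT x, y FROM sg_full;               -- discard tuples we already know
```

Repeating the second statement until sg_new comes back empty is, roughly, the fixed-point loop the evaluation is driving toward.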

Jeremy Daly 3 weeks ago

Announcing Data API Client v2

A complete TypeScript rewrite with drop-in ORM support, full mysql2/pg compatibility layers, and smarter parsing for Aurora Serverless v2's Data API.
