Setting the Scene
A growing number of business-critical services across different industries need to process data more quickly than ever so that they can react to changing conditions with very low latency. Irrespective of the concrete technology stack we use for near real-time stream processing, there are two broad categories of operations that we want to apply to never-ending streams of data, and they differ considerably:
First, some applications process single messages one by one in a stateless fashion. Examples would be filtering records according to pre-defined criteria or applying a set of transformations to every message that passes by. Such workloads are straightforward to reason about, since their stateless nature lets us process a message independently of any other message that we have already acted upon or might act upon in the future.
Second, there are applications which are a completely different "beast" since they need to deal with state management along the way. Examples of stateful operations in the context of stream processing are aggregations, counting, joining streams of data or keeping track of messages within certain timespans, a.k.a. windowing. The hardest parts to get right in this regard are the fault tolerance and scalability of said state.
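To make the distinction more tangible, here is a minimal, dependency-free sketch in plain Java. It deliberately does not use Kafka Streams; the class and method names (StatelessVsStateful, keepAbove, RunningCount) are made up for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StatelessVsStateful {

    // Stateless: every reading is judged on its own, without any memory
    // of readings seen before or after it.
    static List<Double> keepAbove(List<Double> readings, double threshold) {
        return readings.stream()
                .filter(r -> r > threshold)
                .collect(Collectors.toList());
    }

    // Stateful: the result for a key depends on all records seen so far
    // for that key, so the counts must be kept (and protected) somewhere.
    static final class RunningCount {
        private final Map<String, Long> counts = new HashMap<>();

        long record(String key) {
            return counts.merge(key, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        System.out.println(keepAbove(List.of(18.5, 25.0, 31.2), 20.0));

        RunningCount counter = new RunningCount();
        counter.record("station-a");
        counter.record("station-b");
        System.out.println(counter.record("station-a"));
    }
}
```

If the process running RunningCount crashes, the counts map is gone, which is exactly the fault-tolerance problem that stateful stream processing has to solve.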
So yes, efficient and reliable state management is a rather tricky undertaking comprising aspects like state growing very large - beyond the available memory - and at the same time guaranteeing that the maintained state stays consistent and isn't lost in case of certain failure scenarios.
Luckily, most of these challenges are typically taken care of to a very large degree by the stream processing frameworks and technologies that we use - so don't be scared away. Instead, let's take a brief look at how Apache Kafka Streams is dealing with state and how it helps to take off most of the burden from our shoulders.
If you are already familiar with state management in Kafka Streams applications, feel free to skip the following section and dive right into the main topic of this blog post.
State Management in Apache Kafka Streams
Whenever our stream processing applications leverage stateful operations, Kafka Streams takes care of creating and continuously maintaining the underlying state stores to persist any changes to the state. In general, a single application may require one or more such state stores, each of which can be accessed by its name to selectively write to and read from the state store in question.
The actual storage mechanism behind the scenes has been built with a 'pluggable architecture' in mind. Out of the box, Kafka Streams supports state stores running on top of RocksDB, which is "an embeddable persistent key-value store for fast storage". Embeddable means that there is no need to spin up and run external processes - as we would do for database servers - but instead, RocksDB is used as part of the Kafka Streams application and it typically persists to a locally attached disk of the corresponding processing instance. Besides leveraging RocksDB, we can opt for an in-memory only state store implementation, which essentially means that all state is kept and maintained in a map structure held on the JVM's heap. The application then doesn't persist any state to its locally attached disk.
Kafka Streams applications can be scaled out, such that several processing instances run in parallel to share the workload. Each of the instances then processes a subset of all available partitions of the involved Kafka topics. Likewise, each of the instances locally holds and maintains only a part of the application state. In other words, the entire application state is distributed across multiple nodes.
Since nodes can go down at any time, either unexpectedly or on purpose, the instance-local state can, of course, be temporarily lost. To cope with such situations and other failure scenarios, Kafka Streams not only performs state changes locally but also writes all modifications into so-called changelog topics backed by Apache Kafka. This allows recovering from state loss because the missing state can be re-created by reprocessing the records from the corresponding Kafka topic partitions.
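The changelog idea can be illustrated with a small, dependency-free model. This is only a sketch of the principle, not Kafka Streams' actual implementation: Changelog stands in for a changelog topic partition, LocalStore for an instance-local state store, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogRestoreSketch {

    // One changelog entry; a null value marks a deletion.
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a changelog topic partition: an append-only list of changes.
    static final class Changelog {
        private final List<Change> entries = new ArrayList<>();

        void append(Change change) { entries.add(change); }

        List<Change> entries() { return List.copyOf(entries); }
    }

    // Instance-local store that mirrors every modification into the changelog.
    static final class LocalStore {
        private final Map<String, Long> data = new HashMap<>();
        private final Changelog changelog;

        LocalStore(Changelog changelog) { this.changelog = changelog; }

        void put(String key, Long value) {
            data.put(key, value);
            changelog.append(new Change(key, value));
        }

        void delete(String key) {
            data.remove(key);
            changelog.append(new Change(key, null));
        }

        Long get(String key) { return data.get(key); }

        // Rebuilds the local state by replaying the changelog from the start.
        // Replayed changes are applied directly and not logged again.
        static LocalStore restore(Changelog changelog) {
            LocalStore store = new LocalStore(changelog);
            for (Change change : changelog.entries()) {
                if (change.value == null) {
                    store.data.remove(change.key);
                } else {
                    store.data.put(change.key, change.value);
                }
            }
            return store;
        }
    }

    public static void main(String[] args) {
        Changelog changelog = new Changelog();
        LocalStore original = new LocalStore(changelog);
        original.put("station-a", 1L);
        original.put("station-a", 2L);

        // Simulate losing the instance: only the changelog survives.
        LocalStore recovered = LocalStore.restore(changelog);
        System.out.println(recovered.get("station-a"));
    }
}
```

The longer the changelog, the longer such a replay takes, which is why rebuilding large state stores after a failure can be a lengthy phase.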
Handling State Store Access
From a developer's perspective, the way we create state stores and interact with them very much depends on which of the two different Kafka Streams APIs (Streams DSL vs. Processor API) we choose to write our stream processing applications. The DSL offers a very convenient way to define stream processors thanks to its declarative, functional and fluent API nature. Most users like DSL abstractions as well as higher-level building blocks and thus tend to primarily leverage the DSL. More importantly, when it comes to state management due to the use of stateful operators, the DSL takes care to auto-create the needed state stores. Additionally, the interactions with the state stores are handled in a fully transparent manner. That said, all the complexities that come along with explicitly touching state are more or less hidden away.
On the other hand, there are use cases for which we need certain functionality that simply isn't provided by the DSL. Whenever we reach that point, we can fall back to the imperative-style Processor API (PAPI). While it offers us more flexibility and lower-level building blocks, it also means we need to write more code ourselves. When it comes to state management, it's likewise up to us to create any required state stores in the first place, and to explicitly write to or read from them as needed.
Exposing Managed State
More often than not, the application state which is managed by Kafka Streams needs to be made available to the outside world, such that other services can query it directly at any point in time. For that purpose, there is a mechanism called Interactive Queries, which allows performing simple queries based on a single key or a range of keys. Additionally, Kafka Streams maintains and exposes the metadata required to derive which of potentially many processing instances is currently responsible for which parts of the distributed application state. This metadata can be leveraged to route key-based lookup queries to the right target instance, which locally holds and maintains the partial state containing the data for the given keys.
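The routing idea behind such key-based lookups can be sketched without any Kafka dependency. The following model is an assumption-laden simplification: partitionFor uses String.hashCode() as a placeholder (Kafka's default partitioner hashes the serialized key bytes with murmur2), and the list of partition owners stands in for the metadata that Kafka Streams exposes. All names are hypothetical.

```java
import java.util.List;

class QueryRoutingSketch {

    // Placeholder key-to-partition mapping. The only property that matters
    // here is that the same key always lands on the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // partitionOwners.get(p) is the host:port of the instance that currently
    // holds the state for partition p.
    static String instanceFor(String key, List<String> partitionOwners) {
        return partitionOwners.get(partitionFor(key, partitionOwners.size()));
    }

    public static void main(String[] args) {
        // Four partitions spread over two hypothetical instances.
        List<String> owners = List.of("host-0:8080", "host-1:8080", "host-0:8080", "host-1:8080");
        System.out.println(instanceFor("station-a", owners));
    }
}
```

A service receiving a lookup for a key it does not own would forward the request to the instance returned here, for example via a small HTTP layer that the application provides itself.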
The Case for Custom State Stores
While simple key-based state lookups might serve lots of use cases just fine, there are others for which such access patterns could be too restrictive. We might need to support richer queries against the application's state which is potentially given by rather complex aggregate structures that are stored for different keys. Luckily, we learned above that there is a 'pluggable' state store architecture in place and exactly in this regard, let's briefly explore how we could allow for arbitrary queries against the persisted state using a custom, remote state store implementation.
When we think about the implementation of custom state stores, the following high-level characteristics are rather important:
- the data store backing the Kafka Streams state store should be resilient & scalable enough and offer acceptable performance because Kafka Streams applications can cause a rather high read/write load
- since application state may consist of complex aggregate structures, support for rich querying and indexing capabilities would be beneficial to be able to go way beyond key-based lookups
- ideally, the custom state store implementation works as a drop-in replacement so that it can be used with minimal to no code and/or configuration changes for existing Kafka Streams applications
MongoDB-backed State Store Implementation
MongoDB can be considered one of the leading document-oriented databases, which are a very good fit to store, query and index complex aggregate structures. Furthermore, MongoDB offers replication and sharding out-of-the-box which are must-have features to cope with potentially high read/write loads as well as the proper level of resilience for backing Kafka Streams applications' state.
Challenge
Ideally, we would just take any application-level data structures which are used behind arbitrary stateful operations and put them into MongoDB collections as documents. However, that is easier said than done since the Kafka Streams DSL strictly limits the possibilities in that regard. The way it works under the covers is that the DSL uses several "nested stores", wrapping one another, similar to a decorator approach.
The outer-most state store "layer", for instance, is a MeteredKeyValueStore. What follows depends on whether caching and/or change logging are enabled. If so, there are e.g. a CachingKeyValueStore and a ChangeLoggingKeyValueStore in between, before we finally reach the actual store-specific implementation - the RocksDBStore by default or a custom one.
Stack traces captured while debugging an application that uses the default RocksDBStore also reflect what this looks like if both caching and change logging are enabled:
[Stack trace: putting a new key-value pair into the state store]
[Stack trace: getting the value for a key from the state store]
Now, the main challenge here is that very early in this series of "decorated" state stores - in fact right after the MeteredKeyValueStore layer - the API contracts are already bound to keys and values being bytes. As a consequence, we no longer have direct access to our original data structures (POJOs) at the inner-most "layer", namely the concrete, persisting key-value store implementation. Instead, we have to deal with the bytes resulting from the serialization which took place in the MeteredKeyValueStore based on the configured Serdes. Leaving aside the DSL and sticking to state stores that can only be used together with the PAPI would allow us to implement a fully customizable state store. Doing so, we could easily break free from being tightly bound to handling bits & bytes. But then again, it would no longer be feasible to use the custom state store with any existing code in the wild which relies on the DSL. Gone would then be the option to have a nicely working drop-in replacement.
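The nesting and the bytes contract described above can be modelled with a few lines of plain Java. This is a simplified sketch and not the real Kafka Streams classes: BytesStore, TracingBytesStore, InMemoryBytesStore and TypedStore are hypothetical names, and the middle layers only record that they were passed instead of actually caching or logging changes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class LayeredStoreSketch {

    // Contract shared by all inner layers: keys and values are opaque bytes.
    interface BytesStore {
        void put(byte[] key, byte[] value);
        byte[] get(byte[] key);
    }

    // Inner-most layer, standing in for the persisting store.
    static final class InMemoryBytesStore implements BytesStore {
        private final Map<ByteBuffer, byte[]> data = new HashMap<>();

        public void put(byte[] key, byte[] value) { data.put(ByteBuffer.wrap(key), value); }

        public byte[] get(byte[] key) { return data.get(ByteBuffer.wrap(key)); }
    }

    // Decorator for a middle layer: records the call, then delegates inwards.
    static final class TracingBytesStore implements BytesStore {
        private final String name;
        private final List<String> trace;
        private final BytesStore inner;

        TracingBytesStore(String name, List<String> trace, BytesStore inner) {
            this.name = name;
            this.trace = trace;
            this.inner = inner;
        }

        public void put(byte[] key, byte[] value) {
            trace.add(name + ".put");
            inner.put(key, value);
        }

        public byte[] get(byte[] key) {
            trace.add(name + ".get");
            return inner.get(key);
        }
    }

    // Outer-most layer: the only place that still sees typed keys and values.
    static final class TypedStore<K, V> {
        private final BytesStore inner;
        private final Function<K, byte[]> keySerializer;
        private final Function<V, byte[]> valueSerializer;
        private final Function<byte[], V> valueDeserializer;

        TypedStore(BytesStore inner, Function<K, byte[]> keySerializer,
                   Function<V, byte[]> valueSerializer, Function<byte[], V> valueDeserializer) {
            this.inner = inner;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.valueDeserializer = valueDeserializer;
        }

        static TypedStore<String, String> forStrings(BytesStore inner) {
            return new TypedStore<>(inner,
                    s -> s.getBytes(StandardCharsets.UTF_8),
                    s -> s.getBytes(StandardCharsets.UTF_8),
                    b -> new String(b, StandardCharsets.UTF_8));
        }

        void put(K key, V value) {
            inner.put(keySerializer.apply(key), valueSerializer.apply(value));
        }

        V get(K key) {
            byte[] raw = inner.get(keySerializer.apply(key));
            return raw == null ? null : valueDeserializer.apply(raw);
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        BytesStore layers = new TracingBytesStore("caching", trace,
                new TracingBytesStore("changelog", trace, new InMemoryBytesStore()));
        TypedStore<String, String> store = TypedStore.forStrings(layers);
        store.put("station-a", "some aggregate");
        System.out.println(store.get("station-a") + " via " + trace);
    }
}
```

Everything below TypedStore only ever sees byte arrays, which is the situation a custom inner-most store finds itself in when it is plugged into the DSL.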
That being said, the proposed way to deal with this dilemma is to embrace the API constraints and come up with an alternative solution that works on top of the bytes contract. To do so, the custom state store implementation needs to be aware of the configured Serde classes. It can then leverage the deserializers to restore the original key/value application-level data structures, convert these into MongoDB documents and write them into the corresponding collections. Conversely, whatever needs to be read back from the database is first mapped from documents into the application-level data structures, before being serialized and sent through the state store hierarchy as bytes. The illustration below depicts the handling of data on its way in and out of the database and through the state store hierarchy:
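In code, this round trip could be sketched roughly as follows. The sketch is dependency-free and makes several assumptions: StationStats is a hypothetical aggregate whose field names follow the ones used in this post, the serialize/deserialize methods are a naive stand-in for the configured value Serde, and a plain map stands in for a MongoDB collection. It is not the proof-of-concept implementation itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class DocumentStoreSketch {

    // Hypothetical application-level aggregate.
    static final class StationStats {
        final String stationName;
        final double min;
        final double max;
        final double avg;

        StationStats(String stationName, double min, double max, double avg) {
            this.stationName = stationName;
            this.min = min;
            this.max = max;
            this.avg = avg;
        }
    }

    // Stand-in for the value Serde: a naive text encoding (name;min;max;avg).
    static byte[] serialize(StationStats stats) {
        String text = stats.stationName + ";" + stats.min + ";" + stats.max + ";" + stats.avg;
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static StationStats deserialize(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(";");
        return new StationStats(parts[0], Double.parseDouble(parts[1]),
                Double.parseDouble(parts[2]), Double.parseDouble(parts[3]));
    }

    // Inner-most store: receives bytes, but persists queryable documents.
    static final class DocumentBackedStore {
        // Stand-in for a MongoDB collection, keyed by the document's _id.
        final Map<String, Map<String, Object>> collection = new HashMap<>();

        void put(byte[] key, byte[] value) {
            // bytes -> application-level object -> document
            StationStats stats = deserialize(value);
            String id = new String(key, StandardCharsets.UTF_8);
            Map<String, Object> document = new LinkedHashMap<>();
            document.put("_id", id);
            document.put("stationName", stats.stationName);
            document.put("min", stats.min);
            document.put("max", stats.max);
            document.put("avg", stats.avg);
            collection.put(id, document);
        }

        byte[] get(byte[] key) {
            // document -> application-level object -> bytes
            Map<String, Object> document = collection.get(new String(key, StandardCharsets.UTF_8));
            if (document == null) {
                return null;
            }
            StationStats stats = new StationStats((String) document.get("stationName"),
                    (Double) document.get("min"), (Double) document.get("max"),
                    (Double) document.get("avg"));
            return serialize(stats);
        }
    }

    public static void main(String[] args) {
        DocumentBackedStore store = new DocumentBackedStore();
        byte[] key = "station-a".getBytes(StandardCharsets.UTF_8);
        store.put(key, serialize(new StationStats("station-a", 12.0, 29.0, 20.5)));
        System.out.println(store.collection.get("station-a"));
    }
}
```

Each put and get pays for one extra deserialization and serialization step. That is the price for keeping the bytes contract towards the outer layers while storing documents whose individual fields can be queried.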
Resulting Sample State Documents in MongoDB
Many more Possibilities
Once we have our stream processing state persisted in MongoDB, we can leverage its full potential when it comes to exposing said state. Any of the available fields can be efficiently queried. Thanks to sophisticated secondary index support this can range from geospatial queries all the way through to full-text search or even complex aggregation pipeline queries if needed. For the simple scenario shown above, we might want to define indexes on the fields avg, min, max and stationName to efficiently support exact matches, range queries and sorting on any of these fields.
Furthermore, MongoDB's change streams feature can be combined with the reactive database driver to directly stream any state changes to 3rd party clients as they happen. A commonly found use case for this would be to feed a live dashboard in a single page application with either all or a specific subset of the state changes that are happening in Kafka Streams applications.
Trade-Offs
De/Serialization Overhead
Without a doubt, the suggested approach to implement a custom state store on top of MongoDB involves de/serialization overhead. On the plus side, it enables a drop-in replacement and can be leveraged together with any existing Kafka Streams DSL code that needs key-value state stores. The same approach could be used to plug-in various other state store backends with relative ease.
Remote State
Having a centralized, remote state store means that our stream processing application has to reach across network boundaries to read/write state, which may or may not be an issue depending on the reliability of the network as well as our throughput needs. Furthermore, we shouldn't forget about the availability guarantees the centralized, remote state store can provide, since having the state available at more or less "all times" is vital. Short network blips can be mitigated by retry mechanisms, but longer network-induced downtimes of a critical subset of the database nodes would almost certainly cause us trouble.
However, all of these trade-offs depend on the performance characteristics of the underlying infrastructure: in some systems, network delay may be lower than disk access latency, and network bandwidth may be comparable to disk bandwidth. There is no universally ideal trade-off for all situations, and the merits of local versus remote state may also shift as storage and networking technologies evolve. -- Martin Kleppmann
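As for the retry mechanisms mentioned above, a generic helper with exponential backoff could look like the following sketch. The name withRetry and its parameters are hypothetical, and a production setup would more likely rely on the retry options of the database driver or a resilience library.

```java
import java.util.function.Supplier;

class RetrySketch {

    // Runs the operation up to maxAttempts times and doubles the pause after
    // every failed attempt. The last failure is rethrown if all attempts fail.
    static <T> T withRetry(Supplier<T> operation, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long backoff = initialBackoffMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
                backoff *= 2;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = withRetry(() -> {
            // Simulated remote read that fails twice before succeeding.
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated network blip");
            }
            return "state for station-a";
        }, 5, 10L);
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

Retries like this only paper over short interruptions. They also add latency to the processing of every affected record, so the number of attempts and the backoff need to fit the application's throughput needs.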
Main Benefits
Despite the main drawbacks described above, there are a number of beneficial aspects that come along with a custom, remote state store implementation which are worth mentioning:
- persistence of state can be scaled and replicated independently from the Kafka Streams application nodes
- there is no such thing as local state loss if any of the Kafka Streams application nodes goes down
- no need to run stand-by replicas to shorten otherwise long-lasting state store rebuilding phases
- managed state often reflects rich data structures (aggregates) that can be efficiently served to any other application based on arbitrary non-key queries backed by secondary indexes
- no additional processing load / network saturation caused by lots of interactive queries which have to be directly served by the Kafka Streams application nodes themselves
- MongoDB can be configured to run on top of the in-memory storage engine to completely avoid disk I/O
Summary & Outlook
This blog post explained the importance of managing state in stream processing applications. While the default RocksDB-backed Apache Kafka Streams state store implementation serves various needs just fine, some use cases could benefit from a centralized, remote state store. In particular, one possible solution for such a customized implementation that uses MongoDB has been discussed. A major drawback of this approach is, of course, the fact that any state-related read and write access has to be done over the network. However, its main benefit is that complex state aggregates can be efficiently served to any 3rd party applications based on rich querying options and indexing support directly from the database, without the need to cause interactive-query-induced CPU/network load on the actual stream processing nodes.
Currently, there is only proof-of-concept code for the MongoDB-backed state store implementation. The idea is to open source it after polishing some rough edges, adding at least basic documentation and further tests. In case you would find that useful or even want to contribute, please just let me know so that I can ping you once the repo is public. Stay tuned!
Many thanks to Matthias J. Sax for providing me with very helpful insights to better understand the constraints and trade-offs of remote state stores. I'm also very thankful that Gunnar Morling took the time to challenge different aspects of this blog post and to provide very useful hints and comments on this work.
Top comments (4)
Thanks for the great article. Could you please share the implementation? Also, I was wondering why not just writing accumulator states to another topic from where we then could leverage kafkaConnect to write to a sink?
Hi Hans, Thanks for the article. Quite interesting to use Mongo here. Is there any benchmark done to compare rocksdb and mongo to see how much is the increased latency in such replacement.
Has the project been open-sourced?
Unfortunately not :( Never found the time to polish it and bring it beyond the POC stage.