In a previous blog, we explored how to use Kubernetes primitives to run stateful stream processing apps powered by Kafka Streams.
With great power comes great responsibility! If you're using Interactive queries with a stateful Kafka Streams app, you need to be aware that its local state stores are not queryable while an instance is rebalancing, restoring, or shuffling its local state from changelog topics.
You can use Kubernetes probes in your apps to handle this gracefully.
Readiness probe
A Readiness probe is used by Kubernetes to detect when a container is ready to start accepting traffic. Applying this to your Kafka Streams application ensures that app instances are registered with the Kubernetes Service only after certain (user-defined) criteria are fulfilled. If an instance is unavailable due to rebalancing, its REST endpoints will not be exposed to clients (via the Service) for Interactive queries.
At this point, only a partial view of your app's state might be queryable, via the instances that pass the Readiness probe.
After the container starts, Kubernetes waits for the duration specified in initialDelaySeconds before triggering the Readiness probe and then repeats it every periodSeconds; each probe attempt times out after timeoutSeconds.
Liveness probe
Even after your Kafka Streams app (and all its instances) is up and running, there might be rebalances for multiple reasons - members joining, leaving, etc. In this scenario, you might want to check whether an instance is in a ready state from an Interactive queries point of view. You can configure a Liveness probe just like its Readiness counterpart (and even use the same mechanism for both, e.g. a /health endpoint). It works the same way, except for what happens when the probe fails: the Pod is restarted instead of being taken out of the Service.
Kubernetes v1.16 added a Startup Probe which can be used to check whether a container process has started. It can be used along with Liveness and Readiness probes, but it's worth mentioning that, if configured, it disables liveness and readiness checks until it succeeds, making sure those probes don't interfere with the application startup.
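For reference, here is a minimal sketch of what a startupProbe could look like, assuming it reuses the same /health endpoint and port as the other probes (the values are purely illustrative):

startupProbe:
  httpGet:
    path: /health
    port: 8080
  # allow up to 30 x 10 = 300 seconds for startup before the
  # Liveness and Readiness probes take over
  periodSeconds: 10
  failureThreshold: 30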
Probe configuration
For these probes to work, you will need to:
- Implement a check mechanism, and
- Configure the probe in the Kubernetes (StatefulSet) manifest
To try this out, you can update the application from the Stateful Kafka Streams blog as per the steps below.
Health check endpoint for the probe
Kubernetes supports HTTP endpoints, TCP sockets and arbitrary command execution as health check mechanisms. For our Kafka Streams app, exposing state store status info via a REST endpoint makes a lot of sense as far as health checks are concerned:
@GET
@Path("health")
public Response health() {
    // 200 OK when the Kafka Streams instance is RUNNING, 503 otherwise
    // (e.g. while it is REBALANCING and its state stores are not yet queryable)
    Response healthCheck;
    KafkaStreams.State streamsAppState = KafkaStreamsBootstrap.getKafkaStreamsState();
    if (streamsAppState.name().equals(RUNNING_STATE)) {
        healthCheck = Response.ok().build();
    } else {
        healthCheck = Response.status(Response.Status.SERVICE_UNAVAILABLE)
                              .entity(streamsAppState.name())
                              .build();
    }
    return healthCheck;
}
Probe config manifest
Here is what a Readiness probe looks like (the same applies to a Liveness probe):
readinessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 15
  periodSeconds: 5
  timeoutSeconds: 2
  failureThreshold: 5
Although these probes will mitigate problems, there are corner cases you should be aware of when dealing with a Kafka Streams app, especially one that involves a lot of stateful computation.
Caveats
It is possible that your Kafka Streams state is so large that its rebalancing/restoration period crosses the thresholds specified by the Readiness and/or Liveness probes. This is what might happen in these cases:
- Readiness probe breached: after failureThreshold consecutive failures, Kubernetes marks the Pod as Unready and removes it from the Service, thus rendering it unavailable.
- Liveness probe breached: in this case, Kubernetes restarts the Pod after failureThreshold consecutive failures. Your application might thus end up stuck in a loop where it tries to restore state but is interrupted because Kubernetes restarts the Pod (due to the probe), and the restore has to start all over again!
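As a rough worked example, with the configuration shown above a Pod gets about initialDelaySeconds + (periodSeconds x failureThreshold) seconds of grace, i.e. roughly 15 + (5 x 5) = 40 seconds, before the probe is considered breached, which may be far less than a large state restore needs.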
Possible solutions
If the state associated with your Kafka Streams app is stored externally (e.g. a PersistentVolume backed by durable cloud storage), the long-winded rebalances can be avoided to a large extent; this blog covers it in detail.
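For context, here is a minimal sketch of how that could look in the StatefulSet manifest, assuming the Kafka Streams state.dir is pointed at the mounted path (the container name, mount path and storage size are purely illustrative):

# StatefulSet excerpt (illustrative): persist the Kafka Streams state directory
spec:
  template:
    spec:
      containers:
        - name: kafka-streams-app
          volumeMounts:
            - name: state-store
              mountPath: /data/state   # point state.dir at this path
  volumeClaimTemplates:
    - metadata:
        name: state-store
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 1Gi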
Another key point is to test your application and be mindful of its state and the time it might take to restore/rebalance/shuffle that state. You can then make an informed decision and adjust the probe configuration parameters (failureThreshold, initialDelaySeconds etc.) accordingly, such that your app gets enough time to actually carry out the restore process before you poke the probe at it!
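For instance, a more lenient Liveness probe for an app whose restores take a few minutes might look something like this (the values are purely illustrative and should be derived from your own measurements):

livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 60
  periodSeconds: 10
  timeoutSeconds: 2
  failureThreshold: 30   # roughly 60 + (10 x 30) = 360 seconds of grace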
You can also configure your Kafka Streams app to have standby replicas of local state (using the num.standby.replicas configuration), which are fully replicated copies of the state. When a rebalance is triggered, Kafka Streams will try to assign a task to an instance where such a standby replica already exists, in order to minimize the task (re)initialization cost.
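Here is a minimal sketch of how this might be set when building the app; the application id, bootstrap servers and buildTopology() helper are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");    // placeholder
// keep one fully replicated standby copy of each local state store on another instance
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Topology topology = buildTopology(); // placeholder for your existing topology
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();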
You should also consider using the Startup probe in future versions of Kubernetes (once it's stable).
That's it for now. I really hope you enjoyed and learned something from this article. Please like and follow if you did. Happy to get feedback via Twitter, or just drop a comment!