Understanding Kafka Producer

Introduction
Today, we present an in-depth analysis of the Kafka Producer (based on Apache Kafka 3.7[2]). Given the extensive nature of the topic, the article is divided into two parts; the first part explains the usage and principles of the Kafka Producer, while the second part will discuss the implementation details and common issues associated with it.
Usage
Before we dive into the specifics of the Kafka Producer implementation, let's first understand how to utilize it. Here's the example code for sending a message to a specified topic using Kafka Producer:



// Configure and create a Producer
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(kafkaProps);

// Send a message to the specified topic
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "my-key", "my-value");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Send failed
        exception.printStackTrace();
    } else {
        // Send succeeded
        System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset());
    }
});

// Close the Producer and release resources
producer.close();


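For completeness, the send method can also be used synchronously by blocking on the Future it returns. Below is a minimal sketch that reuses the producer and record from the example above; the error handling is illustrative only.

try {
    // get() blocks until the broker acknowledges the record or the send fails
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Record sent to partition " + metadata.partition() + " with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // ExecutionException wraps the underlying send error
    e.printStackTrace();
}

Note that java.util.concurrent.ExecutionException must be imported; synchronous sending trades throughput for simpler error handling.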

Subsequently, the primary interfaces of Kafka Producer are outlined.



public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

public interface Callback {
    void onCompletion(RecordMetadata metadata, Exception exception);
}

public interface Producer<K, V> {
    // ...
    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
    void flush();
    void close();
    // ...
}



Note: The Producer interface also includes several transaction-related interfaces, such as beginTransaction, commitTransaction, etc., which have been discussed in another article and will not be addressed here.
ProducerRecord
A message sent by the Producer has the following properties:

  • topic: Required. Specifies the topic to which the record is sent
  • partition: Optional. Indicates the sequence number of the partition to which the record is sent (zero-indexed). When unspecified, a partition is chosen using either the specified Partitioner or the default BuiltInPartitioner (details provided below)
  • headers: Optional. User-defined additional key-value pair information
  • key: Optional. The key value of the message
  • value: Optional. The content of the message
  • timestamp: Optional. The timestamp of the message, determined by the following logic:
    • If the topic's message.timestamp.type configuration is "CreateTime":
      • If the user provides a timestamp, that value is used
      • Otherwise, the timestamp defaults to the message's creation time, roughly when the send method is invoked
    • If the topic's message.timestamp.type is set to "LogAppendTime", the broker's write time is used for the message, irrespective of any user-specified timestamp
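To illustrate these properties, here is a hypothetical record that sets the optional fields explicitly; the topic name, header key, and values are placeholders.

// Constructor: ProducerRecord(topic, partition, timestamp, key, value)
ProducerRecord<String, String> record = new ProducerRecord<>(
        "my-topic",                  // topic (required)
        0,                           // partition (optional)
        System.currentTimeMillis(),  // timestamp (optional)
        "my-key",                    // key (optional)
        "my-value");                 // value (optional)
// headers: optional user-defined key-value pairs
record.headers().add("trace-id", "abc-123".getBytes(java.nio.charset.StandardCharsets.UTF_8));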
Callback
The Callback is invoked after message acknowledgment; potential exceptions passed to it include:
  • Non-retriable
    • InvalidTopicException: Topic name is invalid, e.g., too long, empty, or includes prohibited characters.
    • OffsetMetadataTooLarge: Metadata string used in Producer#sendOffsetsToTransaction is excessively long (controlled by offset.metadata.max.bytes, default 4 KiB).
    • RecordBatchTooLargeException: The size of the sent batch exceeds a limit:
      • Exceeded the maximum size allowed (broker configuration message.max.bytes or topic configuration max.message.bytes, default 1 MiB + 12 B)
      • Exceeded the segment size (broker configuration log.segment.bytes or topic configuration segment.bytes, default 1 GiB)
      Note: this error may only occur in older versions of the client
    • RecordTooLargeException: The size of a single message exceeds a limit:
      • Exceeded the maximum size of a single producer request (producer configuration max.request.size, default 1 MiB)
      • Exceeded the producer buffer size (producer configuration buffer.memory, default 32 MiB)
      • Exceeded the maximum size allowed (broker configuration message.max.bytes or topic configuration max.message.bytes, default 1 MiB + 12 B)
    • TopicAuthorizationException, ClusterAuthorizationException: Authorization failed
    • UnknownProducerIdException: In transaction requests, the PID has expired or the records associated with the PID have expired
    • InvalidProducerEpochException: In transaction requests, the epoch is illegal
    • UnknownServerException: An unspecified error occurred.
  • Retriable
    • CorruptRecordException: CRC check failed, typically because of a network error.
    • InvalidMetadataException: The client-side metadata is outdated.
    • UnknownTopicOrPartitionException: The topic or partition does not exist, potentially due to expired metadata
    • NotLeaderOrFollowerException: The requested broker is not the leader, possibly due to ongoing leader election
    • FencedLeaderEpochException: The leader epoch in the request is outdated, potentially caused by slow metadata refresh
    • NotEnoughReplicasException, NotEnoughReplicasAfterAppendException: Insufficient number of insync replicas (configured under broker's min.insync.replicas or similar named topic configuration, default is 1). Note that NotEnoughReplicasAfterAppendException occurs after a record is written, and retries by the producer may lead to duplicate data
    • TimeoutException: Processing timed out, with two possible causes:
      • Synchronous: for example, the producer buffer is full or fetching metadata times out
      • Asynchronous: for example, throttling prevents the producer from sending messages, or a broker fails to respond in time

Producer#send
Initiates an asynchronous send and, if provided, invokes the Callback upon message acknowledgment. Callbacks for requests sent to the same partition are guaranteed to execute in the order the sends were initiated.
Producer#flush
Marks all messages in the producer's buffer as ready to send immediately and blocks the current thread until all previously sent messages are acknowledged. Note: only the calling thread is blocked; other threads may continue sending messages, although the completion time of messages sent after the flush is not guaranteed.
Producer#close
Shuts down the producer and blocks until all messages are sent. Note:
  • Invoking close in the Callback will immediately shut down the producer
  • Any send call still in its synchronous phase (pulling metadata, waiting for memory allocation) will be terminated immediately and throw a KafkaException
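As a rough sketch of how the exception classification above can be used, a callback might distinguish retriable errors (subclasses of org.apache.kafka.common.errors.RetriableException) from fatal ones; the logging here is illustrative only.

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("Acknowledged at offset " + metadata.offset());
    } else if (exception instanceof RetriableException) {
        // Retriable: the producer has already exhausted its internal retries
        // (bounded by retries and delivery.timeout.ms); the application may resend
        System.err.println("Retriable failure: " + exception);
    } else {
        // Non-retriable: e.g. InvalidTopicException, RecordTooLargeException, or
        // an authorization failure; resending the same record will not help
        System.err.println("Fatal failure: " + exception);
    }
});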
Core Components
Next, we discuss the specific implementation of the Kafka Producer, which consists of the following core components:
  • ProducerMetadata & Metadata: Responsible for caching and refreshing the metadata needed on the producer side, including the Kafka cluster's broker addresses, the distribution of partitions across topics, and leader/follower information.
  • RecordAccumulator: Manages the producer's buffer, grouping messages into RecordBatches by partition, time (linger.ms), and size (batch.size) and holding them for dispatch.
  • Sender: Runs a daemon thread named "kafka-producer-network-thread | {client.id}" that sends Produce requests, processes Produce responses, and handles timeouts, errors, and retries.
  • TransactionManager: Implements idempotence and transactions, including assigning sequence numbers, handling message loss and reordering, and managing transaction state.

Sending Process
The message sending process is depicted in the diagram below:

[Figure: overview of the Kafka Producer message sending process]

The process is divided into the following steps:

  1. Refreshing Metadata
  2. Serialize the message using the specified Serializer
  3. Select the target partition for sending the message using either a user-specified Partitioner or the BuiltInPartitioner
  4. Insert the message into the RecordAccumulator for batching
  5. Sender asynchronously retrieves the sendable batch from the RecordAccumulator (grouped by node), registers a callback, and sends
  6. Sender handles the response and, depending on the result, returns results, raises exceptions, or retries

The following sections detail each of these steps.
Refreshing Metadata
ProducerMetadata is responsible for caching and updating the metadata needed on the producer side, maintaining a complete view of all topics the producer uses. It will:
  1. Add topics in the following scenarios:
    • When sending a message, the specified topic is not present in the cached metadata
  2. Remove topics in the following scenarios:
    • When no messages have been sent to a topic for a continuous period defined by metadata.max.idle.ms
  3. Refresh metadata in the following scenarios:
    • When sending a message, the specified partition is not in the cached metadata (this occurs when the number of partitions in a topic increases)
    • When sending a message, the leader of the specified partition is unknown
    • After sending a message, an InvalidMetadataException response is received
    • When metadata has not been refreshed for a continuous period of metadata.max.age.ms

Associated configurations include:
  • metadata.max.idle.ms: The cache timeout for topic metadata; if no messages are sent to a topic within this duration, that topic's metadata expires. The default is 5 minutes.
  • metadata.max.age.ms: The forced metadata refresh interval; if metadata has not been refreshed within this duration, an update is forcibly initiated. The default is 5 minutes.
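For illustration, both settings can be adjusted when constructing the producer. This snippet reuses the kafkaProps from the Usage example above; the values are arbitrary examples rather than recommendations.

// Expire metadata of topics that have been idle for 10 minutes (default 5 minutes)
kafkaProps.put("metadata.max.idle.ms", "600000");
// Force a metadata refresh at least every 3 minutes (default 5 minutes)
kafkaProps.put("metadata.max.age.ms", "180000");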
Partition Selection
KIP-794[3] addressed the problem that the Sticky Partitioner in earlier versions distributed messages unevenly across brokers by introducing a new Uniform Sticky Partitioner (now the default built-in partitioner), which, in the absence of a key, directs more messages to faster brokers. When selecting a partition, there are two scenarios:
  1. If the user specifies a Partitioner, that Partitioner is used to select the partition
  2. Otherwise, the default BuiltInPartitioner is used:
    • If a record key is set, the partition is determined by the hash of the key
      • Records with the same key are consistently assigned to the same partition
      • However, this consistency is not maintained when the number of partitions in the topic changes, as the same key may then map to a different partition
    • If no key is specified, or partitioner.ignore.keys is set to "true", Kafka sends more messages to faster brokers

Associated configurations include:
  • partitioner.class: The class name of the partitioner; users can provide their own implementation to meet specific requirements (a sketch of a custom partitioner follows this list). Built-in options:
    • DefaultPartitioner and UniformStickyPartitioner: These "sticky" partitioners allocate messages to one partition at a time, moving to the next partition once a batch fills. Their implementation tends to overload slower brokers, which is why they have been deprecated.
    • RoundRobinPartitioner: Disregards the record key and distributes messages across all partitions in a cyclic manner. Note that it can lead to uneven message distribution when new batches are started. It is advisable to either use the built-in partitioner or develop a custom one to suit your needs.
  • partitioner.adaptive.partitioning.enable: Whether to weight the number of messages sent to each broker by broker speed; if disabled, partitions are chosen at random. Applies only when partitioner.class is not set. The default is "true".
  • partitioner.availability.timeout.ms: Effective only when partitioner.adaptive.partitioning.enable is "true". If the time between accumulating a batch of messages for a broker and sending them exceeds this value, the producer stops allocating messages to that broker; 0 disables the feature. Applies only when partitioner.class is not set. The default is 0.
  • partitioner.ignore.keys: When selecting a partition, if set to "false" the partition is determined by the hash of the key; otherwise the key is ignored. Applies only when partitioner.class is not set. The default is "false".
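To make partitioner.class concrete, below is a minimal sketch of a custom Partitioner; the class name and the hash-the-key strategy are placeholders for whatever logic an application actually needs.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical partitioner: hash the serialized key across all partitions of the
// topic, and fall back to partition 0 for records without a key
public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would then be enabled with kafkaProps.put("partitioner.class", KeyHashPartitioner.class.getName());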


Message Batching
In the RecordAccumulator, batches to be sent are organized by partition. Key methods include:

public RecordAppendResult append(String topic,
                                 int partition,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 AppendCallbacks callbacks,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch,
                                 long nowMs,
                                 Cluster cluster) throws InterruptedException;

public ReadyCheckResult ready(Metadata metadata, long nowMs);

public Map<Integer, List<ProducerBatch>> drain(Metadata metadata, Set<Node> nodes, int maxSize, long now);


  • append: Adds a message to the buffer, registers a future, and returns it. This future completes either when the message is successfully sent or if it fails.
  • ready: Identifies nodes that have messages prepared for dispatch. Scenarios include:
    • A batch of messages has reached the batch.size
    • Messages have been batched continuously for longer than linger.ms
    • The memory allocated to the producer has been exhausted; specifically, the total size of the messages in the buffer has exceeded buffer.memory
    • The batch requiring a retry has already been delayed for at least retry.backoff.ms
    • The user called Producer#flush to ensure message delivery.
    • The producer is in the process of shutting down.
  • drain: For each node, iterates over its partitions, pulling the earliest batch from each partition (if present) until the accumulated size reaches max.request.size or all partitions have been checked.

Associated configurations include:
  • linger.ms: The maximum time each batch waits before being sent, 0 by default. Note that setting it to 0 does not disable batching; it only means there is no artificial delay before sending. To disable batching entirely, set batch.size to 0 or 1. Increasing this configuration will:
    • Increase throughput (the per-message sending overhead is reduced and compression becomes more effective)
    • Slightly increase latency
  • batch.size: The maximum size of each batch, 16 KiB by default. Setting it to 0 (effectively the same as 1) disables batching, so each batch contains only one message. When a single message exceeds batch.size, it is sent as a standalone batch. Increasing this configuration will:
    • Boost throughput
    • Use more memory (each new batch allocates a memory block of batch.size)
  • max.in.flight.requests.per.connection: The number of batches a producer may send to each broker without awaiting a response, 5 by default
  • max.request.size: The maximum total size of a single request, which is also the limit on an individual message, 1 MiB by default. Be aware that the broker configuration message.max.bytes and the topic configuration max.message.bytes also bound the maximum message size.
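As a sketch of how these batching settings fit together, the snippet below (reusing the kafkaProps from the Usage example; the values are illustrative, not recommendations) trades a little latency for larger batches.

// Wait up to 10 ms for more records before sending a batch (default 0)
kafkaProps.put("linger.ms", "10");
// Allow batches of up to 64 KiB (default 16 KiB)
kafkaProps.put("batch.size", "65536");
// Up to 5 unacknowledged requests per broker connection (the default)
kafkaProps.put("max.in.flight.requests.per.connection", "5");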

Timeout Handling
The Kafka Producer offers various timeout-related settings to control the allowable duration of each phase of the message delivery process, as detailed below:

[Figure: timeout settings across the producer send path]

These settings include:

  • buffer.memory: The maximum capacity of the producer buffer, 32 MiB by default. When the buffer is full, send blocks for up to max.block.ms before raising an error.
  • max.block.ms: The maximum time the calling thread may be blocked when the send method is invoked, 60 seconds by default. It includes:
    • Time required to retrieve metadata
    • Time spent waiting when the producer buffer is full
    It excludes:
    • Time required to serialize the message
    • Time spent by the Partitioner to determine a partition
  • request.timeout.ms: The maximum duration from sending a request to receiving a response, 30 s by default.
  • delivery.timeout.ms: The total duration allowed for asynchronous message delivery, namely from the moment the send method returns to the invocation of the Callback, 120 s by default. It includes:
    • The time the message spends being batched by the producer
    • The time to send the request to the broker and wait for a response
    • The time taken by each retry
    Its value should be no less than linger.ms + request.timeout.ms.
  • retries: The maximum number of retries, Integer.MAX_VALUE by default.
  • retry.backoff.ms and retry.backoff.max.ms: These settings govern the exponential backoff strategy for retries after a send failure: each retry starts from a wait of retry.backoff.ms, which grows exponentially by a factor of 2 with an additional 20% jitter, capped at retry.backoff.max.ms. The defaults are 100 ms and 1000 ms, respectively.
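For reference, the snippet below sets these timeout and retry knobs explicitly (reusing the kafkaProps from the Usage example); the values simply restate the defaults described above.

kafkaProps.put("buffer.memory", "33554432");      // 32 MiB producer buffer (default)
kafkaProps.put("max.block.ms", "60000");          // block send() for at most 60 s (default)
kafkaProps.put("request.timeout.ms", "30000");    // per-request timeout (default)
kafkaProps.put("delivery.timeout.ms", "120000");  // end-to-end async delivery timeout (default)
kafkaProps.put("retries", String.valueOf(Integer.MAX_VALUE)); // default retry count
kafkaProps.put("retry.backoff.ms", "100");        // initial retry backoff (default)
kafkaProps.put("retry.backoff.max.ms", "1000");   // retry backoff cap (default)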

Summary
Our project, AutoMQ[1], is committed to building the next-generation, cloud-native Apache Kafka® system, designed to tackle the cost and elasticity challenges of traditional Kafka. As devoted supporters and active contributors to the Kafka ecosystem, we continue to deliver high-quality Kafka technical content to enthusiasts. In this article, we covered the usage of the Kafka Producer and the fundamental principles behind its implementation; the next article will delve into further implementation details and address typical issues associated with the Kafka Producer. Stay tuned for more updates.

References
[1] AutoMQ: https://github.com/AutoMQ/automq
[2] Kafka 3.7: https://github.com/apache/kafka/releases/tag/3.7.0
[3] KIP-794: https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
