Kafka Producer: timeout configuration

When we want to send data to Kafka from a JVM language, we’ll (either directly or indirectly) use the send(record: ProducerRecord[K, V], callback: Callback): java.util.concurrent.Future[RecordMetadata] method. As its documentation states:

The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent […]

Just to restate it: the fact that send() returned doesn’t mean that the record is already in the topic, just that it has been buffered. In order to know whether the record was sent to the topic, we have two options:

  • block by invoking get() on the returned Java Future, catching exceptions that might result from its execution
  • provide a Callback function that will be given the execution result (or the exception that has been raised, if any) as soon as the execution has completed
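
The two options can be sketched as follows. This is only a minimal sketch, assuming a producer with String keys and values; the object SendOptions and the helpers sendBlocking and sendWithCallback are hypothetical names introduced for illustration and are not part of the Kafka API.

```scala
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}
import scala.util.control.NonFatal

object SendOptions {
  // Option 1: block the calling thread until the send succeeds or fails.
  // Failures raised while waiting on get() arrive wrapped in an ExecutionException.
  def sendBlocking(
      producer: Producer[String, String],
      topic: String,
      value: String
  ): Either[Throwable, RecordMetadata] =
    try Right(producer.send(new ProducerRecord[String, String](topic, value)).get())
    catch {
      case e: ExecutionException => Left(e.getCause)
      case NonFatal(e)           => Left(e)
    }

  // Option 2: return right after send() and let the producer report the result later.
  def sendWithCallback(
      producer: Producer[String, String],
      topic: String,
      value: String
  )(onResult: Either[Throwable, RecordMetadata] => Unit): Unit = {
    producer.send(
      new ProducerRecord[String, String](topic, value),
      new Callback {
        // Exactly one of metadata/exception carries the outcome of the send
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          if (exception == null) onResult(Right(metadata))
          else onResult(Left(exception))
      }
    )
    ()
  }
}
```

With a real KafkaProducer the callback runs on the producer’s I/O thread, so whatever onResult does should be quick; the blocking variant is simpler to reason about but ties up the calling thread for each record.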

So, how long can we expect to wait before get() returns or the callback is invoked?

This is controlled by the delivery.timeout.ms configuration, which defines an upper bound on the overall time, counted from the moment send() returns, which includes:

  • the time the record spends waiting in a batch before being sent
  • the time to send the record (with any number of retries allowed by the configuration), and
  • the time to get an acknowledgment from the broker

Once the time has elapsed (or as soon as the operation completes, whichever happens first), the callback will be invoked with metadata about the record (if successfully sent) or an exception (if sending failed).

Given that it is an overall time encompassing, among others, the time to send a record, its value should be greater than or equal to the sum of request.timeout.ms (which controls the timeout for a single request) and linger.ms. Setting delivery.timeout.ms to a multiple of request.timeout.ms leaves the producer room to retry a failed request before giving up (unless this behaviour is explicitly disabled by setting retries to 0), which can be handy in case of temporary network glitches or high load on the broker side.
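
To make the relationship between these settings concrete, here is a small sketch that derives delivery.timeout.ms from the other two values. The numbers are illustrative assumptions rather than recommendations, and ProducerTimeouts, attempts and timeoutProps are hypothetical names; only the configuration keys themselves come from Kafka.

```scala
import java.util.Properties

object ProducerTimeouts {
  // Illustrative values only (assumptions, not tuning advice)
  val lingerMs: Int = 5             // how long a record may wait to be batched
  val requestTimeoutMs: Int = 30000 // timeout for a single request to the broker
  val attempts: Int = 4             // hypothetical: request timeouts we want to leave room for

  // Batching delay plus room for `attempts` full request timeouts
  val deliveryTimeoutMs: Int = lingerMs + attempts * requestTimeoutMs

  // Builds only the timeout-related part of a producer configuration
  def timeoutProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("linger.ms", lingerMs.toString)
    props.setProperty("request.timeout.ms", requestTimeoutMs.toString)
    props.setProperty("delivery.timeout.ms", deliveryTimeoutMs.toString)
    props
  }
}
```

The resulting Properties would still need serializers (and anything else your setup requires) before being passed to a KafkaProducer. Note that the number of retries that actually fit in the window is only approximate, since the back-off between retries also counts against delivery.timeout.ms.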

There are many more Producer configuration options that can be tuned (at your own risk) :)

Side note: Obtaining a Scala Future

When using the producer from a Scala codebase, it’s usually handy to get back a Scala Future instead of its Java counterpart; this can be achieved, for example, as follows:

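import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

class MyProducer[K, V](producer: KafkaProducer[K, V]) {
  def send(topicName: String, record: V): Future[RecordMetadata] = {
    // Completed by the callback once the producer reports the outcome
    val promise = Promise[RecordMetadata]()

    try {
      producer.send(
        new ProducerRecord[K, V](topicName, record),
        new Callback {
          // On success exception is null; on failure it describes what went wrong
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
            if (exception == null) promise.trySuccess(metadata)
            else promise.tryFailure(exception)
          }
        }
      )
    } catch {
      // send() can also throw synchronously (for example, on a serialization error)
      case NonFatal(exception) =>
        promise.tryFailure(exception)
    }

    promise.future
  }
}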