Optimizing AWS Lambda for Sending Messages to Amazon MSK

When using AWS Lambda as a producer for Amazon MSK, it’s essential to be aware of the inherent limitations and best practices. This blog focuses on key considerations for optimizing AWS Lambda when producing messages to Amazon MSK.

The Kafka protocol, along with its client, is designed for long-running applications. In particular, the producer is built to establish a connection to each broker and maintain it until the application stops. Continuously opening and closing connections adds CPU overhead to the cluster. Given that Lambda functions are ephemeral—typically running for a maximum of 15 minutes—each invocation could potentially create a new connection, depending on how the function is implemented.

Additionally, with IAM authentication, MSK limits new connections to 100 per second per broker. An increased number of Lambda invocations can lead to excessive connections, resulting in higher CPU usage, increased latency, and reduced throughput.
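For reference, IAM authentication is enabled on the Kafka Java client through the aws-msk-iam-auth library, with producer properties along these lines (a minimal sketch; the library must be on the classpath):

// IAM authentication settings for the Kafka Java client (aws-msk-iam-auth library)
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "AWS_MSK_IAM");
props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
props.put("sasl.client.callback.handler.class",
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");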

Best Practices for Using Lambda with Amazon MSK
AVOID INITIALIZING KAFKA CLIENTS INSIDE THE HANDLER METHOD

AWS Lambda invokes your function within an execution environment that provides a secure and isolated runtime. This environment retains resources for a period of time so that subsequent invocations can reuse them, but it can be terminated every few hours for maintenance and updates.

If Kafka clients are initialized within the handler method, each invocation will establish a new connection to the MSK cluster. However, if the connections are declared outside the handler method, they remain associated with the execution environment, allowing for reuse in subsequent invocations. This approach not only optimizes connection management but also enhances performance by minimizing the overhead of connection establishment.

Poor Example:

Kafka producer initialized inside the handleRequest method.

// Anti-pattern: the producer is created inside the handler, so every
// invocation opens new connections to the MSK cluster.
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
    Properties props = new Properties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP_SERVERS");
    props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks must be a String, not an int
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    String value = "Dummy_Payload";
    ProducerRecord<String, String> record = new ProducerRecord<>("topic", value);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // Record was acknowledged by the cluster
                System.out.println("Message sent successfully to topic");
            } else {
                // Send failed: report the error
                System.out.println("Failed to send message");
            }
        }
    });
}

Right way to do this:

Kafka producer initialized outside the handleRequest method, as a field created once in the class constructor.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class AsyncLambdaHandler implements RequestStreamHandler {

    // Created once per execution environment and reused across invocations
    private final KafkaProducer<String, String> producer;

    public AsyncLambdaHandler() {
        Properties props = new Properties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP_SERVERS");
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks must be a String, not an int
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
        String value = "Dummy_Payload";
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    // Record was acknowledged by the cluster
                    System.out.println("Message sent successfully to topic");
                } else {
                    // Send failed: report the error
                    System.out.println("Failed to send message");
                }
            }
        });
    }
}

Kafka is built for long-running connections, a requirement that Lambda, by its ephemeral nature, cannot fully satisfy. An excessive number of connections and high connection rates can lead to an overloaded cluster. However, using Lambda as a consumer of the Amazon MSK cluster is perfectly acceptable and can be a great choice for specific workloads.

MONITOR NEW CLIENT CONNECTIONS CREATED PER APPLICATION

Customers should implement client monitoring alongside the existing server-side metrics to capture client connection metrics, including the following (a sketch for reading them from the Java client follows the list):

  • Connection creation rate
  • Connection close rate
  • Connection count
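These metrics are exposed by the Kafka Java client itself. As a minimal sketch, a hypothetical helper (assuming a `producer` field as in the handler examples above and an SLF4J `logger`) could poll the client's metrics registry:

// Requires: import java.util.Map;
//           import org.apache.kafka.common.Metric;
//           import org.apache.kafka.common.MetricName;
private void logConnectionMetrics() {
    for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
        MetricName name = entry.getKey();
        // Connection-level gauges and rates live in the "producer-metrics" group
        if ("producer-metrics".equals(name.group())
                && ("connection-creation-rate".equals(name.name())
                    || "connection-close-rate".equals(name.name())
                    || "connection-count".equals(name.name()))) {
            logger.info(name.name() + " = " + entry.getValue().metricValue());
        }
    }
}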

The Kafka producer includes a buffer pool that stores records waiting to be sent to the server, along with a background I/O thread that converts these records into requests and sends them to the cluster. The `send()` method operates asynchronously and returns a Future. When invoked, it adds the record to a buffer of pending sends and returns immediately, enabling the producer to batch individual records for greater efficiency.
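For illustration, the returned Future can also be used to block until the broker responds when a synchronous, per-record send is required (a sketch, assuming the `producer` and `record` from the examples above; blocking each send defeats batching, so use it sparingly):

// Requires: import java.util.concurrent.Future;
// send() enqueues the record and returns immediately
Future<RecordMetadata> future = producer.send(record);
// get() blocks until the record is acknowledged or the request fails;
// it throws checked exceptions that must be handled
RecordMetadata metadata = future.get();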

However, if you use the send function without waiting for an acknowledgment (as shown below), you won’t be able to manage any errors that arise, resulting in no delivery guarantee. This can be viewed as a “fire and forget” approach. It’s important to note that the acks configuration in Kafka will still be respected, but you won’t be able to handle timeouts, network issues, or other errors.

Poor Example:

// send data - asynchronous, fire and forget: errors are never surfaced
producer.send(producerRecord);

The right way to do this is to add a callback to the `send()` method to handle the response from the server (the example below assumes an SLF4J logger):

// Imports as in the previous example, plus:
// import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;

public class AsyncLambdaHandler implements RequestStreamHandler {

    private static final Logger logger = LoggerFactory.getLogger(AsyncLambdaHandler.class);

    // Created once per execution environment and reused across invocations
    private final KafkaProducer<String, String> producer;

    public AsyncLambdaHandler() {
        Properties props = new Properties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP_SERVERS");
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // acks must be a String, not an int
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
        String value = "Dummy_Payload";
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    logger.info("Message sent successfully to topic at partition "
                            + metadata.partition() + " with offset " + metadata.offset());
                } else {
                    logger.error("Failed to send message: " + exception.toString());
                }
            }
        });
    }
}

Also keep in mind that Lambda freezes the execution environment as soon as the handler returns, and enforces a maximum runtime of 15 minutes. If the handler returns before all callbacks have completed, the producer's background I/O thread and pending callbacks can resume where they left off on the next invocation, but only if the same execution environment is reused between invocations.
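If you would rather not depend on environment reuse, one option (a sketch, not part of the examples above) is to flush before the handler returns, so that all sends and callbacks complete within the same invocation:

// Blocks until every buffered record is sent and its callback has run,
// so nothing is left pending when Lambda freezes the environment
producer.flush();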

KAFKA CLIENT CONSIDERATIONS

Increase `linger.ms` to at least 5-10 ms and double the default `batch.size`. This helps the producer send larger batches to the MSK cluster, reducing the number of produce requests it has to make.
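As a concrete starting point (illustrative values, based on the client defaults):

// Wait up to 10 ms for more records before sending a batch (default is 0 ms)
props.put(ProducerConfig.LINGER_MS_CONFIG, "10");
// Double the default batch size of 16384 bytes
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");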

USE LAMBDA FOR LOW-VOLUME, SPIKY WORKLOADS

Using Lambda as a producer is ideal when the volume of incoming events is low and follows a spiky pattern. This setup helps ensure that:

1)  A limited number of Lambda functions are invoked, and

2)  Invoked functions complete within their lifecycle.

Beyond that, it is generally advisable to use containers as Kafka producer clients for improved stability and overall performance.

COST CHALLENGES

For high-throughput scenarios, Lambda may be more expensive than a containerized solution, since Lambda pricing is based on the number of function invocations and their duration.

ALTERNATIVES TO LAMBDA FOR PRODUCING MESSAGES

You can use Amazon ECS with AWS Fargate or Amazon EKS to run your producer as a long-running containerized application.