Producing events and handling credential refresh for an IAM-enabled AWS MSK cluster using the AWS MSK IAM auth library

Amazon Managed Streaming for Apache Kafka (MSK) is a fully managed service that makes it easy to build and run applications that use Apache Kafka. One of the key features of MSK is its support for IAM authentication, which allows you to secure access to your Kafka clusters and control who can access them. In this article, we will explore how to produce events to IAM-authenticated MSK clusters using assumeRoleWithWebIdentity, and how to handle refreshing the temporary credentials: the credentials returned by the assume-role call have a limited lifespan and need to be renewed periodically to maintain access to the service.

This article targets Java applications deployed in a Kubernetes environment. It uses Kubernetes service account token projection to generate a JWT that can be exchanged, via the AWS STS assume-role-with-web-identity operation, for credentials to authenticate against an IAM-enabled MSK cluster.

The kubelet requests and stores the token on behalf of the Pod, makes it available to the Pod at a configurable file path, and refreshes it as it approaches expiration. The kubelet proactively requests rotation of the token if it is older than 80% of its total time-to-live, or if it is older than 24 hours.
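For reference, below is a minimal sketch of a Pod spec that mounts a projected service account token. The names, image, and expiry are illustrative placeholders; the audience should match what your OIDC identity provider expects (for STS this is typically sts.amazonaws.com).

apiVersion: v1
kind: Pod
metadata:
  name: kafka-producer
spec:
  serviceAccountName: kafka-producer-sa    # placeholder service account
  containers:
    - name: app
      image: kafka-producer:latest         # placeholder image
      volumeMounts:
        - name: sts-token
          mountPath: /var/run/secrets/tokens
  volumes:
    - name: sts-token
      projected:
        sources:
          - serviceAccountToken:
              path: sts-token              # app reads /var/run/secrets/tokens/sts-token
              audience: sts.amazonaws.com
              expirationSeconds: 3600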

The application is responsible for reloading the token when it rotates. It is often good enough for the application to load the token on a schedule, without tracking the actual expiry time. In the solution discussed in this article we have a WatchService implementation that monitors the token file for modifications, reloads the token, and refreshes the temporary credentials as well.

For more information on how to enable service account token projection in Kubernetes, refer to the link below:

https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection

The solution discussed in this article uses the JWT created by Kubernetes token projection to call assumeRoleWithWebIdentity. It also shows how to configure a credential refresh: the credentials returned by assumeRoleWithWebIdentity are temporary and valid for one hour by default, and their lifetime can be configured via the durationSeconds parameter, from a minimum of 15 minutes up to a maximum of 12 hours.
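For the assume-role call to succeed, the target IAM role's trust policy must allow sts:AssumeRoleWithWebIdentity from your cluster's OIDC identity provider. Below is an illustrative sketch; the provider ARN, account ID, namespace, and service account name are placeholders specific to your environment.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::111122223333:oidc-provider/oidc.eks.eu-west-2.amazonaws.com/id/EXAMPLE"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "oidc.eks.eu-west-2.amazonaws.com/id/EXAMPLE:sub": "system:serviceaccount:my-namespace:kafka-producer-sa"
        }
      }
    }
  ]
}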

The solution also discusses a custom implementation of IAMClientCallbackHandler and a custom credential provider chain to override the chaining order. We are using the Amazon MSK Library for AWS Identity and Access Management, which enables us to use IAM to connect to Amazon MSK clusters. The library is available on GitHub:

https://github.com/aws/aws-msk-iam-auth
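The examples assume the following libraries are on the classpath. Below is a sketch for a Gradle build; the coordinates are the published ones for each library, but the versions are illustrative, so check for the latest releases.

dependencies {
    implementation("org.apache.kafka:kafka-clients:3.4.0")
    implementation("software.amazon.msk:aws-msk-iam-auth:1.1.6")
    implementation("software.amazon.glue:schema-registry-serde:1.1.14")
    implementation("software.amazon.awssdk:sts:2.20.26")
    implementation("software.amazon.awssdk:url-connection-client:2.20.26")
}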

Let’s begin with the following code, which sets up the required Kafka producer properties. Here we pass the CustomIAMCallbackHandler because we want to override the order of the credential chain. We also use Avro as the data format, integrated with the AWS Glue Schema Registry, a service that allows you to store, manage, and share schemas across multiple data systems and data processing workflows. It enables you to store and retrieve versions of your schema, and provides a way to register and discover schemas across different applications, data systems, and teams. With the AWS Glue Schema Registry, you can also validate incoming data against registered schemas to ensure data quality and consistency.

package com.timepass.techies;

import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;

public class ProducerProperties {

  public static final Properties kafkaParams = new Properties();

  public static Properties getKafkaProducerProperties() {
      // Placeholder: replace with your MSK bootstrap brokers (IAM listeners use port 9098).
      kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-msk-bootstrap-brokers:9098");
      kafkaParams.put(
              ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
      kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
      // acks is a string-typed config; passing the integer 1 fails config validation.
      kafkaParams.put(ProducerConfig.ACKS_CONFIG, "1");
      kafkaParams.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
      kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
      kafkaParams.put(
              SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required;");
      kafkaParams.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
      kafkaParams.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "glueSchemaRegistryName");
      kafkaParams.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "schemaName");
      kafkaParams.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-2");
      kafkaParams.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
      kafkaParams.put(
              SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
              "com.timepass.techies.CustomIAMCallbackHandler");
      return kafkaParams;
  }
}

Let’s code the KafkaProducerServer, which initialises a singleton instance of org.apache.kafka.clients.producer.KafkaProducer with all the required producer configs to produce Avro messages to a Kafka broker that has IAM enabled to control access to the topic.

package com.timepass.techies;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class KafkaProducerServer {
  private KafkaProducerServer() {}

  // The value type is GenericRecord so the producer can send the generated Avro
  // record below (SpecificRecordBase implements GenericRecord).
  public static final Producer<String, GenericRecord> producer =
      new KafkaProducer<>(ProducerProperties.getKafkaProducerProperties());

  public static Producer<String, GenericRecord> getInstance() {
    return producer;
  }
}
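KafkaProducer is thread safe, and sharing a single instance across threads is generally faster than having multiple instances, which is why the producer is held in a static field and handed out through getInstance().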

Let us implement a TokenService class that uses the AWS STS client, which enables you to request temporary, limited-privilege credentials for AWS Identity and Access Management (IAM) users or for users that you authenticate (federated users). With the AWS SDK for Java v2, STS is exposed through the StsClient class, and you assume a role using web identity federation via its assumeRoleWithWebIdentity method.

package com.timepass.techies;

import com.amazonaws.SDKGlobalConfiguration;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
import software.amazon.awssdk.services.sts.model.Credentials;
import software.amazon.awssdk.utils.IoUtils;
import java.io.FileInputStream;
import java.io.IOException;

public class TokenService {

    public void assumeWebIdentityRole(String roleToAssume, String tokenFilePath) {

        // Read the projected service account token from the mounted file.
        try (FileInputStream fis = new FileInputStream(tokenFilePath)) {
            String tokenContent = IoUtils.toUtf8String(fis);
            AssumeRoleWithWebIdentityRequest assumeRoleWithWebIdentityRequest =
                    AssumeRoleWithWebIdentityRequest.builder()
                            .webIdentityToken(tokenContent)
                            .roleArn(roleToAssume)
                            .roleSessionName("sessionName")
                            .durationSeconds(3600)
                            .build();

            // AssumeRoleWithWebIdentity is an unsigned call, so the STS client
            // does not need real credentials of its own.
            StsClient stsClient =
                    StsClient.builder()
                            .credentialsProvider(AnonymousCredentialsProvider.create())
                            .region(Region.EU_WEST_2)
                            .httpClient(UrlConnectionHttpClient.create())
                            .build();

            AssumeRoleWithWebIdentityResponse assumeRoleWithWebIdentityResponse =
                    stsClient.assumeRoleWithWebIdentity(assumeRoleWithWebIdentityRequest);
            Credentials credentials = assumeRoleWithWebIdentityResponse.credentials();
            // Publish the temporary credentials as JVM system properties. The v1
            // and v2 SDKs use different property names for the secret key
            // (aws.secretKey vs aws.secretAccessKey), so both are set.
            System.setProperty(
                    SDKGlobalConfiguration.ACCESS_KEY_SYSTEM_PROPERTY, credentials.accessKeyId());
            System.setProperty(
                    SDKGlobalConfiguration.SECRET_KEY_SYSTEM_PROPERTY, credentials.secretAccessKey());
            System.setProperty(
                    SdkSystemSetting.AWS_SECRET_ACCESS_KEY.property(), credentials.secretAccessKey());
            System.setProperty(
                    SDKGlobalConfiguration.SESSION_TOKEN_SYSTEM_PROPERTY, credentials.sessionToken());
        } catch (IOException exception) {
            throw new AssumeRoleException(exception.getMessage());
        }
    }
}

Below is the custom exception class used in the code example above:

package com.timepass.techies;

public final class AssumeRoleException extends RuntimeException {
  private static final String MESSAGE = "Exception when assuming the role. ";

  public AssumeRoleException(String message) {
    super(MESSAGE.concat(message));
  }
}

Let us implement a file monitor that watches the token file for changes. We will use Java’s WatchService, a utility in the java.nio.file package. The WatchService monitors a directory for changes and can be used to watch for the creation, deletion, or modification of files or directories within it. The WatchService API uses a “watch key” to represent a registration of interest in an event or set of events for a particular directory. Once the service is started, it listens for events, which you can retrieve using the poll() or take() methods. This is a useful tool for implementing file-based event-driven systems.

The class below also invokes tokenService.assumeWebIdentityRole once a new token is available, which in turn refreshes the temporary credentials used to authenticate and authorise against the IAM-enabled Kafka cluster.

package com.timepass.techies;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.Executors;

public class TokenMonitor {
  private final String tokenPath;
  private final String role;

  public TokenMonitor(String tokenPath, String role) {
    this.tokenPath = tokenPath;
    this.role = role;
  }

  public void start() {

    try {
      TokenService tokenService = new TokenService();
      // Assume the role once up front so credentials are in place before the
      // first rotation event arrives.
      tokenService.assumeWebIdentityRole(role, tokenPath);
      WatchService watchService = FileSystems.getDefault().newWatchService();
      // Watch the directory containing the token file; WatchService registers
      // directories, not individual files.
      Path path = new File(tokenPath).getParentFile().toPath();
      path.register(
          watchService,
          StandardWatchEventKinds.ENTRY_MODIFY,
          StandardWatchEventKinds.ENTRY_CREATE,
          StandardWatchEventKinds.ENTRY_DELETE);

      Executors.newSingleThreadExecutor()
          .submit(
              () -> {
                Thread.currentThread().setName("watch-service-thread");
                while (true) {
                  WatchKey watchKey = null;
                  try {
                    watchKey = watchService.poll();
                    if (watchKey != null && !watchKey.pollEvents().isEmpty()) {
                      tokenService.assumeWebIdentityRole(role, tokenPath);
                    }
                    Thread.sleep(30000);
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                  } catch (Exception e) {
                    // Keep watching: a failed refresh will be retried on the
                    // next poll while the current credentials remain valid.
                  } finally {
                    if (watchKey != null) {
                      watchKey.reset();
                    }
                  }
                }
              });

    } catch (IOException e) {
      throw new TokenMonitorException(e.getMessage());
    }
  }
}

Below is the TokenMonitorException class used in the code example above:

package com.timepass.techies;

public final class TokenMonitorException extends RuntimeException {
  private static final String MESSAGE = "Exception in token monitor service. ";

  public TokenMonitorException(String message) {
    super(MESSAGE.concat(message));
  }
}

The IAMClientCallbackHandler in the MSK IAM auth library is used to extract AWS credentials. The credentials are based on the JaasConfig options passed to IAMLoginModule, which uses the DefaultAWSCredentialsProviderChain. That chain looks for AWS credentials in the following order:

  1.  Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
  2.  Java system properties: aws.accessKeyId and aws.secretKey.
  3.  Web identity token credentials from the environment or container.
  4.  The default credential profiles file, typically located at ~/.aws/credentials (the location can vary per platform) and shared by many of the AWS SDKs and the AWS CLI. You can create a credentials file by using the aws configure command provided by the AWS CLI, or by editing the file with a text editor. For information about the credentials file format, see AWS Credentials File Format.
  5.  Credentials from a profile other than the default one, selected by setting the environment variable AWS_PROFILE to the name of the alternate profile. Profiles can be used to load credentials from other sources such as AWS IAM roles. See AWS Credentials File Format for more details.
  6.  Amazon ECS container credentials, loaded from Amazon ECS if the environment variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is set.
  7.  Instance profile credentials, used on EC2 instances and delivered through the Amazon EC2 metadata service.

If the above chaining order works for your case, you can use the IAMClientCallbackHandler directly in the producer configuration, as shown below:

kafkaParams.put(
    SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
    "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

In many cases, using the environment variables can lead to conflicts if another application has already set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. To overcome this, we will implement a custom IAMClientCallbackHandler that uses a CustomCredentialProvider to override the above order, giving first priority to the Java system properties aws.accessKeyId and aws.secretKey, which are specific to the JVM in which your app is running.

Below is the CustomCredentialProvider

package com.timepass.techies;

import com.amazonaws.auth.*;
import java.util.Map;

public class CustomCredentialProvider
    extends software.amazon.msk.auth.iam.internals.MSKCredentialProvider {

  public CustomCredentialProvider(Map<String, ?> options) {
    super(options);
  }

  // Reorder the chain so JVM system properties win over environment variables.
  @Override
  protected AWSCredentialsProviderChain getDefaultProvider() {
    return new AWSCredentialsProviderChain(
        new SystemPropertiesCredentialsProvider(),
        new EnvironmentVariableCredentialsProvider(),
        WebIdentityTokenCredentialsProvider.create(),
        new software.amazon.msk.auth.iam.internals.EnhancedProfileCredentialsProvider(),
        new EC2ContainerCredentialsProviderWrapper());
  }
}

Below is the CustomIAMCallbackHandler implementation

package com.timepass.techies;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.security.auth.login.AppConfigurationEntry;
import software.amazon.msk.auth.iam.internals.AWSCredentialsCallback;

public class CustomIAMCallbackHandler
    extends software.amazon.msk.auth.iam.IAMClientCallbackHandler {
  private AWSCredentialsProvider provider;

  @Override
  public void configure(
      Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
    if (!software.amazon.msk.auth.iam.IAMLoginModule.MECHANISM.equals(saslMechanism)) {
      throw new IllegalArgumentException("Unexpected SASL mechanism: " + saslMechanism);
    }
    final Optional<AppConfigurationEntry> configEntry =
        jaasConfigEntries.stream()
            .filter(
                j ->
                    software.amazon.msk.auth.iam.IAMLoginModule.class
                        .getCanonicalName()
                        .equals(j.getLoginModuleName()))
            .findFirst();
    provider =
        configEntry
            .map(c -> (AWSCredentialsProvider) new CustomCredentialProvider(c.getOptions()))
            .orElse(DefaultAWSCredentialsProviderChain.getInstance());
  }

  @Override
  protected void handleCallback(AWSCredentialsCallback callback) {
    try {
      provider.refresh();
      callback.setAwsCredentials(provider.getCredentials());
    } catch (Exception e) {
      callback.setLoadingException(e);
    }
  }
}

Let’s code the main class to test the example:

package com.timepass.techies;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaEventProducer {

  public static void main(String[] args) {
    // Start the token monitor first so the initial assume-role call has
    // populated the credential system properties before the first send.
    new TokenMonitor("token_path_here", "role_to_assume_web_identity_here").start();
    Producer<String, GenericRecord> producer = KafkaProducerServer.getInstance();
    AvroGenericRecordExample event =
            AvroGenericRecordExample.newBuilder()
                    .setHost("hostname")
                    .setLanguage("java")
                    .setVersion("version")
                    .build();
    while (true) {
      producer.send(
          new ProducerRecord<String, GenericRecord>("movie", event),
          (metadata, exception) -> {
            // Surface async send failures instead of silently dropping them.
            if (exception != null) {
              exception.printStackTrace();
            }
          });
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }
}
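For completeness, the role the application assumes must also carry an IAM policy that allows it to connect to the cluster and write to the topic. Below is an illustrative sketch; the account ID, cluster name, and cluster UUID in the ARNs are placeholders for your cluster’s actual values.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData"
      ],
      "Resource": [
        "arn:aws:kafka:eu-west-2:111122223333:cluster/my-cluster/CLUSTER-UUID",
        "arn:aws:kafka:eu-west-2:111122223333:topic/my-cluster/CLUSTER-UUID/movie"
      ]
    }
  ]
}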

Below is the AvroGenericRecordExample class, which was generated with avro-tools-1.11.1.jar using the command below; it compiles the Avro schema into a Java class. The generated code includes Java classes for your Avro records and the corresponding builder classes, as well as utility methods for serializing and deserializing Avro data. You can use these classes in your Java code to work with Avro data.

java -jar avro-tools-1.11.1.jar compile schema AvroGenericRecordExample.avsc .
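For reference, the AvroGenericRecordExample.avsc schema that was compiled matches the SCHEMA$ definition embedded in the generated class:

{
  "type": "record",
  "name": "AvroGenericRecordExample",
  "fields": [
    {"name": "language", "type": "string"},
    {"name": "version", "type": "string"},
    {"name": "host", "type": "string"}
  ]
}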
package com.timepass.techies;
/**
 * Autogenerated by Avro
 *
 * <p>DO NOT EDIT DIRECTLY
 */
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;

@org.apache.avro.specific.AvroGenerated
public class AvroGenericRecordExample extends org.apache.avro.specific.SpecificRecordBase
    implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 6024506916854012259L;

  public static final org.apache.avro.Schema SCHEMA$ =
      new org.apache.avro.Schema.Parser()
          .parse(
              "{\"type\":\"record\",\"name\":\"AvroGenericRecordExample\",\"fields\":[{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"host\",\"type\":\"string\"}]}");

  public static org.apache.avro.Schema getClassSchema() {
    return SCHEMA$;
  }

  private static final SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<AvroGenericRecordExample> ENCODER =
      new BinaryMessageEncoder<AvroGenericRecordExample>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<AvroGenericRecordExample> DECODER =
      new BinaryMessageDecoder<AvroGenericRecordExample>(MODEL$, SCHEMA$);

  /**
   * Return the BinaryMessageEncoder instance used by this class.
   *
   * @return the message encoder used by this class
   */
  public static BinaryMessageEncoder<AvroGenericRecordExample> getEncoder() {
    return ENCODER;
  }

  /**
   * Return the BinaryMessageDecoder instance used by this class.
   *
   * @return the message decoder used by this class
   */
  public static BinaryMessageDecoder<AvroGenericRecordExample> getDecoder() {
    return DECODER;
  }

  /**
   * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link
   * SchemaStore}.
   *
   * @param resolver a {@link SchemaStore} used to find schemas by fingerprint
   * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore
   */
  public static BinaryMessageDecoder<AvroGenericRecordExample> createDecoder(SchemaStore resolver) {
    return new BinaryMessageDecoder<AvroGenericRecordExample>(MODEL$, SCHEMA$, resolver);
  }

  /**
   * Serializes this AvroGenericRecordExample to a ByteBuffer.
   *
   * @return a buffer holding the serialized data for this instance
   * @throws java.io.IOException if this instance could not be serialized
   */
  public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
    return ENCODER.encode(this);
  }

  /**
   * Deserializes a AvroGenericRecordExample from a ByteBuffer.
   *
   * @param b a byte buffer holding serialized data for an instance of this class
   * @return a AvroGenericRecordExample instance decoded from the given buffer
   * @throws java.io.IOException if the given bytes could not be deserialized into an instance of
   *     this class
   */
  public static AvroGenericRecordExample fromByteBuffer(java.nio.ByteBuffer b)
      throws java.io.IOException {
    return DECODER.decode(b);
  }

  private CharSequence language;
  private CharSequence version;
  private CharSequence host;

  /**
   * Default constructor. Note that this does not initialize fields to their default values from the
   * schema. If that is desired then one should use <code>newBuilder()</code>.
   */
  public AvroGenericRecordExample() {}

  /**
   * All-args constructor.
   *
   * @param language The new value for language
   * @param version The new value for version
   * @param host The new value for host
   */
  public AvroGenericRecordExample(CharSequence language, CharSequence version, CharSequence host) {
    this.language = language;
    this.version = version;
    this.host = host;
  }

  public SpecificData getSpecificData() {
    return MODEL$;
  }

  public org.apache.avro.Schema getSchema() {
    return SCHEMA$;
  }
  // Used by DatumWriter.  Applications should not call.
  public Object get(int field$) {
    switch (field$) {
      case 0:
        return language;
      case 1:
        return version;
      case 2:
        return host;
      default:
        throw new IndexOutOfBoundsException("Invalid index: " + field$);
    }
  }

  // Used by DatumReader.  Applications should not call.
  @SuppressWarnings(value = "unchecked")
  public void put(int field$, Object value$) {
    switch (field$) {
      case 0:
        language = (CharSequence) value$;
        break;
      case 1:
        version = (CharSequence) value$;
        break;
      case 2:
        host = (CharSequence) value$;
        break;
      default:
        throw new IndexOutOfBoundsException("Invalid index: " + field$);
    }
  }

  /**
   * Gets the value of the 'language' field.
   *
   * @return The value of the 'language' field.
   */
  public CharSequence getLanguage() {
    return language;
  }

  /**
   * Sets the value of the 'language' field.
   *
   * @param value the value to set.
   */
  public void setLanguage(CharSequence value) {
    this.language = value;
  }

  /**
   * Gets the value of the 'version' field.
   *
   * @return The value of the 'version' field.
   */
  public CharSequence getVersion() {
    return version;
  }

  /**
   * Sets the value of the 'version' field.
   *
   * @param value the value to set.
   */
  public void setVersion(CharSequence value) {
    this.version = value;
  }

  /**
   * Gets the value of the 'host' field.
   *
   * @return The value of the 'host' field.
   */
  public CharSequence getHost() {
    return host;
  }

  /**
   * Sets the value of the 'host' field.
   *
   * @param value the value to set.
   */
  public void setHost(CharSequence value) {
    this.host = value;
  }

  /**
   * Creates a new AvroGenericRecordExample RecordBuilder.
   *
   * @return A new AvroGenericRecordExample RecordBuilder
   */
  public static Builder newBuilder() {
    return new Builder();
  }

  /**
   * Creates a new AvroGenericRecordExample RecordBuilder by copying an existing Builder.
   *
   * @param other The existing builder to copy.
   * @return A new AvroGenericRecordExample RecordBuilder
   */
  public static Builder newBuilder(Builder other) {
    if (other == null) {
      return new Builder();
    } else {
      return new Builder(other);
    }
  }

  /**
   * Creates a new AvroGenericRecordExample RecordBuilder by copying an existing AvroGenericRecordExample
   * instance.
   *
   * @param other The existing instance to copy.
   * @return A new AvroGenericRecordExample RecordBuilder
   */
  public static Builder newBuilder(AvroGenericRecordExample other) {
    if (other == null) {
      return new Builder();
    } else {
      return new Builder(other);
    }
  }

  /** RecordBuilder for AvroGenericRecordExample instances. */
  @org.apache.avro.specific.AvroGenerated
  public static class Builder
      extends org.apache.avro.specific.SpecificRecordBuilderBase<AvroGenericRecordExample>
      implements org.apache.avro.data.RecordBuilder<AvroGenericRecordExample> {

    private CharSequence language;
    private CharSequence version;
    private CharSequence host;

    /** Creates a new Builder */
    private Builder() {
      super(SCHEMA$, MODEL$);
    }

    /**
     * Creates a Builder by copying an existing Builder.
     *
     * @param other The existing Builder to copy.
     */
    private Builder(Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.language)) {
        this.language = data().deepCopy(fields()[0].schema(), other.language);
        fieldSetFlags()[0] = other.fieldSetFlags()[0];
      }
      if (isValidValue(fields()[1], other.version)) {
        this.version = data().deepCopy(fields()[1].schema(), other.version);
        fieldSetFlags()[1] = other.fieldSetFlags()[1];
      }
      if (isValidValue(fields()[2], other.host)) {
        this.host = data().deepCopy(fields()[2].schema(), other.host);
        fieldSetFlags()[2] = other.fieldSetFlags()[2];
      }
    }

    /**
     * Creates a Builder by copying an existing AvroGenericRecordExample instance
     *
     * @param other The existing instance to copy.
     */
    private Builder(AvroGenericRecordExample other) {
      super(SCHEMA$, MODEL$);
      if (isValidValue(fields()[0], other.language)) {
        this.language = data().deepCopy(fields()[0].schema(), other.language);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.version)) {
        this.version = data().deepCopy(fields()[1].schema(), other.version);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.host)) {
        this.host = data().deepCopy(fields()[2].schema(), other.host);
        fieldSetFlags()[2] = true;
      }
    }

    /**
     * Gets the value of the 'language' field.
     *
     * @return The value.
     */
    public CharSequence getLanguage() {
      return language;
    }

    /**
     * Sets the value of the 'language' field.
     *
     * @param value The value of 'language'.
     * @return This builder.
     */
    public Builder setLanguage(CharSequence value) {
      validate(fields()[0], value);
      this.language = value;
      fieldSetFlags()[0] = true;
      return this;
    }

    /**
     * Checks whether the 'language' field has been set.
     *
     * @return True if the 'language' field has been set, false otherwise.
     */
    public boolean hasLanguage() {
      return fieldSetFlags()[0];
    }

    /**
     * Clears the value of the 'language' field.
     *
     * @return This builder.
     */
    public Builder clearLanguage() {
      language = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /**
     * Gets the value of the 'version' field.
     *
     * @return The value.
     */
    public CharSequence getVersion() {
      return version;
    }

    /**
     * Sets the value of the 'version' field.
     *
     * @param value The value of 'version'.
     * @return This builder.
     */
    public Builder setVersion(CharSequence value) {
      validate(fields()[1], value);
      this.version = value;
      fieldSetFlags()[1] = true;
      return this;
    }

    /**
     * Checks whether the 'version' field has been set.
     *
     * @return True if the 'version' field has been set, false otherwise.
     */
    public boolean hasVersion() {
      return fieldSetFlags()[1];
    }

    /**
     * Clears the value of the 'version' field.
     *
     * @return This builder.
     */
    public Builder clearVersion() {
      version = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /**
     * Gets the value of the 'host' field.
     *
     * @return The value.
     */
    public CharSequence getHost() {
      return host;
    }

    /**
     * Sets the value of the 'host' field.
     *
     * @param value The value of 'host'.
     * @return This builder.
     */
    public Builder setHost(CharSequence value) {
      validate(fields()[2], value);
      this.host = value;
      fieldSetFlags()[2] = true;
      return this;
    }

    /**
     * Checks whether the 'host' field has been set.
     *
     * @return True if the 'host' field has been set, false otherwise.
     */
    public boolean hasHost() {
      return fieldSetFlags()[2];
    }

    /**
     * Clears the value of the 'host' field.
     *
     * @return This builder.
     */
    public Builder clearHost() {
      host = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public AvroGenericRecordExample build() {
      try {
        AvroGenericRecordExample record = new AvroGenericRecordExample();
        record.language =
            fieldSetFlags()[0] ? this.language : (CharSequence) defaultValue(fields()[0]);
        record.version =
            fieldSetFlags()[1] ? this.version : (CharSequence) defaultValue(fields()[1]);
        record.host = fieldSetFlags()[2] ? this.host : (CharSequence) defaultValue(fields()[2]);
        return record;
      } catch (org.apache.avro.AvroMissingFieldException e) {
        throw e;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumWriter<AvroGenericRecordExample> WRITER$ =
      (org.apache.avro.io.DatumWriter<AvroGenericRecordExample>) MODEL$.createDatumWriter(SCHEMA$);

  @Override
  public void writeExternal(java.io.ObjectOutput out) throws java.io.IOException {
    WRITER$.write(this, SpecificData.getEncoder(out));
  }

  @SuppressWarnings("unchecked")
  private static final org.apache.avro.io.DatumReader<AvroGenericRecordExample> READER$ =
      (org.apache.avro.io.DatumReader<AvroGenericRecordExample>) MODEL$.createDatumReader(SCHEMA$);

  @Override
  public void readExternal(java.io.ObjectInput in) throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }

  @Override
  protected boolean hasCustomCoders() {
    return true;
  }

  @Override
  public void customEncode(org.apache.avro.io.Encoder out) throws java.io.IOException {
    out.writeString(this.language);

    out.writeString(this.version);

    out.writeString(this.host);
  }

  @Override
  public void customDecode(org.apache.avro.io.ResolvingDecoder in) throws java.io.IOException {
    org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
    if (fieldOrder == null) {
      this.language = in.readString(this.language instanceof Utf8 ? (Utf8) this.language : null);

      this.version = in.readString(this.version instanceof Utf8 ? (Utf8) this.version : null);

      this.host = in.readString(this.host instanceof Utf8 ? (Utf8) this.host : null);

    } else {
      for (int i = 0; i < 3; i++) {
        switch (fieldOrder[i].pos()) {
          case 0:
            this.language =
                in.readString(this.language instanceof Utf8 ? (Utf8) this.language : null);
            break;

          case 1:
            this.version = in.readString(this.version instanceof Utf8 ? (Utf8) this.version : null);
            break;

          case 2:
            this.host = in.readString(this.host instanceof Utf8 ? (Utf8) this.host : null);
            break;

          default:
            throw new java.io.IOException("Corrupt ResolvingDecoder.");
        }
      }
    }
  }
}

In conclusion, producing messages to an IAM-enabled MSK cluster is a straightforward process that can be achieved using the Amazon MSK Library for IAM Authentication. By following the steps outlined in this article, you can successfully configure your application to send messages to an MSK cluster, while also ensuring that your data is secure and protected by IAM authentication.

It’s important to note that this is just one way of producing messages to an IAM-enabled MSK cluster; other approaches may suit your specific use case. As always, it’s recommended to thoroughly test your application before deploying it to production to ensure that it works as expected. With the help of the Kafka client and the AWS MSK IAM auth library, you can easily integrate your application with an IAM-enabled MSK cluster and start sending messages securely and efficiently.