Using StsAssumeRoleCredentialsProvider with Glue Schema Registry Integration in Kafka Producer

In this technical blog, we’ll discuss how to use the StsAssumeRoleCredentialsProvider in a Kafka producer that writes to a topic on an IAM-authenticated cluster and serializes its records through the Glue Schema Registry.

Introduction

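The code example below uses the Apache Kafka Java client together with the AWS SDK for Java v2 to create a producer that publishes Avro messages to a topic on an IAM-authenticated cluster, with values serialized through the Glue Schema Registry. The StsAssumeRoleCredentialsProvider (or its web identity counterpart, StsAssumeRoleWithWebIdentityCredentialsProvider) obtains temporary security credentials from STS and refreshes them before they expire. In this example, those credentials are handed to the schema registry serializer.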

Prerequisites

Before diving into the code, ensure you have the following prerequisites:

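  • AWS SDK for Java v2 (STS module) on the classpath, along with the Kafka client, the Glue Schema Registry serializer library, and the MSK IAM authentication library (software.amazon.msk.auth.iam).
  • Kafka producer settings for your cluster, including the broker list and topic name.
  • The ARN of an IAM role to assume, with the permissions the producer needs.
  • Access to a Glue schema registry.
  • An Avro-generated Counter class (used by the example but not shown here).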

Code Example

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleWithWebIdentityCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
import software.amazon.awssdk.utils.IoUtils;

import java.io.FileInputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;


public class STSProducer {

    private static final Logger log = LoggerFactory.getLogger(STSProducer.class);

    public static void main(String[] args) {


        String bootstrapServers = "ENTER_BROKER_LIST";
        HashMap<String,String> properties = new HashMap<>();

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT,"avro");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME,"ENTER_THE_SCHEMA_REGISTRY_NAME");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME,"Counter");
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, Regions.EU_WEST_2.getName());
        properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(
                SaslConfigs.SASL_JAAS_CONFIG, "software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;");

        properties.put(SaslConfigs.SASL_MECHANISM, "AWS_MSK_IAM");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(
                SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS,
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");


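        // create the producer; the assumed-role credentials provider is handed to the
        // Glue Schema Registry serializer
        KafkaProducer<String, Object> producer =
                new KafkaProducer<>(
                        // KafkaProducer expects Map<String, Object>, so copy the String-valued map
                        new HashMap<String, Object>(properties),
                        new StringSerializer(),
                        new GlueSchemaRegistryKafkaSerializer(
                                getCredentialsProvider("token"),
                                properties));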


        Counter counterEvent =
                Counter.newBuilder()
                        .setFullname("This is an example for using StsAssumeRoleCredentialsProvider")
                        .setMyrecord(String.valueOf(Math.random() * 10))
                        .setLastname("How to use sts credential provider")
                        .setTime(Instant.now().toEpochMilli())
                        .build();


        // create a producer record
        ProducerRecord<String, Object> producerRecord =
                new ProducerRecord<>("ENTER_YOUR_TOPIC_NAME_HERE", counterEvent);


        // send data - asynchronous
        producer.send(producerRecord);

        // flush data - synchronous
        producer.flush();
        // flush and close producer
        producer.close();
    }

    public static AwsCredentialsProvider getCredentialsProvider(String authType) {

        StsClient stsClient = getSTSClient(authType);

        if ("token".equals(authType)) {
            // exchange a web identity token (JWT) for temporary credentials
            return StsAssumeRoleWithWebIdentityCredentialsProvider.builder()
                    .stsClient(stsClient)
                    // start refreshing 15 minutes before the session expires
                    .prefetchTime(Duration.ofMinutes(15))
                    // refresh in the background
                    .asyncCredentialUpdateEnabled(true)
                    .refreshRequest(
                            () -> {
                                String tokenContent;
                                // the token is re-read on every refresh;
                                // try-with-resources closes the file afterwards
                                try (FileInputStream fis = new FileInputStream("ENTER_TOKEN_FILE_PATH")) {
                                    tokenContent = IoUtils.toUtf8String(fis);
                                } catch (Exception ex) {
                                    log.error("Some error occurred while reading the token file", ex);
                                    // fail here instead of sending a null token to STS
                                    throw new IllegalStateException("Could not read the token file", ex);
                                }
                                return AssumeRoleWithWebIdentityRequest.builder()
                                        .webIdentityToken(tokenContent)
                                        .roleArn("ENTER_ROLE_ARN")
                                        .roleSessionName("test-app")
                                        .durationSeconds(3600)
                                        .build();
                            })
                    .build();
        }

        // any other auth type: assume the role with a regular AssumeRole request
        return StsAssumeRoleCredentialsProvider.builder()
                .stsClient(stsClient)
                .prefetchTime(Duration.ofMinutes(15))
                .asyncCredentialUpdateEnabled(true)
                .refreshRequest(
                        () ->
                                AssumeRoleRequest.builder()
                                        .roleArn("ENTER_ROLE_ARN")
                                        .roleSessionName("test-app")
                                        .durationSeconds(3600)
                                        .build())
                .build();
    }

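    public static StsClient getSTSClient(String authtype) {

        if (authtype.equals("token")) {
            // AssumeRoleWithWebIdentity is authenticated by the web identity token
            // rather than by AWS credentials, so the values below are placeholders
            return StsClient.builder()
                    .credentialsProvider(
                            () ->
                                    AwsBasicCredentials.create(
                                            "sts_client_credential_provider_key",
                                            "sts_client_credential_provider_access"))
                    .httpClient(UrlConnectionHttpClient.create())
                    .build();
        } else if (authtype.equals("profile")) {
            // SDK v1 provider that reads the default profile from the local credentials file
            ProfileCredentialsProvider profileCredentialsProvider =
                    new ProfileCredentialsProvider();

            // Bridge the v1 credentials to the v2 client. Only the access key ID and
            // secret key are copied, so a profile that carries a session token will not work here.
            return StsClient.builder()
                    .region(Region.EU_WEST_2)
                    .credentialsProvider(
                            () ->
                                    AwsBasicCredentials.create(
                                            profileCredentialsProvider.getCredentials().getAWSAccessKeyId(),
                                            profileCredentialsProvider.getCredentials().getAWSSecretKey()))
                    .httpClient(UrlConnectionHttpClient.create())
                    .build();
        } else {
            // fall back to the SDK's default credential and region resolution
            return StsClient.builder().httpClient(UrlConnectionHttpClient.create()).build();
        }
    }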
}
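
Both credential providers in the listing request one-hour sessions (durationSeconds(3600)) and set prefetchTime to 15 minutes. As a rough illustration of what that combination means, the sketch below computes the point in a session from which a refresh would be attempted, assuming the prefetch window is simply counted back from the expiry time. The class and method names are hypothetical and the timestamp is arbitrary; the sketch uses only the JDK and does not call the SDK.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the AWS SDK.
final class RefreshTimingSketch {

    /** Returns the instant at which the prefetch window opens: expiry minus prefetch time. */
    static Instant prefetchStart(Instant issuedAt, Duration sessionDuration, Duration prefetchTime) {
        Instant expiry = issuedAt.plus(sessionDuration);
        return expiry.minus(prefetchTime);
    }

    public static void main(String[] args) {
        Instant issuedAt = Instant.parse("2024-01-01T00:00:00Z"); // arbitrary example timestamp
        Duration session = Duration.ofSeconds(3600);  // durationSeconds(3600) in the listing
        Duration prefetch = Duration.ofMinutes(15);   // prefetchTime(Duration.ofMinutes(15)) in the listing

        Instant start = prefetchStart(issuedAt, session, prefetch);
        System.out.println("Prefetch window opens at " + start);
        System.out.println("Minutes into the session: " + Duration.between(issuedAt, start).toMinutes());
    }
}
```

With these values the window opens 45 minutes into each session, which leaves the provider 15 minutes to obtain new credentials before the old ones expire.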

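The code example supports three ways of building the STS client, selected by the authType string: token-based authentication, profile-based authentication, and a fallback that relies on the SDK's default settings. The main method passes "token". The first two modes are described below.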

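Token-based Authentication: This method exchanges a web identity token (a JWT) for temporary AWS credentials. The code reads the token from a file each time the credentials are refreshed and sends it to STS in an AssumeRoleWithWebIdentity request, through the StsAssumeRoleWithWebIdentityCredentialsProvider. In the listing, the resulting credentials provider is passed to the Glue Schema Registry serializer, while the connection to the Amazon MSK (Managed Streaming for Apache Kafka) cluster is authenticated by the IAM login module configured in sasl.jaas.config, which here carries no role or profile options. This method is suitable for scenarios where temporary credentials need to be obtained dynamically, such as in serverless environments or when integrating with third-party systems.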
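
Because the token file is re-read on every refresh, it can help to check that the token on disk has not already expired before it is sent to STS. The sketch below is one way to do that with the JDK alone: it decodes the payload segment of a JWT and reads its exp claim. The class name, method names, and the demonstration token are hypothetical. The regular expression is a shortcut that stands in for a real JSON parser, and the signature is not verified, so treat this as a diagnostic aid rather than a validation step.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; it does NOT verify the token signature.
final class WebIdentityTokenSketch {

    // Simplification: pull the numeric "exp" claim out with a regex instead of a JSON parser.
    private static final Pattern EXP_CLAIM = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    /** Decodes the payload (second segment) of a JWT and returns its "exp" claim. */
    static Instant expiryOf(String jwt) {
        String[] parts = jwt.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Not a JWT: expected header.payload.signature");
        }
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        Matcher matcher = EXP_CLAIM.matcher(payload);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Token payload has no exp claim");
        }
        return Instant.ofEpochSecond(Long.parseLong(matcher.group(1)));
    }

    /** True when the token's expiry is at or before the given instant. */
    static boolean isExpired(String jwt, Instant now) {
        return !now.isBefore(expiryOf(jwt));
    }

    public static void main(String[] args) {
        // Build a fake, unsigned token purely for demonstration.
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String header = encoder.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String payload = encoder.encodeToString(
                "{\"sub\":\"test-app\",\"exp\":1700000000}".getBytes(StandardCharsets.UTF_8));
        String token = header + "." + payload + ".";

        System.out.println("Token expires at " + expiryOf(token));
        System.out.println("Expired now? " + isExpired(token, Instant.now()));
    }
}
```

In the listing, a check like this could run inside the refreshRequest supplier right after the file is read, logging a warning when the token is stale.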

Profile-based Authentication: In this mode, the STS client is built with AWS profile credentials stored locally on the system. The code uses the ProfileCredentialsProvider from the AWS SDK for Java v1 to load the default profile, wraps the access key and secret key as v2 basic credentials, and uses them to sign the AssumeRole call made by the StsAssumeRoleCredentialsProvider. To take this path, pass "profile" instead of "token" to getCredentialsProvider in main. Profile-based authentication is useful in development and testing environments where AWS credentials are managed locally using the AWS CLI or SDK.
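
In the listing, the credentials provider returned by getCredentialsProvider is given to the schema registry serializer, while sasl.jaas.config names the IAMLoginModule without any profile or role options. If the broker connection should also use a named profile or an assumed role, the MSK IAM authentication library documents login module options for that. The sketch below only builds the corresponding configuration strings. The option names awsProfileName, awsRoleArn, and awsRoleSessionName are taken from that library's documentation and should be checked against the version you use; the class name, helper names, and example values are hypothetical.

```java
// Hypothetical helper for illustration only: builds sasl.jaas.config values as plain strings.
final class MskJaasConfigSketch {

    static final String LOGIN_MODULE = "software.amazon.msk.auth.iam.IAMLoginModule";

    /** No options: the login module resolves credentials on its own. */
    static String defaultChain() {
        return LOGIN_MODULE + " required;";
    }

    /** Use a named profile from the local AWS configuration. */
    static String forProfile(String profileName) {
        return LOGIN_MODULE + " required awsProfileName=\"" + profileName + "\";";
    }

    /** Ask the login module to assume a role for the broker connection. */
    static String forAssumedRole(String roleArn, String sessionName) {
        return LOGIN_MODULE + " required awsRoleArn=\"" + roleArn + "\""
                + " awsRoleSessionName=\"" + sessionName + "\";";
    }

    public static void main(String[] args) {
        System.out.println(defaultChain());
        System.out.println(forProfile("dev"));
        // ENTER_ROLE_ARN is the same placeholder used in the listing
        System.out.println(forAssumedRole("ENTER_ROLE_ARN", "test-app"));
    }
}
```

Any of these strings could replace the value passed for SaslConfigs.SASL_JAAS_CONFIG in the producer properties.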

Conclusion

By following the steps outlined in this technical blog, you can use the StsAssumeRoleCredentialsProvider in a Kafka producer that writes to a topic on an IAM-authenticated cluster with Glue Schema Registry integration. The producer works with short-lived, automatically refreshed credentials while leveraging AWS services for schema management.