The Security Setting For AWS MSK
Update: This article was imported from my old blog, written around 2020. Back then, AWS MSK’s IAM Authentication was either non-existent or hiding from me (probably the latter, knowing AWS’s documentation). Fast forward to today, and we have IAM Authentication - a much more elegant solution that I’ll cover in a future post. However, since many platforms (including AWS’s own DMS) still haven’t caught up with the IAM party, the certificate-based authentication method described here remains relevant.
In the world of modern software architecture, asynchronous operations and message queues have become the cool kids on the block. It’s like having a conversation where you don’t have to wait for the other person to finish their coffee before you can start talking. And when you’re designing for “just a casual hundred million users” (because why aim low, right?), AWS MSK (Amazon Managed Streaming for Apache Kafka) becomes your best friend - it’s like having a personal Kafka butler who handles all the messy cluster management for you.
While AWS offers other messaging services like SQS, SNS, and Kinesis, Kafka stands out with its universal API and extensive third-party support (because who doesn’t love not being locked into a single cloud provider?). However, there’s a catch - Kafka’s IAM security is still in its awkward teenage phase, and many platforms (yes, including AWS’s own DMS) haven’t quite figured out how to work with it. That’s why we’re diving into certificate-based authentication, the more “mature” approach to securing your Kafka cluster.
Security in MSK is like a two-factor authentication for your data: first, we encrypt the messages (because we’re not running a public broadcasting service), and second, we ensure only authorized clients can connect (no party crashers allowed). Let’s walk through setting up a secure AWS MSK cluster, step by step.
Create a Kafka Cluster
Let’s start with a basic MSK cluster. We’re using version 3.5.1 (because we like our software like we like our coffee - stable and reliable). We’re allocating 500GB of disk space, which should be plenty for most use cases. And if you need more, it’s not like we’re dealing with AWS’s most expensive service here.
resource "aws_msk_cluster" "msk_cluster" {
  cluster_name           = "${var.team_name}-msk-${var.environment}"
  kafka_version          = "3.5.1"
  number_of_broker_nodes = 3

  broker_node_group_info {
    instance_type   = "kafka.m5.xlarge"
    ebs_volume_size = 500
    az_distribution = "DEFAULT"
  }

  tags = local.common.tags
}
This gives us a basic MSK cluster, but to make it “production-ready” (because we’re not running a lemonade stand here), we need to add some essential configurations: logging, monitoring, and the all-important security settings.
First up, monitoring. If you’re lucky enough to have a Prometheus setup (the monitoring system that’s become as standard as using Git for version control), you’re in for a treat. If not, there’s always Grafana (though their SaaS solution might make your wallet cry a bit). Setting up monitoring for MSK is refreshingly straightforward:
resource "aws_msk_cluster" "msk_cluster" {
  ...
  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }
}
Next, we need to handle logs. Because in the world of distributed systems, if you’re not logging, you’re basically flying blind. We’ll store logs in an S3 bucket with a 30-day retention period. If your Kafka cluster has been down for a month and nobody noticed, the logs are probably the least of your worries.
resource "aws_s3_bucket" "log-bucket" {
  force_destroy = true
  bucket        = "${var.company_name}-${var.team_name}-msk-log-${var.environment}"
  acl           = "private"
  tags          = local.common.tags

  lifecycle_rule {
    id      = "${var.company_name}-${var.team_name}-msk-log-${var.environment}-rule"
    enabled = true

    # Logs expire after 30 days, matching the retention period described above
    expiration {
      days = 30
    }
  }
}
resource "aws_msk_cluster" "msk_cluster" {
  ...
  logging_info {
    broker_logs {
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.log-bucket.id
        prefix  = "logs/msk-"
      }
    }
  }
}
Finally, we add some Kafka-specific configurations, like enabling or disabling topic deletion or automatic topic creation. These settings are unrelated to AWS; please refer to Kafka’s documentation for details. For convenience, we allow clients to freely add or remove topics. We just need to ensure the Kafka cluster functions properly.
resource "aws_msk_configuration" "msk_cluster_config" {
  kafka_versions    = ["3.5.1"]
  name              = "${var.team_name}-msk-config-${var.environment}"
  server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
PROPERTIES
}
resource "aws_msk_cluster" "msk_cluster" {
  ...
  configuration_info {
    arn      = aws_msk_configuration.msk_cluster_config.arn
    revision = aws_msk_configuration.msk_cluster_config.latest_revision
  }
}
Data Security - KMS-Based Encryption
Let’s start our security setup. First, data encryption. We create an encryption key in KMS and use it to encrypt data. Note that our clients need permission to access this key; otherwise, they can’t read the data. We’ll discuss client permissions in detail when we introduce the client. For now, we use an encryption key stored in KMS to tie Kafka’s data security to IAM authentication. As long as IAM is secure and our clients can access the key, the system will operate securely.
resource "aws_kms_key" "encrypt_key" {
  description              = "KMS key for MSK"
  deletion_window_in_days  = 10
  key_usage                = "ENCRYPT_DECRYPT"
  customer_master_key_spec = "SYMMETRIC_DEFAULT"
  enable_key_rotation      = true
  tags                     = local.common.tags
}
We enable automatic key rotation (enable_key_rotation) for security, so that a compromised key doesn’t cause permanent damage. KMS retains the older key material, so data encrypted before a rotation can still be decrypted and there’s no worry about service disruption due to key updates. We use a symmetric key (SYMMETRIC_DEFAULT) because that is the only kind MSK accepts for encryption at rest.
For management convenience, we assign an alias to this key in AWS.
resource "aws_kms_alias" "encrypt_key" {
  name          = "alias/${var.team_name}-msk-key-${var.environment}"
  target_key_id = aws_kms_key.encrypt_key.key_id
}
Finally, we tell MSK to use this key for data encryption.
resource "aws_msk_cluster" "msk_cluster" {
  ...
  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.encrypt_key.arn
  }
}
Done! Encryption and decryption are fully automated, as long as permissions are properly configured. This is known as Server Side Encryption (SSE), and it’s very convenient.
Connection Security - Certificate-Based Client Authentication
The verification process for connections works like this: the client connects over TLS (which Kafka’s configuration still calls SSL) and presents a certificate recognized by the Kafka cluster, which then allows the connection. The specifics can be found in the TLS documentation (which few people have the patience to read through). Simply put, the client obtains a certificate from a Certificate Authority (CA), and the cluster verifies the certificate’s validity against the same CA. To keep it simple, every AWS account has a root CA that can be used. Alternatively, AWS PCA can be used to create a private CA. In most companies, creating a CA is tightly regulated and isn’t something individuals can do on their own. Therefore, in the code below, the PCA’s ARN is passed in as a variable.
resource "aws_msk_cluster" "msk_cluster" {
  ...
  client_authentication {
    tls {
      certificate_authority_arns = [var.acm_pca_arn]
    }
  }
}
Note that since we’ve set TLS, this cluster now only allows SSL connections, which greatly enhances security.
In many companies, creating certificates can be cumbersome, involving coordination with various security teams. Once obtained, the certificates must be carefully stored as they are essentially the username and password for connecting to the cluster. Managing these certificates and their associated complexities can be challenging. Here, we take a different approach by auto-generating certificates and storing them in an S3 bucket. This way, clients only need permission to download the certificate from S3 to connect to the Kafka cluster, turning a difficult certificate management issue into a more manageable IAM validation problem.
Let’s create an S3 bucket for storing certificates. For security, we encrypt objects in S3 and require all file operations to be encrypted. We use the same encryption key as for the Kafka cluster. We also manage the object lifecycle to ensure certificates do not exist permanently.
data "aws_iam_policy_document" "bucket_policy" {
  # Reject uploads that ask for any encryption other than SSE-KMS
  statement {
    effect = "Deny"
    principals {
      type        = "AWS"
      identifiers = ["*"]
    }
    actions = [
      "s3:PutObject",
    ]
    resources = [
      "arn:aws:s3:::${var.company_name}-${var.team_name}-msk-certificate-${var.environment}/*",
    ]
    condition {
      test     = "StringNotEquals"
      variable = "s3:x-amz-server-side-encryption"
      values   = ["aws:kms"]
    }
  }
  # Reject uploads that do not specify an encryption header at all
  statement {
    effect = "Deny"
    principals {
      type        = "AWS"
      identifiers = ["*"]
    }
    actions = [
      "s3:PutObject",
    ]
    resources = [
      "arn:aws:s3:::${var.company_name}-${var.team_name}-msk-certificate-${var.environment}/*",
    ]
    condition {
      test     = "Null"
      variable = "s3:x-amz-server-side-encryption"
      values   = ["true"]
    }
  }
}
resource "aws_s3_bucket" "certificate-bucket" {
  force_destroy = true
  bucket        = "${var.company_name}-${var.team_name}-msk-certificate-${var.environment}"
  acl           = "private"
  tags          = local.common.tags

  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        kms_master_key_id = aws_kms_key.encrypt_key.arn
        sse_algorithm     = "aws:kms"
      }
    }
  }

  policy = data.aws_iam_policy_document.bucket_policy.json

  lifecycle_rule {
    id                                     = "${var.company_name}-${var.team_name}-msk-certificate-${var.environment}-rule"
    enabled                                = true
    abort_incomplete_multipart_upload_days = 3
    expiration {
      days = 732
    }
  }
}
Creating Certificates for Connection
Creating certificates can be a complex process in many companies, potentially involving interaction with several security teams. Once you have the certificate, it needs to be securely stored, as it’s akin to a username and password for accessing the cluster. All the complexities of password management become evident.
So, can we use IAM to manage certificate generation and updates, turning the tricky certificate issue into a more manageable IAM authentication problem? Certainly. We break down the certificate management process into these steps:
1. Every time a client connects to the Kafka cluster, it checks an S3 bucket for an existing certificate and whether it’s expired (e.g., by checking a tag on the certificate object). If the certificate exists and is valid, it’s downloaded and used.
2. If the certificate is missing or expired, a new one is generated. This can be done by a separate service.
3. We create the certificate through AWS Certificate Manager Private CA (ACM PCA). The validity period can be kept short, like ten days or half a month.
4. Then, the certificate is stored in S3 under a predetermined key, and the expiry date can be added to the S3 object’s tags.
5. Once the certificate is generated, we return to step 1 and use the newly generated certificate.
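The decision in step 1 can be sketched as a small pure function. This is only a sketch under a few assumptions: the tag is called expiryDate and holds an ISO-8601 timestamp (which is what the upload code later in this article writes), and the helper name isCertificateUsable and the one-day safetyMargin are made up for illustration.

```kotlin
import java.time.Duration
import java.time.Instant
import java.time.format.DateTimeParseException

// Hypothetical helper: decides whether a stored certificate can still be used,
// based on the "expiryDate" tag attached to the S3 object.
// safetyMargin is an assumption: renew a little before the real expiry.
fun isCertificateUsable(
    tags: Map<String, String>,
    now: Instant = Instant.now(),
    safetyMargin: Duration = Duration.ofDays(1)
): Boolean {
    // No tag at all: treat the certificate as missing
    val raw = tags["expiryDate"] ?: return false
    val expiry = try {
        // ISO-8601 text, the format produced by Instant.toString()
        Instant.parse(raw)
    } catch (e: DateTimeParseException) {
        // Unreadable tag: safer to generate a new certificate
        return false
    }
    // Usable only if it is still valid after the safety margin has passed
    return now.plus(safetyMargin).isBefore(expiry)
}
```

Keeping the check free of any S3 calls makes it easy to unit test; the caller would first read the object’s tags from S3 and pass them in as a plain map.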
Step 2 is the most challenging. Let’s see how it’s implemented in Kotlin. Our goal is to create a KeyPair, obtain a certificate for it, and store both in a keystore file in a specific format.
Entering the world of Java Cryptography Architecture (JCA), one of the most complex parts of Java, is no easy feat. The process is as follows:
1. First, generate a new KeyPair.
2. Then, generate a certificate request, specifying the service name and other owner information. The request carries the public key and is signed with the private key of the newly created KeyPair.
3. Send the generated certificate request to AWS’s ACM-PCA service to generate a real certificate. Note, this step is typically asynchronous.
4. Store the downloaded X.509 certificate, together with the private key from step 1, in a keystore, then save the keystore to a file. This is your certificate file.
5. Finally, upload the generated certificate file to S3.
First, the function to create a new KeyPair:
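import java.security.KeyPair
import java.security.KeyPairGenerator

fun generateKeyPair(): KeyPair {
    val keyGen = KeyPairGenerator.getInstance("RSA")
    // 2048 bits is the practical minimum for RSA; 1024-bit keys are considered weak
    keyGen.initialize(2048)
    return keyGen.genKeyPair()
}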
Next, generate the certificate request. To simplify, we only provide one piece of information: our service name. Here, we need the service name and the previously generated KeyPair. To keep the code concise, we use the bouncycastle library.
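import java.security.KeyPair
import java.util.Base64
import javax.security.auth.x500.X500Principal
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
import org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequestBuilder

// Assumed value: the CSR is signed with the RSA key pair generated above
const val signatureAlgorithm = "SHA256withRSA"

fun generateCSR(x500DistinguishedName: String, keyPair: KeyPair): String {
    val p10Builder = JcaPKCS10CertificationRequestBuilder(X500Principal(x500DistinguishedName), keyPair.public)
    val signer = JcaContentSignerBuilder(signatureAlgorithm).build(keyPair.private)
    val certRequest = p10Builder.build(signer)
    // PEM encoding: Base64 body wrapped at 64 characters between the CSR markers
    return listOf(
        "-----BEGIN CERTIFICATE REQUEST-----",
        Base64.getEncoder().encodeToString(certRequest.encoded).chunked(64).joinToString("\n"),
        "-----END CERTIFICATE REQUEST-----"
    ).joinToString("\n")
}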
Then, generate the certificate by calling AWS’s PCA service. In addition to the certificate request string, configure the PCA’s ARN and the signing algorithm.
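import com.amazonaws.services.acmpca.AWSACMPCA
import com.amazonaws.services.acmpca.AWSACMPCAClient
import com.amazonaws.services.acmpca.model.GetCertificateRequest
import com.amazonaws.services.acmpca.model.IssueCertificateRequest
import com.amazonaws.services.acmpca.model.Validity
import com.amazonaws.services.acmpca.model.ValidityPeriodType
import java.nio.ByteBuffer
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate

val awsAcmPca: AWSACMPCA = AWSACMPCAClient.builder()
    .withRegion(region)
    .build()
val pcaArn: String = "<ARN-HERE>"
val validityDays: Int = 60
// Algorithm the CA uses to sign the issued certificate. It has to match the key
// type of the private CA (ECDSA here), not the RSA key that signed the CSR
val csrSigningAlgorithm = "SHA256WITHECDSA"

fun generateNewCertificate(csrStr: String): X509Certificate {
    val issueCertificateRequest = IssueCertificateRequest().apply {
        certificateAuthorityArn = pcaArn
        csr = ByteBuffer.wrap(csrStr.toByteArray())
        signingAlgorithm = csrSigningAlgorithm
        validity = Validity().withType(ValidityPeriodType.DAYS).withValue(validityDays.toLong())
    }
    val response = awsAcmPca.issueCertificate(issueCertificateRequest)
    // Issuance is asynchronous. A fixed sleep keeps the example short, but
    // getCertificate throws RequestInProgressException if the CA needs longer
    Thread.sleep(5000)
    val getCertificateResult = awsAcmPca.getCertificate(
        GetCertificateRequest()
            .withCertificateArn(response.certificateArn)
            .withCertificateAuthorityArn(pcaArn)
    )
    // The result is a PEM string; parse it into an X509Certificate
    val cf = CertificateFactory.getInstance("X.509")
    return cf.generateCertificate(getCertificateResult.certificate.byteInputStream()) as X509Certificate
}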
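The fixed five-second sleep above works most of the time, but it either waits too long or not long enough. One possible alternative, sketched here as an assumption rather than as what the original service did, is to poll with a capped exponential backoff. The helper pollUntilReady and all of its parameters are hypothetical names and are not part of the AWS SDK.

```kotlin
// Hypothetical polling helper: retries `fetch` while it fails with a retryable
// exception, doubling the wait between attempts up to maxDelayMs.
fun <T> pollUntilReady(
    maxAttempts: Int = 10,
    initialDelayMs: Long = 500,
    maxDelayMs: Long = 8_000,
    sleep: (Long) -> Unit = { Thread.sleep(it) },   // injectable so tests need not wait
    isRetryable: (Exception) -> Boolean,
    fetch: () -> T
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var delayMs = initialDelayMs
    var lastError: Exception? = null
    repeat(maxAttempts) { attempt ->
        try {
            return fetch()                           // success: leave the function
        } catch (e: Exception) {
            if (!isRetryable(e)) throw e             // unexpected failure: give up at once
            lastError = e
            if (attempt < maxAttempts - 1) {
                sleep(delayMs)
                delayMs = minOf(delayMs * 2, maxDelayMs)
            }
        }
    }
    throw IllegalStateException("Result not ready after $maxAttempts attempts", lastError)
}
```

In generateNewCertificate, the sleep and the getCertificate call could then be wrapped as pollUntilReady(isRetryable = { it is RequestInProgressException }) { awsAcmPca.getCertificate(request) }, where request is the GetCertificateRequest built above.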
We’ve now generated a certificate (in the most commonly used X509Certificate format). The following function saves the data to a temporary file. We use a temporary file because our ultimate goal is to store the certificate in S3, so the local storage location is irrelevant.
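import java.io.File
import java.security.KeyPair
import java.security.KeyStore
import java.security.cert.X509Certificate

// keystoreType and alias are configuration values defined elsewhere in the service
fun saveCertificateToFile(
    certificate: X509Certificate,
    keystorePassword: String,
    keyPair: KeyPair
): File {
    val keyStoreFile = File.createTempFile("keyStore-", ".jks")
    val keyStore = KeyStore.getInstance(keystoreType)
    // Loading with null stream and password initializes an empty keystore
    keyStore.load(null, null)
    // Store the private key together with its certificate under one alias
    keyStore.setKeyEntry(alias, keyPair.private, keystorePassword.toCharArray(), arrayOf(certificate))
    keyStoreFile.outputStream().use {
        keyStore.store(it, keystorePassword.toCharArray())
    }
    return keyStoreFile
}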
Finally, we upload the certificate to S3. This S3 bucket is vital for the security of our Kafka cluster, so it’s advisable to enable Server Side Encryption (SSE) on the bucket so that the data is encrypted at rest; encryption in transit comes from the HTTPS endpoint the SDK uses by default. Since the certificate files are small, a single HTTP request suffices for the upload. Here’s the Kotlin code for uploading:
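import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.Headers
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.services.s3.model.ObjectTagging
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.model.SSEAlgorithm
import com.amazonaws.services.s3.model.Tag
import java.io.File
import java.time.Instant
import java.time.temporal.ChronoUnit

val s3Client: AmazonS3 = AmazonS3ClientBuilder.standard()
    .withRegion(region)
    .build()
val sseKey = "<Key-ARN>"

fun putCertificate(keystore: File, serviceName: String, validityDays: Int) {
    // The expiry tag lets clients check validity without downloading the keystore
    val tags = listOf(
        Tag("createdDate", Instant.now().toString()),
        Tag("expiryDate", Instant.now().plus(validityDays.toLong(), ChronoUnit.DAYS).toString()))
    val objectKey = constructObjectKey(serviceName)
    // Request SSE-KMS explicitly; the bucket policy rejects uploads without it
    val objectMetadata = ObjectMetadata().apply {
        sseAlgorithm = SSEAlgorithm.KMS.algorithm
        setHeader(Headers.SERVER_SIDE_ENCRYPTION_AWS_KMS_KEYID, sseKey)
    }
    val request = PutObjectRequest(bucketName, objectKey, keystore)
        .withMetadata(objectMetadata)
        .withTagging(ObjectTagging(tags))
    s3Client.putObject(request)
}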
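On the client side, the downloaded keystore has to be handed to the Kafka client. A minimal sketch of that wiring, using the standard Kafka client SSL settings, could look like the following. The function name buildSslClientProperties and its parameters are assumptions for illustration, and the sketch assumes the key entry uses the same password as the keystore, as in saveCertificateToFile above.

```kotlin
import java.io.File
import java.util.Properties

// Hypothetical helper: builds Kafka client settings for TLS client
// authentication from a keystore that was downloaded from S3.
fun buildSslClientProperties(
    bootstrapServers: String,
    keystore: File,
    keystorePassword: String,
    keystoreType: String = "JKS"
): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    // "SSL" is Kafka's name for TLS-encrypted connections
    setProperty("security.protocol", "SSL")
    setProperty("ssl.keystore.type", keystoreType)
    setProperty("ssl.keystore.location", keystore.absolutePath)
    setProperty("ssl.keystore.password", keystorePassword)
    // Assumption: the private key entry is protected by the same password
    setProperty("ssl.key.password", keystorePassword)
}
```

The resulting Properties can be merged with the usual serializer and group settings before creating a producer or consumer. No truststore is configured here, on the assumption that the JVM’s default trust store already trusts the certificates presented by the MSK brokers.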
Required IAM Roles for Certificate Generation
Let’s look at the permissions needed for generating certificates:
- Access to S3 for operations such as retrieving, tagging, and deleting objects.
- Operations related to KMS (Key Management Service) for encrypting files in S3.
- ACM PCA (AWS Certificate Manager Private Certificate Authority) operations for issuing and retrieving certificates.
data "aws_iam_policy_document" "generate-cert-policy-doc" {
  statement {
    actions = [
      "s3:GetObject",
      "s3:PutObject",
      "s3:ListBucket",
      "s3:DeleteObject",
      "s3:ListBucketMultipartUploads",
      "s3:AbortMultipartUpload",
      "s3:PutObjectTagging",
    ]
    resources = [
      aws_s3_bucket.certificate-bucket.arn,
      "${aws_s3_bucket.certificate-bucket.arn}/*",
    ]
  }
  statement {
    actions = [
      "s3:GetBucketLocation",
      "s3:ListAllMyBuckets",
    ]
    resources = [
      "*",
    ]
  }
  statement {
    actions = [
      "kms:GenerateDataKey*",
      "kms:Encrypt",
      "kms:Decrypt",
      "kms:ReEncrypt*",
      "kms:DescribeKey",
    ]
    resources = [
      aws_kms_key.encrypt_key.arn,
    ]
  }
  statement {
    actions = [
      "acm-pca:IssueCertificate",
      "acm-pca:GetCertificate",
    ]
    resources = [
      var.acm_pca_arn,
    ]
  }
}
For client-side permissions, basic read access to S3 and the KMS permissions needed to decrypt the stored certificate are sufficient.
# Named differently from the generator's policy document: two data sources
# of the same type cannot share a name within one module
data "aws_iam_policy_document" "read-cert-policy-doc" {
  statement {
    actions = [
      "s3:GetObject",
      "s3:ListBucket",
    ]
    resources = [
      aws_s3_bucket.certificate-bucket.arn,
      "${aws_s3_bucket.certificate-bucket.arn}/*",
    ]
  }
  statement {
    actions = [
      "s3:GetBucketLocation",
      "s3:ListAllMyBuckets",
    ]
    resources = [
      "*",
    ]
  }
  # Reading SSE-KMS encrypted objects only requires decrypt access to the key
  statement {
    actions = [
      "kms:Decrypt",
      "kms:DescribeKey",
    ]
    resources = [
      aws_kms_key.encrypt_key.arn,
    ]
  }
}
Conclusion
With these configurations, you now have a relatively secure Kafka cluster. To further enhance security, focus on network-level configurations. For instance, if your Kafka cluster is currently public, consider moving it to a private subnet and configuring appropriate security groups:
resource "aws_msk_cluster" "msk_cluster" {
  ...
  broker_node_group_info {
    ...
    client_subnets  = ["<subnet-1>", "<subnet-2>", "<subnet-3>"]
    security_groups = ["<security-group-1>", "<security-group-2>"]
    ...
  }
}