Build a Data Lake with AWS Services (Part 4)

DevOps

Note: This blog post was reviewed using AI for factual correctness and clarity. All content was tested in my private homelab to ensure accuracy.

After the hard work of creating an EMR cluster, there are two main issues to address. First, our current EMR lacks the permission to access S3, meaning it can only perform basic tasks like WordCount. Second, due to traditional cluster practices, every time we submit a new task, we have to remotely log in to an EMR cluster machine, copy the required Jar package, and run it using a command like spark-submit. This whole process is manual and lacks the efficiency of automation. Let’s tackle these problems one by one.

Enabling EMR Access to S3

To allow EMR access to S3, we need to assign an IAM role with appropriate permissions to the EMR machines. Let’s take a look at what permissions are needed, including those for writing logs.

# Trust policy: lets EC2 instances (the EMR cluster nodes) assume the role.
data "aws_iam_policy_document" "emr_instance_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["ec2.amazonaws.com"]
    }
  }
}

# Blank role for the EMR EC2 instances; permissions are attached separately.
resource "aws_iam_role" "emr_instance_role" {
  name = "${var.team_name}-emr-instance-role-${var.environment}"
  path = "/${var.team_name}/"

  assume_role_policy = data.aws_iam_policy_document.emr_instance_assume_role.json
  tags               = local.common.tags
}

# S3 access for the EMR instances: the data bucket and the log bucket.
# Bucket-level actions (ListBucket, versioning) match the bucket ARNs;
# object-level actions (Get/Put/DeleteObject) match the "/*" object ARNs.
data "aws_iam_policy_document" "data_access_policy" {
  statement {
    actions = [
      "s3:AbortMultipartUpload",
      "s3:CreateBucket",
      "s3:DeleteObject",
      "s3:GetBucketVersioning",
      "s3:GetObject",
      "s3:GetObjectTagging",
      "s3:GetObjectVersion",
      # "s3:ListObjects" / "s3:ListObjectsV2" are S3 API operation names, not
      # IAM actions; the IAM action that authorizes both List APIs is
      # "s3:ListBucket", so the invalid entries are dropped.
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:ListBucketVersions",
      "s3:ListMultipartUploadParts",
      "s3:PutBucketVersioning",
      "s3:PutObject",
      "s3:PutObjectTagging",
    ]
    resources = [
      aws_s3_bucket.data.arn,
      aws_s3_bucket.emr_log.arn,
      "${aws_s3_bucket.data.arn}/*",
      "${aws_s3_bucket.emr_log.arn}/*"
    ]
  }
}

# Materialize the policy document as a managed IAM policy so it can be
# attached to the instance role.
resource "aws_iam_policy" "data_access_policy" {
  name        = "${var.team_name}-emr-instance-policy-${var.environment}"
  path        = "/${var.team_name}/"
  description = "s3 access policy for EMR"
  policy      = data.aws_iam_policy_document.data_access_policy.json
}

# Attach the S3 policy to the *instance* role — the one the EMR EC2 nodes
# assume via the instance profile below. The original attached it to the EMR
# service role, which would not grant the nodes themselves S3 access, and
# used an invalid "aws." prefix on the policy reference.
resource "aws_iam_role_policy_attachment" "emr_instance_role_policy_attachment" {
  role       = aws_iam_role.emr_instance_role.name
  policy_arn = aws_iam_policy.data_access_policy.arn
}

We create a blank role, define the policy, and attach the policy to the role in one go. Finally, we apply this role to EMR.


# An instance profile wraps a *role* (not a policy) so EC2 can hand the
# role's credentials to the EMR nodes. The original pointed `role` at the
# policy through an invalid "aws." reference.
resource "aws_iam_instance_profile" "emr_instance_profile" {
  name = "${var.team_name}-emr-instance-profile-${var.environment}"
  role = aws_iam_role.emr_instance_role.name
}

resource "aws_emr_cluster" "etl_cluster" {
  ...
  ec2_attributes {
    # Plain resource reference — the stray "aws." prefix in the original is
    # not valid Terraform syntax.
    instance_profile = aws_iam_instance_profile.emr_instance_profile.arn
    ...
  }
}

Now our EMR cluster can access S3. In Spark tasks, we can read and write data to S3 using code like this:

// Read a parquet dataset back from S3 via the session's DataFrameReader.
// (The original called spark.write.parquet, which does not exist — writing
// goes through a DataFrame's .write, e.g. df.write.parquet(path).)
val dataset = spark.read.parquet("s3://<bucket-name>/path")
// use dataset as normal

With IAM roles, we don’t have to worry about S3 keys in Spark’s configuration, as all access permissions are managed by IAM.

However, using S3 as a simple filesystem is often insufficient. After getting used to Hive databases and convenient SQL queries, reverting to file access feels like a step backward. Thus, we need AWS Glue’s help to provide an interface similar to Hive for our S3 data.

# Exposes the current AWS account id for building the Glue ARNs below.
data "aws_caller_identity" "current" {
}

# Extra statement added to the existing policy document: Glue Data Catalog
# permissions so Spark on EMR can use Glue as its Hive metastore.
data "aws_iam_policy_document" "data_access_policy" {
  ...
  statement {
    actions = [
      "glue:GetDatabase",
      "glue:GetDatabases",
      "glue:CreateDatabase",
      "glue:GetUserDefinedFunctions",
      "glue:CreateTable",
      "glue:UpdateTable",
      "glue:DeleteTable",
      "glue:GetTable",
      "glue:GetTables",
      "glue:GetConnection",
      "glue:GetTableVersions",
      "glue:CreatePartition",
      "glue:BatchCreatePartition",
      "glue:UpdatePartition",
      "glue:DeletePartition",
      "glue:BatchDeletePartition",
      "glue:GetPartition",
      "glue:GetPartitions",
      "glue:BatchGetPartition",
      "glue:StartCrawler",
      "glue:GetCrawlers",
    ]

    # Scoped to this account's catalog, the default/global_temp databases,
    # the team's own databases, and tables. Region is hard-coded: us-east-1.
    resources = [
      "arn:aws:glue:us-east-1:${data.aws_caller_identity.current.account_id}:catalog",
      "arn:aws:glue:us-east-1:${data.aws_caller_identity.current.account_id}:database/default",
      "arn:aws:glue:us-east-1:${data.aws_caller_identity.current.account_id}:database/global_temp",
      "arn:aws:glue:us-east-1:${data.aws_caller_identity.current.account_id}:database/${var.team_name}-*",
      "arn:aws:glue:us-east-1:${data.aws_caller_identity.current.account_id}:table/*"
    ]
  }
}

Then, we configure EMR so that Glue can serve as a Hive interface. We also add some common performance optimizations. The most important part is the spark-hive-site section, where we use AWS’s tool to make Glue resemble Hive.

# Rendered JSON fed to the EMR cluster's `configurations` argument.
# The spark-hive-site classification is the key piece: its metastore factory
# class points Hive/Spark at the AWS Glue Data Catalog. The remaining
# classifications are performance tuning (dynamic allocation, compression,
# relaxed YARN memory checks).
# NOTE(review): the template_file data source is superseded by the built-in
# templatefile()/heredoc approach in newer Terraform — confirm provider setup.
data "template_file" "emr_config" {
  template = <<EOF
[
  {
      "Classification":"spark",
      "Properties":{
        "maximizeResourceAllocation": "true"
      }
  },
  {
      "Classification":"spark-defaults",
      "Properties":{
        "spark.network.timeout": "800s",
        "spark.executor.heartbeatInterval": "60s",
        "spark.sql.catalogImplementation": "hive",
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.initialExecutors": "20",
        "spark.dynamicAllocation.maxExecutors": "80",
        "spark.dynamicAllocation.executorAllocationRatio": "1",
        "spark.yarn.scheduler.reporterThread.maxFailures": "5",
        "spark.shuffle.compress": "true",
        "spark.shuffle.spill.compress": "true",
        "spark.default.parallelism": "4000"
      }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  },
  {
      "Classification":"spark-hive-site",
      "Properties":{
        "hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        "hive.metastore.connect.retries": "15",
        "hive.blobstore.optimizations.enabled": "false",
        "hive.exec.dynamic.partition": "true",
        "hive.exec.dynamic.partition.mode": "nonstrict",
        "hive.mv.files.thread": "20"
      }
  },
  {
      "Classification":"yarn-site",
      "Properties":{
        "yarn.nodemanager.vmem-check-enabled":"false",
        "yarn.nodemanager.pmem-check-enabled":"false"
      }
  }
]
EOF
}

resource "aws_emr_cluster" "etl_cluster" {
  ...
# Feed the rendered JSON template above into the cluster configuration.
configurations = data.template_file.emr_config.rendered
}

Now our EMR can use Hive to read and write to S3 efficiently. Here’s an example of reading data:

// Query the Glue-backed table exactly like a Hive table. `date` is the
// table's partition column, so the date predicate lets Spark prune which
// S3 prefixes it reads.
val dataFrame = spark.sql(
  s"""
    | select * from test_table where date='2021-01-29' and data_type=456
  """.stripMargin
)
// use dataFrame as normal

How do we ensure this reads from S3? Where did the test_table come from? Let’s revisit the creation process of test_table.

val tableName = "test_table"
// Register the table in the catalog with its data stored on S3.
// Fixes vs. the original: "IF NOT EXISTS" (the original's "IF NOT EXIST" is
// a SQL syntax error), and ".stripMargin" so the leading "|" margin
// characters are removed before the string reaches the SQL parser (matching
// the read example above).
spark.sql(
  s"""
    | CREATE TABLE IF NOT EXISTS $tableName
    |   (id BIGINT, name VARCHAR(100), data_type INT)
    | PARTITIONED BY (date VARCHAR(15))
    | STORED AS ORC
    | LOCATION 's3://<bucket-name>/hive/$tableName'
  """.stripMargin
)

Submitting Tasks Remotely to EMR via Steps

Here, our focus is on the inconvenient and insecure practice of SSH login to EMR for job submission. Instead, we look towards AWS’s more abstract concept: Steps.

// Shared EMR client and the name of the target cluster.
val emr = AmazonElasticMapReduceClientBuilder.standard()
                .withRegion(Regions.US_EAST_1)
                .build()
val clusterName = "ETL-emr-cluster"

/**
 * Looks up the id of the active cluster with the configured name.
 * Fix: the original compared against an undefined `emrClusterName`; the
 * declared constant is `clusterName`.
 * Note: `first` throws NoSuchElementException if no matching RUNNING/WAITING
 * cluster exists.
 */
fun getClusterId(): String {
  val listClusterRequest = ListClustersRequest().withClusterStates("RUNNING", "WAITING")
  val response = emr.listClusters(listClusterRequest)
  val cluster = response.clusters.first { it.name == clusterName }
  return cluster.id
}

/**
 * Submits a Spark job to the running cluster as an EMR Step and returns the
 * new step's id. command-runner.jar executes the assembled spark-submit
 * command on the master node; `additionalArgs` are inserted before the jar
 * path, i.e. they are spark-submit options.
 */
fun runJob(jobName: String, jarPathOnS3: String, className: String, additionalArgs: List<String>): String {
  val sparkSubmitCommand =
      arrayOf("spark-submit", "--class", className) + additionalArgs + arrayOf(jarPathOnS3)
  val jarStep = HadoopJarStepConfig()
      .withJar("command-runner.jar")
      .withArgs(*sparkSubmitCommand)
  // CONTINUE: a failed step should not terminate the shared cluster.
  val stepConfig = StepConfig(jobName, jarStep)
      .withActionOnFailure(ActionOnFailure.CONTINUE)
  val submitResult = emr.addJobFlowSteps(
      AddJobFlowStepsRequest(getClusterId()).withSteps(stepConfig)
  )
  return submitResult.stepIds.first()
}

/**
 * Blocks until the given step reaches a terminal state, using the SDK's
 * built-in stepComplete waiter (which polls DescribeStep internally).
 */
fun waitForStepFinish(stepId: String) {
  val request = DescribeStepRequest()
      .withClusterId(getClusterId())
      .withStepId(stepId)
  emr.waiters().stepComplete().run(WaiterParameters(request))
}

What permissions do we need to run these functions?

# Permissions for the client that submits and monitors Steps. Scoped by the
# cluster's "useTag" resource tag so it can only touch the ETL cluster;
# ListClusters has no per-resource tag to match, so it gets its own
# unconditioned statement.
data "aws_iam_policy_document" "step_manager_policy" {
  statement {
    actions = [
      "elasticmapreduce:AddJobFlowSteps",
      "elasticmapreduce:CancelSteps",
      "elasticmapreduce:DescribeCluster",
      "elasticmapreduce:DescribeStep",
      "elasticmapreduce:ListSteps", # the original listed this action twice
      "elasticmapreduce:ListInstanceGroups",
      "elasticmapreduce:ModifyInstanceGroups",
      "elasticmapreduce:ModifyCluster",
    ]
    resources = ["*"]
    condition {
      test     = "StringEquals"
      variable = "elasticmapreduce:ResourceTag/useTag"
      values   = ["etl"]
    }
  }
  statement {
    actions = [
      "elasticmapreduce:ListClusters",
    ]
    resources = ["*"]
  }
}