Amazon Athenaのパーティション射影と、Amazon Kinesis Data Firehoseの動的パーティショニングの使いどころ(構築編)

前回、紹介した"Amazon Athena(以下 Athena)のパーティション射影と、Amazon Kinesis Data Firehose(以下 Firehose)の動的パーティショニングについて、CDKで構築します。

使用しているライブラリのバージョンは以下になります。

@aws-cdk/aws-glue-alpha@2.50.0-alpha.0
@aws-cdk/aws-kinesisfirehose-alpha@2.50.0-alpha.0
@aws-cdk/aws-kinesisfirehose-destinations-alpha@2.50.0-alpha.0
aws-cdk-lib@2.50.0
aws-cdk@2.50.0

想定するデータとして以下の様な構成を想定します。

  • user_id(int)
  • status(string)
  • url(string)

Kinesis Firehoseに送られてきたデータの中のuser_idを動的パーティショニングでpartitionとして設定します。 CDKのコードは以下の様になります。

import * as cdk from "aws-cdk-lib";
import * as glue from "@aws-cdk/aws-glue-alpha";
import * as cfnGlue from "aws-cdk-lib/aws-glue";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha";
import * as cfnFirehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as destinations from "@aws-cdk/aws-kinesisfirehose-destinations-alpha";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";

export class AppStack extends cdk.Stack {
    constructor(scope: Construct, id: string, props?: cdk.StackProps) {
        super(scope, id, props);

        const databaseName = "test-database";
        const tableName = "test-table";
        const bucketName = "test-glue-firehose";
        const destPrefix = "firehose-dest";
        const streamName = "firehose-stream";

        const bucket = new s3.Bucket(this, "Bucket", {
            bucketName,
            removalPolicy: cdk.RemovalPolicy.DESTROY,
        });

        const database = new glue.Database(this, "Database", {
            databaseName,
        });

        new cfnGlue.CfnTable(this, "Table", {
            catalogId: cdk.Stack.of(this).account,
            databaseName: database.databaseName,
            tableInput: {
                name: tableName,
                tableType: "EXTERNAL_TABLE",
                partitionKeys: [{ name: "userid", type: "int" }],
                parameters: {
                    "projection.enabled": true,
                    "projection.userid.type": "integer",
                    "projection.userid.range": "01,99",
                    "projection.userid.interval": 1,
                    "storage.location.template":
                        `s3://${bucket.bucketName}/${destPrefix}/` +
                        "${userid}",
                },
                storageDescriptor: {
                    columns: [
                        { name: "url", type: "string" },
                        { name: "status", type: "string" },
                    ],
                    inputFormat:
                        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                    outputFormat:
                        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                    serdeInfo: {
                        serializationLibrary:
                            "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                    },
                    location: `s3://${bucket.bucketName}/${destPrefix}`,
                },
            },
        });
        const logDestination = new destinations.S3Bucket(bucket, {
            dataOutputPrefix: `${destPrefix}/!{partitionKeyFromQuery:userid}/`,
            errorOutputPrefix: `kinesis-error/${destPrefix}/!{firehose:error-output-type}/dt=!{timestamp:yyyy'-'MM'-'dd}/`,
        });

        const stream = new firehose.DeliveryStream(this, "DeliveryStream", {
            destinations: [logDestination],
            deliveryStreamName: streamName,
        });

        const firehoseRole = stream.node.findChild("Service Role") as iam.Role;
        firehoseRole.addToPolicy(
            new iam.PolicyStatement({
                resources: ["*"],
                actions: [
                    "glue:GetTable",
                    "glue:GetTableVersion",
                    "glue:GetTableVersions",
                ],
            })
        );

        const cfnstream = stream.node
            .defaultChild as cfnFirehose.CfnDeliveryStream;

        // Parquet形式への変換
        cfnstream.addPropertyOverride(
            "ExtendedS3DestinationConfiguration.DataFormatConversionConfiguration",
            {
                Enabled: true,
                InputFormatConfiguration: {
                    Deserializer: {
                        OpenXJsonSerDe: {
                            CaseInsensitive: true,
                        },
                    },
                },
                OutputFormatConfiguration: {
                    Serializer: {
                        ParquetSerDe: {
                            Compression: "SNAPPY",
                        },
                    },
                },
                SchemaConfiguration: {
                    DatabaseName: {
                        Ref: "MyDatabase1E2517DB",
                    },
                    TableName: tableName,
                    RoleArn: firehoseRole.roleArn,
                },
            }
        );

        // 動的パーティショニングの有効化
        cfnstream.addPropertyOverride(
            "ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration",
            {
                Enabled: true,
            }
        );

        // 動的パーティションの設定
        cfnstream.addPropertyOverride(
            "ExtendedS3DestinationConfiguration.ProcessingConfiguration",
            {
                Enabled: true,
                Processors: [
                    {
                        Type: "MetadataExtraction",
                        Parameters: [
                            {
                                ParameterName: "MetadataExtractionQuery",
                                ParameterValue: "{userid:.user_id}",
                            },
                            {
                                ParameterName: "JsonParsingEngine",
                                ParameterValue: "JQ-1.6",
                            },
                        ],
                    },
                    {
                        Type: "AppendDelimiterToRecord",
                        Parameters: [
                            {
                                parameterName: "Delimiter",
                                parameterValue: "\\n",
                            },
                        ],
                    },
                ],
            }
        );
    }
}

以下、ポイントを記述します。

                parameters: {
                    "projection.enabled": true,
                    "projection.userid.type": "integer",
                    "projection.userid.range": "01,99",
                    "projection.userid.interval": 1,
                    "storage.location.template":
                        `s3://${bucket.bucketName}/${destPrefix}/` +
                        "${userid}",
                },

動的パーティショニングを使用する際に、カラム名をprojection.{カラム名}.typeといった形で定義します。 “storage.location.template"で指定する値はKinesis Firehoseで出力先に指定した場所と合わせます。

                storageDescriptor: {
                    columns: [
                        { name: "url", type: "string" },
                        { name: "status", type: "string" },
                    ],
                    inputFormat:
                        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                    outputFormat:
                        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                    serdeInfo: {
                        serializationLibrary:
                            "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                    },
                    location: `s3://${bucket.bucketName}/${destPrefix}`,
                },

inputFormat、outputFormat、serdeInfoについては、今回Parquetにしてますが、保存形式に応じて変更します。 ここのlocationには、partition(今回の場合はuserid)を含めないのでご注意ください。

        const logDestination = new destinations.S3Bucket(bucket, {
            dataOutputPrefix: `${destPrefix}/!{partitionKeyFromQuery:userid}/`,
            errorOutputPrefix: `kinesis-error/${destPrefix}/!{firehose:error-output-type}/dt=!{timestamp:yyyy'-'MM'-'dd}/`,
        });

dataOutputPrefixで、!{partitionKeyFromQuery:userid}の様に指定することで、Firehoseの処理の中でデータから読み取った値を出力先のprefixに使えます。 つまり、今回の例で言えば、ユーザIDが42のユーザについてのログは、

`${destPrefix}/42/xxxxxxx.parquet`

といったような形で出力されることになります。

                Processors: [
                    {
                        Type: "MetadataExtraction",
                        Parameters: [
                            {
                                ParameterName: "MetadataExtractionQuery",
                                ParameterValue: "{userid:.user_id}",
                            },
                            {
                                ParameterName: "JsonParsingEngine",
                                ParameterValue: "JQ-1.6",
                            },
                        ],
                    },
                    {
                        Type: "AppendDelimiterToRecord",
                        Parameters: [
                            {
                                parameterName: "Delimiter",
                                parameterValue: "\\n",
                            },
                        ],
                    },
                ],

MetadataExtractionのMetadataExtractionQueryに指定した

ParameterValue: "{userid:.user_id}",

がデータから値を取得しています。これによって、データの中のuser_idを動的パーティショニングとして使用することができるようになります。

上記をデプロイしたら、

aws firehose put-record --delivery-stream-name firehose-stream --record '{"Data":"{\"user_id\":42, \"url\": \"aaa\", \"status\": \"ok\"}"}'

のような形で、データを送ると、

s3://test-glue-firehose/firehose-dest/42/firehose-stream-5-2022-11-19-16-40-09-44b296fe-3709-34eb-be72-1289c859cfa1.parquet

としてデータが出力されます。