Amazon Athenaのパーティション射影と、Amazon Kinesis Data Firehoseの動的パーティショニングの使いどころ(構築編)
前回、紹介した"Amazon Athena(以下 Athena)のパーティション射影と、Amazon Kinesis Data Firehose(以下 Firehose)の動的パーティショニングについて、CDKで構築します。
使用しているライブラリのバージョンは以下になります。
@aws-cdk/aws-glue-alpha@2.50.0-alpha.0
@aws-cdk/aws-kinesisfirehose-alpha@2.50.0-alpha.0
@aws-cdk/aws-kinesisfirehose-destinations-alpha@2.50.0-alpha.0
aws-cdk-lib@2.50.0
aws-cdk@2.50.0
想定するデータとして以下の様な構成を想定します。
- user_id(int)
- status(string)
- url(string)
Kinesis Firehoseに送られてきたデータの中のuser_idを動的パーティショニングでpartitionとして設定します。 CDKのコードは以下の様になります。
import * as cdk from "aws-cdk-lib";
import * as glue from "@aws-cdk/aws-glue-alpha";
import * as cfnGlue from "aws-cdk-lib/aws-glue";
import * as iam from "aws-cdk-lib/aws-iam";
import * as firehose from "@aws-cdk/aws-kinesisfirehose-alpha";
import * as cfnFirehose from "aws-cdk-lib/aws-kinesisfirehose";
import * as destinations from "@aws-cdk/aws-kinesisfirehose-destinations-alpha";
import * as s3 from "aws-cdk-lib/aws-s3";
import { Construct } from "constructs";
export class AppStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const databaseName = "test-database";
const tableName = "test-table";
const bucketName = "test-glue-firehose";
const destPrefix = "firehose-dest";
const streamName = "firehose-stream";
const bucket = new s3.Bucket(this, "Bucket", {
bucketName,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
const database = new glue.Database(this, "Database", {
databaseName,
});
new cfnGlue.CfnTable(this, "Table", {
catalogId: cdk.Stack.of(this).account,
databaseName: database.databaseName,
tableInput: {
name: tableName,
tableType: "EXTERNAL_TABLE",
partitionKeys: [{ name: "userid", type: "int" }],
parameters: {
"projection.enabled": true,
"projection.userid.type": "integer",
"projection.userid.range": "01,99",
"projection.userid.interval": 1,
"storage.location.template":
`s3://${bucket.bucketName}/${destPrefix}/` +
"${userid}",
},
storageDescriptor: {
columns: [
{ name: "url", type: "string" },
{ name: "status", type: "string" },
],
inputFormat:
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
outputFormat:
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
serdeInfo: {
serializationLibrary:
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
},
location: `s3://${bucket.bucketName}/${destPrefix}`,
},
},
});
const logDestination = new destinations.S3Bucket(bucket, {
dataOutputPrefix: `${destPrefix}/!{partitionKeyFromQuery:userid}/`,
errorOutputPrefix: `kinesis-error/${destPrefix}/!{firehose:error-output-type}/dt=!{timestamp:yyyy'-'MM'-'dd}/`,
});
const stream = new firehose.DeliveryStream(this, "DeliveryStream", {
destinations: [logDestination],
deliveryStreamName: streamName,
});
const firehoseRole = stream.node.findChild("Service Role") as iam.Role;
firehoseRole.addToPolicy(
new iam.PolicyStatement({
resources: ["*"],
actions: [
"glue:GetTable",
"glue:GetTableVersion",
"glue:GetTableVersions",
],
})
);
const cfnstream = stream.node
.defaultChild as cfnFirehose.CfnDeliveryStream;
// Parquet形式への変換
cfnstream.addPropertyOverride(
"ExtendedS3DestinationConfiguration.DataFormatConversionConfiguration",
{
Enabled: true,
InputFormatConfiguration: {
Deserializer: {
OpenXJsonSerDe: {
CaseInsensitive: true,
},
},
},
OutputFormatConfiguration: {
Serializer: {
ParquetSerDe: {
Compression: "SNAPPY",
},
},
},
SchemaConfiguration: {
DatabaseName: {
Ref: "MyDatabase1E2517DB",
},
TableName: tableName,
RoleArn: firehoseRole.roleArn,
},
}
);
// 動的パーティショニングの有効化
cfnstream.addPropertyOverride(
"ExtendedS3DestinationConfiguration.DynamicPartitioningConfiguration",
{
Enabled: true,
}
);
// 動的パーティションの設定
cfnstream.addPropertyOverride(
"ExtendedS3DestinationConfiguration.ProcessingConfiguration",
{
Enabled: true,
Processors: [
{
Type: "MetadataExtraction",
Parameters: [
{
ParameterName: "MetadataExtractionQuery",
ParameterValue: "{userid:.user_id}",
},
{
ParameterName: "JsonParsingEngine",
ParameterValue: "JQ-1.6",
},
],
},
{
Type: "AppendDelimiterToRecord",
Parameters: [
{
parameterName: "Delimiter",
parameterValue: "\\n",
},
],
},
],
}
);
}
}
以下、ポイントを記述します。
parameters: {
"projection.enabled": true,
"projection.userid.type": "integer",
"projection.userid.range": "01,99",
"projection.userid.interval": 1,
"storage.location.template":
`s3://${bucket.bucketName}/${destPrefix}/` +
"${userid}",
},
動的パーティショニングを使用する際に、カラム名をprojection.{カラム名}.typeといった形で定義します。 “storage.location.template"で指定する値はKinesis Firehoseで出力先に指定した場所と合わせます。
storageDescriptor: {
columns: [
{ name: "url", type: "string" },
{ name: "status", type: "string" },
],
inputFormat:
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
outputFormat:
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
serdeInfo: {
serializationLibrary:
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
},
location: `s3://${bucket.bucketName}/${destPrefix}`,
},
inputFormat、outputFormat、serdeInfoについては、今回Parquetにしてますが、保存形式に応じて変更します。 ここのlocationには、partition(今回の場合はuserid)を含めないのでご注意ください。
const logDestination = new destinations.S3Bucket(bucket, {
dataOutputPrefix: `${destPrefix}/!{partitionKeyFromQuery:userid}/`,
errorOutputPrefix: `kinesis-error/${destPrefix}/!{firehose:error-output-type}/dt=!{timestamp:yyyy'-'MM'-'dd}/`,
});
dataOutputPrefixで、!{partitionKeyFromQuery:userid}の様に指定することで、Firehoseの処理の中でデータから読み取った値を出力先のprefixに使えます。 つまり、今回の例で言えば、ユーザIDが42のユーザについてのログは、
`${destPrefix}/42/xxxxxxx.parquet`
といったような形で出力されることになります。
Processors: [
{
Type: "MetadataExtraction",
Parameters: [
{
ParameterName: "MetadataExtractionQuery",
ParameterValue: "{userid:.user_id}",
},
{
ParameterName: "JsonParsingEngine",
ParameterValue: "JQ-1.6",
},
],
},
{
Type: "AppendDelimiterToRecord",
Parameters: [
{
parameterName: "Delimiter",
parameterValue: "\\n",
},
],
},
],
MetadataExtractionのMetadataExtractionQueryに指定した
ParameterValue: "{userid:.user_id}",
がデータから値を取得しています。これによって、データの中のuser_idを動的パーティショニングとして使用することができるようになります。
上記をデプロイしたら、
aws firehose put-record --delivery-stream-name firehose-stream --record '{"Data":"{\"user_id\":42, \"url\": \"aaa\", \"status\": \"ok\"}"}'
のような形で、データを送ると、
s3://test-glue-firehose/firehose-dest/42/firehose-stream-5-2022-11-19-16-40-09-44b296fe-3709-34eb-be72-1289c859cfa1.parquet
としてデータが出力されます。