Amazon SageMaker Processingで前処理を行う

目次

機械学習にとって、前処理は学習処理と並んで重要な要素です。現実世界で発生するデータは機械学習でそのまま使えるデータということはほとんどありません。なので、前処理をして学習に使用できるようにする必要があります。今回はAWSが提供している、Amazon SageMaker Processing(以下Processing)を使ってデータの前処理をする方法について書きます。

Amazon SageMaker Processingについて

Amazon SageMakerはAWSが提供する機械学習のプラットフォームです。Processingはその中でデータの前処理を担うコンポーネントです。

前処理にProcessingを使うことのメリット

前処理は端的に言ってしまえばデータを一定の形式に加工する処理です。なので、コードを実行できる環境さえあれば目的は達成できます。例えばAWS Lambdaでイベント処理したり、AWS Batchで一括処理しても、同様の結果を得ることはできるでしょう。

では、Processingを使うメリットは何でしょうか。Processingを使うことで得られるメリットとして、以下のようなものを挙げることができると思います。

  • SageMakerのパイプラインの中でデータリネージ(データ追跡)を管理できる
  • 様々な種類の処理を一貫した方式で管理・処理できる

SageMakerのパイプラインの中でデータリネージ(データ追跡)を管理できる

SageMakerというプラットフォームは、前処理、学習、評価、運用、監視という機械学習のあらゆるフェイズをカバーしています。その中でProcessingによって前処理を行うことで、データリネージ(今運用しているモデルの学習データはどれなのか)を行うことが容易になります。

様々な種類の処理を一貫した方式で管理・処理できる

Processingはデータ処理を行うサーバの起動や停止を管理してくれます。利用者は台数やスペックを指定するだけです。

また、データ処理のフレームワークに応じたProcessorが用意されています。

例えば特徴量のスケーリングや、訓練データ、テストデータ、検証データなどへの振り分けはscikit-learnを使うことが多いかと思いますが、そのためのSKLearnProcessorなども用意されています。利用者はコードを用意するだけです。

あるいは、Sparkを使って複数のサーバを用いた分散処理をしたい場合は、SparkProcessorを使用することができます。

独自の処理をしたい場合にはScriptProcessorを利用することで、フレームワークに依存しない処理を実施することもできます。

いずれにしても、利用者は一貫した方式で管理・実行することができます。

Processingでテキストデータを前処理する

Processingを使ってデータを処理する例として、以下のようなシナリオを想定してみます。

  • OCRによって取得した英語のテキストデータがある
  • パラグラフ(段落)単位になっていないので、1行1パラグラフのデータに整形する

これをProcessingで実行します。

OCRのデータは以下のような形式の想定です。

This truth would seem to hold in most branches of human effort. Scott's gallant
but unavailing attempt to reach the South Pole lives in the world's memory,
while the successful ventures of Amundsen and Peary are fading. In sport, Do-
rando's Marathon is an enduring memory; but who among the general public
could recall the name of Hayes, the actual victor, or, indeed, that of any subse-
quent Marathon winner.
For this irrational, this sentimental verdict, it is fashionable to fix the blame
on modern journalism, yet the barest survey of history shows that its origins lie
far back in the mists of time. On the historian, in fact—who of all men should
(省略)
almost contemporary with events, the friend of Gaius Lælius, Scipio's constant
subordinate, from whom he could get first-hand evidence and judgments. He
had the family archives of the Scipios at his disposal for research, and he had
been over the actual battlefields while many of the combatants were still alive.
Thus he gained an almost unique base upon which to form his estimate.

文章は、リデルハート著「Scipio Africanus: Greater Than Napoleon」より。

各行はパラグラフの改行とは無関係な位置で改行(あるいは単語の途中で改行)されているため、これを1行ごとに1パラグラフにまとめます。

その際ルールを以下とします。

  • 改行コードの前の文字が"."(ピリオド)で終わっている場合は改行として扱う
    • 厳密には、パラグラフの最後ではない".“があるケースが考えられるが、今回は無視する
  • 行末の”-"(ダッシュ)は単語の続きとして削除して次の行の先頭の単語と繋げる
  • それ以外は次の行をスペース区切りで繋げる

dockerイメージの作成

公式にあるように、Processingを実行するためのコンテナイメージを作成します。必要な依存関係は処理内容に必要なもののみです。

後で見るように、このコンテナはS3からデータを読みこみ、処理結果をS3に格納しますが、ProcessingがS3のデータをコンテナの指定したパスに配置してくれ、処理後に指定したパスのデータをS3にアップロードしてくれるので、botoなどのS3にアクセスするためのライブラリなどは必要ありません。

FROM python:3.10-slim-buster

ENV PYTHONUNBUFFERED=TRUE
ENTRYPOINT ["python3"]

Dockerfileは上記です。 今回は、標準ライブラリを使ってテキストファイルを整形するだけなので、環境変数とENTRYPOINTしか設定していません。

これをECRにアップロードします。

docker build -t script-processor .
docker tag script-processor:xxxx ACCOUNT_ID.dkr.ecr.ap-northeast-1.amazonaws.com/script-processor:xxxx
docker push ACCOUNT_ID.dkr.ecr.ap-northeast-1.amazonaws.com/script-processor:xxxx

今回はscript-processorといったリポジトリ名にしています。ACCOUNT_IDやタグは適宜ご変更ください。

処理スクリプトの作成

Processingで、データを処理するスクリプトを作ります。

from io import TextIOWrapper
import logging
import pathlib
import re

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


class Processor:
    def __init__(self, output_file: TextIOWrapper):
        self.paragraph = ""
        self.output_file = output_file

    def process(self, line: str):
        # 空行は読み飛ばす
        if line == "\n":
            return

        # 最後がピリオドで終わっている場合、ファイルに追記
        if line[-2] == "." or line[-3:-1] == '."':
            self.paragraph += line
            self.write_paragraph_to_file(self.paragraph)
        else:
            if line[-2] == "-":
                # 改行コードと文末のダッシュは除いてparagraphに追加
                self.paragraph += re.sub("-\n", "", line)
            else:
                # 改行コードを除外して空白をparagraphに追加、
                self.paragraph += re.sub("\n", " ", line)

    def write_paragraph_to_file(self, paragraph: str):
        self.output_file.write(paragraph)
        self.paragraph = ""


if __name__ == "__main__":
    logger.info("Starting processing.")
    base_dir = "/opt/ml/processing"
    input_dir = f"{base_dir}/input"
    output_dir = f"{base_dir}/output"
    pathlib.Path(f"{base_dir}/output").mkdir(parents=True, exist_ok=True)

    with open(f"{input_dir}/data.txt", "r") as input_f:
        with open(f"{output_dir}/data.txt", "w") as output_f:
            conv = Processor(output_f)
            while True:
                line = input_f.readline()
                if not line:
                    break
                conv.process(line)

    logger.info("Finished processing.")

ポイントは、

base_dir = "/opt/ml/processing"
input_dir = f"{base_dir}/input"
output_dir = f"{base_dir}/output"

です。これはこの次に見る、Processingの起動処理の際に、データの配置場所として指定したパスと対応します。

ProcessingはS3の指定の場所からinputの場所にデータをおいてくれ、処理が終わった時点でのoutputにおいてあるデータをS3の指定の場所へとアップロードしてくれます。

あとは、単なるPythonスクリプトです。

Processingの起動

Processingの起動にはsagemakerのpython SDKを使います。 以下のようにインストールします。

pip install sagemaker

ここまでで、Processingを起動する準備が整いました。 下記がProcessorを起動するスクリプトです。

from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

image_url = (
    "ACCOUNT_ID.dkr.ecr.ap-northeast-1.amazonaws.com/script-processor:xxxx"
)

script_processor = ScriptProcessor(
    command=["python3"],
    image_uri=image_url,
    role="arn:aws:iam::ACCOUNT_ID:role/SageMakerExecutionRole",
    instance_count=1,
    instance_type="ml.t3.medium",
)

script_processor.run(
    code="processing.py",
    inputs=[
        ProcessingInput(
            source="s3://DATA-BUCKET-NAME/input",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination="s3://DATA-BUCKET-NAME/output",
        )
    ],
)

ポイントは以下の通りです。

  • ScriptProcessorの引数のroleに与えるIAM Roleに、データの読み込み、書き込みをするS3バケットへのアクセス権を付与する
  • ScriptProcessorのrunに与える引数として、先ほど作成した処理スクリプトのファイルを指定
    • sagemakerのライブラリがローカルにあるファイルをアップロードしてくれるので、ローカルのパスを指定する
  • inputsに与えるProcessingInputを作成する際に、sourceにS3のデータのprefixを指定
  • inputsに与えるProcessingInputを作成する際に、destinationにデータを配置するコンテナのディレクトリを指定
  • outputsに与えるProcessingOutputを作成する際に、sourceに処理結果のデータを配置するコンテナ内のディレクトリを指定
  • outputsに与えるProcessingOutputを作成する際に、destinationにデータをアップロードするS3のprefixを指定

上記のスクリプトを実行して、しばらくすると、ProcessingのJobが完了します。

指定したS3バケットを見てみると、

This truth would seem to hold in (省略) winner.
For this irrational, (省略) were still alive.
Thus he gained an almost unique base upon which to form his estimate.

といった形で、データ変換がなされたファイルを確認できます。