シリーズ・すこしずつがんばる streaming data 処理 (3) カスタム処理を書く
シリーズ・すこしずつがんばる streaming data 処理 (前回、前々回) の三回目です。
Streaming で逐次処理をやってみる
前回の記事では、固定サイズのデータを一括処理するバッチ処理を扱いました。が、Apache Beam で実現できる streaming data の逐次処理は、見逃すことができない強力な機能です。batch ではあらかじめサイズのわかっている (有限の) データを一括で扱いますが、streaming ではサイズがわからない (無限の) データを逐次に処理します。今回はこれを試してみましょう。
事前準備
今回は Pub/Sub からデータを読むので、そのための Google account と IAM 権限が必要です。
以下のようにして、Pub/Sub topic をひとつ作成しておきます。
gcloud pubsub topics create test-1
サンプル
前回はサンプルコードを download し、その中身を変更して動作させる方法を使いました。これは多くの場合いちばん手っ取り早いのですが、問題もあります。
- 余分なものが多く含まれていて、単純にサイズが大きい
- 必要になる software の version や framework など、実行するための環境が限られる
- ある程度の機能を網羅する目的を達成するために、処理が複雑になってしまっていて、知りたいことに集中できない
また GCS などへの出力は実際の用途には合うのですが、手っ取り早く動作を経験したい場合などには、手元からの距離が遠いために理解に時間がかかりがちです。 今回は極力必要な部分まで削ぎ落とした例を目指し、ローカルで実行して stdout に情報を出力するのみのサンプルとしました。Google Cloud 上の Dataflow などの環境で動作させる場合は逆にもうひと手間必要ですので、その点ご注意ください。
簡易にとは言いつつ Gradle は使います。build.gradle
は、ここ のものを download し、使用しない以下の行を削除しておきます。
implementation "org.apache.beam:beam-examples-java:${beamVersion}"
そして、以下が Java による実装です。
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
public class PubSubStream
{
static class PrintFn extends DoFn<KV<String, Long>, KV<String, Long>> {
@ProcessElement
public void processElement(@Element KV<String, Long> element, OutputReceiver<KV<String, Long>> receiver) {
System.out.println("(X) " + element.getKey() + ", " + element.getValue());
receiver.output(element);
}
}
public static class CountTransform
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> messages) {
PCollection<KV<String, Long>> wordCounts = messages
.apply(Count.perElement())
.apply(ParDo.of(new PrintFn()));
return wordCounts;
}
}
public static void main(String[] args) throws IOException {
StreamingOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().as(StreamingOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
final String TOPICNAME = "projects/<project-id>/topics/test-1";
pipeline
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(TOPICNAME))
.apply(Window.<String>into(
FixedWindows.of(Duration.standardSeconds(10)))
.withAllowedLateness(Duration.ZERO)
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes()
)
.apply(new CountTransform());
pipeline.run().waitUntilFinish();
}
}
<project-id>
を一箇所ハードコードしていますので変更してください。
また、処理内容はなるべく簡易に動作が確認しやすいように書いたつもりですが未熟者の書いたものです。pipeline.apply
で使用している FixedWindows.of
withAllowedLateness
等々の詳細については Apache Beam の document (こちら等…) をご参照ください 🙏
上記 2 ファイルを以下のような構成で置いておけばもう準備完了です。
workspace
├─ src/main/java
│ └─ PubSubStream.java
└─ build.gradle
実行は単純です。コピーして使った build.gradle
では、 -Dexec.args
で option 指定ができるようになっているので、project ID のみ指定しています。Pub/Sub topic はソースで調整する必要があったりでちぐはぐですがご容赦ください…
gradle execute -Dexec.args="--project=<project-id>"
起動が成功すると、指定した Pub/Sub topic に subscribe して待ち状態になります。別の terminal から Pub/Sub topic へ message を送信してみましょう。
gcloud pubsub topics publish test-1 --message="Hello World!"
pipeline がデータを受け取り、window しながら stdout にそれを出力します。以下は、window 時間内 (サンプルでは 10秒間) に二度 Pub/Sub topic へ送信を行った場合の出力です。
(X) Hello World, 2
Windowing
Apache Beam の特徴的な機能として、window があります。今回のサンプルでも使用していますが、実行中の実時間やデータ件数によって処理単位を区切って集計などの処理を行うことができる非常に強力な機能です。
わかりやすいのは、“5分単位でのログインユーザー数をカウントする” などの使い方です。
柔軟に設定して行う処理も独自に実装できますが、その柔軟さゆえに仕組みの理解や実装の敷居がかなり高いと感じます。
これに慣れるには、このあたり を一通り読みつつ、あとは試行錯誤しか無いかな… という絶望的な感想しか無いです。
いい方法があったら教えて下さい (;´Д`)
さて
今回くらいの内容であれば、実装を確認しつつ実行して感覚を掴んでみるのに敷居の高さはそこまで無いと思います。
逆に、ここから “ほんとうにやりたいこと” への道のりは長くなってしまうのですが… ここまでやっておけばきっと、明確になっている目的を目指して、試行錯誤でなんとかなるんじゃないでしょうか!(と思いたい)
おまけ
今回は build と実行に Gradle を使いましたが、手元にその環境が無いよというかたもいらっしゃるかと思います。
かくいう私も手元の PC に直接 install はしていませんでした。今回は VS Code Dev Container を使うことで、比較的簡単に環境が作れました。以下を .devcontainer/devcontainer.json
として置き、VS Code で “Dev Containers: Open Folder in Container…” します。
{
"name": "Java",
"image": "gradle:8-jdk8",
"mounts": [
"source=${localEnv:HOME}/.config/gcloud/application_default_credentials.json,target=/root/.gcloud/application_default_credentials.json,type=bind,consistency=cached",
],
"containerEnv": {
"GOOGLE_APPLICATION_CREDENTIALS": "/root/.gcloud/application_default_credentials.json",
},
}
今回、Dev Containers の設定を追加する機能を使って Gradle を追加 install する方法ではなぜか手元でうまく動かなかった (install されなかった) ため、Docker Hub にある Gradle の official image を使うようにしています。
host 側で gcloud auth application-default login
しておくことで application_default_credentials.json
を生成し、それを container 内に mount して環境変数 GOOGLE_APPLICATION_CREDENTIALS
でそれを使用するように指示しています。
これで Dockerfile も書かずに container で Java + Gradle の開発環境ができるので、なかなか便利なのではないでしょうか。