シリーズ・すこしずつがんばる streaming data 処理 (4) Apache Flink を試す
シリーズ・すこしずつがんばる streaming data 処理の四回目です。 (初回はこちら)
ステップを踏んですこしずつ進めていますので、ぜひ他の記事も見てみてください。
今回は、streaming data 処理の他の例として Apache Flink を試してみます。Flink を触るのは今回はじめてです。Beam の他にどんなものがあるのかな? と調べてみると思った以上にいろいろとあり、その中で 比較的シンプルそう・スケールする・比較的新しそう ということで選択しました。
ほんとうは Apache Spark を試そうと思っていたのですが、Spark Streaming Programming Guide を見ただけでつらく、断念しました…
Flink application をつくる
今回は こちら の内容を参考にさせていただきました。動作の内容はほぼおなじで、(WSL ではなく) Docker Compose で起動している Flink を実行環境にしているところが違うくらいです。
まずは Maven でテンプレを生成します。
groupId
artifactId
はお好みに合わせて変えてください。
$ mvn archetype:generate -D archetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.16.0 \
-DgroupId=com.example -DartifactId=flink-qstart1 -Dversion=0.1 \
-Dpackage=flinkStart -DinteractiveMode=false
そのまま build してみます。
$ mvn clean package
target/
以下に jar ができます。上の例そのままであれば flink-qstart1-0.1.jar
という名前で生成されているかと思います。
Flink を起動する
こちらの例 をそのまま使い、Docker Compose を用いて Flinkを起動します。
http://localhost:8081/
で Flink の web UI も使えるようになります。便利。
application (jar) を Flink に submit する
flink
cli の使い方は こちら を参照してください。
Docker Compose で起動した Flink jobmanager の container へ上記で生成した jar を docker cp
などしておき、(docker exec
などして) container 内の shell に入って以下のように実行してみます。
$ flink run /tmp/flink-qstart1-0.1.jar
...
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
失敗しました。
Maven で生成したままのサンプルコードでは中身がなさすぎて上記のようなエラーになってしまうようです。
ということで、参考にさせていただいたサイトにならって こちらのサンプル から処理内容をコピーします。
サンプルのコメントに詳細が説明されていますが、このサンプルでは nc で起動した text server で受け付けた入力を stream の入力として処理するようになっています。
以下のように、text server (相当の nc) を起動して LISTEN しておきます。
※ 今回の例では localhost
で接続するため、jobmanager ではなく実際に Flink job が起動する taskmanager 側で起動することに注意してください。
ということで、同様に taskmanager container に対して docker exec
などして shell に入り、以下のようにします。
$ nc -l 8000
※ わたしが試した時点では taskmanager で使われている image flink:latest
では nc が入っていませんでした。必要に応じて、apt update && apt install netcat
などして入れてください
この Terminal は後で使用するのでこのまま置いておきます。
最後に、option (textserver の host と port) を追加して、サンプルプログラムを起動します。以下のコマンドにより、Flink に job として submit します。
$ flink run /tmp/flink-qstart1-0.1.jar --hostname localhost --port 8000
Flink の web UI で動作しているのを確認できます。
nc
を動かしている Terminal で英文などを入力すると、それが空白で区切られてカウントが送信され、web UI で “Records Sent” などの数値がカウントアップされるのが確認できます。詳しくはサンプルアプリケーションのソースを参照してください。
メモリ使用量の状況なども視覚的に確認できます。
先程はコマンドで実行しましたが、web UI から job を submit することもできるようです。
まとめ
Flink を体験してみました。今回はカスタム処理を書くところまでも行けませんでした (どころか、参考にさせていただいたサイトの内容以上の情報がないです…) が、なんとかサンプルの動作の確認はできました。Kafka を入力として処理したデータを MySQL などのデータベースに出力するようなことができるとおもしろそうです。どれくらいのスループットが出せるかなど気になるところですが、将来の課題としたいと思います 😇
参考
実行環境例
- Dataproc optional Flink component
- Google Cloud Dataproc では optional components feature として Flink を実行できるようです。
- Amazon EMR | Apache Flink
- Announcing Google Cloud Dataflow runner for Apache Flink
- これは逆に、Dataflow 向けの program が Flink で動作するってことでしょうか。ややこしい…