スクエニ ITエンジニア ブログ

シリーズ・すこしずつがんばる streaming data 処理 (4) Apache Flink を試す

シリーズ・すこしずつがんばる streaming data 処理の四回目です。 (初回はこちら)
ステップを踏んですこしずつ進めていますので、ぜひ他の記事も見てみてください。

今回は、streaming data 処理の他の例として Apache Flink を試してみます。Flink を触るのは今回はじめてです。Beam の他にどんなものがあるのかな? と調べてみると思った以上にいろいろとあり、その中で 比較的シンプルそう・スケールする・比較的新しそう ということで選択しました。

ほんとうは Apache Spark を試そうと思っていたのですが、Spark Streaming Programming Guide を見ただけでつらく、断念しました…

今回は こちら の内容を参考にさせていただきました。動作の内容はほぼおなじで、(WSL ではなく) Docker Compose で起動している Flink を実行環境にしているところが違うくらいです。

まずは Maven でテンプレを生成します。
groupId artifactId はお好みに合わせて変えてください。

$ mvn archetype:generate -D archetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.16.0 \
    -DgroupId=com.example -DartifactId=flink-qstart1 -Dversion=0.1 \
    -Dpackage=flinkStart -DinteractiveMode=false

そのまま build してみます。

$ mvn clean package

target/ 以下に jar ができます。上の例そのままであれば flink-qstart1-0.1.jar という名前で生成されているかと思います。

こちらの例 をそのまま使い、Docker Compose を用いて Flinkを起動します。

http://localhost:8081/ で Flink の web UI も使えるようになります。便利。

flink cli の使い方は こちら を参照してください。

Docker Compose で起動した Flink jobmanager の container へ上記で生成した jar を docker cp などしておき、(docker exec などして) container 内の shell に入って以下のように実行してみます。

$ flink run /tmp/flink-qstart1-0.1.jar
  ...
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.

失敗しました。
Maven で生成したままのサンプルコードでは中身がなさすぎて上記のようなエラーになってしまうようです。

ということで、参考にさせていただいたサイトにならって こちらのサンプル から処理内容をコピーします。

サンプルのコメントに詳細が説明されていますが、このサンプルでは nc で起動した text server で受け付けた入力を stream の入力として処理するようになっています。
以下のように、text server (相当の nc) を起動して LISTEN しておきます。
※ 今回の例では localhost で接続するため、jobmanager ではなく実際に Flink job が起動する taskmanager 側で起動することに注意してください。

ということで、同様に taskmanager container に対して docker exec などして shell に入り、以下のようにします。

$ nc -l 8000

※ わたしが試した時点では taskmanager で使われている image flink:latest では nc が入っていませんでした。必要に応じて、apt update && apt install netcat などして入れてください

この Terminal は後で使用するのでこのまま置いておきます。

最後に、option (textserver の host と port) を追加して、サンプルプログラムを起動します。以下のコマンドにより、Flink に job として submit します。

$ flink run /tmp/flink-qstart1-0.1.jar --hostname localhost --port 8000

Flink の web UI で動作しているのを確認できます。

Flink web UI
Flink web UI
Flink web UI

nc を動かしている Terminal で英文などを入力すると、それが空白で区切られてカウントが送信され、web UI で “Records Sent” などの数値がカウントアップされるのが確認できます。詳しくはサンプルアプリケーションのソースを参照してください。

メモリ使用量の状況なども視覚的に確認できます。

Flink web UI

先程はコマンドで実行しましたが、web UI から job を submit することもできるようです。

Flink web UI

まとめ

Flink を体験してみました。今回はカスタム処理を書くところまでも行けませんでした (どころか、参考にさせていただいたサイトの内容以上の情報がないです…) が、なんとかサンプルの動作の確認はできました。Kafka を入力として処理したデータを MySQL などのデータベースに出力するようなことができるとおもしろそうです。どれくらいのスループットが出せるかなど気になるところですが、将来の課題としたいと思います 😇

参考

実行環境例

この記事を書いた人

記事一覧
SQUARE ENIXでは一緒に働く仲間を募集しています!
興味をお持ちいただけたら、ぜひ採用情報ページもご覧下さい!