Fn Flowを触ってみる

こんにちは id:dhigashi です。

以前、 Fn Project を簡単に触ってみましたので、今回は Fn Flow を触ってみたいと思います。

Fn Project の記事はこちらからご覧ください。 cloudii.atomitech.jp

Fn Flow について

Fn Flow のREADME には Fn Flow について以下のように紹介されており、Fn 単体では不足していた実際のアプリケーション開発で必要になる機能をサポートしています。

Flow empowers you to build workflows as distributed programs that are as complex as you need them to be and supports a rich set of concurrency primitives including fork-join, chaining, delays and error handling.

今回は Fn Flow を用いた並列処理 (fork-join)、連鎖 (chaining)、遅延処理 (delays) を実装してみたいと思います。

Word Count with Fn Flow

今回作成する Fn Flow アプリケーションは指定された複数のテキストファイルの中の各単語の出現頻度を数え上げる、なんちゃっての MapReduce っぽいアプリケーションです。

f:id:dhigashi:20180904154155p:plain

入力にテキストファイルのURLと任意で単語を指定すると、その単語の出現頻度を出力します。
単語を指定しない場合はすべての単語の出現頻度を出力します。

例として、ハムレットを幕毎に分割し Object Storage に配置しダウンロード URL を指定し、単語に「hamlet」「love」に指定してみます。

"hamlet"は94回、"love"は62回出現しているようです。(単語の Split 処理が雑な為、正しい数になっていない気がします)

# Request
{
    "urls": [
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT2.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT3.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT4.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT5.txt"
    ],
    "words": [
        "hamlet",
        "love"
    ]
}

# Result
{"love":62,"hamlet":94}

アプリケーションのソースコードはこちらから参照して下さい。

github.com

事前準備

Fn サーバがインストール済みで関数のデプロイ等が行える事を前提とします。
準備がまだの場合は冒頭で紹介した記事をご覧ください。

本記事執筆時の環境は以下の通りです。

$ fn version
Client version is latest version: 0.4.153
Server version:  0.3.545

尚、すべての作業は同じサーバインスタンス上で行うものとします。

Flow サーバの起動

Fn サーバが起動できたら Flow サーバを起動します。

Flow サーバは Fn サーバを呼び出せる必要があるため IP アドレスを取得し、Flow サーバの起動時に環境変数で与えます

$ FNSERVER_IP=$(docker inspect --type container -f '{{.NetworkSettings.IPAddress}}' fnserver)
$ docker run --rm -d \
     -p 8081:8081 \
     -e API_URL="http://$FNSERVER_IP:8080/r" \
     -e no_proxy=$FNSERVER_IP \
     --name flowserver \
     fnproject/flow:latest

Flow サーバが起動している事が確認できます。

$ docker ps
CONTAINER ID        IMAGE                   COMMAND                  CREATED              STATUS              PORTS                              NAMES
a9947d53982d        fnproject/flow:latest   "/fnproject/flow-ser…"   About a minute ago   Up About a minute   0.0.0.0:8081->8081/tcp             flowserver
f7ff0cd11b34        fnproject/fnserver      "./fnserver"             6 minutes ago        Up 6 minutes        2375/tcp, 0.0.0.0:8080->8080/tcp   fnserver

Fn Flow アプリケーションの準備

まずデプロイするアプリケーションを取得しておきます。

$ git clone https://github.com/cloudii-oc/examples.git
$ cd examples/fnproject/flow-word-count
$ ls
app.yaml  counter  mapper

このアプリケーションには countermapper の2つの関数が含まれています。

mapper は独立した関数で、指定された URL からテキストファイルを読み込み各単語の出現頻度を数え上げます。

counter がメインの関数で、リクエストを分割し mapper 関数を呼び出すなどの処理を行います。
mapper 関数が返した出現頻度のマージ処理、単語の絞り込みなども担当します。

Java での Fn Flow アプリケーションについて

Flow アプリケーションの開発の導入は チュートリアルFnFlowsUserGuide を参考にしました。

Fn Flow アプリケーション開発に用いる Flow API は CompletableFuture API と共通の API が多くあり、CompletableFuture API を用いた並行プログラミングの経験者にはとっつき易くなっています。

以下では、どのように Fn Flow アプリケーションを開発しているか counter 関数を例に簡単にご紹介します。

FlowFuture を作成する

Flow#invokeFunction で関数を呼び出し FlowFuture を作成します。

FlowFuture<Message.MapperResponse> future =
        flow.invokeFunction("./mapper", mapperRequest, Message.MapperResponse.class);

処理を並列で実行する

複数の関数を呼び出し、Flow#allOf で全ての FlowFuture が完了したときに新しい FlowFuture を作成します。

List<FlowFuture<Message.MapperResponse>> futures = /* 複数関数の呼び出し */

flow.allOf(futures.toArray(new FlowFuture[0]))
        .thenApply(...

処理を連鎖して実行する

FlowFuture が完了したとき、その結果を Flow#thenApply に与えた関数の引数に与え実行し、その結果を新たな FlowFuture として返します。

FlowFuture<List<Message.MapperResponse>> futureResults = flow.allOf(futures.toArray(new FlowFuture[0]))
        .thenApply(VOID -> futures.stream()
                .map(FlowFuture::get)
                .collect(Collectors.toList()));

処理の結果を取得する

Flow#get で FlowFuture が完了するまで待ち、その結果を取得します。

FlowFuture<Map<String, Integer>> futureCount = /* 出現頻度を取得する */
futureCount.get();

Fn Flow アプリケーションのデプロイ

それでは、取得した Fn Flow アプリケーションを Fn サーバにデプロイします。
この時 fn deploy コマンドの --all オプションで全ての関数をデプロイします。

$ fn deploy --local --all
Deploying counter to app: flow-word-count at path: /counter
Bumped to version 0.0.4
Building image counter:0.0.4 .................................
Updating route /counter using image counter:0.0.4...
Deploying mapper to app: flow-word-count at path: /mapper
Bumped to version 0.0.4
Building image mapper:0.0.4 .....................
Updating route /mapper using image mapper:0.0.4...

デプロイ後ルーティングを確認すると /counter/mapper の2つが存在する事が確認できます。

$ fn list routes flow-word-count
PATH            IMAGE           ENDPOINT
/counter        counter:0.0.4   localhost:8080/r/flow-word-count/counter
/mapper         mapper:0.0.4    localhost:8080/r/flow-word-count/mapper

正しくデプロイできるか確かめる為、 mapper 関数を実行してみます。

$ echo -n '{"url":"https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"}' | fn call flow-word-count /mapper
{"count":{"(within)":1,"spoke":1,"hamlet":23,"your":49,"without":3,"youth":5,"lion's":1,"these":13,"would":14,"'that":1,"sovereignty":1,"prison":1  (略)

正常にデプロイができている事が確認できます。

同様に counter 関数を実行してみます。

$ echo -n '{"urls":["https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"]}' | fn call flow-word-count /counter
com.fnproject.fn.runtime.exception.PlatformCommunicationException: Failed to create flow
        at com.fnproject.fn.runtime.flow.RemoteFlowApiClient.createFlow(RemoteFlowApiClient.java:51)
        at com.fnproject.fn.runtime.flow.FlowContinuationInvoker$2.currentFlow(FlowContinuationInvoker.java:172)
        at com.fnproject.fn.api.flow.Flows.currentFlow(Flows.java:41)
       (略)

Flow が作成できないとエラーとなってしまいました。

FLow の設定

このままでは Flow アプリケーション (counter 関数) を実行できないので設定を行います。

Fn サーバが Flow サーバと通信できるようにするため、Flow サーバの IP アドレスを取得し設定を行います。

$ FLOWSERVER_IP=$(docker inspect --type container -f '{{.NetworkSettings.IPAddress}}' flowserver)
$ fn config app flow101 COMPLETER_BASE_URL "http://$FLOWSERVER_IP:8081"

以上でアプリケーションを実行できるようになりました。

改めて counter 関数を実行してみます。

$ echo -n '{"urls":["https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt"]}' | fn call flow-word-count /counter
{"(within)":1,"spoke":1,"hamlet":23,"your":49,"without":3,"youth":5,"lion's":1,"these":13,"would":14,"'that":1,"sovereignty":1,"prison":1,"sister":4,"whatsoever":1,"know't":1,"blasts":1,"everlasting":1,"audience":1,(略)

実行できる事が確認できました。

冒頭で例として載せた、ハムレットに「hamlet」「love」という単語がいくつ含まれるのかを調べてみます。

$ cat hamlet.json
{
    "urls": [
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT1.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT2.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT3.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT4.txt",
        "https://objectstorage.us-phoenix-1.oraclecloud.com/n/NAMESPACE/b/hamlet/o/ACT5.txt"
    ],
    "words": [
        "hamlet",
        "love"
    ]
}
$ fn call flow-word-count /counter < hamlet.json
{"love":62,"hamlet":94}

まとめ

Fn Flow を用いて外部の関数を非同期に呼び出す、並行処理を実装する、など単純な Fn 関数より複雑なアプリケーションを作成してみました。
今回はエラーハンドリングなど検証不足の部分も多いですが、興味を持たれた方はぜひ触ってみて下さい。

参考文献