The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.
このページを編集

Funqy Knative Events バインディング

Quarkus Funqy Knative Eventsは、 Funqy HTTPエクステンションをベースに構築されており、Funqy関数内でKnativeイベントをルーティングして処理することができます。

このガイドでは、クイックスタートコードを使って、Knative Events を使って Funqy 関数をデプロイして呼び出す方法を説明します。

前提条件

このガイドを完成させるには、以下が必要です:

Knative の設定

Minikube 環境で Knative をローカルに設定することは、このガイドの範囲を超えています。Red HatがまとめたKnativeチュートリアル に従うことをお勧めします。このチュートリアルでは、ローカル環境でMinikubeやOpenShift上でKnativeをセットアップする方法を説明しています。

このガイドでは、クイックスタートコードをトリガーするためにブローカーを呼び出す必要があるため、特に ブローカーとトリガー のチュートリアルを実行する必要があります。

Cloud Events についての記事を読む

Knative Events をより深く理解するために、Cloud Events の 仕様 を読んでおくと良いでしょう。

クイックスタート

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは funqy-knative-events-quickstart ディレクトリ にあります。

クイックスタートの流れ

クイックスタートは、Cloud Eventを含む HTTP リクエストを curl を使用して Knative Broker に手動で送信することで動作します。Knative Broker はリクエストを受信し、クイックスタートによって構築された Funqy コンテナの起動をトリガーします。イベントは、一連の Funqy 関数の呼び出しをトリガーします。1 つの関数の出力は、別の Funqy 関数の呼び出しをトリガーします。

Funqy と Cloud Events

Knative Events環境内で稼働する場合、Funqy関数は特定のタイプの Cloud Event によってトリガーされます。1つのアプリケーション/デプロイメント内に複数のFunqy関数を持つことができますが、それらは特定のタイプの Cloud Event によってトリガーされなければなりません。このルールの例外は、アプリケーション内にFunqy関数が1つしかない場合です。この場合、イベントは Cloud Event タイプに関係なく、その関数にプッシュされます。

現在、FunqyはJSONベースのデータしか消費できません。それ自体はバイナリモードと構造化モードの両方をサポートしていますが、Cloud EventメッセージのデータコンポーネントはJSONでなければなりません。このJSONは、関数のJavaパラメータやリターンタイプとの間でマーシャル可能でなければなりません。

コード

まずはクイックスタートコードを見て、Knative EventsとFunqyの対応関係を理解しましょう。 SimpleFunctionChain.java を開いてください。

最初に見る関数は defaultChain です。

import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public String defaultChain(String input) {
        log.info("*** defaultChain ***");
        return input + "::" + "defaultChain";
    }

この状態で、Funqy関数はデフォルトの Cloud Event マッピングを持っています。デフォルトでは、 Cloud Event のタイプは、トリガする関数の関数名と一致していなければなりません。関数が出力を返す場合、レスポンスは Cloud Event に変換され、ブローカに返されて他のトリガーにルーティングされます。このレスポンスのデフォルトの Cloud Event タイプは、関数名 + .output です。デフォルトの Cloud Event のソースは、関数名です。

したがって、 defaultChain 関数の場合、この関数をトリガーする Cloud Event のタイプは defaultChain です。これは、タイプが defaultChain.output でイベントソースが defaultChain である新しい Cloud Event をトリガーするレスポンスを生成します。

デフォルトのマッピングはシンプルですが、必ずしもうまくいくとは限りません。このデフォルトのマッピングは設定で変更することができます。次の関数を見てみましょう。

import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public String configChain(String input) {
        log.info("*** configChain ***");
        return input + "::" + "configChain";
    }

configChain 関数のクラウド イベント マッピングは、 application.properties 内の設定プロパティによって変更されます。

quarkus.funqy.knative-events.mapping.configChain.trigger=defaultChain.output
quarkus.funqy.knative-events.mapping.configChain.response-type=annotated
quarkus.funqy.knative-events.mapping.configChain.response-source=configChain

この場合、この構成では、受信するCloud Eventのタイプ defaultChain.outputconfigChain 関数にマッピングします。 configChain 関数は、そのレスポンスを annotated Cloud Event タイプに、Cloud Event ソース configChain にマッピングします。

  • quarkus.funqy.knative-events.mapping.{function name}.trigger は、特定の関数をトリガーする Cloud Event タイプを設定します。特別な値 * をキャッチオール値として使用することができます。 この場合、この関数はすべてのイベントタイプに使用されます。

  • quarkus.funqy.knative-events.mapping.{function name}.response-type は、レスポンスの Cloud Event タイプを設定します。

  • quarkus.funqy.knative-events.mapping.{function name}.resource-source は、レスポンスの Cloud Event のソースを設定します。

Funqy Knative Events エクステンションには、この Cloud Event の関数へのマッピングを行うためのアノテーションも用意されています。 annotatedChain メソッドを見てみましょう。

import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEventMapping;

public class SimpleFunctionChain {
    @Funq
    @CloudEventMapping(trigger = "annotated", responseSource = "annotated", responseType = "lastChainLink")
    public String annotatedChain(String input) {
        log.info("*** annotatedChain ***");
        return input + "::" + "annotatedChain";
    }

関数に @CloudEventMapping アノテーションを使用すると、Cloud Event タイプのトリガーとCloud Event レスポンスをマッピングできます。この例では、 annotatedChain 関数は annotated Cloud Event タイプによってトリガされ、レスポンスは lastChainLink タイプと annotated Cloud Eventソースにマッピングされます。

つまり、 SimpleFunctionChain 内で定義されているすべての関数を見てみると、ある関数が次の関数を引き金にしていることに気づくでしょう。最後にトリガーされる関数は lastChainLink です。

import io.quarkus.funqy.Context;
import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public void lastChainLink(String input, @Context CloudEvent event) {
        log.info("*** lastChainLink ***");
        log.info(input + "::" + "lastChainLink");
    }
}

この機能には2つの注意点があります。一つは、この関数には出力がありません。あなたの関数は出力を返す必要はありません。第二に、この関数には追加の event パラメータがあります。

受信したcloud eventに関する追加情報を知りたい場合は、Funqy @Context アノテーションを使用して io.quarkus.funqy.knative.events.CloudEvent インターフェースを注入することができます。 CloudEvent インターフェースは、トリガーとなるイベントに関する情報を公開します。

public interface CloudEvent {
    String id();
    String specVersion();
    String source();
    String subject();
    OffsetDateTime time();
}

Maven

POM を見ると、典型的なQuarkusのPOMで、Funqyの依存関係を1つ取り込んでいることがわかります:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>

開発モードとテスト

Funqy Knative Eventsは、RestAssuredを使った開発モードとユニットテストをサポートしています。Funqy Knative Eventsの関数は、通常のHTTPリクエストを使った Funqy HTTPと同じ呼び出しモデル、またはCloud Event Binaryモード、Structuredモードを使って呼び出すことができます。すべての呼び出しモードが同時にサポートされています。

FunqyTest.java のユニットテストコードを開くと、単純にRestAssuredを使ってHTTP呼び出しをして関数をテストしていることがわかります。

FunqyはQuarkus 開発モードにも対応しています!

プロジェクトのビルド

まず、Javaアーティファクトをビルドします。

コマンドラインインタフェース
quarkus build
Maven
./mvnw install

次に、Knativeではdockerイメージが必要なので、続いてそれをビルドする必要があります。

docker build -f src/main/docker/Dockerfile.jvm -t yourAccountName/funqy-knative-events-quickstart .

docker build を実行する際には、必ず yourAccountName を docker または quay のアカウント名に置き換えてください。Dockerfileは標準のQuarkusのdockerfileです。特別なKnativeマジックはありません。

イメージをDocker HubやQuayにプッシュする

docker push yourAccountName/funqy-knative-events-quickstart

繰り返しになりますが、 docker push を実行する際には、 yourAccountName を docker または quay のアカウント名に置き換えてください。

Kubernetes/OpenShiftへのデプロイ

最初のステップは、私たちの名前空間にブローカーを設定することです。以下は、Knative cliのコマンド例です。

kn broker create default \
  --namespace knativetutorial

作成したブローカーは default といい、このブローカーはクラウドのイベントを受け取ります。このブローカは関数のYAMLファイルでも参照されています。

2つ目のステップは、ビルド中に作成しプッシュしたDockerイメージを指すKubernetes/OpenShiftサービスを定義することです。 funqy-service.yaml を見てみましょう。

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: funqy-knative-events-quickstart
spec:
  template:
    metadata:
      name: funqy-knative-events-quickstart-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: docker.io/yourAccountName/funqy-knative-events-quickstart

これはKubernetesの標準的なサービス定義YAMLファイルです。

イメージのURLを、先に作成してプッシュしたイメージを指すように変更してください。

今回のクイックスタートでは、1つのKubernetesサービスにすべての関数を格納します。このクイックスタートを複数の異なるプロジェクトに分割し、各関数にサービスをデプロイできない理由はありません。簡単にするために、また関数ごとにデプロイメントを行う必要がないことを示すために、クイックスタートはすべてを1つのプロジェクト、イメージ、サービスにまとめます。

サービスyamlをデプロイします。

kubectl apply -n knativetutorial -f src/main/k8s/funqy-service.yaml

次のステップは、それぞれのイベントタイプにKnative Eventトリガーを展開することです。コードセクションで述べたように、Funqyの各関数は特定のCloud Eventタイプにマッピングされています。Cloud Eventをマッピングし、特定のKubernetesサービスにルーティングするKnative Event Triggersを作成する必要があります。トリガーは4種類あります。

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: defaultchain
spec:
  broker: default
  filter:
    attributes:
      type: defaultChain
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: funqy-knative-events-quickstart

spec:filter:attributes:type は、 spec:subscriber:ref で定義されている Kubernetes サービスにCloud Eventのタイプをマッピングします。Cloud EventがBrokerにプッシュされると、そのイベントにマッピングされたサービスのスピンアップがトリガーされます。

4つのFunqy関数ごとにTrigger YAMLファイルが用意されています。これらをすべてデプロイします:

kubectl apply -n knativetutorial -f src/main/k8s/defaultChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/configChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/annotatedChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/lastChainLink-trigger.yaml

デモの実行

2つの異なるターミナルウィンドウが必要です。一つは Broker への curl リクエストを行うためのもので、もう一つはポッドのログファイルを見るためのもので、Funqy 関数のイベントチェーンを流れるメッセージを見ることができます。

stern ツールがインストールされていることを確認してください。それについての情報は Knative チュートリアルのセットアップを参照してください。Funqy デプロイメントが出力したログを探すために stern を実行します。

stern funq user-container

別の端末を開きます。まずブローカーのURLを知る必要があります。このコマンドを実行して探します。

kubectl get broker default -o jsonpath='{.status.address.url}'

これにより、例えば以下のようなURLが表示されます: http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default 。このURLを覚えておいてください。

次にやるべきことは、Kubernetesクラスタにsshして、ブローカーにcurlリクエストを送ることです。以下のコマンドでシンプルなOSポッドを作成し、関数にcurlできるようにします。

kubectl -n knativetutorial apply -f src/main/k8s/curler.yaml

curler ポッドが立ち上がるまで、数秒待つ必要があるかもしれません。以下を実行して、curler pod への bash アクセスを取得します:

kubectl -n knativetutorial exec -it curler -- /bin/bash

これで、Kubernetesクラスタ内のシェルに入ります。シェル内で次のcurlコマンドを実行します。ブローカーのアドレスは一例であり、プロジェクトやブローカー名によって異なる場合があります。

curl -v "http://default-broker.knativetutorial.svc.cluster.local" \
-X POST \
-H "Ce-Id: 1234" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: defaultChain" \
-H "Ce-Source: curl" \
-H "Content-Type: application/json" \
-d '"Start"'

これは、ブローカーに Knative イベントをポストし、 defaultChain 関数をトリガーします。前述したように、 defaultChain の出力は、 configChain に投稿されたイベントをトリガーにして、 annotatedChain に投稿されたイベントをトリガーにして、最後に lastChainLink 関数をトリガーにします。この流れは stern ウィンドウで見ることができます。以下のようなものが出力されるはずです。

funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,256 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** defaultChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,365 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** configChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,394 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** annotatedChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,466 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** lastChainLink ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,467 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) Start::defaultChain::configChain::annotatedChain::lastChainLink

関連コンテンツ