Funqy Knative Events バインディング
Quarkus Funqy Knative Eventsは、 Funqy HTTPエクステンションをベースに構築されており、Funqy関数内でKnativeイベントをルーティングして処理することができます。
このガイドでは、クイックスタートコードを使って、Knative Events を使って Funqy 関数をデプロイして呼び出す方法を説明します。
前提条件
このガイドを完成させるには、以下が必要です:
-
ざっと 1 hour
-
IDE
-
JDK 17+がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.9.9
-
使用したい場合は、 Quarkus CLI
-
Funqy の基礎 を読んで下さい。 これは短い読み物です!
-
Knative チュートリアル 、具体的には ブローカーとトリガー
Knative の設定
Minikube 環境で Knative をローカルに設定することは、このガイドの範囲を超えています。Red HatがまとめたKnativeチュートリアル に従うことをお勧めします。このチュートリアルでは、ローカル環境でMinikubeやOpenShift上でKnativeをセットアップする方法を説明しています。
このガイドでは、クイックスタートコードをトリガーするためにブローカーを呼び出す必要があるため、特に ブローカーとトリガー のチュートリアルを実行する必要があります。 |
Cloud Events についての記事を読む
Knative Events をより深く理解するために、Cloud Events の 仕様 を読んでおくと良いでしょう。
クイックスタート
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
The solution is located in the funqy-knative-events-quickstart
directory.
クイックスタートの流れ
クイックスタートは、Cloud Eventを含む HTTP リクエストを curl
を使用して Knative Broker に手動で送信することで動作します。Knative Broker はリクエストを受信し、クイックスタートによって構築された Funqy コンテナの起動をトリガーします。イベントは、一連の Funqy 関数の呼び出しをトリガーします。1 つの関数の出力は、別の Funqy 関数の呼び出しをトリガーします。
Funqy と Cloud Events
Knative Events環境内で稼働する場合、Funqy関数は特定のタイプの Cloud Event によってトリガーされます。1つのアプリケーション/デプロイメント内に複数のFunqy関数を持つことができますが、それらは特定のタイプの Cloud Event によってトリガーされなければなりません。このルールの例外は、アプリケーション内にFunqy関数が1つしかない場合です。この場合、イベントは Cloud Event タイプに関係なく、その関数にプッシュされます。
現在、FunqyはJSONベースのデータしか消費できません。それ自体はバイナリモードと構造化モードの両方をサポートしていますが、Cloud EventメッセージのデータコンポーネントはJSONでなければなりません。このJSONは、関数のJavaパラメータやリターンタイプとの間でマーシャル可能でなければなりません。
コード
Let’s start looking at our quickstart code so that you can understand how Knative Events map to Funqy. Open up SimpleFunctionChain.java
最初に見る関数は defaultChain
です。
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public String defaultChain(String input) {
log.info("*** defaultChain ***");
return input + "::" + "defaultChain";
}
この状態で、Funqy関数はデフォルトの Cloud Event マッピングを持っています。デフォルトでは、 Cloud Event のタイプは、トリガする関数の関数名と一致していなければなりません。関数が出力を返す場合、レスポンスは Cloud Event に変換され、ブローカに返されて他のトリガーにルーティングされます。このレスポンスのデフォルトの Cloud Event タイプは、関数名 + .output
です。デフォルトの Cloud Event のソースは、関数名です。
したがって、 defaultChain
関数の場合、この関数をトリガーする Cloud Event のタイプは defaultChain
です。これは、タイプが defaultChain.output
でイベントソースが defaultChain
である新しい Cloud Event をトリガーするレスポンスを生成します。
デフォルトのマッピングはシンプルですが、必ずしもうまくいくとは限りません。このデフォルトのマッピングは設定で変更することができます。次の関数を見てみましょう。
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public String configChain(String input) {
log.info("*** configChain ***");
return input + "::" + "configChain";
}
The configChain
function has its Cloud Event mapping changed by configuration within application.properties.
quarkus.funqy.knative-events.mapping.configChain.trigger=defaultChain.output
quarkus.funqy.knative-events.mapping.configChain.response-type=annotated
quarkus.funqy.knative-events.mapping.configChain.response-source=configChain
この場合、この構成では、受信するCloud Eventのタイプ defaultChain.output
を configChain
関数にマッピングします。 configChain
関数は、そのレスポンスを annotated
Cloud Event タイプに、Cloud Event ソース configChain
にマッピングします。
-
quarkus.funqy.knative-events.mapping.{function name}.trigger
は、特定の関数をトリガーする Cloud Event タイプを設定します。特別な値*
をキャッチオール値として使用することができます。 この場合、この関数はすべてのイベントタイプに使用されます。 -
quarkus.funqy.knative-events.mapping.{function name}.response-type
は、レスポンスの Cloud Event タイプを設定します。 -
quarkus.funqy.knative-events.mapping.{function name}.resource-source
は、レスポンスの Cloud Event のソースを設定します。
Funqy Knative Events エクステンションには、この Cloud Event の関数へのマッピングを行うためのアノテーションも用意されています。 annotatedChain
メソッドを見てみましょう。
import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEventMapping;
public class SimpleFunctionChain {
@Funq
@CloudEventMapping(trigger = "annotated", responseSource = "annotated", responseType = "lastChainLink")
public String annotatedChain(String input) {
log.info("*** annotatedChain ***");
return input + "::" + "annotatedChain";
}
関数に @CloudEventMapping
アノテーションを使用すると、Cloud Event タイプのトリガーとCloud Event レスポンスをマッピングできます。この例では、 annotatedChain
関数は annotated
Cloud Event タイプによってトリガされ、レスポンスは lastChainLink
タイプと annotated
Cloud Eventソースにマッピングされます。
つまり、 SimpleFunctionChain
内で定義されているすべての関数を見てみると、ある関数が次の関数を引き金にしていることに気づくでしょう。最後にトリガーされる関数は lastChainLink
です。
import io.quarkus.funqy.Context;
import io.quarkus.funqy.Funq;
public class SimpleFunctionChain {
@Funq
public void lastChainLink(String input, @Context CloudEvent event) {
log.info("*** lastChainLink ***");
log.info(input + "::" + "lastChainLink");
}
}
この機能には2つの注意点があります。一つは、この関数には出力がありません。あなたの関数は出力を返す必要はありません。第二に、この関数には追加の event
パラメータがあります。
受信したcloud eventに関する追加情報を知りたい場合は、Funqy @Context
アノテーションを使用して io.quarkus.funqy.knative.events.CloudEvent
インターフェースを注入することができます。 CloudEvent
インターフェースは、トリガーとなるイベントに関する情報を公開します。
public interface CloudEvent {
String id();
String specVersion();
String source();
String subject();
OffsetDateTime time();
}
Maven
If you look at the POM, you’ll see that it is a typical Quarkus POM that pulls in one Funqy dependency:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>
開発モードとテスト
Funqy Knative Eventsは、RestAssuredを使った開発モードとユニットテストをサポートしています。Funqy Knative Eventsの関数は、通常のHTTPリクエストを使った Funqy HTTPと同じ呼び出しモデル、またはCloud Event Binaryモード、Structuredモードを使って呼び出すことができます。すべての呼び出しモードが同時にサポートされています。
So, if you open up the unit test code in FunqyTest.java you’ll see that its simply using RestAssured to make HTTP invocations to test the functions.
FunqyはQuarkus 開発モードにも対応しています!
プロジェクトのビルド
まず、Javaアーティファクトをビルドします。
quarkus build
./mvnw install
次に、Knativeではdockerイメージが必要なので、続いてそれをビルドする必要があります。
docker build -f src/main/docker/Dockerfile.jvm -t yourAccountName/funqy-knative-events-quickstart .
docker build
を実行する際には、必ず yourAccountName
を docker または quay のアカウント名に置き換えてください。Dockerfileは標準のQuarkusのdockerfileです。特別なKnativeマジックはありません。
イメージをDocker HubやQuayにプッシュする
docker push yourAccountName/funqy-knative-events-quickstart
繰り返しになりますが、 docker push
を実行する際には、 yourAccountName
を docker または quay のアカウント名に置き換えてください。
Kubernetes/OpenShiftへのデプロイ
最初のステップは、私たちの名前空間にブローカーを設定することです。以下は、Knative cliのコマンド例です。
kn broker create default \
--namespace knativetutorial
作成したブローカーは default
といい、このブローカーはクラウドのイベントを受け取ります。このブローカは関数のYAMLファイルでも参照されています。
The second step is to define a Kubernetes/OpenShift service to point to the Docker image you created and pushed during build. Take a look at funqy-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: funqy-knative-events-quickstart
spec:
template:
metadata:
name: funqy-knative-events-quickstart-v1
annotations:
autoscaling.knative.dev/target: "1"
spec:
containers:
- image: docker.io/yourAccountName/funqy-knative-events-quickstart
これはKubernetesの標準的なサービス定義YAMLファイルです。
イメージのURLを、先に作成してプッシュしたイメージを指すように変更してください。 |
今回のクイックスタートでは、1つのKubernetesサービスにすべての関数を格納します。このクイックスタートを複数の異なるプロジェクトに分割し、各関数にサービスをデプロイできない理由はありません。簡単にするために、また関数ごとにデプロイメントを行う必要がないことを示すために、クイックスタートはすべてを1つのプロジェクト、イメージ、サービスにまとめます。
サービスyamlをデプロイします。
kubectl apply -n knativetutorial -f src/main/k8s/funqy-service.yaml
次のステップは、それぞれのイベントタイプにKnative Eventトリガーを展開することです。コードセクションで述べたように、Funqyの各関数は特定のCloud Eventタイプにマッピングされています。Cloud Eventをマッピングし、特定のKubernetesサービスにルーティングするKnative Event Triggersを作成する必要があります。トリガーは4種類あります。
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
name: defaultchain
spec:
broker: default
filter:
attributes:
type: defaultChain
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: funqy-knative-events-quickstart
spec:filter:attributes:type
は、 spec:subscriber:ref
で定義されている Kubernetes サービスにCloud Eventのタイプをマッピングします。Cloud EventがBrokerにプッシュされると、そのイベントにマッピングされたサービスのスピンアップがトリガーされます。
4つのFunqy関数ごとにTrigger YAMLファイルが用意されています。これらをすべてデプロイします:
kubectl apply -n knativetutorial -f src/main/k8s/defaultChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/configChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/annotatedChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/lastChainLink-trigger.yaml
デモの実行
2つの異なるターミナルウィンドウが必要です。一つは Broker への curl リクエストを行うためのもので、もう一つはポッドのログファイルを見るためのもので、Funqy 関数のイベントチェーンを流れるメッセージを見ることができます。
stern
ツールがインストールされていることを確認してください。それについての情報は Knative チュートリアルのセットアップを参照してください。Funqy デプロイメントが出力したログを探すために stern を実行します。
stern funq user-container
別の端末を開きます。まずブローカーのURLを知る必要があります。このコマンドを実行して探します。
kubectl get broker default -o jsonpath='{.status.address.url}'
これにより、例えば以下のようなURLが表示されます: http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default
。このURLを覚えておいてください。
次にやるべきことは、Kubernetesクラスタにsshして、ブローカーにcurlリクエストを送ることです。以下のコマンドでシンプルなOSポッドを作成し、関数にcurlできるようにします。
kubectl -n knativetutorial apply -f src/main/k8s/curler.yaml
curler ポッドが立ち上がるまで、数秒待つ必要があるかもしれません。以下を実行して、curler pod への bash アクセスを取得します:
kubectl -n knativetutorial exec -it curler -- /bin/bash
これで、Kubernetesクラスタ内のシェルに入ります。シェル内で次のcurlコマンドを実行します。ブローカーのアドレスは一例であり、プロジェクトやブローカー名によって異なる場合があります。
curl -v "http://default-broker.knativetutorial.svc.cluster.local" \
-X POST \
-H "Ce-Id: 1234" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: defaultChain" \
-H "Ce-Source: curl" \
-H "Content-Type: application/json" \
-d '"Start"'
これは、ブローカーに Knative イベントをポストし、 defaultChain
関数をトリガーします。前述したように、 defaultChain
の出力は、 configChain
に投稿されたイベントをトリガーにして、 annotatedChain
に投稿されたイベントをトリガーにして、最後に lastChainLink
関数をトリガーにします。この流れは stern
ウィンドウで見ることができます。以下のようなものが出力されるはずです。
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,256 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** defaultChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,365 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** configChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,394 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** annotatedChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,466 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** lastChainLink ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,467 INFO [org.acm.fun.SimpleFunctionChain] (executor-thread-2) Start::defaultChain::configChain::annotatedChain::lastChainLink