The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Funqy Knative Events バインディング

Quarkus Funqy Knative Events builds off of the Funqy HTTP extension to allow you to route and process Knative Events within a Funqy function.

このガイドでは、クイックスタートコードを使って、Knative Events を使って Funqy 関数をデプロイして呼び出す方法を説明します。

前提条件

このガイドを完成させるには、以下が必要です:

Knative の設定

Minikube 環境で Knative をローカルに設定することは、このガイドの範囲を超えています。Red HatがまとめたKnativeチュートリアル に従うことをお勧めします。このチュートリアルでは、ローカル環境でMinikubeやOpenShift上でKnativeをセットアップする方法を説明しています。

Specifically you should run the Brokers and Triggers tutorial as this guide requires that you can invoke on a Broker to trigger the quickstart code.

Cloud Events についての記事を読む

Knative Events をより深く理解するために、Cloud Events の 仕様 を読んでおくと良いでしょう。

クイックスタート

Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.gitアーカイブ をダウンロードします。

ソリューションは funqy-knative-events-quickstart ディレクトリ にあります。

クイックスタートの流れ

クイックスタートは、Cloud Eventを含む HTTP リクエストを curl を使用して Knative Broker に手動で送信することで動作します。Knative Broker はリクエストを受信し、クイックスタートによって構築された Funqy コンテナの起動をトリガーします。イベントは、一連の Funqy 関数の呼び出しをトリガーします。1 つの関数の出力は、別の Funqy 関数の呼び出しをトリガーします。

Funqy と Cloud Events

Knative Events環境内で稼働する場合、Funqy関数は特定のタイプの Cloud Event によってトリガーされます。1つのアプリケーション/デプロイメント内に複数のFunqy関数を持つことができますが、それらは特定のタイプの Cloud Event によってトリガーされなければなりません。このルールの例外は、アプリケーション内にFunqy関数が1つしかない場合です。この場合、イベントは Cloud Event タイプに関係なく、その関数にプッシュされます。

現在、FunqyはJSONベースのデータしか消費できません。それ自体はバイナリモードと構造化モードの両方をサポートしていますが、Cloud EventメッセージのデータコンポーネントはJSONでなければなりません。このJSONは、関数のJavaパラメータやリターンタイプとの間でマーシャル可能でなければなりません。

コード

Knative イベントがどのように Funqy にマップされるかを理解するために、クイックスタートのコードを見てみましょう。クイックスタートコードは、 SimpleFunctionChain.java を開いてください。

最初に見る関数は defaultChain です。

import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public String defaultChain(String input) {
        log.info("*** defaultChain ***");
        return input + "::" + "defaultChain";
    }

この状態で、Funqy関数はデフォルトの Cloud Event マッピングを持っています。デフォルトでは、 Cloud Event のタイプは、トリガする関数の関数名と一致していなければなりません。関数が出力を返す場合、レスポンスは Cloud Event に変換され、ブローカに返されて他のトリガーにルーティングされます。このレスポンスのデフォルトの Cloud Event タイプは、関数名 + .output です。デフォルトの Cloud Event のソースは、関数名です。

したがって、 defaultChain 関数の場合、この関数をトリガーする Cloud Event のタイプは defaultChain です。これは、タイプが defaultChain.output でイベントソースが defaultChain である新しい Cloud Event をトリガーするレスポンスを生成します。

デフォルトのマッピングはシンプルですが、必ずしもうまくいくとは限りません。このデフォルトのマッピングは設定で変更することができます。次の関数を見てみましょう。

import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public String configChain(String input) {
        log.info("*** configChain ***");
        return input + "::" + "configChain";
    }

configChain 関数は、 application.properties 内の設定により、 Cloud Event のマッピングが変更されています。

quarkus.funqy.knative-events.mapping.configChain.trigger=defaultChain.output
quarkus.funqy.knative-events.mapping.configChain.response-type=annotated
quarkus.funqy.knative-events.mapping.configChain.response-source=configChain

この場合、この構成では、受信するCloud Eventのタイプ defaultChain.outputconfigChain 関数にマッピングします。 configChain 関数は、そのレスポンスを annotated Cloud Event タイプに、Cloud Event ソース configChain にマッピングします。

  • quarkus.funqy.knative-events.mapping.{function name}.trigger は、特定の機能をトリガーする Cloud Event タイプを設定します。特別な値 * をキャッチオール値として使用することができます。 この場合、この関数はすべてのイベントタイプに使用されます。

  • quarkus.funqy.knative-events.mapping.{function name}.response-type は、レスポンスの Cloud Event タイプを設定します。

  • quarkus.funqy.knative-events.mapping.{function name}.resource-source は、レスポンスの Cloud Event のソースを設定します。

Funqy Knative Events エクステンションには、この Cloud Event の関数へのマッピングを行うためのアノテーションも用意されています。 annotatedChain メソッドを見てみましょう。

import io.quarkus.funqy.Funq;
import io.quarkus.funqy.knative.events.CloudEventMapping;

public class SimpleFunctionChain {
    @Funq
    @CloudEventMapping(trigger = "annotated", responseSource = "annotated", responseType = "lastChainLink")
    public String annotatedChain(String input) {
        log.info("*** annotatedChain ***");
        return input + "::" + "annotatedChain";
    }

関数に @CloudEventMapping アノテーションを使用すると、Cloud Event タイプのトリガーとCloud Event レスポンスをマッピングできます。この例では、 annotatedChain 関数は annotated Cloud Event タイプによってトリガされ、レスポンスは lastChainLink タイプと annotated Cloud Eventソースにマッピングされます。

So, if you look at all the functions defined within SimpleFunctionChain you’ll notice that one function triggers the next. The last function that is triggered is lastChainLink.

import io.quarkus.funqy.Context;
import io.quarkus.funqy.Funq;

public class SimpleFunctionChain {
    @Funq
    public void lastChainLink(String input, @Context CloudEvent event) {
        log.info("*** lastChainLink ***");
        log.info(input + "::" + "lastChainLink");
    }
}

この機能には2つの注意点があります。一つは、この関数には出力がありません。あなたの関数は出力を返す必要はありません。第二に、この関数には追加の event パラメータがあります。

If you want to know additional information about the incoming Cloud Event, you can inject the io.quarkus.funqy.knative.events.CloudEvent interface using the Funqy @Context annotation. The CloudEvent interface exposes information about the triggering event.

public interface CloudEvent {
    String id();
    String specVersion();
    String source();
    String subject();
    OffsetDateTime time();
}

Maven

If you look at the POM, you’ll see that it is a typical Quarkus POM that pulls in one Funqy dependency:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>

開発モードとテスト

Funqy Knative Events support dev mode and unit testing using RestAssured. You can invoke on Funqy Knative Events functions using the same invocation model as Funqy HTTP using normal HTTP requests, or Cloud Event Binary mode, or Structured Mode. All invocation modes are supported at the same time.

そのため、 FunqyTest.java のユニットテストコードを開くと、関数をテストするためのHTTP呼び出しを行うために、単にRestAssuredを使用していることがわかります。

FunqyはQuarkus 開発モードにも対応しています!

プロジェクトのビルド

まず、Javaアーティファクトをビルドします。

CLI
quarkus build
Maven
./mvnw clean package

次に、Knativeではdockerイメージが必要なので、続いてそれをビルドする必要があります。

docker build -f src/main/docker/Dockerfile.jvm -t yourAccountName/funqy-knative-events-quickstart .

docker build を実行する際には、必ず yourAccountName を docker または quay のアカウント名に置き換えてください。Dockerfileは標準のQuarkusのdockerfileです。特別なKnativeマジックはありません。

イメージをDocker HubやQuayにプッシュする

docker push yourAccountName/funqy-knative-events-quickstart

繰り返しになりますが、 docker push を実行する際には、 yourAccountName を docker または quay のアカウント名に置き換えてください。

Kubernetes/OpenShiftへのデプロイ

The first step is to set up the broker in our namespace. Following is an example command from the Knative cli.

kn broker create default \
  --namespace knativetutorial

The broker we have created is called default, this broker will receive the cloud events. The broker is also referenced in the function YAML files.

The second step is to define a Kubernetes/OpenShift service to point to the Docker image you created and pushed during build. Take a look at funqy-service.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: funqy-knative-events-quickstart
spec:
  template:
    metadata:
      name: funqy-knative-events-quickstart-v1
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
        - image: docker.io/yourAccountName/funqy-knative-events-quickstart

これはKubernetesの標準的なサービス定義YAMLファイルです。

Make sure you change the image URL to point to the image you built and pushed earlier!

For our quickstart, one Kubernetes service will contain all functions. There’s no reason you couldn’t break up this quickstart into multiple different projects and deploy a service for each function. For simplicity, and to show that you don’t have to have a deployment per function, the quickstart combines everything into one project, image, and service.

サービスyamlをデプロイします。

kubectl apply -n knativetutorial -f src/main/k8s/funqy-service.yaml

The next step is to deploy Knative Event triggers for each of the event types. As noted in the code section, each Funqy function is mapped to a specific Cloud Event type. You must create Knative Event triggers that map a Cloud Event and route it to a specific Kubernetes service. We have 4 different triggers.

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: defaultchain
spec:
  broker: default
  filter:
    attributes:
      type: defaultChain
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: funqy-knative-events-quickstart

spec:filter:attributes:type は、 spec:subscriber:ref で定義されている Kubernetes サービスにCloud Eventのタイプをマッピングします。Cloud EventがBrokerにプッシュされると、そのイベントにマッピングされたサービスのスピンアップがトリガーされます。

There’s a trigger YAML file for each of our 4 Funqy functions. Deploy them all:

kubectl apply -n knativetutorial -f src/main/k8s/defaultChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/configChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/annotatedChain-trigger.yaml
kubectl apply -n knativetutorial -f src/main/k8s/lastChainLink-trigger.yaml

デモの実行

You’ll need two different terminal windows. One to do a curl request to the Broker, the other to watch the pod log files, so you can see the messages flowing through the Funqy function event chain.

stern ツールがインストールされていることを確認してください。それについての情報は Knative チュートリアルのセットアップを参照してください。Funqy デプロイメントが出力したログを探すために stern を実行します。

stern funq user-container

別の端末を開きます。まずブローカーのURLを知る必要があります。このコマンドを実行して探します。

kubectl get broker default -o jsonpath='{.status.address.url}'

This will provide you a URL similar to e.g.: http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default. Remember this URL.

Next thing we need to do is ssh into our Kubernetes cluster so that we can send a curl request to our broker. The following command will create a simple OS pod so we can curl into our functions.

kubectl -n knativetutorial apply -f src/main/k8s/curler.yaml

You might need to wait a couple of seconds until the curler pod comes up. Run the following to get bash access to the curler pod:

kubectl -n knativetutorial exec -it curler -- /bin/bash

You will now be in a shell within the Kubernetes cluster. Within the shell, execute this curl command , the broker address is an example and might differ based on your project or broker name.

curl -v "http://default-broker.knativetutorial.svc.cluster.local" \
-X POST \
-H "Ce-Id: 1234" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: defaultChain" \
-H "Ce-Source: curl" \
-H "Content-Type: application/json" \
-d '"Start"'

これは、ブローカーに Knative イベントをポストし、 defaultChain 関数をトリガーします。前述したように、 defaultChain の出力は、 configChain に投稿されたイベントをトリガーにして、 annotatedChain に投稿されたイベントをトリガーにして、最後に lastChainLink 関数をトリガーにします。この流れは stern ウィンドウで見ることができます。以下のようなものが出力されるはずです。

funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,256 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** defaultChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,365 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** configChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,394 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-1) *** annotatedChain ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,466 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) *** lastChainLink ***
funqy-knative-events-quickstart-v1-deployment-59bb88bcf4-9jwdx user-container 2020-05-12 13:44:02,467 INFO  [org.acm.fun.SimpleFunctionChain] (executor-thread-2) Start::defaultChain::configChain::annotatedChain::lastChainLink