クラウドイベントをKafkaで送受信
クラウドイベントは、イベントを記述するための仕様です。相互運用性を容易にすることを目的としています。イベント駆動型アーキテクチャの台頭により、Cloud Eventsの人気が高まっているのは驚くに値しません。
この記事では、Quarkus、Kafka、Reactive Messagingを使用してクラウドイベントを生成して消費する方法を説明します。
クラウドイベントとは?
まずは「なぜ」を見てみましょう。イベントはどこにでもあります。最近のシステムの多くは、何らかの方法でイベントを使用しています。イベントは、イベントソーシングの実装、事実の伝達、帯域外処理のトリガー、通知の送信などに使用できます。イベントはどんなシステムにも欠かせないものとなっています。
しかし、イベント発行者はイベントを異なる形で表現する傾向があります。内容が違うという意味ではありませんが、エンベロープとイベントのフォーマットは異種混在で、たとえこれらのイベントが同じイベントメッシュ上を通過するとしてもです。アプリケーションの中には JSON を選択してイベントのペイロードにすべてをエンコードするものもあれば、Avro や Protobuf のようなバイナリー形式を好み、ヘッダやプロパティーのようなプロトコル機能を使って、ラップされたペイロードに関するメタデータを転送するものもあります。イベント駆動型アーキテクチャは外部システムとの統合を容易にすると主張していますが、この不均衡はその逆を行っています。イベントをあるフォーマットから別のフォーマットに適応させることだけを目的とした特定のイベントトランスレータを必要とすることは珍しくありません。
では、クラウドイベントとは何か?クラウドイベントは、イベントを記述するための共通の方法を提案しています。目的は明らかに相互運用性と統合負担の軽減です。クラウドイベント1.0はほぼ1年前にリリースされました。この1年で、AzureやOracleなど、多くのクラウドプロバイダーがこの形式を採用しました。また、Knative、Kogito、Debezium、Quarkusなど、いくつかのミドルウェアがクラウドイベントのサポートを追加しています。
例を見せてください!
さてさて、どんな感じでしょうか?クラウドイベントを理解する一番簡単な方法は、そのうちの一つを見てみることです:
{
"specversion" : "1.0",
"id" : "O234-345-890",
"source" : "https://reactive-coffee-shop.io/1234/order",
"type" : "me.escoffier.coffee.Order",
"subject" : "order",
"time" : "2020-11-25T09:05:00Z",
"datacontenttype" : "application/json",
"data" : "{\"name\": \"clement\", \"order\":\"espresso\"}",
"custom-attribute" : "some custom value"
}
このイベントはJSONで記述されていますが、それは可能性の一つに過ぎません。フィールドを見てみましょう。
まず、 specversion
は、使用しているクラウドイベントのバージョン(1.0)を示しています。 id
フィールドは、その特定のイベントの ID を提供します。 source
属性は、イベントソースを識別する URI であり、イベントが発生したコンテキストや、特定のイベントを発生させたアプリケーションを示します。 id
と source
を組み合わせることで、一意な識別子が得られます。このような一意性は、冪等性を実装し、潜在的な重複を処理するために不可欠です。 type
は最後の必須属性です。これはイベントの タイプ を示します。ここでは完全修飾クラス名を使用していますが、何を想像しても構いません。システムで定義されているイベントの種類を参照する必要があります。
datacontenttype
は data
属性の content-type を定義します。 subject
では、コンテキストやイベントのタイプに関する追加のヒントなど、イベントに関する追加の詳細を渡すことができます。 time
はタイムスタンプで、一般的には作成時刻を示します。私の例では使用していない別のオプションの属性があります。 dataschema
属性は、イベントデータのスキーマを渡すことができます。
data
属性には、ラップされたビジネスイベントが含まれています。これは本質的な部分であり、他の属性はその特定のビジネスイベントについての詳細を提供しているだけです。
拡張 を定義することもできます。これらの拡張は、提案された属性のセットがユースケースに十分でない場合に使用されるカスタム属性のセットになります。
これだ!これだ!と思ったイベントをまとめてみました。ということで、クラウドイベントを、 イベントを理解するために必要な最低限のメタデータ、つまり、イベントの ソース、ID、タイプ、ビジネスデータとしてまとめることができます。
転送におけるクラウドイベント - バインディング
しかし、これらのイベントはどのようにエンコードされるのでしょうか?JSON を使った先ほどの例もいいですが、プロトコルによっては、独自の機能を活用してこれらのメタデータを送信したい場合もあるでしょう。
そのため、Cloud Eventsではバインディングも提案しています。バインディングとは、あるプロトコルに固有の推奨事項のセットです。各プロトコルがどのようにCloud Eventsをエンコードすべきかを説明します。例えば、HTTP用のバインディング、Kafka用のバインディング、AMQP用のバインディングがあります。
これらのバインディングのほとんどは、2つのアプローチを提案しています:
-
structured
-
binary
構造化されたアプローチは、イベントのメタデータとデータをメッセージやリクエストのペイロードにまとめて保持します。一般的には JSON を使用してデータをエンコードします。Cloud Event の例 (上記) を HTTP リクエストで渡すと、構造化モードを使用します。また、Kafka レコードの値にその JSON スニペットを書き込む場合も構造化モードを使用します。
構造化されたアプローチでは、複数のプロトコルをまたいで簡単に転送することができます。しかし、効率的でない場合があり、ビジネスデータの種類を制約する場合があります。
もう一つのアプローチはプロトコルの能力に依存しており、効率的な転送とエンコードを可能にします。Kafkaでバイナリーモードを使用する場合、Kafkaレコードの値に data
属性の値を格納し、レコードのヘッダを使用して他の属性を渡すことになります。その結果、Avroなどのバイナリープロトコルを使ってビジネスデータをエンコードすることができ、効率化につながります。
残りの記事では、Quarkus、Kafka、Reactive Messagingを使用してクラウドイベントを送受信する方法を説明しています。
クラウドイベントをKafkaで送信する
Quarkusが使用するKafkaコネクタは、Cloud Eventsをビルトインでサポートしています。構造化モード(JSONペイロードですべてをエンコード)またはバイナリーモード(Kafkaヘッダーを使用)を使用して、クラウドイベントを送信したり消費したりすることができます。
送信メッセージをクラウドイベントとして記述するには、チャンネルに cloud-events-type
と cloud-events-source
属性を指定するだけです。
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.generated-price.cloud-events-source=price-generators
mp.messaging.outgoing.generated-price.cloud-events-type=price
mp.messaging.outgoing.generated-price.cloud-events-subject=generated-prices
デフォルトでは、コネクタはバイナリーモードを使用してクラウドイベントを書き込みます。コネクタはメッセージごとにランダムな id
を生成します。 cloud-events-$attribute
を使用して、 cloud-events-subject
などの他のクラウドイベント属性をカスタマイズすることもできます。
上記の設定は、すべての送信メッセージに適用されます。場合によっては、各メッセージの値を個別にカスタマイズしたいこともあるでしょう。これを実現するには、メッセージに io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata
を添付して、各メッセージの id、ソース、タイプ、件名をカスタマイズすることもできます:
@Outgoing("cloud-events")
public Message<String> toCloudEvents(Message<String> in) {
return in.addMetadata(OutgoingCloudEventMetadata.builder()
.withId("id-" + in.getPayload())
.withType("greetings")
.withSource(URI.create("http://example.com"))
.withSubject("greeting-message") .build());
}
コネクタは 構造化 モードもサポートしています。 cloud-events-mode
属性を structured
に設定することで、構造化された Cloud Events を書くことができます。現時点では JSON のみサポートしています。書き込まれたレコードの content-type
ヘッダーは application/cloudevents+json; charset=UTF-8
に設定され、これにより受信者はそれが構造化されたクラウドイベントであることを理解することができます。
クラウドイベントをKafkaから消費する
当然のことながら、コネクタはクラウドイベントを消費することもできます。コネクタは、レコードのヘッダをチェックすることで、自動的にクラウドイベントを検出します。また、モードも決定します。
コネクタはクラウドイベントを受信すると、メッセージのメタデータに IncomingKafkaCloudEventMetadata
をアタッチします。そのため、拡張だけでなく様々な属性を取得することができます。
public Message<Double> process(Message<Integer> priceInUsd) {
IncomingCloudEventMetadata<Integer> cloudEventMetadata = priceInUsd.getMetadata(IncomingCloudEventMetadata.class)
.orElseThrow(() -> new IllegalArgumentException("Expected a Cloud Event"));
LOGGER.infof("Received Cloud Events (spec-version: %s): source: '%s', type: '%s', subject: '%s' ",
cloudEventMetadata.getSpecVersion(),
cloudEventMetadata.getSource(),
cloudEventMetadata.getType(),
cloudEventMetadata.getSubject().orElse("no subject"));
return priceInUsd.withPayload(Integer.valueOf(priceInUsd.getPayload()) * CONVERSION_RATE);
}
まとめ
イベント駆動型アーキテクチャの台頭に伴い、クラウドイベントの人気が高まっています。Quarkus 1.9以降、Quarkusで使用されているKafka ConnectorはCloud Eventsをビルトインでサポートしています。この投稿では、Cloud Eventsを紹介し、Cloud Eventsを簡単に書いたり読んだりする方法を紹介しました。
Many more options are available, and Kafka is not the only part of Quarkus with Cloud Events support. For example, Funqy[https://quarkus.io/guides/funqy#context-injection] also supports Cloud Event out of the box.