Apache PulsarによるQuarkusメッセージングの開始
このガイドでは、QuarkusアプリケーションがQuarkus Messagingを利用してApache Pulsarとやりとりする方法を示します。
前提条件
このガイドを完成させるには、以下が必要です:
-
約15分
-
IDE
-
JDK 17+がインストールされ、
JAVA_HOME
が適切に設定されていること -
Apache Maven 3.9.9
-
Docker と Docker Compose、または Podman 、および Docker Compose
-
使用したい場合は、 Quarkus CLI
-
ネイティブ実行可能ファイルをビルドしたい場合、MandrelまたはGraalVM(あるいはネイティブなコンテナビルドを使用する場合はDocker)をインストールし、 適切に設定していること
アーキテクチャ
このガイドでは、Pulsar と通信する 2 つのアプリケーションを開発します。最初のアプリケーションは 見積りリクエスト を Pulsar に送信し、見積り トピックからの Pulsar メッセージを消費します。2 つ目のアプリケーションは 見積りリクエスト を受信し、見積り を送り返します。

1つ目のアプリケーションである プロデューサー は、ユーザーが HTTP エンドポイントを介していくつかの見積をリクエストできるようにします。見積リクエストごとにランダムな識別子が生成されてユーザーに返され、見積リクエストを 保留 としてマークします。同時に、生成されたリクエスト ID は Pulsar トピック quote-requests
を介して送信されます。

2 つ目のアプリケーションである processor は、 quote-requests
トピックから読み取り、見積にランダムな価格を設定し、 quotes
という名前の Pulsar トピックに送信します。
最後に、プロデューサー は見積を読み取り、サーバーから送信されたイベントを使用してブラウザーに送信します。したがって、ユーザーには、見積価格が 保留 から受信した価格にリアルタイムで更新されていることがわかります。
ソリューション
次の章で紹介する手順に沿って、ステップを踏んでアプリを作成することをお勧めします。ただし、完成した例にそのまま進んでも構いません。
Gitレポジトリをクローンするか git clone https://github.com/quarkusio/quarkus-quickstarts.git
、 アーカイブ をダウンロードします。
解決策は、 pulsar-quickstart
ディレクトリー に配置されています。
Mavenプロジェクトの作成
まず、 producer と processor の2つのプロジェクトを作成する必要があります。
producer プロジェクトを作成するには、ターミナルで次のように実行します:
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=pulsar-quickstart-producer"
このコマンドは、プロジェクト構造を作成し、使用する 2 つの Quarkus エクステンションを選択します。
-
Quarkus REST (以前の RESTEasy Reactive) とその Jackson サポート (JSON を処理するため) による HTTP エンドポイントの提供。
-
リアクティブメッセージング用の Pulsar コネクター
processor プロジェクトを作成するには、同じディレクトリから、次のように実行します:
Windowsユーザーの場合:
-
cmdを使用する場合、(バックスラッシュ
\
を使用せず、すべてを同じ行に書かないでください)。 -
Powershellを使用する場合は、
-D
パラメータを二重引用符で囲んでください。例:"-DprojectArtifactId=pulsar-quickstart-processor"
その時点で、次の構造になっているはずです。
.
├── pulsar-quickstart-processor
│ ├── README.md
│ ├── mvnw
│ ├── mvnw.cmd
│ ├── pom.xml
│ └── src
│ └── main
│ ├── docker
│ ├── java
│ └── resources
│ └── application.properties
└── pulsar-quickstart-producer
├── README.md
├── mvnw
├── mvnw.cmd
├── pom.xml
└── src
└── main
├── docker
├── java
└── resources
└── application.properties
2つのプロジェクトをお好みのIDEで開きます。
Dev Services
開発モードを使用するとき、またはテストのために、Pulsar ブローカーを起動する必要はありません。Quarkus が自動的にブローカーを開始します。詳細については、Dev Services for Pulsar を参照してください。 |
見積オブジェクト
Quote
クラスは、producer および processor の両方のプロジェクトで使用されます。簡単にするために、ここではクラスを複製します。どちらのプロジェクトでも、次の内容の src/main/java/org/acme/pulsar/model/Quote.java
ファイルを作成します。
package org.acme.pulsar.model;
public class Quote {
public String id;
public int price;
/**
* Default constructor required for Jackson serializer
*/
public Quote() { }
public Quote(String id, int price) {
this.id = id;
this.price = price;
}
@Override
public String toString() {
return "Quote{" +
"id='" + id + '\'' +
", price=" + price +
'}';
}
}
Quote
オブジェクトの JSON 表現は、Pulsar トピックに送信されるメッセージ、および Web ブラウザーに送信されるサーバー送信イベントで使用されます。
Quarkus には、JSON Pulsar メッセージを処理する機能が組み込まれています。次のセクションでは、Jackson のシリアライザー/デシリアライザークラスを作成します。
見積もり依頼の送信
プロデューサー プロジェクト内に、 src/main/java/org/acme/pulsar/producer/QuotesResource.java
ファイルを作成し、次のコンテンツを追加します。
package org.acme.pulsar.producer;
import java.util.UUID;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@Path("/quotes")
public class QuotesResource {
@Channel("quote-requests")
Emitter<String> quoteRequestEmitter; (1)
/**
* Endpoint to generate a new quote request id and send it to "quote-requests" Pulsar topic using the emitter.
*/
@POST
@Path("/request")
@Produces(MediaType.TEXT_PLAIN)
public String createRequest() {
UUID uuid = UUID.randomUUID();
quoteRequestEmitter.send(uuid.toString()); (2)
return uuid.toString(); (3)
}
}
1 | Reactive Messaging Emitter を注入して、 quote-requests チャンネルにメッセージを送信します。 |
2 | ポストリクエストで、ランダムな UUID を生成し、エミッターを使用してそれを Pulsar トピックに送信します。 |
3 | 同じ UUID をクライアントに返します。 |
quote-requests
チャネルは、クラスパス上の唯一のコネクターであるため、Pulsar トピックとして管理されます。特に明記されていない限り、この例のように、Quarkus はチャネル名をトピック名として使用します。したがって、この例では、アプリケーションは quote-requests
トピックに書き込みます。Quarkus は、 Emitter
が String
値を生成することを検出するため、シリアライザーも自動的に設定します。
複数のコネクターがある場合は、アプリケーション設定で使用するコネクターを指定する必要があります。 |
見積依頼の処理
ここでは、見積りリクエストを使用して価格を提示します。プロセッサー プロジェクト内に、 src/main/java/org/acme/pulsar/processor/QuotesProcessor.java
ファイルを作成し、次のコンテンツを追加します。
package org.acme.pulsar.processor;
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.acme.pulsar.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Blocking;
/**
* A bean consuming data from the "quote-requests" Pulsar topic (mapped to "requests" channel) and giving out a random quote.
* The result is pushed to the "quotes" Pulsar topic.
*/
@ApplicationScoped
public class QuotesProcessor {
private Random random = new Random();
@Incoming("requests") (1)
@Outgoing("quotes") (2)
@Blocking (3)
public Quote process(String quoteRequest) throws InterruptedException {
// simulate some hard working task
Thread.sleep(200);
return new Quote(quoteRequest, random.nextInt(100));
}
}
1 | メソッドが requests チャネルからのアイテムを消費することを示します。 |
2 | メソッドによって返されるオブジェクトが quotes チャネルに送信されることを示します。 |
3 | 処理が blocking であり、呼び出し元のスレッドでは実行できないことを示します。 |
quote-requests
トピックからのすべての Pulsar message について、Reactive Messaging は process
メソッドを呼び出し、返された Quote
オブジェクトを quotes
チャネルに送信します。この場合、 application.properties
ファイルでチャネルを設定して、 requests
および quotes
チャネルを設定する必要があります。
%dev.quarkus.http.port=8081
# Configure the incoming `quote-requests` Pulsar topic
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest
この場合、着信側と発信側のコネクタ設定が1つずつあり、それぞれに明確な名前が付けられていることに注意してください。 設定プロパティは以下のような構造になっています:
mp.messaging.[outgoing|incoming].{channel-name}.property=value
channel-name
セグメントは、 @Incoming
および @Outgoing
アノテーションで設定された値と一致する必要があります。
-
quote-requests
→ この Pulsar トピックから見積リクエストを読みます -
quotes
→ この Pulsar トピックに見積を書き込みます
この設定の詳細は、Pulsar ドキュメントの https://pulsar.apache.org/docs/3.0.x/concepts-messaging/ セクションを参照してください。
これらのプロパティーは、プレフィックス |
mp.messaging.incoming.requests.subscriptionInitialPosition=Earliest
は、以前に確認応答されたメッセージがない場合に、トピックの最初のメッセージからトピックの読み取りを開始するようにアプリケーションに指示します。
つまり、プロセッサーアプリケーションを起動する前に送信されたメッセージも処理します。
スキーマを設定する必要はありません。
Quarkus はそれらを検出し、1 つも見つからない場合は適切なスキーマタイプを使用して生成します。
Quote
Bean のような構造化型は JSON スキーマを使用します。
見積の受け取り
プロデューサー プロジェクトに戻ります。 QuotesResource
を変更して、Pulsarからの見積りを消費し、サーバー送信イベントを介してクライアントに送り返します。
import io.smallrye.mutiny.Multi;
...
@Channel("quotes")
Multi<Quote> quotes; (1)
/**
* Endpoint retrieving the "quotes" Pulsar topic and sending the items to a server sent event.
*/
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
return quotes; (3)
}
1 | @Channel 修飾子を使用して quotes のチャネルを注入します |
2 | Server Sent Events を使用してコンテンツが送信されたことを示します。 |
3 | ストリーム (Reactive Stream) を返します。 |
Quarkus は quotes
チャネルを quotes
Pulsar トピックに自動的に関連付けるため、何も設定する必要はありません。また、 Quote
クラスのデシリアライザーも生成します。
Pulsar のメッセージスキーマ
この例では、Pulsar メッセージで JSON スキーマを使用しています。 Pulsar スキーマの詳細は、 Pulsar リファレンスガイド - スキーマ を参照してください。 |
HTML ページ
見積をリクエストし、SSE で取得した価格を表示する HTML ページの最終調整を行います。
pulsar-quickstart-producer プロジェクト内に、次の内容の src/main/resources/META-INF/resources/quotes.html
ファイルを作成します。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Prices</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<div class="card">
<div class="card-body">
<h2 class="card-title">Quotes</h2>
<button class="btn btn-info" id="request-quote">Request Quote</button>
<div class="quotes"></div>
</div>
</div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
$("#request-quote").click((event) => {
fetch("/quotes/request", {method: "POST"})
.then(res => res.text())
.then(qid => {
var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
$(".quotes").prepend(row);
});
});
var source = new EventSource("/quotes");
source.onmessage = (event) => {
var json = JSON.parse(event.data);
$(`#${json.id}`).html((index, html) => {
return html.replace("Pending", `\$\xA0${json.price}`);
});
};
</script>
</html>
ここでは何も特別なことはありません。ユーザーがボタンをクリックすると、見積をリクエストするための HTTP リクエストが作成され、保留中の見積がリストに追加されます。SSE を介して受け取った見積もりごとに、リスト内の対応するアイテムが更新されます。
実行
両方のアプリケーションを実行する必要があります。1 つの端末で、次を実行します。
mvn -f pulsar-quickstart-producer quarkus:dev
別の端末で、次を実行します。
mvn -f pulsar-quickstart-processor quarkus:dev
Quarkus は、Pulsar ブローカーを自動的に起動し、アプリケーションを設定して、異なるアプリケーション間で Pulsar ブローカーインスタンスを共有します。詳細については、Dev Services for Pulsar を参照してください。
http://localhost:8080/quotes.html
をブラウザで開き、ボタンをクリックして見積を依頼してください。
JVM またはネイティブモードでの実行
開発モードまたはテストモードで実行していない場合は、Pulsar ブローカーを起動する必要があります。
docker でスタンドアロン Pulsar クラスターを実行する の手順にを実行するか、次の内容の docker-compose.yaml
ファイルを作成します。
version: '3.8'
services:
pulsar:
image: apachepulsar/pulsar:3.2.4
command: [
"sh", "-c",
"bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone -nfw -nss"
]
ports:
- "6650:6650"
- "8080:8080"
tmpfs:
- /pulsar/data
healthcheck:
test: curl --fail http://localhost:8080/admin/v2/clusters || exit 1
interval: 10s
timeout: 10s
retries: 5
start_period: 5s
environment:
PULSAR_PREFIX_advertisedListeners: internal:pulsar://localhost:6650,external:pulsar://pulsar:6650
PULSAR_PREFIX_transactionCoordinatorEnabled: true
PULSAR_PREFIX_systemTopicEnabled: true
networks:
- pulsar-quickstart-network
producer:
image: quarkus-quickstarts/pulsar-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
depends_on:
pulsar:
condition: service_healthy
build:
context: pulsar-quickstart-producer
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
deploy:
restart_policy:
condition: on-failure
environment:
PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
ports:
- "8082:8080"
networks:
- pulsar-quickstart-network
processor:
image: quarkus-quickstarts/pulsar-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
depends_on:
pulsar:
condition: service_healthy
build:
context: pulsar-quickstart-processor
dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
deploy:
restart_policy:
condition: on-failure
environment:
QUARKUS_HTTP_PORT: 8082
PULSAR_CLIENT_SERVICE_URL: pulsar://pulsar:6650
ports:
- "8083:8080"
networks:
- pulsar-quickstart-network
networks:
pulsar-quickstart-network:
name: pulsar-quickstart
最初に、次のコマンドを使用して両方のアプリケーションを JVM モードでビルドします。
mvn -f pulsar-quickstart-producer package
mvn -f pulsar-quickstart-processor package
パッケージ化したら、 docker-compose up
を実行します。
これは開発クラスターであり、本番では使用しないでください。 |
アプリケーションをネイティブ実行可能ファイルとしてビルドし、実行することもできます。まず、両方のアプリケーションをネイティブとしてコンパイルします。
mvn -f pulsar-quickstart-producer package -Dnative -Dquarkus.native.container-build=true
mvn -f pulsar-quickstart-processor package -Dnative -Dquarkus.native.container-build=true
次のコマンドでシステムを実行します。
export QUARKUS_MODE=native
docker-compose up --build
さらに詳しく
このガイドでは、Quarkus を使用して Pulsar とやりとりする方法を示しました。 SmallRye Reactive Messaging を利用して、データストリーミングアプリケーションを構築します。
機能と設定オプションの完全なリストについては、Apache Pulsar エクステンションのリファレンスガイド を確認してください。
このガイドでは、Quarkus Messaging エクステンションを使用して Apache Pulsar と対話する方法について説明します。 Pulsar クライアントディレクトリーの使用。 |