بهعنوان تامینکننده Reactor Core، مشتاقم که نحوه استفاده از Reactor Core برای جریان داده در Google Cloud Pub/Sub را با شما به اشتراک بگذارم. Google Cloud Pub/Sub یک سرویس پیام رسانی در زمان واقعی کاملاً مدیریت شده است که به شما امکان ارسال و دریافت پیام بین برنامه های کاربردی مستقل را می دهد. از سوی دیگر، Reactor Core یک کتابخانه برنامه نویسی واکنشی برای JVM است که یک API برای ساخت برنامه های ناهمزمان و رویداد محور ارائه می دهد. ترکیب این دو فناوری می تواند مزایای قابل توجهی را برای برنامه های پخش داده شما به ارمغان بیاورد.
آشنایی با Google Cloud Pub/Sub
Google Cloud Pub/Sub بر اساس مدل انتشار - مشترک است. در این مدل، ناشران به موضوعات پیام می فرستند و مشترکین پیام هایی را از اشتراک دریافت می کنند. موضوع یک کانال منطقی است که در آن پیام ها ارسال می شود و اشتراک نقطه پایانی است که پیام ها را از یک موضوع دریافت می کند.
ویژگی های کلیدی Google Cloud Pub/Sub عبارتند از:
- مقیاس پذیری: می تواند تعداد زیادی پیام در ثانیه را مدیریت کند و برای سناریوهای جریان داده با حجم بالا مناسب است.
- قابلیت اطمینان: پیامها بادوام ذخیره میشوند و Pub/Sub حداقل یک بار تحویل را تضمین میکند.
- انعطاف پذیری: از چندین زبان برنامه نویسی پشتیبانی می کند و می تواند با سرویس های مختلف Google Cloud ادغام شود.
چرا از Reactor Core با Google Cloud Pub/Sub استفاده کنید؟
Reactor Core یک مدل برنامه نویسی واکنشی ارائه می دهد که برای مدیریت عملیات ناهمزمان و غیر مسدود کننده مناسب است. هنگامی که با Google Cloud Pub/Sub استفاده می شود، می تواند مزایای زیر را ارائه دهد:
- پردازش ناهمزمان: Reactor Core به شما امکان میدهد پیامهای Pub/Sub را به صورت ناهمزمان پردازش کنید، به این معنی که برنامه شما میتواند در حین انتظار برای پیامها، به انجام کارهای دیگر ادامه دهد. این می تواند عملکرد کلی برنامه شما را بهبود بخشد.
- مدیریت پس فشار: Reactor Core دارای پشتیبانی از فشار برگشتی است. در زمینه Pub/Sub، این بدان معنی است که اگر برنامه شما نتواند پیامها را به همان سرعتی که میرسد پردازش کند، میتواند به Pub/Sub سیگنال دهد تا سرعت تحویل پیام را کاهش دهد.
- جریان های قابل ترکیب: به راحتی می توانید جریان های واکنشی مختلف را در Reactor Core بسازید. به عنوان مثال، می توانید قبل از پردازش بیشتر، پیام ها را از Pub/Sub تبدیل، فیلتر یا جمع آوری کنید.
تنظیم محیط
قبل از اینکه بتوانید از Reactor Core با Google Cloud Pub/Sub استفاده کنید، باید محیط توسعه خود را تنظیم کنید.
پیش نیازها
- حساب Google Cloud: باید یک حساب Google Cloud داشته باشید و Pub/Sub API را فعال کنید.
- کیت توسعه جاوا (JDK): Reactor Core یک کتابخانه جاوا است، بنابراین باید JDK 8 یا بالاتر را نصب کنید.
- Maven یا Gradle: برای مدیریت وابستگی های جاوا می توانید از Maven یا Gradle استفاده کنید.
افزودن وابستگی ها
اگر از Maven استفاده می کنید، وابستگی های زیر را به خود اضافه کنیدpom.xml:
<dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor - core</artifactId> <version>3.4.15</version> </dependency> <dependency> <groupId>com.google.projectreactor</groupId> <artifactId>reactor - core</artifactId> <version>3.4.15</version> </dependency> <dependency> <groupId>com.google.cloud</groupId> -artif> <version>1.122.0</version> </dependency> </dependencies>
اگر از Gradle استفاده می کنید، موارد زیر را به خود اضافه کنیدbuild.gradle:
وابستگی ها { پیاده سازی 'io.projectreactor:reactor - core:3.4.15' implement 'com.google.cloud:google - cloud - pubsub:1.122.0' }
انتشار پیامها به Google Cloud Pub/Sub با Reactor Core
بیایید با مثالی از انتشار پیام به یک موضوع Pub/Sub با استفاده از Reactor Core شروع کنیم.
وارد کردن com.google.api.gax.core.CredentialsProvider؛ وارد کردن com.google.api.gax.core.FixedCredentialsProvider. وارد کردن com.google.auth.oauth2.GoogleCredentials؛ وارد کردن com.google.cloud.pubsub.v1.Publisher. وارد کردن com.google.protobuf.ByteString; وارد کردن com.google.pubsub.v1.ProjectTopicName; وارد کردن com.google.pubsub.v1.PubsubMessage; واردات reactor.core.publisher.Flux; وارد کردن java.io.FileInputStream. وارد کردن java.io.IOException. وارد کردن java.util.UUID. کلاس عمومی PubSubPublisherExample { public static void main(String[] args) IOException را می اندازد { // تنظیم اعتبارنامه GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("path/to/your/credentials.json")); CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials); // نام موضوع ایجاد کنید ProjectTopicName topicName = ProjectTopicName.of("شماره - پروژه -", "نام - موضوع - شما"); // ایجاد ناشر Publisher publisher = Publisher.newBuilder(topicName) .setCredentialsProvider(credentialsProvider) .build(); . PubsubMessage.newBuilder() .setData(data) .putAttributes("messageId", messageId) .build(}); // انتشار پیامها messageFlux.flatMap(message -> { return publisher.publish(message).toFuture().thenApply(result -> { System.out.println("پیام منتشر شده با شناسه:" + نتیجه؛ نتیجه بازگشت؛ }); }).subscribe(); // Shutdown the publisher publisher.shutdown(); } }
در این مثال، ابتدا اعتبار Google Cloud را تنظیم کردیم. سپس یک را ایجاد می کنیمناشرشی برای موضوع مشخص شده ما از a استفاده می کنیمشاربرای تولید جریانی از پیام ها سپس هر پیام به صورت ناهمزمان با استفاده ازفلت مپاپراتور
![]()

اشتراک در Google Cloud Pub/Sub با Reactor Core
اکنون، بیایید نحوه اشتراک در اشتراک Pub/Sub و پردازش پیام ها با استفاده از Reactor Core را بررسی کنیم.
وارد کردن com.google.api.gax.core.CredentialsProvider؛ وارد کردن com.google.api.gax.core.FixedCredentialsProvider. وارد کردن com.google.auth.oauth2.GoogleCredentials؛ وارد کردن com.google.cloud.pubsub.v1.AckReplyConsumer. وارد کردن com.google.cloud.pubsub.v1.MessageReceiver. وارد کردن com.google.cloud.pubsub.v1.Subscriber; وارد کردن com.google.pubsub.v1.ProjectSubscriptionName; وارد کردن com.google.pubsub.v1.PubsubMessage; واردات reactor.core.publisher.Flux; import reactor.core.publisher.Mono; وارد کردن java.io.FileInputStream. وارد کردن java.io.IOException. وارد کردن java.util.concurrent.atomic.AtomicInteger. کلاس عمومی PubSubSubscriberExample { public static void main(String[] args) IOException را پرتاب می کند { // تنظیم اعتبارنامه GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream("path/to/your/credentials.json")); CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(credentials); // ایجاد نام اشتراک ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of("شماره - پروژه -"، "اشتراک - نام - نام"); // ایجاد یک گیرنده پیام سفارشی شمارنده AtomicInteger = new AtomicInteger(0); Flux<PubsubMessage> messageFlux = Flux.create(sink -> { MessageReceiver گیرنده = (پیام، مصرف کننده) -> {sink.next(message); customers.ack(); }؛ Subscriber subscriber = Subscriber.newBuilder(subscriptionName.viderCrederentials) .build(); // پردازش پیام ها messageFlux.subscribe(message -> { System.out.println("Received message: " + message.getData().toStringUtf8()); counter.incrementAndGet(); System.out.println("مجموع پیام های دریافت شده: " + counter.get()); }); } }
در این مثال یک سفارشی ایجاد می کنیمگیرنده پیامکه پیام های دریافتی را به a ارسال می کندشار. راشارسپس مشترک می شود و هر پیام در کنسول چاپ می شود. ما همچنین تعداد کل پیام های دریافتی را پیگیری می کنیم.
موارد استفاده پیشرفته
تبدیل و تجمیع پیام
Reactor Core به شما امکان می دهد پیام های Pub/Sub را به راحتی تبدیل و جمع آوری کنید. برای مثال، میتوانید دادههای پیام را از JSON به یک شی جاوا تبدیل کنید، یا میتوانید پیامها را در یک پنجره زمانی معین جمعآوری کنید.
وارد کردن com.google.pubsub.v1.PubsubMessage; واردات reactor.core.publisher.Flux; وارد کردن java.time.Duration; کلاس عمومی MessageTransformationExample { public static void main(String[] args) { Flux<PubsubMessage> messageFlux = getMessageFluxFromPubSub(); Flux<String> transformedFlux = messageFlux .map(message -> message.getData().toStringUtf8()) .map(text -> text.toUpperCase()); Flux<Integer> aggregatedFlux = transformedFlux .bufferTimeout(10, Duration.ofSeconds(5)) .map(list -> list.size()); aggregatedFlux.subscribe(count -> System.out.println("تعداد پیام ها در پنجره: " + count)); } خصوصی ثابت Flux<PubsubMessage> getMessageFluxFromPubSub() { // کد دریافت جریان پیام از Pub/Sub return Flux.empty(); } }
در این مثال ابتدا هر پیام را به حروف بزرگ تبدیل می کنیم. سپس پیام ها را در یک بافر 10 پیامی یا یک پنجره زمانی 5 ثانیه ای جمع می کنیم و تعداد پیام های هر پنجره را می شماریم.
نتیجه گیری
استفاده از Reactor Core برای استریم داده در Google Cloud Pub/Sub می تواند مزایای قابل توجهی برای برنامه های شما به همراه داشته باشد. این یک روش ناهمزمان و واکنشی برای مدیریت پیام ها ارائه می دهد که می تواند عملکرد و مقیاس پذیری راه حل های جریان داده شما را بهبود بخشد. چه در حال انتشار یا اشتراک در Pub/Sub هستید، Reactor Core یک API قدرتمند و انعطاف پذیر برای مدیریت جریان داده ارائه می دهد.
اگر علاقه مند به استفاده هستیدهسته راکتوربرای پروژه های Google Cloud Pub/Sub یا کاوش راه حل های مرتبط دیگر مانندهسته آهنی سیلیکونیلطفا برای خرید و بحث های بیشتر با ما تماس بگیرید. ما متعهد به ارائه محصولات و خدمات با کیفیت بالا برای رفع نیازهای شما هستیم.
مراجع
- Google Cloud Pub/Sub Documentation
- مستندات هسته راکتور
