E. NestJs sebagai Producer
Konfigurasi
Pertama kita module dan service untuk kafka terlebih dahulu
Kemudian kita definisikan sebuah modul global di NestJS yang mengonfigurasi klien Kafka menggunakan ClientsModule. Modul ini mencakup KafkaService sebagai penyedia layanan dan mengekspornya sehingga dapat digunakan di seluruh aplikasi tanpa perlu mendeklarasikannya lagi di modul lain. Pendekatan ini membantu dalam memisahkan konfigurasi dari logika bisnis dan memudahkan pengelolaan serta penggunaan kembali komponen di berbagai tempat dalam aplikasi.
import { Global, Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { kafkaConfig } from 'src/config/kafka.config';
@Global()
@Module({
imports: [
ClientsModule.register([
{
name: 'LATIHAN_KAFKA',
...kafkaConfig,
},p
]),
],
providers: [KafkaService],
exports: [KafkaService],
})
export class KafkaModule {}
!!! note - @Global()
: Menjadikan modul ini tersedia di seluruh aplikasi tanpa perlu mengimpor di setiap modul yang membutuhkan. - ClientsModule.register()
: Mengonfigurasi klien Kafka dengan menggunakan konfigurasi yang tela dipisahkan dalam file kafka.config.ts. - KafkaService
: Layanan yang berisi logika untuk berinteraksi dengan Kafka, diekspor agar dapat digunakan oleh modul lain.
Kemudian Kita implementasikan lifecycle onModuleInit
pada service untuk menghubungkan klien Kafka ketika modul diinisialisasi. Ini memastikan bahwa klien siap untuk mengirim dan menerima pesan ketika aplikasi berjalan.
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
@Injectable()
export class KafkaService implements OnModuleInit {
constructor(@Inject('LATIHAN_KAFKA') private kafkaClient: ClientKafka) {}
async onModuleInit() {
await this.kafkaClient.connect();
}
async sendMessagewithEmit(topic: string, key:string, payload: any) {
return this.kafkaClient.emit(topic, {
key : key,
value : payload
});
}
}
Pada service di atas kita membuat method untuk memproduce message ke kafka dengan method emit dari kafkaClient.
Note
- Metode
sendMessagewithEmit
digunakan untuk mengirim pesan ke topik Kafka tertentu. topic
: Parameter yang menunjukkan topik Kafka yang akan menerima pesan.key
: Parameter yang menunjukkan kunci pesan. Kunci ini digunakan oleh Kafka untuk menentukan partisi tempat pesan akan disimpanpayload
: Data yang akan dikirim sebagai nilai pesan.this.kafkaClient.emit()
: Metode ini mengirimkan pesan ke topik Kafka dengan menggunakan klien Kafka yang telah terhubung. Pesan dikirim dalam format objek yang berisi key dan value.
Implentasikan pada app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TypeOrmModule } from '@nestjs/typeorm';
import { typeOrmConfig } from './config/typeorm.config';
import { AuthModule } from './app/auth/auth.module';
import { MailModule } from './app/mail/mail.module';
import { ConfigModule } from '@nestjs/config';
import { KategoriModule } from './app/kategori/kategori.module';
import { ProdukModule } from './app/produk/produk.module';
import { UploadController } from './app/upload/upload.controller';
import { ServeStaticModule } from '@nestjs/serve-static';
import { join } from 'path';
import { KonsumenModule } from './app/konsumen/konsumen.module';
import { UniqueValidator } from './utils/validator/unique.validator';
import { OrderModule } from './app/order/order.module';
import { OrderDetailModule } from './app/order_detail/order_detail.module';
import { UploadModule } from './app/upload/upload.module';
import { BookModule } from './book/book.module';
import { ProfileModule } from './app/profile/profile.module';
import { QueryBuilderModule } from './query-builder/query-builder.module';
import { KafkaModule } from './kafka/kafka.module';
@Module({
imports: [
ServeStaticModule.forRoot({
rootPath: join(__dirname, '..', 'public'),
}),
ConfigModule.forRoot({
isGlobal: true,
}),
TypeOrmModule.forRoot(typeOrmConfig),
AuthModule,
MailModule,
KategoriModule,
ProdukModule,
BookModule,
KonsumenModule,
OrderModule,
OrderDetailModule,
UploadModule,
ProfileModule,
QueryBuilderModule,
KafkaModule,
],
controllers: [AppController, UploadController],
providers: [AppService, UniqueValidator],
})
export class AppModule {}
Case Produce message ke kafka tanpa menunggu response
Pada case ini kita akan mengimplentasikan ketika create order dengan memanfaatkan kafka. adapun logic create order sama seperti saat kita membuat api dengan beberapa penyesuaian.
Sebelum memulai case ini , pertama kita buat dahulu topic order
Langkah 1
Pertama kita buat dulu method sebagai prosedur untuk mengirimkan message ke kafka pada topic order seperti berikut.
import {
HttpException,
HttpStatus,
Inject,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import BaseResponse from 'src/utils/response/base.response';
import { Between, Like, Repository } from 'typeorm';
import { Order } from './order.entity';
import { ResponsePagination, ResponseSuccess } from 'src/interface/response';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { REQUEST } from '@nestjs/core';
import { Workbook } from 'exceljs';
import { Response } from 'express';
import { KafkaService } from 'src/kafka/kafka.service';
@Injectable()
export class OrderService extends BaseResponse {
constructor(
@InjectRepository(Order)
private readonly orderRepository: Repository<Order>,
@Inject(REQUEST) private req: any,
private readonly kafkaService: KafkaService,
) {
super();
}
...
async sendOrderToKafka(payload: CreateOrderDto) {
await this.kafkaService.sendMessagewithEmit(
'order',
'order_key',
JSON.stringify(payload),
);
}
...
}
Langkah 2
Kemudian kita membuat Controller untuk mengirimkan payload dari client
import {
Body,
Controller,
Delete,
Get,
Param,
Post,
Put,
Res,
UseGuards,
} from '@nestjs/common';
import { OrderService } from './order.service';
import { JwtGuard } from '../auth/auth.guard';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { InjectUpdatedBy } from 'src/utils/decorator/inject-updated_by.decorator';
import { Response } from 'express';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { validate } from 'class-validator';
import { plainToInstance } from 'class-transformer';
@Controller('order')
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@UseGuards(JwtGuard)
@Post('tambah-kafka')
async createOrderKafka(@InjectCreatedBy() payload: CreateOrderDto) {
return this.orderService.sendOrderToKafka(payload);
}
...
}
{
"tanggal_order" : "2023-09-01",
"status" : "belum bayar",
"total_bayar" : 40000,
"konsumen_id" : 1,
"order_detail" : [
{
"jumlah" : 10,
"harga" : 20000,
"produk" : {
"id" : 1
}
},
{
"jumlah" : 5,
"harga" : 50000,
"produk" : {
"id" : 1
}
}
]
}
Langkah 3
Kemudian kita akan membuat consumer ketika ada message baru di kafka pada topic order
import {
Body,
Controller,
Delete,
Get,
Param,
Post,
Put,
Res,
UseGuards,
} from '@nestjs/common';
import { OrderService } from './order.service';
import { JwtGuard } from '../auth/auth.guard';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { InjectUpdatedBy } from 'src/utils/decorator/inject-updated_by.decorator';
import { Response } from 'express';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { validate } from 'class-validator';
import { plainToInstance } from 'class-transformer';
@Controller('order')
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@MessagePattern('order')
async getPayloadFormKafka(@Payload() payload) {
console.log('payl', payload)
}
...
}
Lakukan Pengujian
Langkah 4
Setelah berhasil langkah selanjutkan kita akan membaut service untuk menyimpan message ke database dan merubah method getPayloadFormKafka di controller
import {
HttpException,
HttpStatus,
Inject,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import BaseResponse from 'src/utils/response/base.response';
import { Between, Like, Repository } from 'typeorm';
import { Order } from './order.entity';
import { ResponsePagination, ResponseSuccess } from 'src/interface/response';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { REQUEST } from '@nestjs/core';
import { Workbook } from 'exceljs';
import { Response } from 'express';
import { KafkaService } from 'src/kafka/kafka.service';
@Injectable()
export class OrderService extends BaseResponse {
constructor(
@InjectRepository(Order)
private readonly orderRepository: Repository<Order>,
@Inject(REQUEST) private req: any,
private readonly kafkaService: KafkaService,
) {
super();
}
...
async createOrderFromKafka(
payload: CreateOrderDto,
): Promise<ResponseSuccess> {
try {
const invoice = this.generateInvoice();
payload.nomor_order = invoice;
payload.order_detail &&
payload.order_detail.forEach((item) => {
item.created_by = { id: payload.created_by.id };
});
await this.orderRepository.save({
...payload,
konsumen: {
id: payload.konsumen_id,
},
});
return this._success('OK');
} catch (err) {
console.log('err', err);
throw new HttpException('Ada Kesalahan', HttpStatus.UNPROCESSABLE_ENTITY);
}
}
...
}
...
@Controller('order')
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@MessagePattern('order')
async getPayloadFormKafka(@Payload() payload) {
console.log('payl', payload)
return this.orderService.createOrderFromKafka(payload)
}
...
}
Lakukan Pengujian
Terlihat pada hasil tes di atas kita berhasil menyimpan order dengan menggunakan kafka dengan skenarion Producer dan Consumer.
Note
Pada contoh implementasi ini kita menjadikan producer dan consumer pada topic yang sama di satu aplikasi backend. Pada projek rill biasanya kita mengimplementasikan pada aplikasi backend berbeda
Case Produce message ke kafka dengan meneunggu response dari consumer
Pada contoh sebelumnya kita terlah berhasil membuat order namun kalau kita perhatikan saat producer mengirimkan message ke kafka dan consume oleh consumer , Producer tidak tahu apakah order yang dikirim berhasil di simpan atau enggk.
Terlihat tidak ada response dari ketika data dikirim ke kafka. Hal ini terjadi karena kita menggunkan method emit ketika mengirimkan order ke kafka karena emit
tidak memerlukan response. Untuk mengatasi masalah ini kita bisa menggunakan method send
seperti contoh implementasi berikut.
Langkah 1
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';
@Injectable()
export class KafkaService implements OnModuleInit {
constructor(@Inject('LATIHAN_KAFKA') private kafkaClient: ClientKafka) {}
async onModuleInit() {
this.kafkaClient.subscribeToResponseOf('order');
await this.kafkaClient.connect();
}
async sendMessagewithEmit(topic: string, key: string, payload: any) {
return this.kafkaClient.emit(topic, {
key: key,
value: payload,
});
}
async sendMessagewithSend(topic: string, key: string, payload: any) {
try {
const result = await firstValueFrom(
this.kafkaClient.send(topic, {
key: key,
value: payload,
}),
);
console.log('response kafka', result);
return result;
} catch (error) {
console.error('Error sending message to Kafka:', error);
throw error;
}
}
}
Penjelasan
this.kafkaClient.subscribeToResponseOf('order');
adalah metode yang digunakan dalam konteks Kafka dengan NestJS untuk memberitahu klien Kafka agar mendengarkan respons dari topik tertentu setelah mengirim pesan ke topik tersebut.
Langkah 2
Kita akan membuat method baru untuk mengirimkan data ke kafka dengan send
@Injectable()
export class OrderService extends BaseResponse {
constructor(
@InjectRepository(Order)
private readonly orderRepository: Repository<Order>,
@Inject(REQUEST) private req: any,
private readonly kafkaService: KafkaService,
) {
super();
}
...
async sendOrderToKafkaWithSend(payload: CreateOrderDto) {
const respons = await this.kafkaService.sendMessagewithSend(
'order',
'order_send',
JSON.stringify(payload),
);
return respons
}
...
}
Langkah 3
...
@Controller('order')
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@UseGuards(JwtGuard)
@Post('tambah-kafka')
async createOrderKafka(@InjectCreatedBy() payload: CreateOrderDto) {
return this.orderService.sendOrderToKafkaWithSend(payload);
}
....
}
Lakukan Pengujian
Catatan Penting
Dalam pola komunikasi Request-Response Pattern
dengan method send
maka secara otomatis kafka akan membuat topic .reply
. Pada contoh ini kafka membuat topic order.reply
secara otomaris, diamana topic ini digunakan kafka khusus untuk menangani response terhadap request yang dikirim ke topik order
.
Sebagai gambar alur kerja dari proses sebagai berikut.
- Producer mengirimkan pesan ke topik order menggunakan metode send
- Consumer listen ke topik order, memproses message tersebut, dan kemudian mengirimkan balasan ke topik order.reply.
- Producer subscribe pada topik order.reply melalui metode
subscribeToResponseOf('order')
, sehingga dapat menerima balasan tersebut.
Note
Dalam konteks NestJS yang menggunakan microservices dan Kafka, emit dan send adalah dua metode berbeda yang digunakan untuk mengirim pesan ke topik Kafka, tetapi keduanya memiliki perbedaan dalam cara kerja dan tujuan penggunaannya.
- Penggunaan:
emit
digunakan untuk mengirim pesan tanpa mengharapkan balasan. Ini adalah pola "fire-and-forget", di mana kita hanya mengirimkan pesan ke topik tertentu dan tidak menunggu respons atau balasan dari konsumen yang menerima pesan tersebut. - Penggunaan: send digunakan ketika kita mengirim pesan dan mengharapkan balasan dari konsumen. Ini adalah pola "request-response", di mana kita mengirim pesan ke topik tertentu dan menunggu balasan dari konsumen yang menangani pesan tersebut. Balasan ini biasanya dikirim ke topik balasan yang terpisah.