D. NestJs sebagai Consumer
Konfigurasi
Langkah pertama buatkan konfig untuk kafka pada folder config
import { KafkaOptions, Transport } from '@nestjs/microservices';
export const kafkaConfig: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
clientId: `backend-smkmq`,
brokers: [`localhost:9092`],
},
consumer: {
groupId: `smkmq-group-1`,
},
},
};
Kemudian pada kita implemantasikan config tersebut pada main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { ValidationPipe } from '@nestjs/common';
import { useContainer } from 'class-validator';
import { kafkaConfig } from './config/kafka.config';
import { MicroserviceOptions } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.enableCors();
app.useGlobalPipes(
new ValidationPipe({
whitelist: true,
forbidUnknownValues: true,
transform: true,
validateCustomDecorators: true,
transformOptions: {
enableImplicitConversion: true,
},
}),
);
app.connectMicroservice<MicroserviceOptions>(kafkaConfig);
app.startAllMicroservices();
useContainer(app.select(AppModule), { fallbackOnErrors: true });
await app.listen(5002);
}
bootstrap();
apabila prosenya sudah benar, kita akan melihat consumer kita di kafka dengan nama smkmq-group-1-server
dengan state stable
Membuat MessagePattern
Decorator MessagePattern
digunakan untuk menangani pesan dalam microservice architecture
. Decorator ini merupakan bagian dari package @nestjs/microservices
dan dirancang khusus untuk menangani pesan yang dikirim melalui berbagai lapisan transport seperti Kafka, RabbitMQ, NATS, dan lain-lain.
Penggunaan Utama MessagePattern
- Message Routing: Memungkinkan routing pesan ke handler yang sesuai berdasarkan pola pesan. Pola ini bisa berupa string, objek, atau array yang mendefinisikan jenis pesan yang harus diproses oleh handler.
- Decoupling: Membantu dalam mendekopling produsen pesan dari konsumen. Produsen mengirim pesan ke topik atau kanal, dan layanan apa pun dengan handler untuk pola tersebut dapat mengonsumsi pesan.
- Scalability: Memungkinkan skala horizontal mikroservis. Beberapa instance dari sebuah layanan dapat menangani pola pesan yang sama, sehingga beban dapat didistribusikan ke beberapa instance.
- Protocol Agnostic: Bekerja dengan berbagai lapisan transportasi yang didukung oleh NestJS, sehingga fleksibel untuk berbagai sistem pesan.
Pada contoh ini kita akan implementasikan MessagePAtter pada controller di konsumen.
import { Controller, Get, Post, UseGuards } from '@nestjs/common';
import { KonsumenService } from './konsumen.service';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator'; //import disini
import { CreateKonsumenDto, findAllKonsumenDto } from './konsumen.dto';
import { JwtGuard } from 'src/app/auth/auth.guard';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller('konsumen')
export class KonsumenController {
constructor(private konsumenService: KonsumenService) {}
@UseGuards(JwtGuard)
@Post('create')
async create(@InjectCreatedBy() payload: CreateKonsumenDto) {
console.log('pay', payload);
return this.konsumenService.create(payload);
}
@UseGuards(JwtGuard)
@Get('list')
async findAll(@Pagination() query: findAllKonsumenDto) {
return this.konsumenService.findAll(query);
}
@MessagePattern('belajar-kafka')
async simpan(@Payload() payload) {
console.log('payload', payload);
}
}
Case menerima pesan string
Case meneriman pesan object
Case mengirimkan konsumen baru dari kafka daan simpan ke database
Pada materi sebelumny kita terlah membuat api dan service untuk menambahkan konsumen yang baru. Pada contoh kasus kali ini kita akan mengirimkan message ke kafka dan akan di consume oleh nestjs data dari konsumen yang baru untuk kemudian di simpan di dalam database. Kita tambahkan kode untuk memanggil service create konsumen dari konsumen.service.ts
seperti di bawah
@MessagePattern('belajar-kafka')
async simpan(@Payload() payload) {
console.log('payload', payload);
return this.konsumenService.create({
...payload,
created_by: {
id: 1,
},
});
}
}
Kemudian kita coba kirimkan data berikut di kafka
{
"nama_konsumen" : "Rizky Alfiansyah",
"alamat_konsumen" : "Kp. Kebonjati Rt. 002",
"email" : "rizky@gmail.com",
"nomor_handphone" : "0895320050322"
}
Kita lihat di log di atas jika data sudah berhasil disimpan pada database, untuk memastikan kita lihat data konsumen dengan api list konsumen
Kita lihat bahwa data konsumen berhasil di tambahkan ke database.
Case Backend Down
Pada contoh kali ini kita coba kirimkan data konsumen baru ke kafka, sementara BE mati
{
"nama_konsumen" : "Nayhan",
"alamat_konsumen" : "Kp. Kebonjati Rt. 002",
"email" : "nayhan@gmail.com",
"nomor_handphone" : "0895320050322"
}
Pertama kita matikan NestJs terlebih dahulu
Kemudian kita produce message data di atasi di kafka
Kemudian kita nyalakan kembali NestJS
Pada saat data kita nyalakan NestJs maka data akan langsung di consume dan di simpan ke database.
Case Data yang dikirim tidak sesuai
import { Controller, Get, Post, UseGuards } from '@nestjs/common';
import { KonsumenService } from './konsumen.service';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator'; //import disini
import { CreateKonsumenDto, findAllKonsumenDto } from './konsumen.dto';
import { JwtGuard } from 'src/app/auth/auth.guard';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';
@Controller('konsumen')
export class KonsumenController {
constructor(private konsumenService: KonsumenService) {}
....
@MessagePattern('belajar-kafka')
async simpan(@Payload() payload) {
console.log('payload', payload);
try {
const dto = plainToInstance(CreateKonsumenDto, payload);
const errors = await validate(dto);
if (errors.length > 0) {
console.log('Validation failed:', errors);
return;
}
await this.konsumenService.create({
...payload,
created_by: {
id: 1,
},
});
} catch (err) {
console.log('err', err);
}
}
}