Skip to content

D. NestJs sebagai Consumer

Konfigurasi

Langkah pertama buatkan konfig untuk kafka pada folder config

config/kafka.config.ts
import { KafkaOptions, Transport } from '@nestjs/microservices';

export const kafkaConfig: KafkaOptions = {
  transport: Transport.KAFKA,

  options: {
    client: {
      clientId: `backend-smkmq`,
      brokers: [`localhost:9092`],
    },
    consumer: {
      groupId: `smkmq-group-1`,
    },
  },
};

Kemudian pada kita implemantasikan config tersebut pada main.ts

main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { ValidationPipe } from '@nestjs/common';
import { useContainer } from 'class-validator';
import { kafkaConfig } from './config/kafka.config';
import { MicroserviceOptions } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);



  app.enableCors();
  app.useGlobalPipes(
    new ValidationPipe({
      whitelist: true,
      forbidUnknownValues: true,
      transform: true,
      validateCustomDecorators: true,
      transformOptions: {
        enableImplicitConversion: true,
      },
    }),
  );
  app.connectMicroservice<MicroserviceOptions>(kafkaConfig);

  app.startAllMicroservices();
  useContainer(app.select(AppModule), { fallbackOnErrors: true });
  await app.listen(5002);
}
bootstrap();

alt text

apabila prosenya sudah benar, kita akan melihat consumer kita di kafka dengan nama smkmq-group-1-server dengan state stable

Membuat MessagePattern

Decorator MessagePattern digunakan untuk menangani pesan dalam microservice architecture. Decorator ini merupakan bagian dari package @nestjs/microservices dan dirancang khusus untuk menangani pesan yang dikirim melalui berbagai lapisan transport seperti Kafka, RabbitMQ, NATS, dan lain-lain.

Penggunaan Utama MessagePattern

  • Message Routing: Memungkinkan routing pesan ke handler yang sesuai berdasarkan pola pesan. Pola ini bisa berupa string, objek, atau array yang mendefinisikan jenis pesan yang harus diproses oleh handler.
  • Decoupling: Membantu dalam mendekopling produsen pesan dari konsumen. Produsen mengirim pesan ke topik atau kanal, dan layanan apa pun dengan handler untuk pola tersebut dapat mengonsumsi pesan.
  • Scalability: Memungkinkan skala horizontal mikroservis. Beberapa instance dari sebuah layanan dapat menangani pola pesan yang sama, sehingga beban dapat didistribusikan ke beberapa instance.
  • Protocol Agnostic: Bekerja dengan berbagai lapisan transportasi yang didukung oleh NestJS, sehingga fleksibel untuk berbagai sistem pesan.

Pada contoh ini kita akan implementasikan MessagePAtter pada controller di konsumen.

konsumen.controller.ts
import { Controller, Get, Post, UseGuards } from '@nestjs/common';
import { KonsumenService } from './konsumen.service';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator'; //import disini
import { CreateKonsumenDto, findAllKonsumenDto } from './konsumen.dto';
import { JwtGuard } from 'src/app/auth/auth.guard';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller('konsumen')
export class KonsumenController {
  constructor(private konsumenService: KonsumenService) {}
  @UseGuards(JwtGuard)
  @Post('create')
  async create(@InjectCreatedBy() payload: CreateKonsumenDto) {
    console.log('pay', payload);
    return this.konsumenService.create(payload);
  }
  @UseGuards(JwtGuard)
  @Get('list')
  async findAll(@Pagination() query: findAllKonsumenDto) {
    return this.konsumenService.findAll(query);
  }

  @MessagePattern('belajar-kafka')
  async simpan(@Payload() payload) {
    console.log('payload', payload);


  }
}

Case menerima pesan string

alt text alt text

Case meneriman pesan object

alt text alt text

Case mengirimkan konsumen baru dari kafka daan simpan ke database

alt text

Pada materi sebelumny kita terlah membuat api dan service untuk menambahkan konsumen yang baru. Pada contoh kasus kali ini kita akan mengirimkan message ke kafka dan akan di consume oleh nestjs data dari konsumen yang baru untuk kemudian di simpan di dalam database. Kita tambahkan kode untuk memanggil service create konsumen dari konsumen.service.ts seperti di bawah

konsumen.controller.ts
  @MessagePattern('belajar-kafka')
  async simpan(@Payload() payload) {
    console.log('payload', payload);

    return this.konsumenService.create({
      ...payload,
      created_by: {
        id: 1,
      },
    });
  }
}

Kemudian kita coba kirimkan data berikut di kafka

data konsumen baru
{
    "nama_konsumen" : "Rizky Alfiansyah",
    "alamat_konsumen" : "Kp. Kebonjati Rt. 002",
    "email" : "rizky@gmail.com",
    "nomor_handphone" : "0895320050322"

}

alt text

alt text

Kita lihat di log di atas jika data sudah berhasil disimpan pada database, untuk memastikan kita lihat data konsumen dengan api list konsumen

alt text

Kita lihat bahwa data konsumen berhasil di tambahkan ke database.

Case Backend Down

Pada contoh kali ini kita coba kirimkan data konsumen baru ke kafka, sementara BE mati

data konsumen baru
{
    "nama_konsumen" : "Nayhan",
    "alamat_konsumen" : "Kp. Kebonjati Rt. 002",
    "email" : "nayhan@gmail.com",
    "nomor_handphone" : "0895320050322"

}

Pertama kita matikan NestJs terlebih dahulu

alt text

Kemudian kita produce message data di atasi di kafka alt text

Kemudian kita nyalakan kembali NestJS

alt text

Pada saat data kita nyalakan NestJs maka data akan langsung di consume dan di simpan ke database.

Case Data yang dikirim tidak sesuai

alt text

alt text

konsumen.controller.ts
import { Controller, Get, Post, UseGuards } from '@nestjs/common';
import { KonsumenService } from './konsumen.service';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator'; //import disini
import { CreateKonsumenDto, findAllKonsumenDto } from './konsumen.dto';
import { JwtGuard } from 'src/app/auth/auth.guard';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { plainToInstance } from 'class-transformer';
import { validate } from 'class-validator';

@Controller('konsumen')
export class KonsumenController {
  constructor(private konsumenService: KonsumenService) {}

  ....

  @MessagePattern('belajar-kafka')
  async simpan(@Payload() payload) {
    console.log('payload', payload);

    try {
      const dto = plainToInstance(CreateKonsumenDto, payload);
      const errors = await validate(dto);
      if (errors.length > 0) {
        console.log('Validation failed:', errors);
        return;
      }
      await this.konsumenService.create({
        ...payload,
        created_by: {
          id: 1,
        },
      });
    } catch (err) {
      console.log('err', err);
    }
  }
}

alt text

alt text