Skip to content

E. NestJs sebagai Producer

Konfigurasi

Pertama kita module dan service untuk kafka terlebih dahulu

terminal
npx nest g module kafka
npx nest g service kafka

Kemudian kita definisikan sebuah modul global di NestJS yang mengonfigurasi klien Kafka menggunakan ClientsModule. Modul ini mencakup KafkaService sebagai penyedia layanan dan mengekspornya sehingga dapat digunakan di seluruh aplikasi tanpa perlu mendeklarasikannya lagi di modul lain. Pendekatan ini membantu dalam memisahkan konfigurasi dari logika bisnis dan memudahkan pengelolaan serta penggunaan kembali komponen di berbagai tempat dalam aplikasi.

kafka.module.ts
  import { Global, Module } from '@nestjs/common';
  import { KafkaService } from './kafka.service';
  import { ClientsModule, Transport } from '@nestjs/microservices';
  import { kafkaConfig } from 'src/config/kafka.config';

  @Global()
  @Module({
    imports: [
      ClientsModule.register([
        {
          name: 'LATIHAN_KAFKA',
          ...kafkaConfig,
        },p
      ]),
    ],
    providers: [KafkaService],
    exports: [KafkaService],
  })
  export class KafkaModule {}

!!! note - @Global(): Menjadikan modul ini tersedia di seluruh aplikasi tanpa perlu mengimpor di setiap modul yang membutuhkan. - ClientsModule.register(): Mengonfigurasi klien Kafka dengan menggunakan konfigurasi yang tela dipisahkan dalam file kafka.config.ts. - KafkaService: Layanan yang berisi logika untuk berinteraksi dengan Kafka, diekspor agar dapat digunakan oleh modul lain.

Kemudian Kita implementasikan lifecycle onModuleInit pada service untuk menghubungkan klien Kafka ketika modul diinisialisasi. Ini memastikan bahwa klien siap untuk mengirim dan menerima pesan ketika aplikasi berjalan.

kafka.service.ts
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';

@Injectable()
export class KafkaService implements OnModuleInit {
  constructor(@Inject('LATIHAN_KAFKA') private kafkaClient: ClientKafka) {}

  async onModuleInit() {
    await this.kafkaClient.connect();
  }

  async sendMessagewithEmit(topic: string, key:string, payload: any) {
    return this.kafkaClient.emit(topic, {
        key : key,
        value : payload
    });
  }
}

alt text

Pada service di atas kita membuat method untuk memproduce message ke kafka dengan method emit dari kafkaClient.

Note

  • Metode sendMessagewithEmit digunakan untuk mengirim pesan ke topik Kafka tertentu.
  • topic: Parameter yang menunjukkan topik Kafka yang akan menerima pesan.
  • key: Parameter yang menunjukkan kunci pesan. Kunci ini digunakan oleh Kafka untuk menentukan partisi tempat pesan akan disimpan
  • payload: Data yang akan dikirim sebagai nilai pesan.
  • this.kafkaClient.emit(): Metode ini mengirimkan pesan ke topik Kafka dengan menggunakan klien Kafka yang telah terhubung. Pesan dikirim dalam format objek yang berisi key dan value.

Implentasikan pada app.module.ts

app.module.ts
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { TypeOrmModule } from '@nestjs/typeorm';
import { typeOrmConfig } from './config/typeorm.config';
import { AuthModule } from './app/auth/auth.module';
import { MailModule } from './app/mail/mail.module';
import { ConfigModule } from '@nestjs/config';
import { KategoriModule } from './app/kategori/kategori.module';
import { ProdukModule } from './app/produk/produk.module';
import { UploadController } from './app/upload/upload.controller';
import { ServeStaticModule } from '@nestjs/serve-static';
import { join } from 'path';
import { KonsumenModule } from './app/konsumen/konsumen.module';
import { UniqueValidator } from './utils/validator/unique.validator';
import { OrderModule } from './app/order/order.module';
import { OrderDetailModule } from './app/order_detail/order_detail.module';
import { UploadModule } from './app/upload/upload.module';
import { BookModule } from './book/book.module';
import { ProfileModule } from './app/profile/profile.module';
import { QueryBuilderModule } from './query-builder/query-builder.module';
import { KafkaModule } from './kafka/kafka.module';

@Module({
  imports: [
    ServeStaticModule.forRoot({
      rootPath: join(__dirname, '..', 'public'),
    }),

    ConfigModule.forRoot({
      isGlobal: true,
    }),
    TypeOrmModule.forRoot(typeOrmConfig),

    AuthModule,
    MailModule,
    KategoriModule,
    ProdukModule,
    BookModule,
    KonsumenModule,
    OrderModule,
    OrderDetailModule,
    UploadModule,
    ProfileModule,
    QueryBuilderModule,
    KafkaModule,
  ],
  controllers: [AppController, UploadController],
  providers: [AppService, UniqueValidator],
})
export class AppModule {}

Case Produce message ke kafka tanpa menunggu response

Pada case ini kita akan mengimplentasikan ketika create order dengan memanfaatkan kafka. adapun logic create order sama seperti saat kita membuat api dengan beberapa penyesuaian.

Sebelum memulai case ini , pertama kita buat dahulu topic order

alt text

Langkah 1

Pertama kita buat dulu method sebagai prosedur untuk mengirimkan message ke kafka pada topic order seperti berikut.

order.service.ts
import {
  HttpException,
  HttpStatus,
  Inject,
  Injectable,
  NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import BaseResponse from 'src/utils/response/base.response';
import { Between, Like, Repository } from 'typeorm';
import { Order } from './order.entity';
import { ResponsePagination, ResponseSuccess } from 'src/interface/response';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { REQUEST } from '@nestjs/core';
import { Workbook } from 'exceljs';
import { Response } from 'express';
import { KafkaService } from 'src/kafka/kafka.service';

@Injectable()
export class OrderService extends BaseResponse {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @Inject(REQUEST) private req: any,
    private readonly kafkaService: KafkaService,
  ) {
    super();
  }

  ...


  async sendOrderToKafka(payload: CreateOrderDto) {
    await this.kafkaService.sendMessagewithEmit(
      'order',
      'order_key',
      JSON.stringify(payload),
    );
  }

  ...
}

Langkah 2

Kemudian kita membuat Controller untuk mengirimkan payload dari client

order.controller.ts
import {
  Body,
  Controller,
  Delete,
  Get,
  Param,
  Post,
  Put,
  Res,
  UseGuards,
} from '@nestjs/common';
import { OrderService } from './order.service';
import { JwtGuard } from '../auth/auth.guard';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { InjectUpdatedBy } from 'src/utils/decorator/inject-updated_by.decorator';
import { Response } from 'express';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { validate } from 'class-validator';
import { plainToInstance } from 'class-transformer';

@Controller('order')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @UseGuards(JwtGuard)
  @Post('tambah-kafka')
  async createOrderKafka(@InjectCreatedBy() payload: CreateOrderDto) {
    return this.orderService.sendOrderToKafka(payload);
  }


  ...
}

alt text

payload
 {
     "tanggal_order" : "2023-09-01",
     "status" : "belum bayar",
     "total_bayar" : 40000,
     "konsumen_id" : 1,
     "order_detail" : [
         {
             "jumlah" : 10,
             "harga" : 20000,
             "produk" : {
                 "id" : 1
             }


         },
         {
             "jumlah" : 5,
              "harga" : 50000,
             "produk" : {
                 "id" : 1
             }


         }
     ]
 }

Langkah 3

Kemudian kita akan membuat consumer ketika ada message baru di kafka pada topic order

order.controller.ts
import {
  Body,
  Controller,
  Delete,
  Get,
  Param,
  Post,
  Put,
  Res,
  UseGuards,
} from '@nestjs/common';
import { OrderService } from './order.service';
import { JwtGuard } from '../auth/auth.guard';
import { InjectCreatedBy } from 'src/utils/decorator/inject-created_by.decorator';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { Pagination } from 'src/utils/decorator/pagination.decorator';
import { InjectUpdatedBy } from 'src/utils/decorator/inject-updated_by.decorator';
import { Response } from 'express';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { validate } from 'class-validator';
import { plainToInstance } from 'class-transformer';

@Controller('order')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @MessagePattern('order')
  async getPayloadFormKafka(@Payload() payload) {

    console.log('payl', payload)


  }

  ...
}

Lakukan Pengujian

alt text alt text alt text

Langkah 4

Setelah berhasil langkah selanjutkan kita akan membaut service untuk menyimpan message ke database dan merubah method getPayloadFormKafka di controller

order.service.ts
import {
  HttpException,
  HttpStatus,
  Inject,
  Injectable,
  NotFoundException,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import BaseResponse from 'src/utils/response/base.response';
import { Between, Like, Repository } from 'typeorm';
import { Order } from './order.entity';
import { ResponsePagination, ResponseSuccess } from 'src/interface/response';
import { CreateOrderDto, UpdateOrderDto, findAllOrderDto } from './order.dto';
import { REQUEST } from '@nestjs/core';
import { Workbook } from 'exceljs';
import { Response } from 'express';
import { KafkaService } from 'src/kafka/kafka.service';

@Injectable()
export class OrderService extends BaseResponse {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @Inject(REQUEST) private req: any,
    private readonly kafkaService: KafkaService,
  ) {
    super();
  }

  ...


  async createOrderFromKafka(
    payload: CreateOrderDto,
  ): Promise<ResponseSuccess> {


    try {
      const invoice = this.generateInvoice();
      payload.nomor_order = invoice;

      payload.order_detail &&
        payload.order_detail.forEach((item) => {
          item.created_by = { id: payload.created_by.id };
        });

      await this.orderRepository.save({
        ...payload,
        konsumen: {
          id: payload.konsumen_id,
        },
      });

      return this._success('OK');
    } catch (err) {
      console.log('err', err);
      throw new HttpException('Ada Kesalahan', HttpStatus.UNPROCESSABLE_ENTITY);
    }
  }

  ...

}
order.controller.ts
...

@Controller('order')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @MessagePattern('order')
  async getPayloadFormKafka(@Payload() payload) {

    console.log('payl', payload)
    return this.orderService.createOrderFromKafka(payload)

  }

  ...
}

Lakukan Pengujian

alt text alt text alt text

Terlihat pada hasil tes di atas kita berhasil menyimpan order dengan menggunakan kafka dengan skenarion Producer dan Consumer.

Note

Pada contoh implementasi ini kita menjadikan producer dan consumer pada topic yang sama di satu aplikasi backend. Pada projek rill biasanya kita mengimplementasikan pada aplikasi backend berbeda

Case Produce message ke kafka dengan meneunggu response dari consumer

Pada contoh sebelumnya kita terlah berhasil membuat order namun kalau kita perhatikan saat producer mengirimkan message ke kafka dan consume oleh consumer , Producer tidak tahu apakah order yang dikirim berhasil di simpan atau enggk.

alt text

Terlihat tidak ada response dari ketika data dikirim ke kafka. Hal ini terjadi karena kita menggunkan method emit ketika mengirimkan order ke kafka karena emit tidak memerlukan response. Untuk mengatasi masalah ini kita bisa menggunakan method sendseperti contoh implementasi berikut.

Langkah 1

kafka.service.ts
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class KafkaService implements OnModuleInit {
  constructor(@Inject('LATIHAN_KAFKA') private kafkaClient: ClientKafka) {}

  async onModuleInit() {
    this.kafkaClient.subscribeToResponseOf('order');
    await this.kafkaClient.connect();
  }

  async sendMessagewithEmit(topic: string, key: string, payload: any) {
    return this.kafkaClient.emit(topic, {
      key: key,
      value: payload,
    });
  }
  async sendMessagewithSend(topic: string, key: string, payload: any) {
    try {
      const result = await firstValueFrom(
        this.kafkaClient.send(topic, {
          key: key,
          value: payload,
        }),
      );

      console.log('response kafka', result);
      return result;
    } catch (error) {
      console.error('Error sending message to Kafka:', error);
      throw error;
    }
  }
}

Penjelasan

this.kafkaClient.subscribeToResponseOf('order'); adalah metode yang digunakan dalam konteks Kafka dengan NestJS untuk memberitahu klien Kafka agar mendengarkan respons dari topik tertentu setelah mengirim pesan ke topik tersebut.

Langkah 2

Kita akan membuat method baru untuk mengirimkan data ke kafka dengan send

order.service.ts
@Injectable()
export class OrderService extends BaseResponse {
  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    @Inject(REQUEST) private req: any,
    private readonly kafkaService: KafkaService,
  ) {
    super();
  }

  ...


 async sendOrderToKafkaWithSend(payload: CreateOrderDto) {
   const respons = await this.kafkaService.sendMessagewithSend(
      'order',
      'order_send',
      JSON.stringify(payload),
    );


    return respons
  }

  ...
}

Langkah 3

order.controller.ts
...

@Controller('order')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}


  @UseGuards(JwtGuard)
  @Post('tambah-kafka')
  async createOrderKafka(@InjectCreatedBy() payload: CreateOrderDto) {
    return this.orderService.sendOrderToKafkaWithSend(payload);
  }

  ....
}

Lakukan Pengujian

alt text

Catatan Penting

Dalam pola komunikasi Request-Response Pattern dengan method send maka secara otomatis kafka akan membuat topic .reply. Pada contoh ini kafka membuat topic order.reply secara otomaris, diamana topic ini digunakan kafka khusus untuk menangani response terhadap request yang dikirim ke topik order.

Sebagai gambar alur kerja dari proses sebagai berikut.

  • Producer mengirimkan pesan ke topik order menggunakan metode send
  • Consumer listen ke topik order, memproses message tersebut, dan kemudian mengirimkan balasan ke topik order.reply.
  • Producer subscribe pada topik order.reply melalui metode subscribeToResponseOf('order'), sehingga dapat menerima balasan tersebut.

alt text

Note

Dalam konteks NestJS yang menggunakan microservices dan Kafka, emit dan send adalah dua metode berbeda yang digunakan untuk mengirim pesan ke topik Kafka, tetapi keduanya memiliki perbedaan dalam cara kerja dan tujuan penggunaannya.

  • Penggunaan: emit digunakan untuk mengirim pesan tanpa mengharapkan balasan. Ini adalah pola "fire-and-forget", di mana kita hanya mengirimkan pesan ke topik tertentu dan tidak menunggu respons atau balasan dari konsumen yang menerima pesan tersebut.
  • Penggunaan: send digunakan ketika kita mengirim pesan dan mengharapkan balasan dari konsumen. Ini adalah pola "request-response", di mana kita mengirim pesan ke topik tertentu dan menunggu balasan dari konsumen yang menangani pesan tersebut. Balasan ini biasanya dikirim ke topik balasan yang terpisah.