Skip to main content

Usage

Broker

@services/broker.js
import {ServiceBroker} from '@playtini/beta7';

const broker = new ServiceBroker({
nodeID: env('NODE_ID'), // Уникальный для сервиса
kafkaBrokers: env('KAFKA_BROKERS'), // Массив URL:PORT для подключения к Kafka
schemaRegistry: env('SCHEMA_REGISTRY'), // URL для подключения к Schema Registry,
});

export default broker;

//app.js
await broker.start(); // ОБЯЗАТЕЛЬНО ПЕРЕД готовностью обрабатывать запросы e.g.: server.listen()

Consumer

import {broker} from '@services';

export default broker.createService({
name: 'favorites.consumer',
listeners: {
'favorite.games.change': {
// Название прослушиваемого топика (Может быть RegExp, e.g. '/favorite.games.*/' )
fromBeginning: true, // Читаем ли мы с самого начала топика (если offset = undefined) или с места когда подключились (offset === kafka's offset)

// Описываем Handler-Функцию, которая вызовется при новом сообщении в топике
async handler(payload) {
console.warn(payload); // payload => decoded message.value
},g
},
},
});

Avro schema

Schema creation

Если вы используете Consume в данном сервисе, создайте AVRO-схему

@schemas/FavoriteChange.avro.js

import avro from 'avsc';

export default {
name: 'FavoriteChange',
scheme: avro.Type.forSchema({
type: 'record',
namespace: 'favorite.games',
name: 'Action',
fields: [
{
name: 'user',
type: 'string',
},
{
name: 'game_id',
type: 'string',
},
{
name: 'action',
type: {type: 'enum', name: 'action', symbols: ['ADD', 'DELETE']},
},
{
name: 'timestamp', // !Обязательное поле для всех схем!
type: 'long',
},
{
name: 'event_version', // !Обязательное поле для всех схем!
type: 'int',
},
],
}),
};

Schema import

Импортируйте описанную схему с помощью Брокера

@schemas/index.js
import {FavoriteChange} from '@schemas';
import {broker} from '@services';

const initSchemaRegistry = async () => {
await broker.registerSchema(FavoriteChange);
};

export default initSchemaRegistry;

//app.js
await initSchemaRegistry(); // ОБЯЗАТЕЛЬНО ПОСЛЕ await broker.start(); И ПЕРЕД готовностью обрабатывать запросы e.g.: server.listen()
await broker.sendMessage(env('BASE_TOPIC_NAME'), {
schemaID: broker.schemas[FavoriteChange.name],
value: {
user: req.user.uuid,
game_id: matchedReq.game_id,
action: 'ADD',
//Если вы не передаете event_version, будет подставленна дефолтная - 1, timestamp - Date.now()
},
});