Usage
Broker
@services/broker.js
import {ServiceBroker} from '@playtini/beta7';
const broker = new ServiceBroker({
nodeID: env('NODE_ID'), // Уникальный для сервиса
kafkaBrokers: env('KAFKA_BROKERS'), // Массив URL:PORT для подключения к Kafka
schemaRegistry: env('SCHEMA_REGISTRY'), // URL для подключения к Schema Registry,
});
export default broker;
//app.js
await broker.start(); // ОБЯЗАТЕЛЬНО ПЕРЕД готовностью обрабатывать запросы e.g.: server.listen()
Consumer
import {broker} from '@services';
export default broker.createService({
name: 'favorites.consumer',
listeners: {
'favorite.games.change': {
// Название прослушиваемого топика (Может быть RegExp, e.g. '/favorite.games.*/' )
fromBeginning: true, // Читаем ли мы с самого начала топика (если offset = undefined) или с места когда подключились (offset === kafka's offset)
// Описываем Handler-Функцию, которая вызовется при новом сообщении в топике
async handler(payload) {
console.warn(payload); // payload => decoded message.value
},g
},
},
});
Avro schema
Schema creation
Если вы используете Consume в данном сервисе, создайте AVRO-схему
@schemas/FavoriteChange.avro.js
import avro from 'avsc';
export default {
name: 'FavoriteChange',
scheme: avro.Type.forSchema({
type: 'record',
namespace: 'favorite.games',
name: 'Action',
fields: [
{
name: 'user',
type: 'string',
},
{
name: 'game_id',
type: 'string',
},
{
name: 'action',
type: {type: 'enum', name: 'action', symbols: ['ADD', 'DELETE']},
},
{
name: 'timestamp', // !Обязательное поле для всех схем!
type: 'long',
},
{
name: 'event_version', // !Обязательное поле для всех схем!
type: 'int',
},
],
}),
};
Schema import
Импортируйте описанную схему с помощью Брокера
@schemas/index.js
import {FavoriteChange} from '@schemas';
import {broker} from '@services';
const initSchemaRegistry = async () => {
await broker.registerSchema(FavoriteChange);
};
export default initSchemaRegistry;
//app.js
await initSchemaRegistry(); // ОБЯЗАТЕЛЬНО ПОСЛЕ await broker.start(); И ПЕРЕД готовностью обрабатывать запросы e.g.: server.listen()
await broker.sendMessage(env('BASE_TOPIC_NAME'), {
schemaID: broker.schemas[FavoriteChange.name],
value: {
user: req.user.uuid,
game_id: matchedReq.game_id,
action: 'ADD',
//Если вы не передаете event_version, будет подставленна дефолтная - 1, timestamp - Date.now()
},
});