-
Notifications
You must be signed in to change notification settings - Fork 1
/
kafka-consumer.service.ts
145 lines (134 loc) · 4.86 KB
/
kafka-consumer.service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import {extensionPoint, extensions, Getter, inject} from '@loopback/core';
import {
ConsumerType,
EventsInStream,
IBaseConsumer,
IGenericConsumer,
isSharedConsumer,
isConsumer,
IStreamDefinition,
ConsumerConfig,
} from '../types';
import {ConsumerExtensionPoint, KafkaClientBindings} from '../keys';
import {Consumer, EachMessagePayload, Kafka} from 'kafkajs';
import {ILogger, LOGGER} from '@sourceloop/core';
import {KafkaErrorKeys} from '../error-keys';
@extensionPoint(ConsumerExtensionPoint.key)
/**
 * Creates a Kafka consumer client, subscribes it to every topic declared by
 * the registered consumer extensions, and runs the client. Each incoming
 * message is dispatched to the event-specific consumer registered for its
 * topic and, where configured, to the topic's single generic consumer.
 */
export class KafkaConsumerService<T extends IStreamDefinition> {
  /* Every consumer client created by consume(); stop() disconnects them all. */
  consumers: Consumer[] = [];
  constructor(
    /* A way to get all the extensions that are registered for a consumer extension point. */
    @extensions()
    private getConsumers: Getter<ConsumerType<T, keyof T['messages']>[]>,
    @inject(KafkaClientBindings.KafkaClient)
    private client: Kafka,
    @inject(KafkaClientBindings.ConsumerConfiguration, {optional: true})
    private configuration: ConsumerConfig,
    @inject(LOGGER.LOGGER_INJECT) private readonly logger: ILogger,
  ) {
    // The binding is optional so resolution succeeds, but the service itself
    // cannot run without a configuration — fail fast with a proper Error
    // instance (was `throw Error(...)`).
    if (!configuration) {
      throw new Error(KafkaErrorKeys.ConsumerConfigurationMissing);
    }
  }

  /**
   * Connects a new consumer client, subscribes it to every topic that has at
   * least one event consumer, and starts message processing.
   *
   * Dispatch rules per message:
   * - the event-specific consumer for `message.event` handles it, if any;
   * - otherwise, if no generic consumer exists for the topic, a warning is
   *   logged;
   * - the generic consumer runs when no event consumer matched, or always
   *   when `alwaysRunGenericConsumer` is set in the configuration.
   *
   * Messages without a value are logged and skipped.
   */
  async consume(): Promise<void> {
    const kafkaConsumerClient = this.client.consumer(this.configuration);
    this.consumers.push(kafkaConsumerClient);
    await kafkaConsumerClient.connect();
    const {consumerMap, genericConsumerMap} = await this.buildConsumerMaps();
    // NOTE: only topics with event consumers are subscribed to; a topic that
    // has ONLY a generic consumer is not subscribed here.
    const topics: string[] = Array.from(consumerMap.keys());
    await kafkaConsumerClient.subscribe({
      topics,
    });
    await kafkaConsumerClient.run({
      eachMessage: async (payload: EachMessagePayload) => {
        // `payload.topic` is already a string; no assertion needed.
        const eventMap = consumerMap.get(payload.topic);
        const genericConsumer = genericConsumerMap.get(payload.topic);
        if (payload.message.value) {
          // NOTE(review): the parsed value is assumed to carry `event` and
          // `data` properties; it is not validated here — confirm the
          // producer contract.
          const message = JSON.parse(payload.message.value.toString('utf8'));
          const consumer = eventMap?.get(message.event);
          if (consumer) {
            await consumer.handler(message.data);
          } else if (!genericConsumer) {
            this.logger.warn(
              `${KafkaErrorKeys.UnhandledEvent}: ${JSON.stringify(
                payload,
              )} with event: ${message.event}`,
            );
          } else {
            this.logger.warn(
              `${KafkaErrorKeys.HandleByGenericConsumer}:${message.event}`,
            );
          }
          if (
            (!consumer || this.configuration.alwaysRunGenericConsumer) &&
            genericConsumer
          ) {
            await genericConsumer.handler(message.data);
          }
        } else {
          this.logger.warn(
            `${KafkaErrorKeys.EventWithoutValue}: ${JSON.stringify(payload)}`,
          );
        }
      },
    });
    this.setupConsumerEventHandlers(kafkaConsumerClient);
  }

  /**
   * Resolves all registered consumer extensions and indexes them by topic:
   * - `consumerMap`: topic -> (event -> consumer), built from shared
   *   (multi-event) and single-event consumers;
   * - `genericConsumerMap`: topic -> generic consumer (at most one per topic).
   *
   * @throws Error when a consumer has no topic, or when a topic has more than
   *   one generic consumer.
   */
  private async buildConsumerMaps(): Promise<{
    consumerMap: Map<T['topic'], Map<EventsInStream<T>, IBaseConsumer<T>>>;
    genericConsumerMap: Map<T['topic'], IGenericConsumer<T>>;
  }> {
    const consumerMap = new Map<
      T['topic'],
      Map<EventsInStream<T>, IBaseConsumer<T>>
    >();
    const genericConsumerMap = new Map<T['topic'], IGenericConsumer<T>>();
    const consumers = await this.getConsumers();
    for (const consumer of consumers) {
      if (!consumer.topic) {
        // Serialize the consumer so the error is readable (a bare template
        // interpolation would print "[object Object]").
        throw new Error(
          `${KafkaErrorKeys.ConsumerWithoutTopic}: ${JSON.stringify(
            consumer,
          )}`,
        );
      }
      const topic = consumer.topic;
      if (isSharedConsumer(consumer)) {
        // One consumer handling several events on the same topic.
        const eventMap =
          consumerMap.get(topic) ??
          new Map<EventsInStream<T>, IBaseConsumer<T>>();
        consumer.events.forEach(event => {
          eventMap.set(event, consumer);
        });
        consumerMap.set(topic, eventMap);
      } else if (isConsumer(consumer)) {
        // Single-event consumer.
        const eventMap =
          consumerMap.get(topic) ??
          new Map<EventsInStream<T>, IBaseConsumer<T>>();
        eventMap.set(consumer.event, consumer);
        consumerMap.set(topic, eventMap);
      } else {
        // Generic (catch-all) consumer — only one allowed per topic.
        if (genericConsumerMap.has(topic)) {
          throw new Error(
            `${KafkaErrorKeys.MultipleGenericConsumers}: ${topic}`,
          );
        }
        genericConsumerMap.set(topic, consumer);
      }
    }
    return {consumerMap, genericConsumerMap};
  }

  /**
   * Wires debug logging onto the client's instrumentation events. Payloads
   * are serialized — interpolating them directly would log "[object Object]".
   */
  private setupConsumerEventHandlers(kafkaConsumerClient: Consumer) {
    kafkaConsumerClient.on('consumer.connect', event => {
      this.logger.debug(`${JSON.stringify(event.payload)}`);
    });
    // NOTE(review): crashes are logged at debug level — consider a higher
    // severity if the ILogger contract supports it.
    kafkaConsumerClient.on('consumer.crash', event => {
      this.logger.debug(`${JSON.stringify(event.payload)}`);
    });
    kafkaConsumerClient.on('consumer.disconnect', event => {
      this.logger.debug(`${JSON.stringify(event.payload)}`);
    });
  }

  /** Disconnects every consumer created by consume() and clears the list. */
  async stop() {
    await Promise.all(this.consumers.map(consumer => consumer.disconnect()));
    this.consumers = [];
  }
}