KB
When producing messages to Memphis without attaching a schema, the SDK `produce` function expects to receive a byte array, and therefore the standard message-producing implementation uses `Buffer.from(<string>)`.
When a schema is attached, the producer must pass the message in the format that was used when the schema was created (for example, protobuf). Therefore we remove the `Buffer.from(<string>)` call and pass the payload as is. Example in JS:
await producer.produce({
    message: payload,
    headers: headers
});
// Consumer-side example: connect to Memphis, consume from a station, and
// decode each message with the protobuf type "Test" loaded from schema.proto.
const memphis = require("memphis-dev");
const protobuf = require("protobufjs");

(async function () {
    try {
        await memphis.connect({
            host: "localhost",
            username: "root",
            connectionToken: "memphis"
        });

        const consumer = await memphis.consumer({
            stationName: "marketing",
            consumerName: "cons1",
            consumerGroup: "cg_cons1",
            maxMsgDeliveries: 3,
            maxAckTimeMs: 2000,
            genUniqueSuffix: true
        });

        // Load the schema definition and resolve the message type used to
        // decode incoming payloads.
        const root = await protobuf.load("schema.proto");
        const TestMessage = root.lookupType("Test");

        consumer.on("message", message => {
            // The callback is invoked after the enclosing try block has
            // finished, so a decode failure must be handled here; otherwise
            // it would surface as an uncaught exception.
            try {
                const decoded = TestMessage.decode(message.getData());
                console.log(decoded);
                message.ack();
            } catch (decodeError) {
                // The message is intentionally left un-acked on failure.
                // NOTE(review): presumably the broker redelivers it up to
                // maxMsgDeliveries - confirm against the Memphis docs.
                console.log(decodeError);
            }
        });

        consumer.on("error", error => {
            console.log(error);
        });
    } catch (ex) {
        // Connection, consumer creation, or schema loading failed.
        console.log(ex);
        memphis.close();
    }
})();
Last modified 7mo ago