Search
⌃K
Links

Protobuf

Protocol Buffers (Protobuf) is a free and open-source cross-platform data format used to serialize structured data, Initially released on July 7, 2008. It is useful in developing programs to communicate with each other over a network or for storing data. The method involves an interface description language that describes the structure of some data and a program that generates source code from that description for generating or parsing a stream of bytes that represents the structured data.

Supported versions

  • proto2
  • proto3

Supported Features

  • Retrieve compiled protobuf schemas (Produce messages without .proto files)
  • Versioning
  • Embedded serialization
  • Live evolution
  • Import packages (soon)
  • Import types (soon)

Getting started

Attach a schema

Step 1: Create a new schema

GUI
SDK
Head to the "Schemaverse" page
Create a new schema by clicking on "Create from blank"
Soon.

Step 2: Attach

GUI
SDK
Head to your station, and on the top-left corner, click on "+ Attach schema"
It can be found through the different SDKs docs.

Produce a message (Serialization)

Node.js
Go
Python
TypeScript
HTTP (REST)
Memphis abstracts the need for external serialization functions and embeds them within the SDK.
In node.js, we can simply produce an object. Behind the scenes, the object will be serialized based on the attached schema and data format - protobuf.
Example schema:
syntax = "proto3";
message Test {
string field1 = 1;
string field2 = 2;
int32 field3 = 3;
}
Producing a message without a local .proto file:
1
const memphis = require("memphis-dev");
2
3
(async function () {
4
try {
5
await memphis.connect({
6
host: "MEMPHIS_BROKER_URL",
7
username: "APPLICATION_USER",
8
password: "PASSWORD"
9
});
10
const producer = await memphis.producer({
11
stationName: "STATION_NAME",
12
producerName: "PRODUCER_NAME"
13
});
14
var payload = {
15
fname: "AwesomeString",
16
lname: "AwesomeString",
17
id: 54
18
};
19
try {
20
await producer.produce({
21
message: payload
22
});
23
} catch (ex) {
24
console.log(ex.message)
25
}
26
} catch (ex) {
27
console.log(ex);
28
memphis.close();
29
}
30
})();
Memphis abstracts the need for external serialization functions and embeds it within the SDK.
Example schema:
syntax = "proto3";
message Test {
string field1 = 1;
string field2 = 2;
int32 field3 = 3;
}
Producing a message without a local .proto file:
package main
import (
"fmt"
"os"
"github.com/memphisdev/memphis.go"
)
func main() {
conn, err := memphis.Connect("MEMPHIS_BROKER_URL", "APPLICATION_TYPE_USERNAME", memphis.Password("PASSWORD"))
if err != nil {
os.Exit(1)
}
defer conn.Close()
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")
if err != nil {
fmt.Printf("Header failed: %v\n", err)
os.Exit(1)
}
msg := make(map[string]interface{})
msg["field1"] = "value1"
msg["field2"] = "value2"
msg["field3"] = 32
err = p.Produce(msg, memphis.MsgHeaders(hdrs))
if err != nil {
fmt.Printf("Produce failed: %v\n", err)
os.Exit(1)
}
}
Producing a message with a local .proto file:
package main
import (
"fmt"
"os"
"demo/schemapb"
"github.com/memphisdev/memphis.go"
)
func main() {
conn, err := memphis.Connect("MEMPHIS_BROKER_URL", "APPLICATION_TYPE_USERNAME", memphis.Password("PASSWORD"))
if err != nil {
os.Exit(1)
}
defer conn.Close()
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")
if err != nil {
fmt.Printf("Header failed: %v\n", err)
os.Exit(1)
}
s1 := "Hello"
s2 := "World"
pbInstance := schemapb.Test{
Field1: &s1,
Field2: &s2,
}
err = p.Produce(&pbInstance, memphis.MsgHeaders(hdrs))
if err != nil {
fmt.Printf("Produce failed: %v\n", err)
os.Exit(1)
}
}
Memphis abstracts the need for external serialization functions and embeds them within the SDK.
Example schema:
syntax = "proto3";
message Test {
string field1 = 1;
string field2 = 2;
int32 field3 = 3;
}
Producing a message with a local .proto file:
import asyncio
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError
import schema_pb2 as PB
async def main():
memphis = Memphis()
await memphis.connect(host="MEMPHIS_BROKER_URL", username="APPLICATION_TYPE_USERNAME", password="PASSWORD")
producer = await memphis.producer(
station_name="STATION_NAME", producer_name="PRODUCER_NAME")
headers = Headers()
headers.add("key", "value")
obj = PB.Test()
obj.field1 = "Hello"
obj.field2 = "Amazing"
obj.field3 = "World"
try:
await producer.produce(obj, headers=headers)
except Exception as e:
print(e)
finally:
await asyncio.sleep(3)
await memphis.close()
if __name__ == '__main__':
asyncio.run(main())
Memphis abstracts the need for external serialization functions and embeds them within the SDK.
Example schema:
syntax = "proto3";
message Test {
string field1 = 1;
string field2 = 2;
int32 field3 = 3;
}
Producing a message without a local .proto file:
import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';
(async function () {
let memphisConnection: Memphis;
try {
memphisConnection = await memphis.connect({
host: 'MEMPHIS_BROKER_URL',
username: 'APPLICATION_TYPE_USERNAME',
password: 'PASSWORD'
});
const producer = await memphisConnection.producer({
stationName: 'STATION_NAME',
producerName: 'PRODUCER_NAME'
});
const headers = memphis.headers()
headers.add('key', 'value');
const msg = {
field1: "Hello",
field2: "Amazing",
field3: "World"
}
await producer.produce({
message: msg,
headers: headers
});
memphisConnection.close();
} catch (ex) {
console.log(ex);
}
})();
In HTTP, we can simply produce an object. Behind the scenes, the object will be serialized based on the attached schema and data format - protobuf.
Example schema:
syntax = "proto3";
message Test {
string field1 = 1;
string field2 = 2;
int32 field3 = 3;
}
Producing a message without a local .proto file:
1
var axios = require('axios');
2
var data = JSON.stringify({
3
"field1": "foo",
4
"field2": "bar",
5
"field3": 123,
6
});
7
8
var config = {
9
method: 'post',
10
url: 'https://BROKER_RESTGW_URL/stations/hps/produce/single',
11
headers: {
12
'Authorization': 'Bearer <jwt>',
13
'Content-Type': 'application/json'
14
},
15
data : data
16
};
17
18
axios(config)
19
.then(function (response) {
20
console.log(JSON.stringify(response.data));
21
})
22
.catch(function (error) {
23
console.log(error);
24
});

Consume a message (Deserialization)

Node.js
Go
Python
TypeScript
1
const memphis = require("memphis-dev");
2
var protobuf = require("protobufjs");
3
4
(async function () {
5
try {
6
await memphis.connect({
7
host: "localhost",
8
username: "root",
9
password: "memphis"
10
});
11
12
const consumer = await memphis.consumer({
13
stationName: "marketing",
14
consumerName: "cons1",
15
consumerGroup: "cg_cons1",
16
maxMsgDeliveries: 3,
17
maxAckTimeMs: 2000,
18
genUniqueSuffix: true
19
});
20
21
const root = await protobuf.load("schema.proto");
22
var TestMessage = root.lookupType("Test");
23
24
consumer.on("message", message => {
25
const x = message.getData()
26
var msg = TestMessage.decode(x);
27
console.log(msg)
28
message.ack();
29
});
30
consumer.on("error", error => {
31
console.log(error);
32
});
33
} catch (ex) {
34
console.log(ex);
35
memphis.close();
36
}
37
})();
package main
import (
"demo/schemapb" // local protobuf struct
"fmt"
"os"
"time"
"github.com/memphisdev/memphis.go"
"google.golang.org/protobuf/proto"
)
type Test struct {
Name string
LastName string
}
func main() {
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_USER", memphis.Password("PASSWORD"))
if err != nil {
os.Exit(1)
}
defer conn.Close()
consumer, err := conn.CreateConsumer("CONSUMER_NAME", "CONSUMER_GROUP_NAME")
if err != nil {
fmt.Printf("Consumer creation failed: %v\n", err)
os.Exit(1)
}
handler := func(msgs []*memphis.Msg, err error) {
if err != nil {
fmt.Printf("Fetch failed: %v\n", err)
return
}
for _, msg := range msgs {
var message schemapb.Test
err := proto.Unmarshal(msg.Data(), &message)
if err != nil {
fmt.Println(err)
}
fmt.Println(&message)
msg.Ack()
}
}
consumer.Consume(handler)
time.Sleep(3000 * time.Second)
}
import asyncio
from memphis import Memphis
import schema_pb2 as PB # protobuf class // .proto file
async def main():
async def msg_handler(msgs, error):
try:
for msg in msgs:
obj = PB.Message()
obj.ParseFromString(msg.get_data())
await msg.ack()
if error:
print(error)
except Exception as e:
print(e)
try:
memphis = Memphis()
await memphis.connect(host="MEMPHIS_HOST", username="MEMPHIS_USERNAME", password="PASSWORD")
consumer = await memphis.consumer(
station_name="STATION_NAME", consumer_name="CONSUMER_NAME")
consumer.consume(msg_handler)
# Keep your main thread alive so the consumer will keep receiving data
await asyncio.Event().wait()
except Exception as e:
print(e)
finally:
await memphis.close()
if __name__ == '__main__':
asyncio.run(main())
import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';
var protobuf = require("protobufjs");
(async function () {
let memphisConnection: Memphis;
try {
memphisConnection = await memphis.connect({
host: 'MEMPHIS_BROKER_URL',
username: 'APPLICATION_TYPE_USERNAME',
password: 'PASSWORD'
});
const consumer = await memphis.consumer({
stationName: "STATION_NAME",
consumerName: "CONSUMER_NAME",
consumerGroup: "CONSUMER_GROUP_NAME",
maxMsgDeliveries: 3,
maxAckTimeMs: 2000,
genUniqueSuffix: true
});
const root = await protobuf.load("schema.proto");
var TestMessage = root.lookupType("Test");
consumer.on("message", message => {
const x = message.getData()
var msg = TestMessage.decode(x);
console.log(msg)
message.ack();
});
consumer.on("error", error => {
console.log(error);
});
} catch (ex) {
console.log(ex);
memphis.close();
}
})();type