Protocol Buffers (Protobuf)

Protocol Buffers (Protobuf) is a free, open-source, cross-platform data format used to serialize structured data, initially released on July 7, 2008. It is useful for developing programs that communicate with each other over a network and for storing data. The method involves an interface description language that describes the structure of some data, together with a program that generates source code from that description for generating or parsing the stream of bytes that represents the structured data.

Supported versions

  • proto2

  • proto3
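
Both supported syntaxes can describe the same messages; as a rough sketch, the three-field Test message used throughout this page could be written in either version (field presence labels are explicit in proto2 and omitted in proto3):

// proto3 (the form used in the examples below)
syntax = "proto3";
message Test {
    string field1 = 1;
    string field2 = 2;
    int32 field3 = 3;
}

// proto2 equivalent (a separate .proto file)
syntax = "proto2";
message Test {
    optional string field1 = 1;
    optional string field2 = 2;
    optional int32 field3 = 3;
}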

Supported Features

  • Retrieve compiled protobuf schemas (Produce messages without .proto files)

  • Versioning

  • Embedded serialization

  • Live evolution

  • Import packages (soon)

  • Import types (soon)

Getting started

How to produce a message

Memphis abstracts the need for external serialization functions and embeds them within the SDK.

In Node.js, we can simply produce an object. Behind the scenes, the object will be serialized based on the attached schema and data format (Protobuf).

Example schema (no need to compile):

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

Producing a message without a local .proto file:

const { memphis } = require("memphis-dev");

(async function () {
    let memphisConnection

    try {
        memphisConnection = await memphis.connect({
            host: "MEMPHIS_BROKER_URL",
            username: "APPLICATION_USER",
            password: "PASSWORD",
            accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });
        const producer = await memphisConnection.producer({
            stationName: "STATION_NAME",
            producerName: "PRODUCER_NAME"
        });
        var payload = {
            field1: "AwesomeString",
            field2: "AwesomeString",
            field3: 54
        };
        await producer.produce({
            message: payload
        })
        memphisConnection.close();

    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();

The Go SDK likewise embeds serialization, so no external serialization functions are needed. Example proto file:

syntax = "proto3";
option go_package = "./";

message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

To compile the proto file, run the following command:

protoc --go_out=. ./{proto file name}

Producing a message without a local .proto file:

package main

import (
    "fmt"
    "os"
    "github.com/memphisdev/memphis.go"
)

func main() {
    conn, err := memphis.Connect(
        "MEMPHIS_BROKER_URL", 
        "APPLICATION_TYPE_USERNAME", 
        memphis.Password("PASSWORD"),
        memphis.AccountId(123456789), //*optional* In case you are using Memphis.dev cloud
        )
    if err != nil {
        os.Exit(1)
    }
    defer conn.Close()
    p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")

    hdrs := memphis.Headers{}
    hdrs.New()
    err = hdrs.Add("key", "value")

    if err != nil {
        fmt.Printf("Header failed: %v\n", err)
        os.Exit(1)
    }
    msg := make(map[string]interface{})
    msg["field1"] = "value1"
    msg["field2"] = "value2"
    msg["field3"] = 32

    err = p.Produce(msg, memphis.MsgHeaders(hdrs))

    if err != nil {
        fmt.Printf("Produce failed: %v\n", err)
        os.Exit(1)
    }
}

Producing a message with a local .proto file:

package main

import (
    "fmt"
    "os"
    "demo/schemapb"
    "github.com/memphisdev/memphis.go"
)

func main() {
    conn, err := memphis.Connect(
        "MEMPHIS_BROKER_URL", 
        "APPLICATION_TYPE_USERNAME", 
        memphis.Password("PASSWORD"),
        memphis.AccountId(123456789), //*optional* In case you are using Memphis.dev cloud
        )
    if err != nil {
        os.Exit(1)
    }
    defer conn.Close()
    p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")

    hdrs := memphis.Headers{}
    hdrs.New()
    err = hdrs.Add("key", "value")
    if err != nil {
        fmt.Printf("Header failed: %v\n", err)
        os.Exit(1)
    }
    s1 := "Hello"
    s2 := "World"
    pbInstance := schemapb.Test{
        Field1: s1,
        Field2: s2,
    }

    err = p.Produce(&pbInstance, memphis.MsgHeaders(hdrs))
    if err != nil {
        fmt.Printf("Produce failed: %v\n", err)
        os.Exit(1)
    }
}
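
For reference, the schemapb package imported above is the output of the protoc --go_out command shown earlier; the import path demo/schemapb assumes a Go module named demo and option go_package = "./schemapb", so adjust it to your own module layout. The generated file also contains protobuf runtime plumbing, but a minimal sketch of the part this producer relies on looks roughly like:

// Sketch of the type emitted by protoc-gen-go for the Test message.
// The real generated file also embeds protoimpl state and reflection methods.
package schemapb

type Test struct {
    Field1 string `protobuf:"bytes,1,opt,name=field1,proto3" json:"field1,omitempty"`
    Field2 string `protobuf:"bytes,2,opt,name=field2,proto3" json:"field2,omitempty"`
    Field3 int32  `protobuf:"varint,3,opt,name=field3,proto3" json:"field3,omitempty"`
}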
        

The same applies in Python: serialization is embedded within the SDK.

Example schema:

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

To compile the proto file for Python, run the following command (it generates schema_pb2.py, which is imported in the example below):

protoc --python_out=. ./{proto file name}

Producing a message with a local .proto file:

import asyncio
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError

import schema_pb2 as PB

async def main():
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_BROKER_URL", username="APPLICATION_TYPE_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID)
    producer = await memphis.producer(
        station_name="STATION_NAME", producer_name="PRODUCER_NAME")

    headers = Headers()
    headers.add("key", "value")

    obj = PB.Test()
    obj.field1 = "Hello"
    obj.field2 = "Amazing"
    obj.field3 = 32
    
    try:
        await producer.produce(obj, headers=headers)

    except Exception as e:
        print(e)
    finally:
        await asyncio.sleep(3)

    await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

In TypeScript, serialization is likewise embedded within the SDK.

Example schema:

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

Producing a message without a local .proto file:

import { memphis, Memphis } from 'memphis-dev';

(async function () {
    let memphisConnection: Memphis;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers()
        headers.add('key', 'value');
        const msg = {
            field1: "Hello",
            field2: "Amazing",
            field3: 32
        }
        await producer.produce({
            message: msg,
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
    }
})();

In .NET (C#), the example below serializes the message explicitly with protobuf-net (ProtoBuf.Serializer) and produces the resulting bytes.

Example schema:

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

Producing a message without a local .proto file:

using Memphis.Client.Producer;
using System.Collections.Specialized;
using Memphis.Client;
using ProtoBuf;

var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "<memphis-host>";
options.Username = "<application type username>";
options.ConnectionToken = "<broker-token>";

/**
* In case you are using Memphis.dev cloud
* options.AccountId = "<account-id>";
*/

try
{
    var client = await MemphisClientFactory.CreateClient(options);

    var producer = await client.CreateProducer(new MemphisProducerOptions
    {
        StationName = "<memphis-station-name>",
        ProducerName = "<memphis-producer-name>",
        GenerateUniqueSuffix = true
    });

    NameValueCollection commonHeaders = new()
    {
        {
            "key-1", "value-1"
        }
    };

    Test test = new()
    {
        Field1 = "Hello",
        Field2 = "Amazing",
        Field3 = 32
    };
    using var memoryStream = new MemoryStream();
    Serializer.Serialize(memoryStream, test);
    var message = memoryStream.ToArray();

    await producer.ProduceAsync(message, commonHeaders);
    client.Dispose();
}
catch (Exception exception)
{
    Console.WriteLine($"Error occured: {exception.Message}");
}

[ProtoContract]
class Test
{
    [ProtoMember(1, Name = "field1")]
    public required string Field1 { get; set; }
    [ProtoMember(2, Name = "field2")]
    public required string Field2 { get; set; }
    [ProtoMember(3, Name = "field3")]
    public int Field3 { get; set; }
}

Through the REST gateway, you can simply produce a JSON object. Behind the scenes, the object is serialized according to the attached schema and data format (Protobuf).

Example schema:

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

Producing a message without a local .proto file:

var axios = require('axios');
var data = JSON.stringify({
  "field1": "foo",
  "field2": "bar",
  "field3": 123,
});

var config = {
  method: 'post',
  url: 'https://BROKER_RESTGW_URL/stations/hps/produce/single',
  headers: { 
    'Authorization': 'Bearer <jwt>', 
    'Content-Type': 'application/json'
  },
  data : data
};

axios(config)
.then(function (response) {
  console.log(JSON.stringify(response.data));
})
.catch(function (error) {
  console.log(error);
});
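
The request is plain HTTP, so any client can send it. As a minimal sketch, a Go standard-library equivalent of the axios call above (same placeholder gateway URL, station, and JWT) would look roughly like this:

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Same placeholder endpoint and token as the axios example above.
    url := "https://BROKER_RESTGW_URL/stations/hps/produce/single"
    payload := []byte(`{"field1":"foo","field2":"bar","field3":123}`)

    req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(payload))
    if err != nil {
        fmt.Println(err)
        return
    }
    req.Header.Set("Authorization", "Bearer <jwt>")
    req.Header.Set("Content-Type", "application/json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("status:", resp.Status)
}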

How to consume a message (Deserialization)

Node.js:

const { memphis } = require("memphis-dev");

(async function () {
    try {
        await memphis.connect({
            host: "localhost",
            username: "CLIENT_TYPE_USERNAME",
            password: "PASSWORD"
            accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const consumer = await memphis.consumer({
            stationName: "marketing",
            consumerName: "cons1",
            consumerGroup: "cg_cons1",
            maxMsgDeliveries: 3,
            maxAckTimeMs: 2000,
            genUniqueSuffix: true
        });

        consumer.on("message", message => {
            console.log(message.getDataDeserialized());
            message.ack();
        });
        consumer.on("error", error => {
            console.log(error);
        });
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

Go:

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/memphisdev/memphis.go"
)

func main() {
	conn, err := memphis.Connect(
        	"MEMPHIS_BROKER_URL", 
	        "APPLICATION_TYPE_USERNAME", 
	        memphis.Password("PASSWORD"),
	        memphis.AccountId(123456789), //*optional* In case you are using Memphis.dev cloud
        )
	if err != nil {
		os.Exit(1)
	}
	defer conn.Close()

	consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME")
	if err != nil {
		fmt.Printf("Consumer creation failed: %v\n", err)
		os.Exit(1)
	}

	handler := func(msgs []*memphis.Msg, err error) {
		if err != nil {
			fmt.Printf("Fetch failed: %v\n", err)
			return
		}

		for _, msg := range msgs {
			fmt.Println(string(msg.DataDeserialized()))	
			msg.Ack()
		}
	}

	consumer.Consume(handler)
	time.Sleep(3000 * time.Second)
}

Python:

import asyncio
from memphis import Memphis

async def main():
    async def msg_handler(msgs, error):
        try:
            for msg in msgs:
                print("message: ", await msg.get_data_deserialized())
                await msg.ack()
            if error:
                print(error)
        except Exception as e:
            print(e)

    try:
        memphis = Memphis()
        await memphis.connect(host="MEMPHIS_HOST", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID)
        consumer = await memphis.consumer(
            station_name="STATION_NAME", consumer_name="CONSUMER_NAME")
        consumer.consume(msg_handler)
        # Keep your main thread alive so the consumer will keep receiving data
        await asyncio.Event().wait()
    except Exception as e:
        print(e)
    finally:
        await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

TypeScript:

import { memphis, Memphis } from 'memphis-dev';

(async function () {
    let memphisConnection: Memphis;
    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const consumer = await memphis.consumer({
            stationName: "STATION_NAME",
            consumerName: "CONSUMER_NAME",
            consumerGroup: "CONSUMER_GROUP_NAME",
            maxMsgDeliveries: 3,
            maxAckTimeMs: 2000,
            genUniqueSuffix: true
        });

        consumer.on("message", message => {
            console.log(message.getDataDeserialized());
            message.ack();
        });
        consumer.on("error", error => {
            console.log(error);
        });
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

.NET (C#):

Example schema:

syntax = "proto3";
message Test {
            string field1 = 1;
            string field2 = 2;
            int32 field3 = 3;
}

Consumption

using Memphis.Client.Consumer;
using Memphis.Client;

var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "<memphis-host>";
options.Username = "<application type username>";
options.ConnectionToken = "<broker-token>";

/**
* In case you are using Memphis.dev cloud
* options.AccountId = "<account-id>";
*/

try
{
    var client = await MemphisClientFactory.CreateClient(options);

    var consumer = await client.CreateConsumer(new MemphisConsumerOptions
    {
        StationName = "<station-name>",
        ConsumerName = "<consumer-name>",
        ConsumerGroup = "<consumer-group-name>",
    });

    consumer.MessageReceived += (sender, args) =>
    {
        if (args.Exception is not null)
        {
            Console.Error.WriteLine(args.Exception);
            return;
        }

        foreach (var msg in args.MessageList)
        {
            Console.WriteLine($"Received data: {msg.GetDeserializedData()}");
        }
    };

    consumer.ConsumeAsync();

    await Task.Delay(TimeSpan.FromMinutes(1));
    await consumer.DestroyAsync();
    client.Dispose();
}
catch (Exception exception)
{
    Console.WriteLine($"Error occured: {exception.Message}");
}