All rights reserved to Memphis.dev 2023

KB

"Schema validation has failed: Invalid message format, expecting protobuf"

When producing messages to Memphis without an attached schema, the SDK produce function expects a byte array, so the standard producing code wraps the payload with Buffer.from(<string>).

When a schema is attached, the producer must pass the message in the format that was used when the schema was created, for example protobuf. In that case, drop the Buffer.from(<string>) wrapper and pass the payload directly (example in JS):

await producer.produce({
    message: payload,
    headers: headers
});
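
The rule above can be summarized in a small sketch. The helper name toProduceMessage and its schemaAttached flag are hypothetical and are not part of the Memphis SDK; the helper only illustrates which shape the message field takes in each case, assuming the payload is either a string or a plain object.

```javascript
// Hypothetical helper (not part of memphis-dev): picks the shape of the
// `message` field depending on whether a schema is attached to the station.
function toProduceMessage(payload, schemaAttached) {
    if (schemaAttached) {
        // Schema attached: pass the payload as-is, without Buffer.from().
        return payload;
    }
    // No schema: the produce function expects bytes, so wrap the text in a Buffer.
    const text = typeof payload === "string" ? payload : JSON.stringify(payload);
    return Buffer.from(text);
}
```

Under these assumptions, a call would look like producer.produce({ message: toProduceMessage(payload, true), headers: headers }) for a station with a schema, and the same call with false for a station without one.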

Release v0.4.0 - For protobuf, consumers are required to self-decode consumed messages with a self-hosted .proto file

const memphis = require("memphis-dev");
const protobuf = require("protobufjs");

(async function () {
    try {
        await memphis.connect({
            host: "localhost",
            username: "root",
            connectionToken: "memphis"
        });

        const consumer = await memphis.consumer({
            stationName: "marketing",
            consumerName: "cons1",
            consumerGroup: "cg_cons1",
            maxMsgDeliveries: 3,
            maxAckTimeMs: 2000,
            genUniqueSuffix: true
        });

        // Load the self-hosted .proto file and look up the message type to decode.
        const root = await protobuf.load("schema.proto");
        const TestMessage = root.lookupType("Test");

        consumer.on("message", message => {
            // getData() returns the raw payload bytes; decode them with the protobuf type.
            const data = message.getData();
            const msg = TestMessage.decode(data);
            console.log(msg);
            message.ack();
        });
        consumer.on("error", error => {
            console.log(error);
        });
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();
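
Because decoding now happens on the consumer side, a payload that does not match the .proto file makes decode throw inside the message handler. One possible way to handle that, shown as a sketch only, is to acknowledge a message only after it decodes successfully. The helper name decodeAndAck is hypothetical; it assumes `type` exposes decode(bytes), as a protobufjs Type does, and `message` exposes getData() and ack(), as in the consumer example above.

```javascript
// Hypothetical helper (not part of memphis-dev or protobufjs).
// Decodes the message payload and acknowledges it only on success.
function decodeAndAck(type, message) {
    let decoded;
    try {
        decoded = type.decode(message.getData());
    } catch (err) {
        // Decoding failed: log the reason and leave the message unacknowledged.
        console.error("Failed to decode message:", err.message);
        return null;
    }
    message.ack();
    return decoded;
}
```

With this helper, the handler would become consumer.on("message", message => decodeAndAck(TestMessage, message)). Whether an undecodable message should instead be acknowledged and dropped is a design choice that depends on the station's redelivery settings.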

Last updated 1 year ago
