GraphQL

Last updated 1 year ago

GraphQL is an open-source data query and manipulation language for APIs, and a runtime for fulfilling queries with existing data. Facebook developed GraphQL internally in 2012 and released it publicly in 2015. On 7 November 2018, the GraphQL project was moved from Facebook to the newly established GraphQL Foundation, hosted by the non-profit Linux Foundation.

How to Produce a message (Serialization)

Memphis removes the need for external serialization functions by embedding them in the SDK.

Example schema:

type Query {
    greeting: String
    students: [Student]
}

type Student {
    id: ID!
    firstName: String
    lastName: String
}
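
The producer examples on this page pass the same GraphQL text in several forms (string, byte array, parsed document). As a minimal sketch of how the string and byte forms relate, assuming the payload is UTF-8 text, the hypothetical helper `to_payload_bytes` below (not part of the Memphis SDK) normalizes either form to the same bytes:

```python
# Hypothetical helper, not part of the Memphis SDK: normalizes a GraphQL
# message given as str, bytes, or bytearray into UTF-8 encoded bytes.
def to_payload_bytes(message):
    if isinstance(message, str):
        return message.encode("utf-8")
    if isinstance(message, (bytes, bytearray)):
        return bytes(message)
    raise TypeError(f"unsupported message type: {type(message).__name__}")


graphql_msg = "query myQuery {greeting}"

from_string = to_payload_bytes(graphql_msg)
from_bytearray = to_payload_bytes(bytearray(graphql_msg, "utf-8"))

# Both forms carry the same bytes.
print(from_string == from_bytearray)  # True
```

The SDK performs its own serialization, so a helper like this is only useful for reasoning about what is sent, or for comparing payloads in tests.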

Node.js

Code (Uint8Array):

const memphis = require("memphis-dev");

(async function () {
    try {
        await memphis.connect({
            host: "MEMPHIS_BROKER_URL",
            username: "APPLICATION_USER",
            connectionToken: "CONNECTION_TOKEN",
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });
        const producer = await memphis.producer({
            stationName: "STATION_NAME",
            producerName: "PRODUCER_NAME"
        });
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        try {
            await producer.produce({
                message: Buffer.from(graphqlMsg)
            });
        } catch (ex) {
            console.log(ex.message)
        }
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

Code (string):

const memphis = require("memphis-dev");

(async function () {
    try {
        await memphis.connect({
            host: "MEMPHIS_BROKER_URL",
            username: "APPLICATION_USER",
            password: "PASSWORD",
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });
        const producer = await memphis.producer({
            stationName: "STATION_NAME",
            producerName: "PRODUCER_NAME"
        });
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        try {
            await producer.produce({
                message: graphqlMsg
            });
        } catch (ex) {
            console.log(ex.message)
        }
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

Code (DocumentNode):

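// Use require for both modules: ES import and CommonJS require cannot be mixed in a CommonJS file
const { parse } = require("graphql");
const memphis = require("memphis-dev");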

(async function () {
    try {
        await memphis.connect({
            host: "MEMPHIS_BROKER_URL",
            username: "APPLICATION_USER",
            password: "PASSWORD",
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });
        const producer = await memphis.producer({
            stationName: "STATION_NAME",
            producerName: "PRODUCER_NAME"
        });
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        const doc = parse(graphqlMsg)
        try {
            await producer.produce({
                message: doc
            });
        } catch (ex) {
            console.log(ex.message)
        }
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

Go
Code (string):

package main

import (
    "fmt"
    "os"
    "github.com/memphisdev/memphis.go"
)

func main() {
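    conn, err := memphis.Connect(
        "MEMPHIS_BROKER_URL",
        "APPLICATION_TYPE_USERNAME",
        memphis.Password("PASSWORD"),
        memphis.AccountId(123456789), // optional: only needed for Memphis.dev cloud
    )
    if err != nil {
        fmt.Printf("Connection failed: %v\n", err)
        os.Exit(1)
    }
    defer conn.Close()

    p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
    if err != nil {
        fmt.Printf("Producer creation failed: %v\n", err)
        os.Exit(1)
    }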

    hdrs := memphis.Headers{}
    hdrs.New()
    err = hdrs.Add("key", "value")

    if err != nil {
        fmt.Printf("Header failed: %v\n", err)
        os.Exit(1)
    }
    graphQlExample := `query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}`

    err = p.Produce(graphQlExample, memphis.MsgHeaders(hdrs))

    if err != nil {
        fmt.Printf("Produce failed: %v\n", err)
        os.Exit(1)
    }
}

Code ([]byte):

package main

import (
    "fmt"
    "os"
    "github.com/memphisdev/memphis.go"
)

func main() {
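    conn, err := memphis.Connect(
        "MEMPHIS_BROKER_URL",
        "APPLICATION_TYPE_USERNAME",
        memphis.Password("PASSWORD"),
        memphis.AccountId(123456789), // optional: only needed for Memphis.dev cloud
    )
    if err != nil {
        fmt.Printf("Connection failed: %v\n", err)
        os.Exit(1)
    }
    defer conn.Close()

    p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
    if err != nil {
        fmt.Printf("Producer creation failed: %v\n", err)
        os.Exit(1)
    }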

    hdrs := memphis.Headers{}
    hdrs.New()
    err = hdrs.Add("key", "value")

    if err != nil {
        fmt.Printf("Header failed: %v\n", err)
        os.Exit(1)
    }
    graphQlExample := `query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}`

    err = p.Produce([]byte(graphQlExample), memphis.MsgHeaders(hdrs))

    if err != nil {
        fmt.Printf("Produce failed: %v\n", err)
        os.Exit(1)
    }
}

Python
Code (bytearray):

import asyncio
import json
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError

async def main():
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_URL", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID)
    producer = await memphis.producer(
        station_name="STATION_NAME", producer_name="PRODUCER_NAME")

    headers = Headers()
    headers.add("key", "value")

    graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'

    try:
        await producer.produce(bytearray(graphqlMsg, 'utf-8'), headers=headers)

    except Exception as e:
        print(e)
    finally:
        await asyncio.sleep(3)

    await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

Code (string):

import asyncio
import json
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError

async def main():
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_URL", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID,)
    producer = await memphis.producer(
        station_name="STATION_NAME", producer_name="PRODUCER_NAME")

    headers = Headers()
    headers.add("key", "value")

    graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'

    try:
        await producer.produce(graphqlMsg, headers=headers)

    except Exception as e:
        print(e)
    finally:
        await asyncio.sleep(3)

    await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

Code (graphql.language.ast.DocumentNode):

import asyncio
import json
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError
from graphql import parse

async def main():
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_URL", username="MEMPHIS_USERNAME", connection_token="CONNECTION_TOKEN", account_id=ACCOUNT_ID)
    producer = await memphis.producer(
        station_name="STATION_NAME", producer_name="PRODUCER_NAME")

    headers = Headers()
    headers.add("key", "value")
    
    graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
    document_node = parse(graphqlMsg)

    try:
        await producer.produce(document_node, headers=headers)

    except Exception as e:
        print(e)
    finally:
        await asyncio.sleep(3)

    await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

TypeScript

Code (Uint8Array):

import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';

(async function () {
    let memphisConnection: Memphis;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers()
        headers.add('key', 'value');
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        
        await producer.produce({
            message: Buffer.from(graphqlMsg),
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
    }
})();

Code (string):

import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';

(async function () {
    let memphisConnection: Memphis;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers()
        headers.add('key', 'value');
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        await producer.produce({
            message: graphqlMsg,
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
    }
})();

Code (DocumentNode):

import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';
import {parse} from 'graphql'

(async function () {
    let memphisConnection: Memphis;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers()
        headers.add('key', 'value');
        const graphqlMsg = 'query myQuery {greeting} mutation msg { updateUserEmail( email:"http://github.com" id:1){id name}}'
        const doc = parse(graphqlMsg)
        await producer.produce({
            message: doc,
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
    }
})();

.NET
Code:

using System.Collections.Specialized;
using Memphis.Client;
using Memphis.Client.Producer;
using System.Text;


var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "<memphis-host>";
options.Username = "<application type username>";
options.ConnectionToken = "<broker-token>";
/**
* In case you are using Memphis.dev cloud
* options.AccountId = "<account-id>";
*/
try
{
    var client = await MemphisClientFactory.CreateClient(options);

    var producer = await client.CreateProducer(new MemphisProducerOptions
    {
        StationName = "<memphis-station-name>",
        ProducerName = "<memphis-producer-name>",
        GenerateUniqueSuffix = true
    });

    NameValueCollection commonHeaders = new()
    {
        {
            "key-1", "value-1"
        }
    };

    string graphqlMsg = @"query myQuery { greeting } mutation msg { updateUserEmail(email: ""http://github.com"", id: 1) { id name } }";

    await producer.ProduceAsync(Encoding.UTF8.GetBytes(graphqlMsg), commonHeaders);
    client.Dispose();
}
catch (Exception exception)
{
    Console.WriteLine($"Error occurred: {exception.Message}");
}

Consume a message (Deserialization)

In upcoming versions, Memphis will remove the need for external deserialization functions by embedding them in the SDK.

An example of received schema:

type Query {
    greeting: String
    students: [Student]
}

type Student {
    id: ID!
    firstName: String
    lastName: String
}
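
Until deserialization is built into the SDK, the consumer has to turn the received payload back into GraphQL text before parsing it. A minimal sketch of that decoding step, assuming the payload is UTF-8 text, is shown here; `decode_payload` is a hypothetical helper, not an SDK function:

```python
# Hypothetical helper, not part of the Memphis SDK: decodes a received
# payload (bytes or bytearray) into GraphQL text.
def decode_payload(data):
    try:
        return bytes(data).decode("utf-8")
    except UnicodeDecodeError as exc:
        # Surface a clear error instead of failing inside the GraphQL parser.
        raise ValueError("payload is not valid UTF-8 text") from exc


received = bytearray(b"query myQuery {greeting}")
print(decode_payload(received))  # query myQuery {greeting}
```

The decoded string can then be handed to a GraphQL parser, as the consumer examples on this page do.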

Node.js

Code:

const memphis = require('memphis-dev');
const graphql = require('graphql');

(async function () {
    let memphisConnection;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_HOSTNAME',
            username: 'MEMPHIS_USERNAME',
            password: "PASSWORD",
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const consumer = await memphisConnection.consumer({
            stationName: 'MEMPHIS_STATION',
            consumerName: 'MEMPHIS_CONSUMER',
            consumerGroup: 'MEMPHIS_CG',
        });

        consumer.on('message', (message) => {
            console.log(message.getData().toString());
            const doc = graphql.parse(message.getData().toString())
            console.log("doc graphql", doc)
            message.ack();
            const headers = message.getHeaders()
        });

        consumer.on('error', (error) => {console.log(error)});
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();

Go

Code:

package main

import (
    "fmt"
    "os"
    "time"

    "github.com/memphisdev/memphis.go"
)

func main() {
    conn, err := memphis.Connect(
        "<memphis-host>",
        "<application type username>",
        memphis.Password("password"),
        memphis.AccountId(123456789), // optional: only needed for Memphis.dev cloud
    )
    if err != nil {
        os.Exit(1)
    }
    defer conn.Close()

    consumer, err := conn.CreateConsumer("<station-name>", "<consumer-name>", memphis.PullInterval(15*time.Second))

    if err != nil {
        fmt.Printf("Consumer creation failed: %v\n", err)
        os.Exit(1)
    }

    handler := func(msgs []*memphis.Msg, err error) {
        if err != nil {
            fmt.Printf("Fetch failed: %v\n", err)
            return
        }

        for _, msg := range msgs {
            fmt.Println(string(msg.Data()))
            msg.Ack()
            headers := msg.GetHeaders()
            fmt.Println(headers)
        }
    }

    consumer.Consume(handler)

    // The program will close the connection after 30 seconds,
    // the message handler may be called after the connection closed
    // so the handler may receive a timeout error
    time.Sleep(30 * time.Second)
}

Python

Code:

import asyncio
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError
from graphql import parse

async def main():
  async def msg_handler(msgs, error):
    try:
      # Report a fetch error before touching the batch
      if error:
        print(error)
        return
      for msg in msgs:
        # get_data() returns the raw payload; decode it before parsing
        decoded_str = msg.get_data().decode("utf-8")
        document_node = parse(decoded_str)
        print("document_node graphQL", document_node.to_dict())
        headers = msg.get_headers()
        await msg.ack()
    except (MemphisError, MemphisConnectError, MemphisHeaderError, Exception) as e:
      print(e)
      return

  try:
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_URL", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=123456789)
    consumer = await memphis.consumer(
      station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CG_NAME")
    consumer.consume(msg_handler)
    # Keep your main thread alive so the consumer will keep receiving data
    await asyncio.Event().wait()


  except (MemphisError, MemphisConnectError) as e:
    print(e)

  finally:
    await memphis.close()

if __name__ == '__main__':
  asyncio.run(main())

TypeScript

Code:

import memphis from 'memphis-dev';
import {Memphis, Message} from 'memphis-dev/types';
import {parse} from 'graphql'


(async function () {
    // May still be unassigned when the catch block runs
    let memphisConnection: Memphis | undefined;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_USER',
            password: 'PASSWORD',
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const consumer = await memphisConnection.consumer({
            stationName: 'STATION_NAME',
            consumerName: 'CONSUMER_NAME',
            consumerGroup: 'CG_NAME', 
        });


        consumer.on('message', (message: Message) => {
            // getData() returns the raw payload; convert it to a string before parsing
            const doc = parse(message.getData().toString())
            const headers = message.getHeaders();
            console.log(doc)
            message.ack();
        });

        consumer.on('error', (error) => {
            console.log(error);
        });
    } catch (ex) {
        
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();

.NET

Code:

using Memphis.Client.Consumer;
using Memphis.Client;

var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "<memphis-host>";
options.Username = "<application type username>";
options.ConnectionToken = "<broker-token>";

/**
* In case you are using Memphis.dev cloud
* options.AccountId = "<account-id>";
*/

try
{
    var client = await MemphisClientFactory.CreateClient(options);

    var consumer = await client.CreateConsumer(new MemphisConsumerOptions
    {
        StationName = "<station-name>",
        ConsumerName = "<consumer-name>",
        ConsumerGroup = "<consumer-group-name>",
    });

    consumer.MessageReceived += (sender, args) =>
    {
        if (args.Exception is not null)
        {
            Console.Error.WriteLine(args.Exception);
            return;
        }

        foreach (var msg in args.MessageList)
        {
            var data = msg.GetData();
            if (data is { Length: > 0 })
            {
                // Decode the raw bytes into the GraphQL text before printing
                Console.WriteLine(System.Text.Encoding.UTF8.GetString(data));
            }
        }
    };

    consumer.ConsumeAsync();

    await Task.Delay(TimeSpan.FromMinutes(1));
    await consumer.DestroyAsync();
    client.Dispose();
}
catch (Exception exception)
{
    Console.WriteLine($"Error occurred: {exception.Message}");
}