All rights reserved to Memphis.dev 2023
NATS

Migrate NATS clients to Memphis

Last updated 1 year ago


Introduction

Memphis is compatible with the NATS API in order to:

  • Enable Memphis users to enjoy the broad reach and integrations of the NATS ecosystem.

  • Enable a lift & shift type of migration from NATS to Memphis.

Limitations

  • NATS SDK version: compatible with NATS JetStream 2.9 and above.

  • The following Memphis features are not supported when using a NATS SDK:

    • Producer/consumer observability

    • Schemaverse

    • Dead-letter station: resending unacked messages

Replacement process

For NATS Jetstream clients

Cloud
  1. Point the servers parameter at the Memphis Cloud broker hostname. It can be found in the main dashboard.

  2. In the Memphis GUI, create a client-type user. If you already use a user with NATS, base it on that one. Then append "$<MEMPHIS_ACCOUNT_ID>" to the username in the client code (for example, nats$123456789).

  3. Replace port 4222 with 6666.

Code Example (Before)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Code Example (After)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "aws-eu-central-1.cloud.memphis.dev:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats$123456789",
        "password":"natspassword"
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Open-source
  1. Point the servers parameter at the Memphis hostname.

  2. Change port 4222 to 6666.

  3. In the Memphis GUI, create a client-type user. If you already use a user with NATS, base it on that one.

Code Example (Before)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Code Example (After)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats",
        "password":"natspassword"
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
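
The replacement steps above (new hostname, port 6666, mandatory credentials, and the account ID suffix on Cloud) can be sketched as a small helper that rewrites an existing NATS options dictionary. This is only an illustration: the function name to_memphis_opts and the constant MEMPHIS_PORT are hypothetical and are not part of the NATS or Memphis SDKs.

```python
from typing import Any, Dict, Optional

MEMPHIS_PORT = 6666  # port used in the steps above instead of NATS' 4222


def to_memphis_opts(
    nats_opts: Dict[str, Any],
    memphis_host: str,
    account_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Return a copy of nats_opts that points at Memphis (hypothetical helper).

    Pass account_id for Memphis Cloud; leave it as None for open-source.
    """
    # Credentials are optional in NATS but mandatory in Memphis.
    if not nats_opts.get("user") or not nats_opts.get("password"):
        raise ValueError("Memphis requires both 'user' and 'password'")

    opts = dict(nats_opts)  # shallow copy, the caller's dict is left untouched
    opts["servers"] = f"{memphis_host}:{MEMPHIS_PORT}"
    if account_id is not None:
        # Cloud only: the username becomes <user>$<account_id>.
        opts["user"] = f"{nats_opts['user']}${account_id}"
    return opts
```

The returned dictionary can be passed to nats.connect(**opts) exactly as in the examples above; all other options (reconnect settings, timeouts) are carried over unchanged.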

For NATS Core clients

All NATS Core features are supported when communicating with Memphis. However, unless you perform the procedure below, Memphis cannot manage or display the objects you create, so skipping it is not recommended.

Memphis operates at the stream level. For a NATS subject to be visible and managed by Memphis, it must first be wrapped by a stream.

Follow the instructions below for your Memphis deployment type:

Cloud
  1. Run the command below with the NATS CLI. The required values can be found in the main dashboard.

nats stream add -s <MEMPHIS_BROKER_HOSTNAME>:6666 --user='<MEMPHIS_CLIENT_USER>$<ACCOUNT_ID>' --password='<MEMPHIS_CLIENT_USER_PASSWORD>' --timeout=10s

Example:

nats stream add -s aws-eu-central-1.cloud.memphis.dev:6666 --user='nats$123456789' --password='natsmemphis!@#' --timeout=10s

Important to know: Memphis.dev Cloud supports only replication factor 3 and storage type file.

? Subjects test
? Storage file
? Replication 3
? Retention Policy Limits
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups Yes
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
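
As an alternative to the interactive CLI prompts, the stream could also be created from code, in the same way the JetStream examples on this page call js.add_stream. The sketch below assumes that Memphis accepts the same settings through the JetStream API as it does through the CLI; the helper names stream_settings and create_stream are hypothetical, and only the replication and storage values come from the note above.

```python
from typing import Any, Dict, List


def stream_settings(name: str, subjects: List[str], cloud: bool) -> Dict[str, Any]:
    """Build keyword arguments for js.add_stream (hypothetical helper).

    Memphis.dev Cloud supports only replication factor 3; the open-source
    walkthrough on this page uses 1.
    """
    return {
        "name": name,
        "subjects": subjects,
        "num_replicas": 3 if cloud else 1,
    }


async def create_stream(connection_opts: Dict[str, Any], settings: Dict[str, Any]) -> None:
    """Connect with the usual options and wrap the subjects in a stream."""
    # Imported here so the settings helper stays usable without nats-py installed.
    import nats
    from nats.js.api import StorageType

    conn = await nats.connect(**connection_opts)
    try:
        js = conn.jetstream()
        # File storage is the only storage type supported on Memphis.dev Cloud.
        await js.add_stream(storage=StorageType.FILE, **settings)
    finally:
        await conn.close()


# Usage (requires a reachable broker and valid credentials):
#   import asyncio
#   asyncio.run(create_stream(connection_opts, stream_settings("test", ["test"], cloud=True)))
```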

A stream name may contain only the following characters. Any other character is rejected.

  • a-z/A-Z

  • 0-9

  • _ -
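
The character rule above can be checked before calling the CLI or an SDK. The following is a minimal sketch, assuming that the list means letters, digits, underscore, and hyphen only (no spaces) and that an empty name is invalid; is_valid_stream_name is a hypothetical helper, not a Memphis or NATS function.

```python
import re

# Letters, digits, underscore, and hyphen, as listed above.
_STREAM_NAME = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_stream_name(name: str) -> bool:
    """Return True if name uses only the allowed characters (hypothetical helper)."""
    return _STREAM_NAME.fullmatch(name) is not None
```

For example, is_valid_stream_name("orders_v2-eu") is True, while "orders.v2" and "my stream" are both rejected.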

Changes required in the client code:

  1. Point the servers parameter at the Memphis Cloud broker hostname. It can be found in the main dashboard.

  2. Change port 4222 to 6666.

  3. In the Memphis GUI, create a client-type user. If you already use a user with NATS, base it on that one. Then append "$<MEMPHIS_ACCOUNT_ID>" to the username in the client code.

Code Example (Before)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Code Example (After)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "aws-eu-central-1.cloud.memphis.dev:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats$123456789", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Open-source
  1. Run the command below with the NATS CLI, according to your Memphis authentication type:

nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_CLIENT_USER> --password=<MEMPHIS_CLIENT_USER_PASSWORD>

Walkthrough example

? Subjects test
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups Yes
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes

When using Memphis connection token-based authentication (legacy open-source):

nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_APPLICATION_USER>::<MEMPHIS_CONNECTION_TOKEN>

A stream name may contain only the following characters. Any other character is rejected.

  • a-z/A-Z

  • 0-9

  • _ -

Changes required in the client code:

  1. Point the servers parameter at the Memphis broker hostname.

  2. Change port 4222 to 6666.

  3. In the Memphis GUI, create a client-type user. If you already use a user with NATS, base it on that one.

Code Example (Before)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Code Example (After)

main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user":"nats", # Optional in NATS. Mandatory in Memphis.
        "password":"natspassword" # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())

Important to know

  • Producer names of messages are displayed as "Unknown".

  • Stream names in NATS are case-sensitive, while in Memphis they are lower-cased, so consider using only lower-case names.

  • If a station was created using the Memphis GUI or SDK and you want to produce messages to it, send the messages to a subject named <stream_name>$<partition_number>.final, where partition numbers start from 1.

  • If your station name contains a '.' character, replace it with '#' in the subject name.
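
The subject naming rules above can be combined into one small function. This is a sketch under two assumptions: that lower-casing the station name matches how Memphis stores it (based on the note about lower-cased names), and that no characters other than '.' need replacing. The name station_subject is hypothetical.

```python
def station_subject(station_name: str, partition: int = 1) -> str:
    """Build the subject for producing to an existing Memphis station.

    Hypothetical helper following the pattern <stream_name>$<partition>.final.
    """
    if partition < 1:
        raise ValueError("partition numbers start from 1")
    # Memphis lower-cases names; '.' in a station name becomes '#' in the subject.
    stream = station_name.lower().replace(".", "#")
    return f"{stream}${partition}.final"
```

With a NATS client, the result would be used as the publish subject, for example conn.publish(station_subject("orders"), "hello world".encode()).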

Example

Using Memphis NATS API compatibility to integrate Memphis with Argo

Install NATS CLI.

Install Argo.
