Links

NATS

Migrate NATS clients to Memphis
Have you already tried Memphis.dev Cloud?

Introduction

The motivation behind adding compatibility with NATS API is to
  • Enable Memphis users to enjoy the broad reach and integrations of the NATS ecosystem.
  • Enable a lift & shift type of migration from NATS to Memphis.

Limitations

  • NATS SDKs version - Compatibility with NATS Jetstream 2.9 and above.
  • The following Memphis features will not be supported when using NATS SDK:
    • Producers/Consumers' observability
    • Schemaverse
    • Dead-letter station - resend unacked messages

Replacement process

For NATS Jetstream clients

Cloud
  1. 1.
    Redirect the servers parameter to Memphis Cloud broker hostname. It can be found in the main dashboard.
  1. 2.
    In Memphis GUI, create a client-type user based on the one you are (or not) using with NATS and concatenate "$MEMPHIS_ACCOUNT_ID" to it.
  2. 3.
    Replace port 4222 with 6666.
Code Example (Before)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:4222",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
js = conn.jetstream()
17
await js.add_stream(name="test", subjects=["test"])
18
await js.publish("test", "hello world".encode())
19
20
await conn.close()
21
22
if __name__ == "__main__":
23
asyncio.run(main())
Code Example (After)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "aws-eu-central-1.cloud.memphis.dev:6666",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats$123456789",
12
"password":"natspassword"
13
}
14
conn = await nats.connect(**connection_opts)
15
16
js = conn.jetstream()
17
await js.add_stream(name="test", subjects=["test"])
18
await js.publish("test", "hello world".encode())
19
20
await conn.close()
21
22
if __name__ == "__main__":
23
asyncio.run(main())
Open-source
  1. 1.
    Redirect the servers parameter to Memphis hostname
  2. 2.
    Change port 4222 to 6666
  3. 3.
    In Memphis GUI, create a client-type user based on the one you are (or not) using with NATS
Code Example (Before)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:4222",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
js = conn.jetstream()
17
await js.add_stream(name="test", subjects=["test"])
18
await js.publish("test", "hello world".encode())
19
20
await conn.close()
21
22
if __name__ == "__main__":
23
asyncio.run(main())
Code Example (After)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:6666",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats",
12
"password":"natspassword"
13
}
14
conn = await nats.connect(**connection_opts)
15
16
js = conn.jetstream()
17
await js.add_stream(name="test", subjects=["test"])
18
await js.publish("test", "hello world".encode())
19
20
await conn.close()
21
22
if __name__ == "__main__":
23
asyncio.run(main())

For NATS Core clients

All of NATS core features will be supported when communicating with Memphis, but without performing the below procedure, the Memphis platform will not be able to control and display the created objects, and therefore it is not recommended.
Memphis operates at the stream level. For a NATS subject to be visible and managed by Memphis, it must first be wrapped by a stream.
Follow the below instructions based on your Memphis deployment type:
Cloud
  1. 1.
    Install NATS CLI.
  2. 2.
    Perform the below instructions. Needed information can be found in the main dashboard.
1
nats stream add -s <MEMPHIS_BROKER_HOSTNAME>:6666 --user='<MEMPHIS_CLIENT_USER>$<ACCOUNT_ID>' --password='<MEMPHIS_CLIENT_USER_PASSWORD>' --timeout=10s
Example:
1
nats stream add -s aws-eu-central-1.cloud.memphis.dev:6666 --user='nats$123456789' --password='natsmemphis!@#' --timeout=10s
Important to know! Memphis.dev Cloud only supports Replication factor 3 and storage type file
1
? Subjects test
2
? Storage file
3
? Replication 1
4
? Retention Policy Limits
5
? Discard Policy New
6
? Stream Messages Limit -1
7
? Per Subject Messages Limit -1
8
? Total Stream Size -1
9
? Message TTL -1
10
? Max Message Size -1
11
? Duplicate tracking time window 2m0s
12
? Allow message Roll-ups Yes
13
? Allow message deletion Yes
14
? Allow purging subjects or the entire stream Yes
Allowed characters for stream name. Any other character will not be accepted.
  • a-z/A-Z
  • 0-9
  • _ -
Replacements in the client's code -
  1. 1.
    Redirect the servers parameter to Memphis Cloud broker hostname. It can be found in the main dashboard.
  2. 2.
    Change port 4222 to 6666
  3. 3.
    In Memphis GUI, create a client-type user based on the one you are (or not) using with NATS
Code Example (Before)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:4222",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
await conn.publish("test", "hello world".encode())
17
await conn.close()
18
19
if __name__ == "__main__":
20
asyncio.run(main())
Code Example (After)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "aws-eu-central-1.cloud.memphis.dev:6666",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats$123456789", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
await conn.publish("test", "hello world".encode())
17
await conn.close()
18
19
if __name__ == "__main__":
20
asyncio.run(main())
Open-source
  1. 1.
    Install NATS CLI.
  2. 2.
    Perform the below instructions based on your Memphis type of authentication:
1
nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_CLIENT_USER> --password=<MEMPHIS_CLIENT_USER_PASSWORD>
Walkthrough example
1
? Subjects test
2
? Storage file
3
? Replication 1
4
? Retention Policy Limits
5
? Discard Policy New
6
? Stream Messages Limit -1
7
? Per Subject Messages Limit -1
8
? Total Stream Size -1
9
? Message TTL -1
10
? Max Message Size -1
11
? Duplicate tracking time window 2m0s
12
? Allow message Roll-ups Yes
13
? Allow message deletion Yes
14
? Allow purging subjects or the entire stream Yes

When using Memphis Connection token-based authentication (Legacy OS):

1
nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_APPLICATION_USER>::<MEMPHIS_CONNECTION_TOKEN>
Allowed characters for stream name. Any other character will not be accepted.
  • a-z/A-Z
  • 0-9
  • _ -
Replacements in the client's code -
  1. 1.
    Redirect the servers parameter to Memphis broker hostname.
  2. 2.
    Change port 4222 to 6666
  3. 3.
    In Memphis GUI, create a client-type user based on the one you are (or not) using with NATS
Code Example (Before)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:4222",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
await conn.publish("test", "hello world".encode())
17
await conn.close()
18
19
if __name__ == "__main__":
20
asyncio.run(main())
Code Example (After)
main.py
1
import asyncio
2
import nats
3
4
async def main():
5
connection_opts = {
6
"servers": "localhost:6666",
7
"allow_reconnect": True,
8
"max_reconnect_attempts": 10,
9
"reconnect_time_wait": 3,
10
"connect_timeout": 15,
11
"user":"nats", # Optional in NATS. Mandatory in Memphis.
12
"password":"natspassword" # Optional in NATS. Mandatory in Memphis.
13
}
14
conn = await nats.connect(**connection_opts)
15
16
await conn.publish("test", "hello world".encode())
17
await conn.close()
18
19
if __name__ == "__main__":
20
asyncio.run(main())

Important to know

  • Messages' producers' names will be displayed as "Unknown".
  • stream names in NATS are case sensitive, while in Memphis, they are lower-cased, so please consider using only lower-cased names.
  • In case a station has been created using Memphis GUI/SDK, and you want to produce messages to the same created station, you will have to send the messages into a subject called <stream_name>$<partition_number(starts from 1)>.final.
  • In case your station name contains a '.' sign replace it with '#' sign in the subject name level.

Example

Using Memphis NATS API compatibility to integrate Memphis with Argo