NATS
Migrate NATS clients to Memphis
The motivation behind adding compatibility with the NATS API is to:
- Enable Memphis users to enjoy the broad reach and integrations of the NATS ecosystem.
- Enable a lift & shift type of migration from NATS to Memphis.
- NATS SDK versions: compatible with NATS JetStream 2.9 and above.
- The following Memphis features are not supported when using a NATS SDK:
  - Producers'/consumers' observability
  - Schemaverse
  - Dead-letter station: resending unacked messages
1. Redirect the servers parameter to the Memphis Cloud broker hostname. It can be found in the main dashboard.
2. In the Memphis GUI, create a client-type user based on the one you are using with NATS (or a new one if you are not using any), and append "$" followed by your Memphis account ID to the username in the client (for example, nats$123456789).
3. Replace port 4222 with 6666.
Code Example (Before)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
Code Example (After)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "aws-eu-central-1.cloud.memphis.dev:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats$123456789",
        "password": "natspassword"
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
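The three replacements above can also be applied to an existing options dict in one place. The following is a minimal sketch, assuming the connection options are kept in a dict as in the examples; the helper name to_memphis_opts and its parameters are hypothetical and are not part of the NATS or Memphis SDKs.

```python
MEMPHIS_PORT = 6666  # Memphis port that replaces the NATS default 4222


def to_memphis_opts(nats_opts, memphis_host, account_id=None):
    """Return a copy of NATS connection options pointed at Memphis.

    Hypothetical helper for illustration only.
    """
    # User and password are optional in NATS but mandatory in Memphis.
    if not nats_opts.get("user") or not nats_opts.get("password"):
        raise ValueError("Memphis requires both 'user' and 'password'")

    opts = dict(nats_opts)  # leave the caller's dict untouched
    opts["servers"] = f"{memphis_host}:{MEMPHIS_PORT}"
    if account_id is not None:
        # Memphis Cloud: append "$<account id>" to the username.
        opts["user"] = f"{opts['user']}${account_id}"
    return opts


before = {"servers": "localhost:4222", "user": "nats", "password": "natspassword"}
after = to_memphis_opts(before, "aws-eu-central-1.cloud.memphis.dev", "123456789")
print(after["servers"])
print(after["user"])
```

The resulting dict can be passed to nats.connect(**after) exactly as in the example above. Omitting account_id leaves the username unchanged.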
1. Redirect the servers parameter to the Memphis hostname.
2. Change port 4222 to 6666.
3. In the Memphis GUI, create a client-type user based on the one you are using with NATS (or a new one if you are not using any).
Code Example (Before)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
Code Example (After)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",
        "password": "natspassword"
    }
    conn = await nats.connect(**connection_opts)

    js = conn.jetstream()
    await js.add_stream(name="test", subjects=["test"])
    await js.publish("test", "hello world".encode())

    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
All NATS core features are supported when communicating with Memphis. However, unless you perform the procedure below, the Memphis platform will not be able to control or display the created objects, so skipping it is not recommended.
Memphis operates at the stream level. For a NATS subject to be visible and managed by Memphis, it must first be wrapped by a stream.

Follow the instructions below based on your Memphis deployment type:
1. Install the NATS CLI.
2. Run the command below. The required information can be found in the main dashboard.
nats stream add -s <MEMPHIS_BROKER_HOSTNAME>:6666 --user='<MEMPHIS_CLIENT_USER>$<ACCOUNT_ID>' --password='<MEMPHIS_CLIENT_USER_PASSWORD>' --timeout=10s
Example:
nats stream add -s aws-eu-central-1.cloud.memphis.dev:6666 --user='nats$123456789' --password='natsmemphis!@#' --timeout=10s
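Because the username contains a "$" and the password may contain other shell metacharacters, quoting matters when this command is generated by a script. The following is a minimal sketch that builds the same command line with Python's standard shlex module; the function name stream_add_command and its parameters are hypothetical, and only the flags shown in the command above are used.

```python
import shlex


def stream_add_command(host, user, password, account_id=None, timeout="10s"):
    """Build the `nats stream add` command line as a shell-safe string.

    Hypothetical helper for illustration only.
    """
    # Memphis Cloud users are written as <user>$<account id>.
    full_user = f"{user}${account_id}" if account_id else user
    argv = [
        "nats", "stream", "add",
        "-s", f"{host}:6666",
        f"--user={full_user}",
        f"--password={password}",
        f"--timeout={timeout}",
    ]
    # shlex.join quotes every argument that contains shell metacharacters,
    # so "$" and "!" are not interpreted by the shell.
    return shlex.join(argv)


cmd = stream_add_command(
    "aws-eu-central-1.cloud.memphis.dev", "nats", "natsmemphis!@#", "123456789"
)
print(cmd)
```

The printed string can be pasted into a POSIX shell. shlex.join requires Python 3.8 or later.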
Important to know! Memphis.dev Cloud only supports replication factor 3 and storage type file.
Walkthrough example
? Subjects test
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups Yes
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Allowed characters for a stream name (any other character will not be accepted):
- a-z / A-Z
- 0-9
- _ and -
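The character rule above can be checked before a stream is created. The following is a minimal sketch using a regular expression; the function name is_valid_stream_name is hypothetical, and the sketch assumes the last list item means the underscore and hyphen characters.

```python
import re

# Letters, digits, underscore and hyphen only; at least one character.
_STREAM_NAME_RE = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_stream_name(name):
    """Return True if `name` uses only the allowed stream-name characters."""
    return _STREAM_NAME_RE.fullmatch(name) is not None


for candidate in ("test", "orders_eu-1", "orders.eu", "my stream"):
    print(candidate, is_valid_stream_name(candidate))
```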
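Replacements in the client's code:
1. Redirect the servers parameter to the Memphis Cloud broker hostname. It can be found in the main dashboard.
2. Change port 4222 to 6666.
3. In the Memphis GUI, create a client-type user based on the one you are using with NATS (or a new one if you are not using any), and append "$" followed by your Memphis account ID to the username in the client (for example, nats$123456789).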
Code Example (Before)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
Code Example (After)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "aws-eu-central-1.cloud.memphis.dev:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats$123456789",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
1. Install the NATS CLI.
2. Run the command that matches your Memphis authentication type.
Password-based authentication:
nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_CLIENT_USER> --password=<MEMPHIS_CLIENT_USER_PASSWORD>
Walkthrough example
? Subjects test
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy New
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups Yes
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Connection-token-based authentication:
nats stream add -s <MEMPHIS_BROKER_URL>:6666 --user=<MEMPHIS_APPLICATION_USER>::<MEMPHIS_CONNECTION_TOKEN>
Allowed characters for a stream name (any other character will not be accepted):
- a-z / A-Z
- 0-9
- _ and -
Replacements in the client's code:
1. Redirect the servers parameter to the Memphis broker hostname.
2. Change port 4222 to 6666.
3. In the Memphis GUI, create a client-type user based on the one you are using with NATS (or a new one if you are not using any).
Code Example (Before)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:4222",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
Code Example (After)
main.py
import asyncio
import nats

async def main():
    connection_opts = {
        "servers": "localhost:6666",
        "allow_reconnect": True,
        "max_reconnect_attempts": 10,
        "reconnect_time_wait": 3,
        "connect_timeout": 15,
        "user": "nats",  # Optional in NATS. Mandatory in Memphis.
        "password": "natspassword"  # Optional in NATS. Mandatory in Memphis.
    }
    conn = await nats.connect(**connection_opts)

    await conn.publish("test", "hello world".encode())
    await conn.close()

if __name__ == "__main__":
    asyncio.run(main())
- Messages' producers' names will be displayed as "Unknown".
- Stream names in NATS are case sensitive, while in Memphis they are lower-cased, so please consider using only lower-cased names.
- If a station was created using the Memphis GUI/SDK and you want to produce messages to that station, you must send the messages to a subject called <stream_name>$<partition_number(starts from 1)>.final.
- If your station name contains a '.' sign, replace it with a '#' sign at the subject name level.
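The subject naming rules above can be captured in a small helper. The following is a minimal sketch; the function name station_subject is hypothetical, and lower-casing the station name inside the helper is an assumption based on the note that Memphis names are lower-cased.

```python
def station_subject(station_name, partition=1):
    """Build the NATS subject for producing to an existing Memphis station.

    Hypothetical helper for illustration only.
    """
    if partition < 1:
        raise ValueError("partition numbers start from 1")
    # Assumption: Memphis station names are lower-cased.
    # A '.' in the station name is replaced with '#' at the subject level.
    name = station_name.lower().replace(".", "#")
    return f"{name}${partition}.final"


print(station_subject("orders.eu"))
print(station_subject("Test", partition=2))
```

The returned string can be used as the subject argument of publish in the client examples above.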