Getting Started
Accelerate Development of Real-Time Applications with Memphis.dev Cloud
Welcome to our getting started guide with Memphis.dev cloud!
In today's fast-paced and interconnected world, event streaming is the engine behind communication. Whether you are an individual, a small business, or an enterprise, harnessing the power of the Memphis.dev Cloud can bring unparalleled scalability, flexibility, and efficiency to your operations.
This section aims to provide a solid foundation to embark on your streaming journey with Memphis.dev, offering clear explanations, step-by-step instructions, and valuable insights into the key concepts, services, and best practices that will empower you to make the most of the Memphis platform.
Let's dive in and unlock the true potential of Memphis.dev together!

Memphis.dev cloud signup
To get you up and running quickly, we created the following tutorial to guide you through the different key components. Feel free to skip it if you feel comfortable enough.


Upper bar -
- 1.Stations: Current amount of stations.
- 2.Slow consumption stations: Stations with a growing latency or a "lag" in one or more consumer groups.
- 3.Stored messages: Total amount of held events.
- 4.Dead-letter messages: Total amount of dead-letter messages across the system.
- 5.Hostname: Broker hostname to be used when connecting to Memphis using clients.
- 6.Account ID: Account ID to be used when connecting to Memphis using clients.
Go
Python
Node.js
TypeScript
.NET
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called
producer.go
producer.go
1
package main
2
3
import (
4
"fmt"
5
"os"
6
7
"github.com/memphisdev/memphis.go"
8
)
9
10
func main() {
11
conn, err := memphis.Connect("BROKER_HOSTNAME",
12
"APPLICATION_TYPE_USERNAME",
13
memphis.Password("PASSWORD"), // depends on how Memphis deployed - default is connection token-based authentication
14
memphis.AccountId(123456789),
15
)
16
if err != nil {
17
os.Exit(1)
18
}
19
defer conn.Close()
20
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
21
22
hdrs := memphis.Headers{}
23
hdrs.New()
24
err = hdrs.Add("key", "value")
25
26
if err != nil {
27
fmt.Errorf("Header failed: %v", err)
28
os.Exit(1)
29
}
30
31
err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
32
33
if err != nil {
34
fmt.Errorf("Produce failed: %v", err)
35
os.Exit(1)
36
}
37
}
Step 4: Run
producer.go
go run producer.go
Step 5: Create a new Go file called
consumer.go
consumer.go
1
package main
2
3
import (
4
"fmt"
5
"context"
6
"os"
7
"time"
8
9
"github.com/memphisdev/memphis.go"
10
)
11
12
func main() {
13
conn, err := memphis.Connect("BROKER_HOSTNAME",
14
"APPLICATION_TYPE_USERNAME",
15
memphis.Password("PASSWORD"), // depends on how Memphis deployed - default is connection token-based authentication
16
memphis.AccountId(123456789),
17
)
18
if err != nil {
19
os.Exit(1)
20
}
21
defer conn.Close()
22
23
consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
24
25
if err != nil {
26
fmt.Printf("Consumer creation failed: %v
27
", err)
28
os.Exit(1)
29
}
30
31
handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
32
if err != nil {
33
fmt.Printf("Fetch failed: %v
34
", err)
35
return
36
}
37
38
for _, msg := range msgs {
39
fmt.Println(string(msg.Data()))
40
msg.Ack()
41
headers := msg.GetHeaders()
42
fmt.Println(headers)
43
}
44
}
45
46
ctx := context.Background()
47
ctx = context.WithValue(ctx, "key", "value")
48
consumer.SetContext(ctx)
49
consumer.Consume(handler)
50
51
// The program will close the connection after 30 seconds,
52
// the message handler may be called after the connection closed
53
// so the handler may receive a timeout error
54
time.Sleep(30 * time.Second)
55
}
Step 6: Run
consumer.go
go run consumer.go
Step 1: Create an empty dir for the Python project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
pip3 install --upgrade memphis-py
Step 3: Create a new Python file called
producer.py
producer.py
1
from memphis import Memphis, Headers
2
from memphis.types import Retention, Storage
3
import asyncio
4
5
async def main():
6
try:
7
memphis = Memphis()
8
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)
9
producer = await memphis.producer(station_name="STATION_NAME", producer_name="PRODUCER_NAME")
10
headers = Headers()
11
headers.add("key", "value")
12
for i in range(5):
13
await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers) # you can send the message parameter as dict as well
14
15
except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
16
print(e)
17
18
finally:
19
await memphis.close()
20
21
if __name__ == '__main__':
22
asyncio.run(main())
Step 4: Run
producer.py
python3 producer.py
Step 5: Create a new Python file called
consumer.py
consumer.py
1
from memphis import Memphis, Headers
2
from memphis.types import Retention, Storage
3
import asyncio
4
5
async def main():
6
async def msg_handler(msgs, error, context):
7
try:
8
for msg in msgs:
9
print("message: ", msg.get_data())
10
await msg.ack()
11
headers = msg.get_headers()
12
if error:
13
print(error)
14
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
15
print(e)
16
return
17
18
try:
19
memphis = Memphis()
20
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)
21
22
consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
23
consumer.set_context({"key": "value"})
24
consumer.consume(msg_handler)
25
# Keep your main thread alive so the consumer will keep receiving data
26
await asyncio.Event().wait()
27
28
except (MemphisError, MemphisConnectError) as e:
29
print(e)
30
31
finally:
32
await memphis.close()
33
34
if __name__ == '__main__':
35
asyncio.run(main())
Step 6: Run
consumer.py
python3 consumer.py
Step 1: Create an empty dir for the Node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
npm init -y
Step 3: Install memphis Node.js SDK
npm install memphis-dev
Step 4: Create a new .js file called
producer.js
producer.js
1
const { memphis } = require("memphis-dev");
2
3
(async function () {
4
let memphisConnection;
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: "MEMPHIS_BROKER_HOSTNAME",
9
username: "APPLICATION_TYPE_USERNAME",
10
password: "PASSWORD",
11
accountId: ACCOUNT_ID
12
});
13
14
const producer = await memphisConnection.producer({
15
stationName: "STATION_NAME",
16
producerName: "PRODUCER_NAME",
17
});
18
19
const headers = memphis.headers();
20
headers.add("KEY", "VALUE");
21
await producer.produce({
22
message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
23
headers: headers,
24
});
25
26
memphisConnection.close();
27
} catch (ex) {
28
console.log(ex);
29
if (memphisConnection) memphisConnection.close();
30
}
31
})();
Step 5: Run
producer.js
node producer.js
Step 6: Create a new .js file called
consumer.js
consumer.js
1
const { memphis } = require("memphis-dev");
2
3
(async function () {
4
let memphisConnection;
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: "MEMPHIS_BROKER_HOSTNAME",
9
username: "APPLICATION_TYPE_USERNAME",
10
password: "PASSWORD",
11
accountId: ACCOUNT_ID
12
});
13
14
const consumer = await memphisConnection.consumer({
15
stationName: "STATION_NAME",
16
consumerName: "CONSUMER_NAME",
17
consumerGroup: "CONSUMER_GROUP_NAME",
18
});
19
20
consumer.setContext({ key: "value" });
21
consumer.on("message", (message, context) => {
22
console.log(message.getData().toString());
23
message.ack();
24
const headers = message.getHeaders();
25
});
26
27
consumer.on("error", (error) => {});
28
} catch (ex) {
29
console.log(ex);
30
if (memphisConnection) memphisConnection.close();
31
}
32
})();
Step 7: Run
consumer.js
node consumer.js
Step 1: Create an empty dir for the new project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Install memphis Node.js SDK
dotnet add package Memphis.Client
Step 3: Create a new .cs file called
Producer.cs
Producer.cs
1
using System.Collections.Specialized;
2
using System.Text;
3
using Memphis.Client;
4
using Memphis.Client.Producer;
5
6
namespace Producer
7
{
8
class ProducerApp
9
{
10
public static async Task Main(string[] args)
11
{
12
try
13
{
14
var options = MemphisClientFactory.GetDefaultOptions();
15
options.Host = "aws-us-east-1.cloud.memphis.dev";
16
options.Username = "client_type_username";
17
options.Password = "client_type_password";
18
var client = await MemphisClientFactory.CreateClient(options);
19
options.AccountId = 123456789;
20
21
var producer = await client.CreateProducer(new MemphisProducerOptions
22
{
23
StationName = "station_name",
24
ProducerName = "producer_name",
25
});
26
27
var commonHeaders = new NameValueCollection();
28
commonHeaders.Add("key-1", "value-1");
29
30
for (int i = 0; i < 10_000000; i++)
31
{
32
await Task.Delay(1_000);
33
var text = $"Message #{i}: Welcome to Memphis";
34
await producer.ProduceAsync(Encoding.UTF8.GetBytes(text), commonHeaders);
35
Console.WriteLine($"Message #{i} sent successfully");
36
}
37
38
await producer.DestroyAsync();
39
}
40
catch (Exception ex)
41
{
42
Console.Error.WriteLine("Exception: " + ex.Message);
43
Console.Error.WriteLine(ex);
44
}
45
}
46
}
47
}
48
Step 4: Create a new .ts file called
Consumer.cs
Consumer.cs
1
using System.Text;
2
using Memphis.Client;
3
using Memphis.Client.Consumer;
4
5
namespace Consumer
6
{
7
class ConsumerApp
8
{
9
public static async Task Main(string[] args)
10
{
11
try
12
{
13
var options = MemphisClientFactory.GetDefaultOptions();
14
options.Host = "aws-us-east-1.cloud.memphis.dev";
15
options.Username = "client_type_username";
16
options.Password = "client_type_password";
17
var client = await MemphisClientFactory.CreateClient(options);
18
options.AccountId = 123456789;
19
20
var consumer = await client.CreateConsumer(new MemphisConsumerOptions
21
{
22
StationName = "STATION_NAME",
23
ConsumerName = "CONSUMER_NAME",
24
ConsumerGroup = "CONSUMER_GROUP_NAME",
25
});
26
27
consumer.MessageReceived += (sender, args) =>
28
{
29
if (args.Exception != null)
30
{
31
Console.Error.WriteLine(args.Exception);
32
return;
33
}
34
35
foreach (var msg in args.MessageList)
36
{
37
//print message itself
38
Console.WriteLine("Received data: " + Encoding.UTF8.GetString(msg.GetData()));
39
40
41
// print message headers
42
foreach (var headerKey in msg.GetHeaders().Keys)
43
{
44
Console.WriteLine(
45
$"Header Key: {headerKey}, value: {msg.GetHeaders()[headerKey.ToString()]}");
46
}
47
48
Console.WriteLine("---------");
49
msg.Ack();
50
}
51
Console.WriteLine("destroyed");
52
};
53
54
consumer.ConsumeAsync();
55
56
// Wait 10 seconds, consumer starts to consume, if you need block main thread use await keyword.
57
await Task.Delay(10_000);
58
await consumer.DestroyAsync();
59
}
60
catch (Exception ex)
61
{
62
Console.Error.WriteLine("Exception: " + ex.Message);
63
Console.Error.WriteLine(ex);
64
}
65
}
66
}
67
}
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called
producer.go
producer.go
1
package main
2
3
import (
4
"fmt"
5
"os"
6
7
"github.com/memphisdev/memphis.go"
8
)
9
10
func main() {
11
conn, err := memphis.Connect("BROKER_HOSTNAME",
12
"APPLICATION_TYPE_USERNAME",
13
memphis.Password("PASSWORD"), // depends on how Memphis deployed - default is connection token-based authentication
14
memphis.AccountId(123456789),
15
)
16
if err != nil {
17
os.Exit(1)
18
}
19
defer conn.Close()
20
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
21
22
hdrs := memphis.Headers{}
23
hdrs.New()
24
err = hdrs.Add("key", "value")
25
26
if err != nil {
27
fmt.Errorf("Header failed: %v", err)
28
os.Exit(1)
29
}
30
31
err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
32
33
if err != nil {
34
fmt.Errorf("Produce failed: %v", err)
35
os.Exit(1)
36
}
37
}
Step 4: Run
producer.go
go run producer.go
Step 5: Create a new Go file called
consumer.go
consumer.go
1
package main
2
3
import (
4
"fmt"
5
"context"
6
"os"
7
"time"
8
9
"github.com/memphisdev/memphis.go"
10
)
11
12
func main() {
13
conn, err := memphis.Connect("BROKER_HOSTNAME",
14
"APPLICATION_TYPE_USERNAME",
15
memphis.Password("PASSWORD"), // depends on how Memphis deployed - default is connection token-based authentication
16
memphis.AccountId(123456789),
17
)
18
if err != nil {
19
os.Exit(1)
20
}
21
defer conn.Close()
22
23
consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
24
25
if err != nil {
26
fmt.Printf("Consumer creation failed: %v
27
", err)
28
os.Exit(1)
29
}
30
31
handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
32
if err != nil {
33
fmt.Printf("Fetch failed: %v
34
", err)
35
return
36
}
37
38
for _, msg := range msgs {
39
fmt.Println(string(msg.Data()))
40
msg.Ack()
41
headers := msg.GetHeaders()
42
fmt.Println(headers)
43
}
44
}
45
46
ctx := context.Background()
47
ctx = context.WithValue(ctx, "key", "value")
48
consumer.SetContext(ctx)
49
consumer.Consume(handler)
50
51
// The program will close the connection after 30 seconds,
52
// the message handler may be called after the connection closed
53
// so the handler may receive a timeout error
54
time.Sleep(30 * time.Second)
55
}
Step 6: Run
consumer.go
go run consumer.go
Last modified 1mo ago