Links

Getting Started

Accelerate Development of Real-Time Applications with Memphis.dev Cloud
Welcome to our getting started guide for Memphis.dev Cloud!
In today's fast-paced and interconnected world, event streaming is the engine behind communication. Whether you are an individual, a small business, or an enterprise, harnessing the power of the Memphis.dev Cloud can bring unparalleled scalability, flexibility, and efficiency to your operations.
This section aims to provide a solid foundation to embark on your streaming journey with Memphis.dev, offering clear explanations, step-by-step instructions, and valuable insights into the key concepts, services, and best practices that will empower you to make the most of the Memphis platform.
Let's dive in and unlock the true potential of Memphis.dev together!

Create an account

Head to the cloud signup page to create an account.
Memphis.dev cloud signup

Walkthrough

To get you up and running quickly, we created the following tutorial to guide you through the different key components. Feel free to skip it if you feel comfortable enough.

Main overview

Upper bar -
  1. Stations: Current number of stations.
  2. Slow consumption stations: Stations with a growing latency or a "lag" in one or more consumer groups.
  3. Stored messages: Total number of held events.
  4. Dead-letter messages: Total number of dead-letter messages across the system.
  5. Hostname: Broker hostname to be used when connecting to Memphis using clients.
  6. Account ID: Account ID to be used when connecting to Memphis using clients.

Hello world

Go
Python
Node.js
TypeScript
.NET
Full code examples can be found in this repo
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
producer.go
// producer.go connects to a Memphis broker and publishes a single message
// carrying one custom header to a station.
package main

import (
	"fmt"
	"os"

	"github.com/memphisdev/memphis.go"
)

func main() {
	// Replace the placeholder hostname, username, password and account ID
	// with the values of your own Memphis account.
	conn, err := memphis.Connect("BROKER_HOSTNAME",
		"APPLICATION_TYPE_USERNAME",
		memphis.Password("PASSWORD"), // depends on how Memphis is deployed - default is connection token-based authentication
		memphis.AccountId(123456789),
	)
	if err != nil {
		fmt.Printf("Connection failed: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close()

	p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
	// Check the producer error here; otherwise it is overwritten by the
	// next assignment to err and a nil producer is used below.
	if err != nil {
		fmt.Printf("Producer creation failed: %v\n", err)
		os.Exit(1)
	}

	hdrs := memphis.Headers{}
	hdrs.New()
	err = hdrs.Add("key", "value")
	if err != nil {
		// fmt.Errorf only builds an error value; print the failure instead.
		fmt.Printf("Header failed: %v\n", err)
		os.Exit(1)
	}

	err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
	if err != nil {
		fmt.Printf("Produce failed: %v\n", err)
		os.Exit(1)
	}
}
Step 4: Run producer.go
go run producer.go
Step 5: Create a new Go file called consumer.go
consumer.go
// consumer.go connects to a Memphis broker, consumes messages from a station
// for 30 seconds, printing and acknowledging each one, and then exits.
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/memphisdev/memphis.go"
)

func main() {
	// Replace the placeholder hostname, username, password and account ID
	// with the values of your own Memphis account.
	conn, err := memphis.Connect("BROKER_HOSTNAME",
		"APPLICATION_TYPE_USERNAME",
		memphis.Password("PASSWORD"), // depends on how Memphis is deployed - default is connection token-based authentication
		memphis.AccountId(123456789),
	)
	if err != nil {
		fmt.Printf("Connection failed: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close()

	consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
	if err != nil {
		fmt.Printf("Consumer creation failed: %v\n", err)
		os.Exit(1)
	}

	// handler is invoked with each fetched batch of messages, or with a
	// non-nil err when the fetch failed.
	handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
		if err != nil {
			fmt.Printf("Fetch failed: %v\n", err)
			return
		}

		for _, msg := range msgs {
			fmt.Println(string(msg.Data()))
			msg.Ack()
			headers := msg.GetHeaders()
			fmt.Println(headers)
		}
	}

	// Attach a context that is handed to the handler on every call.
	ctx := context.Background()
	ctx = context.WithValue(ctx, "key", "value")
	consumer.SetContext(ctx)
	consumer.Consume(handler)

	// The program will close the connection after 30 seconds,
	// the message handler may be called after the connection closed
	// so the handler may receive a timeout error
	time.Sleep(30 * time.Second)
}
Step 6: Run consumer.go
go run consumer.go
Full code examples can be found in this repo
Step 1: Create an empty dir for the Python project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
pip3 install --upgrade memphis-py
Step 3: Create a new Python file called producer.py
producer.py
"""producer.py: connect to Memphis and publish five messages with a custom header."""
import asyncio

from memphis import (
    Headers,
    Memphis,
    MemphisConnectError,
    MemphisError,
    MemphisHeaderError,
    MemphisSchemaError,
)
from memphis.types import Retention, Storage

# Placeholder: replace with the account ID of your Memphis account.
ACCOUNT_ID = 123456789


async def main():
    """Connect, create a producer, send five messages, then close the connection."""
    # Created outside the try block so `memphis` is always bound in `finally`.
    memphis = Memphis()
    try:
        await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)
        producer = await memphis.producer(station_name="STATION_NAME", producer_name="PRODUCER_NAME")
        headers = Headers()
        headers.add("key", "value")
        for i in range(5):
            await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers)  # you can send the message parameter as dict as well

    except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
        print(e)

    finally:
        await memphis.close()


if __name__ == '__main__':
    asyncio.run(main())
Step 4: Run producer.py
python3 producer.py
Step 5: Create a new Python file called consumer.py
consumer.py
"""consumer.py: connect to Memphis and print/acknowledge every message received."""
import asyncio

from memphis import (
    Headers,
    Memphis,
    MemphisConnectError,
    MemphisError,
    MemphisHeaderError,
)
from memphis.types import Retention, Storage

# Placeholder: replace with the account ID of your Memphis account.
ACCOUNT_ID = 123456789


async def main():
    """Connect, register a message handler on a consumer, and wait forever."""

    async def msg_handler(msgs, error, context):
        """Print and acknowledge each message in the batch; print `error` if set."""
        try:
            for msg in msgs:
                print("message: ", msg.get_data())
                await msg.ack()
                headers = msg.get_headers()
            if error:
                print(error)
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    # Created outside the try block so `memphis` is always bound in `finally`.
    memphis = Memphis()
    try:
        await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)

        consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
        consumer.set_context({"key": "value"})
        consumer.consume(msg_handler)
        # Keep your main thread alive so the consumer will keep receiving data
        await asyncio.Event().wait()

    except (MemphisError, MemphisConnectError) as e:
        print(e)

    finally:
        await memphis.close()


if __name__ == '__main__':
    asyncio.run(main())
Step 6: Run consumer.py
python3 consumer.py
Full code examples can be found in this repo
Step 1: Create an empty dir for the Node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
npm init -y
Step 3: Install memphis Node.js SDK
npm install memphis-dev
Step 4: Create a new .js file called producer.js
producer.js
// producer.js: connect to Memphis, publish one message with a custom header,
// then close the connection.
const { memphis } = require("memphis-dev");

// Placeholder: replace with the account ID of your Memphis account.
const ACCOUNT_ID = 123456789;

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "APPLICATION_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID,
    });

    const producer = await memphisConnection.producer({
      stationName: "STATION_NAME",
      producerName: "PRODUCER_NAME",
    });

    const headers = memphis.headers();
    headers.add("KEY", "VALUE");
    await producer.produce({
      message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
      headers: headers,
    });
  } catch (ex) {
    console.log(ex);
  } finally {
    // Close once on both the success and the failure path; the connection
    // is undefined when connect() itself failed.
    if (memphisConnection) memphisConnection.close();
  }
})();
Step 5: Run producer.js
node producer.js
Step 6: Create a new .js file called consumer.js
consumer.js
// consumer.js: connect to Memphis and print/acknowledge every message
// delivered to the consumer.
const { memphis } = require("memphis-dev");

// Placeholder: replace with the account ID of your Memphis account.
const ACCOUNT_ID = 123456789;

(async function () {
  let memphisConnection;

  try {
    memphisConnection = await memphis.connect({
      host: "MEMPHIS_BROKER_HOSTNAME",
      username: "APPLICATION_TYPE_USERNAME",
      password: "PASSWORD",
      accountId: ACCOUNT_ID,
    });

    const consumer = await memphisConnection.consumer({
      stationName: "STATION_NAME",
      consumerName: "CONSUMER_NAME",
      consumerGroup: "CONSUMER_GROUP_NAME",
    });

    // The context object is passed to the "message" handler as its second argument.
    consumer.setContext({ key: "value" });
    consumer.on("message", (message, context) => {
      console.log(message.getData().toString());
      message.ack();
      const headers = message.getHeaders();
    });

    // Report consumer errors instead of silently discarding them.
    consumer.on("error", (error) => {
      console.error(error);
    });
  } catch (ex) {
    console.log(ex);
    if (memphisConnection) memphisConnection.close();
  }
})();
Step 7: Run consumer.js
node consumer.js
Full code examples can be found in this repo
Step 1: Create an empty dir for the new project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Install the Memphis .NET SDK
dotnet add package Memphis.Client
Step 3: Create a new .cs file called Producer.cs
Producer.cs
using System.Collections.Specialized;
using System.Text;
using Memphis.Client;
using Memphis.Client.Producer;

namespace Producer
{
    /// <summary>
    /// Connects to Memphis and publishes one message per second to a station,
    /// each carrying a shared set of headers.
    /// </summary>
    class ProducerApp
    {
        public static async Task Main(string[] args)
        {
            try
            {
                // Replace the placeholder host, credentials and account ID
                // with the values of your own Memphis account.
                var options = MemphisClientFactory.GetDefaultOptions();
                options.Host = "aws-us-east-1.cloud.memphis.dev";
                options.Username = "client_type_username";
                options.Password = "client_type_password";
                // Set the account ID before the client is created from these options.
                options.AccountId = 123456789;
                var client = await MemphisClientFactory.CreateClient(options);

                var producer = await client.CreateProducer(new MemphisProducerOptions
                {
                    StationName = "station_name",
                    ProducerName = "producer_name",
                });

                var commonHeaders = new NameValueCollection();
                commonHeaders.Add("key-1", "value-1");

                for (int i = 0; i < 10_000_000; i++)
                {
                    await Task.Delay(1_000);
                    var text = $"Message #{i}: Welcome to Memphis";
                    await producer.ProduceAsync(Encoding.UTF8.GetBytes(text), commonHeaders);
                    Console.WriteLine($"Message #{i} sent successfully");
                }

                await producer.DestroyAsync();
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine("Exception: " + ex.Message);
                Console.Error.WriteLine(ex);
            }
        }
    }
}
Step 4: Create a new .cs file called Consumer.cs
Consumer.cs
using System.Text;
using Memphis.Client;
using Memphis.Client.Consumer;

namespace Consumer
{
    /// <summary>
    /// Connects to Memphis, consumes messages from a station for ten seconds,
    /// printing and acknowledging each one, then destroys the consumer.
    /// </summary>
    class ConsumerApp
    {
        public static async Task Main(string[] args)
        {
            try
            {
                // Replace the placeholder host, credentials and account ID
                // with the values of your own Memphis account.
                var options = MemphisClientFactory.GetDefaultOptions();
                options.Host = "aws-us-east-1.cloud.memphis.dev";
                options.Username = "client_type_username";
                options.Password = "client_type_password";
                // Set the account ID before the client is created from these options.
                options.AccountId = 123456789;
                var client = await MemphisClientFactory.CreateClient(options);

                var consumer = await client.CreateConsumer(new MemphisConsumerOptions
                {
                    StationName = "STATION_NAME",
                    ConsumerName = "CONSUMER_NAME",
                    ConsumerGroup = "CONSUMER_GROUP_NAME",
                });

                consumer.MessageReceived += (sender, args) =>
                {
                    if (args.Exception != null)
                    {
                        Console.Error.WriteLine(args.Exception);
                        return;
                    }

                    foreach (var msg in args.MessageList)
                    {
                        // print message itself
                        Console.WriteLine("Received data: " + Encoding.UTF8.GetString(msg.GetData()));

                        // print message headers
                        foreach (var headerKey in msg.GetHeaders().Keys)
                        {
                            Console.WriteLine(
                                $"Header Key: {headerKey}, value: {msg.GetHeaders()[headerKey.ToString()]}");
                        }

                        Console.WriteLine("---------");
                        msg.Ack();
                    }
                    // NOTE(review): this prints after every received batch, not when
                    // the consumer is destroyed - confirm the intended placement.
                    Console.WriteLine("destroyed");
                };

                // Deliberately not awaited, so consumption runs while Main waits below.
                _ = consumer.ConsumeAsync();

                // Wait 10 seconds, consumer starts to consume, if you need block main thread use await keyword.
                await Task.Delay(10_000);
                await consumer.DestroyAsync();
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine("Exception: " + ex.Message);
                Console.Error.WriteLine(ex);
            }
        }
    }
}
Full code examples can be found in this repo
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
producer.go
// producer.go connects to a Memphis broker and publishes a single message
// carrying one custom header to a station.
package main

import (
	"fmt"
	"os"

	"github.com/memphisdev/memphis.go"
)

func main() {
	// Replace the placeholder hostname, username, password and account ID
	// with the values of your own Memphis account.
	conn, err := memphis.Connect("BROKER_HOSTNAME",
		"APPLICATION_TYPE_USERNAME",
		memphis.Password("PASSWORD"), // depends on how Memphis is deployed - default is connection token-based authentication
		memphis.AccountId(123456789),
	)
	if err != nil {
		fmt.Printf("Connection failed: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close()

	p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
	// Check the producer error here; otherwise it is overwritten by the
	// next assignment to err and a nil producer is used below.
	if err != nil {
		fmt.Printf("Producer creation failed: %v\n", err)
		os.Exit(1)
	}

	hdrs := memphis.Headers{}
	hdrs.New()
	err = hdrs.Add("key", "value")
	if err != nil {
		// fmt.Errorf only builds an error value; print the failure instead.
		fmt.Printf("Header failed: %v\n", err)
		os.Exit(1)
	}

	err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
	if err != nil {
		fmt.Printf("Produce failed: %v\n", err)
		os.Exit(1)
	}
}
Step 4: Run producer.go
go run producer.go
Step 5: Create a new Go file called consumer.go
consumer.go
// consumer.go connects to a Memphis broker, consumes messages from a station
// for 30 seconds, printing and acknowledging each one, and then exits.
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/memphisdev/memphis.go"
)

func main() {
	// Replace the placeholder hostname, username, password and account ID
	// with the values of your own Memphis account.
	conn, err := memphis.Connect("BROKER_HOSTNAME",
		"APPLICATION_TYPE_USERNAME",
		memphis.Password("PASSWORD"), // depends on how Memphis is deployed - default is connection token-based authentication
		memphis.AccountId(123456789),
	)
	if err != nil {
		fmt.Printf("Connection failed: %v\n", err)
		os.Exit(1)
	}
	defer conn.Close()

	consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
	if err != nil {
		fmt.Printf("Consumer creation failed: %v\n", err)
		os.Exit(1)
	}

	// handler is invoked with each fetched batch of messages, or with a
	// non-nil err when the fetch failed.
	handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
		if err != nil {
			fmt.Printf("Fetch failed: %v\n", err)
			return
		}

		for _, msg := range msgs {
			fmt.Println(string(msg.Data()))
			msg.Ack()
			headers := msg.GetHeaders()
			fmt.Println(headers)
		}
	}

	// Attach a context that is handed to the handler on every call.
	ctx := context.Background()
	ctx = context.WithValue(ctx, "key", "value")
	consumer.SetContext(ctx)
	consumer.Consume(handler)

	// The program will close the connection after 30 seconds,
	// the message handler may be called after the connection closed
	// so the handler may receive a timeout error
	time.Sleep(30 * time.Second)
}
Step 6: Run consumer.go
go run consumer.go