Quick start
Create your first station, producer, and consumer in your preferred language.
Last updated
Was this helpful?
Create your first station, producer, and consumer in your preferred language.
Last updated
Was this helpful?
To learn by practice, you can find a sample application .
Step 1: Sign up for Memphis Cloud .
Step 2: Hello world
For Kubernetes
Stable -
helm repo add memphis https://k8s.memphis.dev/charts/ --force-update &&
helm install memphis memphis/memphis --create-namespace --namespace memphis --wait --version=1.2.2
Latest -
helm repo add memphis https://k8s.memphis.dev/charts/ --force-update &&
helm install memphis memphis/memphis --create-namespace --namespace memphis --wait
More information can be found in the documentation.
Docker compose (Syntax for v2)
Stable -
curl -s https://memphisdev.github.io/memphis-docker/docker-compose.yml -o docker-compose.yml && docker compose -f docker-compose.yml -p memphis up
Latest -
curl -s https://memphisdev.github.io/memphis-docker/docker-compose-latest.yml -o docker-compose-latest.yml && docker compose -f docker-compose-latest.yml -p memphis up
More information can be found in the documentation.
Step 1: Create an empty dir for the Node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
npm init -y
Step 3: Install memphis Node.js SDK
npm install memphis-dev
Step 4: Create a new .js file called producer.js
const { memphis } = require("memphis-dev");
(async function () {
let memphisConnection;
try {
memphisConnection = await memphis.connect({
host: "MEMPHIS_BROKER_HOSTNAME",
username: "APPLICATION_TYPE_USERNAME",
password: "PASSWORD",
});
const producer = await memphisConnection.producer({
stationName: "STATION_NAME",
producerName: "PRODUCER_NAME",
});
const headers = memphis.headers();
headers.add("KEY", "VALUE");
await producer.produce({
message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
headers: headers,
});
memphisConnection.close();
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
Step 5: Run producer.js
node producer.js
Step 6: Create a new .js file called consumer.js
const { memphis } = require("memphis-dev");
(async function () {
let memphisConnection;
try {
memphisConnection = await memphis.connect({
host: "MEMPHIS_BROKER_HOSTNAME",
username: "APPLICATION_TYPE_USERNAME",
password: "PASSWORD",
});
const consumer = await memphisConnection.consumer({
stationName: "STATION_NAME",
consumerName: "CONSUMER_NAME",
consumerGroup: "CONSUMER_GROUP_NAME",
});
consumer.setContext({ key: "value" }); // Optional
let messages = await consumer.fetch(); // Fetches 10 messages
for (let message of messages){
const messageObject = JSON.parse(message.getData().toString());
// Do something with the message
console.log(messageObject["Hello"]);
message.ack();
}
memphisConnection.close();
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
Step 7: Run consumer.js
node consumer.js
Step 1: Create an empty dir for the TypeScript project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
npm init -y
Step 3: Install memphis Node.js SDK
npm install memphis-dev
Step 4: Create a new .ts file called producer.ts
import { memphis, Memphis } from "memphis-dev";
(async function () {
let memphisConnection: Memphis;
try {
memphisConnection = await memphis.connect({
host: "MEMPHIS_BROKER_HOSTNAME",
username: "APPLICATION_TYPE_USERNAME",
password: "PASSWORD",
});
const producer = await memphisConnection.producer({
stationName: "STATION_NAME",
producerName: "PRODUCER_NAME",
});
const headers = memphis.headers();
headers.add("key", "value");
await producer.produce({
message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
headers: headers,
});
memphisConnection.close();
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
Step 5: Run producer.ts
node producer.ts
Step 6: Create a new .ts file called consumer.ts
import { memphis, Memphis, Message } from "memphis-dev";
(async function () {
let memphisConnection: Memphis;
try {
memphisConnection = await memphis.connect({
host: "MEMPHIS_BROKER_HOSTNAME",
username: "APPLICATION_TYPE_USERNAME",
password: "PASSWORD",
});
const consumer = await memphisConnection.consumer({
stationName: "STATION_NAME",
consumerName: "CONSUMER_NAME",
consumerGroup: "CONSUMER_GROUP_NAME",
});
consumer.setContext({ key: "value" });
let messages: Message[] = await consumer.fetch(); // Fetches 10 messages
for (let message of messages){
const messageObject: Record<string, any> = JSON.parse(message.getData().toString());
// Do something with the message
console.log(messageObject["Hello"]);
message.ack();
}
memphisConnection.close();
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
Step 7: Run consumer.ts
node consumer.ts
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
package main
import (
"fmt"
"os"
"github.com/memphisdev/memphis.go"
)
func main() {
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", memphis.Password("PASSWORD"),)
if err != nil {
os.Exit(1)
}
defer conn.Close()
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
hdrs := memphis.Headers{}
hdrs.New()
err = hdrs.Add("key", "value")
if err != nil {
fmt.Errorf("Header failed: %v", err)
os.Exit(1)
}
err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
if err != nil {
fmt.Errorf("Produce failed: %v", err)
os.Exit(1)
}
}
Step 4: Run producer.go
go run producer.go
Step 5: Create a new Go file called consumer.go
package main
import (
"fmt"
"context"
"os"
"time"
"github.com/memphisdev/memphis.go"
)
func main() {
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", memphis.Password("PASSWORD"),)
if err != nil {
os.Exit(1)
}
defer conn.Close()
consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
if err != nil {
fmt.Printf("Consumer creation failed: %v", err)
os.Exit(1)
}
consumer.SetContext(ctx)
messages, err := consumer.Fetch()
var msg_map map[string]any
for _, message := range messages{
err = json.Unmarshal(message.Data(), &msg_map)
if err != nil{
fmt.Print(err)
continue
}
// Do something with the message
err = message.Ack()
if err != nil{
fmt.Print(err)
continue
}
}
// The program will close the connection after 30 seconds,
// the message handler may be called after the connection closed
// so the handler may receive a timeout error
time.Sleep(30 * time.Second)
}
Step 6: Run consumer.go
go run consumer.go
Step 1: Create an empty dir for the Python project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
pip3 install --upgrade memphis-py
Step 3: Create a new Python file called producer.py
from memphis import Memphis, Headers
from memphis.types import Retention, Storage
import asyncio
async def main():
try:
memphis = Memphis()
await memphis.connect(host="localhost", username="root", password="memphis")
producer = await memphis.producer(station_name="memphis-test", producer_name="producer-test")
await producer.produce(bytearray('Hello world', 'utf-8')) # you can send the message parameter as dict as well
except Exception as e:
print(e)
finally:
await memphis.close()
if __name__ == '__main__':
asyncio.run(main())
Step 4: Run producer.py
python3 producer.py
Step 5: Create a new Python file called consumer.py
from memphis import Memphis, Headers
from memphis.types import Retention, Storage
from memphis.message import Message
import asyncio
async def main():
try:
memphis = Memphis()
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD")
consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
messages: list[Message] = await consumer.fetch() # Type-hint the return here for LSP integration
for consumed_message in messages:
msg_data = json.loads(consumed_message.get_data())
# Do something with the message data
await consumed_message.ack()
except (MemphisError, MemphisConnectError) as e:
print(e)
finally:
await memphis.close()
if __name__ == '__main__':
asyncio.run(main())
Step 6: Run consumer.py
python3 consumer.py
Step 1: Create a new C# project through Visual Studio or using the dotnet CLI
dotnet new console -n MyC#Project
If you are using top level statements, you will have to create two of these projects.
Step 2: In your project's directory(s), install Memphis C# SDK
dotnet add package Memphis.Client -v ${MEMPHIS_CLIENT_VERSION}
Step 3: In your Producer file/project copy this quickstart inside of it
using Memphis.Client;
using System.Collections.Specialized;
using System.Text;
using System.Text.Json;
try
{
var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "aws-us-east-1.cloud.memphis.dev";
options.AccountId = int.Parse(Environment.GetEnvironmentVariable("memphis_account_id"));
options.Username = "test_user";
options.Password = Environment.GetEnvironmentVariable("memphis_pass");
var memphisClient = await MemphisClientFactory.CreateClient(options);
var producer = await memphisClient.CreateProducer(
new Memphis.Client.Producer.MemphisProducerOptions
{
StationName = "test_station",
ProducerName = "producer"
});
Message message = new()
{
Hello = "World!"
};
var headers = new NameValueCollection();
for (int i = 0; i < 3; i++)
{
var msgBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
await producer.ProduceAsync(msgBytes, headers);
}
memphisClient.Dispose();
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.Message);
}
public class Message
{
public string Hello { get; set; }
}
Step 4: Run producer.cs
dotnet run
Step 5: Create a new C# project/file called consumer.cs
using Memphis.Client;
using Memphis.Client.Core;
using System.Text.Json;
try
{
var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "aws-us-east-1.cloud.memphis.dev";
options.AccountId = int.Parse(Environment.GetEnvironmentVariable("memphis_account_id"));
options.Username = "test_user";
options.Password = Environment.GetEnvironmentVariable("memphis_pass");
var memphisClient = await MemphisClientFactory.CreateClient(options);
var consumer = await memphisClient.CreateConsumer(
new Memphis.Client.Consumer.MemphisConsumerOptions
{
StationName = "test_station",
ConsumerName = "consumer"
});
var messages = consumer.Fetch(3, false);
foreach (MemphisMessage message in messages)
{
var messageData = message.GetData();
var messageOBJ = JsonSerializer.Deserialize<Message>(messageData);
// Do something with the message here
Console.WriteLine(JsonSerializer.Serialize(messageOBJ));
message.Ack();
}
memphisClient.Dispose();
}
catch (Exception ex)
{
Console.Error.WriteLine(ex.Message);
}
public class Message
{
public string Hello { get; set; }
}
Step 6: Run consumer.cs
dotnet run
Producing messages to Memphis via REST API can be implemented using any REST-supported language like Go, Python, Java, Node.js, .NET, etc...
For the following tutorial, we will use Node.js .
Step 1: Create an empty dir for the REST API project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
npm init -y
Step 3: Generate a new JWT token generate.js
var axios = require("axios");
var data = JSON.stringify({
username: "APPLICATION_TYPE_USERNAME",
password: "PASSWORD",
token_expiry_in_minutes: 123,
refresh_token_expiry_in_minutes: 10000092,
});
var config = {
method: "post",
url: "localhost:4444",
headers: {
"Content-Type": "application/json",
},
data: data,
};
axios(config)
.then(function (response) {
console.log(JSON.stringify(response.data));
})
.catch(function (error) {
console.log(error);
});
Step 4: Run generate.js
and copy the returned JWT
node generate.js
Step 5: Create a new file called producer.js
var axios = require("axios");
var data = JSON.stringify({
message: "New Message",
});
var config = {
method: "post",
url: "http://localhost:4444/stations/hps/produce/single",
headers: {
Authorization: "Bearer <jwt>",
"Content-Type": "application/json",
},
data: data,
};
axios(config)
.then(function (response) {
console.log(JSON.stringify(response.data));
})
.catch(function (error) {
console.log(error);
});
More code examples