Getting Started Accelerate Development of Real-Time Applications with Memphis.dev Cloud
Welcome to our getting started guide with Memphis.dev cloud!
In today's fast-paced and interconnected world, event streaming is the engine behind communication. Whether you are an individual, a small business, or an enterprise, harnessing the power of the Memphis.dev Cloud can bring unparalleled scalability, flexibility, and efficiency to your operations.
This section aims to provide a solid foundation to embark on your streaming journey with Memphis.dev, offering clear explanations, step-by-step instructions, and valuable insights into the key concepts, services, and best practices that will empower you to make the most of the Memphis platform.
Memphis.dev is fully compliant with GDPR and SOC2 type I & II regulations. Our team undergoes monthly/quarterly/annually regular training to ensure that Memphis.dev policies and activities are updated with the latest data protection regulations, and we also perform regular audits and assessments to ensure that we remain compliant with all relevant standards. We value our users’ privacy and security, and we are committed to maintaining the highest level of data protection possible. More in our Legal Hub .
Why migrate from self-hosted to Cloud?
Memphis.dev Cloud offers several advantages over self-hosted Memphis.dev deployments. These benefits include:
Seamless Scalability: Experience limitless growth potential with Memphis.dev Cloud. Easily scale to meet your evolving needs without the hassle of managing hardware.
Enhanced Performance: Enjoy lightning-fast response times and superior performance with our optimized cloud infrastructure.
Automatic Updates: Stay at the forefront of Memphis.dev with automatic updates and enhancements. Our cloud version ensures you're always running the latest and greatest version of Memphis.
Effortless Maintenance: Leave server maintenance and monitoring to us. Focus on what matters most, while we handle the backend tasks, ensuring your nervous system is always up and running.
Data Security: Protect your valuable data with our top-notch cloud security measures. Benefit from data encryption, regular backups, and industry-standard security protocols.
Global Accessibility: Access your data from anywhere in the world. Memphis.dev cloud ensures that your data is accessible whenever and wherever your apps need it.
Let's dive in and unlock the true potential of Memphis.dev together!
Create an account
Head to the Cloud's Sign-up page to create an account.
Walkthrough
To get you up and running quickly, we created the following tutorial to guide you through the different key components. Feel free to skip it if you feel comfortable enough.
Main overview
Upper bar -
Stations: Current amount of stations.
Slow consumption stations: Stations with a growing latency or a "lag" in one or more consumer groups.
Stored messages: Total amount of held events.
Dead-letter messages: Total amount of dead-letter messages across the system.
Hostname: Broker hostname to be used when connecting to Memphis using clients.
Account ID: Account ID to be used when connecting to Memphis using clients.
Hello world
Go Python Node.js TypeScript .NET
Full code examples can be found in this repo
Step 1: Create an empty dir for the Go project
Copy mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
Copy go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
Copy go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
Copy package main
import (
"fmt"
"os"
"github.com/memphisdev/memphis.go"
)
func main () {
conn, err := memphis. Connect ( "BROKER_HOSTNAME" ,
"APPLICATION_TYPE_USERNAME" ,
memphis. Password ( "PASSWORD" ), // depends on how Memphis deployed - default is connection token-based authentication
memphis. AccountId ( 123456789 ),
)
if err != nil {
os. Exit ( 1 )
}
defer conn. Close ()
p, err := conn. CreateProducer ( "STATION_NAME" , "PRODUCER_NAME" )
hdrs := memphis . Headers {}
hdrs. New ()
err = hdrs. Add ( "key" , "value" )
if err != nil {
fmt. Errorf ( "Header failed: %v " , err)
os. Exit ( 1 )
}
err = p. Produce ([] byte ( "You have a message!" ), memphis. MsgHeaders (hdrs))
if err != nil {
fmt. Errorf ( "Produce failed: %v " , err)
os. Exit ( 1 )
}
}
Step 4: Run producer.go
Step 5: Create a new Go file called consumer.go
Copy package main
import (
"fmt"
"context"
"os"
"time"
"github.com/memphisdev/memphis.go"
)
func main () {
conn, err := memphis. Connect ( "BROKER_HOSTNAME" ,
"APPLICATION_TYPE_USERNAME" ,
memphis. Password ( "PASSWORD" ), // depends on how Memphis deployed - default is connection token-based authentication
memphis. AccountId ( 123456789 ),
)
if err != nil {
os. Exit ( 1 )
}
defer conn. Close ()
consumer, err := conn. CreateConsumer ( "STATION_NAME" , "CONSUMER_NAME" , memphis. PullInterval ( 15 * time.Second))
if err != nil {
fmt. Printf ( "Consumer creation failed: %v
" , err)
os. Exit ( 1 )
}
handler := func (msgs [] * memphis . Msg , err error , ctx context . Context ) {
if err != nil {
fmt. Printf ( "Fetch failed: %v
" , err)
return
}
for _, msg := range msgs {
fmt. Println ( string (msg. Data ()))
msg. Ack ()
headers := msg. GetHeaders ()
fmt. Println (headers)
}
}
ctx := context. Background ()
ctx = context. WithValue (ctx, "key" , "value" )
consumer. SetContext (ctx)
consumer. Consume (handler)
// The program will close the connection after 30 seconds,
// the message handler may be called after the connection closed
// so the handler may receive a timeout error
time. Sleep ( 30 * time.Second)
}
Step 6: Run consumer.go
Full code examples can be found in this repo
Step 1: Create an empty dir for the Python project
Copy mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
Copy pip3 install --upgrade memphis-py
Step 3: Create a new Python file called producer.py
Copy from memphis import Memphis , Headers
from memphis . types import Retention , Storage
import asyncio
async def main ():
try :
memphis = Memphis ()
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)
producer = await memphis . producer (station_name = "STATION_NAME" , producer_name = "PRODUCER_NAME" )
headers = Headers ()
headers . add ( "key" , "value" )
for i in range ( 5 ):
await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers) # you can send the message parameter as dict as well
except (MemphisError , MemphisConnectError , MemphisHeaderError , MemphisSchemaError) as e :
print (e)
finally :
await memphis . close ()
if __name__ == '__main__' :
asyncio . run ( main ())
Step 4: Run producer.py
Step 5: Create a new Python file called consumer.py
Copy from memphis import Memphis , Headers
from memphis . types import Retention , Storage
import asyncio
async def main ():
async def msg_handler ( msgs , error , context ):
try :
for msg in msgs :
print ( "message: " , msg. get_data ())
await msg . ack ()
headers = msg . get_headers ()
if error :
print (error)
except (MemphisError , MemphisConnectError , MemphisHeaderError) as e :
print (e)
return
try :
memphis = Memphis ()
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD", account_id=ACCOUNT_ID)
consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
consumer . set_context ({ "key" : "value" })
consumer . consume (msg_handler)
# Keep your main thread alive so the consumer will keep receiving data
await asyncio . Event (). wait ()
except (MemphisError , MemphisConnectError) as e :
print (e)
finally :
await memphis . close ()
if __name__ == '__main__' :
asyncio . run ( main ())
Step 6: Run consumer.py
Full code examples can be found in this repo
Step 1: Create an empty dir for the Node.js project
Copy mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new Node project (If needed)
Step 3: Install memphis Node.js SDK
Copy npm install memphis-dev
Step 4: Create a new .js file called producer.js
Copy const { memphis } = require ( "memphis-dev" );
( async function () {
let memphisConnection;
try {
memphisConnection = await memphis .connect ({
host : "MEMPHIS_BROKER_HOSTNAME" ,
username : "APPLICATION_TYPE_USERNAME" ,
password : "PASSWORD" ,
accountId : ACCOUNT_ID
});
const producer = await memphisConnection .producer ({
stationName : "STATION_NAME" ,
producerName : "PRODUCER_NAME" ,
});
const headers = memphis .headers ();
headers .add ( "KEY" , "VALUE" );
await producer .produce ({
message : Buffer .from ( "Message: Hello world" ) , // you can also send JS object - {}
headers : headers ,
});
memphisConnection .close ();
} catch (ex) {
console .log (ex);
if (memphisConnection) memphisConnection .close ();
}
})();
Step 5: Run producer.js
Step 6: Create a new .js file called consumer.js
Copy const { memphis } = require ( "memphis-dev" );
( async function () {
let memphisConnection;
try {
memphisConnection = await memphis .connect ({
host : "MEMPHIS_BROKER_HOSTNAME" ,
username : "APPLICATION_TYPE_USERNAME" ,
password : "PASSWORD" ,
accountId : ACCOUNT_ID
});
const consumer = await memphisConnection .consumer ({
stationName : "STATION_NAME" ,
consumerName : "CONSUMER_NAME" ,
consumerGroup : "CONSUMER_GROUP_NAME" ,
});
consumer .setContext ({ key : "value" });
consumer .on ( "message" , (message , context) => {
console .log ( message .getData () .toString ());
message .ack ();
const headers = message .getHeaders ();
});
consumer .on ( "error" , (error) => {});
} catch (ex) {
console .log (ex);
if (memphisConnection) memphisConnection .close ();
}
})();
Step 7: Run consumer.js
Full code examples can be found in this repo
Step 1: Create an empty dir for the new project
Copy mkdir memphis-demo && \
cd memphis-demo
Step 2: Install memphis Node.js SDK
Copy dotnet add package Memphis.Client
Step 3: Create a new .cs file called Producer.cs
Copy using System .Collections.Specialized;
using System .Text;
using Memphis .Client;
using Memphis .Client.Producer;
namespace Producer
{
class ProducerApp
{
public static async Task Main (string[] args)
{
try
{
var options = MemphisClientFactory .GetDefaultOptions ();
options .Host = "aws-us-east-1.cloud.memphis.dev" ;
options .Username = "client_type_username" ;
options .Password = "client_type_password" ;
var client = await MemphisClientFactory .CreateClient (options);
options .AccountId = 123456789 ;
var producer = await client .CreateProducer ( new MemphisProducerOptions
{
StationName = "station_name" ,
ProducerName = "producer_name" ,
});
var commonHeaders = new NameValueCollection ();
commonHeaders .Add ( "key-1" , "value-1" );
for (int i = 0 ; i < 10_000000 ; i ++ )
{
await Task .Delay ( 1_000 );
var text = $ "Message #{i}: Welcome to Memphis" ;
await producer .ProduceAsync ( Encoding . UTF8 .GetBytes (text) , commonHeaders);
Console .WriteLine ($ "Message #{i} sent successfully" );
}
await producer .DestroyAsync ();
}
catch (Exception ex)
{
Console . Error .WriteLine ( "Exception: " + ex .Message);
Console . Error .WriteLine (ex);
}
}
}
}
Step 4: Create a new .ts file called Consumer.cs
Copy using System .Text;
using Memphis .Client;
using Memphis .Client.Consumer;
namespace Consumer
{
class ConsumerApp
{
public static async Task Main (string[] args)
{
try
{
var options = MemphisClientFactory .GetDefaultOptions ();
options .Host = "aws-us-east-1.cloud.memphis.dev" ;
options .Username = "client_type_username" ;
options .Password = "client_type_password" ;
var client = await MemphisClientFactory .CreateClient (options);
options .AccountId = 123456789 ;
var consumer = await client .CreateConsumer ( new MemphisConsumerOptions
{
StationName = "STATION_NAME" ,
ConsumerName = "CONSUMER_NAME" ,
ConsumerGroup = "CONSUMER_GROUP_NAME" ,
});
consumer .MessageReceived += (sender , args) =>
{
if ( args .Exception != null )
{
Console . Error .WriteLine ( args .Exception);
return ;
}
foreach (var msg in args .MessageList)
{
//print message itself
Console .WriteLine ( "Received data: " + Encoding . UTF8 .GetString ( msg .GetData ()));
// print message headers
foreach (var headerKey in msg .GetHeaders ().Keys)
{
Console .WriteLine (
$ "Header Key: {headerKey}, value: {msg.GetHeaders()[headerKey.ToString()]}" );
}
Console .WriteLine ( "---------" );
msg .Ack ();
}
Console .WriteLine ( "destroyed" );
};
consumer .ConsumeAsync ();
// Wait 10 seconds, consumer starts to consume, if you need block main thread use await keyword.
await Task .Delay ( 10_000 );
await consumer .DestroyAsync ();
}
catch (Exception ex)
{
Console . Error .WriteLine ( "Exception: " + ex .Message);
Console . Error .WriteLine (ex);
}
}
}
}
Full code examples can be found in this repo
Step 1: Create an empty dir for the Go project
Copy mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
Copy go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
Copy go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
Copy package main
import (
"fmt"
"os"
"github.com/memphisdev/memphis.go"
)
func main () {
conn, err := memphis. Connect ( "BROKER_HOSTNAME" ,
"APPLICATION_TYPE_USERNAME" ,
memphis. Password ( "PASSWORD" ), // depends on how Memphis deployed - default is connection token-based authentication
memphis. AccountId ( 123456789 ),
)
if err != nil {
os. Exit ( 1 )
}
defer conn. Close ()
p, err := conn. CreateProducer ( "STATION_NAME" , "PRODUCER_NAME" )
hdrs := memphis . Headers {}
hdrs. New ()
err = hdrs. Add ( "key" , "value" )
if err != nil {
fmt. Errorf ( "Header failed: %v " , err)
os. Exit ( 1 )
}
err = p. Produce ([] byte ( "You have a message!" ), memphis. MsgHeaders (hdrs))
if err != nil {
fmt. Errorf ( "Produce failed: %v " , err)
os. Exit ( 1 )
}
}
Step 4: Run producer.go
Step 5: Create a new Go file called consumer.go
Copy package main
import (
"fmt"
"context"
"os"
"time"
"github.com/memphisdev/memphis.go"
)
func main () {
conn, err := memphis. Connect ( "BROKER_HOSTNAME" ,
"APPLICATION_TYPE_USERNAME" ,
memphis. Password ( "PASSWORD" ), // depends on how Memphis deployed - default is connection token-based authentication
memphis. AccountId ( 123456789 ),
)
if err != nil {
os. Exit ( 1 )
}
defer conn. Close ()
consumer, err := conn. CreateConsumer ( "STATION_NAME" , "CONSUMER_NAME" , memphis. PullInterval ( 15 * time.Second))
if err != nil {
fmt. Printf ( "Consumer creation failed: %v
" , err)
os. Exit ( 1 )
}
handler := func (msgs [] * memphis . Msg , err error , ctx context . Context ) {
if err != nil {
fmt. Printf ( "Fetch failed: %v
" , err)
return
}
for _, msg := range msgs {
fmt. Println ( string (msg. Data ()))
msg. Ack ()
headers := msg. GetHeaders ()
fmt. Println (headers)
}
}
ctx := context. Background ()
ctx = context. WithValue (ctx, "key" , "value" )
consumer. SetContext (ctx)
consumer. Consume (handler)
// The program will close the connection after 30 seconds,
// the message handler may be called after the connection closed
// so the handler may receive a timeout error
time. Sleep ( 30 * time.Second)
}
Step 6: Run consumer.go
Last updated 3 months ago