Search…
⌃K
Links

Step 2 - Hello World

Creating your 1st station, producer, and consumer!
Please follow the steps below in your preferred language.
Node.js
TypeScript
NestJS
Go
Python
Please make sure you have Node.js installed.
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .js file called producer.js
producer.js
const memphis = require("memphis-dev");

/**
 * Hello-world producer.
 *
 * Connects to Memphis, creates a producer on the station, sends one message
 * with a single header attached, and then closes the connection. If any step
 * throws, the error is logged and the connection (when it was opened) is
 * closed.
 */
const runProducer = async () => {
    let memphisConnection;

    try {
        const connectionOptions = {
            host: '<Memphis_hostname>',
            username: '<application_type_user>',
            connectionToken: '<connection_token>'
        };
        memphisConnection = await memphis.connect(connectionOptions);

        const producerOptions = {
            stationName: '<station_name>',
            producerName: '<producer_name>'
        };
        const producer = await memphisConnection.producer(producerOptions);

        // Headers travel alongside the message payload.
        const headers = memphis.headers();
        headers.add('key', 'value');

        const message = Buffer.from("Message: Hello world"); // you can also send JS object - {}
        await producer.produce({ message, headers });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) {
            memphisConnection.close();
        }
    }
};

runProducer();
Step 5: Run producer.js
node producer.js
Step 6: Create a new .js file called consumer.js
consumer.js
const memphis = require('memphis-dev');

/**
 * Hello-world consumer.
 *
 * Connects to Memphis, creates a consumer on the station, and registers
 * handlers for incoming messages and for consumer errors. If connecting or
 * creating the consumer throws, the error is logged and the connection (when
 * it was opened) is closed.
 */
(async function () {
    let memphisConnection;

    try {
        memphisConnection = await memphis.connect({
            host: '<Memphis_hostname>',
            username: '<application_type_user>',
            connectionToken: '<connection_token>'
        });

        const consumer = await memphisConnection.consumer({
            stationName: '<station_name>',
            consumerName: '<consumer_name>',
            consumerGroup: '<consumer_group_name>'
        });

        // Presumably the object set here is what the 'message' handler
        // receives as `context` — confirm against the SDK documentation.
        consumer.setContext({ key: "value" });
        consumer.on('message', (message, context) => {
            console.log(message.getData().toString());
            message.ack();
            // Shows how to read the headers attached by the producer.
            const headers = message.getHeaders()
        });

        // Log consumer errors instead of silently discarding them.
        consumer.on('error', (error) => {
            console.log(error);
        });
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();
Step 7: Run consumer.js
node consumer.js
Please make sure you have Node.js installed.
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .ts file called producer.ts
producer.ts
import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';

/**
 * Hello-world producer.
 *
 * Connects to Memphis, creates a producer on the station, sends one message
 * with a single header attached, and then closes the connection. If any step
 * throws, the error is logged and the connection (when it was opened) is
 * closed.
 */
(async function () {
    // `undefined` until connect() resolves; the catch block reads this
    // variable, so the type must admit the not-yet-connected state for the
    // file to compile under `strict`.
    let memphisConnection: Memphis | undefined;

    try {
        memphisConnection = await memphis.connect({
            host: '<Memphis_hostname>',
            username: '<application_type_user>',
            connectionToken: '<connection_token>'
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers();
        headers.add('key', 'value');
        await producer.produce({
            message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();
Step 5: Run producer.ts
node producer.ts
Step 6: Create a new .ts file called consumer.ts
consumer.ts
import memphis from 'memphis-dev';
import type { Memphis, Message } from 'memphis-dev/types';

/**
 * Hello-world consumer.
 *
 * Connects to Memphis, creates a consumer on the station, and registers
 * handlers for incoming messages and for consumer errors. If connecting or
 * creating the consumer throws, the error is logged and the connection (when
 * it was opened) is closed.
 */
(async function () {
    // `undefined` until connect() resolves; the catch block reads this
    // variable, so the type must admit the not-yet-connected state for the
    // file to compile under `strict`.
    let memphisConnection: Memphis | undefined;

    try {
        memphisConnection = await memphis.connect({
            host: '<Memphis_hostname>',
            username: '<application_type_user>',
            connectionToken: '<connection_token>'
        });

        const consumer = await memphisConnection.consumer({
            stationName: 'STATION_NAME',
            consumerName: 'CONSUMER_NAME',
            consumerGroup: 'CONSUMER_GROUP_NAME'
        });

        // Presumably the object set here is what the 'message' handler
        // receives as `context` — confirm against the SDK documentation.
        consumer.setContext({ key: "value" });
        consumer.on('message', (message: Message, context: object) => {
            console.log(message.getData().toString());
            message.ack();
            // Shows how to read the headers attached by the producer.
            const headers = message.getHeaders()
        });

        consumer.on('error', (error) => {
            console.log(error);
        });
    } catch (ex) {
        console.log(ex);
        if (memphisConnection) memphisConnection.close();
    }
})();
Step 7: Run consumer.ts
node consumer.ts
Please make sure you have Node.js installed.
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .ts file called producer.module.ts
producer.module.ts
import { Module } from "@nestjs/common";
import { MemphisModule, MemphisService } from "memphis-dev/nest"
import type { Memphis } from 'memphis-dev/types';

/**
 * NestJS module that registers MemphisModule and exposes a hello-world
 * producer via `startProducer()`.
 */
@Module({
    imports: [MemphisModule.register()],
})
export class ProducerModule {
    constructor(private memphis: MemphisService) { }

    /**
     * Starts the producer in the background (the returned promise is not
     * awaited): connects, sends 100 messages to the station, then closes the
     * connection. Any failure is logged and the connection, if it was
     * opened, is closed.
     */
    startProducer() {
        // Arrow function so that `this` is the ProducerModule instance; a
        // plain `function` here would run with `this` undefined and
        // `this.memphis` would throw.
        (async () => {
            // `undefined` until connect() resolves; the catch block reads it.
            let memphisConnection: Memphis | undefined;

            try {
                memphisConnection = await this.memphis.connect({
                    host: '<Memphis_hostname>',
                    username: '<application_type_user>',
                    connectionToken: '<connection_token>'
                });

                const producer = await memphisConnection.producer({
                    stationName: 'STATION_NAME',
                    producerName: 'PRODUCER_NAME'
                });

                for (let index = 0; index < 100; index++) {
                    await producer.produce({
                        message: Buffer.from(`Message #${index}: Hello world`) // you can also send JS object - {}
                    });
                    console.log("Message sent");
                }

                console.log("All messages sent");
                memphisConnection.close();
            } catch (ex) {
                console.log(ex);
                if (memphisConnection)
                    memphisConnection.close();
            }
        })();
    }
}
Step 5: Create a new .ts file called consumer.controller.ts
consumer.controller.ts
import { Controller } from '@nestjs/common';
import { consumeMessage } from 'memphis-dev/nest';
import type { Message } from 'memphis-dev/types';

/**
 * Example controller whose handler is bound to a Memphis station through the
 * `consumeMessage` decorator.
 */
@Controller('auth')
export class ExampleController {
    /**
     * Prints the payload of a received message as text and acknowledges it.
     *
     * @param message The message handed to the handler.
     */
    @consumeMessage({
        stationName: '<station-name>',
        consumerName: '<consumer-name>',
        consumerGroup: ''
    })
    async messageHandler(message: Message): Promise<void> {
        const payload = message.getData().toString();
        console.log(payload);
        message.ack();
    }
}
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 3: Create a new Go file called producer.go
producer.go
1
package main
2
​
3
import (
4
"fmt"
5
"os"
6
​
7
"github.com/memphisdev/memphis.go"
8
)
9
​
10
func main() {
11
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", "MEMPHIS_CONNECTION_TOKEN")
12
if err != nil {
13
os.Exit(1)
14
}
15
defer conn.Close()
16
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
17
​
18
hdrs := memphis.Headers{}
19
hdrs.New()
20
err = hdrs.Add("key", "value")
21
​
22
if err != nil {
23
fmt.Errorf("Header failed: %v", err)
24
os.Exit(1)
25
}
26
​
27
err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
28
​
29
if err != nil {
30
fmt.Errorf("Produce failed: %v", err)
31
os.Exit(1)
32
}
33
}
Step 4: Run producer.go
go run producer.go
Step 5: Create a new Go file called consumer.go
consumer.go
1
package main
2
​
3
import (
4
"fmt"
5
"context"
6
"os"
7
"time"
8
​
9
"github.com/memphisdev/memphis.go"
10
)
11
​
12
func main() {
13
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", "MEMPHIS_CONNECTION_TOKEN")
14
if err != nil {
15
os.Exit(1)
16
}
17
defer conn.Close()
18
​
19
consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
20
​
21
if err != nil {
22
fmt.Printf("Consumer creation failed: %v
23
", err)
24
os.Exit(1)
25
}
26
​
27
handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
28
if err != nil {
29
fmt.Printf("Fetch failed: %v
30
", err)
31
return
32
}
33
​
34
for _, msg := range msgs {
35
fmt.Println(string(msg.Data()))
36
msg.Ack()
37
headers := msg.GetHeaders()
38
fmt.Println(headers)
39
}
40
}
41
​
42
ctx := context.Background()
43
ctx = context.WithValue(ctx, "key", "value)
44
consumer.SetContext(ctx)
45
consumer.Consume(handler)
46
​
47
// The program will close the connection after 30 seconds,
48
// the message handler may be called after the connection closed
49
// so the handler may receive a timeout error
50
time.Sleep(30 * time.Second)
51
}
Step 6: Run consumer.go
go run consumer.go
Step 1: Create an empty dir for the Python project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
pip3 install --upgrade memphis-py
Step 3: Create a new Python file called producer.py
producer.py
import asyncio
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError


async def main():
    """Connect to Memphis, send five messages with one header, then close.

    Memphis errors raised while connecting or producing are printed; the
    connection is closed in every case.
    """
    # Created before the try block so that `finally` always has a bound
    # `memphis` to close, even if a later step raises.
    memphis = Memphis()
    try:
        await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", connection_token="MEMPHIS_CONNECTION_TOKEN")

        producer = await memphis.producer(station_name="STATION_NAME", producer_name="PRODUCER_NAME")
        headers = Headers()
        headers.add("key", "value")
        for i in range(5):
            await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers) # you can send the message parameter as dict as well

    except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
        print(e)

    finally:
        await memphis.close()


if __name__ == '__main__':
    asyncio.run(main())
Step 4: Run producer.py
python3 producer.py
Step 5: Create a new Python file called consumer.py
consumer.py
import asyncio
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError


async def main():
    """Connect to Memphis and consume messages from the station until stopped.

    Memphis errors raised while connecting or creating the consumer are
    printed; the connection is closed in every case.
    """

    async def msg_handler(msgs, error, context):
        """Print and acknowledge each received message; print any error."""
        try:
            # Report the error first so it is shown even when the batch
            # carries no messages.
            if error:
                print(error)
            for msg in msgs:
                print("message: ", msg.get_data())
                await msg.ack()
                # Shows how to read the headers attached by the producer.
                headers = msg.get_headers()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    # Created before the try block so that `finally` always has a bound
    # `memphis` to close, even if a later step raises.
    memphis = Memphis()
    try:
        await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", connection_token="MEMPHIS_CONNECTION_TOKEN")

        consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
        consumer.set_context({"key": "value"})
        consumer.consume(msg_handler)
        # Keep your main thread alive so the consumer will keep receiving data
        await asyncio.Event().wait()

    except (MemphisError, MemphisConnectError) as e:
        print(e)

    finally:
        await memphis.close()


if __name__ == '__main__':
    asyncio.run(main())
Step 6: Run consumer.py
python3 consumer.py