Search
⌃K
Links

Step 2 - Hello World

Creating your 1st station, producer, and consumer!
Please follow the steps below in your preferred language.
Node.js
TypeScript
NestJS
Go
Python
REST
Please make sure you have node.js installed
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .js file called producer.js
producer.js
1
const { memphis } = require("memphis-dev");
2
3
(async function () {
4
let memphisConnection
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: 'MEMPHIS_BROKER_HOSTNAME',
9
username: 'APPLICATION_TYPE_USERNAME',
10
password: 'PASSWORD'
11
});
12
13
const producer = await memphisConnection.producer({
14
stationName: 'STATION_NAME',
15
producerName: 'PRODUCER_NAME'
16
});
17
18
const headers = memphis.headers()
19
headers.add('KEY', 'VALUE')
20
await producer.produce({
21
message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
22
headers: headers
23
});
24
25
memphisConnection.close();
26
} catch (ex) {
27
console.log(ex);
28
if (memphisConnection) memphisConnection.close();
29
}
30
})();
Step 5: Run producer.js
node producer.js
Step 6: Create a new .js file called consumer.js
consumer.js
1
const { memphis } = require("memphis-dev");
2
3
(async function () {
4
let memphisConnection;
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: 'MEMPHIS_BROKER_HOSTNAME',
9
username: 'APPLICATION_TYPE_USERNAME',
10
password: 'PASSWORD'
11
});
12
13
const consumer = await memphisConnection.consumer({
14
stationName: 'STATION_NAME',
15
consumerName: 'CONSUMER_NAME',
16
consumerGroup: 'CONSUMER_GROUP_NAME'
17
});
18
19
consumer.setContext({ key: "value" });
20
consumer.on('message', (message, context) => {
21
console.log(message.getData().toString());
22
message.ack();
23
const headers = message.getHeaders()
24
});
25
26
consumer.on('error', (error) => {});
27
} catch (ex) {
28
console.log(ex);
29
if (memphisConnection) memphisConnection.close();
30
}
31
})();
Step 7: Run consumer.js
node consumer.js
Please make sure you have node.js installed
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .ts file called producer.ts
producer.ts
1
import { memphis, Memphis } from 'memphis-dev';
2
3
(async function () {
4
let memphisConnection: Memphis;
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: 'MEMPHIS_BROKER_HOSTNAME',
9
username: 'APPLICATION_TYPE_USERNAME',
10
password: 'PASSWORD'
11
});
12
13
const producer = await memphisConnection.producer({
14
stationName: 'STATION_NAME',
15
producerName: 'PRODUCER_NAME'
16
});
17
18
const headers = memphis.headers()
19
headers.add('key', 'value');
20
await producer.produce({
21
message: Buffer.from("Message: Hello world"), // you can also send JS object - {}
22
headers: headers
23
});
24
25
memphisConnection.close();
26
} catch (ex) {
27
console.log(ex);
28
if (memphisConnection) memphisConnection.close();
29
}
30
})();
Step 5: Run producer.ts
node producer.ts
Step 6: Create a new .ts file called consumer.ts
consumer.ts
1
import { memphis, Memphis } from 'memphis-dev';
2
3
(async function () {
4
let memphisConnection: Memphis;
5
6
try {
7
memphisConnection = await memphis.connect({
8
host: 'MEMPHIS_BROKER_HOSTNAME',
9
username: 'APPLICATION_TYPE_USERNAME',
10
password: 'PASSWORD'
11
});
12
13
const consumer = await memphisConnection.consumer({
14
stationName: 'STATION_NAME',
15
consumerName: 'CONSUMER_NAME',
16
consumerGroup: 'CONSUMER_GROUP_NAME'
17
});
18
19
consumer.setContext({ key: "value" });
20
consumer.on('message', (message: Message, context: object) => {
21
console.log(message.getData().toString());
22
message.ack();
23
const headers = message.getHeaders()
24
});
25
26
consumer.on('error', (error) => {
27
console.log(error);
28
});
29
} catch (ex) {
30
console.log(ex);
31
if (memphisConnection) memphisConnection.close();
32
}
33
})();
Step 7: Run consumer.ts
node consumer.ts
Please make sure you have node.js installed
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Install memphis node.js SDK
npm install memphis-dev
Step 4: Create a new .ts file called producer.module.ts
producer.module.ts
1
import { Module } from "@nestjs/common";
2
import { Memphis, MemphisModule, MemphisService } from "memphis-dev"
3
4
@Module({
5
imports: [MemphisModule.register()],
6
})
7
export class ProducerModule {
8
constructor(private memphis: MemphisService) { }
9
10
startProducer() {
11
(async function () {
12
let memphisConnection: Memphis;
13
14
try {
15
memphisConnection = await memphis.connect({
16
host: 'MEMPHIS_BROKER_HOSTNAME',
17
username: 'APPLICATION_TYPE_USERNAME',
18
password: 'PASSWORD'
19
});
20
21
const producer = await memphisConnection.producer({
22
stationName: 'STATION_NAME',
23
producerName: 'PRODUCER_NAME'
24
});
25
26
for (let index = 0; index < 100; index++) {
27
await producer.produce({
28
message: Buffer.from(`Message #${index}: Hello world`) // you can also send JS object - {}
29
});
30
console.log("Message sent");
31
}
32
33
console.log("All messages sent");
34
memphisConnection.close();
35
} catch (ex) {
36
console.log(ex);
37
if (memphisConnection)
38
memphisConnection.close();
39
}
40
})();
41
}
42
}
Step 5: Create a new .ts file called consumer.controller.ts
consumer.controller.ts
1
import { Controller } from '@nestjs/common';
2
import { consumeMessage } from 'memphis-dev/nest';
3
import type { Message } from 'memphis-dev/types';
4
5
@Controller('auth')
6
export class ExampleController {
7
@consumeMessage({
8
stationName: 'STATION_NAME',
9
consumerName: 'CONSUMER_NAME',
10
consumerGroup: 'CONSUMER_GROUP_NAME'
11
})
12
async messageHandler(message: Message) {
13
console.log(message.getData().toString());
14
message.ack();
15
}
16
}
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Init the newly created project
go mod init memphis-demo
Step 3: In your project's directory, install Memphis Go SDK
go get github.com/memphisdev/memphis.go
Step 4: Create a new Go file called producer.go
producer.go
1
package main
2
3
import (
4
"fmt"
5
"os"
6
7
"github.com/memphisdev/memphis.go"
8
)
9
10
func main() {
11
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", memphis.Password("PASSWORD"))
12
if err != nil {
13
os.Exit(1)
14
}
15
defer conn.Close()
16
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
17
18
hdrs := memphis.Headers{}
19
hdrs.New()
20
err = hdrs.Add("key", "value")
21
22
if err != nil {
23
fmt.Errorf("Header failed: %v", err)
24
os.Exit(1)
25
}
26
27
err = p.Produce([]byte("You have a message!"), memphis.MsgHeaders(hdrs))
28
29
if err != nil {
30
fmt.Errorf("Produce failed: %v", err)
31
os.Exit(1)
32
}
33
}
Step 4: Run producer.go
go run producer.go
Step 5: Create a new Go file called consumer.go
consumer.go
1
package main
2
3
import (
4
"fmt"
5
"context"
6
"os"
7
"time"
8
9
"github.com/memphisdev/memphis.go"
10
)
11
12
func main() {
13
conn, err := memphis.Connect("MEMPHIS_HOSTNAME", "MEMPHIS_APPLICATION_USER", memphis.Password("PASSWORD"))
14
if err != nil {
15
os.Exit(1)
16
}
17
defer conn.Close()
18
19
consumer, err := conn.CreateConsumer("STATION_NAME", "CONSUMER_NAME", memphis.PullInterval(15*time.Second))
20
21
if err != nil {
22
fmt.Printf("Consumer creation failed: %v
23
", err)
24
os.Exit(1)
25
}
26
27
handler := func(msgs []*memphis.Msg, err error, ctx context.Context) {
28
if err != nil {
29
fmt.Printf("Fetch failed: %v
30
", err)
31
return
32
}
33
34
for _, msg := range msgs {
35
fmt.Println(string(msg.Data()))
36
msg.Ack()
37
headers := msg.GetHeaders()
38
fmt.Println(headers)
39
}
40
}
41
42
ctx := context.Background()
43
ctx = context.WithValue(ctx, "key", "value")
44
consumer.SetContext(ctx)
45
consumer.Consume(handler)
46
47
// The program will close the connection after 30 seconds,
48
// the message handler may be called after the connection closed
49
// so the handler may receive a timeout error
50
time.Sleep(30 * time.Second)
51
}
Step 6: Run consumer.go
go run consumer.go
Step 1: Create an empty dir for the Go project
mkdir memphis-demo && \
cd memphis-demo
Step 2: In your project's directory, install Memphis Python SDK
pip3 install --upgrade memphis-py
Step 3: Create a new Python file called producer.py
producer.py
1
from memphis import Memphis, Headers
2
from memphis.types import Retention, Storage
3
4
async def main():
5
try:
6
memphis = Memphis()
7
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD")
8
9
producer = await memphis.producer(station_name="STATION_NAME", producer_name="PRODUCER_NAME")
10
headers = Headers()
11
headers.add("key", "value")
12
for i in range(5):
13
await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers) # you can send the message parameter as dict as well
14
15
except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
16
print(e)
17
18
finally:
19
await memphis.close()
20
21
if __name__ == '__main__':
22
asyncio.run(main())
Step 4: Run producer.py
python3 producer.py
Step 5: Create a new Python file called consumer.py
consumer.py
1
from memphis import Memphis, Headers
2
from memphis.types import Retention, Storage
3
4
async def main():
5
async def msg_handler(msgs, error, context):
6
try:
7
for msg in msgs:
8
print("message: ", msg.get_data())
9
await msg.ack()
10
headers = msg.get_headers()
11
if error:
12
print(error)
13
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
14
print(e)
15
return
16
17
try:
18
memphis = Memphis()
19
await memphis.connect(host="MEMPHIS_HOSTNAME", username="MEMPHIS_APPLICATION_USER", password="PASSWORD")
20
21
consumer = await memphis.consumer(station_name="STATION_NAME", consumer_name="CONSUMER_NAME", consumer_group="CONSUMER_GROUP_NAME")
22
consumer.set_context({"key": "value"})
23
consumer.consume(msg_handler)
24
# Keep your main thread alive so the consumer will keep receiving data
25
await asyncio.Event().wait()
26
27
except (MemphisError, MemphisConnectError) as e:
28
print(e)
29
30
finally:
31
await memphis.close()
32
33
if __name__ == '__main__':
34
asyncio.run(main())
Step 6: Run consumer.py
python3 consumer.py
Producing messages to Memphis via REST API can be implemented using any REST-supported language like Go, Python, Java, Node.js, .NET, etc...
For the following tutorial, we will use Node.js .
Step 1: Create an empty dir for the node.js project
mkdir memphis-demo && \
cd memphis-demo
Step 2: Create a new node project (If needed)
npm init -y
Step 3: Generate a new JWT token generate.js
generate.js
1
var axios = require('axios');
2
var data = JSON.stringify({
3
"username": "APPLICATION_TYPE_USERNAME",
4
"password": "PASSWORD",
5
"token_expiry_in_minutes": 123,
6
"refresh_token_expiry_in_minutes": 10000092
7
});
8
9
var config = {
10
method: 'post',
11
url: 'BROKER_RESTGW_URL',
12
headers: {
13
'Content-Type': 'application/json'
14
},
15
data : data
16
};
17
18
axios(config)
19
.then(function (response) {
20
console.log(JSON.stringify(response.data));
21
})
22
.catch(function (error) {
23
console.log(error);
24
});
Step 4: Run generate.js and copy the returned JWT
node generate.js
Step 5: Create a new file called producer.js
var axios = require('axios');
var data = JSON.stringify({
"message": "New Message"
});
var config = {
method: 'post',
url: 'https://BROKER_RESTGW_URL/stations/hps/produce/single',
headers: {
'Authorization': 'Bearer <jwt>',
'Content-Type': 'application/json'
},
data : data
};
axios(config)
.then(function (response) {
console.log(JSON.stringify(response.data));
})
.catch(function (error) {
console.log(error);
});
Consume messages via REST will soon be released.