Search…
Go

Installation

First, install Memphis via K8S / Docker.
In your project's directory:
go get github.com/memphisdev/memphis.go

Importing

import "github.com/memphisdev/memphis.go"

Connecting to Memphis

c, err := memphis.Connect("<memphis-host>",
"<application type username>",
"<broker-token>")
It is possible to pass connection configuration parameters, as function-parameters.
// function params
c, err := memphis.Connect("<memphis-host>",
"<application type username>",
"<broker-token>",
Port(<int>),
Reconnect(<bool>),
MaxReconnect(<int>)
)
Once connected, all features offered by Memphis are available.

Disconnecting from Memphis

To disconnect from Memphis, call Close() on the Memphis connection object.
c.Close();

Creating a Factory

// c is of type memphis.Conn
f, err := c.CreateFactory("<factory-name>",
Description("<optional-description>")

Destroying a Factory

Destroying a factory will remove all its resources (including stations, producers, and consumers).
err := f.Destroy()

Creating a Station

Stations can be created from both Conn and Factory
Passing optional parameters using functions
s0, err = c.CreateStation("<station-name>","<factory-name>")
​
s1, err = c.CreateStation("<station-name>",
"<factory-name>",
RetentionTypeOpt(<Messages/MaxMeMessageAgeSeconds/Bytes>),
RetentionVal(<int>),
StorageTypeOpt(<Memory/File>),
Replicas(<int>),
EnableDedup(),
DedupWindow(<time.Duration>))
s2, err = f.CreateStation("<station-name>",
RetentionTypeOpt(<Messages/MaxMeMessageAgeSeconds/Bytes>),
RetentionVal(<retention-value>),
StorageTypeOpt(<Memory/File>),
Replicas(<intgo>),
EnableDedup(),
DedupWindow(<time.Duration>))

Retention Types

Memphis currently supports the following types of retention:
memphis.MaxMeMessageAgeSeconds
The above means that every message persists for the value set in the retention value field (in seconds).
memphis.Messages
The above means that after the maximum number of saved messages (set in retention value) has been reached, the oldest messages will be deleted.
memphis.Bytes
The above means that after maximum number of saved bytes (set in retention value) has been reached, the oldest messages will be deleted.

Storage Types

Memphis currently supports the following types of messages storage:
memphis.File
The above means that messages persist on the file system.
memphis.Memory
The above means that messages persist on the main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).
err := s.Destroy();

Produce and Consume Messages

The most common client operations are producing messages and consuming messages.
Messages are published to a station and consumed from it by creating a consumer and calling its Consume function with a message handler callback function. Consumers are pull-based and consume all the messages in a station unless you are using a consumers group, in which case messages are spread across all members in this group.
Memphis messages are payload agnostic. Payloads are byte slices, i.e []byte.
In order to stop receiving messages, you have to call consumer.StopConsume(). The consumer will terminate regardless of whether there are messages in flight for the client.

Creating a Producer

// from a Conn
p0, err := c.CreateProducer("<station-name>", "<producer-name>")
​
// from a Station
p1, err := s.CreateProducer("<producer-name>")

Producing a Message

p.Produce("<message in []byte>",
AckWaitSec(<ack time.Duration>)) // defaults to 15 seconds

Destroying a Producer

p.Destroy();

Creating a Consumer

// creation from a Station
consumer0, err = s.CreateConsumer("<consumer-name>",
ConsumerGroup("<consumer-group>"), // defaults to consumer name
PullInterval(<pull interval time.Duration), // defaults to 1 second
BatchSize(<batch-size int), // defaults to 10
BatchMaxWaitTime(<time.Duration>), // defaults to 5 seconds
MaxAckTime(<time.Duration>), // defaults to 30 sec
MaxMsgDeliveries(<int>)) // defaults to 10
// creation from a Conn
consumer1, err = c.CreateConsumer("<station-name>", "<consumer-name>", ...)
​

Processing Messages

First, create a callback function that receives a slice of pointers to memphis.Msg and an error.
Then, pass this callback into consumer.Consume function.
The consumer will try to fetch messages every pullInterval (that was given in Consumer's creation) and call the defined message handler.
func handler(msgs []*memphis.Msg, err error) {
if err != nil {
m := msgs[0]
fmt.Println(string(m.Data()))
m.Ack()
}
}
​
consumer.Consume(handler)
You can trigger a single fetch with the Fetch() method
msgs, err := consumer.Fetch()

Acknowledging a Message

Acknowledging a message indicates to the Memphis server to not re-send the same message again to the same consumer or consumers group.
message.Ack();

Destroying a Consumer

consumer.Destroy();
Copy link
Edit on GitHub
Outline
Installation
Importing
Connecting to Memphis
Disconnecting from Memphis
Creating a Factory
Destroying a Factory
Creating a Station
Retention Types
Storage Types
Destroying a Station
Produce and Consume Messages
Creating a Producer
Producing a Message
Destroying a Producer
Creating a Consumer
Processing Messages
Acknowledging a Message
Destroying a Consumer