Ask or search…
⌃K
Links
Comment on page

Avro

Introduction

Avro is a row-oriented remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services. Avro uses a schema to structure the data that is being encoded. It has two schema languages: one intended for human editing (Avro IDL) and another, more machine-readable one based on JSON.

How to Produce/Consume a message

Node.js
Go
Python
TypeScript
.NET
Memphis removes the need for external serialization functions by embedding them within the SDK.
In Node.js, we can simply produce an object. Behind the scenes, the object will be serialized based on the attached schema and data format.
Example schema:
1
{
2
"type": "record",
3
"namespace": "com.example",
4
"name": "contact_details",
5
"fields": [
6
{ "name": "username", "type": "string" },
7
{ "name": "age", "type": "int" }
8
]
9
}
Code:
1
const memphis = require("memphis-dev");
2
​
3
(async function () {
4
try {
5
await memphis.connect({
6
host: "MEMPHIS_BROKER_URL",
7
username: "APPLICATION_USER",
8
password: "PASSWORD",
9
// accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
10
});
11
const producer = await memphis.producer({
12
stationName: "STATION_NAME",
13
producerName: "PRODUCER_NAME"
14
});
15
var payload = {
16
username: "Daniel Craig",
17
age: 36
18
};
19
try {
20
await producer.produce({
21
message: payload
22
});
23
} catch (ex) {
24
console.log(ex.message)
25
}
26
} catch (ex) {
27
console.log(ex);
28
memphis.close();
29
}
30
})();
Memphis removes the need for external serialization functions by embedding them within the SDK.
Example schema:
1
{
2
"type": "record",
3
"namespace": "com.example",
4
"name": "contact_details",
5
"fields": [
6
{ "name": "username", "type": "string" },
7
{ "name": "age", "type": "int" }
8
]
9
}
Code:
1
package main
2
​
3
import (
4
"fmt"
5
"os"
6
"github.com/memphisdev/memphis.go"
7
)
8
​
9
func main() {
10
conn, err := memphis.Connect(
11
"MEMPHIS_BROKER_URL",
12
"APPLICATION_TYPE_USERNAME",
13
memphis.Password("PASSWORD"),
14
// memphis.AccountId(123456789), //*optional* In case you are using Memphis.dev cloud
15
)
16
if err != nil {
17
os.Exit(1)
18
}
19
defer conn.Close()
20
p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")
21
​
22
hdrs := memphis.Headers{}
23
hdrs.New()
24
err = hdrs.Add("key", "value")
25
​
26
if err != nil {
27
fmt.Printf("Header failed: %v\n", err)
28
os.Exit(1)
29
}
30
type msgStruct struct {
31
Username string `avro:"username"`
32
Age int `avro:"age"`
33
}
34
msg := msgStruct{
35
Username: "Daniel Craig",
36
Age: 36,
37
}
38
​
39
err = p.Produce(msg, memphis.MsgHeaders(hdrs))
40
​
41
if err != nil {
42
fmt.Printf("Produce failed: %v\n", err)
43
os.Exit(1)
44
}
45
}
Memphis removes the need for external serialization functions by embedding them within the SDK.
Example schema:
1
{
2
"type": "record",
3
"namespace": "com.example",
4
"name": "contact_details",
5
"fields": [
6
{ "name": "username", "type": "string" },
7
{ "name": "age", "type": "int" }
8
]
9
}
Code:
1
import asyncio
2
import json
3
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError
4
​
5
async def main():
6
memphis = Memphis()
7
await memphis.connect(host="MEMPHIS_HOST", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID)
8
producer = await memphis.producer(
9
station_name="STATION_NAME", producer_name="PRODUCER_NAME")
10
​
11
headers = Headers()
12
headers.add("key", "value")
13
​
14
msg = {'username': 'Daniel Craig', 'age': 36}
15
​
16
try:
17
await producer.produce(message=msg, headers=headers)
18
​
19
except Exception as e:
20
print(e)
21
finally:
22
await asyncio.sleep(3)
23
​
24
await memphis.close()
25
​
26
if __name__ == '__main__':
27
asyncio.run(main())
Memphis removes the need for external serialization functions by embedding them within the SDK.
Example schema:
1
{
2
"type": "record",
3
"namespace": "com.example",
4
"name": "contact_details",
5
"fields": [
6
{ "name": "username", "type": "string" },
7
{ "name": "age", "type": "int" }
8
]
9
}
Code:
1
import memphis from 'memphis-dev';
2
import type { Memphis } from 'memphis-dev/types';
3
​
4
(async function () {
5
let memphisConnection: Memphis;
6
​
7
try {
8
memphisConnection = await memphis.connect({
9
host: 'MEMPHIS_BROKER_URL',
10
username: 'APPLICATION_TYPE_USERNAME',
11
password: 'PASSWORD',
12
// accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
13
});
14
​
15
const producer = await memphisConnection.producer({
16
stationName: 'STATION_NAME',
17
producerName: 'PRODUCER_NAME'
18
});
19
​
20
const headers = memphis.headers()
21
headers.add('key', 'value');
22
var msg = {
23
username: "Daniel Craig",
24
age: 36
25
};
26
await producer.produce({
27
message: msg,
28
headers: headers
29
});
30
​
31
memphisConnection.close();
32
} catch (ex) {
33
console.log(ex);
34
}
35
})();
Memphis removes the need for external serialization functions by embedding them within the SDK.
Example schema:
1
{
2
"type": "record",
3
"namespace": "com.example",
4
"name": "contact_details",
5
"fields": [
6
{ "name": "username", "type": "string" },
7
{ "name": "age", "type": "int" }
8
]
9
}
Code:
1
using Memphis.Client;
2
using Memphis.Client.Producer;
3
​
4
using System.Runtime.Serialization;
5
using System.Collections.Specialized;
6
​
7
var options = MemphisClientFactory.GetDefaultOptions();
8
options.Host = "<memphis-host>";
9
options.Username = "<username>";
10
options.ConnectionToken = "<broker-token>";
11
/**
12
* In case you are using Memphis.dev cloud
13
* options.AccountId = "<account-id>";
14
*/
15
​
16
try
17
{
18
var client = await MemphisClientFactory.CreateClient(options);
19
​
20
var producer = await client.CreateProducer(new MemphisProducerOptions
21
{
22
StationName = "<memphis-station-name>",
23
ProducerName = "<memphis-producer-name>",
24
GenerateUniqueSuffix = true
25
});
26
​
27
NameValueCollection commonHeaders = new()
28
{
29
{
30
"key-1", "value-1"
31
}
32
};
33
​
34
ContactDetail contactDetail = new()
35
{
36
Username = "John Doe",
37
Age = 20
38
};
39
​
40
await producer.ProduceAsync(contactDetail, commonHeaders);
41
client.Dispose();
42
}
43
catch (Exception exception)
44
{
45
Console.WriteLine($"Error occurred: {exception.Message}");
46
}
47
​
48
​
49
public class ContactDetail
50
{
51
[DataMember(Name = "username")]
52
public string Username { get; set; }
53
​
54
[DataMember(Name = "age")]
55
public int Age { get; set; }
56
}
Last modified 3mo ago