Connect to a Broker node
The Broker node ships with interface plugins, which can be used to publish and subscribe to data from applications over off-the-shelf protocols HTTP, WebSocket, and MQTT. For now, publishing and subscribing is available through these plugins however features such as resends are unavailable at this time.
Configuration
The plugins are enabled and configured in the Broker config file. To generate a config file and enable the plugins you need, you can use the Broker's interactive config wizard.
Authentication
The plugins expose ports and API endpoints which can be used to publish and subscribe to data using the identity of the Broker node. You will want to secure those ports, either by setting up a firewall and restricting access to the ports based on IP, or configuring API keys that only allow access if the API key is provided.
The API keys can be configured in the Broker config file, in a top-level field apiAuthentication
:
{
"network": ...
"plugins": ...
"apiAuthentication": {
"keys": ["my-very-secret-key"]
}
}
By knowing any of the correct API keys, the applications are granted access to the node. The API keys are passed by the application in slightly different ways depending on which protocol is used.
Note that the API keys grant access to publishing and subscribing via your node, while the node's private key grants access to your node's identity and assets. Never confuse API keys and the node's private key, and never send the private key over the internet.
How to pass the API key depends on the protocol in question and is described in the sections below.
Ports
The integration plugins open TCP server ports to allow applications to connect to them. The ports need to be reachable by those applications, meaning that you may need to allow the port in your firewall and potentially set up appropriate port forwarding in your router. The port number is configurable for each plugin (see below for details).
Note that the Streamr protocol itself (used for communication between nodes) does not require any ports to be opened.
WebSocket
The WebSocket plugin provides a WebSocket interface for publishing and subscribing.
To enable the WebSocket plugin, define a websocket
object in the plugins
section of the Broker configuration file:
plugins: {
"websocket": {}
}
You can subscribe by creating a standard JavaScript WebSocket connection:
const socket = new WebSocket(
`ws://localhost:${port}/streams/${encodeURIComponent(streamId)}/subscribe`
);
socket.addEventListener('message', (message) => {
console.log(JSON.parse(message.data));
});
And publish to a stream similarly:
const socket = new WebSocket(
`ws://localhost:${port}/streams/${encodeURIComponent(streamId)}/publish`
);
socket.addEventListener('open', () => {
socket.send(JSON.stringify(message));
});
Passing the API key
Pass the API key by adding an apiKey
query parameter to the connection URL:
const socket = new WebSocket(`...?apiKey=my-secret-api-key`)
Advanced usage
Port
The default WebSocket port is 7170
. You can change it by specifying a port
value:
plugins: {
"websocket": {
"port": 1234
}
}
Explicit metadata
By default the payload is the plain content of a stream message.
If you want to provide some metadata for messages (e.g. explicit timestamps), set the payloadMetadata
option to true
:
plugins: {
"websocket": {
...
"payloadMetadata": true
}
}
And restructure the payload to contain content
and metadata
fields:
const content = { foo: 'bar' };
const payload = {
content,
metadata: {
timestamp: 1234567890000,
},
};
socket.send(JSON.stringify(payload));
The configuration option affects also the incoming messages. Those messages will be derived in the same restructured format.
Partitions
If you want to publish or subscribe to a specific partition of a stream, you can add query parameters to the URL:
/streams/:streamId/subscribe?partitions=0,2,5
: subscribes to given partition numbers/streams/:streamId/publish?partition=2
: publishes to given partition number/streams/:streamId/publish?partitionKey=foo
: use the given key to calculate the partition number, see stream partitioning)/streams/:streamId/publish?partitionKeyField=customerId
: use the given field in a JSON to choose theparitionKey
(e.g.{ "customerId": "foo", ... }
->paritionKey
isfoo
)
By default, a random partition is selected.
Secure connections
To support a SSL/TLS connections, define a SSL certificate in the Broker config:
"sslCertificate": {
"certFileName": "path/cert.pem",
"privateKeyFileName": "path/key.pem"
}
And change the connection url to use wss://
instead of ws://
:
const socket = new WebSocket(`wss://...`);
Note: self-signed certificates don't work well in browser environments (the connection may not open at all). In Node environment self-signed certificates can be trusted by setting the an environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
. If possible, please obtain an authorized certificate, e.g. from Let's Encrypt.
MQTT
You can publish and subscribe to a stream using MQTT, making the Broker appear like a traditional MQTT broker towards connected applications and devices. To enable the MQTT plugin, define an mqtt
object in the plugins
section of the Broker configuration file:
plugins: {
"mqtt": {}
}
You can use any MQTT client to connect to the Broker. Here's an example of subscribing to a stream with the async-mqtt library:
import mqtt from 'async-mqtt';
const client = await mqtt.connectAsync(`mqtt://localhost:${port}`);
client.on('message', (topic, message) => {
console.log(JSON.parse(message.toString()));
});
await client.subscribe(streamId);
Publishing data with the same library:
import mqtt from 'async-mqtt';
const client = await mqtt.connectAsync(`mqtt://localhost:${port}`);
await client.publish(streamId, JSON.stringify(msg));
Passing the API key
The authentication scheme of the MQTT protocol uses a username and password. When connecting to the MQTT plugin of Streamr Broker, you can provide anything you want as the username and the API key as the password:
mqtt.connectAsync(`mqtt://localhost:${port}`, {
username: 'any-username',
password: apiKey,
});
Some MQTT clients expect the username and password to be passed in the connection URL:
mqtt://any-username:my-secret-api-key@localhost:1883
Advanced usage
Port
The default port is 1883
. You can change it with port
config option.
Explicit metadata
Explicit metadata can be provided the same way it is provided to the WebSocket plugin.
Topic domains
By default each MQTT topic
matches a streamId
. If you want to simplify the client usage, you can specify a domain, which is prefixed to a topic to make a corresponding streamId
:
plugins: {
"mqtt": {
...
"streamIdDomain": "0x1234567890123456789012345678901234567890" // or "mydomain.eth"
}
}
This way you can publish and subscribe to a stream by using only the path part of a streamId
await client.publish('path-part', ...) // publishes to a stream "0x1234567890123456789012345678901234567890/path-part"
HTTP
At the moment, only publishing is supported over HTTP. To subscribe, use one of the other protocol plugins as they allow a continuous streaming connection.
To publish over HTTP, enable the http
plugin:
plugins: {
"http": {}
}
The plugin provides a single endpoint: /streams/:streamId
.
Note that the streamId
is part of the URL and may contain slashes which need to be URL-encoded (/
becomes %2f
).
To publish a message to a stream, send the data as a POST payload:
curl \
--header 'Content-Type: application/json' \
--data '{"foo":"bar"}' \
http://localhost:7171/streams/foo.eth%2fbar
The endpoint returns HTTP 200 status if the message was published successfully.
Passing the API key
Pass the API key in the Authorization
header, with content bearer <key>
, for example
Authorization: bearer my-secret-api-key
A curl example:
curl \
--header 'Content-Type: application/json' \
--header 'Authorization: bearer my-secret-api-key' \
--data '{"foo":"bar"}' \
http://localhost:7171/streams/foo.eth%2fbar
Advanced usage
Explicit metadata
The endpoint supports the following optional query parameters:
timestamp
can be used to set the message timestamp explicitly. The timestamp should be passed in ISO-8601 string (e.g.2001-02-03T04:05:06Z
), or as milliseconds since epoch, e.g.1234567890000
partition
(explicit partition number) orpartitionKey
(a string which used to calculate the partition number, see stream partitioning). The default (in case neither is provided) is to select a random partition for each message.
Port
The default HTTP server port is 7171
. You can change it by specifying a port
value for the root level httpServer
option:
"network": ...
"plugins": ...
"httpServer": {
"port": 1234
}
Secure connections
If you provide a SSL certificate it will support use SSL/TLS connections:
"httpServer": {
...
"certFileName": "path/cert.pem",
"privateKeyFileName": "path/key.pem"
}