Skip to main content

Using Streamr in any language

This tutorial will show you how to publish data into the Streamr Network from inside your app by interfacing with a Streamr node that you run using the MQTT protocol. MQTT client libraries are available for a huge variety of programming languages but you can also use WebSockets or HTTP if you prefer.

Prerequisites:

  • NPM v8 or greater
  • NodeJS 18.x or greater (Ideally v20+)
  • MacOS/Linux environments (Windows environments may require minor adjustments)
  • A small amount of MATIC to pay for gas on Polygon mainnet. You can reachout to us on the #dev channel of Discord for some tokens.
  • A MQTT library of your choice (this tutorial uses MQTT.js)

If you have a Helium setup, you may benefit from reading this blog post first, Helium x Streamr

Install & run the Streamr node.

You'll need to run a Streamr node to connect your app to.

$ npm i -g @streamr/node

Before the Streamr node can be started, its configuration files need to be created using the following command:

$ streamr-node-init

During initiziliation make sure to enable the mqtt-plugin and assign a port to it (default is 1883). Other plugins are unnecessary. For more in depth information on installing a Streamr node, see the guide on running a Streamr node.

image

Once the init script is complete you may view your generated configuration file with:

$ cat ~/.streamr/config/default.json

Start the Streamr node

$ streamr-node

The node's address (its public key) is displayed when the Streamr node is started. Record this as the BrokerNodeAddress, it's needed in the next step!

TODO: Don't include the session key extension.

Configure your stream with the Streamr SDK

Create a folder cd into it and create a package.json by running

$ npm init

The Streamr SDK is available on NPM and can be installed simply with:

$ npm install @streamr/sdk

Having trouble installing the client? Maybe our troubleshooting section will help.

note

Make sure the PRIVATE_KEY you add has a small amount of MATIC (the native token of the Polygon blockchain) in its wallet to pay for gas to create the stream and make the permission assignment.

TODO: Create a stream widget

// Import the Streamr client
import { StreamrClient } from "@streamr/sdk"

const PRIVATE_KEY = ""

// Initialize Streamr with an Ethereum account
const streamr = new StreamrClient({
auth: {
privateKey: PRIVATE_KEY,
},
})

const stream = await streamr.getOrCreateStream({
id: "/sensor/firehose",
})

Assign permission to your Streamr node

The Streamr node needs permission to publish data to your stream. We will be granting the Streamr node PUBLISH and SUBSCRIBE permissions on the stream we just created. This step will consume a small amount of MATIC tokens.

note

Take care to not confuse stream with streamr ;)

await stream.grantPermissions({
user: BrokerNodeAddress,
permissions: [StreamPermission.PUBLISH, StreamPermission.SUBSCRIBE],
})

Push data

In this step we configure the MQTT client to connect and push data to the running Streamr node, which will take care of the rest.

Authorize your MQTT client to connect with the Streamr node

Before your app or device can push data to the Streamr node, we will need to provide the ApiKey from the Streamr node's configuration file. This key can be found here:

$ cat ~/.streamr/config/default.json

{
...
"apiAuthentication": {
"keys": [
"ImTheKeyYouAreLookingFor"
]
}
}

The following instructions are specific to your choice of MQTT library- this tutorial uses MQTT.js. There are many valid alternatives including, async-mqtt.

In the code sample below, we provide the URL (the IP or address to your Streamr node) along with the MQTT port (the default is 1883). To authenticate, use an empty username field and enter the ApiKey as the password.

// Node.js example

const mqttClient = mqtt.connect("mqtt://localhost:1883", {
username: "",
password: ApiKey,
})
note

For URL authentication, for example mqtt://"":ApiKey@1.2.3.4:1883. Some MQTT libraries may have issue with an empty username, to get around this you can provide "x" as the username.

note

If you're connecting to the MQTT interface over the open internet, please remember to make sure the port is open. Technical information about the plugin interface can be found in the Streamr node plugin docs.

Start pushing data

With your Streamr node running and your MQTT client configured correctly, the final remaining step is to start pushing the data. You will push data by providing a StreamId as the first parameter and the JSON payload as the second parameter. The stream ID contains the ethereum addres, i.e. 0x123/sensor/firehose.

info

Push valid JSON! Invalid JSON may silently fail so be sure to run your payload through a JSON validator to double check.

// Node.js example
const StreamId = stream.id

await mqttClient.publish(StreamId, JSON.stringify({ foo: bar }))

If everything has been configured correctly so far then the data should now be flowing to the Streamr node, which will receive and sign the data, then publish it to the to the Streamr Network stream.

At this point, you could use the Streamr CLI tool to subscribe to stream and observe the message flow:

$ npm install -g @streamr/cli-tools
$ streamr stream subscribe 0x.../sensor/firehose --private-key YOUR_PRIVATE_KEY

** Note, if the stream is publicly subscribale, then you can omit the private key flag.

Bonus: Subscribe to streams

Just like you used the Streamr node's MQTT interface to publish data into the network, you can also pull data out of the Network via any Streamr nodes interface just as well using the same pattern described in the above steps. Just make sure that the Streamr node has SUBSCRIBE permission to the stream you are interested in.

Once connected, you can listen for the message callback on the MQTT client. The first parameter will be the StreamId and the second parameter will contain the message JSON payload:

// NodeJS example

mqttClient.subscribe(StreamId)
...
mqttClient.on('connect', () => {
mqttClient.on('message', (StreamId, rawData) => {
...
})
})

Troubleshooting

The most common issues are:

  • TODO

To include more verbose logging you could run the Streamr node with these additional flags:

$ LOG_LEVEL=trace DEBUG=Streamr* streamr-node

All done 🎉

Congratulations! You accomplished:

  • Running a Streamr node on the Streamr Network
  • Created a stream and modified its access control
  • Published data to the Streamr Network using the MQTT interface of your running Streamr node
  • Subscribed to flowing data on the Streamr Network using the MQTT interface of your running Streamr node

If you had any problems along the way, please drop a message to the core team on the #dev channel of our Discord.