Subscribe to receive notifications of new posts:

Announcing Pub/Sub: Programmable MQTT-based Messaging

2022-05-12

4 min read
This post is also available in 简体中文, Español (Espaňa) and 日本語.

This post is also available in 简体中文, 日本語, Español.

Announcing Pub/Sub: Programmable MQTT-based Messaging

One of the underlying questions that drives Platform Week is “how do we enable developers to build full stack applications on Cloudflare?”. With Workers as a serverless environment for easily deploying distributed-by-default applications, KV and Durable Objects for caching and coordination, and R2 as our zero-egress cost object store, we’ve continued to discuss what else we need to build to help developers both build new apps and/or bring existing ones over to Cloudflare’s Developer Platform.

With that in mind, we’re excited to announce the private beta of Cloudflare Pub/Sub, a programmable message bus built on the ubiquitous and industry-standard MQTT protocol supported by tens of millions of existing devices today.

In a nutshell, Pub/Sub allows you to:

  • Publish event, telemetry or sensor data from any MQTT capable client (and in the future, other client-facing protocols)

  • Write code that can filter, aggregate and/or modify messages as they’re published to the broker using Cloudflare Workers, and before they’re distributed to subscribers, without the need to ferry messages to a single “cloud region”

  • Push events from applications in other clouds, or from on-prem, with Pub/Sub acting as a programmable event router or a hook into persistent data storage (such as R2 or KV)

  • Move logic out of the client, where it can be hard (or risky!) to push updates, or where running code on devices raises the materials cost (CPU, memory), while still keeping latency as low as possible (your code runs in every location).

And there’s likely a long list of things we haven’t even predicted yet. We’ve seen developers build incredible things on top of Cloudflare Workers, and we’re excited to see what they build with the power of a programmable message bus like Pub/Sub, too.

Why, and what is, MQTT?

If you haven’t heard of MQTT before, you might be surprised to know that it’s one of the most pervasive “messaging protocols” deployed today. There are tens of millions (at least!) of devices that speak MQTT today, from connected payment terminals through to autonomous vehicles, cell phones, and even video games. Sensor readings, telemetry, financial transactions and/or mobile notifications & messages are all common use-cases for MQTT, and the flexibility of the protocol allows developers to make trade-offs around reliability, topic hierarchy and persistence specific to their use-case.

We chose MQTT as the foundation for Cloudflare Pub/Sub as we believe in building on top of open, accessible standards, as we did when we chose the Service Worker API as the foundation for Workers, and with our recently announced participation in the Winter Community Group around server-side runtime APIs. We also wanted to enable existing clients an easy path to benefit from Cloudflare’s scale and programmability, and ensure that developers have a rich ecosystem of client libraries in languages they’re familiar with today.

Beyond that, however, we also think MQTT meets the needs of a modern “publish-subscribe” messaging service. It has flexible delivery guarantees, TLS for transport encryption (no bespoke crypto!), a scalable topic creation and subscription model, extensible per-message metadata, and importantly, it provides a well-defined specification with clear error messages.

With that in mind, we expect to support many more “on-ramps” to Pub/Sub: a lot of the best parts of MQTT can be abstracted away from clients who might want to talk to us over HTTP or WebSockets.

Building Blocks

Given the ability to write code that acts on every message published to a Pub/Sub Broker, what does it look like in practice?

Here’s a simple-but-illustrative example of handling Pub/Sub messages directly in a Worker. We have clients (in this case, payment terminals) reporting back transaction data, and we want to capture the number of transactions processed in each region, so we can track transaction volumes over time.

Specifically, we:

  1. Filter on a specific topic prefix for messages we care about

  2. Parse the message for a specific key:value pair as a metric

  3. Write that metric directly into Workers Analytics Engine, our new serverless time-series analytics service, so we can directly query it with GraphQL.

This saves us having to stand up and maintain an external metrics service, configure another cloud service, or think about how it will scale: we can do it all directly on Cloudflare.


async function pubsub(
  messages: Array<PubSubMessage>,
  env: any,
  ctx: ExecutionContext
): Promise<Array<PubSubMessage>> {
  
  for (let msg of messages) {
    // Extract a value from the payload and write it to Analytics Engine
    // In this example, a transactionsProcessed counter that our clients are sending
    // back to us.
    if (msg.topic.startsWith(“/transactions/”)) {
      // This is non-blocking, and doesn’t hold up our message
      // processing.
      env.TELEMETRY.writeDataPoint({
        // We label this metric so that we can query against these labels
        labels: [`${msg.broker}.${msg.namespace}`, msg.payload.region, msg.payload.merchantId],
        metrics: [msg.payload.transactionsProcessed ?? 0]
      });
    }
  }

  // Return our messages back to the Broker
  return messages;
}

const worker = {
  async fetch(req: Request, env: any, ctx: ExecutionContext) {
    // Critical: you must validate the incoming request is from your Broker
    // In the future, Workers will be able to do this on your behalf for Workers
    // in the same account as your Pub/Sub Broker.
    if (await isValidBrokerRequest(req)) {

      // Parse the incoming PubSub messages
      let incomingMessages: Array<PubSubMessage> = await req.json();
      
      // Pass the message to our pubsub handler, and capture the returned
      // messages
      let outgoingMessages = await pubsub(incomingMessages, env, ctx);

      // Re-serialize the messages and return a HTTP 200 so our Broker
      // knows we’ve successfully handled them
      return new Response(JSON.stringify(outgoingMessages), { status: 200 });
    }

    return new Response("not a valid Broker request", { status: 403 });
  },
};

export default worker;

We can then query these metrics directly using a familiar language: SQL. Our query takes the metrics we’ve written and gives us a breakdown of transactions processed by our payment devices, grouped by merchant (and again, all on Cloudflare):

SELECT
  label_2 as region,
  label_3 as merchantId,
  sum(metric_1) as total_transactions
FROM TELEMETRY
WHERE
  metric_1 > 0
  AND timestamp >= now() - 604800
GROUP BY
  region,
  merchantId
ORDER BY
  total_transactions DESC
LIMIT 10

You could replace or augment the calls to Analytics Engine with any number of examples:

  • Asynchronously writing messages (using ctx.waitUntil) on specific topics to our R2 object storage without blocking message delivery

  • Rewriting messages on-the-fly with data populated from KV, before the message is pushed to subscribers

  • Aggregate messages based on their payload and HTTP POST them to legacy infrastructure hosted outside of Cloudflare

Pub/Sub gives you a way to get data into Cloudflare’s network, filter, aggregate and/or mutate it, and push it back out to subscribers — whether there’s 10, 1,000 or 10,000 of them listening on that topic.

Where are we headed?

As we often like to say: we’re just getting started. The private beta for Pub/Sub is just the beginning of our journey, and we have a long list of capabilities we’re already working on.

Critically, one of our priorities is to cover as much of the MQTT v5.0 specification as we can, so that customers can migrate existing deployments and have it “just work”. Useful capabilities like shared subscriptions that allow you to load-balance messages across many subscribers; wildcard subscriptions (both single- and multi-tier) for aggregation use cases, stronger delivery guarantees (QoS), and support for additional authentication modes (specifically, Mutual TLS) are just a few of the things we’re working on.

Beyond that, we’re focused on making sure Pub/Sub’s developer experience is the best it  can be, and during the beta we’ll be:

  • Supporting a new set of “pubsub” sub-commands in Wrangler, our developer CLI, so that getting started is as low-friction as possible

  • Building ‘native’ bindings (similar to how Workers KV operates) that allow you to publish messages and subscribe to topics directly from Worker code, regardless of whether the message originates from (or is destined for) a client beyond Cloudflare

  • Exploring more ways to publish & subscribe from non-MQTT based clients, including HTTP requests and WebSockets, so that integrating existing code is even easier.

Our developer documentation will cover these capabilities as we land them.

We’re also aware that pricing is a huge part of developer experience, and are committed to ensuring that there is an accessible and flexible free tier. We want to enable developers to experiment, prototype and solve problems we haven’t thought of yet. We’ll be sharing more on pricing during the course of the beta.

Getting Started

If you want to start using Pub/Sub, sign up for the private beta: we plan to start enabling access within the next month. We’re looking forward to collecting feedback from developers and seeing what folks start to build.

In the meantime, review the brand-new Pub/Sub developer documentation to understand how Pub/Sub works under the hood, the MQTT protocol, and how it integrates with Cloudflare Workers.

Cloudflare's connectivity cloud protects entire corporate networks, helps customers build Internet-scale applications efficiently, accelerates any website or Internet application, wards off DDoS attacks, keeps hackers at bay, and can help you on your journey to Zero Trust.

Visit 1.1.1.1 from any device to get started with our free app that makes your Internet faster and safer.

To learn more about our mission to help build a better Internet, start here. If you're looking for a new career direction, check out our open positions.
Platform WeekIoT

Follow on X

Matt Silverlock|@elithrar
Cloudflare|@cloudflare

Related posts