訂閱以接收新文章的通知:

DO it again: how we used Durable Objects to add WebSockets support and authentication to AI Gateway

2024-11-19

閱讀時間:5 分鐘
本貼文還提供以下語言版本:English

In October 2024, we talked about storing billions of logs from your AI application using AI Gateway, and how we used Cloudflare’s Developer Platform to do this. 

With AI Gateway already processing over 3 billion logs and experiencing rapid growth, the number of connections to the platform continues to increase steadily. To help developers manage this scale more effectively, we wanted to offer an alternative to implementing HTTP/2 keep-alive to maintain persistent HTTP(S) connections, thereby avoiding the overhead of repeated handshakes and TLS negotiations with each new HTTP connection to AI Gateway. We understand that implementing HTTP/2 can present challenges, particularly when many libraries and tools may not support it by default and most modern programming languages have well-established WebSocket libraries available.

With this in mind, we used Cloudflare’s Developer Platform and Durable Objects (yes, again!) to build a WebSockets API that establishes a single, persistent connection, enabling continuous communication. 

Through this API, all AI providers supported by AI Gateway can be accessed via WebSocket, allowing you to maintain a single TCP connection between your client or server application and the AI Gateway. The best part? Even if your chosen provider doesn’t support WebSockets, we handle it for you, managing the requests to your preferred AI provider.

BLOG-2617 2

By connecting via WebSocket to AI Gateway, we make the requests to the inference service for you using the provider’s supported protocols (HTTPS, WebSocket, etc.), and you can keep the connection open to execute as many inference requests as you would like. 

To make your connection to AI Gateway more secure, we are also introducing authentication for AI Gateway. The new WebSockets API will require authentication. All you need to do is create a Cloudflare API token with the permission “AI Gateway: Run” and send that in the cf-aig-authorization header.

BLOG-2617 3

In the flow diagram above:

1️⃣ When Authenticated Gateway is enabled and a valid token is included, requests will pass successfully.

2️⃣ If Authenticated Gateway is enabled, but a request does not contain the required cf-aig-authorization header with a valid token, the request will fail. This ensures only verified requests pass through the gateway. 

3️⃣ When Authenticated Gateway is disabled, the cf-aig-authorization header is bypassed entirely, and any token — whether valid or invalid — is ignored.

How we built it

We recently used Durable Objects (DOs) to scale our logging solution for AI Gateway, so using WebSockets within the same DOs was a natural fit.

When a new WebSocket connection is received by our Cloudflare Workers, we implement authentication in two ways to support the diverse capabilities of WebSocket clients. The primary method involves validating a Cloudflare API token through the cf-aig-authorization header, ensuring the token is valid for the connecting account and gateway. 

However, due to limitations in browser WebSocket implementations, we also support authentication via the “sec-websocket-protocol” header. Browser WebSocket clients don't allow for custom headers in their standard API, complicating the addition of authentication tokens in requests. While we don’t recommend that you store API keys in a browser, we decided to add this method to add more flexibility to all WebSocket clients.

// Built-in WebSocket client in browsers
const socket = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", [
   "cf-aig-authorization.${AI_GATEWAY_TOKEN}"
]);


// ws npm package
import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/",{
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});

After this initial verification step, we upgrade the connection to the Durable Object, meaning that it will now handle all the messages for the connection. Before the new connection is fully accepted, we generate a random UUID, so this connection is identifiable among all the messages received by the Durable Object. During an open connection, any AI Gateway settings passed via headers — such as cf-aig-skip-cache (which bypasses caching when set to true) — are stored and applied to all requests in the session. However, these headers can still be overridden on a per-request basis, just like with the Universal Endpoint today.

How it works

Once the connection is established, the Durable Object begins listening for incoming messages. From this point on, users can send messages in the AI Gateway universal format via WebSocket, simplifying the transition of your application from an existing HTTP setup to WebSockets-based communication.

import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/",{
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});

ws.send(JSON.stringify({
   type: "universal.create",
   request: {
      "eventId": "my-request",
      "provider": "workers-ai",
      "endpoint": "@cf/meta/llama-3.1-8b-instruct",
      "headers": {
         "Authorization": "Bearer WORKERS_AI_TOKEN",
         "Content-Type": "application/json"
      },
      "query": {
         "prompt": "tell me a joke"
      }
   }
}));

ws.on("message", function incoming(message) {
   console.log(message.toString())
});

When a new message reaches the Durable Object, it’s processed using the same code that powers the HTTP Universal Endpoint, enabling seamless code reuse across Workers and Durable Objects — one of the key benefits of building on Cloudflare.

For non-streaming requests, the response is wrapped in a JSON envelope, allowing us to include additional information beyond the AI inference itself, such as the AI Gateway log ID for that request.

Here’s an example response for the request above:

{
  "type":"universal.created",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC3R94FRD97JBCBX3S0ZAXKW",
     "step":"0",
     "contentType":"application/json"
  },
  "response":{
     "result":{
        "response":"Why was the math book sad? Because it had too many problems. Would you like to hear another one?"
     },
     "success":true,
     "errors":[],
     "messages":[]
  }
}

For streaming requests, AI Gateway sends an initial message with request metadata telling the developer the stream is starting.

{
  "type":"universal.created",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC40RB3NGBE5XFRZGBN07572",
     "step":"0",
     "contentType":"text/event-stream"
  }
}

After this initial message, all streaming chunks are relayed in real-time to the WebSocket connection as they arrive from the inference provider. Note that only the eventId field is included in the metadata for these streaming chunks (more info on what this new field is below).

{
  "type":"universal.stream",
  "metadata":{
     "eventId":"my-request",
  }
  "response":{
     "response":"would"
  }
}

This approach serves two purposes: first, all request metadata is already provided in the initial message. Second, it addresses the concurrency challenge of handling multiple streaming requests simultaneously.

Handling asynchronous events

With WebSocket connections, client and server can send messages asynchronously at any time. This means the client doesn’t need to wait for a server response before sending another message. But what happens if a client sends multiple streaming inference requests immediately after the WebSocket connection opens?

In this case, the server streams all the inference responses simultaneously to the client. Since everything occurs asynchronously, the client has no built-in way to identify which response corresponds to each request.

To address this, we introduced a new field in the Universal format called eventId, which allows AI Gateway to include a client-defined ID with each message, even in a streaming WebSocket environment.

So, to fully answer the question above: the server streams both responses in parallel chunks, and the client can accurately identify which request each message belongs to based on the eventId.

Once all chunks for a request have been streamed, AI Gateway sends a final message to signal the request’s completion. For added flexibility, this message includes all the metadata again, even though it was also provided at the start of the streaming process.

{
  "type":"universal.done",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC40RB3NGBE5XFRZGBN07572",
     "step":"0",
     "contentType":"text/event-stream"
  }
}

Try it out today

AI Gateway’s real-time Websocket API is now in beta and open to everyone!

To try it out, copy your gateway universal endpoint URL, and replace the “https://” with “wss://”, like this:

wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/

BLOG-2617 4

Then open a WebSocket connection using your Universal Endpoint, and guarantee that it is authenticated with a Cloudflare token with the AI Gateway Run permission.

Here’s an example code using the ws npm package:

import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", {
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});

ws.on("open", function open() {
   console.log("Connected to server.");
   ws.send(JSON.stringify({
      type: "universal.create",
      request: {
         "provider": "workers-ai",
         "endpoint": "@cf/meta/llama-3.1-8b-instruct",
         "headers": {
            "Authorization": "Bearer WORKERS_AI_TOKEN",
            "Content-Type": "application/json"
         },
         "query": {
            "stream": true,
            "prompt": "tell me a joke"
         }
      }
   }));
});


ws.on("message", function incoming(message) {
   console.log(message.toString())
});

Here’s an example code using the built-in browser WebSocket client:

const socket = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", [
   "cf-aig-authorization.${AI_GATEWAY_TOKEN}"
]);

socket.addEventListener("open", (event) => {
  console.log("Connected to server.");
   socket.send(JSON.stringify({
      type: "universal.create",
      request: {
         "provider": "workers-ai",
         "endpoint": "@cf/meta/llama-3.1-8b-instruct",
         "headers": {
            "Authorization": "Bearer WORKERS_AI_TOKEN",
            "Content-Type": "application/json"
         },
         "query": {
            "stream": true,
            "prompt": "tell me a joke"
         }
      }
   }));
});

socket.addEventListener("message", (event) => {
  console.log(event.data);
});

And we will DO it again

In Q1 2025, we plan to support WebSocket-to-WebSocket connections (using DOs), allowing you to connect to OpenAI's new real-time API directly through our platform. In the meantime, you can deploy this Worker in your account to proxy the requests yourself.

If you have any questions, reach out on our Discord channel. We’re also hiring for AI Gateway, check out Cloudflare Jobs in Lisbon!

我們保護整個企業網路,協助客戶有效地建置網際網路規模的應用程式,加速任何網站或網際網路應用程式抵禦 DDoS 攻擊,阻止駭客入侵,並且可以協助您實現 Zero Trust

從任何裝置造訪 1.1.1.1,即可開始使用我們的免費應用程式,讓您的網際網路更快速、更安全。

若要進一步瞭解我們協助打造更好的網際網路的使命,請從這裡開始。如果您正在尋找新的職業方向,請查看我們的職缺
AIAI Gateway開發人員Developer PlatformJavaScript

在 X 上進行關注

Cloudflare|@cloudflare

相關貼文