Adrian Machado for Zuplo

Posted on • Originally published at zuplo.com

Guide to Real-Time Data Stream APIs

Real-time data streams deliver the sub-second responsiveness that modern applications demand. While batch processing handles data in chunks, real-time processing transforms information instantly, creating experiences that feel truly alive. This isn't just technical preference—it's business critical.

From financial fraud detection to manufacturing optimization, social media feeds to multiplayer gaming, real-time data streams have revolutionized how businesses operate. These systems require specialized API approaches to overcome unique challenges: meeting strict latency demands, managing massive throughput, and scaling seamlessly under fluctuating loads.

Let's take a look at how to build and document APIs that make real-time data streams both powerful and accessible for developers who need immediate responsiveness in their applications.

Mastering the Fundamentals: Core Concepts That Power Real-Time APIs

Before diving into implementation details, understanding the foundational concepts behind real-time data streaming will ensure your API architecture stands on solid ground.

When designing an API for real-time data streams, you're creating a high-performance data highway that handles incredible speeds without compromising reliability. The architecture patterns, data formats, and connection protocols you choose form the backbone of your entire system.

Core Streaming Architecture Patterns

These battle-tested approaches define how data flows through your system:

  • Publish-Subscribe (Pub/Sub): Publishers send events to topics without caring who's listening, while subscribers only receive the data they've requested. This pattern excels for dashboards requiring fresh data or notification systems that must capture every event.
  • Event Sourcing: Rather than just recording current state, event sourcing saves every change as an immutable sequence. This creates a complete historical record perfect for audit trails and time-travel debugging capabilities.
  • Command Query Responsibility Segregation (CQRS): By splitting read and write paths, CQRS optimizes each for its specific purpose. Write operations focus on consistency while read operations prioritize speed—crucial for real-time data delivery.
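
To make the Pub/Sub pattern above concrete, here is a minimal in-memory sketch. The `Broker` class, topic names, and event fields are hypothetical and only illustrate the decoupling between publishers and subscribers; a production system would use a real broker such as Kafka or Redis.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class Broker:
    """In-memory pub/sub broker: publishers and subscribers share only a topic name."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        """Register a handler that is called for every event on the topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> int:
        """Deliver the event to every handler on the topic; return the delivery count."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(event)
        return len(handlers)


broker = Broker()
dashboard_events: List[dict] = []
broker.subscribe("orders", dashboard_events.append)

delivered = broker.publish("orders", {"order_id": "A1", "total": 42.0})
ignored = broker.publish("payments", {"payment_id": "P9"})  # nobody subscribed
```

The publisher never learns who received the event, which is what lets you add new consumers without touching producer code.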

Data Formats Optimized for Streaming

Your format choice significantly impacts performance:

  • Avro: This schema-based binary format handles evolving schemas well. The schema travels with the data in Avro container files, and Kafka deployments commonly store it in a schema registry instead, which keeps each message compact.
  • Protocol Buffers (Protobuf): Google's binary format offers fast serialization and compact payloads, at the cost of requiring producers and consumers to share .proto schema definitions. When latency is your primary concern, Protobuf is a strong choice.
  • JSON: While less efficient than binary formats, JSON's human-readability and universal support make it valuable for debugging and web client integration. Just be prepared for the performance trade-off.
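
The size trade-off between text and binary formats can be illustrated with the standard library alone. This sketch uses Python's `struct` module as a stand-in for a schema-based binary encoding; it is not Avro or Protobuf, and the field names and layout are assumptions chosen for the example.

```python
import json
import struct

reading = {"device_id": 1234, "value": 21.5, "timestamp": 1682541892}

# Text encoding: the field names travel with every single message.
json_bytes = json.dumps(reading).encode("utf-8")

# Binary encoding with a fixed, pre-agreed layout (the "schema"):
# big-endian unsigned 32-bit id, 64-bit float value, unsigned 64-bit timestamp.
LAYOUT = struct.Struct(">IdQ")
binary_bytes = LAYOUT.pack(reading["device_id"], reading["value"], reading["timestamp"])

# Decoding only works because both sides agree on the layout.
device_id, value, timestamp = LAYOUT.unpack(binary_bytes)
```

The binary message is a fraction of the JSON size, but it is unreadable without the layout, which is the same trade-off the list above describes.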

Connection Protocols for Real-Time Data Streams

These protocols determine how clients stay connected to your data stream:

WebSockets: Creating persistent, full-duplex channels over a single TCP connection, WebSockets excel for applications requiring two-way communication like chat or collaborative tools.

Server-Sent Events (SSE): Perfect for one-way server-to-client updates, SSE offers simplicity and broad browser support for news feeds, stock tickers, and similar applications.

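WebRTC: Enabling direct client-to-client communication, WebRTC removes the server from the data path for peer-to-peer streaming applications, although a signaling server (and often STUN/TURN servers) is still needed to set up the connection.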

Utilizing a hosted API gateway can simplify the management of these protocols, providing benefits such as scalability, security, and ease of deployment.

Stateful vs. Stateless Processing

This fundamental choice affects how your system handles data context:

  • Stateless Processing: Processing each data piece in isolation allows horizontal scaling and simple failure recovery but limits analytical capabilities.
  • Stateful Processing: Maintaining context across multiple events enables windowed aggregations, cross-stream joins, and pattern detection, though it adds complexity to scaling and recovery.
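
The difference between the two processing styles can be sketched in a few lines. The event shape `(timestamp, user)` and the function names are hypothetical; the stateful part implements a simple tumbling-window count held in memory.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Event = Tuple[int, str]  # (timestamp in seconds, user id)


def is_valid(event: Event) -> bool:
    """Stateless: the decision depends only on the event itself."""
    timestamp, user = event
    return timestamp >= 0 and user != ""


def count_per_window(events: Iterable[Event], window_seconds: int) -> Dict[int, int]:
    """Stateful: counts accumulate across events, keyed by tumbling-window start."""
    counts: Dict[int, int] = defaultdict(int)
    for timestamp, _user in events:
        window_start = timestamp - (timestamp % window_seconds)
        counts[window_start] += 1
    return dict(counts)


events: List[Event] = [(0, "a"), (3, "b"), (10, ""), (12, "c"), (19, "a")]
valid = [e for e in events if is_valid(e)]
windows = count_per_window(valid, window_seconds=10)
```

`is_valid` can run on any worker with no coordination, while the `counts` dictionary is state that has to be partitioned, checkpointed, and restored after a failure.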

Event Time vs. Processing Time

Time concepts create critical distinctions in real-time systems:

  • Event Time: When events actually occurred at the source—essential for accurate analytics but challenging with out-of-order arrivals and delayed data.
  • Processing Time: When your system processes the event—simpler to implement but potentially misleading when events arrive with varying delays.
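
A small worked example shows how the two notions of time can disagree. The timestamps below are made up: each tuple is `(event_time, processing_time)` in seconds, listed in arrival order, and one event arrives late.

```python
from collections import Counter
from typing import List, Tuple

# The event that occurred at t=8 only reaches the system at t=14,
# after an event that occurred later (t=11).
arrivals: List[Tuple[int, int]] = [(1, 2), (11, 12), (8, 14), (15, 16)]

WINDOW_SECONDS = 10


def window_of(t: int) -> int:
    """Start of the 10-second tumbling window that contains t."""
    return t - (t % WINDOW_SECONDS)


by_event_time = Counter(window_of(event_time) for event_time, _ in arrivals)
by_processing_time = Counter(window_of(proc_time) for _, proc_time in arrivals)
```

Grouping by event time puts two events in each window, while grouping by processing time attributes the late event to the wrong window (one event in the first window, three in the second).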

With these foundational concepts clarified, you're equipped to make architecture choices that balance performance, reliability, and developer experience in your real-time data stream APIs.

Crafting Developer-Friendly Experiences: Designing Your Real-Time API

Creating an exceptional API for real-time data isn't just about moving bits quickly—it's about crafting interfaces that developers genuinely want to use while maintaining ironclad security and predictable performance.

Security, rate management, and clear documentation form the tripod supporting successful real-time APIs. Let's examine how to implement these critical elements effectively.

Authentication and Authorization Strategies

Securing continuous connections requires approaches that balance protection with performance.

  • Token-Based Authentication: JSON Web Tokens (JWT) work well for real-time authentication. They can be validated without database lookups and carry the necessary user information within the token itself. Always set appropriate expiration times to limit how long a leaked token remains usable, and monitor and manage permissions on an ongoing basis.
  • Multi-Factor Authentication (MFA): For streams carrying sensitive financial or healthcare information, implementing MFA verifies user identity through multiple channels before establishing continuous connections.
  • OAuth 2.0 with Refresh Tokens: Ideal for long-running sessions, this approach allows applications to refresh access tokens without forcing users to repeatedly authenticate—maintaining seamless experiences.

For WebSocket connections, authenticate during the initial handshake and maintain that authentication state throughout the session, eliminating the need to validate every message.
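
As a rough sketch of handshake-time validation, the following verifies an HS256-signed JWT using only the standard library. It is hand-rolled for illustration; the function names, secret, and claims are assumptions, and production code should rely on a vetted JWT library instead.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(claims: dict, secret: bytes) -> str:
    """Build an HS256 JWT (used here only to produce a token for the example)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    signature = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(signature)}"


def verify_at_handshake(token: str, secret: bytes, now: Optional[float] = None) -> Optional[dict]:
    """Return the claims if the token is well-formed, correctly signed, and unexpired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
        signature = _b64url_decode(signature_b64)
    except ValueError:
        return None  # wrong number of parts, bad base64, or bad JSON
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        return None
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, signature):
        return None
    current = time.time() if now is None else now
    exp = claims.get("exp") if isinstance(claims, dict) else None
    if not isinstance(exp, (int, float)) or exp <= current:
        return None
    return claims


SECRET = b"demo-secret"  # hypothetical; load real secrets from configuration
token = sign_token({"sub": "user-123", "exp": 2000}, SECRET)

session = verify_at_handshake(token, SECRET, now=1000)           # accepted
expired = verify_at_handshake(token, SECRET, now=3000)           # rejected: past exp
forged = verify_at_handshake(token, b"wrong-secret", now=1000)   # rejected: bad signature
```

The returned claims can be stored on the connection object so later messages are authorized against them. Because the connection may outlive the token, a server would typically also close or re-authenticate the session once `exp` passes.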

Rate Limiting and Throttling

Without traffic controls, your real-time API can quickly become overwhelmed:

  • Token Bucket Algorithm: This approach allows for natural traffic bursts while maintaining overall limits over time—matching real-world usage patterns that rarely follow perfectly consistent intervals.
  • Dynamic Throttling: Adjust rate restrictions based on server load, reducing throughput for non-critical clients during peak times while maintaining service levels for priority connections.
  • Client Identification: Track usage by API key, IP address, or user ID to ensure fair resource allocation and prevent individual clients from monopolizing system capacity.
  • Graceful Degradation: When clients exceed thresholds, reduce update frequency rather than terminating connections completely. This provides a smoother user experience while still protecting system resources.
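
The token bucket algorithm from the list above can be sketched as follows. The class and parameter names are hypothetical, and time is passed in explicitly so the behavior is easy to follow; real code would pass something like `time.monotonic()`.

```python
class TokenBucket:
    """Allows bursts up to `capacity` while enforcing `rate` tokens per second on average."""

    def __init__(self, rate: float, capacity: float, now: float = 0.0) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity  # start full so an initial burst is allowed
        self.updated_at = now

    def allow(self, now: float, cost: float = 1.0) -> bool:
        """Refill based on elapsed time, then spend `cost` tokens if available."""
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated_at = max(self.updated_at, now)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = TokenBucket(rate=1.0, capacity=3.0)
burst = [bucket.allow(now=0.0) for _ in range(4)]  # three pass, the fourth is throttled
after_refill = bucket.allow(now=2.0)               # two seconds later, tokens are back
```

Keeping one bucket per API key or user ID gives the per-client fairness described above. For graceful degradation, a `False` result can lower a client's update frequency instead of closing its connection.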

Implementing these strategies alongside API monitoring tools can help you maintain optimal performance and quickly respond to issues.

API Versioning in Streaming Contexts

Long-lived connections require special versioning considerations:

  • URL Path Versioning: Include the version directly in your connection URL (e.g., /v1/stream/market-data) for explicit, unambiguous version identification.
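  • Header-Based Versioning: For WebSocket connections, pass version information in the handshake headers to maintain clean URLs while preserving explicit version control. Browser clients cannot set arbitrary handshake headers, so there the version typically travels in the Sec-WebSocket-Protocol subprotocol value or a query parameter.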
  • Gradual Deprecation: Allow older API versions to continue functioning with reduced features while encouraging migration to newer versions. Abrupt changes lead to frustrated developers and broken applications.
  • Version Negotiation: Implement handshake protocols where clients and servers agree on protocol versions during connection establishment, preventing compatibility surprises.
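
Version negotiation can be as simple as picking the highest version both sides support. This is a minimal sketch; the version numbers and the `negotiate_version` helper are hypothetical, and how the lists are exchanged (headers, subprotocols, or a first message) is left to your protocol.

```python
from typing import Iterable, Optional

SERVER_VERSIONS = (1, 2, 3)  # hypothetical versions this server can speak


def negotiate_version(
    client_versions: Iterable[int],
    server_versions: Iterable[int] = SERVER_VERSIONS,
) -> Optional[int]:
    """Return the highest version supported by both sides, or None if there is none."""
    common = set(client_versions) & set(server_versions)
    return max(common) if common else None


agreed = negotiate_version([2, 3, 4])   # client is newer than the server
unsupported = negotiate_version([9])    # no overlap: reject the connection
```

When the result is `None`, closing the handshake with a clear error is usually kinder than letting a mismatched client fail later on an unknown message type.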

AsyncAPI - The Standard for Real-Time APIs

AsyncAPI is quickly emerging as the de facto standard for describing event-driven, non-REST APIs (with OpenAPI being the standard for REST APIs). If you're already familiar with OpenAPI, here's a quick overview of AsyncAPI and its analogous properties:

| Concept / Property | AsyncAPI 3.0 | OpenAPI 3.1+ |
| --- | --- | --- |
| Spec Purpose | Event-driven APIs (WebSockets, MQTT, Kafka, SSE, etc.) | Request-response APIs over HTTP/HTTPS |
| Top-Level Version | asyncapi: "3.0.0" | openapi: "3.1.0" |
| Info Object | info (title, version, description, etc.) | Same |
| Servers | servers (with protocol-specific fields like host, protocol, path) | Same, though focused on HTTP URL and variables |
| Operations | operations block with send / receive actions | Defined inline under paths with get, post, etc. |
| Channels / Paths | channels = logical topics or stream endpoints (e.g. /chat) | paths = HTTP paths (e.g. /users/{id}) |
| Messages vs Requests | messages: standalone message definitions (for publish/subscribe) | requestBody and responses for HTTP requests/responses |
| Payload Schema | payload (JSON Schema, Avro, etc.) | schema (JSON Schema-based for requests/responses) |
| Actions | send, receive, and reply (new in v3) | HTTP methods (get, post, etc.) define intent |
| Protocols Supported | WebSockets, MQTT, Kafka, AMQP, SSE, Redis Streams, NATS, etc. | HTTP/HTTPS |
| Bindings (Protocol Metadata) | Yes (bindings object for channels, operations, messages) | Not applicable (protocol is standardized as HTTP) |
| Reusable Components | components: messages, schemas, securitySchemes, etc. | components: schemas, parameters, responses, securitySchemes |
| Security Schemes | Yes (e.g. API key, OAuth2, etc.) | Same |
| Links / Relationships | Under development (planned in v3.1+) | links for describing response relationships |
| Extensions | x- prefix extensions supported | Same |
| Codegen & Tooling Support | Growing: CLI, Studio, Generator, Parsers | Mature: Zudoku, Swagger UI, Stoplight, etc. |
| Visual Documentation | AsyncAPI Studio, HTML docs generator | Zudoku, Swagger UI, Rapidoc |
| Request-Reply Pattern | Explicit in v3 using reply action | Modeled using multiple endpoints manually |
| Workflow Modeling | Better for pub/sub or streaming pipelines | Better for RESTful workflows with verbs |

Fundamentally AsyncAPI is channel-first - it defines how data flows via topics, events, or message brokers. This is in contrast to OpenAPI which is resource-first. Now, let's get into some examples to see AsyncAPI 3.0 in action.

🔌 WebSocket Chat API

The canonical example for documenting WebSockets is always a Chat API - so here's how to do it in AsyncAPI.

asyncapi: 3.0.0
info:
  title: WebSocket Chat API
  version: "1.0.0"
  description: Real-time chat API using WebSockets.

servers:
  production:
    host: chat.example.com
    protocol: ws
    path: /ws

channels:
  chatMessageChannel:
    address: chat/message
    messages:
      chatMessage:
        payload:
          type: object
          properties:
            user:
              type: string
            message:
              type: string
            timestamp:
              type: string
              format: date-time

operations:
  sendMessage:
    action: send
    channel:
      $ref: "#/channels/chatMessageChannel"
    messages:
      - $ref: "#/channels/chatMessageChannel/messages/chatMessage"

  receiveMessage:
    action: receive
    channel:
      $ref: "#/channels/chatMessageChannel"
    messages:
      - $ref: "#/channels/chatMessageChannel/messages/chatMessage"

📡 MQTT IoT Sensor API

Want to document your MQTT API? Here's an example from the IoT space.

asyncapi: 3.0.0
info:
  title: MQTT Sensor API
  version: "1.0.0"
  description: Publishes sensor readings from IoT devices.

servers:
  mqttBroker:
    host: broker.example.com
    protocol: mqtt

channels:
  temperatureChannel:
    address: sensors/temperature
    messages:
      tempReading:
        payload:
          type: object
          properties:
            deviceId:
              type: string
            value:
              type: number
            unit:
              type: string
              enum: [C, F]
            timestamp:
              type: string
              format: date-time

operations:
  publishTemperature:
    action: send
    channel:
      $ref: "#/channels/temperatureChannel"
    messages:
      - $ref: "#/channels/temperatureChannel/messages/tempReading"

🪵 Kafka Order Events

If you're building an e-commerce API, order management will almost certainly be a feature. Here's how to document your Kafka stream.

asyncapi: 3.0.0
info:
  title: Kafka Order Events
  version: "1.0.0"
  description: Consumes new order events from Kafka.

servers:
  kafka:
    host: kafka.example.com
    protocol: kafka

channels:
  orderCreatedChannel:
    address: order.created # Kafka topic names allow only letters, digits, ".", "_" and "-"
    messages:
      orderCreated:
        payload:
          type: object
          properties:
            orderId:
              type: string
            customerId:
              type: string
            total:
              type: number

operations:
  consumeOrderCreated:
    action: receive
    channel:
      $ref: "#/channels/orderCreatedChannel"
    messages:
      - $ref: "#/channels/orderCreatedChannel/messages/orderCreated"

📤 SSE Notifications API

Here's an example of documenting a notifications API, a typical use case for Server-Sent Events.

asyncapi: 3.0.0
info:
  title: SSE Notifications API
  version: "1.0.0"
  description: Server-Sent Events for real-time notifications.

servers:
  default:
    host: api.example.com
    protocol: http
    path: /notifications

channels:
  notificationStream:
    address: /stream
    messages:
      notification:
        payload:
          type: object
          properties:
            id:
              type: string
            type:
              type: string
            content:
              type: string

operations:
  receiveNotifications:
    action: receive
    channel:
      $ref: "#/channels/notificationStream"
    messages:
      - $ref: "#/channels/notificationStream/messages/notification"

Documentation Elements

Effective real-time API documentation covers elements often overlooked in REST documentation:

  • Connection Lifecycle: Detail exactly how connections are established, maintained through heartbeats, and gracefully closed when complete.
  • Event Schemas: Define the structure of every possible message flowing in either direction, with clear explanations for each field.
  • Error Handling: Explain all error codes, recovery procedures, and reconnection strategies so developers know how to respond when things go wrong.
  • Interactive Examples: Provide WebSocket playground environments where developers can test connections and observe live data formats in action.
  • Rate Limit Documentation: Clearly communicate throttling policies and monitoring methods so developers can build applications that respect system constraints.

In addition, offering a developer portal and request validation can further improve the usability and security of your API.

When thoughtfully designed, your real-time API becomes more than an interface—it transforms into a competitive advantage that developers actively choose over alternatives. Focus on creating experiences that make your platform the obvious choice for real-time applications.

Building Reliable Foundations: Setting Up the Server Side

The server infrastructure powering real-time data streams determines their ultimate performance, reliability, and scalability. Making informed technology choices and implementing proper flow control creates systems that remain responsive under pressure.

Let's compare key streaming technologies and examine implementation approaches that prevent common pitfalls.

Stream Processing Technologies Comparison

Each technology offers distinct advantages for different use cases:

  • Apache Kafka: The distributed commit log that handles millions of messages per second with configurable retention. Kafka excels in complex event processing scenarios requiring massive throughput and strong durability guarantees.
  • Redis Streams: With sub-millisecond latency and simple setup, Redis Streams performs well when speed matters more than guaranteed delivery of every message. Its lightweight approach to time-series data processing keeps operational complexity low.
  • AWS Kinesis: This managed service handles operational concerns while automatically scaling with demand. Kinesis trades some raw throughput capabilities compared to Kafka but dramatically reduces operational overhead.

Implementing Stream Producers

Here's how to build producers that remain stable under high loads:

Node.js Kafka Producer

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["kafka1:9092", "kafka2:9092"],
});

const producer = kafka.producer();

async function sendMessage() {
  await producer.connect();
  await producer.send({
    topic: "test-topic",
    messages: [
      {
        value: JSON.stringify({ event: "user_action", timestamp: Date.now() }),
      },
    ],
  });
}

Python Redis Streams Producer

import redis
import json

r = redis.Redis(host='localhost', port=6379)

event_data = {
    'user_id': 1234,
    'action': 'page_view',
    'timestamp': 1682541892
}

# Add to stream with auto-generated ID
r.xadd('user_events', {'data': json.dumps(event_data)})


Java AWS Kinesis Producer

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import java.nio.ByteBuffer;

public class KinesisProducer {
    public static void main(String[] args) {
        AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.defaultClient();

        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName("ExampleStream");
        request.setPartitionKey("user123");
        request.setData(ByteBuffer.wrap("Example event data".getBytes()));

        kinesisClient.putRecord(request);
    }
}


Federated gateways can simplify and accelerate development by managing multiple microservices and APIs in one place. Whichever approach you choose, correct server configuration remains essential.

Handling Backpressure and Overflow

Backpressure occurs when consumers can't keep pace with producers—a critical challenge in real-time systems:

  • Rate Limiting: Set producer sending rates based on consumer capacity. Controlled flow prevents system overload during traffic spikes.
  • Buffer Management: Implement smart buffers that absorb traffic spikes, providing breathing room when incoming data temporarily exceeds processing capacity.
  • Consumer-Driven Flow Control: Let consumers signal their processing capacity to producers. Kafka's consumer lag metrics and the Redis Streams XPENDING command reveal processing backlogs so you can adjust accordingly.
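
Buffer management with an explicit overflow signal might look like the sketch below. The `BoundedBuffer` class is hypothetical; the point is that a full buffer tells the producer to slow down instead of growing until memory runs out.

```python
from collections import deque
from typing import Deque, List


class BoundedBuffer:
    """Fixed-size buffer that signals backpressure instead of growing without limit."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._items: Deque[dict] = deque()

    def offer(self, item: dict) -> bool:
        """Accept the item if there is room; False tells the producer to back off."""
        if len(self._items) >= self.capacity:
            return False
        self._items.append(item)
        return True

    def drain(self, max_items: int) -> List[dict]:
        """Consumer side: take up to max_items in arrival order."""
        count = min(max_items, len(self._items))
        return [self._items.popleft() for _ in range(count)]


buffer = BoundedBuffer(capacity=2)
accepted = [buffer.offer({"seq": i}) for i in range(3)]  # third offer is refused
batch = buffer.drain(max_items=1)                        # consumer frees one slot
retry = buffer.offer({"seq": 2})                         # producer retry now succeeds
```

What the producer does on a refused offer (wait, drop, or sample) is a policy decision that depends on whether every message matters for your use case.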

Here's a Kafka producer that responds to backpressure:

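// Producer with backpressure awareness
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
  clientId: "backpressure-aware-producer",
  brokers: ["kafka:9092"],
});
const producer = kafka.producer({ allowAutoTopicCreation: true });
const admin = kafka.admin();

// Lag per partition = latest topic offset minus the group's committed offset.
// Assumes the kafkajs v2 response shapes; v1 takes `topic` and returns a flat list.
async function isLagTooHigh(groupId, topic, maxLag = 1000) {
  const latest = await admin.fetchTopicOffsets(topic);
  const [committed] = await admin.fetchOffsets({ groupId, topics: [topic] });
  return latest.some(({ partition, offset }) => {
    const group = committed.partitions.find((p) => p.partition === partition);
    // An offset of -1 means the group has not committed on this partition yet
    const consumed = group ? Math.max(0, Number(group.offset)) : 0;
    return Number(offset) - consumed > maxLag;
  });
}

async function sendWithBackpressureAwareness(topic, message, maxAttempts = 5) {
  await producer.connect();
  await admin.connect();

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // Check consumer lag before sending
    if (!(await isLagTooHigh("consumer-group-1", topic))) {
      return producer.send({
        topic,
        messages: [{ value: JSON.stringify(message) }],
      });
    }
    // Exponential backoff: 100ms, 200ms, 400ms, ...
    await new Promise((resolve) => setTimeout(resolve, 100 * 2 ** attempt));
  }
  throw new Error(`Consumer lag on "${topic}" stayed too high; message not sent`);
}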

These server-side implementations create robust pipelines capable of handling the unpredictable realities of production traffic. With proper backpressure management, your streams will maintain consistent performance even under heavy load.

Delivering Data to Clients: Client-Side Implementation

The client side of real-time data streams requires careful implementation to maintain responsive user experiences while handling connection challenges gracefully. Effective client libraries transform raw data streams into usable application features.

Let's explore client implementation strategies across different platforms and frameworks.

JavaScript Client Implementation

Browser-based applications benefit from native WebSocket support:

// Establishing a secure WebSocket connection
const socket = new WebSocket("wss://api.example.com/v1/stream");

// Connection opened
socket.addEventListener("open", (event) => {
  socket.send(JSON.stringify({ type: "subscribe", channel: "market_data" }));
});

// Listen for messages
socket.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);
  updateUI(data);
});

// Connection closed or error handling
socket.addEventListener("close", (event) => {
  console.log("Connection closed, reconnecting...", event.code);
  setTimeout(reconnect, 1000); // Implement reconnection with backoff
});

socket.addEventListener("error", (error) => {
  console.error("WebSocket error:", error);
});

Mobile Clients for Real-Time Streams

Mobile applications face unique challenges with intermittent connectivity:

Android Kotlin WebSocket Client

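import java.util.concurrent.TimeUnit
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import org.json.JSONObject

// userToken, updateUI, and reconnectWithBackoff are assumed to be defined elsewhere in the app
private var webSocket: WebSocket? = null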

fun connectToStream() {
    val client = OkHttpClient.Builder()
        .readTimeout(0, TimeUnit.MILLISECONDS) // No timeout for streaming
        .build()

    val request = Request.Builder()
        .url("wss://api.example.com/v1/stream")
        .header("Authorization", "Bearer $userToken")
        .build()

    webSocket = client.newWebSocket(request, object : WebSocketListener() {
        override fun onOpen(webSocket: WebSocket, response: Response) {
            webSocket.send("{\"type\":\"subscribe\",\"channel\":\"user_updates\"}")
        }

        override fun onMessage(webSocket: WebSocket, text: String) {
            val data = JSONObject(text)
            updateUI(data)
        }

        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            // Handle reconnection with exponential backoff
            reconnectWithBackoff()
        }

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            // Handle errors and reconnection
            reconnectWithBackoff()
        }
    })
}


iOS Swift WebSocket Client

var webSocketTask: URLSessionWebSocketTask?

func connectToStream() {
    let url = URL(string: "wss://api.example.com/v1/stream")!
    var request = URLRequest(url: url)
    request.addValue("Bearer \(userToken)", forHTTPHeaderField: "Authorization")

    let session = URLSession(configuration: .default)
    webSocketTask = session.webSocketTask(with: request)
    webSocketTask?.resume()

    receiveMessage()
}

func receiveMessage() {
    webSocketTask?.receive { [weak self] result in
        switch result {
        case .success(let message):
            switch message {
            case .string(let text):
                if let data = text.data(using: .utf8),
                   let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] {
                    DispatchQueue.main.async {
                        self?.updateUI(with: json)
                    }
                }
            case .data(let data):
                // Handle binary data
                break
            @unknown default:
                break
            }
            self?.receiveMessage() // Continue receiving messages

        case .failure(let error):
            print("WebSocket error: \(error)")
            DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
                self?.reconnectWithBackoff()
            }
        }
    }
}


Handling Connection Challenges

Robust client implementations must address these common challenges:

Reconnection Strategies

Implement exponential backoff to prevent overwhelming servers during outages while still reconnecting clients promptly:

// Exponential backoff reconnection
let reconnectAttempts = 0; // reset to 0 in the "open" handler after a successful connection
const maxReconnectAttempts = 10;

function reconnect() {
  if (reconnectAttempts >= maxReconnectAttempts) {
    console.error("Maximum reconnection attempts reached");
    return;
  }

  const delay = Math.min(30000, 1000 * Math.pow(2, reconnectAttempts));
  console.log(`Reconnecting in ${delay}ms...`);

  setTimeout(() => {
    reconnectAttempts++;
    // Re-establish connection
    initializeWebSocket();
  }, delay);
}
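
One refinement worth considering is adding random jitter to the backoff delay, so that many clients disconnected by the same outage do not all retry at the same instant. Jitter is an addition to the snippet above, not part of it; this sketch uses the same 1-second base and 30-second cap, and the `backoff_delay` helper is hypothetical.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full jitter: a random delay in seconds between 0 and min(cap, base * 2**attempt)."""
    generator = rng if rng is not None else random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return generator.uniform(0.0, ceiling)


# Example: delays for the first five attempts, seeded so runs are repeatable.
seeded = random.Random(42)
delays = [backoff_delay(attempt, rng=seeded) for attempt in range(5)]
```

The ceiling still grows exponentially, so load on a recovering server keeps falling over time, while the randomness spreads individual retries across each interval.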

Message Buffering

Queue outgoing messages when connections drop to prevent data loss:

// Message buffering for disconnection periods
const messageQueue = [];
let isConnected = false;

function sendMessage(message) {
  if (isConnected && socket.readyState === WebSocket.OPEN) {
    socket.send(JSON.stringify(message));
  } else {
    messageQueue.push(message);
  }
}

function processQueue() {
  while (messageQueue.length > 0 && isConnected) {
    const message = messageQueue.shift();
    socket.send(JSON.stringify(message));
  }
}

socket.addEventListener("open", () => {
  isConnected = true;
  processQueue();
});

socket.addEventListener("close", () => {
  isConnected = false;
});

Heartbeat Implementation

Keep connections alive by sending periodic signals:

// Heartbeat to keep connection alive
function startHeartbeat() {
  const heartbeatInterval = setInterval(() => {
    if (socket.readyState === WebSocket.OPEN) {
      socket.send(JSON.stringify({ type: "ping" }));
    } else {
      clearInterval(heartbeatInterval);
    }
  }, 30000); // Send heartbeat every 30 seconds

  socket.addEventListener("close", () => {
    clearInterval(heartbeatInterval);
  });
}
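
Heartbeats are only useful if the other side acts on missing ones. As a sketch of the server-side counterpart, the function below flags connections whose last ping is too old. The connection IDs, the `last_seen` mapping, and the 60-second timeout (two missed 30-second heartbeats) are all assumptions.

```python
from typing import Dict, List


def find_stale_connections(
    last_seen: Dict[str, float],
    now: float,
    timeout: float = 60.0,
) -> List[str]:
    """Return IDs of connections whose last heartbeat is older than `timeout` seconds."""
    return sorted(conn_id for conn_id, seen in last_seen.items() if now - seen > timeout)


# Timestamps (in seconds) of the most recent ping received per connection.
last_seen = {"conn-a": 100.0, "conn-b": 155.0, "conn-c": 30.0}
stale = find_stale_connections(last_seen, now=160.0)
```

A server would run a check like this on a timer and close the stale connections, freeing resources held by clients that disappeared without a clean close.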

These client-side implementations ensure users experience consistent real-time updates regardless of network conditions. Proper error handling, reconnection logic, and message buffering transform potentially fragile connections into robust communication channels.

Bringing It All Together

Building high-quality real-time API streams isn't just a technical exercise—it's a strategic investment that shapes how developers experience your platform. Well-crafted documentation via AsyncAPI will guide your developers through the unique challenges of streaming implementations, from connection lifecycles to error recovery patterns, ultimately determining whether they succeed or abandon your API.

If you're interested in building, managing, securing, and auto-documenting your asynchronous/real-time API - you'll definitely want to check out Zuplo. Our native AsyncAPI support ensures that we can easily support whatever stack you build with. Sign up for a free Zuplo account today!
