Async APIs and Event-Driven Architecture

Traditional request-response APIs work great for many scenarios. You send a request, wait for a response, and move on. But what happens when operations take minutes instead of milliseconds? What if you need to notify thousands of clients about a single event? What if services need to communicate without knowing about each other?

This is where async APIs and event-driven architecture shine.

In this guide, we'll build a real-world event-driven system for our PetStore, exploring message queues, pub/sub patterns, event sourcing, and CQRS. You'll see practical code examples and learn when to use each pattern.

The Problem with Synchronous APIs

Let's say a customer adopts a pet from our PetStore. Here's what needs to happen:

  1. Update pet status to "adopted"
  2. Send confirmation email to customer
  3. Notify the shelter
  4. Update inventory
  5. Generate adoption certificate (takes 30 seconds)
  6. Schedule first vet appointment
  7. Send welcome package
  8. Update analytics

With a synchronous REST API, this looks like:

app.post('/api/adoptions', async (req, res) => {
  try {
    // This takes forever...
    await updatePetStatus(req.body.petId);
    await sendConfirmationEmail(req.body.customerId);
    await notifyShelter(req.body.shelterId);
    await updateInventory(req.body.petId);
    await generateCertificate(req.body.adoptionId); // 30 seconds!
    await scheduleVetAppointment(req.body.petId);
    await sendWelcomePackage(req.body.customerId);
    await updateAnalytics(req.body);

    res.json({ success: true });
  } catch (error) {
    // If ANY step fails, the whole thing fails
    res.status(500).json({ error: error.message });
  }
});

The customer waits 30+ seconds for a response. If the email service is down, the entire adoption fails. If the server crashes mid-process, you don't know which steps completed.

This doesn't scale.

Enter Event-Driven Architecture

Instead of doing everything synchronously, we emit events and let interested services react:

app.post('/api/adoptions', async (req, res) => {
  // Just create the adoption record
  const adoption = await db.adoptions.create({
    petId: req.body.petId,
    customerId: req.body.customerId,
    shelterId: req.body.shelterId,
    status: 'pending'
  });

  // Emit an event
  await eventBus.publish('pet.adopted', {
    adoptionId: adoption.id,
    petId: req.body.petId,
    customerId: req.body.customerId,
    shelterId: req.body.shelterId,
    timestamp: new Date().toISOString()
  });

  // Respond immediately
  res.json({
    adoptionId: adoption.id,
    status: 'processing'
  });
});

Now the response is instant. Other services listen for the pet.adopted event and handle their tasks independently:

// Email service
eventBus.subscribe('pet.adopted', async (event) => {
  await sendConfirmationEmail(event.customerId, event.adoptionId);
});

// Certificate service
eventBus.subscribe('pet.adopted', async (event) => {
  await generateCertificate(event.adoptionId);
});

// Inventory service
eventBus.subscribe('pet.adopted', async (event) => {
  await updateInventory(event.petId);
});

Each service works independently. If the email service is down, the adoption still succeeds. Services can be deployed, scaled, and updated independently.

AsyncAPI: OpenAPI for Event-Driven APIs

Just like OpenAPI documents REST APIs, AsyncAPI documents event-driven APIs. Here are our PetStore events defined in AsyncAPI:

asyncapi: 3.0.0
info:
  title: PetStore Event API
  version: 1.0.0
  description: Event-driven API for PetStore operations

servers:
  production:
    host: kafka.petstore.com:9092
    protocol: kafka
    description: Production Kafka cluster

channels:
  petAdopted:
    address: pet.adopted
    messages:
      petAdoptedMessage:
        $ref: '#/components/messages/PetAdopted'
    description: Published when a pet is adopted

  petAvailable:
    address: pet.available
    messages:
      petAvailableMessage:
        $ref: '#/components/messages/PetAvailable'
    description: Published when a pet becomes available for adoption

  adoptionCompleted:
    address: adoption.completed
    messages:
      adoptionCompletedMessage:
        $ref: '#/components/messages/AdoptionCompleted'
    description: Published when all adoption steps are complete

components:
  messages:
    PetAdopted:
      name: PetAdopted
      title: Pet Adopted Event
      summary: Triggered when a customer adopts a pet
      contentType: application/json
      payload:
        type: object
        required:
          - adoptionId
          - petId
          - customerId
          - timestamp
        properties:
          adoptionId:
            type: string
            format: uuid
            description: Unique adoption identifier
          petId:
            type: string
            format: uuid
            description: ID of the adopted pet
          customerId:
            type: string
            format: uuid
            description: ID of the customer
          shelterId:
            type: string
            format: uuid
            description: ID of the shelter
          timestamp:
            type: string
            format: date-time
            description: When the adoption occurred

    PetAvailable:
      name: PetAvailable
      title: Pet Available Event
      contentType: application/json
      payload:
        type: object
        required:
          - petId
          - species
          - timestamp
        properties:
          petId:
            type: string
            format: uuid
          species:
            type: string
            enum: [dog, cat, bird, rabbit]
          age:
            type: integer
            minimum: 0
          timestamp:
            type: string
            format: date-time

    AdoptionCompleted:
      name: AdoptionCompleted
      title: Adoption Completed Event
      contentType: application/json
      payload:
        type: object
        required:
          - adoptionId
          - certificateUrl
          - timestamp
        properties:
          adoptionId:
            type: string
            format: uuid
          certificateUrl:
            type: string
            format: uri
          vetAppointmentDate:
            type: string
            format: date-time
          timestamp:
            type: string
            format: date-time

operations:
  publishPetAdopted:
    action: send
    channel:
      $ref: '#/channels/petAdopted'
    summary: Publish when a pet is adopted

  subscribePetAdopted:
    action: receive
    channel:
      $ref: '#/channels/petAdopted'
    summary: Subscribe to pet adoption events

This specification documents your events just like OpenAPI documents your REST endpoints. You can generate code, documentation, and even validate messages against the schema.
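For instance, you can check incoming messages against the PetAdopted payload schema. The sketch below hand-rolls a required-field check; in practice you would compile the extracted JSON Schema with a validator such as Ajv:

```javascript
// Minimal sketch: check a pet.adopted message against the required
// fields of the PetAdopted payload schema above. This only covers
// presence and basic type; a real setup would use a full JSON Schema
// validator like Ajv.
const petAdoptedRequired = ['adoptionId', 'petId', 'customerId', 'timestamp'];

function validatePetAdopted(message) {
  const errors = [];
  for (const field of petAdoptedRequired) {
    if (typeof message[field] !== 'string') {
      errors.push(`missing or non-string field: ${field}`);
    }
  }
  return { valid: errors.length === 0, errors };
}

// Usage
console.log(validatePetAdopted({ petId: '456' }).errors.length); // 3
```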

Message Queues: RabbitMQ

RabbitMQ is a popular message broker that implements the AMQP protocol. It's great for task queues and work distribution.

Here's how to set up RabbitMQ for our PetStore:

// publisher.js
const amqp = require('amqplib');
const crypto = require('crypto'); // for crypto.randomUUID()

class EventPublisher {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();

    // Declare exchanges
    await this.channel.assertExchange('petstore.events', 'topic', {
      durable: true
    });

    console.log('Connected to RabbitMQ');
  }

  async publishPetAdopted(adoption) {
    const event = {
      eventType: 'pet.adopted',
      eventId: crypto.randomUUID(),
      timestamp: new Date().toISOString(),
      data: adoption
    };

    this.channel.publish(
      'petstore.events',
      'pet.adopted',
      Buffer.from(JSON.stringify(event)),
      {
        persistent: true,
        contentType: 'application/json',
        messageId: event.eventId
      }
    );

    console.log('Published pet.adopted event:', event.eventId);
  }

  async publishPetAvailable(pet) {
    const event = {
      eventType: 'pet.available',
      eventId: crypto.randomUUID(),
      timestamp: new Date().toISOString(),
      data: pet
    };

    this.channel.publish(
      'petstore.events',
      'pet.available',
      Buffer.from(JSON.stringify(event)),
      {
        persistent: true,
        contentType: 'application/json'
      }
    );
  }

  async close() {
    await this.channel.close();
    await this.connection.close();
  }
}

module.exports = EventPublisher;

Consuming events:

// consumer.js
const amqp = require('amqplib');

class EventConsumer {
  constructor(serviceName) {
    this.serviceName = serviceName;
    this.connection = null;
    this.channel = null;
    this.handlers = new Map();
  }

  async connect() {
    this.connection = await amqp.connect('amqp://localhost');
    this.channel = await this.connection.createChannel();

    // Set prefetch to process one message at a time
    await this.channel.prefetch(1);

    console.log(`${this.serviceName} connected to RabbitMQ`);
  }

  async subscribe(eventType, handler) {
    // Create a queue for this service
    const queueName = `${this.serviceName}.${eventType}`;
    await this.channel.assertQueue(queueName, {
      durable: true
    });

    // Bind queue to exchange with routing key
    await this.channel.bindQueue(
      queueName,
      'petstore.events',
      eventType
    );

    // Start consuming
    this.channel.consume(queueName, async (msg) => {
      if (!msg) return;

      try {
        const event = JSON.parse(msg.content.toString());
        console.log(`Processing ${eventType}:`, event.eventId);

        await handler(event.data);

        // Acknowledge successful processing
        this.channel.ack(msg);
      } catch (error) {
        console.error(`Error processing ${eventType}:`, error);

        // Reject and requeue the message (careful: a message that always
        // fails will requeue forever; production setups dead-letter it
        // after a few attempts)
        this.channel.nack(msg, false, true);
      }
    });

    console.log(`Subscribed to ${eventType}`);
  }
}

// Email service
const emailConsumer = new EventConsumer('email-service');
await emailConsumer.connect();

await emailConsumer.subscribe('pet.adopted', async (adoption) => {
  await sendConfirmationEmail(adoption.customerId, adoption.adoptionId);
  console.log('Confirmation email sent');
});

// Certificate service
const certConsumer = new EventConsumer('certificate-service');
await certConsumer.connect();

await certConsumer.subscribe('pet.adopted', async (adoption) => {
  const certificateUrl = await generateCertificate(adoption.adoptionId);

  // Publish completion event (assumes a publisher instance with a generic
  // publish(routingKey, data) method alongside the helpers shown earlier)
  await publisher.publish('adoption.completed', {
    adoptionId: adoption.adoptionId,
    certificateUrl
  });
});

RabbitMQ ensures messages aren't lost. If a consumer crashes while processing a message, RabbitMQ redelivers it to another consumer.
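Redelivery cuts both ways: at-least-once delivery means a handler can see the same event twice, so handlers should be idempotent. A minimal sketch, tracking processed event IDs in memory (in production this record would live in a database, keyed per consumer):

```javascript
// Deduplicate redelivered events by remembering processed eventIds.
// In-memory Set for illustration; a production consumer would persist
// this (e.g. a unique constraint on eventId in its database).
const processed = new Set();

async function handleOnce(event, handler) {
  if (processed.has(event.eventId)) {
    return false; // duplicate delivery, skip
  }
  processed.add(event.eventId); // claim first so a racing duplicate skips
  try {
    await handler(event.data);
    return true;
  } catch (error) {
    processed.delete(event.eventId); // allow redelivery to retry
    throw error;
  }
}

// Usage
let emailsSent = 0;
const sendEmail = async () => { emailsSent += 1; };
const event = { eventId: 'evt-1', data: { customerId: '789' } };

handleOnce(event, sendEmail); // processed
handleOnce(event, sendEmail); // duplicate, skipped
```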

Pub/Sub with Apache Kafka

Kafka is designed for high-throughput event streaming. Unlike RabbitMQ's queues, Kafka uses a log-based approach where events are stored in order and can be replayed.

Setting up Kafka for PetStore:

// kafka-producer.js
const { Kafka } = require('kafkajs');
const crypto = require('crypto'); // for crypto.randomUUID()

const kafka = new Kafka({
  clientId: 'petstore-api',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function publishEvent(topic, event) {
  // In production, connect once at startup rather than on every publish
  await producer.connect();

  await producer.send({
    topic,
    messages: [
      {
        key: event.petId || event.adoptionId,
        value: JSON.stringify(event),
        headers: {
          'event-type': event.eventType,
          'event-id': event.eventId,
          'timestamp': event.timestamp
        }
      }
    ]
  });

  console.log(`Published to ${topic}:`, event.eventId);
}

// Usage
await publishEvent('pet-events', {
  eventType: 'pet.adopted',
  eventId: crypto.randomUUID(),
  timestamp: new Date().toISOString(),
  adoptionId: '123',
  petId: '456',
  customerId: '789'
});

Consuming from Kafka:

// kafka-consumer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'email-service',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
  groupId: 'email-service-group'
});

async function startConsumer() {
  await consumer.connect();
  await consumer.subscribe({
    topic: 'pet-events',
    fromBeginning: false
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      console.log(`Received event: ${event.eventType}`);

      switch (event.eventType) {
        case 'pet.adopted':
          await handlePetAdopted(event);
          break;
        case 'pet.available':
          await handlePetAvailable(event);
          break;
        default:
          console.log('Unknown event type:', event.eventType);
      }
    }
  });
}

async function handlePetAdopted(event) {
  await sendConfirmationEmail(event.customerId, event.adoptionId);
  console.log('Email sent for adoption:', event.adoptionId);
}

startConsumer().catch(console.error);

Kafka's key advantage is that events are persisted. You can replay events from any point in time:

// Replay events from 24 hours ago: resolve the timestamp to
// per-partition offsets, then seek to them
// (seek must be called after consumer.run() has started)
const admin = kafka.admin();
await admin.connect();

const offsets = await admin.fetchTopicOffsetsByTimestamp(
  'pet-events',
  Date.now() - 24 * 60 * 60 * 1000
);

for (const { partition, offset } of offsets) {
  consumer.seek({ topic: 'pet-events', partition, offset });
}

await admin.disconnect();

This is perfect for:

  - Rebuilding read models
  - Debugging production issues
  - Creating new services that need historical data

Event Sourcing: The Complete History

Event sourcing takes event-driven architecture further: instead of storing current state, you store every event that ever happened.

Traditional approach (storing state):

// pets table
{
  id: '123',
  name: 'Buddy',
  status: 'adopted',
  adoptedBy: '789',
  adoptedAt: '2026-03-13T10:00:00Z'
}

You know Buddy is adopted, but you don't know the full history. Was he returned and re-adopted? Did his name change?

Event sourcing approach (storing events):

// pet_events table
[
  {
    eventId: '1',
    petId: '123',
    eventType: 'PetRegistered',
    timestamp: '2026-01-15T09:00:00Z',
    data: { name: 'Buddy', species: 'dog', age: 2 }
  },
  {
    eventId: '2',
    petId: '123',
    eventType: 'PetAdopted',
    timestamp: '2026-02-01T14:30:00Z',
    data: { adoptedBy: '456' }
  },
  {
    eventId: '3',
    petId: '123',
    eventType: 'PetReturned',
    timestamp: '2026-02-15T11:00:00Z',
    data: { reason: 'allergies' }
  },
  {
    eventId: '4',
    petId: '123',
    eventType: 'PetAdopted',
    timestamp: '2026-03-13T10:00:00Z',
    data: { adoptedBy: '789' }
  }
]

Now you have the complete history. You can reconstruct Buddy's current state by replaying events:

class Pet {
  constructor(id) {
    this.id = id;
    this.name = null;
    this.species = null;
    this.age = null;
    this.status = 'unknown';
    this.adoptedBy = null;
    this.version = 0;
  }

  // Apply events to rebuild state
  apply(event) {
    switch (event.eventType) {
      case 'PetRegistered':
        this.name = event.data.name;
        this.species = event.data.species;
        this.age = event.data.age;
        this.status = 'available';
        break;

      case 'PetAdopted':
        this.status = 'adopted';
        this.adoptedBy = event.data.adoptedBy;
        break;

      case 'PetReturned':
        this.status = 'available';
        this.adoptedBy = null;
        break;

      case 'PetNameChanged':
        this.name = event.data.newName;
        break;
    }

    this.version++;
  }

  // Load pet from event store
  static async load(petId) {
    const events = await eventStore.getEvents(petId);
    const pet = new Pet(petId);

    for (const event of events) {
      pet.apply(event);
    }

    return pet;
  }

  // Save new events
  async adoptPet(customerId) {
    if (this.status !== 'available') {
      throw new Error('Pet is not available');
    }

    const event = {
      eventId: crypto.randomUUID(),
      petId: this.id,
      eventType: 'PetAdopted',
      timestamp: new Date().toISOString(),
      data: { adoptedBy: customerId },
      version: this.version + 1
    };

    await eventStore.saveEvent(event);
    this.apply(event);
  }
}

// Usage
const buddy = await Pet.load('123');
console.log(buddy.status); // 'adopted'
console.log(buddy.adoptedBy); // '789'
console.log(buddy.version); // 4

// Adopt another pet
const max = await Pet.load('456');
await max.adoptPet('999');

Event sourcing gives you:

  - Complete audit trail
  - Time travel (see state at any point)
  - Easy debugging (replay events to reproduce bugs)
  - New features from old data (analyze historical patterns)
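The time-travel point is easy to demonstrate: replay only the events up to a chosen timestamp. The sketch below uses Buddy's event list with a standalone applyEvent function that mirrors the Pet.apply logic above:

```javascript
// "Time travel": rebuild state as of a given moment by replaying only
// the events up to that timestamp. applyEvent mirrors Pet.apply above;
// the event list is Buddy's history from the example.
function applyEvent(state, event) {
  switch (event.eventType) {
    case 'PetRegistered':
      return { ...state, ...event.data, status: 'available' };
    case 'PetAdopted':
      return { ...state, status: 'adopted', adoptedBy: event.data.adoptedBy };
    case 'PetReturned':
      return { ...state, status: 'available', adoptedBy: null };
    default:
      return state;
  }
}

function stateAsOf(events, asOf) {
  return events
    .filter((e) => e.timestamp <= asOf) // ISO-8601 strings sort chronologically
    .reduce(applyEvent, { status: 'unknown', adoptedBy: null });
}

const events = [
  { eventType: 'PetRegistered', timestamp: '2026-01-15T09:00:00Z',
    data: { name: 'Buddy', species: 'dog', age: 2 } },
  { eventType: 'PetAdopted', timestamp: '2026-02-01T14:30:00Z',
    data: { adoptedBy: '456' } },
  { eventType: 'PetReturned', timestamp: '2026-02-15T11:00:00Z',
    data: { reason: 'allergies' } },
  { eventType: 'PetAdopted', timestamp: '2026-03-13T10:00:00Z',
    data: { adoptedBy: '789' } }
];

console.log(stateAsOf(events, '2026-02-10T00:00:00Z').adoptedBy); // '456'
console.log(stateAsOf(events, '2026-03-31T00:00:00Z').adoptedBy); // '789'
```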

CQRS: Separating Reads and Writes

CQRS (Command Query Responsibility Segregation) pairs perfectly with event sourcing. The idea: use different models for writing and reading data.

Writing (commands):

// Command: Adopt a pet
class AdoptPetCommand {
  constructor(petId, customerId) {
    this.petId = petId;
    this.customerId = customerId;
  }
}

class AdoptPetHandler {
  async handle(command) {
    // Load pet from event store
    const pet = await Pet.load(command.petId);

    // Validate
    if (pet.status !== 'available') {
      throw new Error('Pet is not available');
    }

    // Execute command (generates events)
    await pet.adoptPet(command.customerId);

    // Events are automatically published
    return { success: true, petId: pet.id };
  }
}

// API endpoint
app.post('/api/commands/adopt-pet', async (req, res) => {
  const command = new AdoptPetCommand(
    req.body.petId,
    req.body.customerId
  );

  const handler = new AdoptPetHandler();
  const result = await handler.handle(command);

  res.json(result);
});

Reading (queries):

// Read model: Optimized for queries
// This is a denormalized view built from events

// read_models/available_pets table
{
  id: '456',
  name: 'Max',
  species: 'cat',
  age: 1,
  shelter_name: 'Happy Paws',
  photo_url: 'https://...',
  days_available: 45
}

// Query: Get available pets
class GetAvailablePetsQuery {
  constructor(species, maxAge) {
    this.species = species;
    this.maxAge = maxAge;
  }
}

class GetAvailablePetsHandler {
  async handle(query) {
    // Query the read model (fast!)
    return await db.availablePets
      .where('species', query.species)
      .where('age', '<=', query.maxAge)
      .orderBy('days_available', 'desc')
      .limit(20);
  }
}

// API endpoint
app.get('/api/queries/available-pets', async (req, res) => {
  const query = new GetAvailablePetsQuery(
    req.query.species,
    parseInt(req.query.maxAge)
  );

  const handler = new GetAvailablePetsHandler();
  const pets = await handler.handle(query);

  res.json(pets);
});

The read model is updated by listening to events:

// Read model projector
eventBus.subscribe('PetRegistered', async (event) => {
  await db.availablePets.insert({
    id: event.petId,
    name: event.data.name,
    species: event.data.species,
    age: event.data.age,
    days_available: 0,
    created_at: event.timestamp
  });
});

eventBus.subscribe('PetAdopted', async (event) => {
  await db.availablePets.delete(event.petId);
});

eventBus.subscribe('PetReturned', async (event) => {
  const pet = await Pet.load(event.petId);
  await db.availablePets.insert({
    id: pet.id,
    name: pet.name,
    species: pet.species,
    age: pet.age,
    days_available: 0,
    created_at: event.timestamp
  });
});

CQRS benefits:

  - Optimized read models for different use cases
  - Scale reads and writes independently
  - Complex queries don't slow down writes
  - Multiple read models from the same events
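That last benefit, multiple read models from the same events, is worth seeing concretely. Here's a second, hypothetical projector building per-species adoption stats for an analytics dashboard (all names here are illustrative, not from the code above):

```javascript
// A second read model from the same event stream: adoption counts per
// species. Kept in memory for illustration; a real projector would
// write to its own table. All names here are illustrative.
const petSpecies = new Map();   // petId -> species
const speciesStats = new Map(); // species -> { registered, adopted }

function project(event) {
  if (event.eventType === 'PetRegistered') {
    petSpecies.set(event.petId, event.data.species);
    const stats = speciesStats.get(event.data.species) || { registered: 0, adopted: 0 };
    stats.registered += 1;
    speciesStats.set(event.data.species, stats);
  } else if (event.eventType === 'PetAdopted') {
    const species = petSpecies.get(event.petId);
    if (!species) return; // adoption seen before registration; skip
    speciesStats.get(species).adopted += 1;
  }
}

// Replaying two events from Buddy's history builds the stats:
[
  { eventType: 'PetRegistered', petId: '123',
    data: { name: 'Buddy', species: 'dog', age: 2 } },
  { eventType: 'PetAdopted', petId: '123',
    data: { adoptedBy: '456' } }
].forEach(project);

console.log(speciesStats.get('dog')); // { registered: 1, adopted: 1 }
```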

Putting It All Together

Here's a complete event-driven PetStore system:

// app.js - Main application
const express = require('express');
const { Kafka } = require('kafkajs');
const EventStore = require('./event-store');
const { AdoptPetHandler, RegisterPetHandler } = require('./commands');
const { GetAvailablePetsHandler, GetAdoptionHistoryHandler } = require('./queries');

const app = express();
app.use(express.json());

const kafka = new Kafka({
  clientId: 'petstore-api',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();
const eventStore = new EventStore();

// Commands (writes)
app.post('/api/commands/register-pet', async (req, res) => {
  try {
    const handler = new RegisterPetHandler(eventStore, producer);
    const result = await handler.handle(req.body);
    res.json(result);
  } catch (error) {
    res.status(400).json({ error: error.message });
  }
});

app.post('/api/commands/adopt-pet', async (req, res) => {
  try {
    const handler = new AdoptPetHandler(eventStore, producer);
    const result = await handler.handle(req.body);
    res.json(result);
  } catch (error) {
    res.status(400).json({ error: error.message });
  }
});

// Queries (reads)
app.get('/api/queries/available-pets', async (req, res) => {
  const handler = new GetAvailablePetsHandler();
  const pets = await handler.handle(req.query);
  res.json(pets);
});

app.get('/api/queries/adoption-history/:petId', async (req, res) => {
  const handler = new GetAdoptionHistoryHandler(eventStore);
  const history = await handler.handle(req.params.petId);
  res.json(history);
});

app.listen(3000, () => {
  console.log('PetStore API running on port 3000');
});

This architecture scales beautifully. You can:

  - Add new services without changing existing ones
  - Scale each service independently
  - Deploy updates without downtime
  - Replay events to fix bugs or add features
  - Build new read models from historical events

When to Use Event-Driven Architecture

Event-driven architecture isn't always the answer. Use it when:

  - You need loose coupling: Services should work independently
  - Operations are asynchronous: Tasks take time or can fail
  - You need scalability: Different parts scale differently
  - Audit trails matter: You need complete history
  - Multiple systems react to events: One action triggers many reactions

Stick with synchronous APIs when:

  - Operations are simple and fast
  - You need immediate consistency
  - The system is small and simple
  - Your team is unfamiliar with async patterns

Conclusion

Async APIs and event-driven architecture transform how we build systems. Instead of tightly coupled services waiting on each other, you get independent services reacting to events.

RabbitMQ and Kafka provide the infrastructure. AsyncAPI documents your events. Event sourcing gives you complete history. CQRS optimizes reads and writes separately.

The result? Systems that scale, evolve, and handle failures gracefully.

Start small. Add events to one part of your system. See how it feels. Then expand from there. Your future self will thank you.