Traditional request-response APIs work great for many scenarios. You send a request, wait for a response, and move on. But what happens when operations take minutes instead of milliseconds? What if you need to notify thousands of clients about a single event? What if services need to communicate without knowing about each other?
This is where async APIs and event-driven architecture shine.
In this guide, we'll build a real-world event-driven system for our PetStore, exploring message queues, pub/sub patterns, event sourcing, and CQRS. You'll see practical code examples and learn when to use each pattern.
The Problem with Synchronous APIs
Let's say a customer adopts a pet from our PetStore. Here's what needs to happen:
- Update pet status to "adopted"
- Send confirmation email to customer
- Notify the shelter
- Update inventory
- Generate adoption certificate (takes 30 seconds)
- Schedule first vet appointment
- Send welcome package
- Update analytics
With a synchronous REST API, this looks like:
app.post('/api/adoptions', async (req, res) => {
try {
// This takes forever...
await updatePetStatus(req.body.petId);
await sendConfirmationEmail(req.body.customerId);
await notifyShelter(req.body.shelterId);
await updateInventory(req.body.petId);
await generateCertificate(req.body.adoptionId); // 30 seconds!
await scheduleVetAppointment(req.body.petId);
await sendWelcomePackage(req.body.customerId);
await updateAnalytics(req.body);
res.json({ success: true });
} catch (error) {
// If ANY step fails, the whole thing fails
res.status(500).json({ error: error.message });
}
});
The customer waits 30+ seconds for a response. If the email service is down, the entire adoption fails. If the server crashes mid-process, you don't know which steps completed.
This doesn't scale.
Enter Event-Driven Architecture
Instead of doing everything synchronously, we emit events and let interested services react:
app.post('/api/adoptions', async (req, res) => {
// Just create the adoption record
const adoption = await db.adoptions.create({
petId: req.body.petId,
customerId: req.body.customerId,
shelterId: req.body.shelterId,
status: 'pending'
});
// Emit an event
await eventBus.publish('pet.adopted', {
adoptionId: adoption.id,
petId: req.body.petId,
customerId: req.body.customerId,
shelterId: req.body.shelterId,
timestamp: new Date().toISOString()
});
// Respond immediately
res.json({
adoptionId: adoption.id,
status: 'processing'
});
});
Now the response is instant. Other services listen for the pet.adopted event and handle their tasks independently:
// Email service
eventBus.subscribe('pet.adopted', async (event) => {
await sendConfirmationEmail(event.customerId, event.adoptionId);
});
// Certificate service
eventBus.subscribe('pet.adopted', async (event) => {
await generateCertificate(event.adoptionId);
});
// Inventory service
eventBus.subscribe('pet.adopted', async (event) => {
await updateInventory(event.petId);
});
Each service works independently. If the email service is down, the adoption still succeeds. Services can be deployed, scaled, and updated independently.
AsyncAPI: OpenAPI for Event-Driven APIs
Just like OpenAPI documents REST APIs, AsyncAPI documents event-driven APIs. Here's our PetStore events defined in AsyncAPI:
asyncapi: 3.0.0
info:
title: PetStore Event API
version: 1.0.0
description: Event-driven API for PetStore operations
servers:
production:
host: kafka.petstore.com:9092
protocol: kafka
description: Production Kafka cluster
channels:
petAdopted:
address: pet.adopted
messages:
petAdoptedMessage:
$ref: '#/components/messages/PetAdopted'
description: Published when a pet is adopted
petAvailable:
address: pet.available
messages:
petAvailableMessage:
$ref: '#/components/messages/PetAvailable'
description: Published when a pet becomes available for adoption
adoptionCompleted:
address: adoption.completed
messages:
adoptionCompletedMessage:
$ref: '#/components/messages/AdoptionCompleted'
description: Published when all adoption steps are complete
components:
messages:
PetAdopted:
name: PetAdopted
title: Pet Adopted Event
summary: Triggered when a customer adopts a pet
contentType: application/json
payload:
type: object
required:
- adoptionId
- petId
- customerId
- timestamp
properties:
adoptionId:
type: string
format: uuid
description: Unique adoption identifier
petId:
type: string
format: uuid
description: ID of the adopted pet
customerId:
type: string
format: uuid
description: ID of the customer
shelterId:
type: string
format: uuid
description: ID of the shelter
timestamp:
type: string
format: date-time
description: When the adoption occurred
PetAvailable:
name: PetAvailable
title: Pet Available Event
contentType: application/json
payload:
type: object
required:
- petId
- species
- timestamp
properties:
petId:
type: string
format: uuid
species:
type: string
enum: [dog, cat, bird, rabbit]
age:
type: integer
minimum: 0
timestamp:
type: string
format: date-time
AdoptionCompleted:
name: AdoptionCompleted
title: Adoption Completed Event
contentType: application/json
payload:
type: object
required:
- adoptionId
- certificateUrl
- timestamp
properties:
adoptionId:
type: string
format: uuid
certificateUrl:
type: string
format: uri
vetAppointmentDate:
type: string
format: date-time
timestamp:
type: string
format: date-time
operations:
publishPetAdopted:
action: send
channel:
$ref: '#/channels/petAdopted'
summary: Publish when a pet is adopted
subscribePetAdopted:
action: receive
channel:
$ref: '#/channels/petAdopted'
summary: Subscribe to pet adoption events
This specification documents your events just like OpenAPI documents your REST endpoints. You can generate code, documentation, and even validate messages against the schema.
Message Queues: RabbitMQ
RabbitMQ is a popular message broker that implements the AMQP protocol. It's great for task queues and work distribution.
Here's how to set up RabbitMQ for our PetStore:
// publisher.js
const amqp = require('amqplib');
class EventPublisher {
constructor() {
this.connection = null;
this.channel = null;
}
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
// Declare exchanges
await this.channel.assertExchange('petstore.events', 'topic', {
durable: true
});
console.log('Connected to RabbitMQ');
}
async publishPetAdopted(adoption) {
const event = {
eventType: 'pet.adopted',
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
data: adoption
};
this.channel.publish(
'petstore.events',
'pet.adopted',
Buffer.from(JSON.stringify(event)),
{
persistent: true,
contentType: 'application/json',
messageId: event.eventId
}
);
console.log('Published pet.adopted event:', event.eventId);
}
async publishPetAvailable(pet) {
const event = {
eventType: 'pet.available',
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
data: pet
};
this.channel.publish(
'petstore.events',
'pet.available',
Buffer.from(JSON.stringify(event)),
{
persistent: true,
contentType: 'application/json'
}
);
}
async close() {
await this.channel.close();
await this.connection.close();
}
}
module.exports = EventPublisher;
Consuming events:
// consumer.js
const amqp = require('amqplib');
class EventConsumer {
constructor(serviceName) {
this.serviceName = serviceName;
this.connection = null;
this.channel = null;
this.handlers = new Map();
}
async connect() {
this.connection = await amqp.connect('amqp://localhost');
this.channel = await this.connection.createChannel();
// Set prefetch to process one message at a time
await this.channel.prefetch(1);
console.log(`${this.serviceName} connected to RabbitMQ`);
}
async subscribe(eventType, handler) {
// Create a queue for this service
const queueName = `${this.serviceName}.${eventType}`;
await this.channel.assertQueue(queueName, {
durable: true
});
// Bind queue to exchange with routing key
await this.channel.bindQueue(
queueName,
'petstore.events',
eventType
);
// Start consuming
this.channel.consume(queueName, async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
console.log(`Processing ${eventType}:`, event.eventId);
await handler(event.data);
// Acknowledge successful processing
this.channel.ack(msg);
} catch (error) {
console.error(`Error processing ${eventType}:`, error);
// Reject and requeue the message
this.channel.nack(msg, false, true);
}
});
console.log(`Subscribed to ${eventType}`);
}
}
// Email service
const emailConsumer = new EventConsumer('email-service');
await emailConsumer.connect();
await emailConsumer.subscribe('pet.adopted', async (adoption) => {
await sendConfirmationEmail(adoption.customerId, adoption.adoptionId);
console.log('Confirmation email sent');
});
// Certificate service
const certConsumer = new EventConsumer('certificate-service');
await certConsumer.connect();
await certConsumer.subscribe('pet.adopted', async (adoption) => {
const certificateUrl = await generateCertificate(adoption.adoptionId);
// Publish completion event
await publisher.publish('adoption.completed', {
adoptionId: adoption.adoptionId,
certificateUrl
});
});
RabbitMQ ensures messages aren't lost. If a consumer crashes while processing a message, RabbitMQ redelivers it to another consumer.
Pub/Sub with Apache Kafka
Kafka is designed for high-throughput event streaming. Unlike RabbitMQ's queues, Kafka uses a log-based approach where events are stored in order and can be replayed.
Setting up Kafka for PetStore:
// kafka-producer.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'petstore-api',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
async function publishEvent(topic, event) {
await producer.connect();
await producer.send({
topic,
messages: [
{
key: event.petId || event.adoptionId,
value: JSON.stringify(event),
headers: {
'event-type': event.eventType,
'event-id': event.eventId,
'timestamp': event.timestamp
}
}
]
});
console.log(`Published to ${topic}:`, event.eventId);
}
// Usage
await publishEvent('pet-events', {
eventType: 'pet.adopted',
eventId: crypto.randomUUID(),
timestamp: new Date().toISOString(),
adoptionId: '123',
petId: '456',
customerId: '789'
});
Consuming from Kafka:
// kafka-consumer.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'email-service',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({
groupId: 'email-service-group'
});
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({
topic: 'pet-events',
fromBeginning: false
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
console.log(`Received event: ${event.eventType}`);
switch (event.eventType) {
case 'pet.adopted':
await handlePetAdopted(event);
break;
case 'pet.available':
await handlePetAvailable(event);
break;
default:
console.log('Unknown event type:', event.eventType);
}
}
});
}
async function handlePetAdopted(event) {
await sendConfirmationEmail(event.customerId, event.adoptionId);
console.log('Email sent for adoption:', event.adoptionId);
}
startConsumer().catch(console.error);
Kafka's key advantage is that events are persisted. You can replay events from any point in time:
// Replay events from 24 hours ago
await consumer.subscribe({
topic: 'pet-events',
fromBeginning: false
});
await consumer.seek({
topic: 'pet-events',
partition: 0,
offset: Date.now() - (24 * 60 * 60 * 1000)
});
This is perfect for: - Rebuilding read models - Debugging production issues - Creating new services that need historical data
Event Sourcing: The Complete History
Event sourcing takes event-driven architecture further: instead of storing current state, you store every event that ever happened.
Traditional approach (storing state):
// pets table
{
id: '123',
name: 'Buddy',
status: 'adopted',
adoptedBy: '789',
adoptedAt: '2026-03-13T10:00:00Z'
}
You know Buddy is adopted, but you don't know the full history. Was he returned and re-adopted? Did his name change?
Event sourcing approach (storing events):
// pet_events table
[
{
eventId: '1',
petId: '123',
eventType: 'PetRegistered',
timestamp: '2026-01-15T09:00:00Z',
data: { name: 'Buddy', species: 'dog', age: 2 }
},
{
eventId: '2',
petId: '123',
eventType: 'PetAdopted',
timestamp: '2026-02-01T14:30:00Z',
data: { adoptedBy: '456' }
},
{
eventId: '3',
petId: '123',
eventType: 'PetReturned',
timestamp: '2026-02-15T11:00:00Z',
data: { reason: 'allergies' }
},
{
eventId: '4',
petId: '123',
eventType: 'PetAdopted',
timestamp: '2026-03-13T10:00:00Z',
data: { adoptedBy: '789' }
}
]
Now you have the complete history. You can reconstruct Buddy's current state by replaying events:
class Pet {
constructor(id) {
this.id = id;
this.name = null;
this.species = null;
this.age = null;
this.status = 'unknown';
this.adoptedBy = null;
this.version = 0;
}
// Apply events to rebuild state
apply(event) {
switch (event.eventType) {
case 'PetRegistered':
this.name = event.data.name;
this.species = event.data.species;
this.age = event.data.age;
this.status = 'available';
break;
case 'PetAdopted':
this.status = 'adopted';
this.adoptedBy = event.data.adoptedBy;
break;
case 'PetReturned':
this.status = 'available';
this.adoptedBy = null;
break;
case 'PetNameChanged':
this.name = event.data.newName;
break;
}
this.version++;
}
// Load pet from event store
static async load(petId) {
const events = await eventStore.getEvents(petId);
const pet = new Pet(petId);
for (const event of events) {
pet.apply(event);
}
return pet;
}
// Save new events
async adoptPet(customerId) {
if (this.status !== 'available') {
throw new Error('Pet is not available');
}
const event = {
eventId: crypto.randomUUID(),
petId: this.id,
eventType: 'PetAdopted',
timestamp: new Date().toISOString(),
data: { adoptedBy: customerId },
version: this.version + 1
};
await eventStore.saveEvent(event);
this.apply(event);
}
}
// Usage
const buddy = await Pet.load('123');
console.log(buddy.status); // 'adopted'
console.log(buddy.adoptedBy); // '789'
console.log(buddy.version); // 4
// Adopt another pet
const max = await Pet.load('456');
await max.adoptPet('999');
Event sourcing gives you: - Complete audit trail - Time travel (see state at any point) - Easy debugging (replay events to reproduce bugs) - New features from old data (analyze historical patterns)
CQRS: Separating Reads and Writes
CQRS (Command Query Responsibility Segregation) pairs perfectly with event sourcing. The idea: use different models for writing and reading data.
Writing (commands):
// Command: Adopt a pet
class AdoptPetCommand {
constructor(petId, customerId) {
this.petId = petId;
this.customerId = customerId;
}
}
class AdoptPetHandler {
async handle(command) {
// Load pet from event store
const pet = await Pet.load(command.petId);
// Validate
if (pet.status !== 'available') {
throw new Error('Pet is not available');
}
// Execute command (generates events)
await pet.adoptPet(command.customerId);
// Events are automatically published
return { success: true, petId: pet.id };
}
}
// API endpoint
app.post('/api/commands/adopt-pet', async (req, res) => {
const command = new AdoptPetCommand(
req.body.petId,
req.body.customerId
);
const handler = new AdoptPetHandler();
const result = await handler.handle(command);
res.json(result);
});
Reading (queries):
// Read model: Optimized for queries
// This is a denormalized view built from events
// read_models/available_pets table
{
id: '456',
name: 'Max',
species: 'cat',
age: 1,
shelter_name: 'Happy Paws',
photo_url: 'https://...',
days_available: 45
}
// Query: Get available pets
class GetAvailablePetsQuery {
constructor(species, maxAge) {
this.species = species;
this.maxAge = maxAge;
}
}
class GetAvailablePetsHandler {
async handle(query) {
// Query the read model (fast!)
return await db.availablePets
.where('species', query.species)
.where('age', '<=', query.maxAge)
.orderBy('days_available', 'desc')
.limit(20);
}
}
// API endpoint
app.get('/api/queries/available-pets', async (req, res) => {
const query = new GetAvailablePetsQuery(
req.query.species,
parseInt(req.query.maxAge)
);
const handler = new GetAvailablePetsHandler();
const pets = await handler.handle(query);
res.json(pets);
});
The read model is updated by listening to events:
// Read model projector
eventBus.subscribe('PetRegistered', async (event) => {
await db.availablePets.insert({
id: event.petId,
name: event.data.name,
species: event.data.species,
age: event.data.age,
days_available: 0,
created_at: event.timestamp
});
});
eventBus.subscribe('PetAdopted', async (event) => {
await db.availablePets.delete(event.petId);
});
eventBus.subscribe('PetReturned', async (event) => {
const pet = await Pet.load(event.petId);
await db.availablePets.insert({
id: pet.id,
name: pet.name,
species: pet.species,
age: pet.age,
days_available: 0,
created_at: event.timestamp
});
});
CQRS benefits: - Optimized read models for different use cases - Scale reads and writes independently - Complex queries don't slow down writes - Multiple read models from same events
Putting It All Together
Here's a complete event-driven PetStore system:
// app.js - Main application
const express = require('express');
const { Kafka } = require('kafkajs');
const EventStore = require('./event-store');
const { AdoptPetHandler, RegisterPetHandler } = require('./commands');
const { GetAvailablePetsHandler, GetAdoptionHistoryHandler } = require('./queries');
const app = express();
app.use(express.json());
const kafka = new Kafka({
clientId: 'petstore-api',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
const eventStore = new EventStore();
// Commands (writes)
app.post('/api/commands/register-pet', async (req, res) => {
try {
const handler = new RegisterPetHandler(eventStore, producer);
const result = await handler.handle(req.body);
res.json(result);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
app.post('/api/commands/adopt-pet', async (req, res) => {
try {
const handler = new AdoptPetHandler(eventStore, producer);
const result = await handler.handle(req.body);
res.json(result);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
// Queries (reads)
app.get('/api/queries/available-pets', async (req, res) => {
const handler = new GetAvailablePetsHandler();
const pets = await handler.handle(req.query);
res.json(pets);
});
app.get('/api/queries/adoption-history/:petId', async (req, res) => {
const handler = new GetAdoptionHistoryHandler(eventStore);
const history = await handler.handle(req.params.petId);
res.json(history);
});
app.listen(3000, () => {
console.log('PetStore API running on port 3000');
});
This architecture scales beautifully. You can: - Add new services without changing existing ones - Scale each service independently - Deploy updates without downtime - Replay events to fix bugs or add features - Build new read models from historical events
When to Use Event-Driven Architecture
Event-driven architecture isn't always the answer. Use it when:
You need loose coupling: Services should work independently Operations are asynchronous: Tasks take time or can fail You need scalability: Different parts scale differently Audit trails matter: You need complete history Multiple systems react to events: One action triggers many reactions
Stick with synchronous APIs when: - Operations are simple and fast - You need immediate consistency - The system is small and simple - Your team is unfamiliar with async patterns
Conclusion
Async APIs and event-driven architecture transform how we build systems. Instead of tightly coupled services waiting on each other, you get independent services reacting to events.
RabbitMQ and Kafka provide the infrastructure. AsyncAPI documents your events. Event sourcing gives you complete history. CQRS optimizes reads and writes separately.
The result? Systems that scale, evolve, and handle failures gracefully.
Start small. Add events to one part of your system. See how it feels. Then expand from there. Your future self will thank you.