Service Communication
Learn different patterns for inter-service communication, including HTTP, message queues, and event-driven architecture. This is a foundational concept in server-side JavaScript development that professional developers rely on daily. The explanations below are written to be beginner-friendly while covering the depth and nuance that comes from real-world Node.js experience. Take your time with each section and practice the examples.
Service Communication Patterns
Microservices need to communicate with each other to fulfill business requirements. There are several patterns for inter-service communication, each with its own trade-offs. This is an essential concept that every Node.js developer must understand thoroughly. In professional development environments, getting this right can mean the difference between code that works reliably and code that breaks in production. The following sections break this down into clear, digestible pieces with practical examples you can try immediately.
HTTP Communication
// Service-to-Service HTTP Communication
const express = require('express');
const axios = require('axios');
const app = express();
// Service discovery configuration
// Maps each downstream service name to its base URL. Every entry can be
// overridden with an environment variable so the gateway can be pointed at
// non-local deployments without a code change; the localhost defaults apply
// when the variable is unset or empty.
const services = {
  users: process.env.USERS_SERVICE_URL || 'http://localhost:3001',
  orders: process.env.ORDERS_SERVICE_URL || 'http://localhost:3002',
  products: process.env.PRODUCTS_SERVICE_URL || 'http://localhost:3003',
  payments: process.env.PAYMENTS_SERVICE_URL || 'http://localhost:3004'
};
// Circuit breaker implementation
/**
 * Guards calls to an unreliable dependency.
 *
 * States:
 * - CLOSED:    calls pass through; consecutive failures are counted.
 * - OPEN:      calls are rejected immediately until `timeout` ms have passed
 *              since the last recorded failure.
 * - HALF_OPEN: exactly one trial call (the "probe") is let through; its
 *              outcome closes or re-opens the circuit.
 */
class CircuitBreaker {
  /**
   * @param {number} [failureThreshold=5] Consecutive failures that open the circuit.
   * @param {number} [timeout=60000] Milliseconds to stay OPEN before allowing a probe.
   */
  constructor(failureThreshold = 5, timeout = 60000) {
    this.failureThreshold = failureThreshold;
    this.timeout = timeout;
    this.failures = 0;
    this.lastFailureTime = null;
    this.state = 'CLOSED'; // CLOSED, OPEN, HALF_OPEN
    // Start time of the probe currently in flight, or null when there is none.
    this.probeStartedAt = null;
  }

  /**
   * Runs `serviceCall` through the breaker.
   *
   * @param {() => Promise<*>} serviceCall Function performing the guarded call.
   * @returns {Promise<*>} Whatever `serviceCall` resolves with.
   * @throws {Error} 'Circuit breaker is OPEN' when the call is rejected without
   *   being attempted; otherwise the error thrown by `serviceCall` is rethrown.
   */
  async call(serviceCall) {
    const now = Date.now();

    if (this.state === 'OPEN') {
      if (now - this.lastFailureTime <= this.timeout) {
        throw new Error('Circuit breaker is OPEN');
      }
      this.state = 'HALF_OPEN';
      this.probeStartedAt = null;
    }

    if (this.state === 'HALF_OPEN') {
      // Only one probe may test the dependency at a time; without this every
      // concurrent caller would hit a service that may still be down. A probe
      // older than `timeout` is treated as lost so the breaker cannot get
      // stuck if that call never settles.
      const probePending =
        this.probeStartedAt !== null && now - this.probeStartedAt <= this.timeout;
      if (probePending) {
        // Same message as the OPEN rejection so existing callers that match
        // on it keep working.
        throw new Error('Circuit breaker is OPEN');
      }
      this.probeStartedAt = now;
    }

    try {
      const result = await serviceCall();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Resets the failure count and closes the circuit. */
  onSuccess() {
    this.failures = 0;
    this.state = 'CLOSED';
    this.probeStartedAt = null;
  }

  /** Records a failure and opens the circuit once the threshold is reached. */
  onFailure() {
    this.failures++;
    this.lastFailureTime = Date.now();
    this.probeStartedAt = null;
    if (this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
    }
  }
}
// One independent breaker (default threshold and timeout) per downstream
// service, keyed by service name.
const circuitBreakers = Object.fromEntries(
  ['users', 'orders', 'products', 'payments'].map((serviceName) => [
    serviceName,
    new CircuitBreaker()
  ])
);
// Service communication with circuit breaker
/**
 * Sends an HTTP request to a named downstream service through that service's
 * circuit breaker.
 *
 * @param {string} serviceName Key present in both `services` and `circuitBreakers`.
 * @param {string} endpoint Path appended to the service's base URL.
 * @param {object} [options] Optional `method` (defaults to 'GET'), `data` and `headers`.
 * @returns {Promise<*>} The response body (`response.data`).
 * @throws {Error} If `serviceName` is not configured, if the breaker rejects
 *   the call, or if the HTTP request itself fails.
 */
async function callService(serviceName, endpoint, options = {}) {
  // Fail with a clear message instead of a TypeError on `undefined.call`, and
  // avoid matching inherited keys such as 'constructor'.
  const isConfigured =
    Object.prototype.hasOwnProperty.call(services, serviceName) &&
    Object.prototype.hasOwnProperty.call(circuitBreakers, serviceName);
  if (!isConfigured) {
    throw new Error(`Unknown service: ${serviceName}`);
  }

  const circuitBreaker = circuitBreakers[serviceName];
  const serviceUrl = services[serviceName];

  return circuitBreaker.call(async () => {
    const response = await axios({
      method: options.method || 'GET',
      url: `${serviceUrl}${endpoint}`,
      data: options.data,
      headers: options.headers,
      timeout: 5000 // ms; a timed-out request counts as a breaker failure
    });
    return response.data;
  });
}
// API Gateway routes
// GET /api/orders/:orderId/details
// Aggregates an order with its user and the details of each ordered product.
// Responds with the combined object, or 500 with a generic error body if any
// downstream call fails.
app.get('/api/orders/:orderId/details', async (req, res) => {
  try {
    const { orderId } = req.params;

    // Route params arrive URL-decoded, so re-encode before building the
    // upstream path; otherwise a value containing '/' or '?' could change
    // which upstream resource is requested.
    const order = await callService('orders', `/api/orders/${encodeURIComponent(orderId)}`);

    // Treat a missing product list as an empty order rather than crashing.
    const orderProducts = Array.isArray(order.products) ? order.products : [];

    // The user lookup and the product lookups only depend on the order, so
    // they run concurrently.
    const [user, products] = await Promise.all([
      callService('users', `/api/users/${encodeURIComponent(order.userId)}`),
      Promise.all(
        orderProducts.map((product) =>
          callService('products', `/api/products/${encodeURIComponent(product.productId)}`)
        )
      )
    ]);

    // Combine the data; `products[index]` lines up with `orderProducts[index]`
    // because Promise.all preserves input order.
    const orderDetails = {
      ...order,
      user,
      products: orderProducts.map((orderProduct, index) => ({
        ...orderProduct,
        details: products[index]
      }))
    };

    res.json(orderDetails);
  } catch (error) {
    console.error('Error fetching order details:', error);
    res.status(500).json({ error: 'Failed to fetch order details' });
  }
});
// Liveness probe: always reports "healthy" together with the current time
// as an ISO-8601 string.
app.get('/health', (_req, res) => {
  const timestamp = new Date().toISOString();
  res.json({ status: 'healthy', timestamp });
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`API Gateway running on port ${PORT}`);
});
Message Queue Communication
// Message Queue with RabbitMQ
const amqp = require('amqplib');
const express = require('express');
const app = express();
app.use(express.json());
// RabbitMQ connection
// Shared channel used by publishMessage and consumeMessages. It is undefined
// until a connection has been fully set up, and again after the connection
// or channel closes.
let channel;

/**
 * Connects to the local RabbitMQ broker, opens a channel and declares the
 * durable queues used by the order workflow.
 *
 * Connection failures are logged rather than thrown; in that case `channel`
 * stays undefined and publish/consume calls report that it is unavailable.
 *
 * @returns {Promise<void>}
 */
async function connectToRabbitMQ() {
  const queues = [
    'order.created',
    'order.processed',
    'payment.processed',
    'inventory.updated'
  ];

  try {
    const connection = await amqp.connect('amqp://localhost');

    // Without an 'error' listener an emitted 'error' event would be thrown
    // as an uncaught exception.
    connection.on('error', (error) => {
      console.error('RabbitMQ connection error:', error);
    });
    connection.on('close', () => {
      // Drop the stale channel so callers get "channel not available"
      // instead of failing on a dead channel.
      channel = undefined;
      console.error('RabbitMQ connection closed');
    });

    const openedChannel = await connection.createChannel();
    openedChannel.on('error', (error) => {
      console.error('RabbitMQ channel error:', error);
    });
    openedChannel.on('close', () => {
      if (channel === openedChannel) {
        channel = undefined;
      }
    });

    // Declare queues
    for (const queue of queues) {
      await openedChannel.assertQueue(queue, { durable: true });
    }

    // Publish the channel only once the queues exist, so other code never
    // sees a half-initialised channel.
    channel = openedChannel;
    console.log('Connected to RabbitMQ');
  } catch (error) {
    console.error('Failed to connect to RabbitMQ:', error);
  }
}
/**
 * Serialises `message` as JSON and sends it to `queue` as a persistent message.
 *
 * @param {string} queue Name of the target queue.
 * @param {*} message JSON-serialisable payload.
 * @returns {Promise<*>} The value returned by `channel.sendToQueue`.
 * @throws {Error} If no RabbitMQ channel is available.
 */
async function publishMessage(queue, message) {
  if (channel) {
    const payload = Buffer.from(JSON.stringify(message));
    const sendOptions = { persistent: true };
    return channel.sendToQueue(queue, payload, sendOptions);
  }
  throw new Error('RabbitMQ channel not available');
}
// Consume messages from queue
/**
 * Subscribes to `queue`, parsing each message body as JSON and passing it to
 * `callback`. A message is acknowledged once `callback` resolves.
 *
 * Failure handling:
 * - A body that is not valid JSON can never succeed, so it is rejected
 *   without requeueing.
 * - If `callback` fails, the message is requeued once; a message that was
 *   already redelivered is rejected without requeueing, so a permanently
 *   failing message cannot loop forever. NOTE(review): rejected messages are
 *   dropped unless the queue has a dead-letter exchange configured.
 *
 * @param {string} queue Name of the queue to consume from.
 * @param {(data: *) => Promise<void>} callback Handler for the parsed payload.
 * @returns {Promise<void>}
 * @throws {Error} If no RabbitMQ channel is available.
 */
async function consumeMessages(queue, callback) {
  if (!channel) {
    throw new Error('RabbitMQ channel not available');
  }

  // Ack/nack must go to the channel that delivered the message, even if the
  // shared `channel` variable is reassigned later.
  const consumerChannel = channel;

  await consumerChannel.consume(queue, async (msg) => {
    if (!msg) {
      return;
    }

    let data;
    try {
      data = JSON.parse(msg.content.toString());
    } catch (error) {
      console.error('Error processing message:', error);
      consumerChannel.nack(msg, false, false);
      return;
    }

    try {
      await callback(data);
      consumerChannel.ack(msg);
    } catch (error) {
      console.error('Error processing message:', error);
      const requeue = !msg.fields.redelivered;
      consumerChannel.nack(msg, false, requeue);
    }
  });
}
// Order Service - Publish order created event
// POST /api/orders
// Persists a new order built from the request body, publishes an
// 'order.created' event for it, and responds 201 with the saved order.
// Any failure is logged and answered with 400.
app.post('/api/orders', async (req, res) => {
  try {
    const order = new Order(req.body);
    await order.save();

    // Publish order created event
    // NOTE(review): if publishing fails the order has already been saved, yet
    // the client receives an error; confirm whether that is acceptable.
    await publishMessage('order.created', {
      orderId: order._id,
      userId: order.userId,
      products: order.products,
      total: order.total,
      timestamp: new Date().toISOString()
    });

    res.status(201).json(order);
  } catch (error) {
    // Log the underlying cause; the response body is deliberately generic,
    // so without this the reason for the failure would be lost.
    console.error('Error creating order:', error);
    res.status(400).json({ error: 'Failed to create order' });
  }
});
// Payment Service - Consume order created events
/**
 * Processes the payment for a newly created order and publishes the outcome
 * as a 'payment.processed' event.
 *
 * @param {object} data Payload of an 'order.created' event.
 */
async function handleOrderCreated(data) {
  console.log('Processing payment for order:', data.orderId);

  // Process payment logic here
  const paymentResult = await processPayment(data);

  // Publish payment processed event
  await publishMessage('payment.processed', {
    orderId: data.orderId,
    paymentId: paymentResult.paymentId,
    status: paymentResult.status,
    timestamp: new Date().toISOString()
  });
}

// Inventory Service - Consume payment processed events
/**
 * Updates inventory for orders whose payment completed and publishes an
 * 'inventory.updated' event. Payments with any other status are ignored.
 *
 * @param {object} data Payload of a 'payment.processed' event.
 */
async function handlePaymentProcessed(data) {
  if (data.status !== 'completed') {
    return;
  }

  console.log('Updating inventory for order:', data.orderId);

  // Update inventory logic here
  await updateInventory(data.orderId);

  // Publish inventory updated event
  await publishMessage('inventory.updated', {
    orderId: data.orderId,
    status: 'updated',
    timestamp: new Date().toISOString()
  });
}

// Order Service - Consume inventory updated events
/**
 * Marks the order as processed and publishes an 'order.processed' event.
 *
 * @param {object} data Payload of an 'inventory.updated' event.
 */
async function handleInventoryUpdated(data) {
  console.log('Marking order as processed:', data.orderId);

  // Update order status
  await Order.findByIdAndUpdate(data.orderId, { status: 'processed' });

  // Publish order processed event
  await publishMessage('order.processed', {
    orderId: data.orderId,
    status: 'processed',
    timestamp: new Date().toISOString()
  });
}

/**
 * Connects to RabbitMQ and then registers the consumers.
 *
 * Consumers must be registered after the connection attempt has finished:
 * consumeMessages throws while no channel exists, so registering them at
 * module load (before connecting) would reject every registration.
 */
async function startMessaging() {
  // Initialize RabbitMQ connection
  await connectToRabbitMQ();

  await consumeMessages('order.created', handleOrderCreated);
  await consumeMessages('payment.processed', handlePaymentProcessed);
  await consumeMessages('inventory.updated', handleInventoryUpdated);
}

// The promise is handled here so a failed connection or registration is
// logged instead of surfacing as an unhandled rejection.
startMessaging().catch((error) => {
  console.error('Failed to start message consumers:', error);
});
// Port comes from the environment when set, otherwise 3002.
const PORT = process.env.PORT || 3002;

// Logs the port once the HTTP server is accepting connections.
function announceOrderService() {
  console.log(`Order service running on port ${PORT}`);
}

app.listen(PORT, announceOrderService);