Live Data Streaming
Implement live data streaming for real-time dashboards, notifications, and data visualization. This is a foundational concept in server-side JavaScript development that professional developers rely on daily. The explanations below are written to be beginner-friendly while covering the depth and nuance that come from real-world Node.js experience. Take your time with each section and practice the examples.
Data Streaming Architecture
Live data streaming enables real-time updates for dashboards, analytics, and monitoring systems. We'll implement streaming for various data sources including databases, APIs, and IoT devices. This is an essential concept that every Node.js developer must understand thoroughly. In professional development environments, getting this right can mean the difference between code that works reliably and code that breaks in production. The following sections break this down into clear, digestible pieces with practical examples you can try immediately.
Real-time Dashboard Server
// Real-time dashboard server
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const mongoose = require('mongoose');
// Express serves the REST endpoints; Socket.IO is attached to the same HTTP
// server so both share a single port.
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
  // NOTE(review): a wildcard origin accepts browser connections from any
  // site; tighten this before exposing the server beyond local development.
  cors: { origin: "*" }
});

// Connect to MongoDB
// connect() returns a promise. Attach a rejection handler so a failed
// connection is reported instead of surfacing as an unhandled rejection.
mongoose.connect('mongodb://localhost:27017/dashboard').catch((error) => {
  console.error('MongoDB connection failed:', error);
});

// Data Schema
// One stored reading: its kind, numeric value, creation time (defaults to the
// moment the document is created) and free-form metadata.
const dataSchema = new mongoose.Schema({
  type: String,
  value: Number,
  timestamp: { type: Date, default: Date.now },
  metadata: mongoose.Schema.Types.Mixed
});
const DataPoint = mongoose.model('DataPoint', dataSchema);
// Simulate real-time data
/**
 * Produce one simulated sensor reading.
 *
 * A reading type is chosen at random, then a value is drawn at random from
 * that type's range.
 *
 * @returns {{type: string, value: number, metadata: {source: string}}}
 *   The reading, tagged with a fixed 'sensor-1' source.
 */
function generateData() {
  // Each row is [type, lower bound, width of the range]. The type is picked
  // by index, so the row order is significant.
  const ranges = [
    ['temperature', 20, 30], // 20-50°C
    ['humidity', 30, 50], // 30-80%
    ['pressure', 1000, 100], // 1000-1100 hPa
    ['cpu_usage', 0, 100] // 0-100%
  ];
  const [type, base, span] = ranges[Math.floor(Math.random() * ranges.length)];
  const value = base + Math.random() * span;
  return { type, value, metadata: { source: 'sensor-1' } };
}
// Stream data to connected clients
/**
 * Generate one simulated reading, persist it, and push it to every connected
 * client as a 'data-update' event.
 *
 * Persistence is fire-and-forget: the update is emitted without waiting for
 * the database write, and a failed write is logged rather than thrown.
 */
function streamData() {
  const data = generateData();

  // Save to database
  const dataPoint = new DataPoint(data);
  // save() is asynchronous; handle its rejection so a failed write does not
  // become an unhandled promise rejection.
  dataPoint.save().catch((error) => {
    console.error('Failed to save data point:', error);
  });

  // Emit to all connected clients.
  // The update is sent exactly once. Previously it was also emitted to the
  // room named after data.type; sockets in that room are already covered by
  // the broadcast, so they received every data point twice.
  io.emit('data-update', {
    ...data,
    timestamp: dataPoint.timestamp
  });
}
// Socket connection handling
// Number of history rows returned when the client gives no usable limit.
const DEFAULT_HISTORY_LIMIT = 100;
// Upper bound on rows a single 'get-history' request may ask for.
const MAX_HISTORY_LIMIT = 1000;

/**
 * Reduce a client-supplied payload to a list of usable room names.
 *
 * Event payloads come from the network and can be any shape, so anything
 * that is not an array of non-empty strings is discarded.
 *
 * @param {*} payload - Raw value received with the event.
 * @returns {string[]} The string entries of `payload`, or [] if it is not an array.
 */
function toTypeList(payload) {
  if (!Array.isArray(payload)) {
    return [];
  }
  return payload.filter((type) => typeof type === 'string' && type.length > 0);
}

io.on('connection', (socket) => {
  console.log('Dashboard client connected:', socket.id);

  // Subscribe to specific data types
  socket.on('subscribe', (dataTypes) => {
    const types = toTypeList(dataTypes);
    types.forEach((type) => {
      socket.join(type);
    });
    console.log(`Client subscribed to: ${types.join(', ')}`);
  });

  // Unsubscribe from data types
  socket.on('unsubscribe', (dataTypes) => {
    toTypeList(dataTypes).forEach((type) => {
      socket.leave(type);
    });
  });

  // Request historical data
  socket.on('get-history', async (data) => {
    try {
      const { type, limit = DEFAULT_HISTORY_LIMIT } = data || {};

      // Only a plain string may reach the query: an object here would be
      // interpreted as query operators supplied by the client.
      if (typeof type !== 'string') {
        socket.emit('error', 'Failed to fetch historical data');
        return;
      }

      // Clamp the client-chosen limit to a positive, bounded integer.
      const boundedLimit = Number.isInteger(limit) && limit > 0
        ? Math.min(limit, MAX_HISTORY_LIMIT)
        : DEFAULT_HISTORY_LIMIT;

      // Newest rows are selected first, then reversed so the client
      // receives them oldest-first.
      const history = await DataPoint.find({ type })
        .sort({ timestamp: -1 })
        .limit(boundedLimit);
      socket.emit('history-data', {
        type,
        data: history.reverse()
      });
    } catch (error) {
      console.error('Failed to fetch historical data:', error);
      socket.emit('error', 'Failed to fetch historical data');
    }
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    console.log('Dashboard client disconnected:', socket.id);
  });
});
// Start data streaming
// Delay between two streamed readings, in milliseconds (one per second).
const STREAM_INTERVAL_MS = 1000;
setInterval(streamData, STREAM_INTERVAL_MS);
// API endpoints
// Number of most recent data points returned by the REST endpoints.
const RECENT_POINT_COUNT = 100;

/**
 * Fetch the newest data points matching `filter`.
 *
 * @param {object} filter - Query conditions passed to DataPoint.find().
 * @returns {Promise<Array>} Up to RECENT_POINT_COUNT documents, oldest first.
 */
async function findRecentDataPoints(filter) {
  const newestFirst = await DataPoint.find(filter)
    .sort({ timestamp: -1 })
    .limit(RECENT_POINT_COUNT);
  return newestFirst.reverse();
}

/**
 * Build a route handler that responds with recent data points as JSON.
 *
 * @param {function(object): object} buildFilter - Maps the request to query conditions.
 * @returns {function(object, object): Promise<void>} Express request handler.
 */
function recentDataHandler(buildFilter) {
  return async (req, res) => {
    try {
      res.json(await findRecentDataPoints(buildFilter(req)));
    } catch (error) {
      // Keep the client-facing message generic, but record the cause.
      console.error('Failed to fetch data:', error);
      res.status(500).json({ error: 'Failed to fetch data' });
    }
  };
}

// Recent points of a single type.
app.get('/api/data/:type', recentDataHandler((req) => ({ type: req.params.type })));
// Recent points across all types.
app.get('/api/data', recentDataHandler(() => ({})));
server.listen(3002, () => {
console.log('Dashboard server running on port 3002');
});
Dashboard Client Implementation
// React dashboard component
import React, { useState, useEffect } from 'react';
import { io } from 'socket.io-client';
import { Line } from 'react-chartjs-2';
const Dashboard = () => {
const [data, setData] = useState({
temperature: [],
humidity: [],
pressure: [],
cpu_usage: []
});
const [socket, setSocket] = useState(null);
const [subscribedTypes, setSubscribedTypes] = useState([]);
useEffect(() => {
const newSocket = io('http://localhost:3002');
setSocket(newSocket);
newSocket.on('data-update', (update) => {
setData(prev => ({
...prev,
[update.type]: [...prev[update.type], update].slice(-50) // Keep last 50 points
}));
});
newSocket.on('history-data', (history) => {
setData(prev => ({
...prev,
[history.type]: history.data
}));
});
return () => newSocket.close();
}, []);
const subscribeToData = (types) => {
if (socket) {
socket.emit('subscribe', types);
setSubscribedTypes(types);
// Request historical data
types.forEach(type => {
socket.emit('get-history', { type, limit: 100 });
});
}
};
const unsubscribeFromData = (types) => {
if (socket) {
socket.emit('unsubscribe', types);
setSubscribedTypes(prev => prev.filter(t => !types.includes(t)));
}
};
const chartData = (type) => ({
labels: data[type].map(d => new Date(d.timestamp).toLocaleTimeString()),
datasets: [{
label: type.charAt(0).toUpperCase() + type.slice(1),
data: data[type].map(d => d.value),
borderColor: 'rgb(75, 192, 192)',
backgroundColor: 'rgba(75, 192, 192, 0.2)',
tension: 0.1
}]
});
const chartOptions = {
responsive: true,
scales: {
y: {
beginAtZero: true
}
},
animation: {
duration: 0
}
};
return (
<div className="dashboard">
<div className="controls">
<button onClick={() => subscribeToData(['temperature', 'humidity'])}>
Subscribe to Environment Data
</button>
<button onClick={() => subscribeToData(['cpu_usage'])}>
Subscribe to CPU Usage
</button>
<button onClick={() => unsubscribeFromData(['temperature', 'humidity', 'cpu_usage'])}>
Unsubscribe All
</button>
</div>
<div className="charts">
{subscribedTypes.includes('temperature') && (
<div className="chart">
<h3>Temperature</h3>
<Line data={chartData('temperature')} options={chartOptions} />
</div>
)}
{subscribedTypes.includes('humidity') && (
<div className="chart">
<h3>Humidity</h3>
<Line data={chartData('humidity')} options={chartOptions} />
</div>
)}
{subscribedTypes.includes('cpu_usage') && (
<div className="chart">
<h3>CPU Usage</h3>
<Line data={chartData('cpu_usage')} options={chartOptions} />
</div>
)}
</div>
<div className="current-values">
{Object.entries(data).map(([type, values]) => (
values.length > 0 && (
<div key={type} className="value-card">
<h4>{type.charAt(0).toUpperCase() + type.slice(1)}</h4>
<p className="value">{values[values.length - 1].value.toFixed(2)}</p>
<p className="timestamp">
{new Date(values[values.length - 1].timestamp).toLocaleTimeString()}
</p>
</div>
)
))}
</div>
</div>
);
};
export default Dashboard;