Learn to build real-time applications with WebSockets, Socket.io, and live data streaming using Node.js.
Learn to build real-time applications with WebSockets, Socket.io, and live data streaming using Node.js.
Learn to implement WebSockets for real-time bidirectional communication
Content by: Bansi Patel
Node.js Developer
WebSockets provide a persistent connection between a client and a server, allowing real-time, bidirectional communication. Unlike HTTP's request–response cycle, where the client must start every exchange with a new request, a WebSocket connection stays open so either side can send data at any time.
// Install ws library
npm install ws
// Basic WebSocket server: greets each client, then echoes its JSON messages back.
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (client) => {
  // Every outgoing frame is JSON, so serialize in one place.
  const reply = (payload) => client.send(JSON.stringify(payload));

  console.log('New client connected');

  // Send welcome message
  reply({ type: 'welcome', message: 'Welcome to the WebSocket server!' });

  // Handle incoming messages: parse, log, and echo back with a timestamp.
  // Anything that throws along the way is reported to the client as an error frame.
  client.on('message', (raw) => {
    try {
      const parsed = JSON.parse(raw);
      console.log('Received:', parsed);
      reply({
        type: 'echo',
        message: parsed.message,
        timestamp: new Date().toISOString(),
      });
    } catch (err) {
      reply({ type: 'error', message: 'Invalid JSON format' });
    }
  });

  // Handle client disconnect
  client.on('close', () => {
    console.log('Client disconnected');
  });

  // Handle errors
  client.on('error', (err) => {
    console.error('WebSocket error:', err);
  });
});

console.log('WebSocket server running on port 8080');
// WebSocket client (browser): connects, sends a greeting, and logs server frames.
const ws = new WebSocket('ws://localhost:8080');

ws.onopen = () => {
  console.log('Connected to WebSocket server');

  // Send a message as soon as the connection is usable.
  const greeting = { type: 'chat', message: 'Hello from client!' };
  ws.send(JSON.stringify(greeting));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);

  // Only the two frame types below get extra logging; others are just logged above.
  switch (data.type) {
    case 'welcome':
      console.log('Server says:', data.message);
      break;
    case 'echo':
      console.log('Echo:', data.message);
      break;
    default:
      break;
  }
};

ws.onclose = () => {
  console.log('Disconnected from server');
};

ws.onerror = (error) => {
  console.error('WebSocket error:', error);
};
// Send message function
/**
 * Send a chat message to the server over the shared `ws` connection.
 *
 * The message is wrapped as `{ type: 'chat', message }` and serialized to JSON.
 * Nothing is sent unless the socket is in the OPEN state; the return value
 * tells the caller whether the message was handed to the socket or dropped,
 * so the UI can retry or warn instead of losing the message silently.
 *
 * @param {*} message - Value to send as the `message` field (must be JSON-serializable).
 * @returns {boolean} `true` if the frame was passed to `ws.send`, `false` if the
 *   socket was not open and the message was dropped.
 */
function sendMessage(message) {
  if (ws.readyState !== WebSocket.OPEN) {
    return false;
  }

  ws.send(JSON.stringify({ type: 'chat', message }));
  return true;
}
Test your understanding of this topic:
Master Socket.io for building robust real-time applications with rooms, namespaces, and middleware
Content by: Rahul Sonchhatra
Node.js Developer
Socket.io is a JavaScript library that enables real-time, bidirectional communication between web clients and servers. It provides additional features like automatic reconnection, room management, and cross-browser compatibility.
// Install Socket.io
npm install socket.io
// Socket.io server with Express
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
  cors: {
    origin: "http://localhost:3000",
    methods: ["GET", "POST"]
  }
});

/**
 * Read the target room from a client-supplied event payload.
 *
 * Payloads come from untrusted clients and may be missing or malformed;
 * dereferencing `data.room` directly would throw inside the event handler.
 *
 * @param {*} data - Payload received with a socket event.
 * @returns {string|null} The room name, or `null` when the payload has no usable room.
 */
function roomFromPayload(data) {
  const room = data?.room;
  return typeof room === 'string' && room !== '' ? room : null;
}

// Middleware for authentication: runs for each incoming connection before
// the 'connection' handlers do.
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) {
    next(new Error('Authentication error'));
    return;
  }

  // NOTE: placeholder - any non-empty token is accepted and a random id is
  // assigned. Verify the token here before relying on `socket.userId`.
  socket.userId = 'user-' + Math.random();
  next();
});

// Connection handling
io.on('connection', (socket) => {
  console.log('User connected:', socket.id);

  // Join a room and tell the other members about it.
  socket.on('join-room', (roomName) => {
    if (typeof roomName !== 'string' || roomName === '') {
      return; // ignore malformed requests instead of throwing
    }

    socket.join(roomName);
    socket.to(roomName).emit('user-joined', {
      userId: socket.id,
      message: 'A new user joined the room'
    });
    console.log(`User ${socket.id} joined room: ${roomName}`);
  });

  // Handle chat messages: relay to everyone else in the room.
  socket.on('send-message', (data) => {
    const room = roomFromPayload(data);
    if (room === null) {
      return;
    }

    socket.to(room).emit('receive-message', {
      userId: socket.id,
      message: data.message,
      timestamp: new Date().toISOString()
    });
  });

  // Handle typing indicators: both events share one payload shape and differ
  // only in the `isTyping` flag.
  const relayTyping = (isTyping) => (data) => {
    const room = roomFromPayload(data);
    if (room === null) {
      return;
    }

    socket.to(room).emit('user-typing', {
      userId: socket.id,
      isTyping
    });
  };
  socket.on('typing', relayTyping(true));
  socket.on('stop-typing', relayTyping(false));

  // 'disconnecting' fires while `socket.rooms` is still populated, which is
  // the last chance to tell room members that this user is leaving.
  socket.on('disconnecting', () => {
    for (const roomName of socket.rooms) {
      if (roomName === socket.id) {
        continue; // skip the socket's own per-id room
      }
      socket.to(roomName).emit('user-left', {
        userId: socket.id,
        message: 'A user left the room'
      });
    }
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    console.log('User disconnected:', socket.id);
  });
});

server.listen(3001, () => {
  console.log('Socket.io server running on port 3001');
});
// Socket.io client
import { io } from 'socket.io-client';
const socket = io('http://localhost:3001', {
  auth: { token: 'your-auth-token' },
});

// Connection events: each handler only logs the lifecycle change.
const lifecycleHandlers = {
  connect() {
    console.log('Connected to server');
    console.log('Socket ID:', socket.id);
  },
  disconnect() {
    console.log('Disconnected from server');
  },
  connect_error(error) {
    console.error('Connection error:', error);
  },
};

for (const [eventName, handler] of Object.entries(lifecycleHandlers)) {
  socket.on(eventName, handler);
}
/**
 * Ask the server to add this client to a room.
 *
 * @param {string} roomName - Name sent as the 'join-room' event payload.
 */
function joinRoom(roomName) {
  const eventName = 'join-room';
  socket.emit(eventName, roomName);
}
/**
 * Emit a 'send-message' event carrying the room and the message.
 *
 * @param {string} roomName - Sent as the payload's `room` field.
 * @param {*} message - Sent as the payload's `message` field.
 */
function sendMessage(roomName, message) {
  const payload = { room: roomName, message };
  socket.emit('send-message', payload);
}
// Room traffic listeners. `socket.on` returns the socket, so the
// registrations are chained; each handler currently just logs its payload.
socket
  // Listen for messages
  .on('receive-message', (incoming) => {
    console.log('New message:', incoming);
    // Update UI with new message
  })
  // Listen for user join/leave
  .on('user-joined', (joined) => {
    console.log('User joined:', joined);
    // Update UI to show new user
  })
  .on('user-left', (left) => {
    console.log('User left:', left);
    // Update UI to show user left
  });
// Typing indicators
/**
 * Tell the server that this user started typing in a room.
 *
 * @param {string} roomName - Sent as the payload's `room` field.
 */
function startTyping(roomName) {
  const payload = { room: roomName };
  socket.emit('typing', payload);
}
/**
 * Tell the server that this user stopped typing in a room.
 *
 * @param {string} roomName - Sent as the payload's `room` field.
 */
function stopTyping(roomName) {
  const payload = { room: roomName };
  socket.emit('stop-typing', payload);
}
// Typing updates from other users in the room.
socket.on('user-typing', (data) => {
  console.log('User typing:', data);
  // Show typing indicator in UI
});

// Reconnection handling.
// Since Socket.IO v3 the reconnection events are emitted by the underlying
// Manager (exposed as `socket.io`), not by the Socket itself, so listeners
// registered with `socket.on('reconnect', ...)` are never called.
socket.io.on('reconnect', (attemptNumber) => {
  console.log('Reconnected after', attemptNumber, 'attempts');
});
socket.io.on('reconnect_attempt', (attemptNumber) => {
  console.log('Reconnection attempt:', attemptNumber);
});
Test your understanding of this topic:
Build a complete real-time chat application with user authentication, message persistence, and advanced features
Content by: Dharmik Patel
Node.js Developer
A real-time chat application requires careful consideration of user authentication, message persistence, room management, and real-time updates. We'll build a complete chat system with MongoDB for message storage.
// Chat server with Socket.io and MongoDB
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const mongoose = require('mongoose');
const jwt = require('jsonwebtoken');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, { cors: { origin: "http://localhost:3000" } });

// MongoDB connection
mongoose.connect('mongodb://localhost:27017/chatapp');

// Message Schema: one document per chat message; `timestamp` defaults to Date.now.
const messageFields = {
  room: String,
  userId: String,
  username: String,
  message: String,
  timestamp: { type: Date, default: Date.now }
};
const messageSchema = new mongoose.Schema(messageFields);
const Message = mongoose.model('Message', messageSchema);

// User Schema: account fields plus presence (`isOnline`, `lastSeen`).
const userFields = {
  username: String,
  email: String,
  password: String,
  isOnline: { type: Boolean, default: false },
  lastSeen: { type: Date, default: Date.now }
};
const userSchema = new mongoose.Schema(userFields);
const User = mongoose.model('User', userSchema);
// Socket.io connection handling

// Prefer a secret supplied through the environment; the literal is only the
// tutorial fallback and must not be used in production.
const JWT_SECRET = process.env.JWT_SECRET ?? 'your-secret-key';
// Number of most recent messages sent to a client when it joins a room.
const ROOM_HISTORY_LIMIT = 50;

/**
 * Check whether a client-supplied value can be used as a room name.
 *
 * @param {*} value - Value received from the client.
 * @returns {boolean} `true` for non-empty strings.
 */
function isRoomName(value) {
  return typeof value === 'string' && value !== '';
}

/**
 * Verify that the 'authenticate' handshake has succeeded on this socket.
 * Emits 'auth-error' to the client when it has not.
 *
 * @param {object} socket - The Socket.IO socket being checked.
 * @returns {boolean} `true` when `socket.userId` has been set.
 */
function ensureAuthenticated(socket) {
  if (socket.userId) {
    return true;
  }
  socket.emit('auth-error', 'Not authenticated');
  return false;
}

io.on('connection', (socket) => {
  console.log('User connected:', socket.id);

  // Authenticate user: verify the JWT, load the user, mark them online.
  socket.on('authenticate', async (token) => {
    try {
      const decoded = jwt.verify(token, JWT_SECRET);
      const user = await User.findById(decoded.userId);
      if (!user) {
        // A valid token for a user that no longer exists previously got no
        // reply at all, leaving the client waiting forever.
        socket.emit('auth-error', 'Invalid token');
        return;
      }

      socket.userId = user._id;
      socket.username = user.username;

      // Update user status
      await User.findByIdAndUpdate(user._id, {
        isOnline: true,
        lastSeen: new Date()
      });

      socket.emit('authenticated', {
        userId: user._id,
        username: user.username
      });

      // Notify other users
      socket.broadcast.emit('user-online', {
        userId: user._id,
        username: user.username
      });
    } catch (error) {
      socket.emit('auth-error', 'Invalid token');
    }
  });

  // Join a chat room and receive its recent history.
  socket.on('join-room', async (roomName) => {
    if (!ensureAuthenticated(socket)) {
      return;
    }
    if (!isRoomName(roomName)) {
      socket.emit('chat-error', 'Invalid room name');
      return;
    }

    socket.join(roomName);

    try {
      // Load previous messages: fetch newest-first so the limit keeps the
      // latest messages, then reverse into chronological order for display.
      const recent = await Message.find({ room: roomName })
        .sort({ timestamp: -1 })
        .limit(ROOM_HISTORY_LIMIT);
      socket.emit('room-messages', recent.reverse());
    } catch (error) {
      console.error('Failed to load room history:', error);
      socket.emit('chat-error', 'Failed to load messages');
    }

    socket.to(roomName).emit('user-joined', {
      userId: socket.userId,
      username: socket.username,
      message: socket.username + ' joined the room'
    });
  });

  // Send message: persist first, then deliver.
  socket.on('send-message', async (data) => {
    if (!ensureAuthenticated(socket)) {
      return;
    }
    const room = data?.room;
    const text = data?.message;
    if (!isRoomName(room) || typeof text !== 'string' || text.trim() === '') {
      socket.emit('chat-error', 'Invalid message');
      return;
    }

    try {
      const newMessage = new Message({
        room,
        userId: socket.userId,
        username: socket.username,
        message: text
      });
      await newMessage.save();

      // Broadcast to the whole room, sender included: the sender's client
      // renders messages only from 'new-message' events, so excluding the
      // sender (socket.to) meant users never saw their own messages.
      io.to(room).emit('new-message', {
        _id: newMessage._id,
        userId: socket.userId,
        username: socket.username,
        message: text,
        timestamp: newMessage.timestamp
      });
    } catch (error) {
      console.error('Failed to save message:', error);
      socket.emit('chat-error', 'Failed to send message');
    }
  });

  // Typing indicators: relayed to the other room members only. Ignored for
  // unauthenticated sockets, which have no username to display.
  socket.on('typing', (data) => {
    if (!socket.userId || !isRoomName(data?.room)) {
      return;
    }
    socket.to(data.room).emit('user-typing', {
      userId: socket.userId,
      username: socket.username
    });
  });

  socket.on('stop-typing', (data) => {
    if (!socket.userId || !isRoomName(data?.room)) {
      return;
    }
    socket.to(data.room).emit('user-stop-typing', {
      userId: socket.userId,
      username: socket.username
    });
  });

  // Handle disconnection: mark the user offline and notify everyone else.
  socket.on('disconnect', async () => {
    if (socket.userId) {
      try {
        await User.findByIdAndUpdate(socket.userId, {
          isOnline: false,
          lastSeen: new Date()
        });

        socket.broadcast.emit('user-offline', {
          userId: socket.userId,
          username: socket.username
        });
      } catch (error) {
        // Nobody is left to report to; log so the failure is not lost as an
        // unhandled rejection.
        console.error('Failed to update user status on disconnect:', error);
      }
    }
    console.log('User disconnected:', socket.id);
  });
});
// API routes for user management

/**
 * GET /api/users - respond with every user's username and presence fields.
 * Responds 500 with a JSON error body if the lookup fails.
 */
async function listUsers(req, res) {
  try {
    const projection = 'username isOnline lastSeen';
    const users = await User.find({}, projection);
    res.json(users);
  } catch (error) {
    res.status(500).json({ error: 'Failed to fetch users' });
  }
}

/**
 * GET /api/messages/:room - respond with the room's 100 most recent
 * messages, oldest first. Responds 500 with a JSON error body on failure.
 */
async function listRoomMessages(req, res) {
  try {
    const newestFirst = await Message.find({ room: req.params.room })
      .sort({ timestamp: -1 })
      .limit(100);
    const chronological = newestFirst.reverse();
    res.json(chronological);
  } catch (error) {
    res.status(500).json({ error: 'Failed to fetch messages' });
  }
}

app.get('/api/users', listUsers);
app.get('/api/messages/:room', listRoomMessages);

server.listen(3001, () => {
  console.log('Chat server running on port 3001');
});
// React chat client component
import React, { useState, useEffect, useRef } from 'react';
import { io } from 'socket.io-client';
const ChatRoom = ({ roomName, user }) => {
const [messages, setMessages] = useState([]);
const [newMessage, setNewMessage] = useState('');
const [isTyping, setIsTyping] = useState(false);
const [typingUsers, setTypingUsers] = useState([]);
const [socket, setSocket] = useState(null);
const messagesEndRef = useRef(null);
useEffect(() => {
const newSocket = io('http://localhost:3001');
setSocket(newSocket);
// Authenticate
newSocket.emit('authenticate', localStorage.getItem('token'));
newSocket.on('authenticated', (data) => {
console.log('Authenticated:', data);
newSocket.emit('join-room', roomName);
});
newSocket.on('room-messages', (roomMessages) => {
setMessages(roomMessages);
});
newSocket.on('new-message', (message) => {
setMessages(prev => [...prev, message]);
});
newSocket.on('user-typing', (data) => {
setTypingUsers(prev => [...prev.filter(u => u.userId !== data.userId), data]);
});
newSocket.on('user-stop-typing', (data) => {
setTypingUsers(prev => prev.filter(u => u.userId !== data.userId));
});
return () => newSocket.close();
}, [roomName]);
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages]);
const sendMessage = () => {
if (newMessage.trim() && socket) {
socket.emit('send-message', {
room: roomName,
message: newMessage
});
setNewMessage('');
}
};
const handleTyping = () => {
if (!isTyping) {
setIsTyping(true);
socket.emit('typing', { room: roomName });
}
};
const handleStopTyping = () => {
setIsTyping(false);
socket.emit('stop-typing', { room: roomName });
};
return (
<div className="chat-room">
<div className="messages">
{messages.map((msg, index) => (
<div key={index} className={`message ${msg.userId === user.id ? 'own' : 'other'}`}>
<div className="message-header">
<span className="username">{msg.username}</span>
<span className="timestamp">
{new Date(msg.timestamp).toLocaleTimeString()}
</span>
</div>
<div className="message-content">{msg.message}</div>
</div>
))}
{typingUsers.length > 0 && (
<div className="typing-indicator">
{typingUsers.map(user => user.username).join(', ')} is typing...
</div>
)}
<div ref={messagesEndRef} />
</div>
<div className="message-input">
<input
type="text"
value={newMessage}
onChange={(e) => setNewMessage(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && sendMessage()}
onKeyDown={handleTyping}
onKeyUp={handleStopTyping}
placeholder="Type a message..."
/>
<button onClick={sendMessage}>Send</button>
</div>
</div>
);
};
export default ChatRoom;
Test your understanding of this topic:
Implement live data streaming for real-time dashboards, notifications, and data visualization
Content by: Sachin Patel
Node.js Developer
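Live data streaming enables real-time updates for dashboards, analytics, and monitoring systems. In this example we simulate sensor readings, store them in MongoDB, and push each reading to connected dashboard clients; the same pattern applies to data coming from databases, APIs, and IoT devices.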
// Real-time dashboard server
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const mongoose = require('mongoose');
const app = express();
const server = http.createServer(app);
const io = socketIo(server, { cors: { origin: "*" } });

// Connect to MongoDB
mongoose.connect('mongodb://localhost:27017/dashboard');

// Data Schema: one reading per document; `timestamp` defaults to Date.now
// and `metadata` is a free-form (Mixed) field.
const dataFields = {
  type: String,
  value: Number,
  timestamp: { type: Date, default: Date.now },
  metadata: mongoose.Schema.Types.Mixed
};
const dataSchema = new mongoose.Schema(dataFields);
const DataPoint = mongoose.model('DataPoint', dataSchema);
// Simulate real-time data
/**
 * Produce one random reading for a randomly chosen data type.
 *
 * @returns {{type: string, value: number, metadata: {source: string}}}
 *   The chosen type, a value drawn from that type's range, and a fixed
 *   `metadata.source` of 'sensor-1'.
 */
function generateData() {
  // [type, minimum, span]: the value is minimum + random * span.
  const ranges = [
    ['temperature', 20, 30], // 20-50°C
    ['humidity', 30, 50], // 30-80%
    ['pressure', 1000, 100], // 1000-1100 hPa
    ['cpu_usage', 0, 100], // 0-100%
  ];

  // First random draw picks the type, second draw picks the value.
  const [type, minimum, span] = ranges[Math.floor(Math.random() * ranges.length)];
  const value = minimum + Math.random() * span;

  return { type, value, metadata: { source: 'sensor-1' } };
}
// Stream data to connected clients
/**
 * Generate one reading, persist it, and push it to every connected client
 * as a 'data-update' event.
 *
 * Persistence is fire-and-forget: the update is emitted without waiting for
 * the save, and a failed save is logged rather than thrown.
 */
function streamData() {
  const data = generateData();

  // Save to database. The promise must have a rejection handler: this
  // function runs on a timer, so a database outage would otherwise surface
  // as an unhandled rejection on every tick.
  const dataPoint = new DataPoint(data);
  dataPoint.save().catch((error) => {
    console.error('Failed to save data point:', error);
  });

  // Emit exactly once, to all connected clients. A second emit to the
  // `data.type` room used to follow this one; sockets in that room already
  // receive the broadcast, so subscribers got every update twice. To deliver
  // only to subscribers instead, replace this call with io.to(data.type).emit.
  io.emit('data-update', {
    ...data,
    timestamp: dataPoint.timestamp
  });
}
// Socket connection handling

// History page sizes: the default when the client gives no usable limit, and
// the ceiling applied to whatever the client asks for.
const DEFAULT_HISTORY_LIMIT = 100;
const MAX_HISTORY_LIMIT = 1000;

/**
 * Normalize a client-supplied list of data types.
 *
 * @param {*} dataTypes - Payload of a 'subscribe' / 'unsubscribe' event.
 * @returns {string[]} The non-empty string entries; empty when the payload is not an array.
 */
function toTypeList(dataTypes) {
  if (!Array.isArray(dataTypes)) {
    return [];
  }
  return dataTypes.filter((type) => typeof type === 'string' && type !== '');
}

/**
 * Turn a client-supplied limit into a safe page size.
 *
 * Non-numeric and non-positive values fall back to the default; a limit of 0
 * is rejected in particular because it means "no limit" to the database.
 *
 * @param {*} limit - Requested number of history points.
 * @returns {number} An integer between 1 and MAX_HISTORY_LIMIT.
 */
function clampHistoryLimit(limit) {
  const requested = Math.trunc(Number(limit));
  if (!Number.isFinite(requested) || requested < 1) {
    return DEFAULT_HISTORY_LIMIT;
  }
  return Math.min(requested, MAX_HISTORY_LIMIT);
}

io.on('connection', (socket) => {
  console.log('Dashboard client connected:', socket.id);

  // Subscribe to specific data types: each type is a room the socket joins.
  socket.on('subscribe', (dataTypes) => {
    const types = toTypeList(dataTypes);
    types.forEach((type) => {
      socket.join(type);
    });
    console.log(`Client subscribed to: ${types.join(', ')}`);
  });

  // Unsubscribe from data types
  socket.on('unsubscribe', (dataTypes) => {
    toTypeList(dataTypes).forEach((type) => {
      socket.leave(type);
    });
  });

  // Request historical data: the most recent points of one type, oldest first.
  socket.on('get-history', async (data) => {
    try {
      const { type, limit = DEFAULT_HISTORY_LIMIT } = data ?? {};
      // `type` goes straight into the query filter, so only accept a plain
      // string; an object here could smuggle in query operators.
      if (typeof type !== 'string') {
        socket.emit('error', 'Failed to fetch historical data');
        return;
      }

      const history = await DataPoint.find({ type })
        .sort({ timestamp: -1 })
        .limit(clampHistoryLimit(limit));

      socket.emit('history-data', {
        type,
        data: history.reverse()
      });
    } catch (error) {
      socket.emit('error', 'Failed to fetch historical data');
    }
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    console.log('Dashboard client disconnected:', socket.id);
  });
});
// Start data streaming
const STREAM_INTERVAL_MS = 1000; // Stream every second
setInterval(streamData, STREAM_INTERVAL_MS);

// API endpoints

// Resolve a DataPoint query to its 100 most recent documents, oldest first.
const latestChronological = async (query) => {
  const newestFirst = await query.sort({ timestamp: -1 }).limit(100);
  return newestFirst.reverse();
};

// Recent points of a single type.
app.get('/api/data/:type', async (req, res) => {
  try {
    const points = await latestChronological(DataPoint.find({ type: req.params.type }));
    res.json(points);
  } catch (error) {
    res.status(500).json({ error: 'Failed to fetch data' });
  }
});

// Recent points across all types.
app.get('/api/data', async (req, res) => {
  try {
    const points = await latestChronological(DataPoint.find());
    res.json(points);
  } catch (error) {
    res.status(500).json({ error: 'Failed to fetch data' });
  }
});

server.listen(3002, () => {
  console.log('Dashboard server running on port 3002');
});
// React dashboard component
import React, { useState, useEffect } from 'react';
import { io } from 'socket.io-client';
import { Line } from 'react-chartjs-2';
const Dashboard = () => {
const [data, setData] = useState({
temperature: [],
humidity: [],
pressure: [],
cpu_usage: []
});
const [socket, setSocket] = useState(null);
const [subscribedTypes, setSubscribedTypes] = useState([]);
useEffect(() => {
const newSocket = io('http://localhost:3002');
setSocket(newSocket);
newSocket.on('data-update', (update) => {
setData(prev => ({
...prev,
[update.type]: [...prev[update.type], update].slice(-50) // Keep last 50 points
}));
});
newSocket.on('history-data', (history) => {
setData(prev => ({
...prev,
[history.type]: history.data
}));
});
return () => newSocket.close();
}, []);
const subscribeToData = (types) => {
if (socket) {
socket.emit('subscribe', types);
setSubscribedTypes(types);
// Request historical data
types.forEach(type => {
socket.emit('get-history', { type, limit: 100 });
});
}
};
const unsubscribeFromData = (types) => {
if (socket) {
socket.emit('unsubscribe', types);
setSubscribedTypes(prev => prev.filter(t => !types.includes(t)));
}
};
const chartData = (type) => ({
labels: data[type].map(d => new Date(d.timestamp).toLocaleTimeString()),
datasets: [{
label: type.charAt(0).toUpperCase() + type.slice(1),
data: data[type].map(d => d.value),
borderColor: 'rgb(75, 192, 192)',
backgroundColor: 'rgba(75, 192, 192, 0.2)',
tension: 0.1
}]
});
const chartOptions = {
responsive: true,
scales: {
y: {
beginAtZero: true
}
},
animation: {
duration: 0
}
};
return (
<div className="dashboard">
<div className="controls">
<button onClick={() => subscribeToData(['temperature', 'humidity'])}>
Subscribe to Environment Data
</button>
<button onClick={() => subscribeToData(['cpu_usage'])}>
Subscribe to CPU Usage
</button>
<button onClick={() => unsubscribeFromData(['temperature', 'humidity', 'cpu_usage'])}>
Unsubscribe All
</button>
</div>
<div className="charts">
{subscribedTypes.includes('temperature') && (
<div className="chart">
<h3>Temperature</h3>
<Line data={chartData('temperature')} options={chartOptions} />
</div>
)}
{subscribedTypes.includes('humidity') && (
<div className="chart">
<h3>Humidity</h3>
<Line data={chartData('humidity')} options={chartOptions} />
</div>
)}
{subscribedTypes.includes('cpu_usage') && (
<div className="chart">
<h3>CPU Usage</h3>
<Line data={chartData('cpu_usage')} options={chartOptions} />
</div>
)}
</div>
<div className="current-values">
{Object.entries(data).map(([type, values]) => (
values.length > 0 && (
<div key={type} className="value-card">
<h4>{type.charAt(0).toUpperCase() + type.slice(1)}</h4>
<p className="value">{values[values.length - 1].value.toFixed(2)}</p>
<p className="timestamp">
{new Date(values[values.length - 1].timestamp).toLocaleTimeString()}
</p>
</div>
)
))}
</div>
</div>
);
};
export default Dashboard;
Test your understanding of this topic:
Continue your learning journey and master the next set of concepts.
Continue to Module 8