Learn advanced Django performance optimization techniques, caching strategies, database optimization, and production-ready deployment practices.
Master advanced caching techniques to dramatically improve Django application performance.
Content by: Manali Trivedi
Python Django Developer
Caching is one of the most effective ways to improve Django application performance. Understanding different caching strategies and when to use them is crucial for production applications.
# Advanced Caching Strategies
# 1. View-Level Caching
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
from django.views.generic import ListView


# Function-based view caching: the whole rendered response is stored per URL.
@cache_page(60 * 15)  # cache for 15 minutes
def post_list(request):
    """Render the post list; served straight from cache while the entry is warm."""
    context = {'posts': Post.objects.all()}
    return render(request, 'blog/post_list.html', context)


# Class-based view caching: decorating dispatch() caches every HTTP method.
@method_decorator(cache_page(60 * 15), name='dispatch')
class PostListView(ListView):
    """Cached class-based equivalent of post_list()."""
    model = Post
    template_name = 'blog/post_list.html'
# 2. Template Fragment Caching
{% load cache %}
{# Fragment cache: hold this sidebar for 500 seconds, keyed per username so #}
{# each user gets their own copy (anonymous users share the empty-name key). #}
{% cache 500 sidebar request.user.username %}
<!-- Expensive sidebar content -->
<div class="sidebar">
<h3>Recent Posts</h3>
{% for post in recent_posts %}
<div class="post-item">
<h4>{{ post.title }}</h4>
<p>{{ post.excerpt }}</p>
</div>
{% endfor %}
</div>
{% endcache %}
# 3. Low-Level Caching
from django.core.cache import cache
from django.core.cache import caches


def get_user_posts(user_id):
    """Return a user's posts, memoized in the default cache for 5 minutes."""
    cache_key = f'user_posts_{user_id}'
    cached = cache.get(cache_key)
    if cached is not None:
        return cached
    result = Post.objects.filter(author_id=user_id)
    cache.set(cache_key, result, 300)  # cache for 5 minutes
    return result


def get_category_posts(category_id):
    """Return a category's posts using the dedicated 'redis' cache alias."""
    cache_key = f'category_posts_{category_id}'
    backend = caches['redis']
    cached = backend.get(cache_key)
    if cached is not None:
        return cached
    result = Post.objects.filter(category_id=category_id)
    backend.set(cache_key, result, 600)  # cache for 10 minutes
    return result
# 4. Cache Versioning
def get_user_profile(user_id):
    """Fetch a user's profile through a versioned cache key.

    The current version number is itself cached; bumping it (see
    invalidate_user_profile) makes every reader miss and repopulate under
    the new key, while entries for old versions simply age out.
    """
    cache_key = f'user_profile_{user_id}'
    version = cache.get(f'user_profile_version_{user_id}', 1)
    profile = cache.get(f'{cache_key}_v{version}')
    if profile is None:
        profile = UserProfile.objects.get(user_id=user_id)
        cache.set(f'{cache_key}_v{version}', profile, 3600)
    return profile


def invalidate_user_profile(user_id):
    """Invalidate user profile cache by incrementing version.

    Fix: the version counter is stored with timeout=None (never expires).
    The original gave it a 3600s TTL, so the counter could expire and fall
    back to 1, silently resurrecting a stale profile cached under the old
    version number.
    """
    version_key = f'user_profile_version_{user_id}'
    current_version = cache.get(version_key, 1)
    cache.set(version_key, current_version + 1, None)
# 5. Cache Key Generation
import hashlib
import json


def generate_cache_key(prefix, *args, **kwargs):
    """Generate a consistent cache key from arguments."""
    # Order-stable serialisation: positional args in call order, keyword
    # args sorted by name so equivalent calls always hash identically.
    parts = [str(arg) for arg in args]
    parts += [f"{k}:{v}" for k, v in sorted(kwargs.items())]
    # Hash so arbitrarily long argument lists still yield a short, safe key.
    digest = hashlib.md5("|".join(parts).encode()).hexdigest()
    return f"{prefix}_{digest}"
# Usage: identical arguments always map to the same key.
lookup_key = generate_cache_key('user_posts', user_id, category_id, page=1)
posts = cache.get(lookup_key)
# 6. Cache Invalidation Patterns
class Post(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    category = models.ForeignKey(Category, on_delete=models.CASCADE)

    def save(self, *args, **kwargs):
        """Persist the post, then drop caches derived from it.

        Fix: invalidation now happens *after* the write. The original
        deleted the keys first, leaving a window in which a concurrent
        reader could re-cache the stale rows before the save committed.
        """
        super().save(*args, **kwargs)
        self._invalidate_related_caches()

    def delete(self, *args, **kwargs):
        """Delete the post, then drop caches derived from it."""
        super().delete(*args, **kwargs)
        self._invalidate_related_caches()

    def _invalidate_related_caches(self):
        # Keys written by get_user_posts / get_category_posts / recent-post
        # views. Uses author_id/category_id directly so no extra query is
        # issued just to read the related objects' primary keys.
        cache.delete(f'user_posts_{self.author_id}')
        cache.delete(f'category_posts_{self.category_id}')
        cache.delete('recent_posts')
# 7. Cache Middleware
from django.middleware.cache import UpdateCacheMiddleware
from django.middleware.cache import FetchFromCacheMiddleware
MIDDLEWARE = [
    'django.middleware.cache.UpdateCacheMiddleware',    # must be first: stores responses
    # ... other middleware
    'django.middleware.cache.FetchFromCacheMiddleware',  # must be last: serves cached responses
]

# Cache settings.
# Fix: 'django.core.cache.backends.redis.RedisCache' is Django's built-in
# backend (4.0+) and does not understand django_redis's CLIENT_CLASS option;
# passing it would be forwarded as an unknown client/pool keyword. The
# OPTIONS blocks were therefore removed. (To keep django_redis features,
# use BACKEND 'django_redis.cache.RedisCache' instead.)
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'KEY_PREFIX': 'myapp',
        'TIMEOUT': 300,
        'VERSION': 1,
    },
    'redis': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/2',
        'KEY_PREFIX': 'myapp_redis',
        'TIMEOUT': 600,
    },
    'session': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/3',
        'KEY_PREFIX': 'session',
        'TIMEOUT': 86400,  # 24 hours
    },
}
# 8. Custom Cache Backend
from django.core.cache.backends.base import BaseCache, DEFAULT_TIMEOUT
import pickle
import time


class CustomCacheBackend(BaseCache):
    """Minimal in-process, dict-backed cache backend (illustrative only).

    Fixes over the original:
    - DEFAULT_TIMEOUT is now imported; the original referenced it in set()'s
      signature without importing it -- a NameError at class-definition time.
    - Expiry goes through BaseCache.get_backend_timeout(), so timeout=None
      ("never expire") and timeout=0 ("expire immediately") behave like the
      stock backends instead of being conflated by a truthiness check, and
      get() no longer compares None with a float.
    """

    def __init__(self, location, params):
        super().__init__(params)
        self.location = location
        self._cache = {}  # key -> (value, absolute expiry timestamp or None)

    def get(self, key, default=None, version=None):
        key = self.make_key(key, version=version)
        entry = self._cache.get(key)
        if entry is not None:
            value, expiry = entry
            if expiry is None or expiry > time.time():
                return value
            del self._cache[key]  # lazily evict expired entries
        return default

    def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        key = self.make_key(key, version=version)
        expiry = self.get_backend_timeout(timeout)  # absolute epoch time or None
        self._cache[key] = (value, expiry)

    def delete(self, key, version=None):
        key = self.make_key(key, version=version)
        self._cache.pop(key, None)

    def clear(self):
        self._cache.clear()
# Smart Cache Manager
from django.core.cache import cache
from django.db.models import Model
from django.core.exceptions import ObjectDoesNotExist
import hashlib
import json
import time
class SmartCacheManager:
    """Advanced cache manager with automatic invalidation and smart key generation."""

    def __init__(self, cache_backend=None):
        # Fall back to Django's default cache when no backend is injected.
        self.cache = cache_backend or cache
        self.cache_patterns = {}

    def register_pattern(self, pattern_name, pattern_func, dependencies=None):
        """Register a producer function plus the model names it depends on."""
        self.cache_patterns[pattern_name] = {
            'func': pattern_func,
            'dependencies': dependencies or [],
            'last_updated': 0,
        }

    def get_or_set(self, pattern_name, *args, **kwargs):
        """Return the cached value for a pattern, computing and storing on miss."""
        if pattern_name not in self.cache_patterns:
            raise ValueError(f"Unknown cache pattern: {pattern_name}")
        entry = self.cache_patterns[pattern_name]
        key = self._generate_key(pattern_name, *args, **kwargs)
        hit = self.cache.get(key)
        if hit is not None:
            return hit
        # Miss: run the registered producer and store its result.
        computed = entry['func'](*args, **kwargs)
        self.cache.set(key, computed, self._get_ttl(pattern_name))
        entry['last_updated'] = time.time()
        return computed

    def invalidate_pattern(self, pattern_name, *args, **kwargs):
        """Drop one concrete cached instance of a pattern."""
        if pattern_name in self.cache_patterns:
            self.cache.delete(self._generate_key(pattern_name, *args, **kwargs))
            self.cache_patterns[pattern_name]['last_updated'] = 0

    def invalidate_dependencies(self, model_class, instance_id):
        """Invalidate every pattern that declared a dependency on this model."""
        dependent = model_class.__name__
        for name, entry in self.cache_patterns.items():
            if dependent in entry['dependencies']:
                self._invalidate_pattern_instances(name)

    def _generate_key(self, pattern_name, *args, **kwargs):
        """Build a deterministic key: name, args in order, kwargs sorted, md5'd."""
        pieces = [pattern_name]
        pieces += [str(a) for a in args]
        pieces += [f"{k}:{v}" for k, v in sorted(kwargs.items())]
        return hashlib.md5("|".join(pieces).encode()).hexdigest()

    def _get_ttl(self, pattern_name):
        """TTL per pattern type; unknown patterns default to 5 minutes."""
        return {
            'user_posts': 300,       # 5 minutes
            'category_posts': 600,   # 10 minutes
            'recent_posts': 180,     # 3 minutes
            'user_profile': 3600,    # 1 hour
            'search_results': 900,   # 15 minutes
        }.get(pattern_name, 300)

    def _invalidate_pattern_instances(self, pattern_name):
        """Invalidate all instances of a pattern (complex operation).

        A full implementation would track every concrete key issued per
        pattern; here we only reset the pattern's bookkeeping timestamp.
        """
        if pattern_name in self.cache_patterns:
            self.cache_patterns[pattern_name]['last_updated'] = 0
# Usage Example
cache_manager = SmartCacheManager()
# Register cache patterns
def get_user_posts(user_id):
return Post.objects.filter(author_id=user_id).select_related('author')
def get_category_posts(category_id):
return Post.objects.filter(category_id=category_id).select_related('author')
def get_recent_posts(limit=10):
return Post.objects.filter(status='published').order_by('-created_date')[:limit]
cache_manager.register_pattern('user_posts', get_user_posts, ['Post'])
cache_manager.register_pattern('category_posts', get_category_posts, ['Post'])
cache_manager.register_pattern('recent_posts', get_recent_posts, ['Post'])
# Use in views
def user_posts_view(request, user_id):
posts = cache_manager.get_or_set('user_posts', user_id)
return render(request, 'user_posts.html', {'posts': posts})
def category_posts_view(request, category_id):
posts = cache_manager.get_or_set('category_posts', category_id)
return render(request, 'category_posts.html', {'posts': posts})
# Invalidate when posts change
def post_save_handler(sender, instance, **kwargs):
cache_manager.invalidate_dependencies(Post, instance.id)
# Connect signal
from django.db.models.signals import post_save, post_delete
post_save.connect(post_save_handler, sender=Post)
post_delete.connect(post_save_handler, sender=Post)Test your understanding of this topic:
Learn advanced database optimization techniques for Django applications including query optimization, indexing, and connection management.
Content by: Manali Trivedi
Python Django Developer
Database performance is often the bottleneck in web applications. Understanding how to optimize database queries, use proper indexing, and manage connections is crucial for production applications.
# Database Optimization Techniques
# 1. Query Optimization
from django.db.models import Prefetch, Count, Sum, Avg, Min, Max
from django.db import connection

# select_related() follows ForeignKey relations inside the same SQL query (a JOIN).
posts = Post.objects.select_related('author', 'category').all()

# prefetch_related() issues one extra query per relation -- right for M2M and
# reverse FK sets; a Prefetch object lets the inner queryset be optimised too.
posts = Post.objects.prefetch_related(
    'tags',
    Prefetch('comments', queryset=Comment.objects.select_related('author')),
).all()

# only() restricts the SELECT list to the named columns...
posts = Post.objects.only('title', 'created_date', 'author__username').all()

# ...while defer() is the inverse and skips heavyweight columns.
posts = Post.objects.defer('content', 'author__bio').all()

# values() yields dicts; values_list(flat=True) yields bare values.
post_data = Post.objects.values('title', 'created_date', 'author__username').all()
post_titles = Post.objects.values_list('title', flat=True).all()
# 2. Advanced Query Optimization
class PostQuerySet(models.QuerySet):
    """Chainable queryset helpers that bake in the right JOIN strategy."""

    def with_related_data(self):
        """Optimize queries with all related data."""
        return self.select_related('author', 'category').prefetch_related(
            'tags',
            Prefetch('comments', queryset=Comment.objects.select_related('author')),
            Prefetch('likes', queryset=Like.objects.select_related('user')),
        )

    def published(self):
        """Filter only published posts."""
        return self.filter(status='published')

    def by_category(self, category_id):
        """Filter by category with optimization."""
        return self.filter(category_id=category_id).select_related('category')

    def with_stats(self):
        """Add computed statistics.

        Fix: every Count uses distinct=True. Aggregating several reverse
        relations in one query multiplies the joined rows, so without
        distinct=True each count gets inflated by the sizes of the others.
        """
        return self.annotate(
            comment_count=Count('comments', distinct=True),
            like_count=Count('likes', distinct=True),
            view_count=Count('views', distinct=True),
        )


# Use the custom QuerySet
class Post(models.Model):
    # ... fields ...
    objects = PostQuerySet.as_manager()

    class Meta:
        indexes = [
            models.Index(fields=['status', 'created_date']),
            models.Index(fields=['author', 'created_date']),
            models.Index(fields=['category', 'published_date']),
            models.Index(fields=['title']),  # For LIKE queries
        ]
# 3. Database Indexing Strategies
class AdvancedPost(models.Model):
    title = models.CharField(max_length=200, db_index=True)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE, db_index=True)
    category = models.ForeignKey(Category, on_delete=models.CASCADE, db_index=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, db_index=True)
    created_date = models.DateTimeField(auto_now_add=True, db_index=True)
    published_date = models.DateTimeField(null=True, blank=True, db_index=True)
    is_featured = models.BooleanField(default=False, db_index=True)

    class Meta:
        indexes = [
            # Composite indexes for common query patterns
            models.Index(fields=['status', 'published_date', 'created_date']),
            models.Index(fields=['author', 'status', 'created_date']),
            models.Index(fields=['category', 'status', 'published_date']),
            models.Index(fields=['is_featured', 'published_date']),
            # Partial index covering published rows only.
            # Fix: Django requires an explicit name whenever condition (or
            # opclasses) is used; the original omitted it, which raises a
            # ValueError when the model is loaded.
            models.Index(
                fields=['published_date'],
                name='post_published_date_idx',
                condition=models.Q(status='published'),
            ),
            # Opclass index (PostgreSQL; needs the pg_trgm extension)
            models.Index(
                fields=['title'],
                name='post_title_gin',
                opclasses=['gin_trgm_ops'],  # For trigram search
            ),
        ]
# 4. Query Analysis and Debugging
from django.db import connection
from django.test.utils import CaptureQueriesContext
import functools


def analyze_queries(func):
    """Decorator that prints every SQL query the wrapped function executes."""

    @functools.wraps(func)  # fix: preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        with CaptureQueriesContext(connection) as context:
            result = func(*args, **kwargs)
        # Report after the context exits; captured_queries stays available.
        print(f"Function {func.__name__} executed {len(context.captured_queries)} queries:")
        for i, query in enumerate(context.captured_queries):
            print(f"Query {i+1}: {query['sql']}")
            print(f"Time: {query['time']}s")
        return result

    return wrapper


# Usage
@analyze_queries
def get_posts_with_analysis():
    return Post.objects.select_related('author').prefetch_related('tags').all()
# 5. Database Connection Management
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mydb',
        'USER': 'myuser',
        'PASSWORD': 'mypassword',  # NOTE(review): load from the environment in real deployments
        'HOST': 'localhost',
        'PORT': '5432',
        # Connection optimization: reuse each connection for up to 60 seconds
        # instead of opening a new one per request.
        'CONN_MAX_AGE': 60,
        # Fix: the original passed MAX_CONNS/MIN_CONNS inside OPTIONS, but
        # Django's postgresql backend does not pool connections and forwards
        # OPTIONS straight to psycopg, which rejects those keywords. Use an
        # external pooler (pgbouncer) for real pooling.
        'ATOMIC_REQUESTS': False,  # no implicit per-view transactions (throughput)
        'AUTOCOMMIT': True,
    }
}
# 6. Raw SQL Optimization
from django.db import connection


def get_posts_with_raw_sql():
    """Use raw SQL for a complex aggregate the ORM would express poorly.

    Fix: COUNT(DISTINCT ...) on both joined tables. With two independent
    LEFT JOINs, a post with C comments and L likes produces C*L result rows,
    so the original plain COUNT(cm.id)/COUNT(l.id) multiplied the two
    totals together.
    """
    with connection.cursor() as cursor:
        cursor.execute("""
            SELECT
                p.id,
                p.title,
                p.created_date,
                u.username AS author_name,
                c.name AS category_name,
                COUNT(DISTINCT cm.id) AS comment_count,
                COUNT(DISTINCT l.id) AS like_count
            FROM blog_post p
            JOIN auth_user u ON p.author_id = u.id
            JOIN blog_category c ON p.category_id = c.id
            LEFT JOIN blog_comment cm ON p.id = cm.post_id
            LEFT JOIN blog_like l ON p.id = l.post_id
            WHERE p.status = 'published'
            GROUP BY p.id, p.title, p.created_date, u.username, c.name
            ORDER BY p.created_date DESC
        """)
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
# 7. Database Partitioning (PostgreSQL)
class PartitionedPost(models.Model):
    """Model mapped onto a natively range-partitioned PostgreSQL table."""
    title = models.CharField(max_length=200)
    content = models.TextField()
    created_date = models.DateField()

    class Meta:
        # Django won't create or alter this table; the migration below owns it.
        managed = False
        db_table = 'blog_post_partitioned'
# Migration for partitioning
from django.db import migrations


class Migration(migrations.Migration):
    """Create a range-partitioned posts table with monthly partitions."""

    dependencies = [
        ('blog', '0001_initial'),
    ]

    operations = [
        migrations.RunSQL(
            """
            -- Create partitioned table
            CREATE TABLE blog_post_partitioned (
                id SERIAL,
                title VARCHAR(200),
                content TEXT,
                created_date DATE
            ) PARTITION BY RANGE (created_date);
            -- Create partitions for each month
            CREATE TABLE blog_post_2024_01 PARTITION OF blog_post_partitioned
                FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
            CREATE TABLE blog_post_2024_02 PARTITION OF blog_post_partitioned
                FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
            """,
            # Fix: provide reverse SQL so the migration can be rolled back;
            # without it, RunSQL raises IrreversibleError on backwards migrate.
            reverse_sql="DROP TABLE IF EXISTS blog_post_partitioned CASCADE;",
        ),
    ]
# 8. Query Result Caching
from django.core.cache import cache
from django.db.models.query import QuerySet
class CachedQuerySet(QuerySet):
def cache_results(self, timeout=300):
"""Cache the results of this queryset"""
cache_key = self._get_cache_key()
results = cache.get(cache_key)
if results is None:
results = list(self)
cache.set(cache_key, results, timeout)
return results
def _get_cache_key(self):
"""Generate a cache key for this queryset"""
query_str = str(self.query)
return f"queryset_{hash(query_str)}"
# Usage
posts = Post.objects.filter(status='published').cache_results(timeout=600)Test your understanding of this topic:
Learn production deployment strategies, server configuration, and monitoring for Django applications.
Content by: Manali Trivedi
Python Django Developer
Deploying Django applications to production requires careful planning, proper server configuration, and ongoing monitoring to ensure reliability and performance.
# Production Deployment Strategies
# 1. Docker Deployment
# Dockerfile
FROM python:3.11-slim
# Set environment variables (no .pyc files, unbuffered logs, prod settings)
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV DJANGO_SETTINGS_MODULE=myproject.settings.production
# Set work directory
WORKDIR /app
# Install system dependencies
# NOTE(review): nginx also runs as its own compose service below -- installing
# it inside the app image too looks unnecessary; confirm before removing.
RUN apt-get update && apt-get install -y postgresql-client nginx && rm -rf /var/lib/apt/lists/*
# Install Python dependencies first so this layer stays cached while code churns
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy project
COPY . .
# Collect static files at build time
# (assumes production settings need no runtime-only secrets here -- TODO confirm)
RUN python manage.py collectstatic --noinput
# Create non-root user and run as it
RUN useradd -m -u 1000 django && chown -R django:django /app
USER django
# Expose port
EXPOSE 8000
# Start command
CMD ["gunicorn", "myproject.wsgi:application", "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: '3.8'

services:
  # Django application served by gunicorn (see Dockerfile CMD)
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/dbname
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
      - DEBUG=False
    depends_on:
      - db
      - redis
    volumes:
      - static_volume:/app/staticfiles
      - media_volume:/app/media
    restart: unless-stopped

  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=dbname
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  redis:
    image: redis:6-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

  # TLS-terminating reverse proxy; also serves static/media directly
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - static_volume:/app/staticfiles
      - media_volume:/app/media
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:
  static_volume:
  media_volume:
# 2. Nginx Configuration
# nginx.conf
events {
worker_connections 1024;
}
http {
# Upstream pool: the gunicorn container from docker-compose
upstream django {
server web:8000;
}
# Rate limiting: 10 req/s per client IP for the API, 5 req/min for login
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=login:10m rate=5r/m;
# Port 80: permanently redirect everything to HTTPS
server {
listen 80;
server_name yourdomain.com;
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name yourdomain.com;
# SSL configuration
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512;
ssl_prefer_server_ciphers off;
# Security headers
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";
# Rate limiting: burst allows short spikes; nodelay serves them immediately
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://django;
}
location /login/ {
limit_req zone=login burst=5 nodelay;
proxy_pass http://django;
}
# Static files: served directly by nginx with long-lived immutable caching
location /static/ {
alias /app/staticfiles/;
expires 1y;
add_header Cache-Control "public, immutable";
}
# Media files
location /media/ {
alias /app/media/;
expires 1y;
add_header Cache-Control "public";
}
# Proxy to Django, forwarding client identity/scheme headers
location / {
proxy_pass http://django;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
}
# 3. Gunicorn Configuration
# gunicorn.conf.py
import multiprocessing
import os

# --- Server socket ---------------------------------------------------------
bind = "0.0.0.0:8000"
backlog = 2048

# --- Worker processes ------------------------------------------------------
# Rule of thumb for sync workers: (2 x CPU cores) + 1.
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"
worker_connections = 1000
# Recycle workers after ~1000 requests (with jitter) to curb memory creep.
max_requests = 1000
max_requests_jitter = 50
preload_app = True

# --- Logging ---------------------------------------------------------------
accesslog = "-"   # stdout
errorlog = "-"    # stderr
loglevel = "info"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# --- Process naming --------------------------------------------------------
proc_name = "django_app"

# --- Server mechanics ------------------------------------------------------
daemon = False
pidfile = "/tmp/gunicorn.pid"
user = None
group = None
tmp_upload_dir = None

# --- SSL (terminated at nginx, so unset here) ------------------------------
keyfile = None
certfile = None
# 4. Production Settings
# settings/production.py
import os
from .base import *

# Security
DEBUG = False
SECRET_KEY = os.environ.get('SECRET_KEY')
# Fix: filter out empty strings -- ''.split(',') yields [''], which would
# otherwise put a useless empty host entry into ALLOWED_HOSTS.
ALLOWED_HOSTS = [h for h in os.environ.get('ALLOWED_HOSTS', '').split(',') if h]

# Database
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('DB_NAME'),
        'USER': os.environ.get('DB_USER'),
        'PASSWORD': os.environ.get('DB_PASSWORD'),
        'HOST': os.environ.get('DB_HOST'),
        'PORT': os.environ.get('DB_PORT', '5432'),
        # Fix: CONN_MAX_AGE is a top-level database setting. Nested inside
        # OPTIONS (as in the original) it gets forwarded to psycopg as an
        # unknown connection keyword and rejected.
        'CONN_MAX_AGE': 60,
    }
}

# Cache
# Fix: Django's built-in RedisCache does not take django_redis's
# CLIENT_CLASS option, so the OPTIONS block was dropped.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': os.environ.get('REDIS_URL'),
    }
}

# Static files (hashed filenames for safe far-future caching)
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'

# Media files
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
MEDIA_URL = '/media/'

# Security settings
SECURE_SSL_REDIRECT = True
SECURE_HSTS_SECONDS = 31536000  # 1 year; enable only once HTTPS is verified working
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
X_FRAME_OPTIONS = 'DENY'

# Logging: everything at INFO+ to both a file and the container's stdout.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': '/var/log/django/app.log',
            'formatter': 'verbose',
        },
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
    },
    'root': {
        'handlers': ['file', 'console'],
        'level': 'INFO',
    },
}
# 5. Deployment Scripts
# deploy.sh
#!/bin/bash
# Fix: abort on the first failing step (and on unset variables / pipeline
# failures) instead of blindly restarting services after a failed migrate.
set -euo pipefail

echo "Starting deployment..."

# Pull latest code
git pull origin main

# Install dependencies
pip install -r requirements.txt

# Run migrations
python manage.py migrate

# Collect static files
python manage.py collectstatic --noinput

# Restart services
sudo systemctl restart gunicorn
sudo systemctl restart nginx

echo "Deployment completed!"
# 6. Systemd Service Files
# /etc/systemd/system/gunicorn.service
[Unit]
Description=Gunicorn daemon for Django application
After=network.target
[Service]
# Run as the unprivileged application user, from the app checkout
User=django
Group=django
WorkingDirectory=/app
ExecStart=/app/venv/bin/gunicorn --config gunicorn.conf.py myproject.wsgi:application
# HUP makes gunicorn gracefully reload workers without dropping the socket
ExecReload=/bin/kill -s HUP $MAINPID
KillMode=mixed
TimeoutStopSec=5
PrivateTmp=true
[Install]
WantedBy=multi-user.target
# 7. Monitoring and Health Checks
# health_check.py
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.core.cache import cache
from django.db import connection
import psutil
import os
@require_http_methods(["GET"])
def health_check(request):
"""Comprehensive health check endpoint"""
health_status = {
'status': 'healthy',
'timestamp': timezone.now().isoformat(),
'checks': {}
}
# Database check
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
health_status['checks']['database'] = 'healthy'
except Exception as e:
health_status['checks']['database'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'
# Cache check
try:
cache.set('health_check', 'ok', 10)
if cache.get('health_check') == 'ok':
health_status['checks']['cache'] = 'healthy'
else:
health_status['checks']['cache'] = 'unhealthy'
health_status['status'] = 'unhealthy'
except Exception as e:
health_status['checks']['cache'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'
# System resources
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
health_status['checks']['system'] = {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'disk_percent': disk.percent
}
# Check if resources are within limits
if cpu_percent > 90 or memory.percent > 90 or disk.percent > 90:
health_status['status'] = 'degraded'
except Exception as e:
health_status['checks']['system'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'
# Return appropriate status code
status_code = 200 if health_status['status'] == 'healthy' else 503
return JsonResponse(health_status, status=status_code)Test your understanding of this topic:
Continue your learning journey and master the next set of concepts.
Continue to Module 8