Learn advanced Django patterns and custom development techniques.
Learn advanced Django patterns and custom development techniques.
Learn to create custom Django management commands for automation and administrative tasks
Content by: Manali Trivedi
Python Django Developer
Django management commands are powerful tools for automating tasks, data processing, and administrative operations. They run from the command line using `python manage.py`.
# Basic Management Command Structure
# management/commands/my_command.py
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = 'Description of what this command does'
def add_arguments(self, parser):
parser.add_argument('--force', action='store_true', help='Force execution')
parser.add_argument('--limit', type=int, default=100, help='Limit number of records')
def handle(self, *args, **options):
try:
# Command logic here
self.stdout.write(self.style.SUCCESS('Command executed successfully'))
except Exception as e:
raise CommandError(f'Command failed: {e}')
# Example: Data Import Command
class Command(BaseCommand):
help = 'Import users from CSV file'
def add_arguments(self, parser):
parser.add_argument('csv_file', type=str, help='Path to CSV file')
parser.add_argument('--dry-run', action='store_true', help='Test run without saving')
def handle(self, *args, **options):
csv_file = options['csv_file']
dry_run = options['dry_run']
if dry_run:
self.stdout.write('DRY RUN MODE - No data will be saved')
# Import logic here
imported_count = 0
self.stdout.write(
self.style.SUCCESS(f'Successfully imported {imported_count} users')
)# Advanced Management Commands
# 1. Database Maintenance Command
from django.core.management.base import BaseCommand
from django.db import connection
from django.core.cache import cache
import time
class Command(BaseCommand):
help = 'Perform database maintenance tasks'
def add_arguments(self, parser):
parser.add_argument(
'--vacuum',
action='store_true',
help='Run VACUUM on PostgreSQL tables'
)
parser.add_argument(
'--analyze',
action='store_true',
help='Run ANALYZE on tables'
)
parser.add_argument(
'--cache-clear',
action='store_true',
help='Clear all cache'
)
def handle(self, *args, **options):
start_time = time.time()
if options['vacuum']:
self.vacuum_database()
if options['analyze']:
self.analyze_tables()
if options['cache_clear']:
self.clear_cache()
execution_time = time.time() - start_time
self.stdout.write(
self.style.SUCCESS(f'Maintenance completed in {execution_time:.2f} seconds')
)
def vacuum_database(self):
"""Run VACUUM on PostgreSQL tables"""
with connection.cursor() as cursor:
cursor.execute("VACUUM ANALYZE;")
self.stdout.write('Database vacuum completed')
def analyze_tables(self):
"""Run ANALYZE on tables"""
with connection.cursor() as cursor:
cursor.execute("ANALYZE;")
self.stdout.write('Table analysis completed')
def clear_cache(self):
"""Clear all cache"""
cache.clear()
self.stdout.write('Cache cleared')
# 2. Data Export Command
import csv
import json
from django.core.serializers import serialize
from django.conf import settings
class Command(BaseCommand):
help = 'Export data to various formats'
def add_arguments(self, parser):
parser.add_argument('model', type=str, help='Model name to export')
parser.add_argument(
'--format',
choices=['csv', 'json', 'xml'],
default='csv',
help='Export format'
)
parser.add_argument(
'--output',
type=str,
help='Output file path'
)
parser.add_argument(
'--fields',
nargs='+',
help='Fields to export'
)
def handle(self, *args, **options):
model_name = options['model']
export_format = options['format']
output_file = options['output']
fields = options['fields']
# Get model class
from django.apps import apps
model = apps.get_model(apps.get_app_config('myapp').label, model_name)
# Get queryset
queryset = model.objects.all()
if fields:
queryset = queryset.only(*fields)
# Export based on format
if export_format == 'csv':
self.export_csv(queryset, output_file, fields)
elif export_format == 'json':
self.export_json(queryset, output_file)
elif export_format == 'xml':
self.export_xml(queryset, output_file)
def export_csv(self, queryset, output_file, fields):
"""Export to CSV format"""
if not output_file:
output_file = f'{queryset.model._meta.model_name}_export.csv'
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
if fields:
writer = csv.DictWriter(csvfile, fieldnames=fields)
else:
writer = csv.DictWriter(csvfile, fieldnames=[f.name for f in queryset.model._meta.fields])
writer.writeheader()
for obj in queryset:
if fields:
row = {field: getattr(obj, field) for field in fields}
else:
row = {f.name: getattr(obj, f.name) for f in queryset.model._meta.fields}
writer.writerow(row)
self.stdout.write(
self.style.SUCCESS(f'CSV export completed: {output_file}')
)
def export_json(self, queryset, output_file):
"""Export to JSON format"""
if not output_file:
output_file = f'{queryset.model._meta.model_name}_export.json'
data = serialize('json', queryset)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(data)
self.stdout.write(
self.style.SUCCESS(f'JSON export completed: {output_file}')
)
def export_xml(self, queryset, output_file):
"""Export to XML format"""
if not output_file:
output_file = f'{queryset.model._meta.model_name}_export.xml'
data = serialize('xml', queryset)
with open(output_file, 'w', encoding='utf-8') as f:
f.write(data)
self.stdout.write(
self.style.SUCCESS(f'XML export completed: {output_file}')
)
# 3. System Health Check Command
import psutil
import os
from django.core.cache import cache
from django.db import connection
class Command(BaseCommand):
help = 'Check system health and performance'
def add_arguments(self, parser):
parser.add_argument(
'--detailed',
action='store_true',
help='Show detailed information'
)
parser.add_argument(
'--threshold',
type=int,
default=80,
help='Warning threshold percentage'
)
def handle(self, *args, **options):
self.stdout.write('System Health Check')
self.stdout.write('=' * 50)
# Check system resources
self.check_system_resources(options['detailed'], options['threshold'])
# Check database
self.check_database()
# Check cache
self.check_cache()
# Check disk space
self.check_disk_space(options['threshold'])
self.stdout.write('=' * 50)
self.stdout.write(self.style.SUCCESS('Health check completed'))
def check_system_resources(self, detailed, threshold):
"""Check CPU and memory usage"""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
self.stdout.write(f'CPU Usage: {cpu_percent}%')
self.stdout.write(f'Memory Usage: {memory.percent}%')
if detailed:
self.stdout.write(f'Memory Available: {memory.available / (1024**3):.2f} GB')
self.stdout.write(f'Memory Total: {memory.total / (1024**3):.2f} GB')
if cpu_percent > threshold or memory.percent > threshold:
self.stdout.write(
self.style.WARNING(f'Warning: Resource usage above {threshold}%')
)
def check_database(self):
"""Check database connection and performance"""
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
self.stdout.write(self.style.SUCCESS('Database: OK'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'Database: ERROR - {e}'))
def check_cache(self):
"""Check cache functionality"""
try:
cache.set('health_check', 'ok', 10)
if cache.get('health_check') == 'ok':
self.stdout.write(self.style.SUCCESS('Cache: OK'))
else:
self.stdout.write(self.style.ERROR('Cache: ERROR'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'Cache: ERROR - {e}'))
def check_disk_space(self, threshold):
"""Check disk space usage"""
disk = psutil.disk_usage('/')
disk_percent = disk.percent
self.stdout.write(f'Disk Usage: {disk_percent}%')
if disk_percent > threshold:
self.stdout.write(
self.style.WARNING(f'Warning: Disk usage above {threshold}%')
)
# Usage examples:
# python manage.py maintenance --vacuum --analyze
# python manage.py export_data Post --format csv --fields title content
# python manage.py health_check --detailed --threshold 90Test your understanding of this topic:
Master Django signals for decoupled communication between components
Content by: Manali Trivedi
Python Django Developer
Django signals allow decoupled applications to get notified when certain actions occur elsewhere in the application. They provide a way to execute code when certain actions happen in your Django application.
# Django Signals Implementation
# 1. Built-in Signals
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete
from django.dispatch import receiver
from django.contrib.auth.models import User
from .models import Post, Comment, Like
# Post save signal
@receiver(post_save, sender=Post)
def post_save_handler(sender, instance, created, **kwargs):
"""Handle post save events"""
if created:
# New post created
print(f"New post created: {instance.title}")
# Update user post count
instance.author.profile.post_count += 1
instance.author.profile.save()
# Send notification
send_post_notification(instance)
else:
# Post updated
print(f"Post updated: {instance.title}")
# Update search index
update_search_index(instance)
# Post delete signal
@receiver(post_delete, sender=Post)
def post_delete_handler(sender, instance, **kwargs):
"""Handle post delete events"""
# Update user post count
instance.author.profile.post_count -= 1
instance.author.profile.save()
# Remove from search index
remove_from_search_index(instance)
# Clean up related files
cleanup_post_files(instance)
# Comment signal
@receiver(post_save, sender=Comment)
def comment_save_handler(sender, instance, created, **kwargs):
"""Handle comment save events"""
if created:
# Update post comment count
instance.post.comment_count += 1
instance.post.save()
# Send notification to post author
send_comment_notification(instance)
# Update post activity timestamp
instance.post.last_activity = timezone.now()
instance.post.save()
# 2. Custom Signals
from django.dispatch import Signal
# Define custom signals
post_published = Signal()
post_viewed = Signal()
user_registered = Signal()
post_liked = Signal()
# Signal handlers
@receiver(post_published)
def handle_post_published(sender, post, **kwargs):
"""Handle post publication"""
# Send email notifications
send_publication_notifications(post)
# Update RSS feed
update_rss_feed()
# Post to social media
post_to_social_media(post)
# Update search index
update_search_index(post)
@receiver(post_viewed)
def handle_post_viewed(sender, post, user, **kwargs):
"""Handle post view"""
# Increment view count
post.view_count += 1
post.save()
# Track user activity
track_user_activity(user, 'post_view', post)
# Update trending posts
update_trending_posts(post)
@receiver(user_registered)
def handle_user_registered(sender, user, **kwargs):
"""Handle new user registration"""
# Send welcome email
send_welcome_email(user)
# Create user profile
create_user_profile(user)
# Add to mailing list
add_to_mailing_list(user)
# Send onboarding sequence
start_onboarding_sequence(user)
@receiver(post_liked)
def handle_post_liked(sender, post, user, **kwargs):
"""Handle post like"""
# Update like count
post.like_count += 1
post.save()
# Send notification to post author
send_like_notification(post, user)
# Update user activity
track_user_activity(user, 'post_like', post)
# 3. Advanced Signal Patterns
class SignalManager:
"""Advanced signal management with error handling and logging"""
def __init__(self):
self.signal_handlers = {}
self.error_handlers = {}
def register_handler(self, signal, handler, error_handler=None):
"""Register a signal handler with optional error handling"""
if signal not in self.signal_handlers:
self.signal_handlers[signal] = []
self.signal_handlers[signal].append(handler)
if error_handler:
self.error_handlers[handler] = error_handler
def dispatch_signal(self, signal, **kwargs):
"""Dispatch a signal with error handling"""
if signal not in self.signal_handlers:
return
for handler in self.signal_handlers[signal]:
try:
handler(**kwargs)
except Exception as e:
# Log error
logger.error(f"Signal handler {handler.__name__} failed: {e}")
# Call error handler if available
if handler in self.error_handlers:
try:
self.error_handlers[handler](e, **kwargs)
except Exception as error_e:
logger.error(f"Error handler also failed: {error_e}")
def unregister_handler(self, signal, handler):
"""Unregister a signal handler"""
if signal in self.signal_handlers and handler in self.signal_handlers[signal]:
self.signal_handlers[signal].remove(handler)
# Usage
signal_manager = SignalManager()
# Register handlers
signal_manager.register_handler(
post_published,
handle_post_published,
error_handler=lambda e, **kwargs: logger.error(f"Publication failed: {e}")
)
# Dispatch signals
signal_manager.dispatch_signal(post_published, post=post_instance)
# 4. Signal Decorators and Utilities
from functools import wraps
from django.core.cache import cache
def signal_cache_invalidator(cache_pattern):
"""Decorator to invalidate cache when signals are triggered"""
def decorator(func):
@wraps(func)
def wrapper(sender, instance, **kwargs):
# Execute the original function
result = func(sender, instance, **kwargs)
# Invalidate cache
cache_keys = cache_pattern(instance)
for key in cache_keys:
cache.delete(key)
return result
return wrapper
return decorator
def get_cache_keys_for_post(post):
"""Generate cache keys for a post"""
return [
f'post_{post.id}',
f'user_posts_{post.author.id}',
f'category_posts_{post.category.id}',
'recent_posts',
'trending_posts'
]
# Use the decorator
@signal_cache_invalidator(get_cache_keys_for_post)
@receiver(post_save, sender=Post)
def invalidate_post_cache(sender, instance, **kwargs):
"""Invalidate post-related cache"""
pass
# 5. Signal Testing
from django.test import TestCase
from unittest.mock import patch
class SignalTestCase(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username='testuser',
password='testpass123'
)
self.post = Post.objects.create(
title='Test Post',
content='Test content',
author=self.user
)
@patch('myapp.signals.send_publication_notifications')
def test_post_published_signal(self, mock_send_notifications):
"""Test that post published signal is sent"""
# Publish the post
self.post.status = 'published'
self.post.save()
# Check if signal was sent
mock_send_notifications.assert_called_once_with(self.post)
def test_post_delete_signal(self):
"""Test that post delete signal is sent"""
initial_count = self.user.profile.post_count
# Delete the post
self.post.delete()
# Check if post count was updated
self.user.profile.refresh_from_db()
self.assertEqual(self.user.profile.post_count, initial_count - 1)
# 6. Performance Considerations
class OptimizedSignalHandler:
"""Signal handler with performance optimizations"""
def __init__(self):
self.batch_size = 100
self.pending_updates = []
@receiver(post_save, sender=Post)
def batch_update_search_index(self, sender, instance, **kwargs):
"""Batch update search index for better performance"""
self.pending_updates.append(instance.id)
if len(self.pending_updates) >= self.batch_size:
self._process_batch()
def _process_batch(self):
"""Process batch of updates"""
if not self.pending_updates:
return
# Process in batch
post_ids = self.pending_updates[:self.batch_size]
self.pending_updates = self.pending_updates[self.batch_size:]
# Update search index in batch
update_search_index_batch(post_ids)
# Usage
optimized_handler = OptimizedSignalHandler()Test your understanding of this topic:
Learn to create custom middleware for request/response processing
Content by: Manali Trivedi
Python Django Developer
Middleware is a framework of hooks into Django's request/response processing. It's a way to process requests globally before they reach the view. Custom middleware can handle authentication, logging, rate limiting, and more.
# Custom Middleware Development
# 1. Basic Middleware Structure
class CustomMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Code executed before the view
response = self.get_response(request)
# Code executed after the view
return response
def process_view(self, request, view_func, view_args, view_kwargs):
# Code executed before the view is called
return None # Continue processing
def process_exception(self, request, exception):
# Code executed when an exception occurs
return None # Let Django handle the exception
def process_template_response(self, request, response):
# Code executed after the view returns a response
return response
# 2. Authentication Middleware
from django.contrib.auth.models import AnonymousUser
from django.core.cache import cache
import jwt
class JWTAuthMiddleware:
"""JWT Authentication Middleware"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Extract JWT token from headers
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Bearer '):
token = auth_header[7:] # Remove 'Bearer ' prefix
try:
# Decode JWT token
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=['HS256'])
user_id = payload.get('user_id')
if user_id:
# Get user from cache or database
user = cache.get(f'user_{user_id}')
if not user:
from django.contrib.auth.models import User
user = User.objects.get(id=user_id)
cache.set(f'user_{user_id}', user, 300) # Cache for 5 minutes
request.user = user
else:
request.user = AnonymousUser()
except (jwt.InvalidTokenError, User.DoesNotExist):
request.user = AnonymousUser()
else:
request.user = AnonymousUser()
response = self.get_response(request)
return response
# 3. Rate Limiting Middleware
from django.core.cache import cache
from django.http import HttpResponseTooManyRequests
import time
class RateLimitMiddleware:
"""Rate limiting middleware"""
def __init__(self, get_response):
self.get_response = get_response
self.rate_limits = {
'default': {'requests': 100, 'window': 3600}, # 100 requests per hour
'api': {'requests': 1000, 'window': 3600}, # 1000 requests per hour
'login': {'requests': 5, 'window': 300}, # 5 requests per 5 minutes
}
def __call__(self, request):
# Get client identifier
client_id = self._get_client_id(request)
# Check rate limit
if not self._check_rate_limit(request, client_id):
return HttpResponseTooManyRequests(
'Rate limit exceeded. Please try again later.'
)
response = self.get_response(request)
return response
def _get_client_id(self, request):
"""Get unique client identifier"""
if request.user.is_authenticated:
return f"user_{request.user.id}"
else:
return f"ip_{request.META.get('REMOTE_ADDR', 'unknown')}"
def _check_rate_limit(self, request, client_id):
"""Check if client has exceeded rate limit"""
# Determine rate limit type
if request.path.startswith('/api/'):
limit_type = 'api'
elif request.path.startswith('/login/'):
limit_type = 'login'
else:
limit_type = 'default'
limit_config = self.rate_limits[limit_type]
cache_key = f"rate_limit_{limit_type}_{client_id}"
# Get current request count
current_count = cache.get(cache_key, 0)
if current_count >= limit_config['requests']:
return False
# Increment count
cache.set(cache_key, current_count + 1, limit_config['window'])
return True
# 4. Logging and Monitoring Middleware
import logging
import time
from django.utils.deprecation import MiddlewareMixin
class LoggingMiddleware(MiddlewareMixin):
"""Logging and monitoring middleware"""
def __init__(self, get_response):
self.get_response = get_response
self.logger = logging.getLogger('django.request')
def __call__(self, request):
# Start timing
start_time = time.time()
# Log request
self.logger.info(f"Request started: {request.method} {request.path}")
# Process request
response = self.get_response(request)
# Calculate duration
duration = time.time() - start_time
# Log response
self.logger.info(
f"Request completed: {request.method} {request.path} "
f"- Status: {response.status_code} - Duration: {duration:.3f}s"
)
# Add custom headers
response['X-Request-Duration'] = f"{duration:.3f}s"
response['X-Request-ID'] = self._generate_request_id()
return response
def _generate_request_id(self):
"""Generate unique request ID"""
import uuid
return str(uuid.uuid4())
def process_exception(self, request, exception):
"""Log exceptions"""
self.logger.error(
f"Request failed: {request.method} {request.path} - Error: {str(exception)}"
)
return None
# 5. CORS Middleware
from django.http import JsonResponse
class CORSMiddleware:
"""Custom CORS middleware"""
def __init__(self, get_response):
self.get_response = get_response
self.allowed_origins = [
'http://localhost:3000',
'https://yourdomain.com',
'https://api.yourdomain.com'
]
self.allowed_methods = ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
self.allowed_headers = [
'Content-Type',
'Authorization',
'X-Requested-With'
]
def __call__(self, request):
# Handle preflight request
if request.method == 'OPTIONS':
return self._handle_preflight(request)
response = self.get_response(request)
# Add CORS headers
origin = request.META.get('HTTP_ORIGIN')
if origin in self.allowed_origins:
response['Access-Control-Allow-Origin'] = origin
response['Access-Control-Allow-Credentials'] = 'true'
return response
def _handle_preflight(self, request):
"""Handle preflight OPTIONS request"""
origin = request.META.get('HTTP_ORIGIN')
if origin not in self.allowed_origins:
return JsonResponse({'error': 'Origin not allowed'}, status=403)
response = JsonResponse({})
response['Access-Control-Allow-Origin'] = origin
response['Access-Control-Allow-Methods'] = ', '.join(self.allowed_methods)
response['Access-Control-Allow-Headers'] = ', '.join(self.allowed_headers)
response['Access-Control-Allow-Credentials'] = 'true'
response['Access-Control-Max-Age'] = '86400' # 24 hours
return response
# 6. Security Middleware
from django.http import HttpResponseForbidden
import re
class SecurityMiddleware:
"""Security-focused middleware"""
def __init__(self, get_response):
self.get_response = get_response
self.suspicious_patterns = [
r'<script',
r'javascript:',
r'vbscript:',
r'data:text/html',
r'<iframe',
r'<object',
r'<embed'
]
def __call__(self, request):
# Check for suspicious patterns in request
if self._is_suspicious_request(request):
return HttpResponseForbidden('Suspicious request detected')
response = self.get_response(request)
# Add security headers
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
response['Referrer-Policy'] = 'strict-origin-when-cross-origin'
return response
def _is_suspicious_request(self, request):
"""Check if request contains suspicious patterns"""
# Check GET parameters
for key, value in request.GET.items():
if self._contains_suspicious_pattern(value):
return True
# Check POST data
if request.method == 'POST':
for key, value in request.POST.items():
if self._contains_suspicious_pattern(value):
return True
# Check headers
for key, value in request.META.items():
if self._contains_suspicious_pattern(str(value)):
return True
return False
def _contains_suspicious_pattern(self, value):
"""Check if value contains suspicious patterns"""
value_str = str(value).lower()
for pattern in self.suspicious_patterns:
if re.search(pattern, value_str):
return True
return False
# 7. Performance Monitoring Middleware
from django.core.cache import cache
import psutil
class PerformanceMiddleware:
"""Performance monitoring middleware"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# Record start time and memory
start_time = time.time()
start_memory = psutil.Process().memory_info().rss
# Process request
response = self.get_response(request)
# Calculate metrics
duration = time.time() - start_time
memory_used = psutil.Process().memory_info().rss - start_memory
# Store metrics
self._store_metrics(request.path, duration, memory_used)
# Add performance headers
response['X-Response-Time'] = f"{duration:.3f}s"
response['X-Memory-Used'] = f"{memory_used / 1024:.1f}KB"
return response
def _store_metrics(self, path, duration, memory_used):
"""Store performance metrics"""
cache_key = f"performance_{path}"
metrics = cache.get(cache_key, {
'count': 0,
'total_duration': 0,
'total_memory': 0,
'min_duration': float('inf'),
'max_duration': 0
})
metrics['count'] += 1
metrics['total_duration'] += duration
metrics['total_memory'] += memory_used
metrics['min_duration'] = min(metrics['min_duration'], duration)
metrics['max_duration'] = max(metrics['max_duration'], duration)
cache.set(cache_key, metrics, 3600) # Cache for 1 hour
# 8. Middleware Configuration
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
# Custom middleware
'myapp.middleware.JWTAuthMiddleware',
'myapp.middleware.RateLimitMiddleware',
'myapp.middleware.LoggingMiddleware',
'myapp.middleware.CORSMiddleware',
'myapp.middleware.SecurityMiddleware',
'myapp.middleware.PerformanceMiddleware',
]
# 9. Middleware Testing
from django.test import TestCase, RequestFactory
from django.contrib.auth.models import AnonymousUser
class MiddlewareTestCase(TestCase):
def setUp(self):
self.factory = RequestFactory()
self.middleware = JWTAuthMiddleware(lambda r: None)
def test_jwt_authentication(self):
"""Test JWT authentication middleware"""
# Create request with JWT token
request = self.factory.get('/api/posts/')
request.META['HTTP_AUTHORIZATION'] = 'Bearer valid_token'
# Process request
self.middleware(request)
# Check if user is set
self.assertIsNotNone(request.user)
def test_rate_limiting(self):
"""Test rate limiting middleware"""
middleware = RateLimitMiddleware(lambda r: None)
# Create multiple requests
for i in range(6): # Exceed limit of 5
request = self.factory.post('/login/')
response = middleware(request)
if i < 5:
self.assertIsNone(response) # Should pass
else:
self.assertEqual(response.status_code, 429) # Should be rate limitedTest your understanding of this topic:
Master advanced ORM patterns and techniques for complex data operations
Content by: Manali Trivedi
Python Django Developer
Learn advanced Django ORM patterns for complex queries, custom managers, and database operations.
# Advanced ORM Patterns
# 1. Custom Model Managers
from django.db import models
from django.db.models import Q, Count, Avg, Sum, Max, Min
from django.db.models.functions import Coalesce, TruncDate, ExtractYear
class PostManager(models.Manager):
"""Custom manager for Post model"""
def published(self):
"""Return only published posts"""
return self.filter(status='published')
def by_author(self, author):
"""Return posts by specific author"""
return self.filter(author=author)
def by_category(self, category):
"""Return posts by category"""
return self.filter(category=category)
def with_comments(self):
"""Return posts with comment count"""
return self.annotate(
comment_count=Count('comments'),
avg_rating=Avg('comments__rating')
)
def trending(self, days=7):
"""Return trending posts based on views and likes"""
from django.utils import timezone
from datetime import timedelta
cutoff_date = timezone.now() - timedelta(days=days)
return self.filter(
created_date__gte=cutoff_date
).annotate(
engagement_score=(
Count('views') * 1 +
Count('likes') * 3 +
Count('comments') * 2
)
).order_by('-engagement_score')
def search(self, query):
"""Search posts by title and content"""
return self.filter(
Q(title__icontains=query) |
Q(content__icontains=query) |
Q(tags__name__icontains=query)
).distinct()
def get_popular_authors(self, limit=10):
"""Get most popular authors by post count and engagement"""
return self.values('author').annotate(
post_count=Count('id'),
total_views=Sum('views'),
total_likes=Sum('likes'),
total_comments=Count('comments')
).order_by('-post_count', '-total_views')[:limit]
class Post(models.Model):
title = models.CharField(max_length=200)
content = models.TextField()
author = models.ForeignKey(User, on_delete=models.CASCADE)
category = models.ForeignKey(Category, on_delete=models.CASCADE)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='draft')
created_date = models.DateTimeField(auto_now_add=True)
published_date = models.DateTimeField(null=True, blank=True)
views = models.PositiveIntegerField(default=0)
likes = models.PositiveIntegerField(default=0)
objects = PostManager()
class Meta:
indexes = [
models.Index(fields=['status', 'published_date']),
models.Index(fields=['author', 'created_date']),
models.Index(fields=['category', 'status']),
]
# 2. Custom QuerySet Methods
class PostQuerySet(models.QuerySet):
"""Custom QuerySet for Post model"""
def active(self):
"""Return only active posts"""
return self.filter(status='published', is_active=True)
def recent(self, days=30):
"""Return recent posts"""
from django.utils import timezone
from datetime import timedelta
cutoff_date = timezone.now() - timedelta(days=days)
return self.filter(created_date__gte=cutoff_date)
def featured(self):
"""Return featured posts"""
return self.filter(is_featured=True, status='published')
def by_date_range(self, start_date, end_date):
"""Return posts within date range"""
return self.filter(
created_date__range=[start_date, end_date]
)
def with_related_data(self):
"""Optimize queries with related data"""
return self.select_related('author', 'category').prefetch_related(
'tags',
'comments__author',
'likes__user'
)
def aggregate_stats(self):
"""Get aggregated statistics"""
return self.aggregate(
total_posts=Count('id'),
total_views=Sum('views'),
total_likes=Sum('likes'),
avg_views=Avg('views'),
avg_likes=Avg('likes')
)
def group_by_month(self):
"""Group posts by month"""
return self.annotate(
month=TruncDate('created_date', 'month')
).values('month').annotate(
count=Count('id'),
total_views=Sum('views')
).order_by('month')
# Use the custom QuerySet
Post.objects = PostQuerySet.as_manager()
# 3. Advanced Query Patterns
class AdvancedQueryExamples:
"""Examples of advanced query patterns"""
@staticmethod
def complex_search(query, filters=None):
"""Complex search with multiple filters"""
queryset = Post.objects.all()
# Text search
if query:
queryset = queryset.filter(
Q(title__icontains=query) |
Q(content__icontains=query) |
Q(author__username__icontains=query) |
Q(category__name__icontains=query)
)
# Apply filters
if filters:
if filters.get('category'):
queryset = queryset.filter(category_id=filters['category'])
if filters.get('author'):
queryset = queryset.filter(author_id=filters['author'])
if filters.get('date_from'):
queryset = queryset.filter(created_date__gte=filters['date_from'])
if filters.get('date_to'):
queryset = queryset.filter(created_date__lte=filters['date_to'])
if filters.get('min_views'):
queryset = queryset.filter(views__gte=filters['min_views'])
return queryset.distinct()
@staticmethod
def get_analytics_data(start_date, end_date):
"""Get analytics data for date range"""
return Post.objects.filter(
created_date__range=[start_date, end_date]
).aggregate(
total_posts=Count('id'),
total_views=Sum('views'),
total_likes=Sum('likes'),
total_comments=Count('comments'),
avg_views_per_post=Avg('views'),
avg_likes_per_post=Avg('likes'),
posts_per_day=Count('id') / (end_date - start_date).days
)
@staticmethod
def get_user_engagement(user_id):
"""Get user engagement metrics"""
return Post.objects.filter(author_id=user_id).aggregate(
total_posts=Count('id'),
total_views=Sum('views'),
total_likes=Sum('likes'),
total_comments=Count('comments'),
avg_views_per_post=Avg('views'),
engagement_rate=(
(Sum('likes') + Sum('comments')) / Count('id')
)
)
# 4. Database Functions and Expressions
from django.db.models import F, Func, Value, ExpressionWrapper
from django.db.models.functions import Concat, Lower, Upper, Length
class DatabaseFunctions:
"""Examples of database functions and expressions"""
@staticmethod
def update_view_count(post_id):
"""Update view count using F() expression"""
Post.objects.filter(id=post_id).update(views=F('views') + 1)
@staticmethod
def get_posts_with_full_name():
"""Get posts with concatenated author full name"""
return Post.objects.annotate(
author_full_name=Concat(
'author__first_name',
Value(' '),
'author__last_name'
)
)
@staticmethod
def get_posts_by_title_length():
"""Get posts grouped by title length"""
return Post.objects.annotate(
title_length=Length('title')
).values('title_length').annotate(
count=Count('id')
).order_by('title_length')
@staticmethod
def get_posts_with_engagement_score():
"""Calculate engagement score for posts"""
return Post.objects.annotate(
engagement_score=ExpressionWrapper(
F('views') * 1 + F('likes') * 3 + F('comments__count') * 2,
output_field=models.IntegerField()
)
).order_by('-engagement_score')
# 5. Raw SQL and Complex Queries
from django.db import connection
from django.db.models import RawSQL
class RawSQLExamples:
"""Examples of raw SQL usage"""
@staticmethod
def get_posts_with_complex_stats():
"""Get posts with complex statistics using raw SQL"""
return Post.objects.annotate(
comment_count=RawSQL("""
SELECT COUNT(*) FROM blog_comment
WHERE blog_comment.post_id = blog_post.id
""", []),
avg_rating=RawSQL("""
SELECT AVG(rating) FROM blog_comment
WHERE blog_comment.post_id = blog_post.id
""", []),
last_comment_date=RawSQL("""
SELECT MAX(created_date) FROM blog_comment
WHERE blog_comment.post_id = blog_post.id
""", [])
)
@staticmethod
def get_trending_posts_window_function():
"""Get trending posts using window functions (PostgreSQL)"""
with connection.cursor() as cursor:
cursor.execute("""
SELECT
p.id,
p.title,
p.views,
p.likes,
ROW_NUMBER() OVER (
ORDER BY (p.views + p.likes * 3) DESC
) as rank
FROM blog_post p
WHERE p.status = 'published'
ORDER BY rank
LIMIT 10
""")
return cursor.fetchall()
@staticmethod
def get_posts_by_time_period():
"""Get posts grouped by time periods"""
return Post.objects.raw("""
SELECT
id,
title,
created_date,
CASE
WHEN created_date >= NOW() - INTERVAL '1 day' THEN 'Today'
WHEN created_date >= NOW() - INTERVAL '7 days' THEN 'This Week'
WHEN created_date >= NOW() - INTERVAL '30 days' THEN 'This Month'
ELSE 'Older'
END as time_period
FROM blog_post
ORDER BY created_date DESC
""")
# 6. Bulk Operations and Performance
class BulkOperations:
"""Examples of bulk operations for performance"""
@staticmethod
def bulk_create_posts(posts_data):
"""Bulk create posts"""
posts = [
Post(**data) for data in posts_data
]
return Post.objects.bulk_create(posts, batch_size=100)
@staticmethod
def bulk_update_posts(posts, fields):
"""Bulk update posts"""
return Post.objects.bulk_update(posts, fields, batch_size=100)
@staticmethod
def bulk_delete_posts(post_ids):
"""Bulk delete posts"""
return Post.objects.filter(id__in=post_ids).delete()
@staticmethod
def update_multiple_posts(updates):
"""Update multiple posts with different values"""
for post_id, update_data in updates.items():
Post.objects.filter(id=post_id).update(**update_data)
# 7. Query Optimization Techniques
class QueryOptimization:
"""Query optimization techniques"""
@staticmethod
def optimize_post_list():
"""Optimize post list query"""
return Post.objects.select_related(
'author', 'category'
).prefetch_related(
'tags',
Prefetch(
'comments',
queryset=Comment.objects.select_related('author')[:5]
)
).filter(status='published').order_by('-created_date')
@staticmethod
def optimize_user_posts(user_id):
"""Optimize user posts query"""
return Post.objects.filter(author_id=user_id).select_related(
'category'
).prefetch_related(
'tags',
'comments'
).annotate(
total_views=Sum('views'),
total_likes=Sum('likes')
).order_by('-created_date')
@staticmethod
def optimize_category_posts(category_id):
"""Optimize category posts query"""
return Post.objects.filter(
category_id=category_id,
status='published'
).select_related('author').prefetch_related(
'tags'
).annotate(
comment_count=Count('comments')
).order_by('-created_date')
# Usage Examples
# Get trending posts
trending_posts = Post.objects.trending(days=7)
# Search posts
search_results = Post.objects.search('Django tutorial')
# Get analytics
analytics = AdvancedQueryExamples.get_analytics_data(
start_date=timezone.now() - timedelta(days=30),
end_date=timezone.now()
)
# Optimize queries
optimized_posts = QueryOptimization.optimize_post_list()
# Use raw SQL
complex_stats = RawSQLExamples.get_posts_with_complex_stats()Test your understanding of this topic:
Continue your learning journey and master the next set of concepts.
Continue to Module 9