Custom Management Commands
Learn to create custom Django management commands for automation and administrative tasks
75 min • By Priygop Team • Last updated: Feb 2026
Management Commands Fundamentals
Django management commands are powerful tools for automating tasks, data processing, and administrative operations. They run from the command line using `python manage.py`.
Creating Custom Commands
Example
# Basic Management Command Structure
# <app>/management/commands/my_command.py  (the module name becomes the command name)
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
    """Skeleton for a custom management command.

    Declares two optional arguments (``--force`` and ``--limit``), prints a
    success message, and converts unexpected failures into ``CommandError``.
    """

    help = 'Description of what this command does'

    def add_arguments(self, parser):
        """Register the command-line options accepted by this command."""
        parser.add_argument('--force', action='store_true', help='Force execution')
        parser.add_argument('--limit', type=int, default=100, help='Limit number of records')

    def handle(self, *args, **options):
        """Run the command logic and report the outcome on stdout.

        Raises:
            CommandError: if the command logic fails.
        """
        try:
            # Command logic here
            self.stdout.write(self.style.SUCCESS('Command executed successfully'))
        except CommandError:
            # Already a user-facing error: re-raise untouched so its message
            # is not prefixed twice and its return code is kept.
            raise
        except Exception as e:
            # Chain explicitly so the original traceback stays attached.
            raise CommandError(f'Command failed: {e}') from e
# Example: Data Import Command
class Command(BaseCommand):
    """Skeleton of a CSV user-import command; the import step is a placeholder."""

    help = 'Import users from CSV file'

    def add_arguments(self, parser):
        """Declare the positional CSV path and the optional ``--dry-run`` switch."""
        parser.add_argument('csv_file', type=str, help='Path to CSV file')
        parser.add_argument('--dry-run', action='store_true', help='Test run without saving')

    def handle(self, *args, **options):
        """Announce dry-run mode when requested, then print the import count."""
        # Path for the import step; unused until that step is written.
        source_path = options['csv_file']

        if options['dry_run']:
            self.stdout.write('DRY RUN MODE - No data will be saved')

        # Import logic here
        imported_count = 0

        summary = self.style.SUCCESS(f'Successfully imported {imported_count} users')
        self.stdout.write(summary)


# Advanced Command Patterns
Example
# Advanced Management Commands
# 1. Database Maintenance Command
from django.core.management.base import BaseCommand
from django.db import connection
from django.core.cache import cache
import time
class Command(BaseCommand):
    """Run opt-in database and cache maintenance tasks.

    Each task has its own flag (``--vacuum``, ``--analyze``, ``--cache-clear``).
    The selected tasks run in that order and the total elapsed time is printed.
    """

    help = 'Perform database maintenance tasks'

    def add_arguments(self, parser):
        """Register one boolean flag per maintenance task."""
        parser.add_argument(
            '--vacuum',
            action='store_true',
            help='Run VACUUM on PostgreSQL tables'
        )
        parser.add_argument(
            '--analyze',
            action='store_true',
            help='Run ANALYZE on tables'
        )
        parser.add_argument(
            '--cache-clear',
            action='store_true',
            help='Clear all cache'
        )

    def handle(self, *args, **options):
        """Run the selected tasks and print how long they took.

        Raises:
            CommandError: if ``--vacuum`` is requested on a non-PostgreSQL backend.
        """
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by a system clock adjustment during the run.
        start_time = time.perf_counter()
        if options['vacuum']:
            self.vacuum_database()
        if options['analyze']:
            self.analyze_tables()
        if options['cache_clear']:
            self.clear_cache()
        execution_time = time.perf_counter() - start_time
        self.stdout.write(
            self.style.SUCCESS(f'Maintenance completed in {execution_time:.2f} seconds')
        )

    def vacuum_database(self):
        """Run VACUUM ANALYZE on the default PostgreSQL connection.

        Raises:
            CommandError: if the default connection is not PostgreSQL.
        """
        # Imported locally so this snippet does not depend on another import line.
        from django.core.management.base import CommandError

        # "VACUUM ANALYZE" is PostgreSQL syntax; fail with a clear message
        # instead of an opaque driver error on other backends.
        if connection.vendor != 'postgresql':
            raise CommandError(
                f'--vacuum requires PostgreSQL (current backend: {connection.vendor})'
            )
        # NOTE(review): PostgreSQL rejects VACUUM inside a transaction block;
        # this presumably relies on Django's default autocommit mode - confirm.
        with connection.cursor() as cursor:
            cursor.execute("VACUUM ANALYZE;")
        self.stdout.write('Database vacuum completed')

    def analyze_tables(self):
        """Run ANALYZE on tables"""
        with connection.cursor() as cursor:
            cursor.execute("ANALYZE;")
        self.stdout.write('Table analysis completed')

    def clear_cache(self):
        """Clear all cache"""
        cache.clear()
        self.stdout.write('Cache cleared')
# 2. Data Export Command
import csv
import json
from django.core.serializers import serialize
from django.conf import settings
class Command(BaseCommand):
    """Export every row of a model to a CSV, JSON or XML file.

    The model is given as ``ModelName`` (looked up in ``default_app_label``)
    or as ``app_label.ModelName``. ``--fields`` restricts the exported columns
    in all three formats.
    """

    help = 'Export data to various formats'

    # App searched when the model argument has no "app_label." prefix.
    default_app_label = 'myapp'

    def add_arguments(self, parser):
        """Register the model argument and the format/output/fields options."""
        parser.add_argument('model', type=str, help='Model name to export')
        parser.add_argument(
            '--format',
            choices=['csv', 'json', 'xml'],
            default='csv',
            help='Export format'
        )
        parser.add_argument(
            '--output',
            type=str,
            help='Output file path'
        )
        parser.add_argument(
            '--fields',
            nargs='+',
            help='Fields to export'
        )

    def handle(self, *args, **options):
        """Resolve the model, build the queryset and dispatch to an exporter.

        Raises:
            CommandError: if the app or model cannot be found.
        """
        from django.apps import apps
        from django.core.management.base import CommandError

        model_name = options['model']
        export_format = options['format']
        output_file = options['output']
        fields = options['fields']

        # Accept "app_label.ModelName"; a bare "ModelName" keeps using the
        # default app, as before.
        app_label, _, bare_name = model_name.rpartition('.')
        try:
            model = apps.get_model(app_label or self.default_app_label, bare_name)
        except LookupError as exc:
            raise CommandError(f'Unknown model {model_name!r}: {exc}') from exc

        queryset = model.objects.all()
        if fields:
            queryset = queryset.only(*fields)

        if export_format == 'csv':
            self.export_csv(queryset, output_file, fields)
        elif export_format == 'json':
            self.export_json(queryset, output_file, fields)
        elif export_format == 'xml':
            self.export_xml(queryset, output_file, fields)

    def _default_output(self, queryset, extension):
        """Return the fallback file name ``<model_name>_export.<extension>``."""
        return f'{queryset.model._meta.model_name}_export.{extension}'

    def _export_serialized(self, format_name, label, queryset, output_file, fields):
        """Serialize ``queryset`` with Django's serializer and write it to a file."""
        if not output_file:
            output_file = self._default_output(queryset, format_name)
        # Passing ``fields`` keeps the output consistent with --fields and stops
        # the serializer from loading columns deferred by ``.only()``.
        data = serialize(format_name, queryset, fields=fields)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(data)
        self.stdout.write(
            self.style.SUCCESS(f'{label} export completed: {output_file}')
        )

    def export_csv(self, queryset, output_file, fields):
        """Export to CSV format.

        Writes a header row, then one row per object. Columns are ``fields``
        when given, otherwise every concrete field of the model.
        """
        if not output_file:
            output_file = self._default_output(queryset, 'csv')
        # Work out the column list once instead of once per row.
        if fields:
            column_names = list(fields)
        else:
            column_names = [f.name for f in queryset.model._meta.fields]
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=column_names)
            writer.writeheader()
            # iterator() streams rows instead of caching the whole table.
            for obj in queryset.iterator():
                writer.writerow({name: getattr(obj, name) for name in column_names})
        self.stdout.write(
            self.style.SUCCESS(f'CSV export completed: {output_file}')
        )

    def export_json(self, queryset, output_file, fields=None):
        """Export to JSON format, limited to ``fields`` when given."""
        self._export_serialized('json', 'JSON', queryset, output_file, fields)

    def export_xml(self, queryset, output_file, fields=None):
        """Export to XML format, limited to ``fields`` when given."""
        self._export_serialized('xml', 'XML', queryset, output_file, fields)
# 3. System Health Check Command
import psutil
import os
from django.core.cache import cache
from django.db import connection
class Command(BaseCommand):
    """Print a health report covering CPU, memory, database, cache and disk."""

    help = 'Check system health and performance'

    def add_arguments(self, parser):
        """Register the ``--detailed`` switch and the ``--threshold`` percentage."""
        parser.add_argument(
            '--detailed',
            action='store_true',
            help='Show detailed information',
        )
        parser.add_argument(
            '--threshold',
            type=int,
            default=80,
            help='Warning threshold percentage',
        )

    def handle(self, *args, **options):
        """Run every check in order between two separator lines."""
        separator = '=' * 50
        self.stdout.write('System Health Check')
        self.stdout.write(separator)

        show_details = options['detailed']
        limit = options['threshold']

        self.check_system_resources(show_details, limit)
        self.check_database()
        self.check_cache()
        self.check_disk_space(limit)

        self.stdout.write(separator)
        self.stdout.write(self.style.SUCCESS('Health check completed'))

    def check_system_resources(self, detailed, threshold):
        """Print CPU and memory usage, warning if either exceeds ``threshold``."""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        self.stdout.write(f'CPU Usage: {cpu_percent}%')
        self.stdout.write(f'Memory Usage: {memory_percent}%')

        if detailed:
            bytes_per_gb = 1024 ** 3
            self.stdout.write(f'Memory Available: {memory.available / bytes_per_gb:.2f} GB')
            self.stdout.write(f'Memory Total: {memory.total / bytes_per_gb:.2f} GB')

        if any(usage > threshold for usage in (cpu_percent, memory_percent)):
            warning = self.style.WARNING(f'Warning: Resource usage above {threshold}%')
            self.stdout.write(warning)

    def check_database(self):
        """Run ``SELECT 1`` and print OK, or the error if anything raises."""
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1")
            self.stdout.write(self.style.SUCCESS('Database: OK'))
        except Exception as exc:
            self.stdout.write(self.style.ERROR(f'Database: ERROR - {exc}'))

    def check_cache(self):
        """Write a probe key to the cache, read it back and report the result."""
        try:
            cache.set('health_check', 'ok', 10)
            round_trip_ok = cache.get('health_check') == 'ok'
            if round_trip_ok:
                report = self.style.SUCCESS('Cache: OK')
            else:
                report = self.style.ERROR('Cache: ERROR')
            self.stdout.write(report)
        except Exception as exc:
            self.stdout.write(self.style.ERROR(f'Cache: ERROR - {exc}'))

    def check_disk_space(self, threshold):
        """Print usage of the ``/`` filesystem, warning above ``threshold``."""
        disk_percent = psutil.disk_usage('/').percent
        self.stdout.write(f'Disk Usage: {disk_percent}%')
        if disk_percent <= threshold:
            return
        self.stdout.write(
            self.style.WARNING(f'Warning: Disk usage above {threshold}%')
        )
# Usage examples:
# python manage.py maintenance --vacuum --analyze
# python manage.py export_data Post --format csv --fields title content
# python manage.py health_check --detailed --threshold 90