redis_key_auditor.py
Outil d'analyse des patterns de clés et de la consommation mémoire Redis sans bloquer le serveur.
Informations
| Propriété | Valeur |
|---|---|
| Langage | Python 3.8+ |
| Catégorie | Base de données / Audit |
| Niveau | Intermédiaire |
| Dépendances | redis (redis-py) |
Description
Ce script analyse les clés stockées dans Redis en utilisant la commande SCAN (itérative et non-bloquante) pour éviter de bloquer le serveur. Il regroupe les clés par préfixe et fournit des statistiques de mémoire détaillées.
Fonctionnalités :
- Scan non-bloquant : Utilise
SCANau lieu deKEYS *(jamais en production !) - Groupement par préfixe : Agrège les clés par pattern (ex:
session:*,cache:*) - Analyse mémoire : Utilise
MEMORY USAGEpour l'estimation précise - Séparateur configurable : Supporte différentes conventions de nommage
- Export CSV : Génère un rapport exportable
Prérequis
Ne JAMAIS utiliser KEYS * en Production
La commande KEYS * parcourt toutes les clés en une seule opération bloquante.
Sur une instance avec des millions de clés :
- Blocage complet du serveur Redis pendant l'exécution
- Latence de plusieurs secondes voire minutes
- Impact sur toutes les applications connectées
Ce script utilise SCAN qui itère par petits lots sans bloquer.
Script
#!/usr/bin/env python3
"""
Script Name: redis_key_auditor.py
Description: Analyze Redis key patterns and memory usage without blocking
Author: ShellBook
Version: 1.0
"""
import argparse
import logging
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional, Tuple
try:
import redis
from redis.exceptions import ConnectionError, ResponseError
except ImportError:
print("Error: redis package not installed. Run: pip install redis")
sys.exit(1)
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class KeyStats:
"""
Statistics for a key pattern group.
"""
count: int = 0
total_memory: int = 0
sample_keys: List[str] = field(default_factory=list)
ttl_set: int = 0
ttl_none: int = 0
class RedisKeyAuditor:
"""
Audits Redis keys by pattern with non-blocking SCAN iteration.
"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
password: Optional[str] = None,
db: int = 0,
pattern_separator: str = ":",
sample_size: int = 5
):
"""
Initialize the Redis auditor.
Args:
host: Redis server hostname
port: Redis server port
password: Redis password (optional)
db: Redis database number
pattern_separator: Character used to separate key prefixes
sample_size: Number of sample keys to keep per pattern
"""
self.host = host
self.port = port
self.db = db
self.pattern_separator = pattern_separator
self.sample_size = sample_size
self.stats: Dict[str, KeyStats] = defaultdict(KeyStats)
self.total_keys = 0
self.total_memory = 0
# Connect to Redis
self.client = redis.Redis(
host=host,
port=port,
password=password,
db=db,
decode_responses=True,
socket_timeout=30,
socket_connect_timeout=10
)
def _get_pattern(self, key: str) -> str:
"""
Extract pattern from key by replacing specific parts with wildcards.
Args:
key: The Redis key
Returns:
Pattern string (e.g., "session:*" from "session:abc123")
"""
parts = key.split(self.pattern_separator)
if len(parts) == 1:
# No separator found, return as-is or group as "no_prefix"
return key if len(key) < 20 else "no_prefix:*"
# Keep first part(s) as prefix, replace the rest with *
# Handle multi-level prefixes (e.g., "app:cache:user:123" -> "app:cache:user:*")
if len(parts) >= 3:
# Keep first two levels for better granularity
return f"{parts[0]}{self.pattern_separator}{parts[1]}{self.pattern_separator}*"
else:
return f"{parts[0]}{self.pattern_separator}*"
def _scan_keys(self, pattern: str = "*", count: int = 1000) -> Iterator[str]:
"""
Iterate over keys using SCAN (non-blocking).
Args:
pattern: Key pattern to match
count: Hint for number of keys per iteration
Yields:
Key names
"""
cursor = 0
while True:
cursor, keys = self.client.scan(cursor=cursor, match=pattern, count=count)
for key in keys:
yield key
if cursor == 0:
break
def _get_memory_usage(self, key: str) -> int:
"""
Get memory usage for a key using MEMORY USAGE command.
Args:
key: The Redis key
Returns:
Memory usage in bytes (0 if unavailable)
"""
try:
# MEMORY USAGE requires Redis 4.0+
usage = self.client.memory_usage(key)
return usage if usage else 0
except ResponseError:
# MEMORY USAGE not available (Redis < 4.0)
return 0
except Exception:
return 0
def _get_ttl(self, key: str) -> int:
"""
Get TTL for a key.
Args:
key: The Redis key
Returns:
TTL in seconds (-1 if no TTL, -2 if key doesn't exist)
"""
try:
return self.client.ttl(key)
except Exception:
return -2
def audit(self, pattern: str = "*", memory_sampling: bool = True) -> None:
"""
Perform the key audit.
Args:
pattern: Key pattern to scan
memory_sampling: Whether to sample memory usage (slower but more accurate)
"""
logger.info(f"Starting audit on {self.host}:{self.port} (db={self.db})")
logger.info(f"Pattern: {pattern} | Separator: '{self.pattern_separator}'")
# Get total key count for progress
total_db_keys = self.client.dbsize()
logger.info(f"Total keys in database: {total_db_keys}")
processed = 0
memory_sampled = 0
max_memory_samples = 10000 # Limit memory sampling for performance
for key in self._scan_keys(pattern=pattern):
processed += 1
# Extract pattern
key_pattern = self._get_pattern(key)
# Update stats
self.stats[key_pattern].count += 1
# Sample memory (limited for performance)
if memory_sampling and memory_sampled < max_memory_samples:
if self.stats[key_pattern].count <= 100 or processed % 100 == 0:
memory = self._get_memory_usage(key)
self.stats[key_pattern].total_memory += memory
self.total_memory += memory
memory_sampled += 1
# Check TTL
ttl = self._get_ttl(key)
if ttl == -1:
self.stats[key_pattern].ttl_none += 1
elif ttl > 0:
self.stats[key_pattern].ttl_set += 1
# Keep sample keys
if len(self.stats[key_pattern].sample_keys) < self.sample_size:
self.stats[key_pattern].sample_keys.append(key)
# Progress logging
if processed % 10000 == 0:
logger.info(f"Processed: {processed:,} keys...")
self.total_keys = processed
logger.info(f"Audit complete: {processed:,} keys analyzed")
def _format_size(self, size_bytes: int) -> str:
"""
Format bytes into human-readable size.
Args:
size_bytes: Size in bytes
Returns:
Formatted string (e.g., "1.5 MB")
"""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.2f} TB"
def print_report(self, top_n: int = 20) -> None:
"""
Print the audit report.
Args:
top_n: Number of top patterns to show
"""
print("\n" + "=" * 80)
print("REDIS KEY AUDIT REPORT")
print("=" * 80)
print(f"Host: {self.host}:{self.port} | Database: {self.db}")
print(f"Total Keys Scanned: {self.total_keys:,}")
print(f"Total Memory Sampled: {self._format_size(self.total_memory)}")
print(f"Unique Patterns: {len(self.stats)}")
print("=" * 80)
# Sort by count (descending)
sorted_patterns = sorted(
self.stats.items(),
key=lambda x: x[1].count,
reverse=True
)[:top_n]
print(f"\n{'Pattern':<40} {'Count':>12} {'Memory':>12} {'No TTL':>10}")
print("-" * 80)
for pattern, stats in sorted_patterns:
# Truncate long patterns
display_pattern = pattern if len(pattern) <= 38 else pattern[:35] + "..."
print(
f"{display_pattern:<40} "
f"{stats.count:>12,} "
f"{self._format_size(stats.total_memory):>12} "
f"{stats.ttl_none:>10,}"
)
print("-" * 80)
# Show patterns without TTL (potential memory leaks)
no_ttl_patterns = [
(p, s) for p, s in self.stats.items()
if s.ttl_none > 0 and s.ttl_none == s.count
]
if no_ttl_patterns:
print(f"\n⚠️ PATTERNS WITHOUT TTL ({len(no_ttl_patterns)} patterns):")
print(" These keys will never expire and may cause memory issues.")
for pattern, stats in sorted(no_ttl_patterns, key=lambda x: x[1].count, reverse=True)[:10]:
print(f" - {pattern}: {stats.count:,} keys")
print("\n" + "=" * 80)
def export_csv(self, filename: str) -> None:
"""
Export report to CSV file.
Args:
filename: Output CSV filename
"""
import csv
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'Pattern', 'Key Count', 'Memory (bytes)', 'TTL Set', 'No TTL', 'Sample Keys'
])
for pattern, stats in sorted(self.stats.items(), key=lambda x: x[1].count, reverse=True):
writer.writerow([
pattern,
stats.count,
stats.total_memory,
stats.ttl_set,
stats.ttl_none,
';'.join(stats.sample_keys)
])
logger.info(f"Report exported to: {filename}")
def setup_args() -> argparse.Namespace:
"""
Configure CLI arguments.
Returns:
Parsed arguments namespace
"""
parser = argparse.ArgumentParser(
description="Audit Redis keys by pattern without blocking the server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exemples:
# Audit local Redis
%(prog)s
# Audit remote Redis with password
%(prog)s --host redis.example.com --password secret
# Audit specific pattern
%(prog)s --pattern "session:*"
# Custom separator (e.g., for keys like "app.cache.user.123")
%(prog)s --pattern-separator "."
# Export to CSV
%(prog)s --export report.csv
Note:
Ce script utilise SCAN (non-bloquant) et non KEYS * (bloquant).
Il est sûr pour une utilisation en production.
"""
)
parser.add_argument(
'-H', '--host',
default='localhost',
help='Hôte Redis (défaut: localhost)'
)
parser.add_argument(
'-p', '--port',
type=int,
default=6379,
help='Port Redis (défaut: 6379)'
)
parser.add_argument(
'-a', '--password',
help='Mot de passe Redis'
)
parser.add_argument(
'-n', '--db',
type=int,
default=0,
help='Numéro de base de données (défaut: 0)'
)
parser.add_argument(
'--pattern',
default='*',
help='Pattern de clés à scanner (défaut: *)'
)
parser.add_argument(
'-s', '--pattern-separator',
default=':',
metavar='CHAR',
help='Séparateur de préfixe (défaut: ":")'
)
parser.add_argument(
'-t', '--top',
type=int,
default=20,
metavar='N',
help='Nombre de patterns à afficher (défaut: 20)'
)
parser.add_argument(
'--no-memory',
action='store_true',
help='Désactiver l\'échantillonnage mémoire (plus rapide)'
)
parser.add_argument(
'-e', '--export',
metavar='FILE',
help='Exporter le rapport en CSV'
)
parser.add_argument(
'-v', '--verbose',
action='store_true',
help='Mode verbeux'
)
return parser.parse_args()
def main() -> int:
"""
Main entry point.
Returns:
Exit code (0 for success, non-zero for errors)
"""
args = setup_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
try:
# Initialize auditor
auditor = RedisKeyAuditor(
host=args.host,
port=args.port,
password=args.password,
db=args.db,
pattern_separator=args.pattern_separator
)
# Test connection
auditor.client.ping()
logger.info("Connected to Redis")
# Run audit
auditor.audit(
pattern=args.pattern,
memory_sampling=not args.no_memory
)
# Print report
auditor.print_report(top_n=args.top)
# Export if requested
if args.export:
auditor.export_csv(args.export)
return 0
except ConnectionError as e:
logger.error(f"Cannot connect to Redis: {e}")
return 1
except KeyboardInterrupt:
logger.warning("Interrupted by user")
return 130
except Exception as e:
logger.error(f"Unexpected error: {e}")
return 1
if __name__ == '__main__':
sys.exit(main())
Utilisation
Audit Basique
# Audit Redis local
python3 redis_key_auditor.py
# Audit instance distante
python3 redis_key_auditor.py --host redis.example.com --port 6379
# Avec authentification
python3 redis_key_auditor.py -H redis.example.com -a "mypassword"
Options Avancées
# Scanner un pattern spécifique
python3 redis_key_auditor.py --pattern "session:*"
# Séparateur personnalisé (pour keys type "app.cache.user.123")
python3 redis_key_auditor.py --pattern-separator "."
# Afficher le top 50 des patterns
python3 redis_key_auditor.py --top 50
# Mode rapide (sans échantillonnage mémoire)
python3 redis_key_auditor.py --no-memory
# Exporter en CSV
python3 redis_key_auditor.py --export audit_report.csv
Exemple de Sortie
================================================================================
REDIS KEY AUDIT REPORT
================================================================================
Host: localhost:6379 | Database: 0
Total Keys Scanned: 1,245,678
Total Memory Sampled: 2.34 GB
Unique Patterns: 47
================================================================================
Pattern Count Memory No TTL
--------------------------------------------------------------------------------
session:* 456,789 512.00 MB 0
cache:api:* 234,567 896.00 MB 34,567
user:profile:* 89,012 128.00 MB 0
queue:jobs:* 67,890 64.00 MB 67,890
rate_limit:* 45,678 32.00 MB 0
analytics:daily:* 34,567 256.00 MB 34,567
--------------------------------------------------------------------------------
⚠️ PATTERNS WITHOUT TTL (3 patterns):
These keys will never expire and may cause memory issues.
- queue:jobs:*: 67,890 keys
- cache:api:*: 34,567 keys (subset without TTL)
- analytics:daily:*: 34,567 keys
================================================================================
Options
| Option | Description |
|---|---|
-H, --host HOST |
Hôte Redis (défaut: localhost) |
-p, --port PORT |
Port Redis (défaut: 6379) |
-a, --password PWD |
Mot de passe Redis |
-n, --db NUM |
Numéro de base de données (défaut: 0) |
--pattern PATTERN |
Pattern de clés à scanner (défaut: *) |
-s, --pattern-separator |
Séparateur de préfixe (défaut: ":") |
-t, --top N |
Nombre de patterns à afficher (défaut: 20) |
--no-memory |
Désactiver l'échantillonnage mémoire |
-e, --export FILE |
Exporter en CSV |
-v, --verbose |
Mode verbeux |
SCAN vs KEYS
| Commande | Comportement | Production |
|---|---|---|
KEYS * |
Bloquant, parcourt tout | INTERDIT |
SCAN |
Itératif, non-bloquant | Recommandé |
Ce script utilise exclusivement SCAN avec un curseur itératif.
Bonnes Pratiques Redis
Toujours définir un TTL pour les clés temporaires :
# Python
redis_client.setex("session:abc123", 3600, "data") # Expire in 1h
# CLI
SET session:abc123 "data" EX 3600
Patterns de nommage :
- Utilisez des préfixes cohérents :
app:module:entity:id - Exemple :
myapp:cache:user:12345
Voir Aussi
- pg-bloat-check.sh - Analyse bloat PostgreSQL
- mysql-security-audit.sh - Audit sécurité MySQL
- health_checker.py - Vérification santé services