Module 10 - Programmation Réseau
Comprendre et utiliser les sockets pour la communication réseau.
Durée estimée : 15 minutes
Objectifs du Module
- Comprendre les concepts réseau (TCP, UDP)
- Utiliser le module socket
- Créer des clients et serveurs simples
- Scanner et diagnostiquer des réseaux
1. Concepts Fondamentaux
Le Modèle TCP/IP
TCP vs UDP
| Caractéristique | TCP | UDP |
|---|---|---|
| Connexion | Orienté connexion | Sans connexion |
| Fiabilité | Garantie de livraison | Pas de garantie |
| Ordre | Préservé | Non garanti |
| Usage | HTTP, SSH, FTP | DNS, DHCP, streaming |
2. Module socket
Client TCP Simple
import socket
def tcp_client(host, port, message):
    """Send ``message`` to ``host:port`` over TCP and return the first reply chunk.

    The text is encoded with the default codec before sending, a single
    ``recv`` of at most 4096 bytes is performed, and the received bytes are
    decoded back to ``str``.  The socket is closed in all cases, including
    when connecting, sending or receiving raises.
    """
    connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        endpoint = (host, port)
        connection.connect(endpoint)
        payload = message.encode()
        connection.sendall(payload)
        # Only one read is done: a reply longer than what arrives first is cut.
        reply = connection.recv(4096)
        return reply.decode()
    finally:
        connection.close()
# Usage: send a minimal HTTP request; tcp_client() performs a single recv(4096),
# so only the first chunk of the server's reply is printed.
response = tcp_client("example.com", 80, "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
print(response)
Client TCP avec Context Manager
import socket
def tcp_request(host, port, data, timeout=10):
    """Send ``data`` over TCP and return everything the peer sends back.

    ``data`` may be ``str`` (encoded with the default codec) or a bytes-like
    object (sent unchanged).  ``timeout`` (seconds) applies to the connect and
    to every send/receive.  The reply is read until the peer closes the
    connection and is returned as raw ``bytes``.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
        conn.settimeout(timeout)
        conn.connect((host, port))
        payload = data if not isinstance(data, str) else data.encode()
        conn.sendall(payload)
        # recv() yields b"" once the peer has closed; iter() stops on that
        # sentinel, so the join collects the whole reply.
        return b"".join(iter(lambda: conn.recv(4096), b""))
Serveur TCP Simple
import socket
def tcp_server(host, port):
    """Run a blocking TCP echo server on ``host:port``.

    Clients are served one at a time: every chunk received (up to 1024 bytes)
    is sent straight back until the client closes its side.  The accept loop
    never ends on its own, so this function does not return.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        # Allow the address to be reused right after a restart.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((host, port))
        listener.listen(5)
        print(f"Serveur en écoute sur {host}:{port}")
        while True:
            conn, peer = listener.accept()
            print(f"Connexion de {peer}")
            with conn:
                # An empty read means the client disconnected.
                for chunk in iter(lambda: conn.recv(1024), b""):
                    conn.sendall(chunk)
            print(f"Déconnexion de {peer}")
# Lancer le serveur
# tcp_server("0.0.0.0", 9999)
Serveur Multi-clients (Threading)
import socket
import threading
class ThreadedTCPServer:
    """TCP server that handles each client in its own daemon thread."""

    def __init__(self, host, port):
        # Address to bind to; the listening socket is created by start().
        self.host = host
        self.port = port
        self.server = None
        self.running = False

    def start(self):
        """Bind, listen and accept clients until stopped or accept() fails."""
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener = self.server
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((self.host, self.port))
        listener.listen(5)
        self.running = True
        print(f"Serveur démarré sur {self.host}:{self.port}")

        while self.running:
            try:
                client, address = self.server.accept()
                worker = threading.Thread(
                    target=self.handle_client,
                    args=(client, address),
                    daemon=True,
                )
                worker.start()
            except OSError:
                # A socket error (e.g. the listener was closed) ends the loop.
                break

    def handle_client(self, client, address):
        """Serve one client: reply to each chunk with ``process(chunk)``."""
        print(f"Nouveau client: {address}")
        try:
            # An empty read means the client closed the connection.
            for chunk in iter(lambda: client.recv(1024), b""):
                client.sendall(self.process(chunk))
        finally:
            client.close()
            print(f"Client déconnecté: {address}")

    def process(self, data):
        """Return the reply for ``data``; the default upper-cases it."""
        return data.upper()

    def stop(self):
        """Clear the running flag and close the listening socket, if any."""
        self.running = False
        if self.server:
            self.server.close()
3. Client/Serveur UDP
Client UDP
import socket
def udp_client(host, port, message):
    """Send ``message`` as one UDP datagram and return the decoded reply.

    No connection is set up beforehand; the datagram is addressed directly to
    ``host:port``.  Waiting for the reply (up to 4096 bytes) is limited to
    5 seconds, after which the socket timeout error propagates.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
        udp.settimeout(5)
        udp.sendto(message.encode(), (host, port))
        reply, _sender = udp.recvfrom(4096)
    return reply.decode()
# Example: simplified DNS request
# NOTE(review): `dns_query` is not defined in this snippet, and udp_client()
# calls message.encode() on its third argument, so a str payload is expected
# here — confirm what was meant to be sent.
response = udp_client("8.8.8.8", 53, dns_query)
Serveur UDP
import socket
def udp_server(host, port):
    """Run a UDP server that answers each datagram with its upper-cased copy.

    Datagrams of up to 1024 bytes are read one at a time, logged, and the
    reply is sent back to the sender's address.  The loop never ends on its
    own, so this function does not return.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as listener:
        listener.bind((host, port))
        print(f"Serveur UDP en écoute sur {host}:{port}")
        while True:
            data, client = listener.recvfrom(1024)
            print(f"Reçu de {client}: {data}")
            # Reply to whoever sent the datagram.
            listener.sendto(data.upper(), client)
4. Résolution DNS
import socket
# Forward lookup: one IPv4 address for the name
ip = socket.gethostbyname("google.com")
print(f"IP: {ip}")

# Full resolution: each entry is (family, type, proto, canonname, sockaddr)
info = socket.getaddrinfo("google.com", 443)
for family, *_, sockaddr in info:
    print(f"{family.name}: {sockaddr}")

# Reverse lookup: the first element of the result is the host name
hostname = socket.gethostbyaddr("8.8.8.8")
print(f"Hostname: {hostname[0]}")

# Name of the local machine, short and fully qualified
print(socket.gethostname())
print(socket.getfqdn())
5. Outils Réseau SysOps
Port Scanner
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple
def scan_port(host: str, port: int, timeout: float = 1) -> Tuple[int, bool]:
    """Probe one TCP port and report whether a connection could be opened.

    Returns ``(port, True)`` when the connect attempt succeeds within
    ``timeout`` seconds, and ``(port, False)`` otherwise, including when a
    socket error is raised.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            # connect_ex() reports failure as an error code (0 means success).
            is_open = probe.connect_ex((host, port)) == 0
    except OSError:
        is_open = False
    return port, is_open
def scan_ports(host: str, ports: List[int], max_workers: int = 100) -> dict:
    """Scan several TCP ports of ``host`` concurrently.

    Each port is probed with ``scan_port`` on a thread pool of at most
    ``max_workers`` threads.  Returns a dict with the scanned ``host``, the
    sorted list of ``open`` ports and ``closed_count``, the number of ports
    that did not accept a connection.
    """
    open_ports = []
    closed_total = 0

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [pool.submit(scan_port, host, port) for port in ports]
        # Results are consumed as they finish; ordering is restored by the
        # sort below.
        for finished in as_completed(pending):
            port, is_open = finished.result()
            if is_open:
                open_ports.append(port)
            else:
                closed_total += 1

    open_ports.sort()
    return {
        "host": host,
        "open": open_ports,
        "closed_count": closed_total,
    }
# Usage: scan ports 1 through 1024 of a single host and show the open ones
result = scan_ports("192.168.1.1", range(1, 1025))
print(f"Ports ouverts: {result['open']}")
Vérification de Connectivité
import socket
from dataclasses import dataclass
from typing import Optional
@dataclass
class ConnectivityResult:
    """Outcome of one TCP reachability check against ``host:port``."""

    # Target that was probed.
    host: str
    port: int
    # True when the TCP connection could be established.
    reachable: bool
    # Connection time in milliseconds; None when the target was not reached.
    latency_ms: Optional[float]
    # Failure description; None when the target was reached.
    error: Optional[str]
def check_connectivity(host: str, port: int, timeout: float = 5) -> ConnectivityResult:
    """Check whether a TCP connection to ``host:port`` can be established.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        timeout: Maximum time in seconds allowed for the connection attempt.

    Returns:
        A ``ConnectivityResult``.  On success ``reachable`` is True and
        ``latency_ms`` holds the elapsed time rounded to two decimals; on
        failure ``reachable`` is False and ``error`` is ``"Timeout"`` or the
        text of the socket error.
    """
    import time

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # (or turn negative) when the system clock is adjusted mid-measurement.
    start = time.perf_counter()
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            sock.connect((host, port))
            latency = (time.perf_counter() - start) * 1000
    except socket.timeout:
        # Must be handled before the generic socket error, which it subclasses.
        return ConnectivityResult(host, port, False, None, "Timeout")
    except socket.error as e:
        return ConnectivityResult(host, port, False, None, str(e))

    return ConnectivityResult(
        host=host,
        port=port,
        reachable=True,
        latency_ms=round(latency, 2),
        error=None,
    )
# Check several endpoints
endpoints = [
    ("google.com", 443),
    ("github.com", 22),
    ("192.168.1.1", 80),
]
for host, port in endpoints:
    result = check_connectivity(host, port)
    status = "✓" if result.reachable else "✗"
    # Choose on `reachable` rather than truthiness: a latency that rounds to
    # 0.0 is falsy and would otherwise print the (None) error instead.
    detail = result.latency_ms if result.reachable else result.error
    print(f"{status} {host}:{port} - {detail}")
Service Banner Grabbing
import socket
def grab_banner(host: str, port: int, timeout: float = 3) -> str:
    """Return the first bytes a service sends on ``host:port`` as text.

    Up to 1024 bytes are read, decoded while ignoring undecodable bytes, and
    stripped of surrounding whitespace.  Any failure is reported as a string
    of the form ``"Error: <reason>"`` instead of being raised.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            conn.settimeout(timeout)
            conn.connect((host, port))
            # Some services greet immediately; on the HTTP ports listed here
            # a request is sent first to trigger a reply.
            if port in (80, 8080, 443):
                conn.sendall(b"HEAD / HTTP/1.0\r\n\r\n")
            raw = conn.recv(1024)
        return raw.decode(errors="ignore").strip()
    except Exception as exc:
        return f"Error: {exc}"
# Fetch the banners of a few common services
services = [
    ("192.168.1.1", 22),  # SSH
    ("192.168.1.1", 80),  # HTTP
    ("192.168.1.1", 21),  # FTP
]
for host, port in services:
    banner = grab_banner(host, port)
    # Only the first 50 characters are shown to keep one line per service.
    preview = banner[:50]
    print(f"{host}:{port} -> {preview}")
Résolveur DNS Personnalisé
import socket
import struct
def _encode_dns_name(domain: str) -> bytes:
    """Encode a dotted domain name as length-prefixed DNS labels."""
    encoded = bytearray()
    # A single trailing dot (fully-qualified form) must not yield an empty label.
    for label in domain.rstrip(".").split("."):
        raw = label.encode()
        # Labels are 1..63 bytes; larger length bytes would be read as
        # compression pointers by the server.
        if not 1 <= len(raw) <= 63:
            raise ValueError(f"invalid DNS label {label!r} in {domain!r}")
        encoded.append(len(raw))
        encoded += raw
    encoded.append(0)  # root label terminates the name
    return bytes(encoded)


def _skip_dns_name(message: bytes, pos: int) -> int:
    """Return the offset just past the (possibly compressed) name at ``pos``."""
    while True:
        if pos >= len(message):
            raise struct.error("truncated DNS name")
        length = message[pos]
        if length == 0:
            return pos + 1
        if length & 0xC0 == 0xC0:
            # A compression pointer is two bytes long and always ends the name.
            return pos + 2
        pos += 1 + length


def _parse_a_records(response: bytes) -> list:
    """Extract the IPv4 addresses of all A/IN records in the answer section."""
    question_count, answer_count = struct.unpack_from(">HH", response, 4)
    pos = 12  # fixed-size DNS header
    for _ in range(question_count):
        # Each question is a name followed by QTYPE and QCLASS (2 bytes each).
        pos = _skip_dns_name(response, pos) + 4

    ips = []
    for _ in range(answer_count):
        pos = _skip_dns_name(response, pos)
        rtype, rclass, _ttl, rdlength = struct.unpack_from(">HHIH", response, pos)
        pos += 10
        rdata = response[pos:pos + rdlength]
        # Check the record type too: other records (e.g. a short CNAME) can
        # also carry exactly four bytes of data.
        if rtype == 1 and rclass == 1 and len(rdata) == 4:
            ips.append(".".join(str(octet) for octet in rdata))
        pos += rdlength
    return ips


def dns_query(domain: str, dns_server: str = "8.8.8.8", port: int = 53,
              timeout: float = 5) -> list:
    """Resolve ``domain`` to IPv4 addresses with a hand-built DNS query.

    Args:
        domain: Dotted host name; a single trailing dot is accepted.
        dns_server: Address of the DNS server to query over UDP.
        port: UDP port of the DNS server.
        timeout: Seconds to wait for the reply.

    Returns:
        The IPv4 addresses (dotted strings) found in the A records of the
        answer section, in the order received.

    Raises:
        ValueError: If a label of ``domain`` is empty or longer than 63 bytes.
        socket.timeout: If no reply arrives within ``timeout`` seconds.
        struct.error: If the reply is truncated or malformed.
    """
    # Header: id, flags (standard query, recursion desired), 1 question,
    # no answer/authority/additional records.
    header = struct.pack(">2sHHHHH", b"\xaa\xbb", 0x0100, 1, 0, 0, 0)
    # Question: name, QTYPE=A (1), QCLASS=IN (1).
    query = header + _encode_dns_name(domain) + struct.pack(">HH", 1, 1)

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(query, (dns_server, port))
        response, _ = sock.recvfrom(512)

    answer_count = struct.unpack_from(">H", response, 6)[0]
    print(f"Réponses: {answer_count}")
    return _parse_a_records(response)
# Usage: resolve a name with the default DNS server and show the addresses
ips = dns_query("google.com")
print(f"IPs: {ips}")
6. Informations Réseau Locales
import socket
import fcntl
import struct
def get_local_ip():
    """Return the local IP address used for outbound connections.

    A UDP socket is pointed at a public address and the local end of that
    socket is read back.  If anything fails, ``"127.0.0.1"`` is returned
    instead.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            local_address, _local_port = probe.getsockname()
    except Exception:
        return "127.0.0.1"
    return local_address
def get_interface_ip(interface: str) -> str:
    """Return the IPv4 address of a network interface (Linux).

    Args:
        interface: Interface name, e.g. ``"eth0"``; only its first 15 bytes
            are passed to the kernel.

    Returns:
        The address in dotted notation, or ``"Error: <reason>"`` if the
        lookup fails for any reason (unknown interface, unsupported
        platform, ...).
    """
    try:
        # The socket only serves as a handle for the ioctl; the context
        # manager guarantees it is closed even when the ioctl fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            request = struct.pack('256s', interface.encode()[:15])
            reply = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                request,
            )
        # Bytes 20..23 of the returned structure hold the packed IPv4 address.
        return socket.inet_ntoa(reply[20:24])
    except Exception as e:
        return f"Error: {e}"
# Lister toutes les interfaces (avec netifaces)
def list_interfaces():
    """Print ``<interface>: <IPv4 address>`` for every configured address.

    Relies on the third-party ``netifaces`` package; when it cannot be
    imported, a notice is printed instead.
    """
    try:
        import netifaces

        for iface in netifaces.interfaces():
            ipv4_entries = netifaces.ifaddresses(iface).get(netifaces.AF_INET, [])
            for addr in ipv4_entries:
                print(f"{iface}: {addr['addr']}")
    except ImportError:
        print("Module netifaces non installé")
# Show the address returned by get_local_ip()
print(f"IP locale: {get_local_ip()}")
7. Patterns Avancés
Timeout et Retry
import socket
import time
def robust_connect(host: str, port: int, retries: int = 3, delay: float = 1,
                   timeout: float = 5) -> socket.socket:
    """Open a TCP connection, retrying on failure.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port to connect to.
        retries: Total number of connection attempts (at least 1).
        delay: Base pause in seconds; the wait after attempt ``n`` (1-based)
            is ``delay * n``.
        timeout: Socket timeout in seconds, applied to each attempt and left
            set on the returned socket.

    Returns:
        The connected socket; the caller is responsible for closing it.

    Raises:
        ValueError: If ``retries`` is less than 1.
        OSError: The error of the last attempt when every attempt failed.
    """
    if retries < 1:
        # Without any attempt there would be no error to re-raise below.
        raise ValueError("retries must be at least 1")

    last_error = None
    for attempt in range(retries):
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            sock.connect((host, port))
            return sock
        except socket.error as e:
            last_error = e
            # Release the failed socket right away rather than holding the
            # descriptor during the back-off pause.
            if sock is not None:
                sock.close()
            if attempt < retries - 1:
                time.sleep(delay * (attempt + 1))
    raise last_error
Non-Blocking Sockets
import socket
import select
def non_blocking_server(host, port):
    """Run a single-threaded echo server multiplexed with ``select``.

    All sockets are non-blocking.  Data received from a client is queued and
    sent back as soon as that client's socket is writable.  A socket is only
    watched for writability while it has queued data, so the loop sleeps in
    ``select`` when there is nothing to do.  Queued data is not bounded.

    The loop runs while at least one socket is being watched; the listening
    socket and any remaining clients are closed when it ends or fails.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    inputs = [server]   # sockets watched for incoming data / connections
    outputs = []        # sockets that currently have data waiting to be sent
    pending = {}        # client socket -> bytearray of data still to send

    def drop(sock):
        """Stop watching ``sock``, discard its queued data and close it."""
        if sock in outputs:
            outputs.remove(sock)
        if sock in inputs:
            inputs.remove(sock)
        pending.pop(sock, None)
        sock.close()

    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.setblocking(False)
        server.bind((host, port))
        server.listen(5)

        while inputs:
            readable, writable, exceptional = select.select(inputs, outputs, inputs)

            for sock in readable:
                if sock is server:
                    # New connection
                    try:
                        client, _addr = sock.accept()
                    except BlockingIOError:
                        # The connection vanished between select() and accept().
                        continue
                    client.setblocking(False)
                    inputs.append(client)
                    pending[client] = bytearray()
                    continue

                # Data from a client
                try:
                    data = sock.recv(1024)
                except BlockingIOError:
                    continue
                except ConnectionError:
                    data = b""  # treat a reset like a normal disconnect

                if data:
                    pending[sock] += data
                    if sock not in outputs:
                        outputs.append(sock)
                else:
                    # Client disconnected
                    drop(sock)

            for sock in writable:
                if sock not in pending:
                    continue  # dropped earlier in this pass
                buffer = pending[sock]
                try:
                    sent = sock.send(buffer)
                except BlockingIOError:
                    continue
                except ConnectionError:
                    drop(sock)
                    continue
                del buffer[:sent]
                if not buffer:
                    # Nothing left to send: stop polling for writability.
                    outputs.remove(sock)

            for sock in exceptional:
                drop(sock)
    finally:
        for sock in list(inputs):
            sock.close()
Exercices Pratiques
Exercice 1 : Ping TCP
# Créer une fonction tcp_ping() qui :
# - Mesure le temps de connexion TCP
# - Supporte plusieurs tentatives
# - Retourne min/max/avg latency
Exercice 2 : Serveur de Monitoring
# Créer un serveur TCP qui :
# - Accepte des connexions de clients
# - Répond aux commandes : STATUS, UPTIME, MEMORY
# - Log toutes les connexions
Exercice 3 : Network Discovery
# Créer un script qui :
# - Scanne un subnet (ex: 192.168.1.0/24)
# - Identifie les hôtes actifs (ping TCP port 22 ou 80)
# - Liste les services courants ouverts
Points Clés à Retenir
Bonnes Pratiques
- Toujours définir un timeout sur les sockets
- Utiliser les context managers (`with`)
- Gérer proprement les erreurs réseau
- Fermer les connexions après usage
Sécurité
- Ne jamais exposer de sockets sans authentification
- Valider toutes les données reçues
- Utiliser SSL/TLS pour les données sensibles
- Limiter le rate des connexions
Voir Aussi
Navigation
| ← Module 09 - Gestion des Erreurs & Log... | Module 11 - APIs REST & HTTP → |