Skip to content

API & Réseau

Intégration avec des APIs REST, SSH et opérations réseau en Python.


Requests (HTTP)

Installation

pip install requests

Requêtes de Base

import requests

# GET
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code)  # 200
print(response.json())       # Dict

# GET avec paramètres
response = requests.get(
    "https://api.example.com/servers",
    params={"env": "prod", "status": "active"}
)

# Headers
response = requests.get(
    "https://api.example.com/data",
    headers={
        "Authorization": "Bearer token123",
        "Accept": "application/json"
    }
)

# Timeout (toujours définir!)
response = requests.get("https://api.example.com", timeout=10)

POST, PUT, DELETE

import requests

# POST JSON
response = requests.post(
    "https://api.example.com/servers",
    json={
        "hostname": "web03",
        "ip": "10.0.0.13"
    },
    headers={"Authorization": "Bearer token123"}
)

# POST form data
response = requests.post(
    "https://api.example.com/login",
    data={"username": "admin", "password": "secret"}
)

# PUT
response = requests.put(
    "https://api.example.com/servers/web03",
    json={"status": "maintenance"}
)

# DELETE
response = requests.delete("https://api.example.com/servers/web03")

Gestion des Erreurs

import requests
from requests.exceptions import RequestException, Timeout, HTTPError

def call_api(url: str, timeout: int = 10) -> dict:
    """Appelle une API avec gestion d'erreurs."""
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Lève HTTPError si 4xx/5xx
        return response.json()

    except Timeout:
        print(f"Timeout calling {url}")
        return None
    except HTTPError as e:
        print(f"HTTP Error: {e.response.status_code}")
        return None
    except RequestException as e:
        print(f"Request failed: {e}")
        return None

Session (Réutiliser les Connexions)

import requests

# Session pour plusieurs requêtes
session = requests.Session()
session.headers.update({
    "Authorization": "Bearer token123",
    "Accept": "application/json"
})

# Les headers sont réutilisés
response1 = session.get("https://api.example.com/servers")
response2 = session.get("https://api.example.com/metrics")

# Fermer proprement
session.close()

# Ou avec context manager
with requests.Session() as session:
    session.headers["Authorization"] = "Bearer token123"
    response = session.get("https://api.example.com/data")

Exemple : Client API REST

import requests
from typing import Optional, List, Dict


class APIClient:
    """Client pour une API REST."""

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
            "Content-Type": "application/json"
        })

    def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
        """Effectue une requête."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            response = self.session.request(method, url, timeout=30, **kwargs)
            response.raise_for_status()
            return response.json() if response.text else None
        except requests.exceptions.RequestException as e:
            print(f"API Error: {e}")
            return None

    def get_servers(self) -> List[Dict]:
        """Liste tous les serveurs."""
        return self._request("GET", "/servers") or []

    def get_server(self, server_id: str) -> Optional[Dict]:
        """Récupère un serveur par ID."""
        return self._request("GET", f"/servers/{server_id}")

    def create_server(self, data: Dict) -> Optional[Dict]:
        """Crée un nouveau serveur."""
        return self._request("POST", "/servers", json=data)

    def update_server(self, server_id: str, data: Dict) -> Optional[Dict]:
        """Met à jour un serveur."""
        return self._request("PUT", f"/servers/{server_id}", json=data)

    def delete_server(self, server_id: str) -> bool:
        """Supprime un serveur."""
        result = self._request("DELETE", f"/servers/{server_id}")
        return result is not None


# Utilisation
client = APIClient("https://api.example.com", "my-api-key")

# CRUD
servers = client.get_servers()
new_server = client.create_server({"hostname": "web03", "ip": "10.0.0.13"})
client.update_server("web03", {"status": "active"})
client.delete_server("old-server")

Paramiko (SSH)

Installation

pip install paramiko

Connexion et Commandes

import paramiko

# Connexion par mot de passe
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect(
    hostname="10.0.0.10",
    username="admin",
    password="password",
    port=22
)

# Exécuter une commande
stdin, stdout, stderr = client.exec_command("uptime")
output = stdout.read().decode()
errors = stderr.read().decode()
print(output)

client.close()

Connexion par Clé SSH

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Avec clé privée
client.connect(
    hostname="10.0.0.10",
    username="admin",
    key_filename="/home/user/.ssh/id_rsa",
    # ou pkey=paramiko.RSAKey.from_private_key_file("/path/to/key")
)

stdin, stdout, stderr = client.exec_command("hostname")
print(stdout.read().decode().strip())

client.close()

Classe SSH Réutilisable

import paramiko
from typing import Tuple, Optional


class SSHClient:
    """Client SSH réutilisable."""

    def __init__(self, hostname: str, username: str,
                 password: str = None, key_file: str = None, port: int = 22):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.key_file = key_file
        self.port = port
        self.client = None

    def connect(self):
        """Établit la connexion SSH."""
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs = {
            "hostname": self.hostname,
            "username": self.username,
            "port": self.port,
        }

        if self.key_file:
            connect_kwargs["key_filename"] = self.key_file
        elif self.password:
            connect_kwargs["password"] = self.password

        self.client.connect(**connect_kwargs)

    def run(self, command: str, timeout: int = 30) -> Tuple[str, str, int]:
        """Exécute une commande et retourne (stdout, stderr, exit_code)."""
        if not self.client:
            self.connect()

        stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        exit_code = stdout.channel.recv_exit_status()

        return (
            stdout.read().decode().strip(),
            stderr.read().decode().strip(),
            exit_code
        )

    def close(self):
        """Ferme la connexion."""
        if self.client:
            self.client.close()

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


# Utilisation
with SSHClient("10.0.0.10", "admin", key_file="~/.ssh/id_rsa") as ssh:
    stdout, stderr, code = ssh.run("uptime")
    print(f"Uptime: {stdout}")

    stdout, stderr, code = ssh.run("df -h /")
    print(f"Disk: {stdout}")

SFTP (Transfert de Fichiers)

import paramiko

# Connexion
transport = paramiko.Transport(("10.0.0.10", 22))
transport.connect(username="admin", password="password")
sftp = paramiko.SFTPClient.from_transport(transport)

# Upload
sftp.put("/local/file.txt", "/remote/file.txt")

# Download
sftp.get("/remote/file.txt", "/local/file.txt")

# Lister un répertoire
for entry in sftp.listdir("/var/log"):
    print(entry)

# Fermer
sftp.close()
transport.close()

Sockets (Bas Niveau)

Vérification de Port

import socket
from typing import List, Tuple


def check_port(host: str, port: int, timeout: float = 5.0) -> bool:
    """Vérifie si un port est ouvert."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            return result == 0
    except socket.error:
        return False


def scan_ports(host: str, ports: List[int]) -> List[Tuple[int, bool]]:
    """Scanne plusieurs ports."""
    results = []
    for port in ports:
        is_open = check_port(host, port)
        results.append((port, is_open))
    return results


# Utilisation
common_ports = [22, 80, 443, 3306, 5432, 6379]
results = scan_ports("10.0.0.10", common_ports)

for port, is_open in results:
    status = "OPEN" if is_open else "CLOSED"
    print(f"Port {port}: {status}")

Résolution DNS

import socket

# Résoudre un hostname
ip = socket.gethostbyname("google.com")
print(ip)

# Reverse DNS
hostname = socket.gethostbyaddr("8.8.8.8")
print(hostname)

# Toutes les IPs
ips = socket.getaddrinfo("google.com", 443)
for ip in ips:
    print(ip[4][0])

Webhooks

Envoyer un Webhook

import requests
import json
from datetime import datetime


def send_slack_webhook(webhook_url: str, message: str, channel: str = None):
    """Envoie un message à Slack."""
    payload = {
        "text": message,
        "username": "Python Bot",
        "icon_emoji": ":robot_face:"
    }
    if channel:
        payload["channel"] = channel

    response = requests.post(webhook_url, json=payload, timeout=10)
    return response.status_code == 200


def send_teams_webhook(webhook_url: str, title: str, message: str):
    """Envoie une carte à Microsoft Teams."""
    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "summary": title,
        "themeColor": "0076D7",
        "title": title,
        "sections": [{
            "activityTitle": message,
            "facts": [
                {"name": "Timestamp", "value": datetime.now().isoformat()}
            ]
        }]
    }

    response = requests.post(webhook_url, json=payload, timeout=10)
    return response.status_code == 200


# Utilisation
send_slack_webhook(
    "https://hooks.slack.com/services/XXX/YYY/ZZZ",
    ":warning: Server web01 is down!"
)

Polling & Retry

import time
import requests
from typing import Callable


def retry(func: Callable, max_attempts: int = 3, delay: float = 1.0):
    """Réessaie une fonction en cas d'échec."""
    last_exception = None

    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            last_exception = e
            print(f"Attempt {attempt} failed: {e}")
            if attempt < max_attempts:
                time.sleep(delay * attempt)  # Backoff exponentiel

    raise last_exception


def wait_for_service(url: str, timeout: int = 300, interval: int = 5) -> bool:
    """Attend qu'un service soit disponible."""
    start_time = time.time()

    while time.time() - start_time < timeout:
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                return True
        except requests.exceptions.RequestException:
            pass

        print(f"Waiting for {url}...")
        time.sleep(interval)

    return False


# Utilisation
if wait_for_service("http://localhost:8080/health", timeout=120):
    print("Service is ready!")
else:
    print("Service failed to start")

Voir Aussi