Skip to content

Module 12 - SSH & Automatisation Distante

Automatiser l'administration de serveurs distants via SSH.

Durée estimée : 15 minutes


Objectifs du Module

  • Utiliser Paramiko pour les connexions SSH
  • Exécuter des commandes à distance
  • Transférer des fichiers (SFTP)
  • Gérer plusieurs serveurs

1. Installation

pip install paramiko
pip install fabric  # Optionnel, pour le haut niveau

2. Connexion SSH avec Paramiko

Connexion Basique

import paramiko

# Créer le client SSH
client = paramiko.SSHClient()

# Accepter automatiquement les clés inconnues (attention en production!)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
    # Connexion par mot de passe
    client.connect(
        hostname="192.168.1.100",
        port=22,
        username="admin",
        password="secret",
        timeout=10
    )

    # Exécuter une commande
    stdin, stdout, stderr = client.exec_command("uptime")

    # Lire les résultats
    output = stdout.read().decode()
    error = stderr.read().decode()
    exit_code = stdout.channel.recv_exit_status()

    print(f"Output: {output}")
    print(f"Exit code: {exit_code}")

finally:
    client.close()

Connexion par Clé SSH

import paramiko
from pathlib import Path

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Clé privée sans passphrase
private_key = paramiko.RSAKey.from_private_key_file(
    str(Path.home() / ".ssh" / "id_rsa")
)

# Clé avec passphrase
private_key = paramiko.RSAKey.from_private_key_file(
    str(Path.home() / ".ssh" / "id_rsa"),
    password="key_passphrase"
)

# Connexion
client.connect(
    hostname="192.168.1.100",
    username="admin",
    pkey=private_key
)

# Ou directement avec key_filename
client.connect(
    hostname="192.168.1.100",
    username="admin",
    key_filename=str(Path.home() / ".ssh" / "id_rsa")
)

Connexion via Agent SSH

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Utiliser l'agent SSH (ssh-agent)
client.connect(
    hostname="192.168.1.100",
    username="admin",
    allow_agent=True,
    look_for_keys=True  # Cherche aussi dans ~/.ssh/
)

3. Exécution de Commandes

Commandes Simples

import paramiko

def run_command(client, command, timeout=30):
    """Exécute une commande et retourne le résultat."""
    stdin, stdout, stderr = client.exec_command(command, timeout=timeout)

    output = stdout.read().decode("utf-8")
    error = stderr.read().decode("utf-8")
    exit_code = stdout.channel.recv_exit_status()

    return {
        "stdout": output,
        "stderr": error,
        "exit_code": exit_code,
        "success": exit_code == 0
    }

# Utilisation
result = run_command(client, "df -h")
if result["success"]:
    print(result["stdout"])
else:
    print(f"Erreur: {result['stderr']}")

Commandes avec Sudo

def run_sudo_command(client, command, sudo_password):
    """Exécute une commande avec sudo."""
    sudo_command = f"echo {sudo_password} | sudo -S {command}"

    stdin, stdout, stderr = client.exec_command(sudo_command)

    output = stdout.read().decode()
    error = stderr.read().decode()

    # Filtrer le prompt sudo de stderr
    error_lines = [l for l in error.split('\n')
                   if not l.startswith('[sudo]')]

    return {
        "stdout": output,
        "stderr": '\n'.join(error_lines),
        "exit_code": stdout.channel.recv_exit_status()
    }

# Utilisation
result = run_sudo_command(client, "systemctl restart nginx", "sudo_password")

Shell Interactif

import paramiko
import time

def interactive_shell(client, commands):
    """Exécute plusieurs commandes dans un shell interactif."""
    channel = client.invoke_shell()
    time.sleep(0.5)  # Attendre le prompt

    output = ""

    for cmd in commands:
        channel.send(cmd + "\n")
        time.sleep(0.5)

        while channel.recv_ready():
            output += channel.recv(4096).decode()

    channel.close()
    return output

# Utilisation
commands = [
    "cd /var/log",
    "ls -la",
    "tail -5 syslog"
]
output = interactive_shell(client, commands)

4. Transfert de Fichiers (SFTP)

Opérations SFTP de Base

import paramiko
from pathlib import Path

# Ouvrir une session SFTP
sftp = client.open_sftp()

try:
    # Upload un fichier
    sftp.put("/local/path/file.txt", "/remote/path/file.txt")

    # Download un fichier
    sftp.get("/remote/path/file.txt", "/local/path/file.txt")

    # Avec callback de progression
    def progress(transferred, total):
        percent = (transferred / total) * 100
        print(f"\rProgress: {percent:.1f}%", end="")

    sftp.put("large_file.tar.gz", "/remote/large_file.tar.gz",
             callback=progress)

finally:
    sftp.close()

Opérations sur les Fichiers/Répertoires

# Lister un répertoire
files = sftp.listdir("/var/log")
for f in files:
    print(f)

# Avec attributs
for attr in sftp.listdir_attr("/var/log"):
    print(f"{attr.filename} - {attr.st_size} bytes - {attr.st_mtime}")

# Créer un répertoire
sftp.mkdir("/remote/new_dir")

# Supprimer un fichier
sftp.remove("/remote/file.txt")

# Supprimer un répertoire
sftp.rmdir("/remote/empty_dir")

# Renommer
sftp.rename("/remote/old.txt", "/remote/new.txt")

# Changer les permissions
sftp.chmod("/remote/script.sh", 0o755)

# Changer le propriétaire
sftp.chown("/remote/file.txt", uid=1000, gid=1000)

# Obtenir les attributs
stat = sftp.stat("/remote/file.txt")
print(f"Size: {stat.st_size}")
print(f"Modified: {stat.st_mtime}")

Upload/Download Récursif

import os
from pathlib import Path

def sftp_upload_dir(sftp, local_dir, remote_dir):
    """Upload récursif d'un répertoire."""
    local_path = Path(local_dir)

    # Créer le répertoire distant s'il n'existe pas
    try:
        sftp.stat(remote_dir)
    except FileNotFoundError:
        sftp.mkdir(remote_dir)

    for item in local_path.iterdir():
        remote_path = f"{remote_dir}/{item.name}"

        if item.is_dir():
            sftp_upload_dir(sftp, str(item), remote_path)
        else:
            print(f"Uploading: {item} -> {remote_path}")
            sftp.put(str(item), remote_path)

def sftp_download_dir(sftp, remote_dir, local_dir):
    """Download récursif d'un répertoire."""
    local_path = Path(local_dir)
    local_path.mkdir(parents=True, exist_ok=True)

    for attr in sftp.listdir_attr(remote_dir):
        remote_path = f"{remote_dir}/{attr.filename}"
        local_file = local_path / attr.filename

        if stat.S_ISDIR(attr.st_mode):
            sftp_download_dir(sftp, remote_path, str(local_file))
        else:
            print(f"Downloading: {remote_path} -> {local_file}")
            sftp.get(remote_path, str(local_file))

# Utilisation
sftp_upload_dir(sftp, "./deploy", "/var/www/app")
sftp_download_dir(sftp, "/var/log/app", "./logs")

5. Client SSH Réutilisable

import paramiko
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from pathlib import Path
import logging

@dataclass
class CommandResult:
    stdout: str
    stderr: str
    exit_code: int
    success: bool

class SSHClient:
    """Client SSH réutilisable avec gestion d'erreurs."""

    def __init__(
        self,
        hostname: str,
        username: str,
        password: Optional[str] = None,
        key_file: Optional[str] = None,
        port: int = 22,
        timeout: int = 30
    ):
        self.hostname = hostname
        self.username = username
        self.password = password
        self.key_file = key_file
        self.port = port
        self.timeout = timeout
        self.client = None
        self.sftp = None
        self.logger = logging.getLogger(__name__)

    def connect(self):
        """Établit la connexion SSH."""
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs = {
            "hostname": self.hostname,
            "port": self.port,
            "username": self.username,
            "timeout": self.timeout
        }

        if self.key_file:
            connect_kwargs["key_filename"] = self.key_file
        elif self.password:
            connect_kwargs["password"] = self.password
        else:
            connect_kwargs["allow_agent"] = True
            connect_kwargs["look_for_keys"] = True

        self.logger.info(f"Connecting to {self.hostname}...")
        self.client.connect(**connect_kwargs)
        self.logger.info(f"Connected to {self.hostname}")

    def disconnect(self):
        """Ferme la connexion."""
        if self.sftp:
            self.sftp.close()
            self.sftp = None
        if self.client:
            self.client.close()
            self.client = None
        self.logger.info(f"Disconnected from {self.hostname}")

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def run(self, command: str, timeout: int = None) -> CommandResult:
        """Exécute une commande."""
        if not self.client:
            raise RuntimeError("Not connected")

        self.logger.debug(f"Running: {command}")

        stdin, stdout, stderr = self.client.exec_command(
            command,
            timeout=timeout or self.timeout
        )

        out = stdout.read().decode("utf-8")
        err = stderr.read().decode("utf-8")
        code = stdout.channel.recv_exit_status()

        result = CommandResult(
            stdout=out,
            stderr=err,
            exit_code=code,
            success=code == 0
        )

        if not result.success:
            self.logger.warning(f"Command failed: {command} (code={code})")

        return result

    def sudo(self, command: str, password: str = None) -> CommandResult:
        """Exécute une commande avec sudo."""
        pwd = password or self.password
        if not pwd:
            raise ValueError("Password required for sudo")

        sudo_cmd = f"echo {pwd} | sudo -S {command}"
        return self.run(sudo_cmd)

    def get_sftp(self):
        """Retourne une session SFTP."""
        if not self.sftp:
            self.sftp = self.client.open_sftp()
        return self.sftp

    def upload(self, local_path: str, remote_path: str):
        """Upload un fichier."""
        sftp = self.get_sftp()
        self.logger.info(f"Uploading {local_path} -> {remote_path}")
        sftp.put(local_path, remote_path)

    def download(self, remote_path: str, local_path: str):
        """Download un fichier."""
        sftp = self.get_sftp()
        self.logger.info(f"Downloading {remote_path} -> {local_path}")
        sftp.get(remote_path, local_path)

    def read_file(self, remote_path: str) -> str:
        """Lit le contenu d'un fichier distant."""
        sftp = self.get_sftp()
        with sftp.open(remote_path, "r") as f:
            return f.read().decode("utf-8")

    def write_file(self, remote_path: str, content: str):
        """Écrit du contenu dans un fichier distant."""
        sftp = self.get_sftp()
        with sftp.open(remote_path, "w") as f:
            f.write(content)

# Utilisation
with SSHClient("192.168.1.100", "admin", password="secret") as ssh:
    # Exécuter des commandes
    result = ssh.run("uptime")
    print(result.stdout)

    # Avec sudo
    result = ssh.sudo("systemctl status nginx")

    # Transfert de fichiers
    ssh.upload("config.yml", "/etc/myapp/config.yml")

6. Gestion Multi-Serveurs

Exécution Parallèle

from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class ServerConfig:
    hostname: str
    username: str
    password: str = None
    key_file: str = None

@dataclass
class ServerResult:
    hostname: str
    success: bool
    output: str
    error: str = None

def run_on_server(server: ServerConfig, command: str) -> ServerResult:
    """Exécute une commande sur un serveur."""
    try:
        with SSHClient(
            server.hostname,
            server.username,
            password=server.password,
            key_file=server.key_file
        ) as ssh:
            result = ssh.run(command)
            return ServerResult(
                hostname=server.hostname,
                success=result.success,
                output=result.stdout,
                error=result.stderr if not result.success else None
            )
    except Exception as e:
        return ServerResult(
            hostname=server.hostname,
            success=False,
            output="",
            error=str(e)
        )

def run_parallel(servers: List[ServerConfig], command: str, max_workers: int = 10) -> List[ServerResult]:
    """Exécute une commande sur plusieurs serveurs en parallèle."""
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(run_on_server, server, command): server
            for server in servers
        }

        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = "✓" if result.success else "✗"
            print(f"{status} {result.hostname}")

    return results

# Utilisation
servers = [
    ServerConfig("web01.example.com", "admin", key_file="~/.ssh/id_rsa"),
    ServerConfig("web02.example.com", "admin", key_file="~/.ssh/id_rsa"),
    ServerConfig("db01.example.com", "admin", key_file="~/.ssh/id_rsa"),
]

results = run_parallel(servers, "uptime")

for r in results:
    print(f"\n{r.hostname}:")
    if r.success:
        print(r.output)
    else:
        print(f"Error: {r.error}")

Gestionnaire de Serveurs

import yaml
from pathlib import Path

class ServerManager:
    """Gestionnaire de flotte de serveurs."""

    def __init__(self, inventory_file: str = "inventory.yml"):
        self.servers = self._load_inventory(inventory_file)

    def _load_inventory(self, path: str) -> Dict[str, List[ServerConfig]]:
        """Charge l'inventaire depuis un fichier YAML."""
        with open(path) as f:
            data = yaml.safe_load(f)

        inventory = {}
        for group, hosts in data.get("groups", {}).items():
            inventory[group] = [
                ServerConfig(
                    hostname=h["host"],
                    username=h.get("user", "admin"),
                    key_file=h.get("key_file")
                )
                for h in hosts
            ]

        return inventory

    def get_group(self, group: str) -> List[ServerConfig]:
        """Retourne les serveurs d'un groupe."""
        return self.servers.get(group, [])

    def run_on_group(self, group: str, command: str) -> List[ServerResult]:
        """Exécute une commande sur un groupe de serveurs."""
        servers = self.get_group(group)
        return run_parallel(servers, command)

    def deploy_file(self, group: str, local_path: str, remote_path: str):
        """Déploie un fichier sur un groupe de serveurs."""
        def deploy(server):
            try:
                with SSHClient(server.hostname, server.username,
                              key_file=server.key_file) as ssh:
                    ssh.upload(local_path, remote_path)
                    return ServerResult(server.hostname, True, "Deployed")
            except Exception as e:
                return ServerResult(server.hostname, False, "", str(e))

        servers = self.get_group(group)
        with ThreadPoolExecutor(max_workers=10) as executor:
            results = list(executor.map(deploy, servers))

        return results

# Fichier inventory.yml:
# groups:
#   webservers:
#     - host: web01.example.com
#       user: deploy
#       key_file: ~/.ssh/deploy_key
#     - host: web02.example.com
#       user: deploy
#   databases:
#     - host: db01.example.com
#       user: admin

# Utilisation
manager = ServerManager("inventory.yml")
results = manager.run_on_group("webservers", "systemctl status nginx")
manager.deploy_file("webservers", "app.conf", "/etc/nginx/sites-enabled/app.conf")

7. Tunneling SSH

Port Forwarding Local

import paramiko
from sshtunnel import SSHTunnelForwarder

# Avec sshtunnel (pip install sshtunnel)
with SSHTunnelForwarder(
    ("jumpbox.example.com", 22),
    ssh_username="admin",
    ssh_pkey="~/.ssh/id_rsa",
    remote_bind_address=("db.internal", 3306),
    local_bind_address=("127.0.0.1", 3307)
) as tunnel:
    print(f"Tunnel ouvert: localhost:{tunnel.local_bind_port} -> db.internal:3306")
    # Utiliser la connexion tunnelisée
    # mysql -h 127.0.0.1 -P 3307 -u user -p

# Tunnel vers plusieurs ports
with SSHTunnelForwarder(
    ("jumpbox.example.com", 22),
    ssh_username="admin",
    ssh_pkey="~/.ssh/id_rsa",
    remote_bind_addresses=[
        ("db.internal", 3306),
        ("redis.internal", 6379)
    ]
) as tunnel:
    print(f"MySQL: localhost:{tunnel.local_bind_ports[0]}")
    print(f"Redis: localhost:{tunnel.local_bind_ports[1]}")

Jump Host (Bastion)

import paramiko

def connect_via_jump(jump_host, target_host, username, key_file):
    """Connexion via un jump host."""
    # Connexion au jump host
    jump_client = paramiko.SSHClient()
    jump_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    jump_client.connect(
        jump_host,
        username=username,
        key_filename=key_file
    )

    # Créer un canal vers le serveur cible
    jump_transport = jump_client.get_transport()
    dest_addr = (target_host, 22)
    local_addr = ("127.0.0.1", 0)

    channel = jump_transport.open_channel(
        "direct-tcpip",
        dest_addr,
        local_addr
    )

    # Connexion au serveur cible via le canal
    target_client = paramiko.SSHClient()
    target_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    target_client.connect(
        target_host,
        username=username,
        key_filename=key_file,
        sock=channel
    )

    return target_client, jump_client

# Utilisation
target, jump = connect_via_jump(
    "bastion.example.com",
    "internal-server.local",
    "admin",
    "~/.ssh/id_rsa"
)

stdin, stdout, stderr = target.exec_command("hostname")
print(stdout.read().decode())

target.close()
jump.close()

Exercices Pratiques

Exercice 1 : Health Check Distribué

# Créer un script qui :
# - Se connecte à plusieurs serveurs en parallèle
# - Collecte CPU, mémoire, espace disque
# - Génère un rapport HTML

Exercice 2 : Déploiement Automatisé

# Créer un script de déploiement qui :
# - Upload les fichiers d'application
# - Exécute les migrations
# - Redémarre les services
# - Vérifie la santé de l'application

Exercice 3 : Backup Centralisé

# Créer un script qui :
# - Se connecte à plusieurs serveurs
# - Crée des backups (tar.gz)
# - Les télécharge vers un serveur central
# - Nettoie les anciens backups

Points Clés à Retenir

Bonnes Pratiques

  • Utiliser les clés SSH plutôt que les mots de passe
  • Toujours fermer les connexions proprement
  • Gérer les timeouts
  • Logger les opérations
  • Utiliser l'agent SSH quand possible

Sécurité

  • Ne jamais utiliser AutoAddPolicy en production
  • Vérifier les empreintes des clés host
  • Ne pas stocker les mots de passe en clair
  • Utiliser des clés SSH avec passphrase

Voir Aussi


← Module 11 - APIs REST & HTTP Module 13 - Création d'Outils CLI →

Retour au Programme