#!/usr/bin/env python3 """ Script pour comparer les schémas de tables entre deux bases de données Utile pour vérifier la compatibilité avant migration """ import argparse import configparser import os import sys import time import signal import subprocess import mysql.connector from datetime import datetime from tabulate import tabulate def create_config_if_not_exists(): """Crée un fichier de configuration s'il n'existe pas déjà""" config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'python', 'db_config.ini') if not os.path.exists(os.path.dirname(config_path)): os.makedirs(os.path.dirname(config_path)) if not os.path.exists(config_path): config = configparser.ConfigParser() config['SSH'] = { 'host': 'serveur-distant.exemple.com', 'port': '22', 'user': 'utilisateur', 'key_file': '/chemin/vers/cle_ssh' } config['REMOTE_DB'] = { 'host': 'localhost', # Hôte de la base sur le serveur distant 'port': '3306' # Port de la base sur le serveur distant } config['SOURCE_DB'] = { 'host': 'localhost', # Hôte local pour le tunnel SSH 'database': 'geosector', 'user': 'utilisateur_db', 'password': 'mot_de_passe', 'port': '13306' # Port local pour le tunnel SSH } config['TARGET_DB'] = { 'host': 'localhost', 'database': 'geosector_app', 'user': 'root', 'password': '', 'port': '3306' } with open(config_path, 'w') as configfile: config.write(configfile) print(f"Fichier de configuration créé: {config_path}") return config_path def get_db_config(): """Charge la configuration de la base de données""" config_path = create_config_if_not_exists() config = configparser.ConfigParser() config.read(config_path) return config # Variable globale pour stocker le processus du tunnel SSH ssh_tunnel_process = None def create_ssh_tunnel(ssh_config, remote_db_config, source_db_config): """Crée un tunnel SSH vers le serveur distant""" global ssh_tunnel_process # Vérifier si un tunnel SSH est déjà en cours d'exécution try: # Commande pour vérifier si le tunnel est déjà en cours d'exécution check_command = f"ps aux | grep 'ssh -f -N -L {source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}' | grep -v grep" result = subprocess.run(check_command, shell=True, capture_output=True, text=True) if result.stdout.strip(): print("Un tunnel SSH est déjà en cours d'exécution") return True # Construire la commande SSH pour établir le tunnel ssh_command = [ 'ssh', '-f', '-N', '-L', f"{source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}", '-p', ssh_config['port'], '-i', ssh_config['key_file'], f"{ssh_config['user']}@{ssh_config['host']}" ] print(f"Création d'un tunnel SSH vers {ssh_config['host']}...") ssh_tunnel_process = subprocess.Popen(ssh_command) # Attendre que le tunnel soit établi time.sleep(2) # Vérifier si le processus est toujours en cours d'exécution if ssh_tunnel_process.poll() is None: print(f"Tunnel SSH établi sur le port local {source_db_config['port']}") return True else: print("Erreur lors de la création du tunnel SSH") return False except Exception as e: print(f"Erreur lors de la création du tunnel SSH: {e}") return False def close_ssh_tunnel(): """Ferme le tunnel SSH""" global ssh_tunnel_process if ssh_tunnel_process is not None: try: # Tuer le processus SSH ssh_tunnel_process.terminate() ssh_tunnel_process.wait(timeout=5) print("Tunnel SSH fermé") except Exception as e: print(f"Erreur lors de la fermeture du tunnel SSH: {e}") # Forcer la fermeture si nécessaire try: ssh_tunnel_process.kill() except: pass # Rechercher et tuer tous les processus SSH correspondants try: kill_command = "ps aux | grep 'ssh -f -N -L' | grep -v grep | awk '{print $2}' | xargs kill -9 2>/dev/null" subprocess.run(kill_command, shell=True) except: pass def connect_to_db(db_config): """Se connecte à une base de données MySQL/MariaDB""" try: connection = mysql.connector.connect( host=db_config['host'], database=db_config['database'], user=db_config['user'], password=db_config['password'], port=int(db_config['port']) ) return connection except mysql.connector.Error as err: print(f"Erreur de connexion à la base de données: {err}") sys.exit(1) def get_table_schema(connection, table_name): """Récupère le schéma d'une table""" cursor = connection.cursor(dictionary=True) cursor.execute(f"DESCRIBE {table_name}") columns = cursor.fetchall() cursor.close() return columns def get_all_tables(connection): """Récupère toutes les tables d'une base de données""" cursor = connection.cursor() cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] cursor.close() return tables def compare_tables(source_schema, target_schema): """Compare les schémas de deux tables""" source_columns = {col['Field']: col for col in source_schema} target_columns = {col['Field']: col for col in target_schema} # Colonnes présentes dans les deux tables common_columns = set(source_columns.keys()) & set(target_columns.keys()) # Colonnes uniquement dans la source source_only = set(source_columns.keys()) - set(target_columns.keys()) # Colonnes uniquement dans la cible target_only = set(target_columns.keys()) - set(source_columns.keys()) # Différences dans les colonnes communes differences = [] for col_name in common_columns: source_col = source_columns[col_name] target_col = target_columns[col_name] if source_col['Type'] != target_col['Type'] or \ source_col['Null'] != target_col['Null'] or \ source_col['Key'] != target_col['Key'] or \ source_col['Default'] != target_col['Default']: differences.append({ 'Column': col_name, 'Source_Type': source_col['Type'], 'Target_Type': target_col['Type'], 'Source_Null': source_col['Null'], 'Target_Null': target_col['Null'], 'Source_Key': source_col['Key'], 'Target_Key': target_col['Key'], 'Source_Default': source_col['Default'], 'Target_Default': target_col['Default'] }) return { 'common': common_columns, 'source_only': source_only, 'target_only': target_only, 'differences': differences } def generate_report(table_name, comparison, output_file=None): """Génère un rapport de comparaison""" report = [] report.append(f"Rapport de comparaison pour la table: {table_name}") report.append(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report.append("") # Colonnes communes report.append(f"Colonnes communes ({len(comparison['common'])}):") if comparison['common']: report.append(", ".join(sorted(comparison['common']))) else: report.append("Aucune") report.append("") # Colonnes uniquement dans la source report.append(f"Colonnes uniquement dans la source ({len(comparison['source_only'])}):") if comparison['source_only']: report.append(", ".join(sorted(comparison['source_only']))) else: report.append("Aucune") report.append("") # Colonnes uniquement dans la cible report.append(f"Colonnes uniquement dans la cible ({len(comparison['target_only'])}):") if comparison['target_only']: report.append(", ".join(sorted(comparison['target_only']))) else: report.append("Aucune") report.append("") # Différences dans les colonnes communes report.append(f"Différences dans les colonnes communes ({len(comparison['differences'])}):") if comparison['differences']: headers = ["Colonne", "Type Source", "Type Cible", "Null Source", "Null Cible", "Clé Source", "Clé Cible", "Défaut Source", "Défaut Cible"] table_data = [] for diff in comparison['differences']: table_data.append([ diff['Column'], diff['Source_Type'], diff['Target_Type'], diff['Source_Null'], diff['Target_Null'], diff['Source_Key'], diff['Target_Key'], diff['Source_Default'] or 'NULL', diff['Target_Default'] or 'NULL' ]) report.append(tabulate(table_data, headers=headers, tablefmt="grid")) else: report.append("Aucune différence") report_text = "\n".join(report) if output_file: with open(output_file, 'w') as f: f.write(report_text) print(f"Rapport enregistré dans: {output_file}") return report_text def main(): parser = argparse.ArgumentParser(description='Compare les schémas de tables entre deux bases de données') parser.add_argument('table', help='Nom de la table à comparer') parser.add_argument('--output', '-o', help='Fichier de sortie pour le rapport') parser.add_argument('--no-ssh', action='store_true', help='Ne pas utiliser de tunnel SSH') args = parser.parse_args() table_name = args.table output_file = args.output use_ssh = not args.no_ssh # Créer le dossier de logs si nécessaire logs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs') if not os.path.exists(logs_dir): os.makedirs(logs_dir) # Si aucun fichier de sortie n'est spécifié, en créer un dans le dossier logs if not output_file: output_file = os.path.join(logs_dir, f"schema_comparison_{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt") # Charger la configuration config = get_db_config() # Établir un tunnel SSH si nécessaire if use_ssh and 'SSH' in config and 'REMOTE_DB' in config: if not create_ssh_tunnel(config['SSH'], config['REMOTE_DB'], config['SOURCE_DB']): print("Impossible d'établir le tunnel SSH. Abandon.") sys.exit(1) # Se connecter aux bases de données source_conn = connect_to_db(config['SOURCE_DB']) target_conn = connect_to_db(config['TARGET_DB']) # Vérifier si la table existe dans les deux bases source_tables = get_all_tables(source_conn) target_tables = get_all_tables(target_conn) if table_name not in source_tables: print(f"Erreur: La table '{table_name}' n'existe pas dans la base source.") sys.exit(1) if table_name not in target_tables: print(f"Avertissement: La table '{table_name}' n'existe pas dans la base cible.") print("Voulez-vous voir uniquement le schéma de la table source? (o/n)") response = input().lower() if response != 'o': sys.exit(0) # Afficher uniquement le schéma de la table source source_schema = get_table_schema(source_conn, table_name) print(f"\nSchéma de la table '{table_name}' dans la base source:") headers = ["Champ", "Type", "Null", "Clé", "Défaut", "Extra"] table_data = [[col['Field'], col['Type'], col['Null'], col['Key'], col['Default'] or 'NULL', col['Extra']] for col in source_schema] print(tabulate(table_data, headers=headers, tablefmt="grid")) # Enregistrer le schéma dans un fichier with open(output_file, 'w') as f: f.write(f"Schéma de la table '{table_name}' dans la base source:\n") f.write(tabulate(table_data, headers=headers, tablefmt="grid")) print(f"Schéma enregistré dans: {output_file}") sys.exit(0) # Récupérer les schémas des tables source_schema = get_table_schema(source_conn, table_name) target_schema = get_table_schema(target_conn, table_name) # Comparer les schémas comparison = compare_tables(source_schema, target_schema) # Générer et afficher le rapport report = generate_report(table_name, comparison, output_file) print(report) # Fermer les connexions source_conn.close() target_conn.close() if __name__ == "__main__": try: # Configurer le gestionnaire de signal pour fermer proprement le tunnel SSH signal.signal(signal.SIGINT, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0))) signal.signal(signal.SIGTERM, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0))) main() finally: # Fermer le tunnel SSH à la fin du script close_ssh_tunnel()