372 lines
13 KiB
Python
Executable File
372 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Script pour comparer les schémas de tables entre deux bases de données
|
|
Utile pour vérifier la compatibilité avant migration
|
|
"""
|
|
|
|
import argparse
|
|
import configparser
|
|
import os
|
|
import sys
|
|
import time
|
|
import signal
|
|
import subprocess
|
|
import mysql.connector
|
|
from datetime import datetime
|
|
from tabulate import tabulate
|
|
|
|
|
|
def create_config_if_not_exists():
|
|
"""Crée un fichier de configuration s'il n'existe pas déjà"""
|
|
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'python', 'db_config.ini')
|
|
|
|
if not os.path.exists(os.path.dirname(config_path)):
|
|
os.makedirs(os.path.dirname(config_path))
|
|
|
|
if not os.path.exists(config_path):
|
|
config = configparser.ConfigParser()
|
|
|
|
config['SSH'] = {
|
|
'host': 'serveur-distant.exemple.com',
|
|
'port': '22',
|
|
'user': 'utilisateur',
|
|
'key_file': '/chemin/vers/cle_ssh'
|
|
}
|
|
|
|
config['REMOTE_DB'] = {
|
|
'host': 'localhost', # Hôte de la base sur le serveur distant
|
|
'port': '3306' # Port de la base sur le serveur distant
|
|
}
|
|
|
|
config['SOURCE_DB'] = {
|
|
'host': 'localhost', # Hôte local pour le tunnel SSH
|
|
'database': 'geosector',
|
|
'user': 'utilisateur_db',
|
|
'password': 'mot_de_passe',
|
|
'port': '13306' # Port local pour le tunnel SSH
|
|
}
|
|
|
|
config['TARGET_DB'] = {
|
|
'host': 'localhost',
|
|
'database': 'geosector_app',
|
|
'user': 'root',
|
|
'password': '',
|
|
'port': '3306'
|
|
}
|
|
|
|
with open(config_path, 'w') as configfile:
|
|
config.write(configfile)
|
|
|
|
print(f"Fichier de configuration créé: {config_path}")
|
|
|
|
return config_path
|
|
|
|
|
|
def get_db_config():
|
|
"""Charge la configuration de la base de données"""
|
|
config_path = create_config_if_not_exists()
|
|
config = configparser.ConfigParser()
|
|
config.read(config_path)
|
|
return config
|
|
|
|
|
|
# Variable globale pour stocker le processus du tunnel SSH
|
|
ssh_tunnel_process = None
|
|
|
|
def create_ssh_tunnel(ssh_config, remote_db_config, source_db_config):
|
|
"""Crée un tunnel SSH vers le serveur distant"""
|
|
global ssh_tunnel_process
|
|
|
|
# Vérifier si un tunnel SSH est déjà en cours d'exécution
|
|
try:
|
|
# Commande pour vérifier si le tunnel est déjà en cours d'exécution
|
|
check_command = f"ps aux | grep 'ssh -f -N -L {source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}' | grep -v grep"
|
|
result = subprocess.run(check_command, shell=True, capture_output=True, text=True)
|
|
|
|
if result.stdout.strip():
|
|
print("Un tunnel SSH est déjà en cours d'exécution")
|
|
return True
|
|
|
|
# Construire la commande SSH pour établir le tunnel
|
|
ssh_command = [
|
|
'ssh',
|
|
'-f', '-N',
|
|
'-L', f"{source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}",
|
|
'-p', ssh_config['port'],
|
|
'-i', ssh_config['key_file'],
|
|
f"{ssh_config['user']}@{ssh_config['host']}"
|
|
]
|
|
|
|
print(f"Création d'un tunnel SSH vers {ssh_config['host']}...")
|
|
ssh_tunnel_process = subprocess.Popen(ssh_command)
|
|
|
|
# Attendre que le tunnel soit établi
|
|
time.sleep(2)
|
|
|
|
# Vérifier si le processus est toujours en cours d'exécution
|
|
if ssh_tunnel_process.poll() is None:
|
|
print(f"Tunnel SSH établi sur le port local {source_db_config['port']}")
|
|
return True
|
|
else:
|
|
print("Erreur lors de la création du tunnel SSH")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors de la création du tunnel SSH: {e}")
|
|
return False
|
|
|
|
def close_ssh_tunnel():
|
|
"""Ferme le tunnel SSH"""
|
|
global ssh_tunnel_process
|
|
|
|
if ssh_tunnel_process is not None:
|
|
try:
|
|
# Tuer le processus SSH
|
|
ssh_tunnel_process.terminate()
|
|
ssh_tunnel_process.wait(timeout=5)
|
|
print("Tunnel SSH fermé")
|
|
except Exception as e:
|
|
print(f"Erreur lors de la fermeture du tunnel SSH: {e}")
|
|
# Forcer la fermeture si nécessaire
|
|
try:
|
|
ssh_tunnel_process.kill()
|
|
except:
|
|
pass
|
|
|
|
# Rechercher et tuer tous les processus SSH correspondants
|
|
try:
|
|
kill_command = "ps aux | grep 'ssh -f -N -L' | grep -v grep | awk '{print $2}' | xargs kill -9 2>/dev/null"
|
|
subprocess.run(kill_command, shell=True)
|
|
except:
|
|
pass
|
|
|
|
def connect_to_db(db_config):
|
|
"""Se connecte à une base de données MySQL/MariaDB"""
|
|
try:
|
|
connection = mysql.connector.connect(
|
|
host=db_config['host'],
|
|
database=db_config['database'],
|
|
user=db_config['user'],
|
|
password=db_config['password'],
|
|
port=int(db_config['port'])
|
|
)
|
|
return connection
|
|
except mysql.connector.Error as err:
|
|
print(f"Erreur de connexion à la base de données: {err}")
|
|
sys.exit(1)
|
|
|
|
|
|
def get_table_schema(connection, table_name):
|
|
"""Récupère le schéma d'une table"""
|
|
cursor = connection.cursor(dictionary=True)
|
|
cursor.execute(f"DESCRIBE {table_name}")
|
|
columns = cursor.fetchall()
|
|
cursor.close()
|
|
return columns
|
|
|
|
|
|
def get_all_tables(connection):
|
|
"""Récupère toutes les tables d'une base de données"""
|
|
cursor = connection.cursor()
|
|
cursor.execute("SHOW TABLES")
|
|
tables = [table[0] for table in cursor.fetchall()]
|
|
cursor.close()
|
|
return tables
|
|
|
|
|
|
def compare_tables(source_schema, target_schema):
|
|
"""Compare les schémas de deux tables"""
|
|
source_columns = {col['Field']: col for col in source_schema}
|
|
target_columns = {col['Field']: col for col in target_schema}
|
|
|
|
# Colonnes présentes dans les deux tables
|
|
common_columns = set(source_columns.keys()) & set(target_columns.keys())
|
|
|
|
# Colonnes uniquement dans la source
|
|
source_only = set(source_columns.keys()) - set(target_columns.keys())
|
|
|
|
# Colonnes uniquement dans la cible
|
|
target_only = set(target_columns.keys()) - set(source_columns.keys())
|
|
|
|
# Différences dans les colonnes communes
|
|
differences = []
|
|
for col_name in common_columns:
|
|
source_col = source_columns[col_name]
|
|
target_col = target_columns[col_name]
|
|
|
|
if source_col['Type'] != target_col['Type'] or \
|
|
source_col['Null'] != target_col['Null'] or \
|
|
source_col['Key'] != target_col['Key'] or \
|
|
source_col['Default'] != target_col['Default']:
|
|
differences.append({
|
|
'Column': col_name,
|
|
'Source_Type': source_col['Type'],
|
|
'Target_Type': target_col['Type'],
|
|
'Source_Null': source_col['Null'],
|
|
'Target_Null': target_col['Null'],
|
|
'Source_Key': source_col['Key'],
|
|
'Target_Key': target_col['Key'],
|
|
'Source_Default': source_col['Default'],
|
|
'Target_Default': target_col['Default']
|
|
})
|
|
|
|
return {
|
|
'common': common_columns,
|
|
'source_only': source_only,
|
|
'target_only': target_only,
|
|
'differences': differences
|
|
}
|
|
|
|
|
|
def generate_report(table_name, comparison, output_file=None):
|
|
"""Génère un rapport de comparaison"""
|
|
report = []
|
|
report.append(f"Rapport de comparaison pour la table: {table_name}")
|
|
report.append(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
report.append("")
|
|
|
|
# Colonnes communes
|
|
report.append(f"Colonnes communes ({len(comparison['common'])}):")
|
|
if comparison['common']:
|
|
report.append(", ".join(sorted(comparison['common'])))
|
|
else:
|
|
report.append("Aucune")
|
|
report.append("")
|
|
|
|
# Colonnes uniquement dans la source
|
|
report.append(f"Colonnes uniquement dans la source ({len(comparison['source_only'])}):")
|
|
if comparison['source_only']:
|
|
report.append(", ".join(sorted(comparison['source_only'])))
|
|
else:
|
|
report.append("Aucune")
|
|
report.append("")
|
|
|
|
# Colonnes uniquement dans la cible
|
|
report.append(f"Colonnes uniquement dans la cible ({len(comparison['target_only'])}):")
|
|
if comparison['target_only']:
|
|
report.append(", ".join(sorted(comparison['target_only'])))
|
|
else:
|
|
report.append("Aucune")
|
|
report.append("")
|
|
|
|
# Différences dans les colonnes communes
|
|
report.append(f"Différences dans les colonnes communes ({len(comparison['differences'])}):")
|
|
if comparison['differences']:
|
|
headers = ["Colonne", "Type Source", "Type Cible", "Null Source", "Null Cible", "Clé Source", "Clé Cible", "Défaut Source", "Défaut Cible"]
|
|
table_data = []
|
|
for diff in comparison['differences']:
|
|
table_data.append([
|
|
diff['Column'],
|
|
diff['Source_Type'],
|
|
diff['Target_Type'],
|
|
diff['Source_Null'],
|
|
diff['Target_Null'],
|
|
diff['Source_Key'],
|
|
diff['Target_Key'],
|
|
diff['Source_Default'] or 'NULL',
|
|
diff['Target_Default'] or 'NULL'
|
|
])
|
|
report.append(tabulate(table_data, headers=headers, tablefmt="grid"))
|
|
else:
|
|
report.append("Aucune différence")
|
|
|
|
report_text = "\n".join(report)
|
|
|
|
if output_file:
|
|
with open(output_file, 'w') as f:
|
|
f.write(report_text)
|
|
print(f"Rapport enregistré dans: {output_file}")
|
|
|
|
return report_text
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Compare les schémas de tables entre deux bases de données')
|
|
parser.add_argument('table', help='Nom de la table à comparer')
|
|
parser.add_argument('--output', '-o', help='Fichier de sortie pour le rapport')
|
|
parser.add_argument('--no-ssh', action='store_true', help='Ne pas utiliser de tunnel SSH')
|
|
args = parser.parse_args()
|
|
|
|
table_name = args.table
|
|
output_file = args.output
|
|
use_ssh = not args.no_ssh
|
|
|
|
# Créer le dossier de logs si nécessaire
|
|
logs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
|
|
if not os.path.exists(logs_dir):
|
|
os.makedirs(logs_dir)
|
|
|
|
# Si aucun fichier de sortie n'est spécifié, en créer un dans le dossier logs
|
|
if not output_file:
|
|
output_file = os.path.join(logs_dir, f"schema_comparison_{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
|
|
|
|
# Charger la configuration
|
|
config = get_db_config()
|
|
|
|
# Établir un tunnel SSH si nécessaire
|
|
if use_ssh and 'SSH' in config and 'REMOTE_DB' in config:
|
|
if not create_ssh_tunnel(config['SSH'], config['REMOTE_DB'], config['SOURCE_DB']):
|
|
print("Impossible d'établir le tunnel SSH. Abandon.")
|
|
sys.exit(1)
|
|
|
|
# Se connecter aux bases de données
|
|
source_conn = connect_to_db(config['SOURCE_DB'])
|
|
target_conn = connect_to_db(config['TARGET_DB'])
|
|
|
|
# Vérifier si la table existe dans les deux bases
|
|
source_tables = get_all_tables(source_conn)
|
|
target_tables = get_all_tables(target_conn)
|
|
|
|
if table_name not in source_tables:
|
|
print(f"Erreur: La table '{table_name}' n'existe pas dans la base source.")
|
|
sys.exit(1)
|
|
|
|
if table_name not in target_tables:
|
|
print(f"Avertissement: La table '{table_name}' n'existe pas dans la base cible.")
|
|
print("Voulez-vous voir uniquement le schéma de la table source? (o/n)")
|
|
response = input().lower()
|
|
if response != 'o':
|
|
sys.exit(0)
|
|
|
|
# Afficher uniquement le schéma de la table source
|
|
source_schema = get_table_schema(source_conn, table_name)
|
|
print(f"\nSchéma de la table '{table_name}' dans la base source:")
|
|
headers = ["Champ", "Type", "Null", "Clé", "Défaut", "Extra"]
|
|
table_data = [[col['Field'], col['Type'], col['Null'], col['Key'], col['Default'] or 'NULL', col['Extra']] for col in source_schema]
|
|
print(tabulate(table_data, headers=headers, tablefmt="grid"))
|
|
|
|
# Enregistrer le schéma dans un fichier
|
|
with open(output_file, 'w') as f:
|
|
f.write(f"Schéma de la table '{table_name}' dans la base source:\n")
|
|
f.write(tabulate(table_data, headers=headers, tablefmt="grid"))
|
|
|
|
print(f"Schéma enregistré dans: {output_file}")
|
|
sys.exit(0)
|
|
|
|
# Récupérer les schémas des tables
|
|
source_schema = get_table_schema(source_conn, table_name)
|
|
target_schema = get_table_schema(target_conn, table_name)
|
|
|
|
# Comparer les schémas
|
|
comparison = compare_tables(source_schema, target_schema)
|
|
|
|
# Générer et afficher le rapport
|
|
report = generate_report(table_name, comparison, output_file)
|
|
print(report)
|
|
|
|
# Fermer les connexions
|
|
source_conn.close()
|
|
target_conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
# Configurer le gestionnaire de signal pour fermer proprement le tunnel SSH
|
|
signal.signal(signal.SIGINT, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0)))
|
|
signal.signal(signal.SIGTERM, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0)))
|
|
|
|
main()
|
|
finally:
|
|
# Fermer le tunnel SSH à la fin du script
|
|
close_ssh_tunnel()
|