Ajout du dossier api avec la géolocalisation automatique des casernes de pompiers

This commit is contained in:
d6soft
2025-05-16 21:03:04 +02:00
parent 69dcff42f8
commit f4f7882963
143 changed files with 24329 additions and 1 deletions

View File

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
Script pour comparer les schémas de tables entre deux bases de données
Utile pour vérifier la compatibilité avant migration
"""
import argparse
import configparser
import os
import sys
import time
import signal
import subprocess
import mysql.connector
from datetime import datetime
from tabulate import tabulate
def create_config_if_not_exists():
"""Crée un fichier de configuration s'il n'existe pas déjà"""
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'python', 'db_config.ini')
if not os.path.exists(os.path.dirname(config_path)):
os.makedirs(os.path.dirname(config_path))
if not os.path.exists(config_path):
config = configparser.ConfigParser()
config['SSH'] = {
'host': 'serveur-distant.exemple.com',
'port': '22',
'user': 'utilisateur',
'key_file': '/chemin/vers/cle_ssh'
}
config['REMOTE_DB'] = {
'host': 'localhost', # Hôte de la base sur le serveur distant
'port': '3306' # Port de la base sur le serveur distant
}
config['SOURCE_DB'] = {
'host': 'localhost', # Hôte local pour le tunnel SSH
'database': 'geosector',
'user': 'utilisateur_db',
'password': 'mot_de_passe',
'port': '13306' # Port local pour le tunnel SSH
}
config['TARGET_DB'] = {
'host': 'localhost',
'database': 'geosector_app',
'user': 'root',
'password': '',
'port': '3306'
}
with open(config_path, 'w') as configfile:
config.write(configfile)
print(f"Fichier de configuration créé: {config_path}")
return config_path
def get_db_config():
"""Charge la configuration de la base de données"""
config_path = create_config_if_not_exists()
config = configparser.ConfigParser()
config.read(config_path)
return config
# Variable globale pour stocker le processus du tunnel SSH
ssh_tunnel_process = None
def create_ssh_tunnel(ssh_config, remote_db_config, source_db_config):
"""Crée un tunnel SSH vers le serveur distant"""
global ssh_tunnel_process
# Vérifier si un tunnel SSH est déjà en cours d'exécution
try:
# Commande pour vérifier si le tunnel est déjà en cours d'exécution
check_command = f"ps aux | grep 'ssh -f -N -L {source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}' | grep -v grep"
result = subprocess.run(check_command, shell=True, capture_output=True, text=True)
if result.stdout.strip():
print("Un tunnel SSH est déjà en cours d'exécution")
return True
# Construire la commande SSH pour établir le tunnel
ssh_command = [
'ssh',
'-f', '-N',
'-L', f"{source_db_config['port']}:{remote_db_config['host']}:{remote_db_config['port']}",
'-p', ssh_config['port'],
'-i', ssh_config['key_file'],
f"{ssh_config['user']}@{ssh_config['host']}"
]
print(f"Création d'un tunnel SSH vers {ssh_config['host']}...")
ssh_tunnel_process = subprocess.Popen(ssh_command)
# Attendre que le tunnel soit établi
time.sleep(2)
# Vérifier si le processus est toujours en cours d'exécution
if ssh_tunnel_process.poll() is None:
print(f"Tunnel SSH établi sur le port local {source_db_config['port']}")
return True
else:
print("Erreur lors de la création du tunnel SSH")
return False
except Exception as e:
print(f"Erreur lors de la création du tunnel SSH: {e}")
return False
def close_ssh_tunnel():
"""Ferme le tunnel SSH"""
global ssh_tunnel_process
if ssh_tunnel_process is not None:
try:
# Tuer le processus SSH
ssh_tunnel_process.terminate()
ssh_tunnel_process.wait(timeout=5)
print("Tunnel SSH fermé")
except Exception as e:
print(f"Erreur lors de la fermeture du tunnel SSH: {e}")
# Forcer la fermeture si nécessaire
try:
ssh_tunnel_process.kill()
except:
pass
# Rechercher et tuer tous les processus SSH correspondants
try:
kill_command = "ps aux | grep 'ssh -f -N -L' | grep -v grep | awk '{print $2}' | xargs kill -9 2>/dev/null"
subprocess.run(kill_command, shell=True)
except:
pass
def connect_to_db(db_config):
"""Se connecte à une base de données MySQL/MariaDB"""
try:
connection = mysql.connector.connect(
host=db_config['host'],
database=db_config['database'],
user=db_config['user'],
password=db_config['password'],
port=int(db_config['port'])
)
return connection
except mysql.connector.Error as err:
print(f"Erreur de connexion à la base de données: {err}")
sys.exit(1)
def get_table_schema(connection, table_name):
"""Récupère le schéma d'une table"""
cursor = connection.cursor(dictionary=True)
cursor.execute(f"DESCRIBE {table_name}")
columns = cursor.fetchall()
cursor.close()
return columns
def get_all_tables(connection):
"""Récupère toutes les tables d'une base de données"""
cursor = connection.cursor()
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
cursor.close()
return tables
def compare_tables(source_schema, target_schema):
"""Compare les schémas de deux tables"""
source_columns = {col['Field']: col for col in source_schema}
target_columns = {col['Field']: col for col in target_schema}
# Colonnes présentes dans les deux tables
common_columns = set(source_columns.keys()) & set(target_columns.keys())
# Colonnes uniquement dans la source
source_only = set(source_columns.keys()) - set(target_columns.keys())
# Colonnes uniquement dans la cible
target_only = set(target_columns.keys()) - set(source_columns.keys())
# Différences dans les colonnes communes
differences = []
for col_name in common_columns:
source_col = source_columns[col_name]
target_col = target_columns[col_name]
if source_col['Type'] != target_col['Type'] or \
source_col['Null'] != target_col['Null'] or \
source_col['Key'] != target_col['Key'] or \
source_col['Default'] != target_col['Default']:
differences.append({
'Column': col_name,
'Source_Type': source_col['Type'],
'Target_Type': target_col['Type'],
'Source_Null': source_col['Null'],
'Target_Null': target_col['Null'],
'Source_Key': source_col['Key'],
'Target_Key': target_col['Key'],
'Source_Default': source_col['Default'],
'Target_Default': target_col['Default']
})
return {
'common': common_columns,
'source_only': source_only,
'target_only': target_only,
'differences': differences
}
def generate_report(table_name, comparison, output_file=None):
"""Génère un rapport de comparaison"""
report = []
report.append(f"Rapport de comparaison pour la table: {table_name}")
report.append(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append("")
# Colonnes communes
report.append(f"Colonnes communes ({len(comparison['common'])}):")
if comparison['common']:
report.append(", ".join(sorted(comparison['common'])))
else:
report.append("Aucune")
report.append("")
# Colonnes uniquement dans la source
report.append(f"Colonnes uniquement dans la source ({len(comparison['source_only'])}):")
if comparison['source_only']:
report.append(", ".join(sorted(comparison['source_only'])))
else:
report.append("Aucune")
report.append("")
# Colonnes uniquement dans la cible
report.append(f"Colonnes uniquement dans la cible ({len(comparison['target_only'])}):")
if comparison['target_only']:
report.append(", ".join(sorted(comparison['target_only'])))
else:
report.append("Aucune")
report.append("")
# Différences dans les colonnes communes
report.append(f"Différences dans les colonnes communes ({len(comparison['differences'])}):")
if comparison['differences']:
headers = ["Colonne", "Type Source", "Type Cible", "Null Source", "Null Cible", "Clé Source", "Clé Cible", "Défaut Source", "Défaut Cible"]
table_data = []
for diff in comparison['differences']:
table_data.append([
diff['Column'],
diff['Source_Type'],
diff['Target_Type'],
diff['Source_Null'],
diff['Target_Null'],
diff['Source_Key'],
diff['Target_Key'],
diff['Source_Default'] or 'NULL',
diff['Target_Default'] or 'NULL'
])
report.append(tabulate(table_data, headers=headers, tablefmt="grid"))
else:
report.append("Aucune différence")
report_text = "\n".join(report)
if output_file:
with open(output_file, 'w') as f:
f.write(report_text)
print(f"Rapport enregistré dans: {output_file}")
return report_text
def main():
parser = argparse.ArgumentParser(description='Compare les schémas de tables entre deux bases de données')
parser.add_argument('table', help='Nom de la table à comparer')
parser.add_argument('--output', '-o', help='Fichier de sortie pour le rapport')
parser.add_argument('--no-ssh', action='store_true', help='Ne pas utiliser de tunnel SSH')
args = parser.parse_args()
table_name = args.table
output_file = args.output
use_ssh = not args.no_ssh
# Créer le dossier de logs si nécessaire
logs_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
if not os.path.exists(logs_dir):
os.makedirs(logs_dir)
# Si aucun fichier de sortie n'est spécifié, en créer un dans le dossier logs
if not output_file:
output_file = os.path.join(logs_dir, f"schema_comparison_{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
# Charger la configuration
config = get_db_config()
# Établir un tunnel SSH si nécessaire
if use_ssh and 'SSH' in config and 'REMOTE_DB' in config:
if not create_ssh_tunnel(config['SSH'], config['REMOTE_DB'], config['SOURCE_DB']):
print("Impossible d'établir le tunnel SSH. Abandon.")
sys.exit(1)
# Se connecter aux bases de données
source_conn = connect_to_db(config['SOURCE_DB'])
target_conn = connect_to_db(config['TARGET_DB'])
# Vérifier si la table existe dans les deux bases
source_tables = get_all_tables(source_conn)
target_tables = get_all_tables(target_conn)
if table_name not in source_tables:
print(f"Erreur: La table '{table_name}' n'existe pas dans la base source.")
sys.exit(1)
if table_name not in target_tables:
print(f"Avertissement: La table '{table_name}' n'existe pas dans la base cible.")
print("Voulez-vous voir uniquement le schéma de la table source? (o/n)")
response = input().lower()
if response != 'o':
sys.exit(0)
# Afficher uniquement le schéma de la table source
source_schema = get_table_schema(source_conn, table_name)
print(f"\nSchéma de la table '{table_name}' dans la base source:")
headers = ["Champ", "Type", "Null", "Clé", "Défaut", "Extra"]
table_data = [[col['Field'], col['Type'], col['Null'], col['Key'], col['Default'] or 'NULL', col['Extra']] for col in source_schema]
print(tabulate(table_data, headers=headers, tablefmt="grid"))
# Enregistrer le schéma dans un fichier
with open(output_file, 'w') as f:
f.write(f"Schéma de la table '{table_name}' dans la base source:\n")
f.write(tabulate(table_data, headers=headers, tablefmt="grid"))
print(f"Schéma enregistré dans: {output_file}")
sys.exit(0)
# Récupérer les schémas des tables
source_schema = get_table_schema(source_conn, table_name)
target_schema = get_table_schema(target_conn, table_name)
# Comparer les schémas
comparison = compare_tables(source_schema, target_schema)
# Générer et afficher le rapport
report = generate_report(table_name, comparison, output_file)
print(report)
# Fermer les connexions
source_conn.close()
target_conn.close()
if __name__ == "__main__":
try:
# Configurer le gestionnaire de signal pour fermer proprement le tunnel SSH
signal.signal(signal.SIGINT, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0)))
signal.signal(signal.SIGTERM, lambda sig, frame: (close_ssh_tunnel(), sys.exit(0)))
main()
finally:
# Fermer le tunnel SSH à la fin du script
close_ssh_tunnel()