Files
sogoms/cmd/sogoms/db/main.go
Pierre a4694a10d1 SOGOMS v1.0.1 - Microservices logs, smtp et roadmap
Nouveaux services:
- sogoms-logs : logging centralisé avec rotation
- sogoms-smtp : envoi emails avec templates YAML

Nouvelles fonctionnalités:
- Queries YAML externalisées (config/queries/{app}/)
- CRUD générique paramétrable
- Filtres par rôle (default, admin)
- Templates email (config/emails/{app}/)

Documentation:
- DOCTECH.md : documentation technique complète
- README.md : vision et roadmap
- TODO.md : phases 11-15 planifiées

Roadmap:
- Phase 11: sogoms-crypt (chiffrement)
- Phase 12: sogoms-imap/mailproc (emails)
- Phase 13: sogoms-cron (tâches planifiées)
- Phase 14: sogoms-push (MQTT temps réel)
- Phase 15: sogoms-schema (API auto-générée)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-16 14:58:46 +01:00

437 lines
11 KiB
Go
Executable File

// sogoms-db : Microservice d'accès à MariaDB.
// Chaque application cliente a sa propre base de données.
package main
import (
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
_ "github.com/go-sql-driver/mysql"
"sogoms.com/internal/config"
"sogoms.com/internal/protocol"
)
// Command-line flags of the service.
var (
// socketPath is the Unix socket the request server is started on.
socketPath = flag.String("socket", "/run/sogoms-db.1.sock", "Unix socket path")
// configDir is the directory the application registry is loaded from.
configDir = flag.String("config", "/config", "Configuration directory")
// logsSocket is the Unix socket used to build the sogoms-logs client pool.
logsSocket = flag.String("logs-socket", "/run/sogoms-logs.1.sock", "sogoms-logs socket path")
)
// logsPool is the client pool towards sogoms-logs. It stays nil until main
// assigns it; logError is a no-op while it is nil.
var logsPool *protocol.Pool
// logError reports an error event to sogoms-logs without blocking the caller
// (fire and forget). It does nothing while the logs pool is not initialized.
//
// The call runs in its own goroutine with a 2 second deadline, and its outcome
// is intentionally discarded: a logging failure must never affect the request
// being served.
func logError(appID, level, message string, ctx map[string]any) {
	if logsPool != nil {
		go func() {
			payload := map[string]any{
				"app_id":  appID,
				"level":   level,
				"message": message,
				"context": ctx,
			}
			request := protocol.NewRequest("log_error", payload)

			callCtx, release := context.WithTimeout(context.Background(), 2*time.Second)
			defer release()

			logsPool.Call(callCtx, request)
		}()
	}
}
// DBPool keeps one *sql.DB handle per application, keyed by app ID.
type DBPool struct {
	// registry resolves an app ID to its configuration.
	registry *config.Registry

	// mu guards pools.
	mu sync.RWMutex
	// pools holds the handles opened so far, keyed by app ID.
	pools map[string]*sql.DB
}
// NewDBPool returns an empty pool that resolves applications through
// registry. No database handle is opened here.
func NewDBPool(registry *config.Registry) *DBPool {
	pool := new(DBPool)
	pool.registry = registry
	pool.pools = map[string]*sql.DB{}
	return pool
}
// GetDB returns the shared *sql.DB handle for appID, opening and verifying it
// on first use.
//
// It returns an error when appID is not known to the registry, when the
// handle cannot be opened, or when the initial ping fails. A handle that
// fails its ping is closed and not cached, so a later call retries.
func (p *DBPool) GetDB(appID string) (*sql.DB, error) {
	// Fast path: the handle already exists, a read lock is enough.
	p.mu.RLock()
	db, ok := p.pools[appID]
	p.mu.RUnlock()
	if ok {
		return db, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// Check again under the write lock: another goroutine may have created
	// the handle between RUnlock and Lock.
	if db, ok := p.pools[appID]; ok {
		return db, nil
	}

	cfg, ok := p.registry.GetByApp(appID)
	if !ok {
		return nil, fmt.Errorf("unknown app: %s", appID)
	}

	db, err := sql.Open("mysql", cfg.Database.DSN())
	if err != nil {
		return nil, fmt.Errorf("open db: %w", err)
	}

	// Connection pool settings. The maximum lifetime makes the driver retire
	// connections before the server or a middlebox drops them silently.
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(3 * time.Minute)

	// The write lock is held while pinging, so the ping must be bounded:
	// an unreachable database would otherwise stall GetDB for every app.
	pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(pingCtx); err != nil {
		db.Close()
		return nil, fmt.Errorf("ping db: %w", err)
	}

	p.pools[appID] = db
	log.Printf("[db] connected to database for app: %s", appID)
	return db, nil
}
// Close closes every cached database handle and removes it from the pool, so
// a later GetDB reopens a handle instead of returning a closed one.
//
// Close errors are logged rather than returned so that the remaining handles
// are still closed.
func (p *DBPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	for appID, db := range p.pools {
		// Deleting the current key while ranging over a map is safe in Go.
		delete(p.pools, appID)
		if err := db.Close(); err != nil {
			log.Printf("[db] close connection for app %s: %v", appID, err)
			continue
		}
		log.Printf("[db] closed connection for app: %s", appID)
	}
}
// main loads the application registry, prepares the database and logging
// pools, starts the request server on the configured socket and then blocks
// until SIGINT or SIGTERM is received.
func main() {
	flag.Parse()
	log.SetFlags(log.Ltime | log.Lshortfile)

	// Application configurations; the service cannot run without them.
	apps := config.NewRegistry(*configDir)
	if err := apps.Load(); err != nil {
		log.Fatalf("load config: %v", err)
	}
	log.Printf("[db] loaded apps: %v", apps.Apps())

	// Per-application database handles.
	databases := NewDBPool(apps)
	defer databases.Close()

	// Client pool used by logError.
	logsPool = protocol.NewPool(*logsSocket, 3)
	defer logsPool.Close()

	// Every incoming request is routed through handleRequest.
	srv := protocol.NewServer(*socketPath, func(ctx context.Context, req *protocol.Request) *protocol.Response {
		return handleRequest(ctx, req, databases)
	})
	if err := srv.Start(); err != nil {
		log.Fatalf("start server: %v", err)
	}
	log.Printf("[db] sogoms-db started on %s", *socketPath)

	// Block until a termination signal arrives.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	log.Printf("[db] shutting down...")
	srv.Stop()
}
// handleRequest checks that the request names an application, resolves that
// application's database handle and routes the request to the handler
// matching req.Action.
//
// Failure codes produced here: MISSING_APP_ID, DB_ERROR and UNKNOWN_ACTION.
func handleRequest(ctx context.Context, req *protocol.Request, dbPool *DBPool) *protocol.Response {
	// A failed type assertion yields "", so one emptiness test covers both
	// the missing and the non-string case.
	appID, _ := req.Params["app_id"].(string)
	if appID == "" {
		return protocol.Failure(req.ID, "MISSING_APP_ID", "app_id is required")
	}

	db, err := dbPool.GetDB(appID)
	if err != nil {
		return protocol.Failure(req.ID, "DB_ERROR", err.Error())
	}

	// All data handlers share one signature; health is the only exception.
	var handle func(*protocol.Request, *sql.DB, string) *protocol.Response
	switch req.Action {
	case "health":
		return handleHealth(req, db)
	case "query":
		handle = handleQuery
	case "query_one":
		handle = handleQueryOne
	case "insert":
		handle = handleInsert
	case "update":
		handle = handleUpdate
	case "delete":
		handle = handleDelete
	default:
		return protocol.Failure(req.ID, "UNKNOWN_ACTION", "unknown action: "+req.Action)
	}
	return handle(req, db, appID)
}
// handleQuery runs the SQL statement carried by the request and returns every
// resulting row as a list of column-name to value maps.
//
// Failure codes: INVALID_PARAMS, QUERY_ERROR, SCAN_ERROR. Query and scan
// failures are also reported through logError.
func handleQuery(req *protocol.Request, db *sql.DB, appID string) *protocol.Response {
	stmt, params, err := extractQueryParams(req.Params)
	if err != nil {
		return protocol.Failure(req.ID, "INVALID_PARAMS", err.Error())
	}

	// fail reports the error to the logging service and builds the response.
	fail := func(event, code string, cause error) *protocol.Response {
		logError(appID, "error", event, map[string]any{"query": stmt, "error": cause.Error()})
		return protocol.Failure(req.ID, code, cause.Error())
	}

	rows, err := db.Query(stmt, params...)
	if err != nil {
		return fail("query_failed", "QUERY_ERROR", err)
	}
	defer rows.Close()

	records, err := scanRows(rows)
	if err != nil {
		return fail("scan_failed", "SCAN_ERROR", err)
	}
	return protocol.Success(req.ID, records)
}
// handleQueryOne runs the SQL statement carried by the request and returns
// only its first row as a column-name to value map.
//
// Failure codes: INVALID_PARAMS, QUERY_ERROR, SCAN_ERROR, and NOT_FOUND when
// the statement yields no row. Query and scan failures are also reported
// through logError.
func handleQueryOne(req *protocol.Request, db *sql.DB, appID string) *protocol.Response {
	stmt, params, err := extractQueryParams(req.Params)
	if err != nil {
		return protocol.Failure(req.ID, "INVALID_PARAMS", err.Error())
	}

	// fail reports the error to the logging service and builds the response.
	fail := func(event, code string, cause error) *protocol.Response {
		logError(appID, "error", event, map[string]any{"query": stmt, "error": cause.Error()})
		return protocol.Failure(req.ID, code, cause.Error())
	}

	rows, err := db.Query(stmt, params...)
	if err != nil {
		return fail("query_failed", "QUERY_ERROR", err)
	}
	defer rows.Close()

	records, err := scanRows(rows)
	if err != nil {
		return fail("scan_failed", "SCAN_ERROR", err)
	}

	if len(records) > 0 {
		return protocol.Success(req.ID, records[0])
	}
	return protocol.Failure(req.ID, "NOT_FOUND", "no rows found")
}
// handleInsert inserts one row into "table", built from the column to value
// pairs of "data", and returns the generated ID as "insert_id".
//
// Table and column names cannot be bound as placeholders, so they are
// backtick-quoted (embedded backticks doubled, dot-qualified names quoted
// part by part) before being spliced into the statement. This keeps a
// request-supplied identifier from altering the statement and lets reserved
// words be used as names. Values are always bound as parameters.
//
// Failure codes: INVALID_PARAMS, INSERT_ERROR. Insert failures are also
// reported through logError.
func handleInsert(req *protocol.Request, db *sql.DB, appID string) *protocol.Response {
	table, ok := req.Params["table"].(string)
	if !ok || table == "" {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "table is required")
	}

	data, ok := req.Params["data"].(map[string]any)
	if !ok {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "data is required")
	}

	// quoteIdent turns name into a safely quoted SQL identifier.
	quoteIdent := func(name string) string {
		parts := strings.Split(name, ".")
		for i, part := range parts {
			parts[i] = "`" + strings.ReplaceAll(part, "`", "``") + "`"
		}
		return strings.Join(parts, ".")
	}

	// Build the column list and the matching placeholders in one pass so
	// that both follow the same (random) map iteration order.
	columns := make([]string, 0, len(data))
	placeholders := make([]string, 0, len(data))
	values := make([]any, 0, len(data))
	for col, val := range data {
		columns = append(columns, quoteIdent(col))
		placeholders = append(placeholders, "?")
		values = append(values, val)
	}

	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		quoteIdent(table),
		strings.Join(columns, ", "),
		strings.Join(placeholders, ", "))

	result, err := db.Exec(query, values...)
	if err != nil {
		logError(appID, "error", "insert_failed", map[string]any{"table": table, "error": err.Error()})
		return protocol.Failure(req.ID, "INSERT_ERROR", err.Error())
	}

	// The row is already inserted at this point, so failing to read the
	// generated ID is deliberately not turned into a request failure.
	insertID, _ := result.LastInsertId()
	return protocol.Success(req.ID, map[string]any{
		"insert_id": insertID,
	})
}
// handleUpdate sets the columns listed in "data" on the rows of "table" that
// match every column = value pair of "where", and returns the number of
// affected rows as "affected_rows".
//
// Table and column names cannot be bound as placeholders, so they are
// backtick-quoted (embedded backticks doubled, dot-qualified names quoted
// part by part) before being spliced into the statement. Values are always
// bound as parameters. An empty "data" or "where" map is rejected up front
// because it would otherwise produce a malformed statement.
//
// NOTE: a nil value in "where" is bound as NULL, and "col = NULL" matches no
// row in SQL.
//
// Failure codes: INVALID_PARAMS, UPDATE_ERROR. Update failures are also
// reported through logError.
func handleUpdate(req *protocol.Request, db *sql.DB, appID string) *protocol.Response {
	table, ok := req.Params["table"].(string)
	if !ok || table == "" {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "table is required")
	}

	data, ok := req.Params["data"].(map[string]any)
	if !ok {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "data is required")
	}
	if len(data) == 0 {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "data must not be empty")
	}

	where, ok := req.Params["where"].(map[string]any)
	if !ok {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "where is required")
	}
	if len(where) == 0 {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "where must not be empty")
	}

	// quoteIdent turns name into a safely quoted SQL identifier.
	quoteIdent := func(name string) string {
		parts := strings.Split(name, ".")
		for i, part := range parts {
			parts[i] = "`" + strings.ReplaceAll(part, "`", "``") + "`"
		}
		return strings.Join(parts, ".")
	}

	// SET clause; its values come first in the bound parameter list.
	setClauses := make([]string, 0, len(data))
	values := make([]any, 0, len(data)+len(where))
	for col, val := range data {
		setClauses = append(setClauses, quoteIdent(col)+" = ?")
		values = append(values, val)
	}

	// WHERE clause; its values follow the SET values.
	whereClauses := make([]string, 0, len(where))
	for col, val := range where {
		whereClauses = append(whereClauses, quoteIdent(col)+" = ?")
		values = append(values, val)
	}

	query := fmt.Sprintf("UPDATE %s SET %s WHERE %s",
		quoteIdent(table),
		strings.Join(setClauses, ", "),
		strings.Join(whereClauses, " AND "))

	result, err := db.Exec(query, values...)
	if err != nil {
		logError(appID, "error", "update_failed", map[string]any{"table": table, "error": err.Error()})
		return protocol.Failure(req.ID, "UPDATE_ERROR", err.Error())
	}

	// The statement already succeeded, so failing to read the row count is
	// deliberately not turned into a request failure.
	affected, _ := result.RowsAffected()
	return protocol.Success(req.ID, map[string]any{
		"affected_rows": affected,
	})
}
// handleDelete deletes the rows of "table" that match every column = value
// pair of "where", and returns the number of deleted rows as
// "affected_rows".
//
// Table and column names cannot be bound as placeholders, so they are
// backtick-quoted (embedded backticks doubled, dot-qualified names quoted
// part by part) before being spliced into the statement. Values are always
// bound as parameters. An empty "where" map is rejected up front because it
// would otherwise produce a malformed statement.
//
// NOTE: a nil value in "where" is bound as NULL, and "col = NULL" matches no
// row in SQL.
//
// Failure codes: INVALID_PARAMS, DELETE_ERROR. Delete failures are also
// reported through logError.
func handleDelete(req *protocol.Request, db *sql.DB, appID string) *protocol.Response {
	table, ok := req.Params["table"].(string)
	if !ok || table == "" {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "table is required")
	}

	where, ok := req.Params["where"].(map[string]any)
	if !ok {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "where is required")
	}
	if len(where) == 0 {
		return protocol.Failure(req.ID, "INVALID_PARAMS", "where must not be empty")
	}

	// quoteIdent turns name into a safely quoted SQL identifier.
	quoteIdent := func(name string) string {
		parts := strings.Split(name, ".")
		for i, part := range parts {
			parts[i] = "`" + strings.ReplaceAll(part, "`", "``") + "`"
		}
		return strings.Join(parts, ".")
	}

	// WHERE clause: every pair becomes an equality test joined with AND.
	whereClauses := make([]string, 0, len(where))
	values := make([]any, 0, len(where))
	for col, val := range where {
		whereClauses = append(whereClauses, quoteIdent(col)+" = ?")
		values = append(values, val)
	}

	query := fmt.Sprintf("DELETE FROM %s WHERE %s",
		quoteIdent(table),
		strings.Join(whereClauses, " AND "))

	result, err := db.Exec(query, values...)
	if err != nil {
		logError(appID, "error", "delete_failed", map[string]any{"table": table, "error": err.Error()})
		return protocol.Failure(req.ID, "DELETE_ERROR", err.Error())
	}

	// The statement already succeeded, so failing to read the row count is
	// deliberately not turned into a request failure.
	affected, _ := result.RowsAffected()
	return protocol.Success(req.ID, map[string]any{
		"affected_rows": affected,
	})
}
// handleHealth reports whether the application's database answers a ping.
// It returns {"status": "ok"} on success and an UNHEALTHY failure carrying
// the ping error otherwise.
func handleHealth(req *protocol.Request, db *sql.DB) *protocol.Response {
	err := db.Ping()
	if err == nil {
		return protocol.Success(req.ID, map[string]any{"status": "ok"})
	}
	return protocol.Failure(req.ID, "UNHEALTHY", err.Error())
}
// extractQueryParams pulls the SQL text ("query") and its optional positional
// arguments ("args") out of the request parameters.
//
// "args" may be absent or nil (no arguments), a []any (used as is), or any
// other value that encodes to a JSON array, such as a []string, which is
// converted through a JSON round trip. Note that the round trip decodes
// every number as float64.
//
// It returns an error when "query" is missing or not a string, and when
// "args" cannot be converted to a list. Rejecting such input avoids running
// the statement with its arguments silently dropped.
func extractQueryParams(params map[string]any) (string, []any, error) {
	query, ok := params["query"].(string)
	if !ok {
		return "", nil, fmt.Errorf("query is required")
	}

	argsRaw, present := params["args"]
	if !present || argsRaw == nil {
		return query, nil, nil
	}
	if args, ok := argsRaw.([]any); ok {
		return query, args, nil
	}

	// Typed slices and similar values: normalize them to []any via JSON.
	data, err := json.Marshal(argsRaw)
	if err != nil {
		return "", nil, fmt.Errorf("encode args: %w", err)
	}
	var args []any
	if err := json.Unmarshal(data, &args); err != nil {
		return "", nil, fmt.Errorf("args must be a list: %w", err)
	}
	return query, args, nil
}
// scanRows reads every remaining row of rows into a slice holding one map per
// row, keyed by column name. []byte values are converted to string; any other
// value is stored as scanned.
//
// When the columns can be read and no scan fails, the returned slice is
// non-nil even for an empty result set, and it is returned together with
// rows.Err(). A column lookup or scan failure returns a nil slice and the
// error.
func scanRows(rows *sql.Rows) ([]map[string]any, error) {
	names, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	records := []map[string]any{}
	for rows.Next() {
		// Scan needs one destination pointer per column; each one points at
		// the matching cell.
		cells := make([]any, len(names))
		targets := make([]any, len(names))
		for i := range cells {
			targets[i] = &cells[i]
		}
		if err := rows.Scan(targets...); err != nil {
			return nil, err
		}

		record := make(map[string]any)
		for i, name := range names {
			switch cell := cells[i].(type) {
			case []byte:
				record[name] = string(cell)
			default:
				record[name] = cell
			}
		}
		records = append(records, record)
	}
	return records, rows.Err()
}