Files
sogoms/cmd/sogoms/cron/main.go
Pierre 65da4efdad SOGOMS v1.0.3 - Admin UI, Cron, Config reload
Phase 13 : sogoms-cron
- Jobs planifiés avec schedule cron standard
- Types: query_email, http, service
- Actions: list, trigger, status

Phase 16 : Réorganisation config/apps/{app}/
- Tous les fichiers d'une app dans un seul dossier
- Migration prokov vers nouvelle structure

Phase 17 : sogoms-admin
- Interface web d'administration (Go templates + htmx)
- Auth sessions cookies signées HMAC-SHA256
- Rôles super_admin / app_admin avec permissions

Phase 19 : Création d'app via Admin UI
- Formulaire création app avec config DB/auth
- Bouton "Scanner la base" : introspection + schema.yaml
- Rechargement automatique sogoway via SIGHUP

Infrastructure :
- sogoctl : socket de contrôle /run/sogoctl.sock
- sogoway : reload config sur SIGHUP sans restart

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 20:30:56 +01:00

744 lines
18 KiB
Go

// sogoms-cron : Microservice de tâches planifiées.
// Exécute des jobs périodiques définis dans config/apps/{app}/cron.yaml.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
"gopkg.in/yaml.v3"
"sogoms.com/internal/config"
"sogoms.com/internal/cron"
"sogoms.com/internal/protocol"
)
// Command-line flags. main reads them after calling flag.Parse.
var (
// socketPath is the Unix socket this service listens on; main hands it to
// protocol.NewServer.
socketPath = flag.String("socket", "/run/sogoms-cron.1.sock", "Unix socket path")
// configDir is the configuration root; per-application cron files are read
// from <configDir>/apps/<app>/cron.yaml.
configDir = flag.String("config", "/config", "Configuration directory")
// dbSocket, smtpSocket and logsSocket are the sockets of the backing
// services. main creates no connection pool for a flag set to "".
dbSocket = flag.String("db-socket", "/run/sogoms-db.1.sock", "DB service socket")
smtpSocket = flag.String("smtp-socket", "/run/sogoms-smtp.1.sock", "SMTP service socket")
logsSocket = flag.String("logs-socket", "/run/sogoms-logs.1.sock", "Logs service socket")
)
// CronConfig is the cron configuration of one application, decoded from its
// cron.yaml file.
type CronConfig struct {
// Timezone is a location name accepted by time.LoadLocation; loadCronConfig
// substitutes "UTC" when it is empty.
Timezone string `yaml:"timezone"`
// Retry controls how failed job runs are retried.
Retry RetryConfig `yaml:"retry"`
// HistoryDays is the requested retention of the execution history, in days.
HistoryDays int `yaml:"history_days"`
// Jobs maps a job name to its definition.
Jobs map[string]*JobConfig `yaml:"jobs"`
// location is the resolved form of Timezone, set by loadCronConfig.
location *time.Location
}
// RetryConfig configures the retries performed after a failed job run.
type RetryConfig struct {
// MaxAttempts is the total number of attempts per run; loadCronConfig uses 3
// when the value is 0.
MaxAttempts int `yaml:"max_attempts"`
// Delay is the pause between attempts in time.ParseDuration syntax;
// loadCronConfig uses "5m" when it is empty.
Delay string `yaml:"delay"`
// delayDur is the parsed form of Delay, filled in by loadCronConfig.
delayDur time.Duration
}
// JobConfig describes one scheduled job. Which of the type-specific fields
// are used depends on Type.
type JobConfig struct {
// Schedule is the cron expression parsed by cron.ParseSchedule in Load.
Schedule string `yaml:"schedule"`
Type string `yaml:"type"` // query_email, http, service
// Enabled gates scheduling; Load also clears it when Schedule fails to parse.
Enabled bool `yaml:"enabled"`
// For query_email: Query is sent to the DB service, the resulting rows are
// grouped by the GroupBy column, and Template names the template passed to
// the SMTP service.
Query string `yaml:"query"`
GroupBy string `yaml:"group_by"`
Template string `yaml:"template"`
// For http: Method defaults to GET in runHTTP when empty.
Method string `yaml:"method"`
URL string `yaml:"url"`
Headers map[string]string `yaml:"headers"`
Body string `yaml:"body"`
// For service: Service selects the db, smtp or logs pool, Action is the
// request action, and Params are sent along with an added "app_id".
Service string `yaml:"service"`
Action string `yaml:"action"`
Params map[string]any `yaml:"params"`
// Runtime state, populated by Load and advanced by checkJobs.
schedule *cron.Schedule
nextRun time.Time
}
// JobExecution records one attempt at running a job; these records make up
// the in-memory history returned by GetHistory.
type JobExecution struct {
JobName string `json:"job_name"`
AppID string `json:"app_id"`
// StartTime and EndTime bracket the call to runJob for this attempt.
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
// Success is true when the attempt returned no error.
Success bool `json:"success"`
// Attempt is the 1-based attempt number within one run.
Attempt int `json:"attempt"`
// Result is the summary string returned by the job runner.
Result string `json:"result"`
// Error is the error text of a failed attempt; omitted when empty.
Error string `json:"error,omitempty"`
}
// CronManager schedules and runs the cron jobs of every application and keeps
// an in-memory history of their executions.
type CronManager struct {
registry *config.Registry
configDir string
configs map[string]*CronConfig // appID -> config
// executions is the history, appended in completion order.
executions []*JobExecution
// historyDays is the retention applied by cleanHistory; loadCronConfig raises
// it to the largest value requested by any application.
historyDays int
// Pools for the backing services; a nil pool means the service is not
// configured.
dbPool *protocol.Pool
smtpPool *protocol.Pool
logsPool *protocol.Pool
// stopCh is closed by Stop to end the scheduler loop.
stopCh chan struct{}
// mu guards configs and executions.
mu sync.RWMutex
}
// NewCronManager builds a manager bound to the given application registry,
// configuration root and service pools. Any pool may be nil, in which case
// jobs needing that service fail with a "not available" error.
//
// The history retention starts at 7 days and no configuration is loaded yet;
// call Load and then Start.
func NewCronManager(registry *config.Registry, configDir string, dbPool, smtpPool, logsPool *protocol.Pool) *CronManager {
	const defaultHistoryDays = 7

	mgr := new(CronManager)
	mgr.registry = registry
	mgr.configDir = configDir
	mgr.configs = map[string]*CronConfig{}
	mgr.executions = make([]*JobExecution, 0)
	mgr.historyDays = defaultHistoryDays
	mgr.dbPool = dbPool
	mgr.smtpPool = smtpPool
	mgr.logsPool = logsPool
	mgr.stopCh = make(chan struct{})
	return mgr
}
// Load reads the cron configuration of every application known to the
// registry and prepares the enabled jobs for scheduling.
//
// Applications without a cron.yaml are skipped silently. A file that cannot
// be loaded, a job with an empty definition, or a job with an invalid schedule
// only produces a warning: the rest of the configuration is still loaded.
// Load therefore always returns nil.
func (m *CronManager) Load() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, appID := range m.registry.Apps() {
		cronPath := filepath.Join(m.configDir, "apps", appID, "cron.yaml")
		if _, err := os.Stat(cronPath); os.IsNotExist(err) {
			continue // No cron configuration for this application.
		}

		cfg, err := m.loadCronConfig(cronPath)
		if err != nil {
			log.Printf("[cron] warning: cannot load %s: %v", appID, err)
			continue
		}

		// Parse the schedule of every enabled job.
		for name, job := range cfg.Jobs {
			if job == nil {
				// A YAML entry such as "name:" with no body decodes to a nil
				// pointer. Drop it so later iterations over cfg.Jobs never
				// dereference nil (deleting during range is safe in Go).
				log.Printf("[cron] warning: %s/%s has an empty definition, ignored", appID, name)
				delete(cfg.Jobs, name)
				continue
			}
			if !job.Enabled {
				continue
			}

			sched, err := cron.ParseSchedule(job.Schedule, cfg.location)
			if err != nil {
				log.Printf("[cron] warning: %s/%s invalid schedule: %v", appID, name, err)
				job.Enabled = false
				continue
			}
			job.schedule = sched
			job.nextRun = sched.Next(time.Now())
		}

		m.configs[appID] = cfg
		log.Printf("[cron] loaded %s: %d jobs", appID, len(cfg.Jobs))
	}
	return nil
}
// loadCronConfig reads and decodes one cron.yaml file and applies defaults:
// timezone "UTC", 3 attempts, a 5 minute retry delay and 7 days of history.
//
// Non-positive max_attempts and history_days values are treated as unset, and
// an unparsable or negative retry delay falls back to 5 minutes with a
// warning instead of being replaced silently.
//
// It returns an error when the file cannot be read or decoded, or when the
// timezone is unknown. As a side effect it raises m.historyDays to the
// largest retention requested so far; Load calls it with m.mu held.
func (m *CronManager) loadCronConfig(path string) (*CronConfig, error) {
	const defaultRetryDelay = 5 * time.Minute

	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}

	var cfg CronConfig
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		return nil, err
	}

	// Default timezone.
	if cfg.Timezone == "" {
		cfg.Timezone = "UTC"
	}
	loc, err := time.LoadLocation(cfg.Timezone)
	if err != nil {
		return nil, fmt.Errorf("invalid timezone %s: %w", cfg.Timezone, err)
	}
	cfg.location = loc

	// Default retry policy. A negative attempt count would otherwise mean the
	// job body is never executed.
	if cfg.Retry.MaxAttempts <= 0 {
		cfg.Retry.MaxAttempts = 3
	}
	if cfg.Retry.Delay == "" {
		cfg.Retry.Delay = "5m"
	}
	delay, err := time.ParseDuration(cfg.Retry.Delay)
	if err != nil || delay < 0 {
		log.Printf("[cron] warning: %s: invalid retry delay %q, using %s", path, cfg.Retry.Delay, defaultRetryDelay)
		delay = defaultRetryDelay
	}
	cfg.Retry.delayDur = delay

	// Default history retention; the manager keeps the largest one requested.
	if cfg.HistoryDays <= 0 {
		cfg.HistoryDays = 7
	}
	if cfg.HistoryDays > m.historyDays {
		m.historyDays = cfg.HistoryDays
	}

	return &cfg, nil
}
// Start launches the scheduler loop (run) in its own goroutine and returns
// immediately. Use Stop to end the loop.
func (m *CronManager) Start() {
go m.run()
log.Printf("[cron] scheduler started")
}
// Stop signals the scheduler loop started by Start to exit.
//
// Stop is idempotent: the stop channel is closed only on the first call, so a
// later call (for example a deferred Stop after an explicit one) does not
// panic with "close of closed channel".
func (m *CronManager) Stop() {
	m.mu.Lock()
	defer m.mu.Unlock()

	select {
	case <-m.stopCh:
		// Already closed by an earlier call.
	default:
		close(m.stopCh)
	}
}
// run is the scheduler loop. It checks the jobs once right away, then every
// 30 seconds checks them again and prunes the execution history, until the
// stop channel is closed.
func (m *CronManager) run() {
	const pollInterval = 30 * time.Second

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	// Initial pass so due jobs do not wait for the first tick.
	m.checkJobs()

	for {
		select {
		case <-m.stopCh:
			return
		case <-tick.C:
			m.checkJobs()
			m.cleanHistory()
		}
	}
}
// checkJobs starts every enabled job whose next run time has been reached and
// computes its following run time.
//
// The write lock is taken (not the read lock) because job.nextRun is updated
// here while ListJobs reads it under the read lock; writing under RLock would
// be a data race. Jobs run in their own goroutines, so the lock is only held
// for the duration of the scan.
func (m *CronManager) checkJobs() {
	m.mu.Lock()
	defer m.mu.Unlock()

	now := time.Now()
	for appID, cfg := range m.configs {
		for jobName, job := range cfg.Jobs {
			if job == nil || !job.Enabled || job.schedule == nil {
				continue
			}
			if now.Before(job.nextRun) {
				continue // Not due yet.
			}

			// Run the job in the background, then schedule the next run.
			go m.executeJob(appID, jobName, job, cfg)
			job.nextRun = job.schedule.Next(now)
		}
	}
}
// executeJob runs a job, retrying it according to cfg.Retry.
//
// Every attempt is recorded in the history and reported to the logs service
// as "job_success" or "job_failed". When all attempts fail, a final
// "job_exhausted" event is emitted.
//
// At least one attempt is always made, even if cfg.Retry.MaxAttempts is zero
// or negative; otherwise the loop would be skipped and the final report would
// dereference a nil error. The wait between attempts is abandoned as soon as
// the manager is stopped, so a pending retry does not outlive shutdown.
func (m *CronManager) executeJob(appID, jobName string, job *JobConfig, cfg *CronConfig) {
	maxAttempts := cfg.Retry.MaxAttempts
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		exec := &JobExecution{
			JobName:   jobName,
			AppID:     appID,
			StartTime: time.Now(),
			Attempt:   attempt,
		}

		result, err := m.runJob(appID, jobName, job)

		exec.EndTime = time.Now()
		exec.Success = err == nil
		exec.Result = result
		if err != nil {
			exec.Error = err.Error()
		}
		m.addExecution(exec)

		if err == nil {
			m.logEvent(appID, "job_success", map[string]any{
				"job":     jobName,
				"attempt": attempt,
				"result":  result,
			})
			return
		}

		lastErr = err
		m.logEvent(appID, "job_failed", map[string]any{
			"job":     jobName,
			"attempt": attempt,
			"error":   err.Error(),
		})

		if attempt == maxAttempts {
			break
		}

		// Wait before the next attempt, but give up if the manager stops.
		timer := time.NewTimer(cfg.Retry.delayDur)
		select {
		case <-timer.C:
		case <-m.stopCh:
			timer.Stop()
			return
		}
	}

	// Every attempt failed; lastErr is non-nil because at least one ran.
	m.logEvent(appID, "job_exhausted", map[string]any{
		"job":      jobName,
		"attempts": maxAttempts,
		"error":    lastErr.Error(),
	})
}
// runJob dispatches a job to the runner matching its Type ("query_email",
// "http" or "service") and returns that runner's summary string and error.
// Any other type yields an "unknown job type" error. jobName is currently
// unused.
func (m *CronManager) runJob(appID, jobName string, job *JobConfig) (string, error) {
	switch kind := job.Type; kind {
	case "service":
		return m.runService(appID, job)
	case "http":
		return m.runHTTP(job)
	case "query_email":
		return m.runQueryEmail(appID, job)
	}
	return "", fmt.Errorf("unknown job type: %s", job.Type)
}
// runQueryEmail runs the job's query through the DB service, groups the rows
// by job.GroupBy and sends one templated email per group through the SMTP
// service.
//
// It returns "no data" when the query yields no rows, otherwise "sent N
// emails". A failure for an individual group is logged and skipped, but when
// every group fails the job itself is reported as failed (wrapping the last
// send error) so that the retry policy applies instead of a silent
// "sent 0 emails" success.
func (m *CronManager) runQueryEmail(appID string, job *JobConfig) (string, error) {
	if m.dbPool == nil {
		return "", fmt.Errorf("db service not available")
	}
	if m.smtpPool == nil {
		return "", fmt.Errorf("smtp service not available")
	}

	// Run the query.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := protocol.NewRequest("query", map[string]any{
		"app_id": appID,
		"query":  job.Query,
		"args":   []any{},
	})
	resp, err := m.dbPool.Call(ctx, req)
	if err != nil {
		return "", fmt.Errorf("db query: %w", err)
	}
	if resp.Status != "success" {
		return "", fmt.Errorf("db query failed: %s", resp.Error.Message)
	}

	// Extract the rows.
	resultMap, ok := resp.Result.(map[string]any)
	if !ok {
		return "", fmt.Errorf("invalid result format")
	}
	rows, ok := resultMap["rows"].([]any)
	if !ok || len(rows) == 0 {
		return "no data", nil
	}

	// Group the rows (a single "default" group when GroupBy is empty).
	grouped := m.groupRows(rows, job.GroupBy)

	// Send one email per group.
	sent := 0
	var lastErr error
	for key, groupRows := range grouped {
		if err := m.sendGroupEmail(appID, job, key, groupRows); err != nil {
			log.Printf("[cron] %s: email error for %s: %v", appID, key, err)
			lastErr = err
			continue
		}
		sent++
	}

	// Nothing went out although there was something to send: report failure.
	if sent == 0 && lastErr != nil {
		return "", fmt.Errorf("all %d emails failed, last error: %w", len(grouped), lastErr)
	}
	return fmt.Sprintf("sent %d emails", sent), nil
}
// groupRows buckets rows by the string form of their groupBy column.
//
// Rows that are not map[string]any are dropped. A row goes to the "default"
// bucket when groupBy is empty or when the row has no such column. Row order
// is preserved inside each bucket.
func (m *CronManager) groupRows(rows []any, groupBy string) map[string][]map[string]any {
	const fallbackKey = "default"

	buckets := map[string][]map[string]any{}
	for i := range rows {
		record, isMap := rows[i].(map[string]any)
		if !isMap {
			continue
		}

		bucket := fallbackKey
		if value, present := record[groupBy]; present && groupBy != "" {
			bucket = fmt.Sprint(value)
		}
		buckets[bucket] = append(buckets[bucket], record)
	}
	return buckets
}
// sendGroupEmail sends one templated email covering a group of rows.
//
// The recipient address ("email") and display name ("user_name", falling back
// to "name") are taken from the first row. Each row becomes a task entry for
// the template. An empty group is a no-op; a first row without a string
// "email" column is an error. key is currently unused.
func (m *CronManager) sendGroupEmail(appID string, job *JobConfig, key string, rows []map[string]any) error {
	if len(rows) == 0 {
		return nil
	}

	first := rows[0]
	recipient, hasEmail := first["email"].(string)
	if !hasEmail {
		return fmt.Errorf("no email field in row")
	}
	displayName, _ := first["user_name"].(string)
	if displayName == "" {
		displayName, _ = first["name"].(string)
	}

	today := time.Now().Format("02/01/2006")

	// Build the template payload, one task per row.
	tasks := make([]map[string]any, len(rows))
	for i, row := range rows {
		dueTime := ""
		// NOTE(review): this only matches an actual time.Time value; if rows
		// arrive JSON-decoded, due_date is presumably a string and DueTime
		// stays empty — confirm against the DB service.
		if due, isTime := row["due_date"].(time.Time); isTime {
			dueTime = due.Format("15:04")
		}
		tasks[i] = map[string]any{
			"Name":        row["title"],
			"Project":     row["project_name"],
			"Status":      row["status_name"],
			"StatusColor": row["status_color"],
			"DueTime":     dueTime,
		}
	}

	// Hand the message over to the SMTP service.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	resp, err := m.smtpPool.Call(ctx, protocol.NewRequest("send_template", map[string]any{
		"app_id":   appID,
		"to":       recipient,
		"template": job.Template,
		"data": map[string]any{
			"Name":      displayName,
			"Date":      today,
			"Tasks":     tasks,
			"TaskCount": len(tasks),
		},
	}))
	switch {
	case err != nil:
		return err
	case resp.Status != "success":
		return fmt.Errorf("smtp: %s", resp.Error.Message)
	}
	return nil
}
// runHTTP performs the HTTP request described by the job (method defaulting
// to GET, optional headers and body) with a 30 second client timeout.
//
// It returns "HTTP <code>" on success. A transport error, or a response
// status of 400 or above, is returned as an error.
func (m *CronManager) runHTTP(job *JobConfig) (string, error) {
	verb := job.Method
	if verb == "" {
		verb = "GET"
	}

	// A nil payload gives an empty reader, i.e. a request without body.
	var payload []byte
	if job.Body != "" {
		payload = []byte(job.Body)
	}

	request, err := http.NewRequest(verb, job.URL, bytes.NewReader(payload))
	if err != nil {
		return "", err
	}
	for header, value := range job.Headers {
		request.Header.Set(header, value)
	}

	httpClient := http.Client{Timeout: 30 * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return "", err
	}
	defer response.Body.Close()

	if code := response.StatusCode; code >= 400 {
		return "", fmt.Errorf("HTTP %d", code)
	}
	return fmt.Sprintf("HTTP %d", response.StatusCode), nil
}
// runService calls an internal service (db, smtp or logs, optionally with the
// "sogoms-" prefix) with the job's action and parameters, adding "app_id" to
// the parameters.
//
// It returns the JSON encoding of the service result. An unknown service, an
// unconfigured pool, a transport error or a non-success response is returned
// as an error.
func (m *CronManager) runService(appID string, job *JobConfig) (string, error) {
	pools := map[string]*protocol.Pool{
		"db":          m.dbPool,
		"sogoms-db":   m.dbPool,
		"smtp":        m.smtpPool,
		"sogoms-smtp": m.smtpPool,
		"logs":        m.logsPool,
		"sogoms-logs": m.logsPool,
	}
	target, known := pools[job.Service]
	if !known {
		return "", fmt.Errorf("unknown service: %s", job.Service)
	}
	if target == nil {
		return "", fmt.Errorf("service %s not available", job.Service)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Work on a copy so the job's configured params are never mutated.
	args := make(map[string]any, len(job.Params)+1)
	for name, value := range job.Params {
		args[name] = value
	}
	args["app_id"] = appID

	resp, err := target.Call(ctx, protocol.NewRequest(job.Action, args))
	if err != nil {
		return "", err
	}
	if resp.Status != "success" {
		return "", fmt.Errorf("%s: %s", resp.Error.Code, resp.Error.Message)
	}

	// Encoding errors are ignored on purpose: the call itself succeeded.
	encoded, _ := json.Marshal(resp.Result)
	return string(encoded), nil
}
// addExecution appends one execution record to the in-memory history while
// holding the write lock.
func (m *CronManager) addExecution(exec *JobExecution) {
	m.mu.Lock()
	m.executions = append(m.executions, exec)
	m.mu.Unlock()
}
// cleanHistory drops the execution records whose start time is not after
// "now minus historyDays days", keeping the remaining ones in order.
func (m *CronManager) cleanHistory() {
	m.mu.Lock()
	defer m.mu.Unlock()

	oldest := time.Now().AddDate(0, 0, -m.historyDays)

	var recent []*JobExecution
	for i := range m.executions {
		if entry := m.executions[i]; entry.StartTime.After(oldest) {
			recent = append(recent, entry)
		}
	}
	m.executions = recent
}
// logEvent reports an event to the logs service as "cron_<eventType>".
//
// The call is made in a separate goroutine with a 2 second timeout and its
// outcome is ignored, so logging never delays or fails a job. Nothing happens
// when no logs pool is configured.
func (m *CronManager) logEvent(appID, eventType string, data map[string]any) {
	if m.logsPool == nil {
		return
	}

	payload := map[string]any{
		"app_id":     appID,
		"event_type": "cron_" + eventType,
		"data":       data,
	}

	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()

		// Best effort: the response and error are deliberately dropped.
		m.logsPool.Call(ctx, protocol.NewRequest("log_event", payload))
	}()
}
// ListJobs returns one entry per configured job across all applications,
// with its app_id, name, type, schedule, enabled flag and next run time
// formatted as RFC 3339. The order is unspecified (map iteration).
func (m *CronManager) ListJobs() []map[string]any {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var listing []map[string]any
	for app, appCfg := range m.configs {
		for jobName, def := range appCfg.Jobs {
			entry := map[string]any{}
			entry["app_id"] = app
			entry["name"] = jobName
			entry["type"] = def.Type
			entry["schedule"] = def.Schedule
			entry["enabled"] = def.Enabled
			entry["next_run"] = def.nextRun.Format(time.RFC3339)
			listing = append(listing, entry)
		}
	}
	return listing
}
// GetHistory returns up to limit execution records, most recent first.
// An empty appID or jobName disables the corresponding filter. A limit of
// zero or less yields no records.
func (m *CronManager) GetHistory(appID, jobName string, limit int) []*JobExecution {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var matches []*JobExecution
	// Walk backwards: the history is appended in chronological order.
	for idx := len(m.executions) - 1; idx >= 0; idx-- {
		if len(matches) >= limit {
			break
		}

		entry := m.executions[idx]
		wrongApp := appID != "" && entry.AppID != appID
		wrongJob := jobName != "" && entry.JobName != jobName
		if wrongApp || wrongJob {
			continue
		}
		matches = append(matches, entry)
	}
	return matches
}
// TriggerJob starts a job immediately in the background, regardless of its
// schedule or enabled flag. It returns an error when the application or the
// job is unknown; the outcome of the run itself is only visible through the
// history.
func (m *CronManager) TriggerJob(appID, jobName string) error {
	// Resolve the job under the read lock, released before the run starts.
	lookup := func() (*CronConfig, *JobConfig, error) {
		m.mu.RLock()
		defer m.mu.RUnlock()

		appCfg, found := m.configs[appID]
		if !found {
			return nil, nil, fmt.Errorf("app not found: %s", appID)
		}
		target, found := appCfg.Jobs[jobName]
		if !found {
			return nil, nil, fmt.Errorf("job not found: %s", jobName)
		}
		return appCfg, target, nil
	}

	appCfg, target, err := lookup()
	if err != nil {
		return err
	}

	go m.executeJob(appID, jobName, target, appCfg)
	return nil
}
// main wires the service together: it loads the application registry, opens
// the service pools, loads and starts the cron manager, serves IPC requests on
// the Unix socket, and shuts down on SIGINT or SIGTERM.
func main() {
	flag.Parse()
	log.SetFlags(log.Ltime | log.Lshortfile)

	// Load the application configurations.
	registry := config.NewRegistry(*configDir)
	if err := registry.Load(); err != nil {
		log.Fatalf("load config: %v", err)
	}
	log.Printf("[cron] loaded apps: %v", registry.Apps())

	// Connection pools to the backing services; an empty socket path leaves
	// the corresponding pool nil.
	openPool := func(socket string) *protocol.Pool {
		if socket == "" {
			return nil
		}
		return protocol.NewPool(socket, 2)
	}
	manager := NewCronManager(
		registry,
		*configDir,
		openPool(*dbSocket),
		openPool(*smtpSocket),
		openPool(*logsSocket),
	)

	// Cron manager.
	if err := manager.Load(); err != nil {
		log.Fatalf("load cron config: %v", err)
	}
	manager.Start()
	defer manager.Stop()

	// IPC server; every request is routed through handleRequest.
	server := protocol.NewServer(*socketPath, func(ctx context.Context, req *protocol.Request) *protocol.Response {
		return handleRequest(ctx, req, manager)
	})
	if err := server.Start(); err != nil {
		log.Fatalf("start server: %v", err)
	}
	log.Printf("[cron] sogoms-cron started on %s", *socketPath)

	// Block until a termination signal arrives.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	<-signals

	log.Printf("[cron] shutting down...")
	server.Stop()
}
// handleRequest routes an IPC request by its action: "health", "list",
// "trigger" or "status". Any other action yields an UNKNOWN_ACTION failure.
func handleRequest(ctx context.Context, req *protocol.Request, manager *CronManager) *protocol.Response {
	if req.Action == "health" {
		return protocol.Success(req.ID, map[string]any{"status": "ok"})
	}

	routes := map[string]func(*protocol.Request, *CronManager) *protocol.Response{
		"list":    handleList,
		"trigger": handleTrigger,
		"status":  handleStatus,
	}
	if route, known := routes[req.Action]; known {
		return route(req, manager)
	}
	return protocol.Failure(req.ID, "UNKNOWN_ACTION", "unknown action: "+req.Action)
}
// handleList answers the "list" action with every configured job under the
// "jobs" key.
func handleList(req *protocol.Request, manager *CronManager) *protocol.Response {
	return protocol.Success(req.ID, map[string]any{
		"jobs": manager.ListJobs(),
	})
}
// handleTrigger answers the "trigger" action by starting a job manually.
// Params: app_id, job (both required strings).
// It fails with MISSING_PARAMS when either is absent and with TRIGGER_ERROR
// when the manager rejects the request.
func handleTrigger(req *protocol.Request, manager *CronManager) *protocol.Response {
	app, _ := req.Params["app_id"].(string)
	name, _ := req.Params["job"].(string)
	if app == "" || name == "" {
		return protocol.Failure(req.ID, "MISSING_PARAMS", "app_id and job are required")
	}

	err := manager.TriggerJob(app, name)
	if err == nil {
		return protocol.Success(req.ID, map[string]any{"triggered": true})
	}
	return protocol.Failure(req.ID, "TRIGGER_ERROR", err.Error())
}
// handleStatus answers the "status" action with the execution history under
// the "executions" key, most recent first.
// Params: app_id (optional), job (optional), limit (optional number,
// default 50).
func handleStatus(req *protocol.Request, manager *CronManager) *protocol.Response {
	maxEntries := 50
	if requested, isNumber := req.Params["limit"].(float64); isNumber {
		maxEntries = int(requested)
	}

	appFilter, _ := req.Params["app_id"].(string)
	jobFilter, _ := req.Params["job"].(string)

	return protocol.Success(req.ID, map[string]any{
		"executions": manager.GetHistory(appFilter, jobFilter, maxEntries),
	})
}