Initial commit - SOGOMS v1.0.0
- sogoctl: supervisor avec health checks et restart auto - sogoway: gateway HTTP, auth JWT, routing par hostname - sogoms-db: microservice MariaDB avec pool par application - Protocol IPC Unix socket JSON length-prefixed - Config YAML multi-application (prokov) - Deploy script pour container Alpine gw3 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
169
internal/protocol/client.go
Normal file
169
internal/protocol/client.go
Normal file
@@ -0,0 +1,169 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Client permet d'appeler un microservice via Unix socket.
|
||||
type Client struct {
|
||||
socketPath string
|
||||
conn net.Conn
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewClient crée un nouveau client.
|
||||
func NewClient(socketPath string) *Client {
|
||||
return &Client{
|
||||
socketPath: socketPath,
|
||||
}
|
||||
}
|
||||
|
||||
// Connect établit la connexion au socket.
|
||||
func (c *Client) Connect() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
return nil // Déjà connecté
|
||||
}
|
||||
|
||||
conn, err := net.Dial("unix", c.socketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial %s: %w", c.socketPath, err)
|
||||
}
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close ferme la connexion.
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
err := c.conn.Close()
|
||||
c.conn = nil
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call envoie une requête et attend la réponse.
|
||||
func (c *Client) Call(ctx context.Context, req *Request) (*Response, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.conn == nil {
|
||||
return nil, fmt.Errorf("not connected")
|
||||
}
|
||||
|
||||
// Timeout
|
||||
timeout := time.Duration(req.TimeoutMs) * time.Millisecond
|
||||
if timeout == 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
deadline := time.Now().Add(timeout)
|
||||
c.conn.SetDeadline(deadline)
|
||||
|
||||
// Encoder et envoyer la requête
|
||||
data, err := Encode(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encode request: %w", err)
|
||||
}
|
||||
|
||||
if err := writeMessage(c.conn, data); err != nil {
|
||||
c.conn = nil // Connexion cassée
|
||||
return nil, fmt.Errorf("write request: %w", err)
|
||||
}
|
||||
|
||||
// Lire la réponse
|
||||
respData, err := readMessage(c.conn)
|
||||
if err != nil {
|
||||
c.conn = nil // Connexion cassée
|
||||
return nil, fmt.Errorf("read response: %w", err)
|
||||
}
|
||||
|
||||
resp, err := DecodeResponse(respData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode response: %w", err)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// CallAction raccourci pour appeler une action.
|
||||
func (c *Client) CallAction(ctx context.Context, action string, params map[string]any) (*Response, error) {
|
||||
req := NewRequest(action, params)
|
||||
return c.Call(ctx, req)
|
||||
}
|
||||
|
||||
// Pool gère un pool de connexions vers un service.
|
||||
type Pool struct {
|
||||
socketPath string
|
||||
clients chan *Client
|
||||
maxSize int
|
||||
}
|
||||
|
||||
// NewPool crée un pool de connexions.
|
||||
func NewPool(socketPath string, size int) *Pool {
|
||||
return &Pool{
|
||||
socketPath: socketPath,
|
||||
clients: make(chan *Client, size),
|
||||
maxSize: size,
|
||||
}
|
||||
}
|
||||
|
||||
// Get obtient un client du pool.
|
||||
func (p *Pool) Get() (*Client, error) {
|
||||
select {
|
||||
case client := <-p.clients:
|
||||
return client, nil
|
||||
default:
|
||||
// Créer un nouveau client
|
||||
client := NewClient(p.socketPath)
|
||||
if err := client.Connect(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Put remet un client dans le pool.
|
||||
func (p *Pool) Put(client *Client) {
|
||||
select {
|
||||
case p.clients <- client:
|
||||
// OK, remis dans le pool
|
||||
default:
|
||||
// Pool plein, fermer le client
|
||||
client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Call obtient un client, exécute l'appel, et remet le client.
|
||||
func (p *Pool) Call(ctx context.Context, req *Request) (*Response, error) {
|
||||
client, err := p.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := client.Call(ctx, req)
|
||||
if err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.Put(client)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Close ferme tous les clients du pool.
|
||||
func (p *Pool) Close() {
|
||||
close(p.clients)
|
||||
for client := range p.clients {
|
||||
client.Close()
|
||||
}
|
||||
}
|
||||
90
internal/protocol/message.go
Normal file
90
internal/protocol/message.go
Normal file
@@ -0,0 +1,90 @@
|
||||
// Package protocol définit le protocole de communication IPC via Unix sockets.
|
||||
// Format: 4 bytes (big-endian length) + JSON payload
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Request représente une requête envoyée à un microservice.
|
||||
type Request struct {
|
||||
ID string `json:"id"`
|
||||
Action string `json:"action"`
|
||||
TenantID string `json:"tenant_id,omitempty"`
|
||||
Params map[string]any `json:"params,omitempty"`
|
||||
TimeoutMs int `json:"timeout_ms,omitempty"`
|
||||
}
|
||||
|
||||
// Response représente la réponse d'un microservice.
|
||||
type Response struct {
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"` // "success" ou "error"
|
||||
Result any `json:"result,omitempty"`
|
||||
Error *Error `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// Error détaille une erreur.
|
||||
type Error struct {
|
||||
Code string `json:"code"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
// NewRequest crée une nouvelle requête avec un ID unique.
|
||||
func NewRequest(action string, params map[string]any) *Request {
|
||||
return &Request{
|
||||
ID: generateID(),
|
||||
Action: action,
|
||||
Params: params,
|
||||
TimeoutMs: 5000, // 5s par défaut
|
||||
}
|
||||
}
|
||||
|
||||
// Success crée une réponse de succès.
|
||||
func Success(reqID string, result any) *Response {
|
||||
return &Response{
|
||||
ID: reqID,
|
||||
Status: "success",
|
||||
Result: result,
|
||||
}
|
||||
}
|
||||
|
||||
// Failure crée une réponse d'erreur.
|
||||
func Failure(reqID string, code, message string) *Response {
|
||||
return &Response{
|
||||
ID: reqID,
|
||||
Status: "error",
|
||||
Error: &Error{
|
||||
Code: code,
|
||||
Message: message,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Encode sérialise un message en JSON.
|
||||
func Encode(v any) ([]byte, error) {
|
||||
return json.Marshal(v)
|
||||
}
|
||||
|
||||
// DecodeRequest désérialise une requête JSON.
|
||||
func DecodeRequest(data []byte) (*Request, error) {
|
||||
var req Request
|
||||
if err := json.Unmarshal(data, &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
// DecodeResponse désérialise une réponse JSON.
|
||||
func DecodeResponse(data []byte) (*Response, error) {
|
||||
var resp Response
|
||||
if err := json.Unmarshal(data, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// generateID génère un ID unique basé sur le timestamp.
|
||||
func generateID() string {
|
||||
return "req_" + time.Now().Format("20060102150405.000000")
|
||||
}
|
||||
174
internal/protocol/server.go
Normal file
174
internal/protocol/server.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Handler traite une requête et retourne une réponse.
|
||||
type Handler func(ctx context.Context, req *Request) *Response
|
||||
|
||||
// Server écoute sur un Unix socket et dispatch les requêtes.
|
||||
type Server struct {
|
||||
socketPath string
|
||||
handler Handler
|
||||
listener net.Listener
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewServer crée un nouveau serveur.
|
||||
func NewServer(socketPath string, handler Handler) *Server {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Server{
|
||||
socketPath: socketPath,
|
||||
handler: handler,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Start démarre le serveur.
|
||||
func (s *Server) Start() error {
|
||||
// Supprimer le socket existant
|
||||
if err := os.RemoveAll(s.socketPath); err != nil {
|
||||
return fmt.Errorf("remove socket: %w", err)
|
||||
}
|
||||
|
||||
listener, err := net.Listen("unix", s.socketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen: %w", err)
|
||||
}
|
||||
s.listener = listener
|
||||
|
||||
// Permissions socket
|
||||
if err := os.Chmod(s.socketPath, 0660); err != nil {
|
||||
return fmt.Errorf("chmod socket: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("[server] listening on %s", s.socketPath)
|
||||
|
||||
go s.acceptLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop arrête le serveur proprement.
|
||||
func (s *Server) Stop() {
|
||||
s.cancel()
|
||||
if s.listener != nil {
|
||||
s.listener.Close()
|
||||
}
|
||||
s.wg.Wait()
|
||||
os.RemoveAll(s.socketPath)
|
||||
log.Printf("[server] stopped")
|
||||
}
|
||||
|
||||
// acceptLoop accepte les connexions entrantes.
|
||||
func (s *Server) acceptLoop() {
|
||||
for {
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
log.Printf("[server] accept error: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go s.handleConn(conn)
|
||||
}
|
||||
}
|
||||
|
||||
// handleConn gère une connexion client.
|
||||
func (s *Server) handleConn(conn net.Conn) {
|
||||
defer s.wg.Done()
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Lire la requête
|
||||
data, err := readMessage(conn)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
log.Printf("[server] read error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Décoder la requête
|
||||
req, err := DecodeRequest(data)
|
||||
if err != nil {
|
||||
resp := Failure("", "DECODE_ERROR", err.Error())
|
||||
writeResponse(conn, resp)
|
||||
continue
|
||||
}
|
||||
|
||||
// Traiter la requête
|
||||
resp := s.handler(s.ctx, req)
|
||||
|
||||
// Envoyer la réponse
|
||||
if err := writeResponse(conn, resp); err != nil {
|
||||
log.Printf("[server] write error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// readMessage lit un message length-prefixed.
|
||||
func readMessage(r io.Reader) ([]byte, error) {
|
||||
// Lire les 4 bytes de longueur
|
||||
lengthBuf := make([]byte, 4)
|
||||
if _, err := io.ReadFull(r, lengthBuf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
length := binary.BigEndian.Uint32(lengthBuf)
|
||||
if length == 0 || length > 10*1024*1024 { // Max 10MB
|
||||
return nil, fmt.Errorf("invalid message length: %d", length)
|
||||
}
|
||||
|
||||
// Lire le payload
|
||||
data := make([]byte, length)
|
||||
if _, err := io.ReadFull(r, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// writeResponse écrit une réponse.
|
||||
func writeResponse(w io.Writer, resp *Response) error {
|
||||
data, err := Encode(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return writeMessage(w, data)
|
||||
}
|
||||
|
||||
// writeMessage écrit un message length-prefixed.
|
||||
func writeMessage(w io.Writer, data []byte) error {
|
||||
// Écrire la longueur
|
||||
lengthBuf := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(lengthBuf, uint32(len(data)))
|
||||
if _, err := w.Write(lengthBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Écrire le payload
|
||||
_, err := w.Write(data)
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user