package client

import (
	"database/sql"
	"encoding/json"
	"os"
	"path/filepath"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

// SendStatus represents the delivery state of a queued send job.
type SendStatus int

const (
	SendStatusPending SendStatus = 0 // waiting to be sent
	SendStatusSent    SendStatus = 1 // successfully delivered
	SendStatusFailed  SendStatus = 2 // all servers exhausted or timed out
)

// SendJob describes a message to send, together with its delivery tracking state.
//
// The File field holds the path of a pre-built packed server message (binary).
// Servers is tried in order; after MaxRetriesPerServer failures on one server
// the next one is attempted.
//
// Tracking fields (ID, InsertedAt, Status, SentAt, Retries, SuccessfulServer)
// are managed by the queue functions and must not be set by the caller.
type SendJob struct {
	// --- caller-supplied fields ---

	Queue         string   `json:"queue,omitempty"`           // uid of destination peer, used for naming the sent queue sqlite db
	File          string   `json:"file,omitempty"`            // filename whose content shall be sent to the server as message payload
	MessageDbFile string   `json:"message_db_file,omitempty"` // peer message DB UUID (no path, no suffix) — from peer.LastMessage.Dbfile
	MessageDbId   int64    `json:"message_db_id,omitempty"`   // SQLite row ID in MessageDbFile — from peer.LastMessage.Dbid
	Servers       []Server `json:"servers,omitempty"`
	Timeout       int      `json:"timeout,omitempty"` // seconds; 0 = no timeout

	// --- DB-managed tracking fields (not serialised by the caller) ---

	ID               int64
	InsertedAt       time.Time
	Status           SendStatus
	SentAt           *time.Time
	Retries          []int // retry count per server index
	SuccessfulServer *int  // index into Servers of the server that accepted
}

// sendQueueDbPath returns the location of the SQLite file backing the named
// queue: <storagePath>/queues/<queue>.
func sendQueueDbPath(storagePath, queue string) string {
	return filepath.Join(storagePath, "queues", queue)
}

// openOrCreateSendQueue opens the SQLite queue database at dbPath, creating the
// parent directory, the file and the "queue" table when they do not exist yet,
// and upgrading tables written by an older schema version.
//
// The caller owns the returned handle and must Close it. On error no handle is
// returned and any handle opened along the way has been closed.
func openOrCreateSendQueue(dbPath string) (*sql.DB, error) {
	// MkdirAll succeeds when the directory already exists, so no prior Stat is
	// needed (and a Stat-then-create sequence would be racy anyway).
	if err := os.MkdirAll(filepath.Dir(dbPath), 0700); err != nil {
		return nil, err
	}

	// Make sure the file exists WITHOUT truncating it: os.Create would wipe a
	// database that another opener created between an existence check and the
	// create call. O_RDONLY keeps an existing read-only file openable.
	f, err := os.OpenFile(dbPath, os.O_RDONLY|os.O_CREATE, 0666)
	if err != nil {
		return nil, err
	}
	if err := f.Close(); err != nil {
		return nil, err
	}

	db, err := sql.Open("sqlite3", dbPath)
	if err != nil {
		return nil, err
	}

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS queue ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, file TEXT NOT NULL, message_db_file TEXT NOT NULL DEFAULT '', message_db_id INTEGER NOT NULL DEFAULT 0, servers TEXT NOT NULL, timeout INTEGER NOT NULL DEFAULT 0, inserted_at INTEGER NOT NULL, status INTEGER NOT NULL DEFAULT 0, sent_at INTEGER, retries TEXT NOT NULL DEFAULT '[]', successful_server INTEGER )`); err != nil {
		db.Close()
		return nil, err
	}

	if err := migrateSendQueue(db); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}

// migrateSendQueue adds columns that are missing from queue tables created by
// an older schema version. Columns that already exist are left untouched, so
// the only errors reported are genuine failures rather than "duplicate column".
func migrateSendQueue(db *sql.DB) error {
	present, err := sendQueueColumns(db)
	if err != nil {
		return err
	}

	migrations := []struct {
		column string
		stmt   string
	}{
		{"message_db_file", `ALTER TABLE queue ADD COLUMN message_db_file TEXT NOT NULL DEFAULT ''`},
		{"message_db_id", `ALTER TABLE queue ADD COLUMN message_db_id INTEGER NOT NULL DEFAULT 0`},
	}
	for _, m := range migrations {
		if _, ok := present[m.column]; ok {
			continue
		}
		if _, execErr := db.Exec(m.stmt); execErr != nil {
			// Another opener may have added the column since we looked; only
			// report the failure if the column is still missing.
			current, colErr := sendQueueColumns(db)
			if colErr != nil {
				return execErr
			}
			if _, ok := current[m.column]; !ok {
				return execErr
			}
		}
	}
	return nil
}

// sendQueueColumns returns the set of column names currently defined on the
// queue table. The result set is fully consumed and closed before returning so
// that a following schema change is not blocked by an open read cursor.
func sendQueueColumns(db *sql.DB) (map[string]struct{}, error) {
	rows, err := db.Query(`PRAGMA table_info(queue)`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	columns := make(map[string]struct{})
	for rows.Next() {
		// table_info yields: cid, name, type, notnull, dflt_value, pk.
		var (
			cid, notNull, pk int
			name, colType    string
			dflt             any
		)
		if err := rows.Scan(&cid, &name, &colType, &notNull, &dflt, &pk); err != nil {
			return nil, err
		}
		columns[name] = struct{}{}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return columns, nil
}

// PushSendJob appends a SendJob to the SQLite queue identified by job.Queue inside storagePath.
// The initial retry counters are set to zero for each server.
//
// Only the caller-supplied fields of job are stored; the row is inserted with
// status SendStatusPending and the current Unix time as its insertion time.
func PushSendJob(storagePath string, job *SendJob) error {
	db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, job.Queue))
	if err != nil {
		return err
	}
	defer db.Close()

	// Servers and retry counters are stored as JSON text columns.
	serversJSON, err := json.Marshal(job.Servers)
	if err != nil {
		return err
	}
	retriesJSON, err := json.Marshal(make([]int, len(job.Servers)))
	if err != nil {
		return err
	}

	_, err = db.Exec(
		`INSERT INTO queue(file, message_db_file, message_db_id, servers, timeout, inserted_at, status, retries) VALUES(?,?,?,?,?,?,?,?)`,
		job.File, job.MessageDbFile, job.MessageDbId, string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON),
	)
	return err
}

// PeekSendJob returns the oldest pending SendJob from the named queue.
// Returns nil, 0, nil when the queue has no pending jobs.
func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) if err != nil { return nil, 0, err } defer db.Close() var ( id int64 file string messageDbFile string messageDbId int64 serversJSON string timeout int insertedAt int64 status SendStatus sentAt sql.NullInt64 retriesJSON string successfulServer sql.NullInt64 ) err = db.QueryRow( `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC LIMIT 1`, SendStatusPending, ).Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, 0, nil } if err != nil { return nil, 0, err } var servers []Server if err := json.Unmarshal([]byte(serversJSON), &servers); err != nil { return nil, 0, err } var retries []int if err := json.Unmarshal([]byte(retriesJSON), &retries); err != nil { return nil, 0, err } job := &SendJob{ ID: id, Queue: queue, File: file, MessageDbFile: messageDbFile, MessageDbId: messageDbId, Servers: servers, Timeout: timeout, InsertedAt: time.Unix(insertedAt, 0), Status: status, Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) job.SentAt = &t } if successfulServer.Valid { v := int(successfulServer.Int64) job.SuccessfulServer = &v } return job, id, nil } // UpdateSendJob persists the tracking fields (status, sent_at, retries, successful_server) // for a job that was previously returned by PeekSendJob. 
func UpdateSendJob(storagePath, queue string, job *SendJob) error { db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) if err != nil { return err } defer db.Close() retriesJSON, err := json.Marshal(job.Retries) if err != nil { return err } var sentAt any if job.SentAt != nil { sentAt = job.SentAt.Unix() } var successfulServer any if job.SuccessfulServer != nil { successfulServer = *job.SuccessfulServer } _, err = db.Exec( `UPDATE queue SET status=?, sent_at=?, retries=?, successful_server=? WHERE id=?`, job.Status, sentAt, string(retriesJSON), successfulServer, job.ID, ) return err } // GetSendJob retrieves any job by row id regardless of its status. // Returns nil, nil when no row with that id exists. func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) if err != nil { return nil, err } defer db.Close() var ( file string messageDbFile string messageDbId int64 serversJSON string timeout int insertedAt int64 status SendStatus sentAt sql.NullInt64 retriesJSON string successfulServer sql.NullInt64 ) err = db.QueryRow( `SELECT file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE id = ?`, id, ).Scan(&file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } var servers []Server if err := json.Unmarshal([]byte(serversJSON), &servers); err != nil { return nil, err } var retries []int if err := json.Unmarshal([]byte(retriesJSON), &retries); err != nil { return nil, err } job := &SendJob{ ID: id, Queue: queue, File: file, MessageDbFile: messageDbFile, MessageDbId: messageDbId, Servers: servers, Timeout: timeout, InsertedAt: time.Unix(insertedAt, 0), Status: status, Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) job.SentAt = &t 
} if successfulServer.Valid { v := int(successfulServer.Int64) job.SuccessfulServer = &v } return job, nil } // GetSentJobs returns all successfully-sent jobs from the named queue, // ordered oldest first. Use this to reconcile delivery status with the // message store and clean up completed entries. func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) if err != nil { return nil, err } defer db.Close() rows, err := db.Query( `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC`, SendStatusSent, ) if err != nil { return nil, err } defer rows.Close() var jobs []*SendJob for rows.Next() { var ( id int64 file string messageDbFile string messageDbId int64 serversJSON string timeout int insertedAt int64 sentAt sql.NullInt64 retriesJSON string successfulServer sql.NullInt64 ) if err := rows.Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &sentAt, &retriesJSON, &successfulServer); err != nil { return nil, err } var servers []Server if err := json.Unmarshal([]byte(serversJSON), &servers); err != nil { return nil, err } var retries []int if err := json.Unmarshal([]byte(retriesJSON), &retries); err != nil { return nil, err } job := &SendJob{ ID: id, Queue: queue, File: file, MessageDbFile: messageDbFile, MessageDbId: messageDbId, Servers: servers, Timeout: timeout, InsertedAt: time.Unix(insertedAt, 0), Status: SendStatusSent, Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) job.SentAt = &t } if successfulServer.Valid { v := int(successfulServer.Int64) job.SuccessfulServer = &v } jobs = append(jobs, job) } return jobs, nil } // DeleteSendJob removes a row by id from the named queue. // If the queue is empty after deletion, the DB file is removed. 
func DeleteSendJob(storagePath, queue string, id int64) error { dbPath := sendQueueDbPath(storagePath, queue) db, err := openOrCreateSendQueue(dbPath) if err != nil { return err } if _, err = db.Exec(`DELETE FROM queue WHERE id=?`, id); err != nil { db.Close() return err } var count int if err = db.QueryRow(`SELECT COUNT(*) FROM queue`).Scan(&count); err != nil { db.Close() return err } db.Close() if count == 0 { return os.Remove(dbPath) } return nil }