From eb7fdc9b03d80b6ec4a0ab05f2696975233899c2 Mon Sep 17 00:00:00 2001 From: ycc Date: Thu, 26 Feb 2026 18:50:46 +0100 Subject: [PATCH] bg sender first draft --- .../{backgroundHelper.go => bgPollHelper.go} | 0 client/helpers/bgSendHelper.go | 155 ++++++++ client/helpers/bgSendHelper_test.go | 337 ++++++++++++++++++ client/sendjobs.go | 286 +++++++++++++++ client/sendjobs_test.go | 184 ++++++++++ 5 files changed, 962 insertions(+) rename client/helpers/{backgroundHelper.go => bgPollHelper.go} (100%) create mode 100644 client/helpers/bgSendHelper.go create mode 100644 client/helpers/bgSendHelper_test.go create mode 100644 client/sendjobs.go create mode 100644 client/sendjobs_test.go diff --git a/client/helpers/backgroundHelper.go b/client/helpers/bgPollHelper.go similarity index 100% rename from client/helpers/backgroundHelper.go rename to client/helpers/bgPollHelper.go diff --git a/client/helpers/bgSendHelper.go b/client/helpers/bgSendHelper.go new file mode 100644 index 0000000..b328844 --- /dev/null +++ b/client/helpers/bgSendHelper.go @@ -0,0 +1,155 @@ +package helpers + +import ( + "errors" + "os" + "path/filepath" + "sync" + "time" + + "forge.redroom.link/yves/meowlib" + "forge.redroom.link/yves/meowlib/client" +) + +const maxRetriesPerServer = 3 +const defaultSendTimeout = 3600 * 24 // seconds, used when job.Timeout is 0 + +// WriteSendJob enqueues a SendJob from the main Flutter isolate. +// It is a thin wrapper over client.PushSendJob and is safe to call +// concurrently with ProcessSendQueues. +func WriteSendJob(storagePath string, job *client.SendJob) error { + return client.PushSendJob(storagePath, job) +} + +// ProcessSendQueues discovers every queue DB file under storagePath/queues/ +// and processes each queue concurrently in its own goroutine. +// Call this from the send isolate on wake-up notification or on a periodic timer. +func ProcessSendQueues(storagePath string) { + queueDir := filepath.Join(storagePath, "queues") + entries, err := os.ReadDir(queueDir) + if err != nil { + logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSendQueues: ReadDir") + return + } + + var wg sync.WaitGroup + for _, entry := range entries { + if entry.IsDir() { + continue + } + wg.Add(1) + queue := entry.Name() + go func(q string) { + defer wg.Done() + processSendQueue(storagePath, q) + }(queue) + } + wg.Wait() +} + +// processSendQueue processes pending jobs for a single named queue sequentially. +// +// For each pending job it will: +// - immediately mark it failed if its timeout has elapsed +// - attempt delivery, cycling through servers until one succeeds +// - mark it sent on success or failed when all servers are exhausted +// - stop and return when a job still has retries left (will resume on next call) +func processSendQueue(storagePath, queue string) { + for { + job, _, err := client.PeekSendJob(storagePath, queue) + if err != nil { + logger.Error().Err(err).Str("queue", queue).Msg("processSendQueue: PeekSendJob") + return + } + if job == nil { + return // no more pending jobs + } + + // Hard timeout: job has been sitting too long + if job.Timeout > 0 && time.Since(job.InsertedAt) > time.Duration(job.Timeout)*time.Second { + job.Status = client.SendStatusFailed + if err := client.UpdateSendJob(storagePath, queue, job); err != nil { + logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob timeout") + } + continue // try the next pending job + } + + serverIdx, sendErr := attemptSendJob(job) + if sendErr == nil { + now := time.Now() + job.Status = client.SendStatusSent + job.SentAt = &now + job.SuccessfulServer = &serverIdx + if err := client.UpdateSendJob(storagePath, queue, job); err != nil { + logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob sent") + } + continue // job delivered – look for the next one + } + + // Persist updated retry counts regardless of outcome + if err := client.UpdateSendJob(storagePath, queue, job); err != nil { + logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob retries") + } + + if allServersExhausted(job) { + job.Status = client.SendStatusFailed + if err := client.UpdateSendJob(storagePath, queue, job); err != nil { + logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob failed") + } + continue // all servers dead for this job – try the next one + } + + // Job still has remaining retries on some server; stop and wait for the next poll + return + } +} + +// attemptSendJob reads the pre-built packed message from job.File and tries +// each server in order, skipping any server that has already reached +// maxRetriesPerServer failures. +// On the first successful POST it returns the server index. +// All retry counts are incremented in-place inside job.Retries. +func attemptSendJob(job *client.SendJob) (int, error) { + data, err := os.ReadFile(job.File) + if err != nil { + return -1, err + } + + // Ensure the retries slice is aligned with the servers slice + for len(job.Retries) < len(job.Servers) { + job.Retries = append(job.Retries, 0) + } + + timeout := job.Timeout + if timeout <= 0 { + timeout = defaultSendTimeout + } + + for i, srv := range job.Servers { + if job.Retries[i] >= maxRetriesPerServer { + continue // this server is exhausted + } + _, err := meowlib.HttpPostMessage(srv.Url, data, timeout) + if err != nil { + logger.Warn().Err(err).Str("url", srv.Url).Int("retry", job.Retries[i]+1).Msg("attemptSendJob: POST failed") + job.Retries[i]++ + continue + } + return i, nil + } + return -1, errors.New("all servers failed or exhausted") +} + +// allServersExhausted returns true when every server in the job has been tried +// maxRetriesPerServer times without success. +func allServersExhausted(job *client.SendJob) bool { + if len(job.Servers) == 0 { + return true + } + for i := range job.Servers { + if i >= len(job.Retries) || job.Retries[i] < maxRetriesPerServer { + return false + } + } + return true +} diff --git a/client/helpers/bgSendHelper_test.go b/client/helpers/bgSendHelper_test.go new file mode 100644 index 0000000..46cebe1 --- /dev/null +++ b/client/helpers/bgSendHelper_test.go @@ -0,0 +1,337 @@ +package helpers + +import ( + "database/sql" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "forge.redroom.link/yves/meowlib/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + _ "github.com/mattn/go-sqlite3" +) + +// --- test helpers ------------------------------------------------------- + +// acceptServer starts an httptest server that counts received POST /msg requests. +func acceptServer(t *testing.T, received *int64) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(received, 1) + w.WriteHeader(http.StatusOK) + })) +} + +// closedServerURL starts and immediately closes an httptest server so its URL +// causes "connection refused" without any wait. +func closedServerURL(t *testing.T) string { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + srv.Close() + return srv.URL +} + +// writeMsgFile writes dummy bytes to a temp file and returns the path. +func writeMsgFile(t *testing.T, dir, name string) string { + t.Helper() + p := filepath.Join(dir, name) + require.NoError(t, os.WriteFile(p, []byte("packed-server-message"), 0600)) + return p +} + +// pushJob is a convenience wrapper around client.PushSendJob. +func pushJob(t *testing.T, dir, queue, file string, servers []client.Server, timeout int) { + t.Helper() + require.NoError(t, client.PushSendJob(dir, &client.SendJob{ + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + })) +} + +// serverSlice builds a []client.Server from plain URLs. +func serverSlice(urls ...string) []client.Server { + out := make([]client.Server, len(urls)) + for i, u := range urls { + out[i] = client.Server{Url: u} + } + return out +} + +// --- unit tests --------------------------------------------------------- + +func TestAllServersExhausted_NoServers(t *testing.T) { + job := &client.SendJob{} + assert.True(t, allServersExhausted(job)) +} + +func TestAllServersExhausted_NoneExhausted(t *testing.T) { + job := &client.SendJob{ + Servers: serverSlice("http://s1", "http://s2"), + Retries: []int{0, 0}, + } + assert.False(t, allServersExhausted(job)) +} + +func TestAllServersExhausted_PartiallyExhausted(t *testing.T) { + job := &client.SendJob{ + Servers: serverSlice("http://s1", "http://s2"), + Retries: []int{maxRetriesPerServer, 0}, + } + assert.False(t, allServersExhausted(job)) +} + +func TestAllServersExhausted_AllExhausted(t *testing.T) { + job := &client.SendJob{ + Servers: serverSlice("http://s1", "http://s2"), + Retries: []int{maxRetriesPerServer, maxRetriesPerServer}, + } + assert.True(t, allServersExhausted(job)) +} + +// TestAttemptSendJob_Success verifies a successful POST to the first server. +func TestAttemptSendJob_Success(t *testing.T) { + dir := t.TempDir() + var received int64 + srv := acceptServer(t, &received) + defer srv.Close() + + job := &client.SendJob{ + File: writeMsgFile(t, dir, "msg"), + Servers: serverSlice(srv.URL), + Timeout: 5, + Retries: []int{0}, + } + + idx, err := attemptSendJob(job) + require.NoError(t, err) + assert.Equal(t, 0, idx) + assert.Equal(t, int64(1), atomic.LoadInt64(&received)) +} + +// TestAttemptSendJob_Fallback verifies that when the first server refuses the +// connection, the second server is tried and succeeds. +func TestAttemptSendJob_Fallback(t *testing.T) { + dir := t.TempDir() + var received int64 + good := acceptServer(t, &received) + defer good.Close() + + job := &client.SendJob{ + File: writeMsgFile(t, dir, "msg"), + Servers: serverSlice(closedServerURL(t), good.URL), + Timeout: 5, + Retries: []int{0, 0}, + } + + idx, err := attemptSendJob(job) + require.NoError(t, err) + assert.Equal(t, 1, idx, "second server should have been used") + assert.Equal(t, int64(1), atomic.LoadInt64(&received)) + assert.Equal(t, 1, job.Retries[0], "first server retry should be incremented") + assert.Equal(t, 0, job.Retries[1], "second server retry must stay at zero") +} + +// TestAttemptSendJob_AllFail verifies that all retry counts are incremented +// and an error is returned when every server refuses connections. +func TestAttemptSendJob_AllFail(t *testing.T) { + dir := t.TempDir() + job := &client.SendJob{ + File: writeMsgFile(t, dir, "msg"), + Servers: serverSlice(closedServerURL(t), closedServerURL(t)), + Timeout: 5, + Retries: []int{0, 0}, + } + + _, err := attemptSendJob(job) + assert.Error(t, err) + assert.Equal(t, 1, job.Retries[0]) + assert.Equal(t, 1, job.Retries[1]) +} + +// TestAttemptSendJob_SkipsExhaustedServer verifies that a server already at +// maxRetriesPerServer is not contacted. +func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) { + dir := t.TempDir() + var received int64 + good := acceptServer(t, &received) + defer good.Close() + + job := &client.SendJob{ + File: writeMsgFile(t, dir, "msg"), + Servers: serverSlice( + closedServerURL(t), // exhausted – must be skipped + good.URL, + ), + Timeout: 5, + Retries: []int{maxRetriesPerServer, 0}, + } + + idx, err := attemptSendJob(job) + require.NoError(t, err) + assert.Equal(t, 1, idx) + assert.Equal(t, int64(1), atomic.LoadInt64(&received)) +} + +// --- integration tests -------------------------------------------------- + +// TestWriteSendJob verifies the thin WriteSendJob wrapper enqueues the job. +func TestWriteSendJob(t *testing.T) { + dir := t.TempDir() + err := WriteSendJob(dir, &client.SendJob{ + Queue: "q1", + File: "/tmp/f", + Servers: serverSlice("http://s1"), + }) + require.NoError(t, err) + + got, _, err := client.PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "/tmp/f", got.File) +} + +// TestProcessSendQueues_Success verifies that a pending job is delivered and +// marked as sent when the server accepts it. +func TestProcessSendQueues_Success(t *testing.T) { + dir := t.TempDir() + var received int64 + srv := acceptServer(t, &received) + defer srv.Close() + + msgPath := writeMsgFile(t, dir, "msg") + pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 10) + + // grab the ID before processing so we can inspect the row afterward + _, id, err := client.PeekSendJob(dir, "q1") + require.NoError(t, err) + + ProcessSendQueues(dir) + + assert.Equal(t, int64(1), atomic.LoadInt64(&received), "server should have received exactly one message") + + job, err := client.GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, job) + assert.Equal(t, client.SendStatusSent, job.Status) + assert.NotNil(t, job.SentAt) + require.NotNil(t, job.SuccessfulServer) + assert.Equal(t, 0, *job.SuccessfulServer) +} + +// TestProcessSendQueues_ServerFallback verifies that when the first server is +// unreachable, the second server is tried successfully in the same pass. +func TestProcessSendQueues_ServerFallback(t *testing.T) { + dir := t.TempDir() + var received int64 + good := acceptServer(t, &received) + defer good.Close() + + msgPath := writeMsgFile(t, dir, "msg") + pushJob(t, dir, "q1", msgPath, serverSlice(closedServerURL(t), good.URL), 10) + + _, id, err := client.PeekSendJob(dir, "q1") + require.NoError(t, err) + + ProcessSendQueues(dir) + + assert.Equal(t, int64(1), atomic.LoadInt64(&received)) + + job, err := client.GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, job) + assert.Equal(t, client.SendStatusSent, job.Status) + require.NotNil(t, job.SuccessfulServer) + assert.Equal(t, 1, *job.SuccessfulServer, "second server should be recorded as successful") +} + +// TestProcessSendQueues_AllServersExhausted verifies that after maxRetriesPerServer +// failed attempts per server the job is marked as failed. +func TestProcessSendQueues_AllServersExhausted(t *testing.T) { + dir := t.TempDir() + deadURL := closedServerURL(t) + + msgPath := writeMsgFile(t, dir, "msg") + pushJob(t, dir, "q1", msgPath, serverSlice(deadURL), 0) + + _, id, err := client.PeekSendJob(dir, "q1") + require.NoError(t, err) + + // Each call to ProcessSendQueues increments the retry counter by 1. + // After maxRetriesPerServer calls, all servers are exhausted → failed. + for i := 0; i < maxRetriesPerServer; i++ { + ProcessSendQueues(dir) + } + + job, err := client.GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, job) + assert.Equal(t, client.SendStatusFailed, job.Status) + assert.Equal(t, maxRetriesPerServer, job.Retries[0]) +} + +// TestProcessSendQueues_JobTimeout verifies that a job whose timeout has elapsed +// is immediately marked as failed without any send attempt. +func TestProcessSendQueues_JobTimeout(t *testing.T) { + dir := t.TempDir() + var received int64 + srv := acceptServer(t, &received) + defer srv.Close() + + msgPath := writeMsgFile(t, dir, "msg") + // Timeout of 1 second; we will backdate inserted_at so the job looks expired. + pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 1) + + _, id, err := client.PeekSendJob(dir, "q1") + require.NoError(t, err) + + // Backdate inserted_at by 60 seconds directly in the DB. + dbPath := filepath.Join(dir, "queues", "q1") + backdateJob(t, dbPath, id, -60*time.Second) + + ProcessSendQueues(dir) + + assert.Equal(t, int64(0), atomic.LoadInt64(&received), "no send should be attempted for an expired job") + + job, err := client.GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, job) + assert.Equal(t, client.SendStatusFailed, job.Status) +} + +// TestProcessSendQueues_MultipleQueues verifies that jobs in different queue +// files are processed concurrently and independently. +func TestProcessSendQueues_MultipleQueues(t *testing.T) { + dir := t.TempDir() + var received int64 + srv := acceptServer(t, &received) + defer srv.Close() + + for _, q := range []string{"qa", "qb", "qc"} { + msgPath := writeMsgFile(t, dir, "msg_"+q) + pushJob(t, dir, q, msgPath, serverSlice(srv.URL), 10) + } + + ProcessSendQueues(dir) + + assert.Equal(t, int64(3), atomic.LoadInt64(&received), "all three queues should have delivered their message") +} + +// backdateJob opens the SQLite file directly and shifts inserted_at by delta. +// This lets tests simulate elapsed time without sleeping. +func backdateJob(t *testing.T, dbPath string, id int64, delta time.Duration) { + t.Helper() + db, err := sql.Open("sqlite3", dbPath) + require.NoError(t, err) + defer db.Close() + newTs := time.Now().Add(delta).Unix() + _, err = db.Exec("UPDATE queue SET inserted_at = ? WHERE id = ?", newTs, id) + require.NoError(t, err) +} diff --git a/client/sendjobs.go b/client/sendjobs.go new file mode 100644 index 0000000..cc1ac3b --- /dev/null +++ b/client/sendjobs.go @@ -0,0 +1,286 @@ +package client + +import ( + "database/sql" + "encoding/json" + "os" + "path/filepath" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// SendStatus represents the delivery state of a queued send job. +type SendStatus int + +const ( + SendStatusPending SendStatus = 0 // waiting to be sent + SendStatusSent SendStatus = 1 // successfully delivered + SendStatusFailed SendStatus = 2 // all servers exhausted or timed out +) + +// SendJob describes a message to send, together with its delivery tracking state. +// +// The File field holds the path of a pre-built packed server message (binary). +// Servers is tried in order; after MaxRetriesPerServer failures on one server +// the next one is attempted. +// +// Tracking fields (ID, InsertedAt, Status, SentAt, Retries, SuccessfulServer) +// are managed by the queue functions and must not be set by the caller. +type SendJob struct { + // --- caller-supplied fields --- + Queue string `json:"queue,omitempty"` + File string `json:"file,omitempty"` + Servers []Server `json:"servers,omitempty"` + Timeout int `json:"timeout,omitempty"` // seconds; 0 = no timeout + + // --- DB-managed tracking fields (not serialised by the caller) --- + ID int64 + InsertedAt time.Time + Status SendStatus + SentAt *time.Time + Retries []int // retry count per server index + SuccessfulServer *int // index into Servers of the server that accepted +} + +func sendQueueDbPath(storagePath, queue string) string { + return filepath.Join(storagePath, "queues", queue) +} + +func openOrCreateSendQueue(dbPath string) (*sql.DB, error) { + dir := filepath.Dir(dbPath) + if _, err := os.Stat(dir); os.IsNotExist(err) { + if err := os.MkdirAll(dir, 0700); err != nil { + return nil, err + } + } + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + f, err := os.Create(dbPath) + if err != nil { + return nil, err + } + f.Close() + } + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, err + } + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS queue ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + file TEXT NOT NULL, + servers TEXT NOT NULL, + timeout INTEGER NOT NULL DEFAULT 0, + inserted_at INTEGER NOT NULL, + status INTEGER NOT NULL DEFAULT 0, + sent_at INTEGER, + retries TEXT NOT NULL DEFAULT '[]', + successful_server INTEGER + )`) + if err != nil { + db.Close() + return nil, err + } + return db, nil +} + +// PushSendJob appends a SendJob to the SQLite queue identified by job.Queue inside storagePath. +// The initial retry counters are set to zero for each server. +func PushSendJob(storagePath string, job *SendJob) error { + db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, job.Queue)) + if err != nil { + return err + } + defer db.Close() + + serversJSON, err := json.Marshal(job.Servers) + if err != nil { + return err + } + retriesJSON, err := json.Marshal(make([]int, len(job.Servers))) + if err != nil { + return err + } + _, err = db.Exec( + `INSERT INTO queue(file, servers, timeout, inserted_at, status, retries) VALUES(?,?,?,?,?,?)`, + job.File, string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON), + ) + return err +} + +// PeekSendJob returns the oldest pending SendJob from the named queue. +// Returns nil, 0, nil when the queue has no pending jobs. +func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { + db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) + if err != nil { + return nil, 0, err + } + defer db.Close() + + var ( + id int64 + file string + serversJSON string + timeout int + insertedAt int64 + status SendStatus + sentAt sql.NullInt64 + retriesJSON string + successfulServer sql.NullInt64 + ) + err = db.QueryRow( + `SELECT id, file, servers, timeout, inserted_at, status, sent_at, retries, successful_server + FROM queue WHERE status = ? ORDER BY id ASC LIMIT 1`, + SendStatusPending, + ).Scan(&id, &file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + if err == sql.ErrNoRows { + return nil, 0, nil + } + if err != nil { + return nil, 0, err + } + + var servers []Server + if err := json.Unmarshal([]byte(serversJSON), &servers); err != nil { + return nil, 0, err + } + var retries []int + if err := json.Unmarshal([]byte(retriesJSON), &retries); err != nil { + return nil, 0, err + } + + job := &SendJob{ + ID: id, + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, + } + if sentAt.Valid { + t := time.Unix(sentAt.Int64, 0) + job.SentAt = &t + } + if successfulServer.Valid { + v := int(successfulServer.Int64) + job.SuccessfulServer = &v + } + return job, id, nil +} + +// UpdateSendJob persists the tracking fields (status, sent_at, retries, successful_server) +// for a job that was previously returned by PeekSendJob. +func UpdateSendJob(storagePath, queue string, job *SendJob) error { + db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) + if err != nil { + return err + } + defer db.Close() + + retriesJSON, err := json.Marshal(job.Retries) + if err != nil { + return err + } + var sentAt any + if job.SentAt != nil { + sentAt = job.SentAt.Unix() + } + var successfulServer any + if job.SuccessfulServer != nil { + successfulServer = *job.SuccessfulServer + } + _, err = db.Exec( + `UPDATE queue SET status=?, sent_at=?, retries=?, successful_server=? WHERE id=?`, + job.Status, sentAt, string(retriesJSON), successfulServer, job.ID, + ) + return err +} + +// GetSendJob retrieves any job by row id regardless of its status. +// Returns nil, nil when no row with that id exists. +func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { + db, err := openOrCreateSendQueue(sendQueueDbPath(storagePath, queue)) + if err != nil { + return nil, err + } + defer db.Close() + + var ( + file string + serversJSON string + timeout int + insertedAt int64 + status SendStatus + sentAt sql.NullInt64 + retriesJSON string + successfulServer sql.NullInt64 + ) + err = db.QueryRow( + `SELECT file, servers, timeout, inserted_at, status, sent_at, retries, successful_server + FROM queue WHERE id = ?`, + id, + ).Scan(&file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + var servers []Server + if err := json.Unmarshal([]byte(serversJSON), &servers); err != nil { + return nil, err + } + var retries []int + if err := json.Unmarshal([]byte(retriesJSON), &retries); err != nil { + return nil, err + } + + job := &SendJob{ + ID: id, + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, + } + if sentAt.Valid { + t := time.Unix(sentAt.Int64, 0) + job.SentAt = &t + } + if successfulServer.Valid { + v := int(successfulServer.Int64) + job.SuccessfulServer = &v + } + return job, nil +} + +// DeleteSendJob removes a row by id from the named queue. +// If the queue is empty after deletion, the DB file is removed. +func DeleteSendJob(storagePath, queue string, id int64) error { + dbPath := sendQueueDbPath(storagePath, queue) + db, err := openOrCreateSendQueue(dbPath) + if err != nil { + return err + } + + if _, err = db.Exec(`DELETE FROM queue WHERE id=?`, id); err != nil { + db.Close() + return err + } + + var count int + if err = db.QueryRow(`SELECT COUNT(*) FROM queue`).Scan(&count); err != nil { + db.Close() + return err + } + db.Close() + + if count == 0 { + return os.Remove(dbPath) + } + return nil +} diff --git a/client/sendjobs_test.go b/client/sendjobs_test.go new file mode 100644 index 0000000..e66b433 --- /dev/null +++ b/client/sendjobs_test.go @@ -0,0 +1,184 @@ +package client + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// helpers ---------------------------------------------------------------- + +func makeServers(urls ...string) []Server { + out := make([]Server, len(urls)) + for i, u := range urls { + out[i] = Server{Url: u} + } + return out +} + +func pushJob(t *testing.T, dir, queue, file string, servers []Server, timeout int) { + t.Helper() + require.NoError(t, PushSendJob(dir, &SendJob{ + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + })) +} + +// tests ------------------------------------------------------------------ + +func TestPushAndPeekSendJob(t *testing.T) { + dir := t.TempDir() + servers := makeServers("http://s1.example", "http://s2.example") + pushJob(t, dir, "q1", "/tmp/msg", servers, 60) + + got, id, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, got) + + assert.Greater(t, id, int64(0)) + assert.Equal(t, "/tmp/msg", got.File) + assert.Equal(t, 60, got.Timeout) + assert.Equal(t, SendStatusPending, got.Status) + assert.Nil(t, got.SentAt) + assert.Nil(t, got.SuccessfulServer) + assert.Len(t, got.Retries, 2) + assert.Equal(t, 0, got.Retries[0]) + assert.Equal(t, 0, got.Retries[1]) + assert.WithinDuration(t, time.Now(), got.InsertedAt, 5*time.Second) +} + +func TestPeekSendJob_EmptyQueue(t *testing.T) { + dir := t.TempDir() + got, id, err := PeekSendJob(dir, "empty") + require.NoError(t, err) + assert.Nil(t, got) + assert.Equal(t, int64(0), id) +} + +func TestPeekSendJob_OldestFirst(t *testing.T) { + dir := t.TempDir() + for _, f := range []string{"/a", "/b", "/c"} { + pushJob(t, dir, "q1", f, makeServers("http://s1"), 0) + } + + got, _, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "/a", got.File) +} + +func TestPeekSendJob_SkipsNonPending(t *testing.T) { + dir := t.TempDir() + for _, f := range []string{"/a", "/b", "/c"} { + pushJob(t, dir, "q1", f, makeServers("http://s1"), 0) + } + + // mark first as sent + first, _, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + first.Status = SendStatusSent + require.NoError(t, UpdateSendJob(dir, "q1", first)) + + // mark second as failed + second, _, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + second.Status = SendStatusFailed + require.NoError(t, UpdateSendJob(dir, "q1", second)) + + // only /c is still pending + got, _, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, "/c", got.File) +} + +func TestUpdateSendJob_Sent(t *testing.T) { + dir := t.TempDir() + pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 10) + + job, id, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, job) + + now := time.Now() + srvIdx := 0 + job.Status = SendStatusSent + job.SentAt = &now + job.SuccessfulServer = &srvIdx + require.NoError(t, UpdateSendJob(dir, "q1", job)) + + // persisted correctly + got, err := GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, SendStatusSent, got.Status) + assert.NotNil(t, got.SentAt) + assert.WithinDuration(t, now, *got.SentAt, time.Second) + require.NotNil(t, got.SuccessfulServer) + assert.Equal(t, 0, *got.SuccessfulServer) + + // no more pending jobs + pending, _, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + assert.Nil(t, pending) +} + +func TestUpdateSendJob_Retries(t *testing.T) { + dir := t.TempDir() + pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1", "http://s2"), 10) + + job, id, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NotNil(t, job) + + job.Retries[0] = 2 + require.NoError(t, UpdateSendJob(dir, "q1", job)) + + got, err := GetSendJob(dir, "q1", id) + require.NoError(t, err) + require.NotNil(t, got) + assert.Equal(t, SendStatusPending, got.Status) // still pending + assert.Equal(t, 2, got.Retries[0]) + assert.Equal(t, 0, got.Retries[1]) +} + +func TestGetSendJob_NotFound(t *testing.T) { + dir := t.TempDir() + pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 0) + + got, err := GetSendJob(dir, "q1", 9999) + require.NoError(t, err) + assert.Nil(t, got) +} + +func TestDeleteSendJob_KeepsDbWhenNotEmpty(t *testing.T) { + dir := t.TempDir() + pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0) + pushJob(t, dir, "q1", "/b", makeServers("http://s1"), 0) + + _, id, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NoError(t, DeleteSendJob(dir, "q1", id)) + + // DB file must still exist (second row remains) + _, statErr := os.Stat(filepath.Join(dir, "queues", "q1")) + require.NoError(t, statErr) +} + +func TestDeleteSendJob_RemovesDbWhenEmpty(t *testing.T) { + dir := t.TempDir() + pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0) + + _, id, err := PeekSendJob(dir, "q1") + require.NoError(t, err) + require.NoError(t, DeleteSendJob(dir, "q1", id)) + + _, statErr := os.Stat(filepath.Join(dir, "queues", "q1")) + assert.True(t, os.IsNotExist(statErr), "DB file should be removed when queue is empty") +}