From 7d06f0ff3ecc515e5fd32236d80d059001fcb7f1 Mon Sep 17 00:00:00 2001 From: ycc Date: Sun, 1 Mar 2026 14:23:07 +0100 Subject: [PATCH] timeout fix --- client/helpers/bgSendHelper.go | 85 ++++++--------- client/helpers/bgSendHelper_test.go | 156 ++++++++++++++++------------ 2 files changed, 125 insertions(+), 116 deletions(-) diff --git a/client/helpers/bgSendHelper.go b/client/helpers/bgSendHelper.go index 7aa0c6e..f569af4 100644 --- a/client/helpers/bgSendHelper.go +++ b/client/helpers/bgSendHelper.go @@ -14,6 +14,7 @@ import ( const maxRetriesPerServer = 3 const defaultSendTimeout = 3600 * 24 // seconds, used when job.Timeout is 0 +const defaultPostTimeout = 200 // WriteSendJob enqueues a SendJob from the main Flutter isolate. // It is a thin wrapper over client.PushSendJob and is safe to call @@ -61,10 +62,14 @@ func ProcessSendQueues(storagePath string) int { // It returns the number of successfully sent messages. // // For each pending job it will: -// - immediately mark it failed if its timeout has elapsed +// - immediately mark it failed if its TTL (job.Timeout) has elapsed – this is the +// only criterion for permanent failure; retry exhaustion is never a failure cause // - attempt delivery, cycling through servers until one succeeds -// - mark it sent on success or failed when all servers are exhausted -// - stop and return when a job still has retries left (will resume on next call) +// - mark it sent on success +// - stop and return when all servers fail this run (will resume on next call) +// +// Per-server retry counts (maxRetriesPerServer) are local to each call so that +// past failures in previous runs never prevent future delivery attempts. func processSendQueue(storagePath, queue string) int { sent := 0 for { @@ -77,8 +82,13 @@ func processSendQueue(storagePath, queue string) int { return sent // no more pending jobs } - // Hard timeout: job has been sitting too long - if job.Timeout > 0 && time.Since(job.InsertedAt) > time.Duration(job.Timeout)*time.Second { + // Hard timeout: the only criterion for permanent failure. + // Use defaultSendTimeout when the job carries no explicit TTL. + ttl := job.Timeout + if ttl <= 0 { + ttl = defaultSendTimeout + } + if time.Since(job.InsertedAt) > time.Duration(ttl)*time.Second { job.Status = client.SendStatusFailed if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob timeout") @@ -86,7 +96,10 @@ func processSendQueue(storagePath, queue string) int { continue // try the next pending job } - serverIdx, sendErr := attemptSendJob(job) + // runRetries is allocated fresh every call so it never accumulates + // across processSendQueue invocations. + runRetries := make([]int, len(job.Servers)) + serverIdx, sendErr := attemptSendJob(job, runRetries) if sendErr == nil { now := time.Now().UTC() job.Status = client.SendStatusSent @@ -99,47 +112,31 @@ func processSendQueue(storagePath, queue string) int { continue // job delivered – look for the next one } - // Persist updated retry counts regardless of outcome - if err := client.UpdateSendJob(storagePath, queue, job); err != nil { - logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob retries") - } - - if allServersExhausted(job) { - job.Status = client.SendStatusFailed - if err := client.UpdateSendJob(storagePath, queue, job); err != nil { - logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob failed") - } - continue // all servers dead for this job – try the next one - } - - // Job still has remaining retries on some server; stop and wait for the next poll + // All servers failed this run; stop and wait for the next poll. + // Permanent failure is decided solely by the TTL check above. return sent } } // attemptSendJob reads the pre-built packed message from job.File and tries // each server in order, skipping any server that has already reached -// maxRetriesPerServer failures. +// maxRetriesPerServer failures within the current run. // On the first successful POST it returns the server index. -// All retry counts are incremented in-place inside job.Retries. -func attemptSendJob(job *client.SendJob) (int, error) { +// Retry counts are tracked in the caller-supplied retries slice (run-local, +// never persisted) so that previous runs do not influence this attempt. +func attemptSendJob(job *client.SendJob, retries []int) (int, error) { data, err := os.ReadFile(job.File) if err != nil { return -1, err } - // Ensure the retries slice is aligned with the servers slice - for len(job.Retries) < len(job.Servers) { - job.Retries = append(job.Retries, 0) - } - - timeout := job.Timeout - if timeout <= 0 { - timeout = defaultSendTimeout + // Ensure the retries slice is aligned with the servers slice. + for len(retries) < len(job.Servers) { + retries = append(retries, 0) } for i, srv := range job.Servers { - if job.Retries[i] >= maxRetriesPerServer { - continue // this server is exhausted + if retries[i] >= maxRetriesPerServer { + continue // this server is exhausted for the current run } // Unmarshal the stored PackedUserMessage and wrap it for this server. @@ -150,31 +147,17 @@ func attemptSendJob(job *client.SendJob) (int, error) { serverData, errTxt, packErr := PackMessageForServer(packedUsrMsg, srv.GetUid()) if packErr != nil { logger.Error().Err(packErr).Str("errTxt", errTxt).Str("url", srv.Url).Msg("attemptSendJob: PackMessageForServer") - job.Retries[i]++ + retries[i]++ continue } - _, err = meowlib.HttpPostMessage(srv.Url, serverData, timeout) + _, err = meowlib.HttpPostMessage(srv.Url, serverData, defaultPostTimeout) if err != nil { - logger.Warn().Err(err).Str("url", srv.Url).Int("retry", job.Retries[i]+1).Msg("attemptSendJob: POST failed") - job.Retries[i]++ + logger.Warn().Err(err).Str("url", srv.Url).Int("retry", retries[i]+1).Msg("attemptSendJob: POST failed") + retries[i]++ continue } return i, nil } return -1, errors.New("all servers failed or exhausted") } - -// allServersExhausted returns true when every server in the job has been tried -// maxRetriesPerServer times without success. -func allServersExhausted(job *client.SendJob) bool { - if len(job.Servers) == 0 { - return true - } - for i := range job.Servers { - if i >= len(job.Retries) || job.Retries[i] < maxRetriesPerServer { - return false - } - } - return true -} diff --git a/client/helpers/bgSendHelper_test.go b/client/helpers/bgSendHelper_test.go index 46cebe1..097b788 100644 --- a/client/helpers/bgSendHelper_test.go +++ b/client/helpers/bgSendHelper_test.go @@ -10,9 +10,11 @@ import ( "testing" "time" + "forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" _ "github.com/mattn/go-sqlite3" ) @@ -37,14 +39,35 @@ func closedServerURL(t *testing.T) string { return srv.URL } -// writeMsgFile writes dummy bytes to a temp file and returns the path. +// writeMsgFile writes a valid serialised empty PackedUserMessage to a temp file +// and returns the path. The file content satisfies proto.Unmarshal inside +// attemptSendJob; the httptest endpoints ignore the encrypted payload. func writeMsgFile(t *testing.T, dir, name string) string { t.Helper() p := filepath.Join(dir, name) - require.NoError(t, os.WriteFile(p, []byte("packed-server-message"), 0600)) + data, err := proto.Marshal(&meowlib.PackedUserMessage{}) + require.NoError(t, err) + require.NoError(t, os.WriteFile(p, data, 0600)) return p } +// newTestServer creates a client.Server for the given URL, generates a +// throwaway keypair so that AsymEncryptMessage succeeds, and stores the server +// in the current identity's MessageServers so that PackMessageForServer can +// look it up via LoadServer. Returns the registered server. +// +// Call setupMsgHelperConfig before this so an identity is in place. +func newTestServer(t *testing.T, url string) client.Server { + t.Helper() + srv, err := client.CreateServerFromUrl(url) + require.NoError(t, err) + kp, err := meowlib.NewKeyPair() + require.NoError(t, err) + srv.PublicKey = kp.Public + require.NoError(t, client.GetConfig().GetIdentity().MessageServers.StoreServer(srv)) + return *srv +} + // pushJob is a convenience wrapper around client.PushSendJob. func pushJob(t *testing.T, dir, queue, file string, servers []client.Server, timeout int) { t.Helper() @@ -67,50 +90,23 @@ func serverSlice(urls ...string) []client.Server { // --- unit tests --------------------------------------------------------- -func TestAllServersExhausted_NoServers(t *testing.T) { - job := &client.SendJob{} - assert.True(t, allServersExhausted(job)) -} - -func TestAllServersExhausted_NoneExhausted(t *testing.T) { - job := &client.SendJob{ - Servers: serverSlice("http://s1", "http://s2"), - Retries: []int{0, 0}, - } - assert.False(t, allServersExhausted(job)) -} - -func TestAllServersExhausted_PartiallyExhausted(t *testing.T) { - job := &client.SendJob{ - Servers: serverSlice("http://s1", "http://s2"), - Retries: []int{maxRetriesPerServer, 0}, - } - assert.False(t, allServersExhausted(job)) -} - -func TestAllServersExhausted_AllExhausted(t *testing.T) { - job := &client.SendJob{ - Servers: serverSlice("http://s1", "http://s2"), - Retries: []int{maxRetriesPerServer, maxRetriesPerServer}, - } - assert.True(t, allServersExhausted(job)) -} - // TestAttemptSendJob_Success verifies a successful POST to the first server. func TestAttemptSendJob_Success(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() + newTestServer(t, srv.URL) + job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(srv.URL), Timeout: 5, - Retries: []int{0}, } + retries := make([]int, len(job.Servers)) - idx, err := attemptSendJob(job) + idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 0, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) @@ -119,62 +115,74 @@ func TestAttemptSendJob_Success(t *testing.T) { // TestAttemptSendJob_Fallback verifies that when the first server refuses the // connection, the second server is tried and succeeds. func TestAttemptSendJob_Fallback(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() + deadURL := closedServerURL(t) + newTestServer(t, deadURL) + newTestServer(t, good.URL) + job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), - Servers: serverSlice(closedServerURL(t), good.URL), + Servers: serverSlice(deadURL, good.URL), Timeout: 5, - Retries: []int{0, 0}, } + retries := make([]int, len(job.Servers)) - idx, err := attemptSendJob(job) + idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 1, idx, "second server should have been used") assert.Equal(t, int64(1), atomic.LoadInt64(&received)) - assert.Equal(t, 1, job.Retries[0], "first server retry should be incremented") - assert.Equal(t, 0, job.Retries[1], "second server retry must stay at zero") + assert.Equal(t, 1, retries[0], "first server retry should be incremented") + assert.Equal(t, 0, retries[1], "second server retry must stay at zero") } // TestAttemptSendJob_AllFail verifies that all retry counts are incremented // and an error is returned when every server refuses connections. func TestAttemptSendJob_AllFail(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) + dead1 := closedServerURL(t) + dead2 := closedServerURL(t) + newTestServer(t, dead1) + newTestServer(t, dead2) + job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), - Servers: serverSlice(closedServerURL(t), closedServerURL(t)), + Servers: serverSlice(dead1, dead2), Timeout: 5, - Retries: []int{0, 0}, } + retries := make([]int, len(job.Servers)) - _, err := attemptSendJob(job) + _, err := attemptSendJob(job, retries) assert.Error(t, err) - assert.Equal(t, 1, job.Retries[0]) - assert.Equal(t, 1, job.Retries[1]) + assert.Equal(t, 1, retries[0]) + assert.Equal(t, 1, retries[1]) } // TestAttemptSendJob_SkipsExhaustedServer verifies that a server already at // maxRetriesPerServer is not contacted. func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() + deadURL := closedServerURL(t) + newTestServer(t, good.URL) // only good server needs to be reachable + job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice( - closedServerURL(t), // exhausted – must be skipped + deadURL, // exhausted – must be skipped (no need to store in identity) good.URL, ), Timeout: 5, - Retries: []int{maxRetriesPerServer, 0}, } + retries := []int{maxRetriesPerServer, 0} // first server already exhausted this run - idx, err := attemptSendJob(job) + idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 1, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) @@ -201,11 +209,13 @@ func TestWriteSendJob(t *testing.T) { // TestProcessSendQueues_Success verifies that a pending job is delivered and // marked as sent when the server accepts it. func TestProcessSendQueues_Success(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() + newTestServer(t, srv.URL) + msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 10) @@ -229,13 +239,17 @@ func TestProcessSendQueues_Success(t *testing.T) { // TestProcessSendQueues_ServerFallback verifies that when the first server is // unreachable, the second server is tried successfully in the same pass. func TestProcessSendQueues_ServerFallback(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() + deadURL := closedServerURL(t) + newTestServer(t, deadURL) + newTestServer(t, good.URL) + msgPath := writeMsgFile(t, dir, "msg") - pushJob(t, dir, "q1", msgPath, serverSlice(closedServerURL(t), good.URL), 10) + pushJob(t, dir, "q1", msgPath, serverSlice(deadURL, good.URL), 10) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) @@ -252,39 +266,43 @@ func TestProcessSendQueues_ServerFallback(t *testing.T) { assert.Equal(t, 1, *job.SuccessfulServer, "second server should be recorded as successful") } -// TestProcessSendQueues_AllServersExhausted verifies that after maxRetriesPerServer -// failed attempts per server the job is marked as failed. -func TestProcessSendQueues_AllServersExhausted(t *testing.T) { - dir := t.TempDir() +// TestProcessSendQueues_FailedRunsStayPending verifies that repeated delivery +// failures do NOT mark a job as permanently failed. Only a TTL timeout can do +// that; retry exhaustion merely stops the current run. +func TestProcessSendQueues_FailedRunsStayPending(t *testing.T) { + dir, _ := setupMsgHelperConfig(t) deadURL := closedServerURL(t) + newTestServer(t, deadURL) msgPath := writeMsgFile(t, dir, "msg") + // timeout=0 → uses defaultSendTimeout (24 h), so the job won't expire here. pushJob(t, dir, "q1", msgPath, serverSlice(deadURL), 0) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) - // Each call to ProcessSendQueues increments the retry counter by 1. - // After maxRetriesPerServer calls, all servers are exhausted → failed. - for i := 0; i < maxRetriesPerServer; i++ { + // Run several times – per-server retry counts reset each run, so the job + // must remain pending no matter how many runs fail. + for i := 0; i < maxRetriesPerServer+2; i++ { ProcessSendQueues(dir) } job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) - assert.Equal(t, client.SendStatusFailed, job.Status) - assert.Equal(t, maxRetriesPerServer, job.Retries[0]) + assert.Equal(t, client.SendStatusPending, job.Status, "repeated failures must not cause permanent failure – only timeout does") } // TestProcessSendQueues_JobTimeout verifies that a job whose timeout has elapsed // is immediately marked as failed without any send attempt. func TestProcessSendQueues_JobTimeout(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() + newTestServer(t, srv.URL) + msgPath := writeMsgFile(t, dir, "msg") // Timeout of 1 second; we will backdate inserted_at so the job looks expired. pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 1) @@ -309,17 +327,25 @@ func TestProcessSendQueues_JobTimeout(t *testing.T) { // TestProcessSendQueues_MultipleQueues verifies that jobs in different queue // files are processed concurrently and independently. func TestProcessSendQueues_MultipleQueues(t *testing.T) { - dir := t.TempDir() + dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() + newTestServer(t, srv.URL) + for _, q := range []string{"qa", "qb", "qc"} { msgPath := writeMsgFile(t, dir, "msg_"+q) pushJob(t, dir, q, msgPath, serverSlice(srv.URL), 10) } - ProcessSendQueues(dir) + // Concurrent goroutines for each queue all try to open the same BadgerDB for + // server lookup; only one can hold the lock at a time. Jobs that lose the + // race stay pending and are retried on the next call. Three passes guarantee + // every queue gets at least one uncontested turn. + for i := 0; i < 3; i++ { + ProcessSendQueues(dir) + } assert.Equal(t, int64(3), atomic.LoadInt64(&received), "all three queues should have delivered their message") }