From e6f9bc796e962ea238bd99ec5fdc4720b149c9fa Mon Sep 17 00:00:00 2001 From: ycc Date: Fri, 27 Feb 2026 20:58:15 +0100 Subject: [PATCH] count sends --- client/helpers/bgSendHelper.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/client/helpers/bgSendHelper.go b/client/helpers/bgSendHelper.go index 99af814..0ca62b4 100644 --- a/client/helpers/bgSendHelper.go +++ b/client/helpers/bgSendHelper.go @@ -25,15 +25,17 @@ func WriteSendJob(storagePath string, job *client.SendJob) error { // ProcessSendQueues discovers every queue DB file under storagePath/queues/ // and processes each queue concurrently in its own goroutine. // Call this from the send isolate on wake-up notification or on a periodic timer. -func ProcessSendQueues(storagePath string) { +// It returns the total number of successfully sent messages across all queues. +func ProcessSendQueues(storagePath string) int { queueDir := filepath.Join(storagePath, "queues") entries, err := os.ReadDir(queueDir) if err != nil { logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSendQueues: ReadDir") - return + return 0 } var wg sync.WaitGroup + counts := make(chan int, len(entries)) for _, entry := range entries { if entry.IsDir() { continue @@ -42,28 +44,37 @@ func ProcessSendQueues(storagePath string) { queue := entry.Name() go func(q string) { defer wg.Done() - processSendQueue(storagePath, q) + counts <- processSendQueue(storagePath, q) }(queue) } wg.Wait() + close(counts) + + total := 0 + for n := range counts { + total += n + } + return total } // processSendQueue processes pending jobs for a single named queue sequentially. +// It returns the number of successfully sent messages. // // For each pending job it will: // - immediately mark it failed if its timeout has elapsed // - attempt delivery, cycling through servers until one succeeds // - mark it sent on success or failed when all servers are exhausted // - stop and return when a job still has retries left (will resume on next call) -func processSendQueue(storagePath, queue string) { +func processSendQueue(storagePath, queue string) int { + sent := 0 for { job, _, err := client.PeekSendJob(storagePath, queue) if err != nil { logger.Error().Err(err).Str("queue", queue).Msg("processSendQueue: PeekSendJob") - return + return sent } if job == nil { - return // no more pending jobs + return sent // no more pending jobs } // Hard timeout: job has been sitting too long @@ -84,6 +95,7 @@ func processSendQueue(storagePath, queue string) { if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob sent") } + sent++ continue // job delivered – look for the next one } @@ -101,7 +113,7 @@ func processSendQueue(storagePath, queue string) { } // Job still has remaining retries on some server; stop and wait for the next poll - return + return sent } }