Files
meowlib/client/helpers/bgSendHelper.go
ycc e6f9bc796e
Some checks failed
continuous-integration/drone/push Build is failing
count sends
2026-02-27 20:58:15 +01:00

181 lines
5.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package helpers
import (
"errors"
"os"
"path/filepath"
"sync"
"time"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
)
// Tunables for the background send pipeline.
const (
	// maxRetriesPerServer caps how many failed attempts a single server
	// gets per job before it is considered exhausted.
	maxRetriesPerServer = 3
	// defaultSendTimeout is in seconds, used when job.Timeout is 0.
	defaultSendTimeout = 3600 * 24
)
// WriteSendJob enqueues a SendJob from the main Flutter isolate.
// It simply delegates to client.PushSendJob and may be called
// concurrently with ProcessSendQueues.
func WriteSendJob(storagePath string, job *client.SendJob) error {
	err := client.PushSendJob(storagePath, job)
	return err
}
// ProcessSendQueues discovers every queue DB file under storagePath/queues/
// and processes each queue concurrently in its own goroutine.
// Call this from the send isolate on wake-up notification or on a periodic timer.
// It returns the total number of successfully sent messages across all queues.
func ProcessSendQueues(storagePath string) int {
	queueDir := filepath.Join(storagePath, "queues")
	entries, err := os.ReadDir(queueDir)
	if err != nil {
		logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSendQueues: ReadDir")
		return 0
	}
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex // guards total across queue goroutines
		total int
	)
	for _, entry := range entries {
		if entry.IsDir() {
			continue // queues are plain files; skip any stray directories
		}
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			n := processSendQueue(storagePath, name)
			mu.Lock()
			total += n
			mu.Unlock()
		}(entry.Name())
	}
	wg.Wait()
	return total
}
// processSendQueue processes pending jobs for a single named queue sequentially.
// It returns the number of successfully sent messages.
//
// For each pending job it will:
//   - immediately mark it failed if its timeout has elapsed
//   - attempt delivery, cycling through servers until one succeeds
//   - mark it sent on success or failed when all servers are exhausted
//   - stop and return when a job still has retries left (will resume on next call)
//
// Whenever persisting a job update fails, processing of this queue stops and
// resumes on the next poll: continuing with a stale on-disk record would make
// PeekSendJob hand back the very same job again, spinning forever — or, after
// a successful POST, re-sending a message that was already delivered.
func processSendQueue(storagePath, queue string) int {
	sent := 0
	for {
		job, _, err := client.PeekSendJob(storagePath, queue)
		if err != nil {
			logger.Error().Err(err).Str("queue", queue).Msg("processSendQueue: PeekSendJob")
			return sent
		}
		if job == nil {
			return sent // no more pending jobs
		}
		// Hard timeout: job has been sitting too long.
		if job.Timeout > 0 && time.Since(job.InsertedAt) > time.Duration(job.Timeout)*time.Second {
			job.Status = client.SendStatusFailed
			if err := client.UpdateSendJob(storagePath, queue, job); err != nil {
				logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob timeout")
				return sent // cannot persist; stop to avoid spinning on this job
			}
			continue // try the next pending job
		}
		serverIdx, sendErr := attemptSendJob(job)
		if sendErr == nil {
			now := time.Now()
			job.Status = client.SendStatusSent
			job.SentAt = &now
			job.SuccessfulServer = &serverIdx
			if err := client.UpdateSendJob(storagePath, queue, job); err != nil {
				logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob sent")
				// The message went out but we could not record it; stop here so
				// the next Peek does not trigger a duplicate send in this run.
				return sent + 1
			}
			sent++
			continue // job delivered, look for the next one
		}
		// Persist updated retry counts regardless of outcome.
		if err := client.UpdateSendJob(storagePath, queue, job); err != nil {
			logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob retries")
			return sent // cannot persist retry counts; resume on next poll
		}
		if allServersExhausted(job) {
			job.Status = client.SendStatusFailed
			if err := client.UpdateSendJob(storagePath, queue, job); err != nil {
				logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob failed")
				return sent // cannot persist; stop to avoid spinning on this job
			}
			continue // all servers dead for this job, try the next one
		}
		// Job still has remaining retries on some server; stop and wait for the next poll.
		return sent
	}
}
// attemptSendJob reads the pre-built packed message from job.File and tries
// each server in order, skipping any server whose failure count has already
// reached maxRetriesPerServer.
// On the first successful POST it returns that server's index.
// Retry counters are incremented in place inside job.Retries.
func attemptSendJob(job *client.SendJob) (int, error) {
	payload, err := os.ReadFile(job.File)
	if err != nil {
		return -1, err
	}
	// Grow the retries slice until it covers every configured server.
	for len(job.Retries) < len(job.Servers) {
		job.Retries = append(job.Retries, 0)
	}
	postTimeout := job.Timeout
	if postTimeout <= 0 {
		postTimeout = defaultSendTimeout
	}
	for idx, server := range job.Servers {
		if job.Retries[idx] >= maxRetriesPerServer {
			// This server has used up all of its attempts.
			continue
		}
		// Decode a fresh copy of the stored PackedUserMessage for each server
		// before wrapping it for that server's key.
		msg := &meowlib.PackedUserMessage{}
		if err := proto.Unmarshal(payload, msg); err != nil {
			return -1, err
		}
		wrapped, errTxt, packErr := PackMessageForServer(msg, server.GetUid())
		if packErr != nil {
			logger.Error().Err(packErr).Str("errTxt", errTxt).Str("url", server.Url).Msg("attemptSendJob: PackMessageForServer")
			job.Retries[idx]++
			continue
		}
		if _, postErr := meowlib.HttpPostMessage(server.Url, wrapped, postTimeout); postErr != nil {
			logger.Warn().Err(postErr).Str("url", server.Url).Int("retry", job.Retries[idx]+1).Msg("attemptSendJob: POST failed")
			job.Retries[idx]++
			continue
		}
		return idx, nil
	}
	return -1, errors.New("all servers failed or exhausted")
}
// allServersExhausted reports whether every server in the job has been tried
// maxRetriesPerServer times without success. A job with no servers at all is
// treated as exhausted; a server with no retry counter yet is treated as
// still available.
func allServersExhausted(job *client.SendJob) bool {
	if len(job.Servers) == 0 {
		return true
	}
	for i := 0; i < len(job.Servers); i++ {
		exhausted := i < len(job.Retries) && job.Retries[i] >= maxRetriesPerServer
		if !exhausted {
			return false
		}
	}
	return true
}