package helpers import ( "errors" "os" "path/filepath" "sync" "time" "forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib/client" "google.golang.org/protobuf/proto" ) const maxRetriesPerServer = 3 const defaultSendTimeout = 3600 * 24 // seconds, used when job.Timeout is 0 // WriteSendJob enqueues a SendJob from the main Flutter isolate. // It is a thin wrapper over client.PushSendJob and is safe to call // concurrently with ProcessSendQueues. func WriteSendJob(storagePath string, job *client.SendJob) error { return client.PushSendJob(storagePath, job) } // ProcessSendQueues discovers every queue DB file under storagePath/queues/ // and processes each queue concurrently in its own goroutine. // Call this from the send isolate on wake-up notification or on a periodic timer. // It returns the total number of successfully sent messages across all queues. func ProcessSendQueues(storagePath string) int { queueDir := filepath.Join(storagePath, "queues") entries, err := os.ReadDir(queueDir) if err != nil { logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSendQueues: ReadDir") return 0 } var wg sync.WaitGroup counts := make(chan int, len(entries)) for _, entry := range entries { if entry.IsDir() { continue } wg.Add(1) queue := entry.Name() go func(q string) { defer wg.Done() counts <- processSendQueue(storagePath, q) }(queue) } wg.Wait() close(counts) total := 0 for n := range counts { total += n } return total } // processSendQueue processes pending jobs for a single named queue sequentially. // It returns the number of successfully sent messages. // // For each pending job it will: // - immediately mark it failed if its timeout has elapsed // - attempt delivery, cycling through servers until one succeeds // - mark it sent on success or failed when all servers are exhausted // - stop and return when a job still has retries left (will resume on next call) func processSendQueue(storagePath, queue string) int { sent := 0 for { job, _, err := client.PeekSendJob(storagePath, queue) if err != nil { logger.Error().Err(err).Str("queue", queue).Msg("processSendQueue: PeekSendJob") return sent } if job == nil { return sent // no more pending jobs } // Hard timeout: job has been sitting too long if job.Timeout > 0 && time.Since(job.InsertedAt) > time.Duration(job.Timeout)*time.Second { job.Status = client.SendStatusFailed if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob timeout") } continue // try the next pending job } serverIdx, sendErr := attemptSendJob(job) if sendErr == nil { now := time.Now() job.Status = client.SendStatusSent job.SentAt = &now job.SuccessfulServer = &serverIdx if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob sent") } sent++ continue // job delivered – look for the next one } // Persist updated retry counts regardless of outcome if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob retries") } if allServersExhausted(job) { job.Status = client.SendStatusFailed if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob failed") } continue // all servers dead for this job – try the next one } // Job still has remaining retries on some server; stop and wait for the next poll return sent } } // attemptSendJob reads the pre-built packed message from job.File and tries // each server in order, skipping any server that has already reached // maxRetriesPerServer failures. // On the first successful POST it returns the server index. // All retry counts are incremented in-place inside job.Retries. func attemptSendJob(job *client.SendJob) (int, error) { data, err := os.ReadFile(job.File) if err != nil { return -1, err } // Ensure the retries slice is aligned with the servers slice for len(job.Retries) < len(job.Servers) { job.Retries = append(job.Retries, 0) } timeout := job.Timeout if timeout <= 0 { timeout = defaultSendTimeout } for i, srv := range job.Servers { if job.Retries[i] >= maxRetriesPerServer { continue // this server is exhausted } // Unmarshal the stored PackedUserMessage and wrap it for this server. packedUsrMsg := &meowlib.PackedUserMessage{} if err := proto.Unmarshal(data, packedUsrMsg); err != nil { return -1, err } serverData, errTxt, packErr := PackMessageForServer(packedUsrMsg, srv.GetUid()) if packErr != nil { logger.Error().Err(packErr).Str("errTxt", errTxt).Str("url", srv.Url).Msg("attemptSendJob: PackMessageForServer") job.Retries[i]++ continue } _, err = meowlib.HttpPostMessage(srv.Url, serverData, timeout) if err != nil { logger.Warn().Err(err).Str("url", srv.Url).Int("retry", job.Retries[i]+1).Msg("attemptSendJob: POST failed") job.Retries[i]++ continue } return i, nil } return -1, errors.New("all servers failed or exhausted") } // allServersExhausted returns true when every server in the job has been tried // maxRetriesPerServer times without success. func allServersExhausted(job *client.SendJob) bool { if len(job.Servers) == 0 { return true } for i := range job.Servers { if i >= len(job.Retries) || job.Retries[i] < maxRetriesPerServer { return false } } return true }