package helpers

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"forge.redroom.link/yves/meowlib"
	"forge.redroom.link/yves/meowlib/client"
	"google.golang.org/protobuf/proto"
)

const (
	maxRetriesPerServer = 3
	defaultSendTimeout  = 3600 * 24 // seconds, used when job.Timeout is 0
	defaultPostTimeout  = 200
)

// CreateUserMessageAndSendJob is the single entry point for sending a message.
// It creates and stores the user message, serialises the packed form to
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
// storagePath/queues/{peerUid}.
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error {
	// Build and persist the message first; errTxt carries the
	// human-readable context produced by the storage layer.
	packed, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
	if err != nil {
		return fmt.Errorf("%s: %w", errTxt, err)
	}

	payload, err := proto.Marshal(packed)
	if err != nil {
		return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
	}

	// Persist the wire-ready bytes under outbox/{dbFile}_{dbId}; the send
	// worker reads this file later, so it must exist before the job does.
	dir := filepath.Join(storagePath, "outbox")
	if err := os.MkdirAll(dir, 0700); err != nil {
		return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
	}
	target := filepath.Join(dir, fmt.Sprintf("%s_%d", dbFile, dbId))
	if err := os.WriteFile(target, payload, 0600); err != nil {
		return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
	}

	// Finally enqueue the delivery job in the per-peer queue.
	job := &client.SendJob{
		Queue:   peerUid,
		File:    target,
		Servers: servers,
		Timeout: timeout,
	}
	return client.PushSendJob(storagePath, job)
}

// ProcessSendQueues discovers every queue DB file under storagePath/queues/
// and processes each queue concurrently in its own goroutine.
// Call this from the send isolate on wake-up notification or on a periodic timer.
// It returns the total number of successfully sent messages across all queues.
func ProcessSendQueues(storagePath string) int { queueDir := filepath.Join(storagePath, "queues") entries, err := os.ReadDir(queueDir) if err != nil { logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSendQueues: ReadDir") return 0 } var wg sync.WaitGroup counts := make(chan int, len(entries)) for _, entry := range entries { if entry.IsDir() { continue } wg.Add(1) queue := entry.Name() go func(q string) { defer wg.Done() counts <- processSendQueue(storagePath, q) }(queue) } wg.Wait() close(counts) total := 0 for n := range counts { total += n } return total } // processSendQueue processes pending jobs for a single named queue sequentially. // It returns the number of successfully sent messages. // // For each pending job it will: // - immediately mark it failed if its TTL (job.Timeout) has elapsed – this is the // only criterion for permanent failure; retry exhaustion is never a failure cause // - attempt delivery, cycling through servers until one succeeds // - mark it sent on success // - stop and return when all servers fail this run (will resume on next call) // // Per-server retry counts (maxRetriesPerServer) are local to each call so that // past failures in previous runs never prevent future delivery attempts. func processSendQueue(storagePath, queue string) int { sent := 0 for { job, _, err := client.PeekSendJob(storagePath, queue) if err != nil { logger.Error().Err(err).Str("queue", queue).Msg("processSendQueue: PeekSendJob") return sent } if job == nil { return sent // no more pending jobs } // Hard timeout: the only criterion for permanent failure. // Use defaultSendTimeout when the job carries no explicit TTL. 
ttl := job.Timeout if ttl <= 0 { ttl = defaultSendTimeout } if time.Since(job.InsertedAt) > time.Duration(ttl)*time.Second { job.Status = client.SendStatusFailed if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob timeout") } continue // try the next pending job } // runRetries is allocated fresh every call so it never accumulates // across processSendQueue invocations. runRetries := make([]int, len(job.Servers)) serverIdx, sendErr := attemptSendJob(job, runRetries) if sendErr == nil { now := time.Now().UTC() job.Status = client.SendStatusSent job.SentAt = &now job.SuccessfulServer = &serverIdx if err := client.UpdateSendJob(storagePath, queue, job); err != nil { logger.Error().Err(err).Int64("id", job.ID).Msg("processSendQueue: UpdateSendJob sent") } sent++ continue // job delivered – look for the next one } // All servers failed this run; stop and wait for the next poll. // Permanent failure is decided solely by the TTL check above. return sent } } // attemptSendJob reads the pre-built packed message from job.File and tries // each server in order, skipping any server that has already reached // maxRetriesPerServer failures within the current run. // On the first successful POST it returns the server index. // Retry counts are tracked in the caller-supplied retries slice (run-local, // never persisted) so that previous runs do not influence this attempt. func attemptSendJob(job *client.SendJob, retries []int) (int, error) { data, err := os.ReadFile(job.File) if err != nil { return -1, err } // Ensure the retries slice is aligned with the servers slice. for len(retries) < len(job.Servers) { retries = append(retries, 0) } for i, srv := range job.Servers { if retries[i] >= maxRetriesPerServer { continue // this server is exhausted for the current run } // Unmarshal the stored PackedUserMessage and wrap it for this server. 
packedUsrMsg := &meowlib.PackedUserMessage{} if err := proto.Unmarshal(data, packedUsrMsg); err != nil { return -1, err } serverData, errTxt, packErr := PackMessageForServer(packedUsrMsg, srv.GetUid()) if packErr != nil { logger.Error().Err(packErr).Str("errTxt", errTxt).Str("url", srv.Url).Msg("attemptSendJob: PackMessageForServer") retries[i]++ continue } _, err = meowlib.HttpPostMessage(srv.Url, serverData, defaultPostTimeout) if err != nil { logger.Warn().Err(err).Str("url", srv.Url).Int("retry", retries[i]+1).Msg("attemptSendJob: POST failed") retries[i]++ continue } return i, nil } return -1, errors.New("all servers failed or exhausted") }