Files
meowlib/client/helpers/bgSendHelper_test.go
ycc eb7fdc9b03
Some checks failed
continuous-integration/drone/push Build is failing
bg sender first draft
2026-02-26 18:50:46 +01:00

338 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package helpers
import (
"database/sql"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
"forge.redroom.link/yves/meowlib/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
_ "github.com/mattn/go-sqlite3"
)
// --- test helpers -------------------------------------------------------
// acceptServer spins up an httptest server whose handler atomically adds one
// to *received for every incoming request (whatever its method or path) and
// then answers with 200 OK. The caller owns the returned server and must
// close it.
func acceptServer(t *testing.T, received *int64) *httptest.Server {
	t.Helper()
	countAndAck := func(w http.ResponseWriter, _ *http.Request) {
		atomic.AddInt64(received, 1)
		w.WriteHeader(http.StatusOK)
	}
	return httptest.NewServer(http.HandlerFunc(countAndAck))
}
// closedServerURL returns the URL of an httptest server that has already been
// shut down, so requests sent to it are expected to fail right away with a
// connection error instead of waiting for a timeout.
func closedServerURL(t *testing.T) string {
	t.Helper()
	noop := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
	dead := httptest.NewServer(noop)
	url := dead.URL
	dead.Close()
	return url
}
// writeMsgFile creates dir/name with a fixed dummy payload (mode 0600) and
// returns the full path. The test is aborted if the file cannot be written.
func writeMsgFile(t *testing.T, dir, name string) string {
	t.Helper()
	path := filepath.Join(dir, name)
	payload := []byte("packed-server-message")
	err := os.WriteFile(path, payload, 0600)
	require.NoError(t, err)
	return path
}
// pushJob assembles a client.SendJob from the given queue, file, servers and
// timeout, hands it to client.PushSendJob for storage under dir, and aborts
// the test if that call returns an error.
func pushJob(t *testing.T, dir, queue, file string, servers []client.Server, timeout int) {
	t.Helper()
	job := client.SendJob{
		Queue:   queue,
		File:    file,
		Servers: servers,
		Timeout: timeout,
	}
	err := client.PushSendJob(dir, &job)
	require.NoError(t, err)
}
// serverSlice wraps each URL in a client.Server (only the Url field is set)
// and returns them in the same order. With no arguments it returns an empty,
// non-nil slice.
func serverSlice(urls ...string) []client.Server {
	servers := make([]client.Server, 0, len(urls))
	for _, u := range urls {
		servers = append(servers, client.Server{Url: u})
	}
	return servers
}
// --- unit tests ---------------------------------------------------------
// TestAllServersExhausted_NoServers checks that a zero-value job, which has no
// servers at all, is reported as exhausted.
func TestAllServersExhausted_NoServers(t *testing.T) {
	var job client.SendJob
	assert.True(t, allServersExhausted(&job))
}
func TestAllServersExhausted_NoneExhausted(t *testing.T) {
job := &client.SendJob{
Servers: serverSlice("http://s1", "http://s2"),
Retries: []int{0, 0},
}
assert.False(t, allServersExhausted(job))
}
func TestAllServersExhausted_PartiallyExhausted(t *testing.T) {
job := &client.SendJob{
Servers: serverSlice("http://s1", "http://s2"),
Retries: []int{maxRetriesPerServer, 0},
}
assert.False(t, allServersExhausted(job))
}
func TestAllServersExhausted_AllExhausted(t *testing.T) {
job := &client.SendJob{
Servers: serverSlice("http://s1", "http://s2"),
Retries: []int{maxRetriesPerServer, maxRetriesPerServer},
}
assert.True(t, allServersExhausted(job))
}
// TestAttemptSendJob_Success checks the single-server happy path:
// attemptSendJob returns no error, reports index 0, and the server sees
// exactly one request.
func TestAttemptSendJob_Success(t *testing.T) {
	var hits int64
	srv := acceptServer(t, &hits)
	defer srv.Close()

	msgFile := writeMsgFile(t, t.TempDir(), "msg")
	job := client.SendJob{
		File:    msgFile,
		Servers: serverSlice(srv.URL),
		Timeout: 5,
		Retries: []int{0},
	}

	usedIdx, err := attemptSendJob(&job)
	require.NoError(t, err)
	assert.Equal(t, 0, usedIdx)
	assert.Equal(t, int64(1), atomic.LoadInt64(&hits))
}
// TestAttemptSendJob_Fallback checks that when the first server cannot be
// reached, attemptSendJob moves on to the second one, reports index 1, and
// bumps only the first server's retry counter.
func TestAttemptSendJob_Fallback(t *testing.T) {
	var hits int64
	good := acceptServer(t, &hits)
	defer good.Close()

	msgFile := writeMsgFile(t, t.TempDir(), "msg")
	// The dead URL is obtained while the good server is already listening, so
	// the two cannot end up on the same port.
	deadURL := closedServerURL(t)
	job := client.SendJob{
		File:    msgFile,
		Servers: serverSlice(deadURL, good.URL),
		Timeout: 5,
		Retries: []int{0, 0},
	}

	usedIdx, err := attemptSendJob(&job)
	require.NoError(t, err)
	assert.Equal(t, 1, usedIdx, "second server should have been used")
	assert.Equal(t, int64(1), atomic.LoadInt64(&hits))
	assert.Equal(t, 1, job.Retries[0], "first server retry should be incremented")
	assert.Equal(t, 0, job.Retries[1], "second server retry must stay at zero")
}
// TestAttemptSendJob_AllFail verifies that all retry counts are incremented
// and an error is returned when every server refuses connections.
func TestAttemptSendJob_AllFail(t *testing.T) {
dir := t.TempDir()
job := &client.SendJob{
File: writeMsgFile(t, dir, "msg"),
Servers: serverSlice(closedServerURL(t), closedServerURL(t)),
Timeout: 5,
Retries: []int{0, 0},
}
_, err := attemptSendJob(job)
assert.Error(t, err)
assert.Equal(t, 1, job.Retries[0])
assert.Equal(t, 1, job.Retries[1])
}
// TestAttemptSendJob_SkipsExhaustedServer checks that a server whose retry
// counter already equals maxRetriesPerServer is passed over: the send succeeds
// through the second server (index 1), which receives exactly one request.
func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) {
	var hits int64
	good := acceptServer(t, &hits)
	defer good.Close()

	msgFile := writeMsgFile(t, t.TempDir(), "msg")
	// First entry is already exhausted, so it must never be contacted.
	exhaustedURL := closedServerURL(t)
	job := client.SendJob{
		File:    msgFile,
		Servers: serverSlice(exhaustedURL, good.URL),
		Timeout: 5,
		Retries: []int{maxRetriesPerServer, 0},
	}

	usedIdx, err := attemptSendJob(&job)
	require.NoError(t, err)
	assert.Equal(t, 1, usedIdx)
	assert.Equal(t, int64(1), atomic.LoadInt64(&hits))
}
// --- integration tests --------------------------------------------------
// TestWriteSendJob verifies the thin WriteSendJob wrapper enqueues the job.
func TestWriteSendJob(t *testing.T) {
dir := t.TempDir()
err := WriteSendJob(dir, &client.SendJob{
Queue: "q1",
File: "/tmp/f",
Servers: serverSlice("http://s1"),
})
require.NoError(t, err)
got, _, err := client.PeekSendJob(dir, "q1")
require.NoError(t, err)
require.NotNil(t, got)
assert.Equal(t, "/tmp/f", got.File)
}
// TestProcessSendQueues_Success checks that one ProcessSendQueues pass delivers
// a queued job to an accepting server and leaves the stored job with status
// SendStatusSent, a non-nil SentAt and SuccessfulServer pointing at index 0.
func TestProcessSendQueues_Success(t *testing.T) {
	const queue = "q1"
	dir := t.TempDir()

	var hits int64
	srv := acceptServer(t, &hits)
	defer srv.Close()

	pushJob(t, dir, queue, writeMsgFile(t, dir, "msg"), serverSlice(srv.URL), 10)

	// Remember the job ID now; the row is looked up again after processing.
	_, jobID, err := client.PeekSendJob(dir, queue)
	require.NoError(t, err)

	ProcessSendQueues(dir)

	assert.Equal(t, int64(1), atomic.LoadInt64(&hits), "server should have received exactly one message")

	stored, err := client.GetSendJob(dir, queue, jobID)
	require.NoError(t, err)
	require.NotNil(t, stored)
	assert.Equal(t, client.SendStatusSent, stored.Status)
	assert.NotNil(t, stored.SentAt)
	require.NotNil(t, stored.SuccessfulServer)
	assert.Equal(t, 0, *stored.SuccessfulServer)
}
// TestProcessSendQueues_ServerFallback checks that a single ProcessSendQueues
// pass gets past an unreachable first server, delivers through the second one,
// and records index 1 as the successful server.
func TestProcessSendQueues_ServerFallback(t *testing.T) {
	const queue = "q1"
	dir := t.TempDir()

	var hits int64
	good := acceptServer(t, &hits)
	defer good.Close()

	msgFile := writeMsgFile(t, dir, "msg")
	// Obtained while the good server is listening, so the ports cannot collide.
	deadURL := closedServerURL(t)
	pushJob(t, dir, queue, msgFile, serverSlice(deadURL, good.URL), 10)

	_, jobID, err := client.PeekSendJob(dir, queue)
	require.NoError(t, err)

	ProcessSendQueues(dir)

	assert.Equal(t, int64(1), atomic.LoadInt64(&hits))

	stored, err := client.GetSendJob(dir, queue, jobID)
	require.NoError(t, err)
	require.NotNil(t, stored)
	assert.Equal(t, client.SendStatusSent, stored.Status)
	require.NotNil(t, stored.SuccessfulServer)
	assert.Equal(t, 1, *stored.SuccessfulServer, "second server should be recorded as successful")
}
// TestProcessSendQueues_AllServersExhausted checks that after running
// ProcessSendQueues maxRetriesPerServer times against a single unreachable
// server, the stored job has status SendStatusFailed and a retry counter equal
// to maxRetriesPerServer.
func TestProcessSendQueues_AllServersExhausted(t *testing.T) {
	const queue = "q1"
	dir := t.TempDir()

	deadURL := closedServerURL(t)
	pushJob(t, dir, queue, writeMsgFile(t, dir, "msg"), serverSlice(deadURL), 0)

	_, jobID, err := client.PeekSendJob(dir, queue)
	require.NoError(t, err)

	// Each pass is expected to add one failed attempt; once the counter
	// reaches maxRetriesPerServer the job should be given up on.
	for remaining := maxRetriesPerServer; remaining > 0; remaining-- {
		ProcessSendQueues(dir)
	}

	stored, err := client.GetSendJob(dir, queue, jobID)
	require.NoError(t, err)
	require.NotNil(t, stored)
	assert.Equal(t, client.SendStatusFailed, stored.Status)
	assert.Equal(t, maxRetriesPerServer, stored.Retries[0])
}
// TestProcessSendQueues_JobTimeout checks that a job whose timeout has already
// elapsed is marked SendStatusFailed by ProcessSendQueues without the server
// receiving any request.
func TestProcessSendQueues_JobTimeout(t *testing.T) {
	const queue = "q1"
	dir := t.TempDir()

	var hits int64
	srv := acceptServer(t, &hits)
	defer srv.Close()

	// One-second timeout; the row is backdated below so it looks expired.
	pushJob(t, dir, queue, writeMsgFile(t, dir, "msg"), serverSlice(srv.URL), 1)

	_, jobID, err := client.PeekSendJob(dir, queue)
	require.NoError(t, err)

	// Move inserted_at 60 seconds into the past directly in the queue DB.
	queueDB := filepath.Join(dir, "queues", queue)
	backdateJob(t, queueDB, jobID, -60*time.Second)

	ProcessSendQueues(dir)

	assert.Equal(t, int64(0), atomic.LoadInt64(&hits), "no send should be attempted for an expired job")

	stored, err := client.GetSendJob(dir, queue, jobID)
	require.NoError(t, err)
	require.NotNil(t, stored)
	assert.Equal(t, client.SendStatusFailed, stored.Status)
}
// TestProcessSendQueues_MultipleQueues checks that one ProcessSendQueues pass
// delivers the pending job of each of three separate queues, so the server
// sees three requests in total.
func TestProcessSendQueues_MultipleQueues(t *testing.T) {
	dir := t.TempDir()

	var hits int64
	srv := acceptServer(t, &hits)
	defer srv.Close()

	queues := []string{"qa", "qb", "qc"}
	for i := range queues {
		name := queues[i]
		msgFile := writeMsgFile(t, dir, "msg_"+name)
		pushJob(t, dir, name, msgFile, serverSlice(srv.URL), 10)
	}

	ProcessSendQueues(dir)

	assert.Equal(t, int64(3), atomic.LoadInt64(&hits), "all three queues should have delivered their message")
}
// backdateJob opens the SQLite file at dbPath directly and overwrites the
// inserted_at column of the row with the given id in the queue table with
// time.Now().Add(delta) as a Unix-seconds timestamp. A negative delta moves
// the job into the past, which lets tests simulate elapsed time without
// sleeping.
//
// The test is aborted if the database cannot be opened, the UPDATE fails, or
// the UPDATE does not touch exactly one row. The last check catches a wrong id,
// which would otherwise leave the job untouched without any error.
func backdateJob(t *testing.T, dbPath string, id int64, delta time.Duration) {
	t.Helper()
	db, err := sql.Open("sqlite3", dbPath)
	require.NoError(t, err)
	defer db.Close()

	newTs := time.Now().Add(delta).Unix()
	res, err := db.Exec("UPDATE queue SET inserted_at = ? WHERE id = ?", newTs, id)
	require.NoError(t, err)

	// An UPDATE matching no row succeeds silently; make that case a failure.
	affected, err := res.RowsAffected()
	require.NoError(t, err)
	require.EqualValues(t, 1, affected, "backdateJob: expected exactly one row with id %d in %s", id, dbPath)
}