package helpers

import (
	"database/sql"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"sync/atomic"
	"testing"
	"time"

	"forge.redroom.link/yves/meowlib/client"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	_ "github.com/mattn/go-sqlite3"
)

// --- test helpers -------------------------------------------------------

// acceptServer starts an httptest server that counts received POST /msg requests.
func acceptServer(t *testing.T, received *int64) *httptest.Server {
	t.Helper()
	handler := func(w http.ResponseWriter, _ *http.Request) {
		atomic.AddInt64(received, 1)
		w.WriteHeader(http.StatusOK)
	}
	return httptest.NewServer(http.HandlerFunc(handler))
}

// closedServerURL starts and immediately closes an httptest server so its URL
// causes "connection refused" without any wait.
func closedServerURL(t *testing.T) string {
	t.Helper()
	noop := func(http.ResponseWriter, *http.Request) {}
	srv := httptest.NewServer(http.HandlerFunc(noop))
	url := srv.URL // URL stays valid after Close; the port is simply unreachable.
	srv.Close()
	return url
}

// writeMsgFile writes dummy bytes to a temp file and returns the path.
func writeMsgFile(t *testing.T, dir, name string) string {
	t.Helper()
	path := filepath.Join(dir, name)
	err := os.WriteFile(path, []byte("packed-server-message"), 0600)
	require.NoError(t, err)
	return path
}

// pushJob is a convenience wrapper around client.PushSendJob.
func pushJob(t *testing.T, dir, queue, file string, servers []client.Server, timeout int) {
	t.Helper()
	job := &client.SendJob{
		Queue:   queue,
		File:    file,
		Servers: servers,
		Timeout: timeout,
	}
	require.NoError(t, client.PushSendJob(dir, job))
}

// serverSlice builds a []client.Server from plain URLs.
func serverSlice(urls ...string) []client.Server { out := make([]client.Server, len(urls)) for i, u := range urls { out[i] = client.Server{Url: u} } return out } // --- unit tests --------------------------------------------------------- func TestAllServersExhausted_NoServers(t *testing.T) { job := &client.SendJob{} assert.True(t, allServersExhausted(job)) } func TestAllServersExhausted_NoneExhausted(t *testing.T) { job := &client.SendJob{ Servers: serverSlice("http://s1", "http://s2"), Retries: []int{0, 0}, } assert.False(t, allServersExhausted(job)) } func TestAllServersExhausted_PartiallyExhausted(t *testing.T) { job := &client.SendJob{ Servers: serverSlice("http://s1", "http://s2"), Retries: []int{maxRetriesPerServer, 0}, } assert.False(t, allServersExhausted(job)) } func TestAllServersExhausted_AllExhausted(t *testing.T) { job := &client.SendJob{ Servers: serverSlice("http://s1", "http://s2"), Retries: []int{maxRetriesPerServer, maxRetriesPerServer}, } assert.True(t, allServersExhausted(job)) } // TestAttemptSendJob_Success verifies a successful POST to the first server. func TestAttemptSendJob_Success(t *testing.T) { dir := t.TempDir() var received int64 srv := acceptServer(t, &received) defer srv.Close() job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(srv.URL), Timeout: 5, Retries: []int{0}, } idx, err := attemptSendJob(job) require.NoError(t, err) assert.Equal(t, 0, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) } // TestAttemptSendJob_Fallback verifies that when the first server refuses the // connection, the second server is tried and succeeds. 
func TestAttemptSendJob_Fallback(t *testing.T) { dir := t.TempDir() var received int64 good := acceptServer(t, &received) defer good.Close() job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(closedServerURL(t), good.URL), Timeout: 5, Retries: []int{0, 0}, } idx, err := attemptSendJob(job) require.NoError(t, err) assert.Equal(t, 1, idx, "second server should have been used") assert.Equal(t, int64(1), atomic.LoadInt64(&received)) assert.Equal(t, 1, job.Retries[0], "first server retry should be incremented") assert.Equal(t, 0, job.Retries[1], "second server retry must stay at zero") } // TestAttemptSendJob_AllFail verifies that all retry counts are incremented // and an error is returned when every server refuses connections. func TestAttemptSendJob_AllFail(t *testing.T) { dir := t.TempDir() job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(closedServerURL(t), closedServerURL(t)), Timeout: 5, Retries: []int{0, 0}, } _, err := attemptSendJob(job) assert.Error(t, err) assert.Equal(t, 1, job.Retries[0]) assert.Equal(t, 1, job.Retries[1]) } // TestAttemptSendJob_SkipsExhaustedServer verifies that a server already at // maxRetriesPerServer is not contacted. func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) { dir := t.TempDir() var received int64 good := acceptServer(t, &received) defer good.Close() job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice( closedServerURL(t), // exhausted – must be skipped good.URL, ), Timeout: 5, Retries: []int{maxRetriesPerServer, 0}, } idx, err := attemptSendJob(job) require.NoError(t, err) assert.Equal(t, 1, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) } // --- integration tests -------------------------------------------------- // TestWriteSendJob verifies the thin WriteSendJob wrapper enqueues the job. 
func TestWriteSendJob(t *testing.T) { dir := t.TempDir() err := WriteSendJob(dir, &client.SendJob{ Queue: "q1", File: "/tmp/f", Servers: serverSlice("http://s1"), }) require.NoError(t, err) got, _, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, "/tmp/f", got.File) } // TestProcessSendQueues_Success verifies that a pending job is delivered and // marked as sent when the server accepts it. func TestProcessSendQueues_Success(t *testing.T) { dir := t.TempDir() var received int64 srv := acceptServer(t, &received) defer srv.Close() msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 10) // grab the ID before processing so we can inspect the row afterward _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) ProcessSendQueues(dir) assert.Equal(t, int64(1), atomic.LoadInt64(&received), "server should have received exactly one message") job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusSent, job.Status) assert.NotNil(t, job.SentAt) require.NotNil(t, job.SuccessfulServer) assert.Equal(t, 0, *job.SuccessfulServer) } // TestProcessSendQueues_ServerFallback verifies that when the first server is // unreachable, the second server is tried successfully in the same pass. 
func TestProcessSendQueues_ServerFallback(t *testing.T) { dir := t.TempDir() var received int64 good := acceptServer(t, &received) defer good.Close() msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(closedServerURL(t), good.URL), 10) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) ProcessSendQueues(dir) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusSent, job.Status) require.NotNil(t, job.SuccessfulServer) assert.Equal(t, 1, *job.SuccessfulServer, "second server should be recorded as successful") } // TestProcessSendQueues_AllServersExhausted verifies that after maxRetriesPerServer // failed attempts per server the job is marked as failed. func TestProcessSendQueues_AllServersExhausted(t *testing.T) { dir := t.TempDir() deadURL := closedServerURL(t) msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(deadURL), 0) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) // Each call to ProcessSendQueues increments the retry counter by 1. // After maxRetriesPerServer calls, all servers are exhausted → failed. for i := 0; i < maxRetriesPerServer; i++ { ProcessSendQueues(dir) } job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusFailed, job.Status) assert.Equal(t, maxRetriesPerServer, job.Retries[0]) } // TestProcessSendQueues_JobTimeout verifies that a job whose timeout has elapsed // is immediately marked as failed without any send attempt. func TestProcessSendQueues_JobTimeout(t *testing.T) { dir := t.TempDir() var received int64 srv := acceptServer(t, &received) defer srv.Close() msgPath := writeMsgFile(t, dir, "msg") // Timeout of 1 second; we will backdate inserted_at so the job looks expired. 
pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 1) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) // Backdate inserted_at by 60 seconds directly in the DB. dbPath := filepath.Join(dir, "queues", "q1") backdateJob(t, dbPath, id, -60*time.Second) ProcessSendQueues(dir) assert.Equal(t, int64(0), atomic.LoadInt64(&received), "no send should be attempted for an expired job") job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusFailed, job.Status) } // TestProcessSendQueues_MultipleQueues verifies that jobs in different queue // files are processed concurrently and independently. func TestProcessSendQueues_MultipleQueues(t *testing.T) { dir := t.TempDir() var received int64 srv := acceptServer(t, &received) defer srv.Close() for _, q := range []string{"qa", "qb", "qc"} { msgPath := writeMsgFile(t, dir, "msg_"+q) pushJob(t, dir, q, msgPath, serverSlice(srv.URL), 10) } ProcessSendQueues(dir) assert.Equal(t, int64(3), atomic.LoadInt64(&received), "all three queues should have delivered their message") } // backdateJob opens the SQLite file directly and shifts inserted_at by delta. // This lets tests simulate elapsed time without sleeping. func backdateJob(t *testing.T, dbPath string, id int64, delta time.Duration) { t.Helper() db, err := sql.Open("sqlite3", dbPath) require.NoError(t, err) defer db.Close() newTs := time.Now().Add(delta).Unix() _, err = db.Exec("UPDATE queue SET inserted_at = ? WHERE id = ?", newTs, id) require.NoError(t, err) }