package helpers import ( "database/sql" "net/http" "net/http/httptest" "os" "path/filepath" "sync/atomic" "testing" "time" "forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib/client" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" _ "github.com/mattn/go-sqlite3" ) // --- test helpers ------------------------------------------------------- // acceptServer starts an httptest server that counts received POST /msg requests. func acceptServer(t *testing.T, received *int64) *httptest.Server { t.Helper() return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { atomic.AddInt64(received, 1) w.WriteHeader(http.StatusOK) })) } // closedServerURL starts and immediately closes an httptest server so its URL // causes "connection refused" without any wait. func closedServerURL(t *testing.T) string { t.Helper() srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) srv.Close() return srv.URL } // writeMsgFile writes a valid serialised empty PackedUserMessage to a temp file // and returns the path. The file content satisfies proto.Unmarshal inside // attemptSendJob; the httptest endpoints ignore the encrypted payload. func writeMsgFile(t *testing.T, dir, name string) string { t.Helper() p := filepath.Join(dir, name) data, err := proto.Marshal(&meowlib.PackedUserMessage{}) require.NoError(t, err) require.NoError(t, os.WriteFile(p, data, 0600)) return p } // newTestServer creates a client.Server for the given URL, generates a // throwaway keypair so that AsymEncryptMessage succeeds, and stores the server // in the current identity's MessageServers so that PackMessageForServer can // look it up via LoadServer. Returns the registered server. // // Call setupMsgHelperConfig before this so an identity is in place. func newTestServer(t *testing.T, url string) client.Server { t.Helper() srv, err := client.CreateServerFromUrl(url) require.NoError(t, err) kp, err := meowlib.NewKeyPair() require.NoError(t, err) srv.PublicKey = kp.Public require.NoError(t, client.GetConfig().GetIdentity().MessageServers.StoreServer(srv)) return *srv } // pushJob is a convenience wrapper around client.PushSendJob. func pushJob(t *testing.T, dir, queue, file string, servers []client.Server, timeout int) { t.Helper() require.NoError(t, client.PushSendJob(dir, &client.SendJob{ Queue: queue, File: file, Servers: servers, Timeout: timeout, })) } // serverSlice builds a []client.Server from plain URLs. func serverSlice(urls ...string) []client.Server { out := make([]client.Server, len(urls)) for i, u := range urls { out[i] = client.Server{Url: u} } return out } // --- unit tests --------------------------------------------------------- // TestAttemptSendJob_Success verifies a successful POST to the first server. func TestAttemptSendJob_Success(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() newTestServer(t, srv.URL) job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(srv.URL), Timeout: 5, } retries := make([]int, len(job.Servers)) idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 0, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) } // TestAttemptSendJob_Fallback verifies that when the first server refuses the // connection, the second server is tried and succeeds. func TestAttemptSendJob_Fallback(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() deadURL := closedServerURL(t) newTestServer(t, deadURL) newTestServer(t, good.URL) job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(deadURL, good.URL), Timeout: 5, } retries := make([]int, len(job.Servers)) idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 1, idx, "second server should have been used") assert.Equal(t, int64(1), atomic.LoadInt64(&received)) assert.Equal(t, 1, retries[0], "first server retry should be incremented") assert.Equal(t, 0, retries[1], "second server retry must stay at zero") } // TestAttemptSendJob_AllFail verifies that all retry counts are incremented // and an error is returned when every server refuses connections. func TestAttemptSendJob_AllFail(t *testing.T) { dir, _ := setupMsgHelperConfig(t) dead1 := closedServerURL(t) dead2 := closedServerURL(t) newTestServer(t, dead1) newTestServer(t, dead2) job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice(dead1, dead2), Timeout: 5, } retries := make([]int, len(job.Servers)) _, err := attemptSendJob(job, retries) assert.Error(t, err) assert.Equal(t, 1, retries[0]) assert.Equal(t, 1, retries[1]) } // TestAttemptSendJob_SkipsExhaustedServer verifies that a server already at // maxRetriesPerServer is not contacted. func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() deadURL := closedServerURL(t) newTestServer(t, good.URL) // only good server needs to be reachable job := &client.SendJob{ File: writeMsgFile(t, dir, "msg"), Servers: serverSlice( deadURL, // exhausted – must be skipped (no need to store in identity) good.URL, ), Timeout: 5, } retries := []int{maxRetriesPerServer, 0} // first server already exhausted this run idx, err := attemptSendJob(job, retries) require.NoError(t, err) assert.Equal(t, 1, idx) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) } // --- integration tests -------------------------------------------------- // TestWriteSendJob verifies the thin WriteSendJob wrapper enqueues the job. func TestWriteSendJob(t *testing.T) { dir := t.TempDir() err := WriteSendJob(dir, &client.SendJob{ Queue: "q1", File: "/tmp/f", Servers: serverSlice("http://s1"), }) require.NoError(t, err) got, _, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, "/tmp/f", got.File) } // TestProcessSendQueues_Success verifies that a pending job is delivered and // marked as sent when the server accepts it. func TestProcessSendQueues_Success(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() newTestServer(t, srv.URL) msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 10) // grab the ID before processing so we can inspect the row afterward _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) ProcessSendQueues(dir) assert.Equal(t, int64(1), atomic.LoadInt64(&received), "server should have received exactly one message") job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusSent, job.Status) assert.NotNil(t, job.SentAt) require.NotNil(t, job.SuccessfulServer) assert.Equal(t, 0, *job.SuccessfulServer) } // TestProcessSendQueues_ServerFallback verifies that when the first server is // unreachable, the second server is tried successfully in the same pass. func TestProcessSendQueues_ServerFallback(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 good := acceptServer(t, &received) defer good.Close() deadURL := closedServerURL(t) newTestServer(t, deadURL) newTestServer(t, good.URL) msgPath := writeMsgFile(t, dir, "msg") pushJob(t, dir, "q1", msgPath, serverSlice(deadURL, good.URL), 10) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) ProcessSendQueues(dir) assert.Equal(t, int64(1), atomic.LoadInt64(&received)) job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusSent, job.Status) require.NotNil(t, job.SuccessfulServer) assert.Equal(t, 1, *job.SuccessfulServer, "second server should be recorded as successful") } // TestProcessSendQueues_FailedRunsStayPending verifies that repeated delivery // failures do NOT mark a job as permanently failed. Only a TTL timeout can do // that; retry exhaustion merely stops the current run. func TestProcessSendQueues_FailedRunsStayPending(t *testing.T) { dir, _ := setupMsgHelperConfig(t) deadURL := closedServerURL(t) newTestServer(t, deadURL) msgPath := writeMsgFile(t, dir, "msg") // timeout=0 → uses defaultSendTimeout (24 h), so the job won't expire here. pushJob(t, dir, "q1", msgPath, serverSlice(deadURL), 0) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) // Run several times – per-server retry counts reset each run, so the job // must remain pending no matter how many runs fail. for i := 0; i < maxRetriesPerServer+2; i++ { ProcessSendQueues(dir) } job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusPending, job.Status, "repeated failures must not cause permanent failure – only timeout does") } // TestProcessSendQueues_JobTimeout verifies that a job whose timeout has elapsed // is immediately marked as failed without any send attempt. func TestProcessSendQueues_JobTimeout(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() newTestServer(t, srv.URL) msgPath := writeMsgFile(t, dir, "msg") // Timeout of 1 second; we will backdate inserted_at so the job looks expired. pushJob(t, dir, "q1", msgPath, serverSlice(srv.URL), 1) _, id, err := client.PeekSendJob(dir, "q1") require.NoError(t, err) // Backdate inserted_at by 60 seconds directly in the DB. dbPath := filepath.Join(dir, "queues", "q1") backdateJob(t, dbPath, id, -60*time.Second) ProcessSendQueues(dir) assert.Equal(t, int64(0), atomic.LoadInt64(&received), "no send should be attempted for an expired job") job, err := client.GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, client.SendStatusFailed, job.Status) } // TestProcessSendQueues_MultipleQueues verifies that jobs in different queue // files are processed concurrently and independently. func TestProcessSendQueues_MultipleQueues(t *testing.T) { dir, _ := setupMsgHelperConfig(t) var received int64 srv := acceptServer(t, &received) defer srv.Close() newTestServer(t, srv.URL) for _, q := range []string{"qa", "qb", "qc"} { msgPath := writeMsgFile(t, dir, "msg_"+q) pushJob(t, dir, q, msgPath, serverSlice(srv.URL), 10) } // Concurrent goroutines for each queue all try to open the same BadgerDB for // server lookup; only one can hold the lock at a time. Jobs that lose the // race stay pending and are retried on the next call. Three passes guarantee // every queue gets at least one uncontested turn. for i := 0; i < 3; i++ { ProcessSendQueues(dir) } assert.Equal(t, int64(3), atomic.LoadInt64(&received), "all three queues should have delivered their message") } // backdateJob opens the SQLite file directly and shifts inserted_at by delta. // This lets tests simulate elapsed time without sleeping. func backdateJob(t *testing.T, dbPath string, id int64, delta time.Duration) { t.Helper() db, err := sql.Open("sqlite3", dbPath) require.NoError(t, err) defer db.Close() newTs := time.Now().Add(delta).Unix() _, err = db.Exec("UPDATE queue SET inserted_at = ? WHERE id = ?", newTs, id) require.NoError(t, err) }