package client import ( "os" "path/filepath" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // helpers ---------------------------------------------------------------- func makeServers(urls ...string) []Server { out := make([]Server, len(urls)) for i, u := range urls { out[i] = Server{Url: u} } return out } func pushJob(t *testing.T, dir, queue, file string, servers []Server, timeout int) { t.Helper() require.NoError(t, PushSendJob(dir, &SendJob{ Queue: queue, File: file, Servers: servers, Timeout: timeout, })) } // tests ------------------------------------------------------------------ func TestPushAndPeekSendJob(t *testing.T) { dir := t.TempDir() servers := makeServers("http://s1.example", "http://s2.example") pushJob(t, dir, "q1", "/tmp/msg", servers, 60) got, id, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, got) assert.Greater(t, id, int64(0)) assert.Equal(t, "/tmp/msg", got.File) assert.Equal(t, 60, got.Timeout) assert.Equal(t, SendStatusPending, got.Status) assert.Nil(t, got.SentAt) assert.Nil(t, got.SuccessfulServer) assert.Len(t, got.Retries, 2) assert.Equal(t, 0, got.Retries[0]) assert.Equal(t, 0, got.Retries[1]) assert.WithinDuration(t, time.Now(), got.InsertedAt, 5*time.Second) } func TestPeekSendJob_EmptyQueue(t *testing.T) { dir := t.TempDir() got, id, err := PeekSendJob(dir, "empty") require.NoError(t, err) assert.Nil(t, got) assert.Equal(t, int64(0), id) } func TestPeekSendJob_OldestFirst(t *testing.T) { dir := t.TempDir() for _, f := range []string{"/a", "/b", "/c"} { pushJob(t, dir, "q1", f, makeServers("http://s1"), 0) } got, _, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, "/a", got.File) } func TestPeekSendJob_SkipsNonPending(t *testing.T) { dir := t.TempDir() for _, f := range []string{"/a", "/b", "/c"} { pushJob(t, dir, "q1", f, makeServers("http://s1"), 0) } // mark first as sent first, _, err := PeekSendJob(dir, "q1") require.NoError(t, err) first.Status = SendStatusSent require.NoError(t, UpdateSendJob(dir, "q1", first)) // mark second as failed second, _, err := PeekSendJob(dir, "q1") require.NoError(t, err) second.Status = SendStatusFailed require.NoError(t, UpdateSendJob(dir, "q1", second)) // only /c is still pending got, _, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, "/c", got.File) } func TestUpdateSendJob_Sent(t *testing.T) { dir := t.TempDir() pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 10) job, id, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, job) now := time.Now() srvIdx := 0 job.Status = SendStatusSent job.SentAt = &now job.SuccessfulServer = &srvIdx require.NoError(t, UpdateSendJob(dir, "q1", job)) // persisted correctly got, err := GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, SendStatusSent, got.Status) assert.NotNil(t, got.SentAt) assert.WithinDuration(t, now, *got.SentAt, time.Second) require.NotNil(t, got.SuccessfulServer) assert.Equal(t, 0, *got.SuccessfulServer) // no more pending jobs pending, _, err := PeekSendJob(dir, "q1") require.NoError(t, err) assert.Nil(t, pending) } func TestUpdateSendJob_Retries(t *testing.T) { dir := t.TempDir() pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1", "http://s2"), 10) job, id, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NotNil(t, job) job.Retries[0] = 2 require.NoError(t, UpdateSendJob(dir, "q1", job)) got, err := GetSendJob(dir, "q1", id) require.NoError(t, err) require.NotNil(t, got) assert.Equal(t, SendStatusPending, got.Status) // still pending assert.Equal(t, 2, got.Retries[0]) assert.Equal(t, 0, got.Retries[1]) } func TestGetSendJob_NotFound(t *testing.T) { dir := t.TempDir() pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 0) got, err := GetSendJob(dir, "q1", 9999) require.NoError(t, err) assert.Nil(t, got) } func TestDeleteSendJob_KeepsDbWhenNotEmpty(t *testing.T) { dir := t.TempDir() pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0) pushJob(t, dir, "q1", "/b", makeServers("http://s1"), 0) _, id, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NoError(t, DeleteSendJob(dir, "q1", id)) // DB file must still exist (second row remains) _, statErr := os.Stat(filepath.Join(dir, "queues", "q1")) require.NoError(t, statErr) } func TestDeleteSendJob_RemovesDbWhenEmpty(t *testing.T) { dir := t.TempDir() pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0) _, id, err := PeekSendJob(dir, "q1") require.NoError(t, err) require.NoError(t, DeleteSendJob(dir, "q1", id)) _, statErr := os.Stat(filepath.Join(dir, "queues", "q1")) assert.True(t, os.IsNotExist(statErr), "DB file should be removed when queue is empty") }