Files
meowlib/client/sendjobs_test.go

185 lines
4.8 KiB
Go
Raw Normal View History

2026-02-26 18:50:46 +01:00
package client
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// helpers ----------------------------------------------------------------
// makeServers builds one Server per given URL, in argument order, setting only
// the Url field. It returns an empty, non-nil slice when called without URLs.
func makeServers(urls ...string) []Server {
	servers := make([]Server, 0, len(urls))
	for _, rawURL := range urls {
		servers = append(servers, Server{Url: rawURL})
	}
	return servers
}
// pushJob builds a SendJob from the given queue, file, servers and timeout,
// pushes it with PushSendJob under dir, and fails the calling test immediately
// if the push returns an error.
func pushJob(t *testing.T, dir, queue, file string, servers []Server, timeout int) {
	t.Helper()

	job := &SendJob{
		Queue:   queue,
		File:    file,
		Servers: servers,
		Timeout: timeout,
	}
	err := PushSendJob(dir, job)
	require.NoError(t, err)
}
// tests ------------------------------------------------------------------
// TestPushAndPeekSendJob pushes a single job with two servers and checks that
// PeekSendJob returns it with a positive id, the pushed File and Timeout, a
// pending status, no SentAt/SuccessfulServer, two zeroed retry counters and a
// recent InsertedAt.
func TestPushAndPeekSendJob(t *testing.T) {
	dir := t.TempDir()
	servers := makeServers("http://s1.example", "http://s2.example")
	pushJob(t, dir, "q1", "/tmp/msg", servers, 60)

	got, id, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, got)

	assert.Greater(t, id, int64(0))
	assert.Equal(t, "/tmp/msg", got.File)
	assert.Equal(t, 60, got.Timeout)
	assert.Equal(t, SendStatusPending, got.Status)
	assert.Nil(t, got.SentAt)
	assert.Nil(t, got.SuccessfulServer)

	// The length check has to be fatal: the indexed reads below would panic on
	// a shorter slice instead of reporting an ordinary test failure.
	require.Len(t, got.Retries, 2)
	assert.Equal(t, 0, got.Retries[0])
	assert.Equal(t, 0, got.Retries[1])

	assert.WithinDuration(t, time.Now(), got.InsertedAt, 5*time.Second)
}
// TestPeekSendJob_EmptyQueue checks that peeking a queue nothing was pushed to
// returns no error, a nil job and a zero id.
func TestPeekSendJob_EmptyQueue(t *testing.T) {
	storageDir := t.TempDir()

	job, jobID, peekErr := PeekSendJob(storageDir, "empty")
	require.NoError(t, peekErr)

	var wantID int64
	assert.Nil(t, job)
	assert.Equal(t, wantID, jobID)
}
// TestPeekSendJob_OldestFirst pushes /a, /b and /c to the same queue, in that
// order, and checks that a peek returns the job for /a.
func TestPeekSendJob_OldestFirst(t *testing.T) {
	dir := t.TempDir()

	files := []string{"/a", "/b", "/c"}
	for i := range files {
		pushJob(t, dir, "q1", files[i], makeServers("http://s1"), 0)
	}

	head, _, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, head)
	assert.Equal(t, "/a", head.File)
}
// TestPeekSendJob_SkipsNonPending pushes three jobs, marks the first peeked job
// as sent and the next peeked job as failed, and checks that the following
// peek returns the job for /c, the only one left pending.
func TestPeekSendJob_SkipsNonPending(t *testing.T) {
	dir := t.TempDir()
	for _, f := range []string{"/a", "/b", "/c"} {
		pushJob(t, dir, "q1", f, makeServers("http://s1"), 0)
	}

	// mark first as sent
	first, _, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	// PeekSendJob can return a nil job without an error; stop here rather than
	// panic on the field write below.
	require.NotNil(t, first)
	first.Status = SendStatusSent
	require.NoError(t, UpdateSendJob(dir, "q1", first))

	// mark second as failed
	second, _, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, second) // same nil guard as above
	second.Status = SendStatusFailed
	require.NoError(t, UpdateSendJob(dir, "q1", second))

	// only /c is still pending
	got, _, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, got)
	assert.Equal(t, "/c", got.File)
}
// TestUpdateSendJob_Sent marks the only job in a queue as sent (status, SentAt
// and SuccessfulServer), stores it with UpdateSendJob, and checks that
// GetSendJob returns those values and that PeekSendJob no longer returns a job.
func TestUpdateSendJob_Sent(t *testing.T) {
	dir := t.TempDir()
	pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 10)

	job, id, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, job)

	now := time.Now()
	srvIdx := 0
	job.Status = SendStatusSent
	job.SentAt = &now
	job.SuccessfulServer = &srvIdx
	require.NoError(t, UpdateSendJob(dir, "q1", job))

	// persisted correctly
	got, err := GetSendJob(dir, "q1", id)
	require.NoError(t, err)
	require.NotNil(t, got)
	assert.Equal(t, SendStatusSent, got.Status)
	// Fatal on nil: the next assertion dereferences SentAt and would panic
	// instead of reporting a normal failure.
	require.NotNil(t, got.SentAt)
	assert.WithinDuration(t, now, *got.SentAt, time.Second)
	require.NotNil(t, got.SuccessfulServer)
	assert.Equal(t, 0, *got.SuccessfulServer)

	// no more pending jobs
	pending, _, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	assert.Nil(t, pending)
}
// TestUpdateSendJob_Retries sets the first retry counter of a two-server job
// to 2, stores it with UpdateSendJob, and checks that GetSendJob returns the
// job still pending with counters 2 and 0.
func TestUpdateSendJob_Retries(t *testing.T) {
	dir := t.TempDir()
	pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1", "http://s2"), 10)

	job, id, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)
	require.NotNil(t, job)
	// Check the length before indexing so an unexpected slice fails the test
	// cleanly instead of panicking on the write below.
	require.Len(t, job.Retries, 2)

	job.Retries[0] = 2
	require.NoError(t, UpdateSendJob(dir, "q1", job))

	got, err := GetSendJob(dir, "q1", id)
	require.NoError(t, err)
	require.NotNil(t, got)
	assert.Equal(t, SendStatusPending, got.Status) // still pending
	// Same guard for the stored copy before reading both counters.
	require.Len(t, got.Retries, 2)
	assert.Equal(t, 2, got.Retries[0])
	assert.Equal(t, 0, got.Retries[1])
}
// TestGetSendJob_NotFound checks that asking for id 9999 in a queue holding a
// single pushed job returns a nil job and no error.
func TestGetSendJob_NotFound(t *testing.T) {
	dir := t.TempDir()
	pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 0)

	const missingID = 9999
	job, lookupErr := GetSendJob(dir, "q1", missingID)

	require.NoError(t, lookupErr)
	assert.Nil(t, job)
}
// TestDeleteSendJob_KeepsDbWhenNotEmpty pushes two jobs, deletes the one
// returned by PeekSendJob, and checks that the path <dir>/queues/q1 can still
// be stat'ed afterwards.
func TestDeleteSendJob_KeepsDbWhenNotEmpty(t *testing.T) {
	dir := t.TempDir()
	for _, file := range []string{"/a", "/b"} {
		pushJob(t, dir, "q1", file, makeServers("http://s1"), 0)
	}

	_, peekedID, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)

	err = DeleteSendJob(dir, "q1", peekedID)
	require.NoError(t, err)

	// One job was pushed but not deleted, so the queue file has to survive.
	queuePath := filepath.Join(dir, "queues", "q1")
	_, err = os.Stat(queuePath)
	require.NoError(t, err)
}
// TestDeleteSendJob_RemovesDbWhenEmpty pushes a single job, deletes it, and
// checks that stat'ing <dir>/queues/q1 afterwards reports a not-exist error.
func TestDeleteSendJob_RemovesDbWhenEmpty(t *testing.T) {
	dir := t.TempDir()
	pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0)

	_, onlyID, err := PeekSendJob(dir, "q1")
	require.NoError(t, err)

	err = DeleteSendJob(dir, "q1", onlyID)
	require.NoError(t, err)

	queuePath := filepath.Join(dir, "queues", "q1")
	_, statErr := os.Stat(queuePath)
	gone := os.IsNotExist(statErr)
	assert.True(t, gone, "DB file should be removed when queue is empty")
}