185 lines
4.8 KiB
Go
185 lines
4.8 KiB
Go
|
|
package client
|
||
|
|
|
||
|
|
import (
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/stretchr/testify/assert"
|
||
|
|
"github.com/stretchr/testify/require"
|
||
|
|
)
|
||
|
|
|
||
|
|
// helpers ----------------------------------------------------------------
|
||
|
|
|
||
|
|
func makeServers(urls ...string) []Server {
|
||
|
|
out := make([]Server, len(urls))
|
||
|
|
for i, u := range urls {
|
||
|
|
out[i] = Server{Url: u}
|
||
|
|
}
|
||
|
|
return out
|
||
|
|
}
|
||
|
|
|
||
|
|
func pushJob(t *testing.T, dir, queue, file string, servers []Server, timeout int) {
|
||
|
|
t.Helper()
|
||
|
|
require.NoError(t, PushSendJob(dir, &SendJob{
|
||
|
|
Queue: queue,
|
||
|
|
File: file,
|
||
|
|
Servers: servers,
|
||
|
|
Timeout: timeout,
|
||
|
|
}))
|
||
|
|
}
|
||
|
|
|
||
|
|
// tests ------------------------------------------------------------------
|
||
|
|
|
||
|
|
func TestPushAndPeekSendJob(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
servers := makeServers("http://s1.example", "http://s2.example")
|
||
|
|
pushJob(t, dir, "q1", "/tmp/msg", servers, 60)
|
||
|
|
|
||
|
|
got, id, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, got)
|
||
|
|
|
||
|
|
assert.Greater(t, id, int64(0))
|
||
|
|
assert.Equal(t, "/tmp/msg", got.File)
|
||
|
|
assert.Equal(t, 60, got.Timeout)
|
||
|
|
assert.Equal(t, SendStatusPending, got.Status)
|
||
|
|
assert.Nil(t, got.SentAt)
|
||
|
|
assert.Nil(t, got.SuccessfulServer)
|
||
|
|
assert.Len(t, got.Retries, 2)
|
||
|
|
assert.Equal(t, 0, got.Retries[0])
|
||
|
|
assert.Equal(t, 0, got.Retries[1])
|
||
|
|
assert.WithinDuration(t, time.Now(), got.InsertedAt, 5*time.Second)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPeekSendJob_EmptyQueue(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
got, id, err := PeekSendJob(dir, "empty")
|
||
|
|
require.NoError(t, err)
|
||
|
|
assert.Nil(t, got)
|
||
|
|
assert.Equal(t, int64(0), id)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPeekSendJob_OldestFirst(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
for _, f := range []string{"/a", "/b", "/c"} {
|
||
|
|
pushJob(t, dir, "q1", f, makeServers("http://s1"), 0)
|
||
|
|
}
|
||
|
|
|
||
|
|
got, _, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, got)
|
||
|
|
assert.Equal(t, "/a", got.File)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestPeekSendJob_SkipsNonPending(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
for _, f := range []string{"/a", "/b", "/c"} {
|
||
|
|
pushJob(t, dir, "q1", f, makeServers("http://s1"), 0)
|
||
|
|
}
|
||
|
|
|
||
|
|
// mark first as sent
|
||
|
|
first, _, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
first.Status = SendStatusSent
|
||
|
|
require.NoError(t, UpdateSendJob(dir, "q1", first))
|
||
|
|
|
||
|
|
// mark second as failed
|
||
|
|
second, _, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
second.Status = SendStatusFailed
|
||
|
|
require.NoError(t, UpdateSendJob(dir, "q1", second))
|
||
|
|
|
||
|
|
// only /c is still pending
|
||
|
|
got, _, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, got)
|
||
|
|
assert.Equal(t, "/c", got.File)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestUpdateSendJob_Sent(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 10)
|
||
|
|
|
||
|
|
job, id, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, job)
|
||
|
|
|
||
|
|
now := time.Now()
|
||
|
|
srvIdx := 0
|
||
|
|
job.Status = SendStatusSent
|
||
|
|
job.SentAt = &now
|
||
|
|
job.SuccessfulServer = &srvIdx
|
||
|
|
require.NoError(t, UpdateSendJob(dir, "q1", job))
|
||
|
|
|
||
|
|
// persisted correctly
|
||
|
|
got, err := GetSendJob(dir, "q1", id)
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, got)
|
||
|
|
assert.Equal(t, SendStatusSent, got.Status)
|
||
|
|
assert.NotNil(t, got.SentAt)
|
||
|
|
assert.WithinDuration(t, now, *got.SentAt, time.Second)
|
||
|
|
require.NotNil(t, got.SuccessfulServer)
|
||
|
|
assert.Equal(t, 0, *got.SuccessfulServer)
|
||
|
|
|
||
|
|
// no more pending jobs
|
||
|
|
pending, _, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
assert.Nil(t, pending)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestUpdateSendJob_Retries(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1", "http://s2"), 10)
|
||
|
|
|
||
|
|
job, id, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, job)
|
||
|
|
|
||
|
|
job.Retries[0] = 2
|
||
|
|
require.NoError(t, UpdateSendJob(dir, "q1", job))
|
||
|
|
|
||
|
|
got, err := GetSendJob(dir, "q1", id)
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NotNil(t, got)
|
||
|
|
assert.Equal(t, SendStatusPending, got.Status) // still pending
|
||
|
|
assert.Equal(t, 2, got.Retries[0])
|
||
|
|
assert.Equal(t, 0, got.Retries[1])
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestGetSendJob_NotFound(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
pushJob(t, dir, "q1", "/tmp/f", makeServers("http://s1"), 0)
|
||
|
|
|
||
|
|
got, err := GetSendJob(dir, "q1", 9999)
|
||
|
|
require.NoError(t, err)
|
||
|
|
assert.Nil(t, got)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestDeleteSendJob_KeepsDbWhenNotEmpty(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0)
|
||
|
|
pushJob(t, dir, "q1", "/b", makeServers("http://s1"), 0)
|
||
|
|
|
||
|
|
_, id, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NoError(t, DeleteSendJob(dir, "q1", id))
|
||
|
|
|
||
|
|
// DB file must still exist (second row remains)
|
||
|
|
_, statErr := os.Stat(filepath.Join(dir, "queues", "q1"))
|
||
|
|
require.NoError(t, statErr)
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestDeleteSendJob_RemovesDbWhenEmpty(t *testing.T) {
|
||
|
|
dir := t.TempDir()
|
||
|
|
pushJob(t, dir, "q1", "/a", makeServers("http://s1"), 0)
|
||
|
|
|
||
|
|
_, id, err := PeekSendJob(dir, "q1")
|
||
|
|
require.NoError(t, err)
|
||
|
|
require.NoError(t, DeleteSendJob(dir, "q1", id))
|
||
|
|
|
||
|
|
_, statErr := os.Stat(filepath.Join(dir, "queues", "q1"))
|
||
|
|
assert.True(t, os.IsNotExist(statErr), "DB file should be removed when queue is empty")
|
||
|
|
}
|