diff --git a/client/helpers/messageHelper.go b/client/helpers/messageHelper.go index cfb1bc5..5566efe 100644 --- a/client/helpers/messageHelper.go +++ b/client/helpers/messageHelper.go @@ -3,8 +3,6 @@ package helpers import ( "os" "path/filepath" - "strconv" - "strings" "time" "forge.redroom.link/yves/meowlib" @@ -28,25 +26,29 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) ( } func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) { - usermessage, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) + usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) if err != nil { return nil, errtxt, err } return PackMessageForServer(usermessage, srvuid) } -func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, error) { +// CreateAndStoreUserMessage creates, stores and packs a user message. +// It returns the packed message, the message DB file UUID, the SQLite row ID, +// an error location string, and the error. The caller should set MessageDbFile +// and MessageDbId on the SendJob from the returned dbFile and dbId values. +func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (packedMsg *meowlib.PackedUserMessage, dbFile string, dbId int64, errTxt string, err error) { peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) // Creating User message usermessage, err := peer.BuildSimpleUserMessage([]byte(message)) if err != nil { - return nil, "PrepareServerMessage : BuildSimpleUserMessage", err + return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err } for _, file := range filelist { err = usermessage.AddFile(file, client.GetConfig().Chunksize) if err != nil { - return nil, "PrepareServerMessage : AddFile", err + return nil, "", 0, "PrepareServerMessage : AddFile", err } } usermessage.Status.Sent = uint64(time.Now().UTC().Unix()) @@ -55,16 +57,20 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin // Store message err = peer.StoreMessage(usermessage, nil) if err != nil { - return nil, "messageBuildPostprocess : StoreMessage", err + return nil, "", 0, "messageBuildPostprocess : StoreMessage", err } // Prepare cyphered + packed user message - packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) + packedMsg, err = peer.ProcessOutboundUserMessage(usermessage) if err != nil { - return nil, "messageBuildPostprocess : ProcessOutboundUserMessage", err + return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err } - return packedMsg, "", nil + if peer.LastMessage != nil { + dbFile = peer.LastMessage.Dbfile + dbId = peer.LastMessage.Dbid + } + return packedMsg, dbFile, dbId, "", nil } func BuildAckMessage(messageUid string, srvuid string, peer_uid string, received int64, processed int64) ([]byte, string, error) { @@ -105,10 +111,8 @@ func ReadAckMessageResponse() { // the message storage entry with server delivery info for each sent job, then // removes the job from the queue. Returns the number of messages updated. // -// Callers must follow two conventions when building a SendJob: -// - job.Queue = peer UID (used to look up the peer and its DB files) -// - job.File = a path whose basename without extension is the message's -// SQLite row ID as a decimal integer (e.g. "42.bin") +// Each SendJob must have MessageDbFile and MessageDbId set (populated via +// GetPeerLastMessageDbInfo right after the message is stored). func ProcessSentMessages(storagePath string) int { password, _ := client.GetConfig().GetMemPass() queueDir := filepath.Join(storagePath, "queues") @@ -119,7 +123,6 @@ func ProcessSentMessages(storagePath string) int { } updated := 0 - identity := client.GetConfig().GetIdentity() for _, entry := range entries { if entry.IsDir() { @@ -142,27 +145,19 @@ func ProcessSentMessages(storagePath string) int { continue } - // Resolve the peer from the queue name to get its DB file list - peer := identity.Peers.GetFromUid(queue) - if peer == nil || len(peer.DbIds) == 0 { - logger.Warn().Str("queue", queue).Msg("ProcessSentMessages: peer not found or has no DB") - continue - } - dbFile := peer.DbIds[len(peer.DbIds)-1] - - // Parse the DB row ID from the job file's basename (e.g. "42.bin" → 42) - base := strings.TrimSuffix(filepath.Base(job.File), filepath.Ext(job.File)) - dbId, err := strconv.ParseInt(base, 10, 64) - if err != nil { - logger.Error().Err(err).Str("file", job.File).Msg("ProcessSentMessages: parse dbId from filename") + if job.MessageDbFile == "" || job.MessageDbId == 0 { + logger.Error().Int64("id", job.ID).Str("queue", queue). + Msg("ProcessSentMessages: job missing MessageDbFile/MessageDbId — set them via GetPeerLastMessageDbInfo when building the SendJob") continue } serverUid := job.Servers[*job.SuccessfulServer].GetUid() receiveTime := uint64(job.SentAt.Unix()) - if err := client.SetMessageServerDelivery(dbFile, dbId, serverUid, receiveTime, password); err != nil { - logger.Error().Err(err).Str("queue", queue).Int64("dbId", dbId).Msg("ProcessSentMessages: SetMessageServerDelivery") + if err := client.SetMessageServerDelivery(job.MessageDbFile, job.MessageDbId, serverUid, receiveTime, password); err != nil { + logger.Error().Err(err).Str("queue", queue). + Str("dbFile", job.MessageDbFile).Int64("dbId", job.MessageDbId). + Msg("ProcessSentMessages: SetMessageServerDelivery") continue } if err := client.DeleteSendJob(storagePath, queue, job.ID); err != nil { diff --git a/client/helpers/messageHelper_test.go b/client/helpers/messageHelper_test.go new file mode 100644 index 0000000..85e459c --- /dev/null +++ b/client/helpers/messageHelper_test.go @@ -0,0 +1,301 @@ +package helpers + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "forge.redroom.link/yves/meowlib" + "forge.redroom.link/yves/meowlib/client" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + _ "github.com/mattn/go-sqlite3" +) + +// setupMsgHelperConfig wires the global client.Config singleton to a fresh +// temporary directory and returns it. Original values are restored in t.Cleanup. +func setupMsgHelperConfig(t *testing.T) (dir string, id *client.Identity) { + t.Helper() + dir = t.TempDir() + + cfg := client.GetConfig() + origStorage := cfg.StoragePath + origSuffix := cfg.DbSuffix + origChunk := cfg.Chunksize + + cfg.StoragePath = dir + cfg.DbSuffix = ".sqlite" + cfg.Chunksize = 1024 * 1024 + require.NoError(t, cfg.SetMemPass("testpassword")) + + var err error + id, err = client.CreateIdentity("testuser") + require.NoError(t, err) + + t.Cleanup(func() { + cfg.StoragePath = origStorage + cfg.DbSuffix = origSuffix + cfg.Chunksize = origChunk + }) + return dir, id +} + +// newFullyKeyedPeer returns a Peer with all three keypairs and contact keys set, +// ready to store messages. +func newFullyKeyedPeer(t *testing.T, uid string) *client.Peer { + t.Helper() + var err error + peer := &client.Peer{ + Uid: uid, + Name: "TestPeer-" + uid, + } + peer.MyIdentity, err = meowlib.NewKeyPair() + require.NoError(t, err) + peer.MyEncryptionKp, err = meowlib.NewKeyPair() + require.NoError(t, err) + peer.MyLookupKp, err = meowlib.NewKeyPair() + require.NoError(t, err) + + k, err := meowlib.NewKeyPair() + require.NoError(t, err) + peer.ContactPublicKey = k.Public + + k, err = meowlib.NewKeyPair() + require.NoError(t, err) + peer.ContactEncryption = k.Public + + k, err = meowlib.NewKeyPair() + require.NoError(t, err) + peer.ContactLookupKey = k.Public + + return peer +} + +// storeTestMessage stores a single outbound message for peer. +func storeTestMessage(t *testing.T, peer *client.Peer, text string) { + t.Helper() + um := &meowlib.UserMessage{ + Data: []byte(text), + From: peer.MyIdentity.Public, + Status: &meowlib.ConversationStatus{Uuid: "uuid-" + text}, + } + require.NoError(t, peer.StoreMessage(um, nil)) + require.NotNil(t, peer.LastMessage, "StoreMessage must set LastMessage") +} + +// pushAndMarkSent pushes a send job for the given peer and marks it as delivered +// by the given server. Returns the job after the status update. +// It mirrors the correct app usage: read MessageDbFile/MessageDbId from +// peer.LastMessage right after storing (i.e. from CreateAndStoreUserMessage's +// dbFile/dbId return values). +func pushAndMarkSent(t *testing.T, dir string, peer *client.Peer, srv client.Server) *client.SendJob { + t.Helper() + + require.NotNil(t, peer.LastMessage, "pushAndMarkSent: call storeTestMessage first") + dbFile := peer.LastMessage.Dbfile + dbId := peer.LastMessage.Dbid + + msgFile := filepath.Join(dir, fmt.Sprintf("msg-%d.bin", dbId)) + require.NoError(t, os.WriteFile(msgFile, []byte("packed-server-message"), 0600)) + + require.NoError(t, client.PushSendJob(dir, &client.SendJob{ + Queue: peer.Uid, + File: msgFile, + MessageDbFile: dbFile, + MessageDbId: dbId, + Servers: []client.Server{srv}, + Timeout: 60, + })) + + job, _, err := client.PeekSendJob(dir, peer.Uid) + require.NoError(t, err) + require.NotNil(t, job) + + sentAt := time.Now() + srvIdx := 0 + job.Status = client.SendStatusSent + job.SentAt = &sentAt + job.SuccessfulServer = &srvIdx + require.NoError(t, client.UpdateSendJob(dir, peer.Uid, job)) + + return job +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +// TestProcessSentMessages_UpdatesDeliveryInfo is the main round-trip test. +// It verifies that after ProcessSentMessages runs: +// - the function returns 1 (one message updated) +// - the send job is removed from the queue +// - a subsequent LoadMessagesHistory returns ServerDeliveryUuid and +// ServerDeliveryTimestamp for the message +func TestProcessSentMessages_UpdatesDeliveryInfo(t *testing.T) { + dir, id := setupMsgHelperConfig(t) + + peer := newFullyKeyedPeer(t, "peer-uid-main") + require.NoError(t, id.Peers.StorePeer(peer)) + + storeTestMessage(t, peer, "hello world") + + srv := client.Server{Url: "http://test-server.example"} + job := pushAndMarkSent(t, dir, peer, srv) + + // --- call under test --- + updated := ProcessSentMessages(dir) + + assert.Equal(t, 1, updated, "exactly one message should be updated") + + // The job must be removed from the queue after processing. + jobAfter, err := client.GetSendJob(dir, peer.Uid, job.ID) + require.NoError(t, err) + assert.Nil(t, jobAfter, "job should be deleted after processing") + + // Reload message history and verify delivery metadata was persisted. + msgs, err := peer.LoadMessagesHistory(0, 0, 50) + require.NoError(t, err) + require.Len(t, msgs, 1, "expected exactly one message in history") + + assert.Equal(t, srv.GetUid(), msgs[0].ServerDeliveryUuid, + "ServerDeliveryUuid should match the server that accepted the message") + assert.NotZero(t, msgs[0].ServerDeliveryTimestamp, + "ServerDeliveryTimestamp should be set after ProcessSentMessages") + assert.Equal(t, uint64(job.SentAt.Unix()), msgs[0].ServerDeliveryTimestamp, + "ServerDeliveryTimestamp should match job.SentAt") +} + +// TestProcessSentMessages_SkipsJobWithoutDeliveryInfo verifies that a Sent job +// missing SentAt or SuccessfulServer is discarded (not counted, not updating +// the message DB). +func TestProcessSentMessages_SkipsJobWithoutDeliveryInfo(t *testing.T) { + dir, id := setupMsgHelperConfig(t) + + peer := newFullyKeyedPeer(t, "peer-uid-incomplete") + require.NoError(t, id.Peers.StorePeer(peer)) + + storeTestMessage(t, peer, "incomplete job") + require.NotNil(t, peer.LastMessage) + + msgFile := filepath.Join(dir, "msg.bin") + require.NoError(t, os.WriteFile(msgFile, []byte("packed"), 0600)) + + require.NoError(t, client.PushSendJob(dir, &client.SendJob{ + Queue: peer.Uid, + File: msgFile, + MessageDbFile: peer.LastMessage.Dbfile, + MessageDbId: peer.LastMessage.Dbid, + Servers: []client.Server{{Url: "http://test-server.example"}}, + Timeout: 60, + })) + + job, _, err := client.PeekSendJob(dir, peer.Uid) + require.NoError(t, err) + require.NotNil(t, job) + + // Mark as Sent but intentionally leave SentAt and SuccessfulServer nil. + job.Status = client.SendStatusSent + require.NoError(t, client.UpdateSendJob(dir, peer.Uid, job)) + + updated := ProcessSentMessages(dir) + assert.Equal(t, 0, updated, "incomplete job must not be counted as updated") + + // Message should have no delivery info. + msgs, err := peer.LoadMessagesHistory(0, 0, 50) + require.NoError(t, err) + require.Len(t, msgs, 1) + assert.Empty(t, msgs[0].ServerDeliveryUuid, "delivery UUID must not be set") + assert.Zero(t, msgs[0].ServerDeliveryTimestamp, "delivery timestamp must not be set") +} + +// TestProcessSentMessages_EmptyQueues verifies that an absent or empty queues +// directory results in 0 updates without error. +func TestProcessSentMessages_EmptyQueues(t *testing.T) { + dir, _ := setupMsgHelperConfig(t) + // queues/ directory does not exist yet. + updated := ProcessSentMessages(dir) + assert.Equal(t, 0, updated, "no queues → 0 updates") + + // Also test with the directory present but empty. + require.NoError(t, os.MkdirAll(filepath.Join(dir, "queues"), 0700)) + updated = ProcessSentMessages(dir) + assert.Equal(t, 0, updated, "empty queues → 0 updates") +} + +// TestProcessSentMessages_MissingDbInfo verifies that a job with blank +// MessageDbFile or zero MessageDbId is rejected with a clear error, not silently +// treated as successful. +func TestProcessSentMessages_MissingDbInfo(t *testing.T) { + dir, id := setupMsgHelperConfig(t) + + peer := newFullyKeyedPeer(t, "peer-uid-nodbinfo") + require.NoError(t, id.Peers.StorePeer(peer)) + + storeTestMessage(t, peer, "the real message") + + msgFile := filepath.Join(dir, "msg.bin") + require.NoError(t, os.WriteFile(msgFile, []byte("packed"), 0600)) + + // Push a job WITHOUT MessageDbFile / MessageDbId — the old broken pattern. + require.NoError(t, client.PushSendJob(dir, &client.SendJob{ + Queue: peer.Uid, + File: msgFile, + Servers: []client.Server{{Url: "http://test-server.example"}}, + Timeout: 60, + })) + + job, _, err := client.PeekSendJob(dir, peer.Uid) + require.NoError(t, err) + require.NotNil(t, job) + + sentAt := time.Now() + srvIdx := 0 + job.Status = client.SendStatusSent + job.SentAt = &sentAt + job.SuccessfulServer = &srvIdx + require.NoError(t, client.UpdateSendJob(dir, peer.Uid, job)) + + // Must NOT count as updated; the real message row must be untouched. + updated := ProcessSentMessages(dir) + assert.Equal(t, 0, updated, "job without db info must not be counted as updated") + + msgs, err := peer.LoadMessagesHistory(0, 0, 50) + require.NoError(t, err) + require.Len(t, msgs, 1) + assert.Empty(t, msgs[0].ServerDeliveryUuid, "delivery UUID must not be set") + assert.Zero(t, msgs[0].ServerDeliveryTimestamp, "delivery timestamp must not be set") +} + +// TestProcessSentMessages_MultipleMessages verifies that all jobs in the same +// queue are processed and that each message gets its own delivery info. +func TestProcessSentMessages_MultipleMessages(t *testing.T) { + dir, id := setupMsgHelperConfig(t) + + peer := newFullyKeyedPeer(t, "peer-uid-multi") + require.NoError(t, id.Peers.StorePeer(peer)) + + srv := client.Server{Url: "http://test-server.example"} + + const n = 3 + for i := range n { + storeTestMessage(t, peer, fmt.Sprintf("message-%d", i)) + pushAndMarkSent(t, dir, peer, srv) + } + + updated := ProcessSentMessages(dir) + assert.Equal(t, n, updated, "all %d messages should be updated", n) + + msgs, err := peer.LoadMessagesHistory(0, 0, 50) + require.NoError(t, err) + require.Len(t, msgs, n) + + for _, m := range msgs { + assert.Equal(t, srv.GetUid(), m.ServerDeliveryUuid, + "every message should have ServerDeliveryUuid set") + assert.NotZero(t, m.ServerDeliveryTimestamp, + "every message should have ServerDeliveryTimestamp set") + } +} diff --git a/client/messagestorage.go b/client/messagestorage.go index 2b123d0..857b4ff 100644 --- a/client/messagestorage.go +++ b/client/messagestorage.go @@ -2,6 +2,7 @@ package client import ( "database/sql" + "fmt" "math" "os" "path/filepath" @@ -256,8 +257,9 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag } defer rows.Close() var dbm meowlib.DbMessage + found := false for rows.Next() { - + found = true var id int64 var m []byte err = rows.Scan(&id, &m) @@ -272,7 +274,9 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag if err != nil { return nil, err } - + } + if !found { + return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile) } return &dbm, nil } diff --git a/client/sendjobs.go b/client/sendjobs.go index 7e73061..44a4a26 100644 --- a/client/sendjobs.go +++ b/client/sendjobs.go @@ -29,10 +29,12 @@ const ( // are managed by the queue functions and must not be set by the caller. type SendJob struct { // --- caller-supplied fields --- - Queue string `json:"queue,omitempty"` - File string `json:"file,omitempty"` - Servers []Server `json:"servers,omitempty"` - Timeout int `json:"timeout,omitempty"` // seconds; 0 = no timeout + Queue string `json:"queue,omitempty"` // uid of destination peer, used for naming the sent queue sqlite db + File string `json:"file,omitempty"` // filename which content shall be sent to the server as message payload + MessageDbFile string `json:"message_db_file,omitempty"` // peer message DB UUID (no path, no suffix) — from peer.LastMessage.Dbfile + MessageDbId int64 `json:"message_db_id,omitempty"` // SQLite row ID in MessageDbFile — from peer.LastMessage.Dbid + Servers []Server `json:"servers,omitempty"` + Timeout int `json:"timeout,omitempty"` // seconds; 0 = no timeout // --- DB-managed tracking fields (not serialised by the caller) --- ID int64 @@ -68,6 +70,8 @@ func openOrCreateSendQueue(dbPath string) (*sql.DB, error) { _, err = db.Exec(`CREATE TABLE IF NOT EXISTS queue ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, file TEXT NOT NULL, + message_db_file TEXT NOT NULL DEFAULT '', + message_db_id INTEGER NOT NULL DEFAULT 0, servers TEXT NOT NULL, timeout INTEGER NOT NULL DEFAULT 0, inserted_at INTEGER NOT NULL, @@ -80,6 +84,10 @@ func openOrCreateSendQueue(dbPath string) (*sql.DB, error) { db.Close() return nil, err } + // Migration: add columns to existing DBs that pre-date this schema version. + // SQLite returns an error if the column already exists; we silently ignore it. + db.Exec(`ALTER TABLE queue ADD COLUMN message_db_file TEXT NOT NULL DEFAULT ''`) + db.Exec(`ALTER TABLE queue ADD COLUMN message_db_id INTEGER NOT NULL DEFAULT 0`) return db, nil } @@ -101,8 +109,10 @@ func PushSendJob(storagePath string, job *SendJob) error { return err } _, err = db.Exec( - `INSERT INTO queue(file, servers, timeout, inserted_at, status, retries) VALUES(?,?,?,?,?,?)`, - job.File, string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON), + `INSERT INTO queue(file, message_db_file, message_db_id, servers, timeout, inserted_at, status, retries) + VALUES(?,?,?,?,?,?,?,?)`, + job.File, job.MessageDbFile, job.MessageDbId, + string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON), ) return err } @@ -119,6 +129,8 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { var ( id int64 file string + messageDbFile string + messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -128,10 +140,10 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { successfulServer sql.NullInt64 ) err = db.QueryRow( - `SELECT id, file, servers, timeout, inserted_at, status, sent_at, retries, successful_server + `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC LIMIT 1`, SendStatusPending, - ).Scan(&id, &file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + ).Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, 0, nil } @@ -149,14 +161,16 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: status, - Retries: retries, + ID: id, + Queue: queue, + File: file, + MessageDbFile: messageDbFile, + MessageDbId: messageDbId, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) @@ -208,6 +222,8 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { var ( file string + messageDbFile string + messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -217,10 +233,10 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { successfulServer sql.NullInt64 ) err = db.QueryRow( - `SELECT file, servers, timeout, inserted_at, status, sent_at, retries, successful_server + `SELECT file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE id = ?`, id, - ).Scan(&file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + ).Scan(&file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, nil } @@ -238,14 +254,16 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: status, - Retries: retries, + ID: id, + Queue: queue, + File: file, + MessageDbFile: messageDbFile, + MessageDbId: messageDbId, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) @@ -269,7 +287,7 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { defer db.Close() rows, err := db.Query( - `SELECT id, file, servers, timeout, inserted_at, sent_at, retries, successful_server + `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC`, SendStatusSent, ) @@ -283,6 +301,8 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { var ( id int64 file string + messageDbFile string + messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -290,7 +310,7 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { retriesJSON string successfulServer sql.NullInt64 ) - if err := rows.Scan(&id, &file, &serversJSON, &timeout, &insertedAt, &sentAt, &retriesJSON, &successfulServer); err != nil { + if err := rows.Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &sentAt, &retriesJSON, &successfulServer); err != nil { return nil, err } @@ -304,14 +324,16 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: SendStatusSent, - Retries: retries, + ID: id, + Queue: queue, + File: file, + MessageDbFile: messageDbFile, + MessageDbId: messageDbId, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: SendStatusSent, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0)