diff --git a/client/helpers/bgSendHelper.go b/client/helpers/bgSendHelper.go index f569af4..c9e0d1b 100644 --- a/client/helpers/bgSendHelper.go +++ b/client/helpers/bgSendHelper.go @@ -2,6 +2,7 @@ package helpers import ( "errors" + "fmt" "os" "path/filepath" "sync" @@ -16,11 +17,37 @@ const maxRetriesPerServer = 3 const defaultSendTimeout = 3600 * 24 // seconds, used when job.Timeout is 0 const defaultPostTimeout = 200 -// WriteSendJob enqueues a SendJob from the main Flutter isolate. -// It is a thin wrapper over client.PushSendJob and is safe to call -// concurrently with ProcessSendQueues. -func WriteSendJob(storagePath string, job *client.SendJob) error { - return client.PushSendJob(storagePath, job) +// CreateUserMessageAndSendJob is the single entry point for sending a message. +// It creates and stores the user message, serialises the packed form to +// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in +// storagePath/queues/{peerUid}. +func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error { + packedMsg, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist) + if err != nil { + return fmt.Errorf("%s: %w", errTxt, err) + } + + data, err := proto.Marshal(packedMsg) + if err != nil { + return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err) + } + + outboxDir := filepath.Join(storagePath, "outbox") + if err := os.MkdirAll(outboxDir, 0700); err != nil { + return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err) + } + + outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId)) + if err := os.WriteFile(outboxFile, data, 0600); err != nil { + return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err) + } + + return client.PushSendJob(storagePath, &client.SendJob{ + Queue: peerUid, + File: outboxFile, + Servers: servers, + Timeout: timeout, + }) } // ProcessSendQueues discovers every queue DB file under storagePath/queues/ diff --git a/client/helpers/bgSendHelper_test.go b/client/helpers/bgSendHelper_test.go index 097b788..54ca21a 100644 --- a/client/helpers/bgSendHelper_test.go +++ b/client/helpers/bgSendHelper_test.go @@ -6,6 +6,8 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" + "strings" "sync/atomic" "testing" "time" @@ -190,20 +192,44 @@ func TestAttemptSendJob_SkipsExhaustedServer(t *testing.T) { // --- integration tests -------------------------------------------------- -// TestWriteSendJob verifies the thin WriteSendJob wrapper enqueues the job. -func TestWriteSendJob(t *testing.T) { - dir := t.TempDir() - err := WriteSendJob(dir, &client.SendJob{ - Queue: "q1", - File: "/tmp/f", - Servers: serverSlice("http://s1"), - }) +// TestCreateUserMessageAndSendJob verifies that the packed message is written to +// outbox/{dbFile}_{dbId} and a pending send job is enqueued for the peer. +func TestCreateUserMessageAndSendJob(t *testing.T) { + dir, id := setupMsgHelperConfig(t) + + peer := newFullyKeyedPeer(t, "peer-create-send") + require.NoError(t, id.Peers.StorePeer(peer)) + + srv := newTestServer(t, "http://test-srv.example") + + err := CreateUserMessageAndSendJob( + dir, + "hello from integration", + "peer-create-send", + "", + nil, + []client.Server{srv}, + 60, + ) require.NoError(t, err) - got, _, err := client.PeekSendJob(dir, "q1") + // A pending job must be in the queue. + job, _, err := client.PeekSendJob(dir, "peer-create-send") require.NoError(t, err) - require.NotNil(t, got) - assert.Equal(t, "/tmp/f", got.File) + require.NotNil(t, job, "a send job must be enqueued") + + // The outbox file must exist under storagePath/outbox/. + assert.FileExists(t, job.File) + assert.True(t, strings.HasPrefix(job.File, filepath.Join(dir, "outbox")), + "outbox file must be under storagePath/outbox/") + + // The basename must follow the {dbFile}_{dbId} naming convention. + base := filepath.Base(job.File) + sep := strings.LastIndex(base, "_") + require.Greater(t, sep, 0, "filename must contain an underscore separating dbFile from dbId") + dbId, parseErr := strconv.ParseInt(base[sep+1:], 10, 64) + assert.NoError(t, parseErr, "suffix after underscore must be a numeric db ID") + assert.Greater(t, dbId, int64(0), "db ID must be positive") } // TestProcessSendQueues_Success verifies that a pending job is delivered and diff --git a/client/helpers/messageHelper.go b/client/helpers/messageHelper.go index e37354f..1bd24bf 100644 --- a/client/helpers/messageHelper.go +++ b/client/helpers/messageHelper.go @@ -1,9 +1,10 @@ package helpers import ( - "errors" "os" "path/filepath" + "strconv" + "strings" "time" "forge.redroom.link/yves/meowlib" @@ -27,25 +28,29 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) ( } func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) { - usermessage, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) + usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) if err != nil { return nil, errtxt, err } return PackMessageForServer(usermessage, srvuid) } -func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, error) { +// CreateAndStoreUserMessage creates, signs, and stores an outbound message for +// peer_uid. It returns the packed (encrypted) form ready for server transport, +// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context +// string, and any error. +func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) { peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) // Creating User message usermessage, err := peer.BuildSimpleUserMessage([]byte(message)) if err != nil { - return nil, "PrepareServerMessage : BuildSimpleUserMessage", err + return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err } for _, file := range filelist { err = usermessage.AddFile(file, client.GetConfig().Chunksize) if err != nil { - return nil, "PrepareServerMessage : AddFile", err + return nil, "", 0, "PrepareServerMessage : AddFile", err } } usermessage.Status.Sent = uint64(time.Now().UTC().Unix()) @@ -54,16 +59,19 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin // Store message err = peer.StoreMessage(usermessage, nil) if err != nil { - return nil, "messageBuildPostprocess : StoreMessage", err + return nil, "", 0, "messageBuildPostprocess : StoreMessage", err } + dbFile := peer.LastMessage.Dbfile + dbId := peer.LastMessage.Dbid + // Prepare cyphered + packed user message packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) if err != nil { - return nil, "messageBuildPostprocess : ProcessOutboundUserMessage", err + return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err } - return packedMsg, "", nil + return packedMsg, dbFile, dbId, "", nil } func BuildAckMessage(messageUid string, srvuid string, peer_uid string, received int64, processed int64) ([]byte, string, error) { @@ -100,26 +108,14 @@ func ReadAckMessageResponse() { //! update the status in message store } -// GetPeerLastMessageDbInfo returns the DB location of the most recently stored -// message for the given peer. Call this immediately after CreateAndStoreUserMessage -// to get the values needed for SendJob.MessageDbFile and SendJob.MessageDbId. -func GetPeerLastMessageDbInfo(peer_uid string) (dbFile string, dbId int64, errTxt string, err error) { - peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) - if peer == nil { - return "", 0, "GetPeerLastMessageDbInfo: peer not found", errors.New("peer not found") - } - if peer.LastMessage == nil { - return "", 0, "GetPeerLastMessageDbInfo: no message stored yet", errors.New("no message stored yet for this peer") - } - return peer.LastMessage.Dbfile, peer.LastMessage.Dbid, "", nil -} - // ProcessSentMessages scans every send queue under storagePath/queues/, updates // the message storage entry with server delivery info for each sent job, then // removes the job from the queue. Returns the number of messages updated. // -// Each SendJob must have MessageDbFile and MessageDbId set (populated via -// GetPeerLastMessageDbInfo right after the message is stored). +// The message DB location is recovered from the job's File basename, which must +// follow the naming convention produced by CreateUserMessageAndSendJob: +// +// outbox/{dbFile}_{dbId} func ProcessSentMessages(storagePath string) int { password, _ := client.GetConfig().GetMemPass() queueDir := filepath.Join(storagePath, "queues") @@ -152,18 +148,28 @@ func ProcessSentMessages(storagePath string) int { continue } - if job.MessageDbFile == "" || job.MessageDbId == 0 { - logger.Error().Int64("id", job.ID).Str("queue", queue). - Msg("ProcessSentMessages: job missing MessageDbFile/MessageDbId — set them via GetPeerLastMessageDbInfo when building the SendJob") + // Recover dbFile and dbId from the outbox filename: {dbFile}_{dbId} + base := filepath.Base(job.File) + sep := strings.LastIndex(base, "_") + if sep <= 0 { + logger.Error().Int64("id", job.ID).Str("file", job.File). + Msg("ProcessSentMessages: cannot parse dbFile/dbId from job filename — use CreateUserMessageAndSendJob to build jobs") + continue + } + dbFile := base[:sep] + dbId, parseErr := strconv.ParseInt(base[sep+1:], 10, 64) + if parseErr != nil || dbFile == "" || dbId == 0 { + logger.Error().Int64("id", job.ID).Str("file", job.File). + Msg("ProcessSentMessages: invalid dbFile/dbId in job filename") continue } serverUid := job.Servers[*job.SuccessfulServer].GetUid() receiveTime := uint64(job.SentAt.Unix()) - if err := client.SetMessageServerDelivery(job.MessageDbFile, job.MessageDbId, serverUid, receiveTime, password); err != nil { + if err := client.SetMessageServerDelivery(dbFile, dbId, serverUid, receiveTime, password); err != nil { logger.Error().Err(err).Str("queue", queue). - Str("dbFile", job.MessageDbFile).Int64("dbId", job.MessageDbId). + Str("dbFile", dbFile).Int64("dbId", dbId). Msg("ProcessSentMessages: SetMessageServerDelivery") continue } diff --git a/client/helpers/messageHelper_test.go b/client/helpers/messageHelper_test.go index 7ef9348..50e41b1 100644 --- a/client/helpers/messageHelper_test.go +++ b/client/helpers/messageHelper_test.go @@ -88,24 +88,25 @@ func storeTestMessage(t *testing.T, peer *client.Peer, text string) { // pushAndMarkSent pushes a send job for the given peer and marks it as delivered // by the given server. Returns the job after the status update. -// It mirrors the correct app usage: call GetPeerLastMessageDbInfo immediately -// after storing to populate MessageDbFile and MessageDbId on the job. +// The outbox file is named {dbFile}_{dbId} so that ProcessSentMessages can +// recover the message DB location from the filename, matching the convention +// used by CreateUserMessageAndSendJob. func pushAndMarkSent(t *testing.T, dir string, peer *client.Peer, srv client.Server) *client.SendJob { t.Helper() - dbFile, dbId, errTxt, err := GetPeerLastMessageDbInfo(peer.Uid) - require.NoError(t, err, errTxt) + dbFile := peer.LastMessage.Dbfile + dbId := peer.LastMessage.Dbid - msgFile := filepath.Join(dir, fmt.Sprintf("msg-%d.bin", dbId)) + outboxDir := filepath.Join(dir, "outbox") + require.NoError(t, os.MkdirAll(outboxDir, 0700)) + msgFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId)) require.NoError(t, os.WriteFile(msgFile, []byte("packed-server-message"), 0600)) require.NoError(t, client.PushSendJob(dir, &client.SendJob{ - Queue: peer.Uid, - File: msgFile, - MessageDbFile: dbFile, - MessageDbId: dbId, - Servers: []client.Server{srv}, - Timeout: 60, + Queue: peer.Uid, + File: msgFile, + Servers: []client.Server{srv}, + Timeout: 60, })) job, _, err := client.PeekSendJob(dir, peer.Uid) @@ -177,19 +178,19 @@ func TestProcessSentMessages_SkipsJobWithoutDeliveryInfo(t *testing.T) { storeTestMessage(t, peer, "incomplete job") - dbFile, dbId, errTxt, err := GetPeerLastMessageDbInfo(peer.Uid) - require.NoError(t, err, errTxt) + dbFile := peer.LastMessage.Dbfile + dbId := peer.LastMessage.Dbid - msgFile := filepath.Join(dir, "msg.bin") + outboxDir := filepath.Join(dir, "outbox") + require.NoError(t, os.MkdirAll(outboxDir, 0700)) + msgFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId)) require.NoError(t, os.WriteFile(msgFile, []byte("packed"), 0600)) require.NoError(t, client.PushSendJob(dir, &client.SendJob{ - Queue: peer.Uid, - File: msgFile, - MessageDbFile: dbFile, - MessageDbId: dbId, - Servers: []client.Server{{Url: "http://test-server.example"}}, - Timeout: 60, + Queue: peer.Uid, + File: msgFile, + Servers: []client.Server{{Url: "http://test-server.example"}}, + Timeout: 60, })) job, _, err := client.PeekSendJob(dir, peer.Uid) @@ -225,10 +226,10 @@ func TestProcessSentMessages_EmptyQueues(t *testing.T) { assert.Equal(t, 0, updated, "empty queues → 0 updates") } -// TestProcessSentMessages_MissingDbInfo verifies that a job with blank -// MessageDbFile or zero MessageDbId is rejected with a clear error, not silently -// treated as successful. -func TestProcessSentMessages_MissingDbInfo(t *testing.T) { +// TestProcessSentMessages_UnparseableFilename verifies that a job whose filename +// does not follow the {dbFile}_{dbId} convention is skipped with a logged error +// and not counted as updated. +func TestProcessSentMessages_UnparseableFilename(t *testing.T) { dir, id := setupMsgHelperConfig(t) peer := newFullyKeyedPeer(t, "peer-uid-nodbinfo") @@ -236,10 +237,10 @@ func TestProcessSentMessages_MissingDbInfo(t *testing.T) { storeTestMessage(t, peer, "the real message") - msgFile := filepath.Join(dir, "msg.bin") + // A filename with no underscore cannot be parsed as {dbFile}_{dbId}. + msgFile := filepath.Join(dir, "badname.bin") require.NoError(t, os.WriteFile(msgFile, []byte("packed"), 0600)) - // Push a job WITHOUT MessageDbFile / MessageDbId — the old broken pattern. require.NoError(t, client.PushSendJob(dir, &client.SendJob{ Queue: peer.Uid, File: msgFile, diff --git a/client/sendjobs.go b/client/sendjobs.go index 44a4a26..85ebd3c 100644 --- a/client/sendjobs.go +++ b/client/sendjobs.go @@ -21,7 +21,9 @@ const ( // SendJob describes a message to send, together with its delivery tracking state. // -// The File field holds the path of a pre-built packed server message (binary). +// The File field holds the path to an outbox file written by CreateUserMessageAndSendJob. +// It must follow the naming convention outbox/{dbFile}_{dbId} so that +// ProcessSentMessages can recover the message DB location from the filename alone. // Servers is tried in order; after MaxRetriesPerServer failures on one server // the next one is attempted. // @@ -29,12 +31,10 @@ const ( // are managed by the queue functions and must not be set by the caller. type SendJob struct { // --- caller-supplied fields --- - Queue string `json:"queue,omitempty"` // uid of destination peer, used for naming the sent queue sqlite db - File string `json:"file,omitempty"` // filename which content shall be sent to the server as message payload - MessageDbFile string `json:"message_db_file,omitempty"` // peer message DB UUID (no path, no suffix) — from peer.LastMessage.Dbfile - MessageDbId int64 `json:"message_db_id,omitempty"` // SQLite row ID in MessageDbFile — from peer.LastMessage.Dbid - Servers []Server `json:"servers,omitempty"` - Timeout int `json:"timeout,omitempty"` // seconds; 0 = no timeout + Queue string `json:"queue,omitempty"` // uid of destination peer, used for naming the queue sqlite db + File string `json:"file,omitempty"` // outbox file path; basename must be {dbFile}_{dbId} + Servers []Server `json:"servers,omitempty"` + Timeout int `json:"timeout,omitempty"` // seconds; 0 = no timeout // --- DB-managed tracking fields (not serialised by the caller) --- ID int64 @@ -70,8 +70,6 @@ func openOrCreateSendQueue(dbPath string) (*sql.DB, error) { _, err = db.Exec(`CREATE TABLE IF NOT EXISTS queue ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, file TEXT NOT NULL, - message_db_file TEXT NOT NULL DEFAULT '', - message_db_id INTEGER NOT NULL DEFAULT 0, servers TEXT NOT NULL, timeout INTEGER NOT NULL DEFAULT 0, inserted_at INTEGER NOT NULL, @@ -84,10 +82,6 @@ func openOrCreateSendQueue(dbPath string) (*sql.DB, error) { db.Close() return nil, err } - // Migration: add columns to existing DBs that pre-date this schema version. - // SQLite returns an error if the column already exists; we silently ignore it. - db.Exec(`ALTER TABLE queue ADD COLUMN message_db_file TEXT NOT NULL DEFAULT ''`) - db.Exec(`ALTER TABLE queue ADD COLUMN message_db_id INTEGER NOT NULL DEFAULT 0`) return db, nil } @@ -109,10 +103,9 @@ func PushSendJob(storagePath string, job *SendJob) error { return err } _, err = db.Exec( - `INSERT INTO queue(file, message_db_file, message_db_id, servers, timeout, inserted_at, status, retries) - VALUES(?,?,?,?,?,?,?,?)`, - job.File, job.MessageDbFile, job.MessageDbId, - string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON), + `INSERT INTO queue(file, servers, timeout, inserted_at, status, retries) + VALUES(?,?,?,?,?,?)`, + job.File, string(serversJSON), job.Timeout, time.Now().Unix(), SendStatusPending, string(retriesJSON), ) return err } @@ -129,8 +122,6 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { var ( id int64 file string - messageDbFile string - messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -140,10 +131,10 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { successfulServer sql.NullInt64 ) err = db.QueryRow( - `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server + `SELECT id, file, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC LIMIT 1`, SendStatusPending, - ).Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + ).Scan(&id, &file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, 0, nil } @@ -161,16 +152,14 @@ func PeekSendJob(storagePath, queue string) (*SendJob, int64, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - MessageDbFile: messageDbFile, - MessageDbId: messageDbId, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: status, - Retries: retries, + ID: id, + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) @@ -222,8 +211,6 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { var ( file string - messageDbFile string - messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -233,10 +220,10 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { successfulServer sql.NullInt64 ) err = db.QueryRow( - `SELECT file, message_db_file, message_db_id, servers, timeout, inserted_at, status, sent_at, retries, successful_server + `SELECT file, servers, timeout, inserted_at, status, sent_at, retries, successful_server FROM queue WHERE id = ?`, id, - ).Scan(&file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) + ).Scan(&file, &serversJSON, &timeout, &insertedAt, &status, &sentAt, &retriesJSON, &successfulServer) if err == sql.ErrNoRows { return nil, nil } @@ -254,16 +241,14 @@ func GetSendJob(storagePath, queue string, id int64) (*SendJob, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - MessageDbFile: messageDbFile, - MessageDbId: messageDbId, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: status, - Retries: retries, + ID: id, + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: status, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0) @@ -287,7 +272,7 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { defer db.Close() rows, err := db.Query( - `SELECT id, file, message_db_file, message_db_id, servers, timeout, inserted_at, sent_at, retries, successful_server + `SELECT id, file, servers, timeout, inserted_at, sent_at, retries, successful_server FROM queue WHERE status = ? ORDER BY id ASC`, SendStatusSent, ) @@ -301,8 +286,6 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { var ( id int64 file string - messageDbFile string - messageDbId int64 serversJSON string timeout int insertedAt int64 @@ -310,7 +293,7 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { retriesJSON string successfulServer sql.NullInt64 ) - if err := rows.Scan(&id, &file, &messageDbFile, &messageDbId, &serversJSON, &timeout, &insertedAt, &sentAt, &retriesJSON, &successfulServer); err != nil { + if err := rows.Scan(&id, &file, &serversJSON, &timeout, &insertedAt, &sentAt, &retriesJSON, &successfulServer); err != nil { return nil, err } @@ -324,16 +307,14 @@ func GetSentJobs(storagePath, queue string) ([]*SendJob, error) { } job := &SendJob{ - ID: id, - Queue: queue, - File: file, - MessageDbFile: messageDbFile, - MessageDbId: messageDbId, - Servers: servers, - Timeout: timeout, - InsertedAt: time.Unix(insertedAt, 0), - Status: SendStatusSent, - Retries: retries, + ID: id, + Queue: queue, + File: file, + Servers: servers, + Timeout: timeout, + InsertedAt: time.Unix(insertedAt, 0), + Status: SendStatusSent, + Retries: retries, } if sentAt.Valid { t := time.Unix(sentAt.Int64, 0)