From 8b106db52f6b675fcc2f5fcd019a37a08ca57f95 Mon Sep 17 00:00:00 2001 From: yc Date: Tue, 21 Apr 2026 15:53:56 +0200 Subject: [PATCH] duplicate messages send fixes --- client/helpers/bgPollHelper.go | 199 +++++++------- client/identity.go | 2 +- client/messagestorage.go | 472 ++++++++++++++++----------------- client/peerstorage.go | 94 ++++--- client/serverstorage.go | 105 ++++---- http.go | 6 + 6 files changed, 437 insertions(+), 441 deletions(-) diff --git a/client/helpers/bgPollHelper.go b/client/helpers/bgPollHelper.go index bf643ea..6ba45ef 100644 --- a/client/helpers/bgPollHelper.go +++ b/client/helpers/bgPollHelper.go @@ -15,8 +15,8 @@ import ( "forge.redroom.link/yves/meowlib/client" invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages" invsrv "forge.redroom.link/yves/meowlib/client/invitation/server" - doubleratchet "github.com/status-im/doubleratchet" "github.com/google/uuid" + doubleratchet "github.com/status-im/doubleratchet" "google.golang.org/protobuf/proto" ) @@ -109,7 +109,7 @@ func PollServer(storage_path string, job *client.RequestsJob, timeout int, longP // SaveCheckJobs func SaveCheckJobs() (string, error) { me := client.GetConfig().GetIdentity() - err := me.SaveBackgroundJob() + err := me.SaveCheckJobs() if err != nil { return "CheckMessages: json.Marshal", err @@ -174,7 +174,8 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error // find the peer with that lookup key peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination) if peer == nil { - return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message") + logger.Error().Str("destination", packedUserMessage.Destination).Msg("ConsumeInboxFile: no visible peer for that message, skipping") + continue } // Unpack the message — step-3 messages arrive before the initiator's identity // key is known, so skip signature verification for pending peers. @@ -185,100 +186,103 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage) } if err != nil { - return nil, nil, "ReadMessage: ProcessInboundUserMessage", err - } + //return nil, nil, "ReadMessage: ProcessInboundUserMessage", err + logger.Error().Msg("ReadMessage: ProcessInboundUserMessage" + err.Error()) + } else { - // Handle invitation step 3: initiator's full ContactCard arriving at the invitee. - if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 { - invBytes, marshalErr := proto.Marshal(usermsg.Invitation) - if marshalErr == nil { - finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes) - if finalErr == nil && finalizedPeer != nil { - // Auto-send step-4 confirmation to initiator's servers. - step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId) - if sendErr == nil { - for i, bytemsg := range step4msgs { - if i < len(finalizedPeer.ContactPullServers) { - meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut) + // Handle invitation step 3: initiator's full ContactCard arriving at the invitee. + if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 { + invBytes, marshalErr := proto.Marshal(usermsg.Invitation) + if marshalErr == nil { + finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes) + if finalErr == nil && finalizedPeer != nil { + // Auto-send step-4 confirmation to initiator's servers. + step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId) + if sendErr == nil { + for i, bytemsg := range step4msgs { + if i < len(finalizedPeer.ContactPullServers) { + meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut) + } } } } } + continue } - continue - } - // Handle invitation step 4: invitee's confirmation arriving at the initiator. - if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 { - // Contact is fully active — nothing more to do on the initiator side. - continue - } - - // Check for received or processed already filled => it's an ack for one of our sent messages - if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" && - (usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) { - password, _ := client.GetConfig().GetMemPass() - if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil { - logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck") + // Handle invitation step 4: invitee's confirmation arriving at the initiator. + if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 { + // Contact is fully active — nothing more to do on the initiator side. + continue } - continue - } - //fmt.Println("From:", usermsg.From) - //jsonUserMessage, _ := json.Marshal(usermsg) - //fmt.Println(string(jsonUserMessage)) - //peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From) + // Check for received or processed already filled => it's an ack for one of our sent messages + if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" && + (usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) { + password, _ := client.GetConfig().GetMemPass() + if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil { + logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck") + } + continue + } - // detach files - if usermsg.Files != nil { - // create files folder - if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) { - err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700) - if err != nil { - return nil, nil, "ReadMessage: MkdirAll", err + //fmt.Println("From:", usermsg.From) + //jsonUserMessage, _ := json.Marshal(usermsg) + //fmt.Println(string(jsonUserMessage)) + //peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From) + + // detach files + if usermsg.Files != nil { + // create files folder + if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) { + err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700) + if err != nil { + return nil, nil, "ReadMessage: MkdirAll", err + } + } + for _, file := range usermsg.Files { + filename := uuid.New().String() + "_" + file.Filename + filenames = append(filenames, peer.Name+" sent: "+filename) + // detach file + os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600) + } + //? result["invitation finalized"] = peer.Name + } + // user message + + messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data)) + + // stamp the received time before storing + receivedAt := time.Now().UTC().Unix() + if usermsg.Status == nil { + usermsg.Status = &meowlib.ConversationStatus{} + } + usermsg.Status.Received = uint64(receivedAt) + + // add message to storage + err = peer.StoreMessage(usermsg, filenames) + if err != nil { + return nil, nil, "ReadMessage: StoreMessage", err + } + filenames = []string{} + + // Persist peer to save updated DR state (DrStateJson) + if peer.DrRootKey != "" { + if storeErr := identity.Peers.StorePeer(peer); storeErr != nil { + logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)") } } - for _, file := range usermsg.Files { - filename := uuid.New().String() + "_" + file.Filename - filenames = append(filenames, peer.Name+" sent: "+filename) - // detach file - os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600) - } - //? result["invitation finalized"] = peer.Name - } - // user message - messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data)) - - // stamp the received time before storing - receivedAt := time.Now().UTC().Unix() - if usermsg.Status == nil { - usermsg.Status = &meowlib.ConversationStatus{} - } - usermsg.Status.Received = uint64(receivedAt) - - // add message to storage - err = peer.StoreMessage(usermsg, filenames) - if err != nil { - return nil, nil, "ReadMessage: StoreMessage", err - } - filenames = []string{} - - // Persist peer to save updated DR state (DrStateJson) - if peer.DrRootKey != "" { - if storeErr := identity.Peers.StorePeer(peer); storeErr != nil { - logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)") - } - } - - // Send delivery ack if the peer requested it - if peer.SendDeliveryAck && usermsg.Status.Uuid != "" { - storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid) - if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil { - logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck") + // Send delivery ack if the peer requested it + if peer.SendDeliveryAck && usermsg.Status.Uuid != "" { + storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid) + if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil { + logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck") + } } } } + } err = os.Remove(messageFilename) @@ -290,54 +294,33 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error return messagesOverview, filenames, "", nil } -// LongPollAllSerevrJobs checks for messages on a all servers defived in job file +// LongPollAllServerJobs checks for messages on all servers defined in job file. +// It returns as soon as any server delivers at least one message, or 0 when all +// polls time out. resultChan is buffered so goroutines never block on write. func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) { - - // Channel to collect results resultChan := make(chan int, len(jobs)) - errChan := make(chan error, len(jobs)) - // WaitGroup to sync goroutines var wg sync.WaitGroup - - // Loop through each job (server) for _, job := range jobs { wg.Add(1) - go func(job client.RequestsJob) { defer wg.Done() - - // Long-polling call to the server cnt, _, err := PollServer(storage_path, &job, timeout, true) - if err == nil && cnt > 0 { - select { - case resultChan <- cnt: - default: - } - - // Close the error channel to notify all goroutines - close(errChan) - + resultChan <- cnt } }(job) } - // Close the result channel when all workers are done go func() { wg.Wait() close(resultChan) }() - // Wait for the first message or all timeouts - select { - case cnt := <-resultChan: + if cnt, ok := <-resultChan; ok { return cnt, "", nil - case <-errChan: - // If one fails and exitOnMessage is true - return 0, "", nil } - + return 0, "", nil } // sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues diff --git a/client/identity.go b/client/identity.go index c7a7507..54d4c57 100644 --- a/client/identity.go +++ b/client/identity.go @@ -425,7 +425,7 @@ func (id *Identity) GetRequestJobs() []RequestsJob { return list } -func (id *Identity) SaveBackgroundJob() error { +func (id *Identity) SaveCheckJobs() error { if id.RootKp == nil { return errors.New("identity not fully initialized: RootKp is nil") } diff --git a/client/messagestorage.go b/client/messagestorage.go index 8e67551..d47b371 100644 --- a/client/messagestorage.go +++ b/client/messagestorage.go @@ -6,6 +6,7 @@ import ( "math" "os" "path/filepath" + "sync" "forge.redroom.link/yves/meowlib" "github.com/google/uuid" @@ -13,71 +14,82 @@ import ( "google.golang.org/protobuf/proto" ) -func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error { - var dbid string - cfg := GetConfig() - identity := cfg.GetIdentity() - // If no db/no ID create DB + Tablz - // TODO : if file size > X new db - if len(peer.DbIds) == 0 { - dbid = uuid.NewString() - peer.DbIds = []string{dbid} +// One RWMutex per SQLite file path. Entries are never deleted (bounded by +// peer count, which is small). RLock for reads, Lock for writes. +var dbFileMu sync.Map - identity.Peers.StorePeer(peer) - identity.CreateFolder() - file, err := os.Create(filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) - if err != nil { - return err - } - file.Close() - sqliteDatabase, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) - if err != nil { - return err - } - defer sqliteDatabase.Close() - err = createMessageTable(sqliteDatabase) - if err != nil { - return err - } - sqliteDatabase.Close() - } else { - dbid = peer.DbIds[len(peer.DbIds)-1] - } - // Open Db - db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File +func getDbFileMutex(path string) *sync.RWMutex { + v, _ := dbFileMu.LoadOrStore(path, &sync.RWMutex{}) + return v.(*sync.RWMutex) +} + +func withDbWrite(path string, fn func(*sql.DB) error) error { + mu := getDbFileMutex(path) + mu.Lock() + defer mu.Unlock() + db, err := sql.Open("sqlite3", path) if err != nil { return err } defer db.Close() - // Detach Files + return fn(db) +} + +func withDbRead(path string, fn func(*sql.DB) error) error { + mu := getDbFileMutex(path) + mu.RLock() + defer mu.RUnlock() + db, err := sql.Open("sqlite3", path) + if err != nil { + return err + } + defer db.Close() + return fn(db) +} + +func dbPath(cfg *Config, identity *Identity, dbid string) string { + return filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix) +} + +func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error { + cfg := GetConfig() + identity := cfg.GetIdentity() + + isNew := len(peer.DbIds) == 0 + var dbid string + if isNew { + dbid = uuid.NewString() + peer.DbIds = []string{dbid} + identity.Peers.StorePeer(peer) + identity.CreateFolder() + } else { + dbid = peer.DbIds[len(peer.DbIds)-1] + } + + // Detach file attachments — no DB lock needed for file I/O. hiddenFilenames := []string{} if len(usermessage.Files) > 0 { + secureDir := filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles") + if _, err := os.Stat(secureDir); os.IsNotExist(err) { + if err = os.MkdirAll(secureDir, 0755); err != nil { + return err + } + } for _, f := range usermessage.Files { hiddenFilename := uuid.NewString() - // Cypher file encData, err := meowlib.SymEncrypt(password, f.Data) if err != nil { return err } - if _, err := os.Stat(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")); os.IsNotExist(err) { - err = os.MkdirAll(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles"), 0755) - if err != nil { - return err - } - } - os.WriteFile(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename), encData, 0600) - hiddenFilenames = append(hiddenFilenames, filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename)) - // replace f.Data by uuid filename - f.Data = []byte(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename)) + hidden := filepath.Join(secureDir, hiddenFilename) + os.WriteFile(hidden, encData, 0600) + hiddenFilenames = append(hiddenFilenames, hidden) + f.Data = []byte(hidden) } } - outbound := true - if usermessage.From == peer.ContactPublicKey { - outbound = false - } - // Convert UserMessage to DbMessage + + outbound := usermessage.From != peer.ContactPublicKey dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames) - // Encrypt message out, err := proto.Marshal(dbm) if err != nil { return err @@ -86,98 +98,94 @@ func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []stri if err != nil { return err } - // Insert message - insertMessageSQL := `INSERT INTO message(m) VALUES (?) RETURNING ID` - statement, err := db.Prepare(insertMessageSQL) // Prepare statement. + + var id int64 + path := dbPath(cfg, identity, dbid) + err = withDbWrite(path, func(db *sql.DB) error { + // SQLite creates the file on first Open; create the table if new DB. + if isNew { + if err := createMessageTable(db); err != nil { + return err + } + } + stmt, err := db.Prepare(`INSERT INTO message(m) VALUES (?) RETURNING ID`) + if err != nil { + return err + } + result, err := stmt.Exec(encData) + if err != nil { + return err + } + id, err = result.LastInsertId() + return err + }) if err != nil { return err } - result, err := statement.Exec(encData) - if err != nil { - return err - } - id, err := result.LastInsertId() - if err != nil { - return err - } - ium := DbMessageToInternalUserMessage(id, dbid, dbm) - peer.LastMessage = ium + + peer.LastMessage = DbMessageToInternalUserMessage(id, dbid, dbm) identity.Peers.StorePeer(peer) return nil } -// Get new messages from a peer func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) { var messages []*InternalUserMessage cfg := GetConfig() identity := cfg.GetIdentity() - // handle no db yet if len(peer.DbIds) == 0 { return messages, nil } fileidx := len(peer.DbIds) - 1 - // There fileidx should provide the db that we need (unless wantMore overlaps the next DB) - db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File - if err != nil { - return nil, err - } - defer db.Close() - // if it's first app query, it won't hold a lastIndex, so let's start from end if lastDbId == 0 { lastDbId = math.MaxInt64 } - stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC") - if err != nil { - return nil, err - } - defer stm.Close() - rows, err := stm.Query(lastDbId) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var ium *InternalUserMessage - var dbm meowlib.DbMessage - var id int64 - var m []byte - err = rows.Scan(&id, &m) + err := withDbRead(dbPath(cfg, identity, peer.DbIds[fileidx]), func(db *sql.DB) error { + stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC") if err != nil { - return nil, err + return err } - decdata, err := meowlib.SymDecrypt(password, m) + defer stm.Close() + rows, err := stm.Query(lastDbId) if err != nil { - return nil, err + return err } - err = proto.Unmarshal(decdata, &dbm) - if err != nil { - return nil, err + defer rows.Close() + for rows.Next() { + var id int64 + var m []byte + if err = rows.Scan(&id, &m); err != nil { + return err + } + decdata, err := meowlib.SymDecrypt(password, m) + if err != nil { + return err + } + var dbm meowlib.DbMessage + if err = proto.Unmarshal(decdata, &dbm); err != nil { + return err + } + ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) + ium.Dbid = id + ium.Dbfile = peer.DbIds[fileidx] + messages = append(messages, ium) } - - ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) - ium.Dbid = id - ium.Dbfile = peer.DbIds[fileidx] - messages = append(messages, ium) - } + return nil + }) // TODO DB overlap - return messages, nil + return messages, err } -// Get old messages from a peer func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) { var messages []InternalUserMessage - // handle no db yet + cfg := GetConfig() if len(peer.DbIds) == 0 { return messages, nil } fileidx := len(peer.DbIds) - 1 - // initialize count with last db message count countStack, err := getMessageCount(peer.DbIds[fileidx]) if err != nil { return nil, err } - // while the db message count < what we already have in app, step to next db file for inAppMsgCount > countStack { fileidx-- if fileidx < 0 { @@ -189,91 +197,80 @@ func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore i } countStack += newCount } - // There fileidx should provide the db that we need (unless wantMore overlaps the next DB) - db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File - if err != nil { - return nil, err - } - defer db.Close() - // if it's first app query, it won't hold a lastIndex, so let's start from end if lastDbId == 0 { lastDbId = math.MaxInt64 } - stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?") - if err != nil { - return nil, err - } - defer stm.Close() - rows, err := stm.Query(lastDbId, wantMore) - if err != nil { - return nil, err - } - defer rows.Close() - - for rows.Next() { - var ium *InternalUserMessage - var dbm meowlib.DbMessage - var id int64 - var m []byte - err = rows.Scan(&id, &m) + err = withDbRead(filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, peer.DbIds[fileidx]+cfg.DbSuffix), func(db *sql.DB) error { + stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?") if err != nil { - return nil, err + return err } - decdata, err := meowlib.SymDecrypt(password, m) + defer stm.Close() + rows, err := stm.Query(lastDbId, wantMore) if err != nil { - return nil, err + return err } - err = proto.Unmarshal(decdata, &dbm) - if err != nil { - return nil, err + defer rows.Close() + for rows.Next() { + var id int64 + var m []byte + if err = rows.Scan(&id, &m); err != nil { + return err + } + decdata, err := meowlib.SymDecrypt(password, m) + if err != nil { + return err + } + var dbm meowlib.DbMessage + if err = proto.Unmarshal(decdata, &dbm); err != nil { + return err + } + ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) + ium.Dbid = id + ium.Dbfile = peer.DbIds[fileidx] + messages = append(messages, *ium) } - - ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) - ium.Dbid = id - ium.Dbfile = peer.DbIds[fileidx] - - messages = append(messages, *ium) - } + return nil + }) // TODO DB overlap - return messages, nil + return messages, err } func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) { - // There fileidx should provide the db that we need (unless wantMore overlaps the next DB) - db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile - if err != nil { - return nil, err - } - defer db.Close() - - stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?") - if err != nil { - return nil, err - } - defer stm.Close() - rows, err := stm.Query(dbId) - if err != nil { - return nil, err - } - defer rows.Close() + cfg := GetConfig() + path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix) var dbm meowlib.DbMessage found := false - for rows.Next() { - found = true - var id int64 - var m []byte - err = rows.Scan(&id, &m) + err := withDbRead(path, func(db *sql.DB) error { + stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?") if err != nil { - return nil, err + return err } - decdata, err := meowlib.SymDecrypt(password, m) + defer stm.Close() + rows, err := stm.Query(dbId) if err != nil { - return nil, err + return err } - err = proto.Unmarshal(decdata, &dbm) - if err != nil { - return nil, err + defer rows.Close() + for rows.Next() { + found = true + var id int64 + var m []byte + if err = rows.Scan(&id, &m); err != nil { + return err + } + decdata, err := meowlib.SymDecrypt(password, m) + if err != nil { + return err + } + if err = proto.Unmarshal(decdata, &dbm); err != nil { + return err + } } + return nil + }) + if err != nil { + return nil, err } if !found { return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile) @@ -282,12 +279,8 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag } func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error { - db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile - if err != nil { - return err - } - defer db.Close() - // Encrypt message + cfg := GetConfig() + path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix) out, err := proto.Marshal(dbm) if err != nil { return err @@ -296,20 +289,16 @@ func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password if err != nil { return err } - // Insert message - updateMessageSQL := `UPDATE message SET m=? WHERE id=?` - statement, err := db.Prepare(updateMessageSQL) // Prepare statement. - if err != nil { + return withDbWrite(path, func(db *sql.DB) error { + stmt, err := db.Prepare(`UPDATE message SET m=? WHERE id=?`) + if err != nil { + return err + } + _, err = stmt.Exec(encData, dbId) return err - } - _, err = statement.Exec(encData, dbId) - if err != nil { - return err - } - return nil + }) } -// Get old messages from a peer func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) { dbm, err := GetDbMessage(dbFile, dbId, password) if err != nil { @@ -318,24 +307,15 @@ func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, erro return FilePreview(dbm.FilePaths[0], password) } -// decrypt the a file and returns the raw content func FilePreview(filename string, password string) ([]byte, error) { - // get the hidden file encData, err := os.ReadFile(filename) if err != nil { return nil, err } - // decrypt the file - data, err := meowlib.SymDecrypt(password, encData) - if err != nil { - return nil, err - } - return data, nil + return meowlib.SymDecrypt(password, encData) } -// return the raw content from the files content (loads the first image, or build a more complex view) func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) { - // get the hidden file name if len(msg.FilePaths) == 0 { return nil, nil } @@ -343,21 +323,16 @@ func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]by } func getMessageCount(dbid string) (int, error) { - db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File - if err != nil { - return 0, err - } - defer db.Close() + cfg := GetConfig() + path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbid+cfg.DbSuffix) var count int - query := "SELECT COUNT(*) FROM message" - err = db.QueryRow(query).Scan(&count) - if err != nil { - return 0, err - } - return count, nil + err := withDbRead(path, func(db *sql.DB) error { + return db.QueryRow("SELECT COUNT(*) FROM message").Scan(&count) + }) + return count, err } -// SetMessageServerDelivery updates the server delivery UUID and timestamp for an existing stored message. +// SetMessageServerDelivery updates the server delivery UUID and timestamp for a stored message. func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error { dbm, err := GetDbMessage(dbFile, dbId, password) if err != nil { @@ -375,37 +350,42 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string, identity := cfg.GetIdentity() for i := len(peer.DbIds) - 1; i >= 0; i-- { dbid := peer.DbIds[i] - db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) - if err != nil { - continue - } - rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC") - if err != nil { - db.Close() - continue - } - for rows.Next() { - var id int64 - var m []byte - if err := rows.Scan(&id, &m); err != nil { - continue - } - decdata, err := meowlib.SymDecrypt(password, m) + path := filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix) + var foundFile string + var foundId int64 + var foundMsg meowlib.DbMessage + err := withDbRead(path, func(db *sql.DB) error { + rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC") if err != nil { - continue + return err } - var dbm meowlib.DbMessage - if err := proto.Unmarshal(decdata, &dbm); err != nil { - continue - } - if dbm.Status != nil && dbm.Status.Uuid == messageUuid { - rows.Close() - db.Close() - return dbid, id, &dbm, nil + defer rows.Close() + for rows.Next() { + var id int64 + var m []byte + if err := rows.Scan(&id, &m); err != nil { + continue + } + decdata, err := meowlib.SymDecrypt(password, m) + if err != nil { + continue + } + var dbm meowlib.DbMessage + if err := proto.Unmarshal(decdata, &dbm); err != nil { + continue + } + if dbm.Status != nil && dbm.Status.Uuid == messageUuid { + foundFile = dbid + foundId = id + foundMsg = dbm + return nil + } } + return nil + }) + if err == nil && foundFile != "" { + return foundFile, foundId, &foundMsg, nil } - rows.Close() - db.Close() } return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid) } @@ -430,19 +410,18 @@ func UpdateMessageAck(peer *Peer, messageUuid string, receivedAt uint64, process } func createMessageTable(db *sql.DB) error { - createMessageTableSQL := `CREATE TABLE message ( - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, - "m" BLOB);` // SQL Statement for Create Table - statement, err := db.Prepare(createMessageTableSQL) // Prepare SQL Statement + stmt, err := db.Prepare(`CREATE TABLE message ( + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, + "m" BLOB)`) if err != nil { return err } - statement.Exec() // Execute SQL Statements + stmt.Exec() return nil } func createServerTable(db *sql.DB) error { - createServerTableSQL := `CREATE TABLE servers ( + stmt, err := db.Prepare(`CREATE TABLE servers ( "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "country" varchar(2), "public" bool, @@ -451,13 +430,12 @@ func createServerTable(db *sql.DB) error { "load" float, "url" varchar(2000) "name" varchar(255); - "description" varchar(5000) + "description" varchar(5000) "publickey" varchar(10000) - )` // SQL Statement for Create Table - statement, err := db.Prepare(createServerTableSQL) // Prepare SQL Statement + )`) if err != nil { return err } - statement.Exec() // Execute SQL Statements + stmt.Exec() return nil } diff --git a/client/peerstorage.go b/client/peerstorage.go index 354927d..2b8a443 100644 --- a/client/peerstorage.go +++ b/client/peerstorage.go @@ -9,6 +9,7 @@ import ( "errors" "path/filepath" "sort" + "sync" "forge.redroom.link/yves/meowlib" "github.com/dgraph-io/badger" @@ -17,11 +18,12 @@ import ( type PeerStorage struct { DbFile string `json:"db_file,omitempty"` + mu sync.RWMutex db *badger.DB cache map[string]*Peer } -// Open the badger database from struct PeerStorage +// open opens the Badger database. Caller must hold mu (write). func (ps *PeerStorage) open() error { if ps.DbFile == "" { ps.DbFile = uuid.New().String() @@ -34,20 +36,27 @@ func (ps *PeerStorage) open() error { opts.Logger = nil var err error ps.db, err = badger.Open(opts) - if err != nil { - return err - } - return nil + return err } -// Store function StorePeer stores a peer in the badger database with Peer.Uid as key +// close closes the Badger database. Caller must hold mu (write). +func (ps *PeerStorage) close() { + ps.db.Close() +} + +// StorePeer stores a peer in the Badger database with Peer.Uid as key. func (ps *PeerStorage) StorePeer(peer *Peer) error { - err := ps.open() - if err != nil { + ps.mu.Lock() + defer ps.mu.Unlock() + return ps.storePeerLocked(peer) +} + +// storePeerLocked is StorePeer without acquiring the lock. Caller must hold mu (write). +func (ps *PeerStorage) storePeerLocked(peer *Peer) error { + if err := ps.open(); err != nil { return err } defer ps.close() - // first marshal the Peer to bytes with protobuf jsonsrv, err := json.Marshal(peer) if err != nil { return err @@ -65,26 +74,24 @@ func (ps *PeerStorage) StorePeer(peer *Peer) error { } shakey := sha256.Sum256([]byte(peer.Uid)) key := shakey[:] - // add it to cache ps.cache[peer.Uid] = peer - // then store it in the database return ps.db.Update(func(txn *badger.Txn) error { return txn.Set(key, data) }) - } -// LoadPeer function loads a Peer from the badger database with Peer.GetUid() as key +// LoadPeer loads a Peer from the Badger database with Peer.GetUid() as key. func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) { + ps.mu.Lock() + defer ps.mu.Unlock() var peer Peer - err := ps.open() - if err != nil { + if err := ps.open(); err != nil { return nil, err } defer ps.close() shakey := sha256.Sum256([]byte(uid)) key := shakey[:] - err = ps.db.View(func(txn *badger.Txn) error { + err := ps.db.View(func(txn *badger.Txn) error { item, err := txn.Get(key) if err != nil { return err @@ -100,16 +107,17 @@ func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) { return &peer, err } -// DeletePeer function deletes a Peer from the badger database with Peer.GetUid() as key +// DeletePeer deletes a Peer from the Badger database with Peer.GetUid() as key. func (ps *PeerStorage) DeletePeer(uid string) error { - err := ps.open() - if err != nil { + ps.mu.Lock() + defer ps.mu.Unlock() + if err := ps.open(); err != nil { return err } defer ps.close() shakey := sha256.Sum256([]byte(uid)) key := shakey[:] - err = ps.db.Update(func(txn *badger.Txn) error { + err := ps.db.Update(func(txn *badger.Txn) error { return txn.Delete(key) }) if err == nil { @@ -118,15 +126,16 @@ func (ps *PeerStorage) DeletePeer(uid string) error { return err } -// LoadPeers function loads Peers from the badger database with a specific password +// LoadPeers loads all Peers from the Badger database and populates the cache. func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) { + ps.mu.Lock() + defer ps.mu.Unlock() var peers []*Peer - err := ps.open() - if err != nil { + if err := ps.open(); err != nil { return nil, err } defer ps.close() - err = ps.db.View(func(txn *badger.Txn) error { + err := ps.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchSize = 10 it := txn.NewIterator(opts) @@ -148,32 +157,29 @@ func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) { } return nil }) - // Sort peers based on peer.Name sort.Slice(peers, func(i, j int) bool { return peers[i].Name < peers[j].Name }) return peers, err } -// GetPeers function returns all peers from the cache as a sorted array +// GetPeers returns all peers from the cache as a sorted slice. func (ps *PeerStorage) GetPeers() ([]*Peer, error) { + ps.mu.RLock() + defer ps.mu.RUnlock() peers := make([]*Peer, 0, len(ps.cache)) for _, peer := range ps.cache { peers = append(peers, peer) } - // Sort peers based on peer.Name sort.Slice(peers, func(i, j int) bool { return peers[i].Name < peers[j].Name }) return peers, nil } -// close the badger database -func (ps *PeerStorage) close() { - ps.db.Close() -} - func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer { + ps.mu.RLock() + defer ps.mu.RUnlock() for _, peer := range ps.cache { if peer.ContactPublicKey == publickey { return peer @@ -183,6 +189,8 @@ func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer { } func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer { + ps.mu.RLock() + defer ps.mu.RUnlock() for _, peer := range ps.cache { if peer.InvitationId == invitationId { return peer @@ -192,6 +200,8 @@ func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer { } func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer { + ps.mu.RLock() + defer ps.mu.RUnlock() for _, peer := range ps.cache { if peer.MyLookupKp.Public == publickey { return peer @@ -201,6 +211,8 @@ func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer { } func (ps *PeerStorage) NameExists(name string) bool { + ps.mu.RLock() + defer ps.mu.RUnlock() for _, peer := range ps.cache { if peer.Name == name { return true @@ -210,6 +222,8 @@ func (ps *PeerStorage) NameExists(name string) bool { } func (ps *PeerStorage) GetFromName(name string) *Peer { + ps.mu.RLock() + defer ps.mu.RUnlock() for _, peer := range ps.cache { if peer.Name == name { return peer @@ -219,26 +233,29 @@ func (ps *PeerStorage) GetFromName(name string) *Peer { } func (ps *PeerStorage) GetFromUid(uid string) *Peer { + ps.mu.RLock() + defer ps.mu.RUnlock() return ps.cache[uid] } -// Checks if the received contact card is an answer to an invitation, returns true if it is, and the proposed and received nicknames +// CheckInvitation checks if the received contact card is an answer to an invitation. func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) { - // invitation Id found, this is an answer to an invitation + ps.mu.RLock() + defer ps.mu.RUnlock() for _, p := range ps.cache { if p.InvitationId == ReceivedContact.InvitationId { return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage } } - // it's an invitation return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage } -// Finalizes an invitation, returns nil if successful +// FinalizeInvitation completes an invitation handshake and persists the updated peer. func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error { + ps.mu.Lock() + defer ps.mu.Unlock() for i, p := range ps.cache { if p.InvitationId == ReceivedContact.InvitationId { - //id.Peers[i].Name = ReceivedContact.Name ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey @@ -250,8 +267,7 @@ func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid()) } ps.cache[i].ContactPullServers = srvs - ps.StorePeer(ps.cache[i]) - return nil + return ps.storePeerLocked(ps.cache[i]) } } return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId) diff --git a/client/serverstorage.go b/client/serverstorage.go index 0c6e628..8037000 100644 --- a/client/serverstorage.go +++ b/client/serverstorage.go @@ -7,6 +7,7 @@ import ( "crypto/sha256" "encoding/json" "path/filepath" + "sync" "forge.redroom.link/yves/meowlib" "github.com/dgraph-io/badger" @@ -14,30 +15,37 @@ import ( type ServerStorage struct { DbFile string `json:"db_file,omitempty"` + mu sync.Mutex db *badger.DB } -// Open a badger database from struct ServerStorage +// open opens the Badger database. Caller must hold mu. func (ss *ServerStorage) open() error { - opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile)) opts.Logger = nil var err error ss.db, err = badger.Open(opts) - if err != nil { - return err - } - return nil + return err } -// Store function StoreServer stores a server in a badger database with Server.GetUid() as key +// close closes the Badger database. Caller must hold mu. +func (ss *ServerStorage) close() { + ss.db.Close() +} + +// StoreServer stores a server in the Badger database with Server.GetUid() as key. func (ss *ServerStorage) StoreServer(sc *Server) error { - err := ss.open() - if err != nil { + ss.mu.Lock() + defer ss.mu.Unlock() + return ss.storeServerLocked(sc) +} + +// storeServerLocked is StoreServer without acquiring the lock. Caller must hold mu. +func (ss *ServerStorage) storeServerLocked(sc *Server) error { + if err := ss.open(); err != nil { return err } defer ss.close() - // first marshal the Server to bytes with protobuf jsonsrv, err := json.Marshal(sc) if err != nil { return err @@ -52,51 +60,56 @@ func (ss *ServerStorage) StoreServer(sc *Server) error { } shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) key := shakey[:] - // then store it in the database return ss.db.Update(func(txn *badger.Txn) error { return txn.Set(key, data) }) - } -// Check if a server exists in a badger database with Server.GetUid() as key +// ServerExists checks if a server exists in the Badger database. func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) { - err := ss.open() - if err != nil { + ss.mu.Lock() + defer ss.mu.Unlock() + return ss.serverExistsLocked(sc) +} + +// serverExistsLocked is ServerExists without acquiring the lock. Caller must hold mu. +func (ss *ServerStorage) serverExistsLocked(sc *Server) (bool, error) { + if err := ss.open(); err != nil { return false, err } defer ss.close() - shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) key := shakey[:] - // check if key exists in badger database - err = ss.db.View(func(txn *badger.Txn) error { + err := ss.db.View(func(txn *badger.Txn) error { _, err := txn.Get(key) return err - }) // Add a comma here - if err != nil { // key does not exist + }) + if err != nil { return false, nil } return true, nil } -// Store a server in a badger database with Server.GetUid() as key if it is not already there +// StoreServerIfNotExists stores a server only if it is not already present. func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error { - exists, err := ss.ServerExists(sc) + ss.mu.Lock() + defer ss.mu.Unlock() + exists, err := ss.serverExistsLocked(sc) if err != nil { return err } if !exists { - return ss.StoreServer(sc) + return ss.storeServerLocked(sc) } return nil } -// LoadServer function loads a Server from a badger database with Server.GetUid() as key +// LoadServer loads a Server from the Badger database by uid. func (ss *ServerStorage) LoadServer(uid string) (*Server, error) { + ss.mu.Lock() + defer ss.mu.Unlock() var sc Server - err := ss.open() - if err != nil { + if err := ss.open(); err != nil { return nil, err } defer ss.close() @@ -122,10 +135,11 @@ func (ss *ServerStorage) LoadServer(uid string) (*Server, error) { return &sc, err } -// DeleteServer function deletes a Server from a badger database with Server.GetUid() as key +// DeleteServer deletes a Server from the Badger database by uid. func (ss *ServerStorage) DeleteServer(uid string) error { - err := ss.open() - if err != nil { + ss.mu.Lock() + defer ss.mu.Unlock() + if err := ss.open(); err != nil { return err } defer ss.close() @@ -136,11 +150,12 @@ func (ss *ServerStorage) DeleteServer(uid string) error { }) } -// LoadAllServers function loads all Servers from a badger database +// LoadAllServers loads all Servers from the Badger database. func (ss *ServerStorage) LoadAllServers() ([]*Server, error) { + ss.mu.Lock() + defer ss.mu.Unlock() var scs []*Server - err := ss.open() - if err != nil { + if err := ss.open(); err != nil { return nil, err } defer ss.close() @@ -173,11 +188,12 @@ func (ss *ServerStorage) LoadAllServers() ([]*Server, error) { return scs, err } -// LoadAllServers function loads all ServersCards from a badger database +// LoadAllServerCards loads all ServerCards from the Badger database. func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) { + ss.mu.Lock() + defer ss.mu.Unlock() var scs []*meowlib.ServerCard - err := ss.open() - if err != nil { + if err := ss.open(); err != nil { return nil, err } defer ss.close() @@ -210,11 +226,12 @@ func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) { return scs, err } -// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database +// LoadServersFromUids loads Servers whose UIDs are in the provided slice. func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) { + ss.mu.Lock() + defer ss.mu.Unlock() var scs []*Server - err := ss.open() - if err != nil { + if err := ss.open(); err != nil { return nil, err } defer ss.close() @@ -248,11 +265,12 @@ func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) { return scs, err } -// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database +// LoadServerCardsFromUids loads ServerCards whose UIDs are in the provided slice. func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) { + ss.mu.Lock() + defer ss.mu.Unlock() var scs []*meowlib.ServerCard - err := ss.open() - if err != nil { + if err := ss.open(); err != nil { return nil, err } defer ss.close() @@ -285,8 +303,3 @@ func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.Serv }) return scs, err } - -// close a badger database -func (ss *ServerStorage) close() { - ss.db.Close() -} diff --git a/http.go b/http.go index 92fb9f0..75e4611 100644 --- a/http.go +++ b/http.go @@ -38,6 +38,12 @@ func HttpPostMessage(url string, msg []byte, timeout int) (response []byte, err defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { + // Server already accepted the request (2xx) — body truncation on our + // side doesn't mean the message wasn't stored. Return what we have so + // the caller doesn't retry and produce a duplicate. + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return body, nil + } return nil, err } return body, nil