Compare commits

..

6 Commits

Author SHA1 Message Date
yves 9037a7b3c7 subscriptions cleanup
continuous-integration/drone/push Build is failing
2026-04-21 20:07:35 +02:00
yves ac305eaae0 redis subscriptions fix
continuous-integration/drone/push Build is failing
2026-04-21 19:21:16 +02:00
yves 5d12e0f869 long poll fix
continuous-integration/drone/push Build is failing
2026-04-21 17:00:58 +02:00
yves 8b106db52f duplicate messages send fixes
continuous-integration/drone/push Build is failing
2026-04-21 15:53:56 +02:00
yves 4e78ce5799 reorder received messages
continuous-integration/drone/push Build is failing
2026-04-19 18:36:44 +02:00
yves b9a1233e0e uuid return on create ne message
continuous-integration/drone/push Build is failing
2026-04-18 21:46:28 +02:00
13 changed files with 555 additions and 529 deletions
+105 -107
View File
@@ -1,10 +1,12 @@
package helpers package helpers
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@@ -14,6 +16,7 @@ import (
invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages" invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages"
invsrv "forge.redroom.link/yves/meowlib/client/invitation/server" invsrv "forge.redroom.link/yves/meowlib/client/invitation/server"
"github.com/google/uuid" "github.com/google/uuid"
doubleratchet "github.com/status-im/doubleratchet"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@@ -106,7 +109,7 @@ func PollServer(storage_path string, job *client.RequestsJob, timeout int, longP
// SaveCheckJobs // SaveCheckJobs
func SaveCheckJobs() (string, error) { func SaveCheckJobs() (string, error) {
me := client.GetConfig().GetIdentity() me := client.GetConfig().GetIdentity()
err := me.SaveBackgroundJob() err := me.SaveCheckJobs()
if err != nil { if err != nil {
return "CheckMessages: json.Marshal", err return "CheckMessages: json.Marshal", err
@@ -154,12 +157,25 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
} }
// Chat messages // Chat messages
if len(fromServerMessage.Chat) > 0 { if len(fromServerMessage.Chat) > 0 {
// Sort by DR chain sequence number so messages are decrypted in ratchet order,
// regardless of server delivery order.
sort.SliceStable(fromServerMessage.Chat, func(i, j int) bool {
var hi, hj doubleratchet.MessageHeader
if err := json.Unmarshal(fromServerMessage.Chat[i].DrHeader, &hi); err != nil {
return false
}
if err := json.Unmarshal(fromServerMessage.Chat[j].DrHeader, &hj); err != nil {
return false
}
return hi.N < hj.N
})
for _, packedUserMessage := range fromServerMessage.Chat { for _, packedUserMessage := range fromServerMessage.Chat {
// find the peer with that lookup key // find the peer with that lookup key
peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination) peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination)
if peer == nil { if peer == nil {
return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message") logger.Error().Str("destination", packedUserMessage.Destination).Msg("ConsumeInboxFile: no visible peer for that message, skipping")
continue
} }
// Unpack the message — step-3 messages arrive before the initiator's identity // Unpack the message — step-3 messages arrive before the initiator's identity
// key is known, so skip signature verification for pending peers. // key is known, so skip signature verification for pending peers.
@@ -170,100 +186,103 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage) usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage)
} }
if err != nil { if err != nil {
return nil, nil, "ReadMessage: ProcessInboundUserMessage", err //return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
} logger.Error().Msg("ReadMessage: ProcessInboundUserMessage" + err.Error())
} else {
// Handle invitation step 3: initiator's full ContactCard arriving at the invitee. // Handle invitation step 3: initiator's full ContactCard arriving at the invitee.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 { if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 {
invBytes, marshalErr := proto.Marshal(usermsg.Invitation) invBytes, marshalErr := proto.Marshal(usermsg.Invitation)
if marshalErr == nil { if marshalErr == nil {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes) finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes)
if finalErr == nil && finalizedPeer != nil { if finalErr == nil && finalizedPeer != nil {
// Auto-send step-4 confirmation to initiator's servers. // Auto-send step-4 confirmation to initiator's servers.
step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId) step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId)
if sendErr == nil { if sendErr == nil {
for i, bytemsg := range step4msgs { for i, bytemsg := range step4msgs {
if i < len(finalizedPeer.ContactPullServers) { if i < len(finalizedPeer.ContactPullServers) {
meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut) meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut)
}
} }
} }
} }
} }
continue
} }
continue
}
// Handle invitation step 4: invitee's confirmation arriving at the initiator. // Handle invitation step 4: invitee's confirmation arriving at the initiator.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 { if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 {
// Contact is fully active — nothing more to do on the initiator side. // Contact is fully active — nothing more to do on the initiator side.
continue continue
}
// Check for received or processed already filled => it's an ack for one of our sent messages
if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" &&
(usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) {
password, _ := client.GetConfig().GetMemPass()
if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil {
logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck")
} }
continue
}
//fmt.Println("From:", usermsg.From) // Check for received or processed already filled => it's an ack for one of our sent messages
//jsonUserMessage, _ := json.Marshal(usermsg) if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" &&
//fmt.Println(string(jsonUserMessage)) (usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) {
//peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From) password, _ := client.GetConfig().GetMemPass()
if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil {
logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck")
}
continue
}
// detach files //fmt.Println("From:", usermsg.From)
if usermsg.Files != nil { //jsonUserMessage, _ := json.Marshal(usermsg)
// create files folder //fmt.Println(string(jsonUserMessage))
if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) { //peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From)
err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700)
if err != nil { // detach files
return nil, nil, "ReadMessage: MkdirAll", err if usermsg.Files != nil {
// create files folder
if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) {
err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700)
if err != nil {
return nil, nil, "ReadMessage: MkdirAll", err
}
}
for _, file := range usermsg.Files {
filename := uuid.New().String() + "_" + file.Filename
filenames = append(filenames, peer.Name+" sent: "+filename)
// detach file
os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600)
}
//? result["invitation finalized"] = peer.Name
}
// user message
messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data))
// stamp the received time before storing
receivedAt := time.Now().UTC().Unix()
if usermsg.Status == nil {
usermsg.Status = &meowlib.ConversationStatus{}
}
usermsg.Status.Received = uint64(receivedAt)
// add message to storage
err = peer.StoreMessage(usermsg, filenames)
if err != nil {
return nil, nil, "ReadMessage: StoreMessage", err
}
filenames = []string{}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
if storeErr := identity.Peers.StorePeer(peer); storeErr != nil {
logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)")
} }
} }
for _, file := range usermsg.Files {
filename := uuid.New().String() + "_" + file.Filename
filenames = append(filenames, peer.Name+" sent: "+filename)
// detach file
os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600)
}
//? result["invitation finalized"] = peer.Name
}
// user message
messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data)) // Send delivery ack if the peer requested it
if peer.SendDeliveryAck && usermsg.Status.Uuid != "" {
// stamp the received time before storing storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
receivedAt := time.Now().UTC().Unix() if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil {
if usermsg.Status == nil { logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck")
usermsg.Status = &meowlib.ConversationStatus{} }
}
usermsg.Status.Received = uint64(receivedAt)
// add message to storage
err = peer.StoreMessage(usermsg, filenames)
if err != nil {
return nil, nil, "ReadMessage: StoreMessage", err
}
filenames = []string{}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
if storeErr := identity.Peers.StorePeer(peer); storeErr != nil {
logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)")
}
}
// Send delivery ack if the peer requested it
if peer.SendDeliveryAck && usermsg.Status.Uuid != "" {
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil {
logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck")
} }
} }
} }
} }
err = os.Remove(messageFilename) err = os.Remove(messageFilename)
@@ -275,54 +294,33 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
return messagesOverview, filenames, "", nil return messagesOverview, filenames, "", nil
} }
// LongPollAllSerevrJobs checks for messages on a all servers defived in job file // LongPollAllServerJobs checks for messages on all servers defined in job file.
// It returns as soon as any server delivers at least one message, or 0 when all
// polls time out. resultChan is buffered so goroutines never block on write.
func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) { func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
// Channel to collect results
resultChan := make(chan int, len(jobs)) resultChan := make(chan int, len(jobs))
errChan := make(chan error, len(jobs))
// WaitGroup to sync goroutines
var wg sync.WaitGroup var wg sync.WaitGroup
// Loop through each job (server)
for _, job := range jobs { for _, job := range jobs {
wg.Add(1) wg.Add(1)
go func(job client.RequestsJob) { go func(job client.RequestsJob) {
defer wg.Done() defer wg.Done()
// Long-polling call to the server
cnt, _, err := PollServer(storage_path, &job, timeout, true) cnt, _, err := PollServer(storage_path, &job, timeout, true)
if err == nil && cnt > 0 { if err == nil && cnt > 0 {
select { resultChan <- cnt
case resultChan <- cnt:
default:
}
// Close the error channel to notify all goroutines
close(errChan)
} }
}(job) }(job)
} }
// Close the result channel when all workers are done
go func() { go func() {
wg.Wait() wg.Wait()
close(resultChan) close(resultChan)
}() }()
// Wait for the first message or all timeouts if cnt, ok := <-resultChan; ok {
select {
case cnt := <-resultChan:
return cnt, "", nil return cnt, "", nil
case <-errChan:
// If one fails and exitOnMessage is true
return 0, "", nil
} }
return 0, "", nil
} }
// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues // sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues
+11 -8
View File
@@ -21,33 +21,36 @@ const defaultPostTimeout = 200
// It creates and stores the user message, serialises the packed form to // It creates and stores the user message, serialises the packed form to
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in // storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
// storagePath/queues/{peerUid}. // storagePath/queues/{peerUid}.
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error { func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) (string, error) {
packedMsg, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist) packedMsg, dbFile, dbId, msgUuid, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", errTxt, err) return "", fmt.Errorf("%s: %w", errTxt, err)
} }
data, err := proto.Marshal(packedMsg) data, err := proto.Marshal(packedMsg)
if err != nil { if err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
} }
outboxDir := filepath.Join(storagePath, "outbox") outboxDir := filepath.Join(storagePath, "outbox")
if err := os.MkdirAll(outboxDir, 0700); err != nil { if err := os.MkdirAll(outboxDir, 0700); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
} }
outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId)) outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId))
if err := os.WriteFile(outboxFile, data, 0600); err != nil { if err := os.WriteFile(outboxFile, data, 0600); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
} }
return client.PushSendJob(storagePath, &client.SendJob{ if err := client.PushSendJob(storagePath, &client.SendJob{
Queue: peerUid, Queue: peerUid,
File: outboxFile, File: outboxFile,
Servers: servers, Servers: servers,
Timeout: timeout, Timeout: timeout,
}) }); err != nil {
return "", err
}
return msgUuid, nil
} }
// ProcessSendQueues discovers every queue DB file under storagePath/queues/ // ProcessSendQueues discovers every queue DB file under storagePath/queues/
+2 -1
View File
@@ -202,7 +202,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
srv := newTestServer(t, "http://test-srv.example") srv := newTestServer(t, "http://test-srv.example")
err := CreateUserMessageAndSendJob( msgUuid, err := CreateUserMessageAndSendJob(
dir, dir,
"hello from integration", "hello from integration",
"peer-create-send", "peer-create-send",
@@ -212,6 +212,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
60, 60,
) )
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, msgUuid, "returned UUID must not be empty")
// A pending job must be in the queue. // A pending job must be in the queue.
job, _, err := client.PeekSendJob(dir, "peer-create-send") job, _, err := client.PeekSendJob(dir, "peer-create-send")
+10 -9
View File
@@ -42,7 +42,7 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) (
} }
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) { func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) usermessage, _, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
if err != nil { if err != nil {
return nil, errtxt, err return nil, errtxt, err
} }
@@ -51,20 +51,20 @@ func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for // CreateAndStoreUserMessage creates, signs, and stores an outbound message for
// peer_uid. It returns the packed (encrypted) form ready for server transport, // peer_uid. It returns the packed (encrypted) form ready for server transport,
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context // the peer DB file UUID (dbFile), the SQLite row ID (dbId), the message UUID
// string, and any error. // (conversation_status uuid), an error context string, and any error.
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) { func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message // Creating User message
usermessage, err := peer.BuildSimpleUserMessage([]byte(message)) usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
if err != nil { if err != nil {
return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err return nil, "", 0, "", "PrepareServerMessage : BuildSimpleUserMessage", err
} }
for _, file := range filelist { for _, file := range filelist {
err = usermessage.AddFile(file, client.GetConfig().Chunksize) err = usermessage.AddFile(file, client.GetConfig().Chunksize)
if err != nil { if err != nil {
return nil, "", 0, "PrepareServerMessage : AddFile", err return nil, "", 0, "", "PrepareServerMessage : AddFile", err
} }
} }
usermessage.Status.Sent = uint64(time.Now().UTC().Unix()) usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
@@ -73,16 +73,17 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
// Store message // Store message
err = peer.StoreMessage(usermessage, nil) err = peer.StoreMessage(usermessage, nil)
if err != nil { if err != nil {
return nil, "", 0, "messageBuildPostprocess : StoreMessage", err return nil, "", 0, "", "messageBuildPostprocess : StoreMessage", err
} }
dbFile := peer.LastMessage.Dbfile dbFile := peer.LastMessage.Dbfile
dbId := peer.LastMessage.Dbid dbId := peer.LastMessage.Dbid
msgUuid := usermessage.Status.Uuid
// Prepare cyphered + packed user message // Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil { if err != nil {
return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err return nil, "", 0, "", "messageBuildPostprocess : ProcessOutboundUserMessage", err
} }
// Persist peer to save updated DR state (DrStateJson) // Persist peer to save updated DR state (DrStateJson)
@@ -92,7 +93,7 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
} }
} }
return packedMsg, dbFile, dbId, "", nil return packedMsg, dbFile, dbId, msgUuid, "", nil
} }
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) { func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
+1 -1
View File
@@ -425,7 +425,7 @@ func (id *Identity) GetRequestJobs() []RequestsJob {
return list return list
} }
func (id *Identity) SaveBackgroundJob() error { func (id *Identity) SaveCheckJobs() error {
if id.RootKp == nil { if id.RootKp == nil {
return errors.New("identity not fully initialized: RootKp is nil") return errors.New("identity not fully initialized: RootKp is nil")
} }
+225 -247
View File
@@ -6,6 +6,7 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/google/uuid" "github.com/google/uuid"
@@ -13,71 +14,82 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error { // One RWMutex per SQLite file path. Entries are never deleted (bounded by
var dbid string // peer count, which is small). RLock for reads, Lock for writes.
cfg := GetConfig() var dbFileMu sync.Map
identity := cfg.GetIdentity()
// If no db/no ID create DB + Tablz
// TODO : if file size > X new db
if len(peer.DbIds) == 0 {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
identity.Peers.StorePeer(peer) func getDbFileMutex(path string) *sync.RWMutex {
identity.CreateFolder() v, _ := dbFileMu.LoadOrStore(path, &sync.RWMutex{})
file, err := os.Create(filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) return v.(*sync.RWMutex)
if err != nil { }
return err
} func withDbWrite(path string, fn func(*sql.DB) error) error {
file.Close() mu := getDbFileMutex(path)
sqliteDatabase, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) mu.Lock()
if err != nil { defer mu.Unlock()
return err db, err := sql.Open("sqlite3", path)
}
defer sqliteDatabase.Close()
err = createMessageTable(sqliteDatabase)
if err != nil {
return err
}
sqliteDatabase.Close()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Open Db
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil { if err != nil {
return err return err
} }
defer db.Close() defer db.Close()
// Detach Files return fn(db)
}
func withDbRead(path string, fn func(*sql.DB) error) error {
mu := getDbFileMutex(path)
mu.RLock()
defer mu.RUnlock()
db, err := sql.Open("sqlite3", path)
if err != nil {
return err
}
defer db.Close()
return fn(db)
}
func dbPath(cfg *Config, identity *Identity, dbid string) string {
return filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
}
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error {
cfg := GetConfig()
identity := cfg.GetIdentity()
isNew := len(peer.DbIds) == 0
var dbid string
if isNew {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
identity.Peers.StorePeer(peer)
identity.CreateFolder()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Detach file attachments — no DB lock needed for file I/O.
hiddenFilenames := []string{} hiddenFilenames := []string{}
if len(usermessage.Files) > 0 { if len(usermessage.Files) > 0 {
secureDir := filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")
if _, err := os.Stat(secureDir); os.IsNotExist(err) {
if err = os.MkdirAll(secureDir, 0755); err != nil {
return err
}
}
for _, f := range usermessage.Files { for _, f := range usermessage.Files {
hiddenFilename := uuid.NewString() hiddenFilename := uuid.NewString()
// Cypher file
encData, err := meowlib.SymEncrypt(password, f.Data) encData, err := meowlib.SymEncrypt(password, f.Data)
if err != nil { if err != nil {
return err return err
} }
if _, err := os.Stat(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")); os.IsNotExist(err) { hidden := filepath.Join(secureDir, hiddenFilename)
err = os.MkdirAll(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles"), 0755) os.WriteFile(hidden, encData, 0600)
if err != nil { hiddenFilenames = append(hiddenFilenames, hidden)
return err f.Data = []byte(hidden)
}
}
os.WriteFile(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename), encData, 0600)
hiddenFilenames = append(hiddenFilenames, filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename))
// replace f.Data by uuid filename
f.Data = []byte(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename))
} }
} }
outbound := true
if usermessage.From == peer.ContactPublicKey { outbound := usermessage.From != peer.ContactPublicKey
outbound = false
}
// Convert UserMessage to DbMessage
dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames) dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames)
// Encrypt message
out, err := proto.Marshal(dbm) out, err := proto.Marshal(dbm)
if err != nil { if err != nil {
return err return err
@@ -86,98 +98,94 @@ func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []stri
if err != nil { if err != nil {
return err return err
} }
// Insert message
insertMessageSQL := `INSERT INTO message(m) VALUES (?) RETURNING ID` var id int64
statement, err := db.Prepare(insertMessageSQL) // Prepare statement. path := dbPath(cfg, identity, dbid)
err = withDbWrite(path, func(db *sql.DB) error {
// SQLite creates the file on first Open; create the table if new DB.
if isNew {
if err := createMessageTable(db); err != nil {
return err
}
}
stmt, err := db.Prepare(`INSERT INTO message(m) VALUES (?) RETURNING ID`)
if err != nil {
return err
}
result, err := stmt.Exec(encData)
if err != nil {
return err
}
id, err = result.LastInsertId()
return err
})
if err != nil { if err != nil {
return err return err
} }
result, err := statement.Exec(encData)
if err != nil { peer.LastMessage = DbMessageToInternalUserMessage(id, dbid, dbm)
return err
}
id, err := result.LastInsertId()
if err != nil {
return err
}
ium := DbMessageToInternalUserMessage(id, dbid, dbm)
peer.LastMessage = ium
identity.Peers.StorePeer(peer) identity.Peers.StorePeer(peer)
return nil return nil
} }
// Get new messages from a peer
func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) { func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) {
var messages []*InternalUserMessage var messages []*InternalUserMessage
cfg := GetConfig() cfg := GetConfig()
identity := cfg.GetIdentity() identity := cfg.GetIdentity()
// handle no db yet
if len(peer.DbIds) == 0 { if len(peer.DbIds) == 0 {
return messages, nil return messages, nil
} }
fileidx := len(peer.DbIds) - 1 fileidx := len(peer.DbIds) - 1
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 { if lastDbId == 0 {
lastDbId = math.MaxInt64 lastDbId = math.MaxInt64
} }
stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC") err := withDbRead(dbPath(cfg, identity, peer.DbIds[fileidx]), func(db *sql.DB) error {
if err != nil { stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC")
return nil, err
}
defer stm.Close()
rows, err := stm.Query(lastDbId)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil { if err != nil {
return nil, err return err
} }
decdata, err := meowlib.SymDecrypt(password, m) defer stm.Close()
rows, err := stm.Query(lastDbId)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) defer rows.Close()
if err != nil { for rows.Next() {
return nil, err var id int64
var m []byte
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return err
}
var dbm meowlib.DbMessage
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, ium)
} }
return nil
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) })
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, ium)
}
// TODO DB overlap // TODO DB overlap
return messages, nil return messages, err
} }
// Get old messages from a peer
func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) { func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) {
var messages []InternalUserMessage var messages []InternalUserMessage
// handle no db yet cfg := GetConfig()
if len(peer.DbIds) == 0 { if len(peer.DbIds) == 0 {
return messages, nil return messages, nil
} }
fileidx := len(peer.DbIds) - 1 fileidx := len(peer.DbIds) - 1
// initialize count with last db message count
countStack, err := getMessageCount(peer.DbIds[fileidx]) countStack, err := getMessageCount(peer.DbIds[fileidx])
if err != nil { if err != nil {
return nil, err return nil, err
} }
// while the db message count < what we already have in app, step to next db file
for inAppMsgCount > countStack { for inAppMsgCount > countStack {
fileidx-- fileidx--
if fileidx < 0 { if fileidx < 0 {
@@ -189,91 +197,80 @@ func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore i
} }
countStack += newCount countStack += newCount
} }
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 { if lastDbId == 0 {
lastDbId = math.MaxInt64 lastDbId = math.MaxInt64
} }
stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?") err = withDbRead(filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, peer.DbIds[fileidx]+cfg.DbSuffix), func(db *sql.DB) error {
if err != nil { stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?")
return nil, err
}
defer stm.Close()
rows, err := stm.Query(lastDbId, wantMore)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil { if err != nil {
return nil, err return err
} }
decdata, err := meowlib.SymDecrypt(password, m) defer stm.Close()
rows, err := stm.Query(lastDbId, wantMore)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) defer rows.Close()
if err != nil { for rows.Next() {
return nil, err var id int64
var m []byte
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return err
}
var dbm meowlib.DbMessage
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, *ium)
} }
return nil
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm) })
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, *ium)
}
// TODO DB overlap // TODO DB overlap
return messages, nil return messages, err
} }
func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) { func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) {
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB) cfg := GetConfig()
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
if err != nil {
return nil, err
}
defer db.Close()
stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?")
if err != nil {
return nil, err
}
defer stm.Close()
rows, err := stm.Query(dbId)
if err != nil {
return nil, err
}
defer rows.Close()
var dbm meowlib.DbMessage var dbm meowlib.DbMessage
found := false found := false
for rows.Next() { err := withDbRead(path, func(db *sql.DB) error {
found = true stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?")
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil { if err != nil {
return nil, err return err
} }
decdata, err := meowlib.SymDecrypt(password, m) defer stm.Close()
rows, err := stm.Query(dbId)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) defer rows.Close()
if err != nil { for rows.Next() {
return nil, err found = true
var id int64
var m []byte
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return err
}
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
} }
return nil
})
if err != nil {
return nil, err
} }
if !found { if !found {
return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile) return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile)
@@ -282,12 +279,8 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag
} }
func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error { func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile cfg := GetConfig()
if err != nil { path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
return err
}
defer db.Close()
// Encrypt message
out, err := proto.Marshal(dbm) out, err := proto.Marshal(dbm)
if err != nil { if err != nil {
return err return err
@@ -296,20 +289,16 @@ func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password
if err != nil { if err != nil {
return err return err
} }
// Insert message return withDbWrite(path, func(db *sql.DB) error {
updateMessageSQL := `UPDATE message SET m=? WHERE id=?` stmt, err := db.Prepare(`UPDATE message SET m=? WHERE id=?`)
statement, err := db.Prepare(updateMessageSQL) // Prepare statement. if err != nil {
if err != nil { return err
}
_, err = stmt.Exec(encData, dbId)
return err return err
} })
_, err = statement.Exec(encData, dbId)
if err != nil {
return err
}
return nil
} }
// Get old messages from a peer
func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) { func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) {
dbm, err := GetDbMessage(dbFile, dbId, password) dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil { if err != nil {
@@ -318,24 +307,15 @@ func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, erro
return FilePreview(dbm.FilePaths[0], password) return FilePreview(dbm.FilePaths[0], password)
} }
// decrypt the a file and returns the raw content
func FilePreview(filename string, password string) ([]byte, error) { func FilePreview(filename string, password string) ([]byte, error) {
// get the hidden file
encData, err := os.ReadFile(filename) encData, err := os.ReadFile(filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// decrypt the file return meowlib.SymDecrypt(password, encData)
data, err := meowlib.SymDecrypt(password, encData)
if err != nil {
return nil, err
}
return data, nil
} }
// return the raw content from the files content (loads the first image, or build a more complex view)
func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) { func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) {
// get the hidden file name
if len(msg.FilePaths) == 0 { if len(msg.FilePaths) == 0 {
return nil, nil return nil, nil
} }
@@ -343,21 +323,16 @@ func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]by
} }
func getMessageCount(dbid string) (int, error) { func getMessageCount(dbid string) (int, error) {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File cfg := GetConfig()
if err != nil { path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbid+cfg.DbSuffix)
return 0, err
}
defer db.Close()
var count int var count int
query := "SELECT COUNT(*) FROM message" err := withDbRead(path, func(db *sql.DB) error {
err = db.QueryRow(query).Scan(&count) return db.QueryRow("SELECT COUNT(*) FROM message").Scan(&count)
if err != nil { })
return 0, err return count, err
}
return count, nil
} }
// SetMessageServerDelivery updates the server delivery UUID and timestamp for an existing stored message. // SetMessageServerDelivery updates the server delivery UUID and timestamp for a stored message.
func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error { func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error {
dbm, err := GetDbMessage(dbFile, dbId, password) dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil { if err != nil {
@@ -375,37 +350,42 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string,
identity := cfg.GetIdentity() identity := cfg.GetIdentity()
for i := len(peer.DbIds) - 1; i >= 0; i-- { for i := len(peer.DbIds) - 1; i >= 0; i-- {
dbid := peer.DbIds[i] dbid := peer.DbIds[i]
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) path := filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
if err != nil { var foundFile string
continue var foundId int64
} var foundMsg meowlib.DbMessage
rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC") err := withDbRead(path, func(db *sql.DB) error {
if err != nil { rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC")
db.Close()
continue
}
for rows.Next() {
var id int64
var m []byte
if err := rows.Scan(&id, &m); err != nil {
continue
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil { if err != nil {
continue return err
} }
var dbm meowlib.DbMessage defer rows.Close()
if err := proto.Unmarshal(decdata, &dbm); err != nil { for rows.Next() {
continue var id int64
} var m []byte
if dbm.Status != nil && dbm.Status.Uuid == messageUuid { if err := rows.Scan(&id, &m); err != nil {
rows.Close() continue
db.Close() }
return dbid, id, &dbm, nil decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
continue
}
var dbm meowlib.DbMessage
if err := proto.Unmarshal(decdata, &dbm); err != nil {
continue
}
if dbm.Status != nil && dbm.Status.Uuid == messageUuid {
foundFile = dbid
foundId = id
foundMsg = dbm
return nil
}
} }
return nil
})
if err == nil && foundFile != "" {
return foundFile, foundId, &foundMsg, nil
} }
rows.Close()
db.Close()
} }
return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid) return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid)
} }
@@ -430,19 +410,18 @@ func UpdateMessageAck(peer *Peer, messageUuid string, receivedAt uint64, process
} }
func createMessageTable(db *sql.DB) error { func createMessageTable(db *sql.DB) error {
createMessageTableSQL := `CREATE TABLE message ( stmt, err := db.Prepare(`CREATE TABLE message (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"m" BLOB);` // SQL Statement for Create Table "m" BLOB)`)
statement, err := db.Prepare(createMessageTableSQL) // Prepare SQL Statement
if err != nil { if err != nil {
return err return err
} }
statement.Exec() // Execute SQL Statements stmt.Exec()
return nil return nil
} }
func createServerTable(db *sql.DB) error { func createServerTable(db *sql.DB) error {
createServerTableSQL := `CREATE TABLE servers ( stmt, err := db.Prepare(`CREATE TABLE servers (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"country" varchar(2), "country" varchar(2),
"public" bool, "public" bool,
@@ -451,13 +430,12 @@ func createServerTable(db *sql.DB) error {
"load" float, "load" float,
"url" varchar(2000) "url" varchar(2000)
"name" varchar(255); "name" varchar(255);
"description" varchar(5000) "description" varchar(5000)
"publickey" varchar(10000) "publickey" varchar(10000)
)` // SQL Statement for Create Table )`)
statement, err := db.Prepare(createServerTableSQL) // Prepare SQL Statement
if err != nil { if err != nil {
return err return err
} }
statement.Exec() // Execute SQL Statements stmt.Exec()
return nil return nil
} }
+55 -39
View File
@@ -9,6 +9,7 @@ import (
"errors" "errors"
"path/filepath" "path/filepath"
"sort" "sort"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
@@ -17,11 +18,12 @@ import (
type PeerStorage struct { type PeerStorage struct {
DbFile string `json:"db_file,omitempty"` DbFile string `json:"db_file,omitempty"`
mu sync.RWMutex
db *badger.DB db *badger.DB
cache map[string]*Peer cache map[string]*Peer
} }
// Open the badger database from struct PeerStorage // open opens the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) open() error { func (ps *PeerStorage) open() error {
if ps.DbFile == "" { if ps.DbFile == "" {
ps.DbFile = uuid.New().String() ps.DbFile = uuid.New().String()
@@ -34,20 +36,27 @@ func (ps *PeerStorage) open() error {
opts.Logger = nil opts.Logger = nil
var err error var err error
ps.db, err = badger.Open(opts) ps.db, err = badger.Open(opts)
if err != nil { return err
return err
}
return nil
} }
// Store function StorePeer stores a peer in the badger database with Peer.Uid as key // close closes the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) close() {
ps.db.Close()
}
// StorePeer stores a peer in the Badger database with Peer.Uid as key.
func (ps *PeerStorage) StorePeer(peer *Peer) error { func (ps *PeerStorage) StorePeer(peer *Peer) error {
err := ps.open() ps.mu.Lock()
if err != nil { defer ps.mu.Unlock()
return ps.storePeerLocked(peer)
}
// storePeerLocked is StorePeer without acquiring the lock. Caller must hold mu (write).
func (ps *PeerStorage) storePeerLocked(peer *Peer) error {
if err := ps.open(); err != nil {
return err return err
} }
defer ps.close() defer ps.close()
// first marshal the Peer to bytes with protobuf
jsonsrv, err := json.Marshal(peer) jsonsrv, err := json.Marshal(peer)
if err != nil { if err != nil {
return err return err
@@ -65,26 +74,24 @@ func (ps *PeerStorage) StorePeer(peer *Peer) error {
} }
shakey := sha256.Sum256([]byte(peer.Uid)) shakey := sha256.Sum256([]byte(peer.Uid))
key := shakey[:] key := shakey[:]
// add it to cache
ps.cache[peer.Uid] = peer ps.cache[peer.Uid] = peer
// then store it in the database
return ps.db.Update(func(txn *badger.Txn) error { return ps.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data) return txn.Set(key, data)
}) })
} }
// LoadPeer function loads a Peer from the badger database with Peer.GetUid() as key // LoadPeer loads a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) { func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peer Peer var peer Peer
err := ps.open() if err := ps.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ps.close() defer ps.close()
shakey := sha256.Sum256([]byte(uid)) shakey := sha256.Sum256([]byte(uid))
key := shakey[:] key := shakey[:]
err = ps.db.View(func(txn *badger.Txn) error { err := ps.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key) item, err := txn.Get(key)
if err != nil { if err != nil {
return err return err
@@ -100,16 +107,17 @@ func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
return &peer, err return &peer, err
} }
// DeletePeer function deletes a Peer from the badger database with Peer.GetUid() as key // DeletePeer deletes a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) DeletePeer(uid string) error { func (ps *PeerStorage) DeletePeer(uid string) error {
err := ps.open() ps.mu.Lock()
if err != nil { defer ps.mu.Unlock()
if err := ps.open(); err != nil {
return err return err
} }
defer ps.close() defer ps.close()
shakey := sha256.Sum256([]byte(uid)) shakey := sha256.Sum256([]byte(uid))
key := shakey[:] key := shakey[:]
err = ps.db.Update(func(txn *badger.Txn) error { err := ps.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key) return txn.Delete(key)
}) })
if err == nil { if err == nil {
@@ -118,15 +126,16 @@ func (ps *PeerStorage) DeletePeer(uid string) error {
return err return err
} }
// LoadPeers function loads Peers from the badger database with a specific password // LoadPeers loads all Peers from the Badger database and populates the cache.
func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) { func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []*Peer var peers []*Peer
err := ps.open() if err := ps.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ps.close() defer ps.close()
err = ps.db.View(func(txn *badger.Txn) error { err := ps.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10 opts.PrefetchSize = 10
it := txn.NewIterator(opts) it := txn.NewIterator(opts)
@@ -148,32 +157,29 @@ func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
} }
return nil return nil
}) })
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool { sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name return peers[i].Name < peers[j].Name
}) })
return peers, err return peers, err
} }
// GetPeers function returns all peers from the cache as a sorted array // GetPeers returns all peers from the cache as a sorted slice.
func (ps *PeerStorage) GetPeers() ([]*Peer, error) { func (ps *PeerStorage) GetPeers() ([]*Peer, error) {
ps.mu.RLock()
defer ps.mu.RUnlock()
peers := make([]*Peer, 0, len(ps.cache)) peers := make([]*Peer, 0, len(ps.cache))
for _, peer := range ps.cache { for _, peer := range ps.cache {
peers = append(peers, peer) peers = append(peers, peer)
} }
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool { sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name return peers[i].Name < peers[j].Name
}) })
return peers, nil return peers, nil
} }
// close the badger database
func (ps *PeerStorage) close() {
ps.db.Close()
}
func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer { func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.ContactPublicKey == publickey { if peer.ContactPublicKey == publickey {
return peer return peer
@@ -183,6 +189,8 @@ func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
} }
func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer { func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.InvitationId == invitationId { if peer.InvitationId == invitationId {
return peer return peer
@@ -192,6 +200,8 @@ func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
} }
func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer { func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.MyLookupKp.Public == publickey { if peer.MyLookupKp.Public == publickey {
return peer return peer
@@ -201,6 +211,8 @@ func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
} }
func (ps *PeerStorage) NameExists(name string) bool { func (ps *PeerStorage) NameExists(name string) bool {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.Name == name { if peer.Name == name {
return true return true
@@ -210,6 +222,8 @@ func (ps *PeerStorage) NameExists(name string) bool {
} }
func (ps *PeerStorage) GetFromName(name string) *Peer { func (ps *PeerStorage) GetFromName(name string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.Name == name { if peer.Name == name {
return peer return peer
@@ -219,26 +233,29 @@ func (ps *PeerStorage) GetFromName(name string) *Peer {
} }
func (ps *PeerStorage) GetFromUid(uid string) *Peer { func (ps *PeerStorage) GetFromUid(uid string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.cache[uid] return ps.cache[uid]
} }
// Checks if the received contact card is an answer to an invitation, returns true if it is, and the proposed and received nicknames // CheckInvitation checks if the received contact card is an answer to an invitation.
func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) { func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) {
// invitation Id found, this is an answer to an invitation ps.mu.RLock()
defer ps.mu.RUnlock()
for _, p := range ps.cache { for _, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId { if p.InvitationId == ReceivedContact.InvitationId {
return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage
} }
} }
// it's an invitation
return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage
} }
// Finalizes an invitation, returns nil if successful // FinalizeInvitation completes an invitation handshake and persists the updated peer.
func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error { func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error {
ps.mu.Lock()
defer ps.mu.Unlock()
for i, p := range ps.cache { for i, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId { if p.InvitationId == ReceivedContact.InvitationId {
//id.Peers[i].Name = ReceivedContact.Name
ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey
ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey
ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey
@@ -250,8 +267,7 @@ func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard)
srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid()) srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid())
} }
ps.cache[i].ContactPullServers = srvs ps.cache[i].ContactPullServers = srvs
ps.StorePeer(ps.cache[i]) return ps.storePeerLocked(ps.cache[i])
return nil
} }
} }
return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId) return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId)
+59 -46
View File
@@ -7,6 +7,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/json" "encoding/json"
"path/filepath" "path/filepath"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
@@ -14,30 +15,37 @@ import (
type ServerStorage struct { type ServerStorage struct {
DbFile string `json:"db_file,omitempty"` DbFile string `json:"db_file,omitempty"`
mu sync.Mutex
db *badger.DB db *badger.DB
} }
// Open a badger database from struct ServerStorage // open opens the Badger database. Caller must hold mu.
func (ss *ServerStorage) open() error { func (ss *ServerStorage) open() error {
opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile)) opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile))
opts.Logger = nil opts.Logger = nil
var err error var err error
ss.db, err = badger.Open(opts) ss.db, err = badger.Open(opts)
if err != nil { return err
return err
}
return nil
} }
// Store function StoreServer stores a server in a badger database with Server.GetUid() as key // close closes the Badger database. Caller must hold mu.
func (ss *ServerStorage) close() {
ss.db.Close()
}
// StoreServer stores a server in the Badger database with Server.GetUid() as key.
func (ss *ServerStorage) StoreServer(sc *Server) error { func (ss *ServerStorage) StoreServer(sc *Server) error {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
return ss.storeServerLocked(sc)
}
// storeServerLocked is StoreServer without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) storeServerLocked(sc *Server) error {
if err := ss.open(); err != nil {
return err return err
} }
defer ss.close() defer ss.close()
// first marshal the Server to bytes with protobuf
jsonsrv, err := json.Marshal(sc) jsonsrv, err := json.Marshal(sc)
if err != nil { if err != nil {
return err return err
@@ -52,51 +60,56 @@ func (ss *ServerStorage) StoreServer(sc *Server) error {
} }
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:] key := shakey[:]
// then store it in the database
return ss.db.Update(func(txn *badger.Txn) error { return ss.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data) return txn.Set(key, data)
}) })
} }
// Check if a server exists in a badger database with Server.GetUid() as key // ServerExists checks if a server exists in the Badger database.
func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) { func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
return ss.serverExistsLocked(sc)
}
// serverExistsLocked is ServerExists without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) serverExistsLocked(sc *Server) (bool, error) {
if err := ss.open(); err != nil {
return false, err return false, err
} }
defer ss.close() defer ss.close()
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:] key := shakey[:]
// check if key exists in badger database err := ss.db.View(func(txn *badger.Txn) error {
err = ss.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key) _, err := txn.Get(key)
return err return err
}) // Add a comma here })
if err != nil { // key does not exist if err != nil {
return false, nil return false, nil
} }
return true, nil return true, nil
} }
// Store a server in a badger database with Server.GetUid() as key if it is not already there // StoreServerIfNotExists stores a server only if it is not already present.
func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error { func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error {
exists, err := ss.ServerExists(sc) ss.mu.Lock()
defer ss.mu.Unlock()
exists, err := ss.serverExistsLocked(sc)
if err != nil { if err != nil {
return err return err
} }
if !exists { if !exists {
return ss.StoreServer(sc) return ss.storeServerLocked(sc)
} }
return nil return nil
} }
// LoadServer function loads a Server from a badger database with Server.GetUid() as key // LoadServer loads a Server from the Badger database by uid.
func (ss *ServerStorage) LoadServer(uid string) (*Server, error) { func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var sc Server var sc Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -122,10 +135,11 @@ func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
return &sc, err return &sc, err
} }
// DeleteServer function deletes a Server from a badger database with Server.GetUid() as key // DeleteServer deletes a Server from the Badger database by uid.
func (ss *ServerStorage) DeleteServer(uid string) error { func (ss *ServerStorage) DeleteServer(uid string) error {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
if err := ss.open(); err != nil {
return err return err
} }
defer ss.close() defer ss.close()
@@ -136,11 +150,12 @@ func (ss *ServerStorage) DeleteServer(uid string) error {
}) })
} }
// LoadAllServers function loads all Servers from a badger database // LoadAllServers loads all Servers from the Badger database.
func (ss *ServerStorage) LoadAllServers() ([]*Server, error) { func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server var scs []*Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -173,11 +188,12 @@ func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
return scs, err return scs, err
} }
// LoadAllServers function loads all ServersCards from a badger database // LoadAllServerCards loads all ServerCards from the Badger database.
func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) { func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard var scs []*meowlib.ServerCard
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -210,11 +226,12 @@ func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
return scs, err return scs, err
} }
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database // LoadServersFromUids loads Servers whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) { func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server var scs []*Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -248,11 +265,12 @@ func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
return scs, err return scs, err
} }
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database // LoadServerCardsFromUids loads ServerCards whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) { func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard var scs []*meowlib.ServerCard
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -285,8 +303,3 @@ func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.Serv
}) })
return scs, err return scs, err
} }
// close a badger database
func (ss *ServerStorage) close() {
ss.db.Close()
}
+9 -11
View File
@@ -1,11 +1,9 @@
module forge.redroom.link/yves/meowlib module forge.redroom.link/yves/meowlib
go 1.23.1 go 1.25.0
toolchain go1.24.2
require ( require (
github.com/ProtonMail/gopenpgp/v2 v2.8.3 github.com/ProtonMail/gopenpgp/v2 v2.10.0
github.com/awnumar/memguard v0.23.0 github.com/awnumar/memguard v0.23.0
github.com/dgraph-io/badger v1.6.2 github.com/dgraph-io/badger v1.6.2
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
@@ -16,18 +14,18 @@ require (
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.34.0 github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.11
) )
require ( require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/ProtonMail/go-crypto v1.2.0 // indirect github.com/ProtonMail/go-crypto v1.4.1 // indirect
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible // indirect github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/awnumar/memcall v0.4.0 // indirect github.com/awnumar/memcall v0.4.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect
github.com/cloudflare/circl v1.6.1 // indirect github.com/cloudflare/circl v1.6.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
@@ -43,11 +41,11 @@ require (
github.com/status-im/doubleratchet v3.0.0+incompatible // indirect github.com/status-im/doubleratchet v3.0.0+incompatible // indirect
github.com/twitchtv/twirp v8.1.3+incompatible // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/crypto v0.41.0 // indirect golang.org/x/crypto v0.50.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.42.0 // indirect golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.35.0 // indirect golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.28.0 // indirect golang.org/x/text v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect google.golang.org/grpc v1.62.0 // indirect
+18
View File
@@ -5,10 +5,14 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/ProtonMail/go-crypto v1.2.0 h1:+PhXXn4SPGd+qk76TlEePBfOfivE0zkWFenhGhFLzWs= github.com/ProtonMail/go-crypto v1.2.0 h1:+PhXXn4SPGd+qk76TlEePBfOfivE0zkWFenhGhFLzWs=
github.com/ProtonMail/go-crypto v1.2.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE= github.com/ProtonMail/go-crypto v1.2.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/ProtonMail/go-crypto v1.4.1 h1:9RfcZHqEQUvP8RzecWEUafnZVtEvrBVL9BiF67IQOfM=
github.com/ProtonMail/go-crypto v1.4.1/go.mod h1:e1OaTyu5SYVrO9gKOEhTc+5UcXtTUa+P3uLudwcgPqo=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw=
github.com/ProtonMail/gopenpgp/v2 v2.8.3 h1:1jHlELwCR00qovx2B50DkL/FjYwt/P91RnlsqeOp2Hs= github.com/ProtonMail/gopenpgp/v2 v2.8.3 h1:1jHlELwCR00qovx2B50DkL/FjYwt/P91RnlsqeOp2Hs=
github.com/ProtonMail/gopenpgp/v2 v2.8.3/go.mod h1:LiuOTbnJit8w9ZzOoLscj0kmdALY7hfoCVh5Qlb0bcg= github.com/ProtonMail/gopenpgp/v2 v2.8.3/go.mod h1:LiuOTbnJit8w9ZzOoLscj0kmdALY7hfoCVh5Qlb0bcg=
github.com/ProtonMail/gopenpgp/v2 v2.10.0 h1:llCzLvntC9+iH+if/na4AgKTef/Zm4vpaRrR3+JdKvo=
github.com/ProtonMail/gopenpgp/v2 v2.10.0/go.mod h1:dc0h9Pg3ftfN0U4pfRzujilfh61A2R52wgMkZWcWm2I=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
@@ -28,6 +32,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0= github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0=
github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs= github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs=
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -88,6 +94,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
@@ -248,6 +255,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -264,6 +273,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -271,6 +282,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -294,6 +306,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -308,6 +322,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -333,6 +349,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+6
View File
@@ -38,6 +38,12 @@ func HttpPostMessage(url string, msg []byte, timeout int) (response []byte, err
defer resp.Body.Close() defer resp.Body.Close()
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
// Server already accepted the request (2xx) — body truncation on our
// side doesn't mean the message wasn't stored. Return what we have so
// the caller doesn't retry and produce a duplicate.
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return body, nil
}
return nil, err return nil, err
} }
return body, nil return body, nil
+40 -47
View File
@@ -2,7 +2,6 @@ package server
import ( import (
"context" "context"
"sync"
"time" "time"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
@@ -37,7 +36,7 @@ func NewRedisRouter(server *Identity, redisUrl string, password string, db int,
return &r return &r
} }
func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { func (r *RedisRouter) Route(ctx context.Context, msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
var from_server *meowlib.FromServerMessage var from_server *meowlib.FromServerMessage
// update messages counter // update messages counter
err := r.Client.Incr("statistics:messages:total").Err() err := r.Client.Incr("statistics:messages:total").Err()
@@ -59,10 +58,9 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe
if err != nil { if err != nil {
return nil, err return nil, err
} }
if msg.Timeout > 0 { if msg.Timeout > 0 && len(from_server.Chat) == 0 && from_server.Invitation == nil {
logger.Info().Msg("long poll, subscribing for messages") logger.Info().Msg("long poll, subscribing for messages")
// set timeout for the lookup from_server, err = r.subscribe(ctx, msg, int(msg.Timeout))
from_server, err = r.subscribe(msg, int(msg.Timeout))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -206,60 +204,55 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
return &from_server, nil return &from_server, nil
} }
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
defer wg.Done() func (r *RedisRouter) subscribe(reqCtx context.Context, msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
pubsub := client.Subscribe("msgch:" + key) if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
return nil, err
}
channels := make([]string, 0, len(msg.PullRequest))
for _, rq := range msg.PullRequest {
channels = append(channels, "msgch:"+rq.LookupKey)
}
// Subscribe before re-checking: any publish that fires after Subscribe()
// returns is buffered in pubsub.Channel(), closing the store→publish→check race.
pubsub := r.Client.Subscribe(channels...)
defer pubsub.Close() defer pubsub.Close()
// Create a new channel for the messages from this subscription // Drain one subscribe-confirmation per channel.
myMessages := make(chan *redis.Message) for range len(channels) {
go func() { if _, err := pubsub.Receive(); err != nil {
for { return nil, err
msg, err := pubsub.ReceiveMessage()
if err != nil {
close(myMessages)
return
}
myMessages <- msg
} }
}()
// Wait for a message or for the done signal
select {
case msg := <-myMessages:
messages <- msg.Payload
case <-done:
return
} }
}
func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) { // Re-check now that we are subscribed; catches messages that arrived
var from_server meowlib.FromServerMessage // between the caller's first checkForMessage and our Subscribe call.
// update messages counter fromServer, err := r.checkForMessage(msg)
err := r.Client.Incr("statistics:messages:messagessubscription").Err()
if err != nil { if err != nil {
return nil, err return nil, err
} }
messages := make(chan string) if len(fromServer.Chat) > 0 || fromServer.Invitation != nil {
var wg sync.WaitGroup return fromServer, nil
done := make(chan struct{})
// extract lookup keys and subscribe
// iterate over pull requests
for _, rq := range msg.PullRequest {
wg.Add(1)
// subscribe to the lookup key
go goSubscribeAndListen(r.Client, rq.LookupKey, messages, &wg, done)
} }
// wait for timeout or message
ctx, cancel := context.WithTimeout(reqCtx, time.Duration(timeout)*time.Second)
defer cancel()
ch := pubsub.Channel()
select { select {
case <-messages: case <-ctx.Done():
close(done) if ctx.Err() == context.DeadlineExceeded {
return fromServer, nil
}
return nil, ctx.Err()
case _, ok := <-ch:
if !ok {
return fromServer, nil
}
return r.checkForMessage(msg) return r.checkForMessage(msg)
case <-time.After(time.Duration(timeout) * time.Second): // 10 seconds timeout
close(done)
} }
wg.Wait()
return &from_server, nil
} }
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
+14 -13
View File
@@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"testing" "testing"
"time" "time"
@@ -33,7 +34,7 @@ func newTestRouter(t *testing.T) (*RedisRouter, *miniredis.Miniredis) {
Addr: mr.Addr(), Addr: mr.Addr(),
}), }),
InvitationTimeout: 3600, InvitationTimeout: 3600,
Context: nil, Context: context.Background(),
} }
// seed the statistics:start key that NewRedisRouter normally sets // seed the statistics:start key that NewRedisRouter normally sets
router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0) router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0)
@@ -219,7 +220,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{Destination: dest, Payload: []byte("routed msg")}, {Destination: dest, Payload: []byte("routed msg")},
}, },
} }
resp, err := router.Route(storeReq) resp, err := router.Route(context.Background(),storeReq)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "route-store-uuid", resp.UuidAck) assert.Equal(t, "route-store-uuid", resp.UuidAck)
@@ -229,7 +230,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{LookupKey: dest}, {LookupKey: dest},
}, },
} }
resp, err = router.Route(pullReq) resp, err = router.Route(context.Background(),pullReq)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, resp.Chat, 1) assert.Len(t, resp.Chat, 1)
assert.Equal(t, []byte("routed msg"), resp.Chat[0].Payload) assert.Equal(t, []byte("routed msg"), resp.Chat[0].Payload)
@@ -240,7 +241,7 @@ func TestRouteEmptyMessage(t *testing.T) {
router, mr := newTestRouter(t) router, mr := newTestRouter(t)
defer mr.Close() defer mr.Close()
resp, err := router.Route(&meowlib.ToServerMessage{}) resp, err := router.Route(context.Background(),&meowlib.ToServerMessage{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Nil(t, resp) assert.Nil(t, resp)
} }
@@ -250,9 +251,9 @@ func TestRouteIncrementsTotalCounter(t *testing.T) {
router, mr := newTestRouter(t) router, mr := newTestRouter(t)
defer mr.Close() defer mr.Close()
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
val, err := router.Client.Get("statistics:messages:total").Int() val, err := router.Client.Get("statistics:messages:total").Int()
assert.NoError(t, err) assert.NoError(t, err)
@@ -553,7 +554,7 @@ func TestRouteMatriochka(t *testing.T) {
Data: []byte("wrapped"), Data: []byte("wrapped"),
}, },
} }
resp, err := router.Route(msg) resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "route-mtk", resp.UuidAck) assert.Equal(t, "route-mtk", resp.UuidAck)
@@ -577,7 +578,7 @@ func TestRouteInvitation(t *testing.T) {
ShortcodeLen: 6, ShortcodeLen: 6,
}, },
} }
resp, err := router.Route(msg) resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEmpty(t, resp.Invitation.Shortcode) assert.NotEmpty(t, resp.Invitation.Shortcode)
assert.Len(t, resp.Invitation.Shortcode, 6) assert.Len(t, resp.Invitation.Shortcode, 6)
@@ -594,7 +595,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
dest := "stats-dest" dest := "stats-dest"
// one store increments usermessages // one store increments usermessages
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
Messages: []*meowlib.PackedUserMessage{ Messages: []*meowlib.PackedUserMessage{
{Destination: dest, Payload: []byte("x")}, {Destination: dest, Payload: []byte("x")},
}, },
@@ -603,7 +604,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one pull increments messagelookups // one pull increments messagelookups
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
PullRequest: []*meowlib.ConversationRequest{ PullRequest: []*meowlib.ConversationRequest{
{LookupKey: dest}, {LookupKey: dest},
}, },
@@ -612,14 +613,14 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one matriochka increments matriochka counter // one matriochka increments matriochka counter
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
MatriochkaMessage: &meowlib.Matriochka{Data: []byte("m")}, MatriochkaMessage: &meowlib.Matriochka{Data: []byte("m")},
}) })
val, _ = router.Client.Get("statistics:messages:matriochka").Int() val, _ = router.Client.Get("statistics:messages:matriochka").Int()
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one invitation increments invitation counter // one invitation increments invitation counter
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
Invitation: &meowlib.Invitation{ Invitation: &meowlib.Invitation{
Step: 1, Step: 1,
Payload: []byte("i"), Payload: []byte("i"),