Compare commits

...

8 Commits

Author SHA1 Message Date
yves 9037a7b3c7 subscriptions cleanup
continuous-integration/drone/push Build is failing
2026-04-21 20:07:35 +02:00
yves ac305eaae0 redis subscriptions fix
continuous-integration/drone/push Build is failing
2026-04-21 19:21:16 +02:00
yves 5d12e0f869 long poll fix
continuous-integration/drone/push Build is failing
2026-04-21 17:00:58 +02:00
yves 8b106db52f duplicate messages send fixes
continuous-integration/drone/push Build is failing
2026-04-21 15:53:56 +02:00
yves 4e78ce5799 reorder received messages
continuous-integration/drone/push Build is failing
2026-04-19 18:36:44 +02:00
yves b9a1233e0e uuid return on create ne message
continuous-integration/drone/push Build is failing
2026-04-18 21:46:28 +02:00
yves a5dd6cd73f delete peer cache fix
continuous-integration/drone/push Build is failing
2026-04-18 20:40:23 +02:00
yves 00e4e6b046 cleaner invitation step messages 2026-04-14 19:12:09 +02:00
21 changed files with 760 additions and 698 deletions
+41 -34
View File
@@ -1,10 +1,12 @@
package helpers package helpers
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@@ -14,6 +16,7 @@ import (
invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages" invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages"
invsrv "forge.redroom.link/yves/meowlib/client/invitation/server" invsrv "forge.redroom.link/yves/meowlib/client/invitation/server"
"github.com/google/uuid" "github.com/google/uuid"
doubleratchet "github.com/status-im/doubleratchet"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
@@ -106,7 +109,7 @@ func PollServer(storage_path string, job *client.RequestsJob, timeout int, longP
// SaveCheckJobs // SaveCheckJobs
func SaveCheckJobs() (string, error) { func SaveCheckJobs() (string, error) {
me := client.GetConfig().GetIdentity() me := client.GetConfig().GetIdentity()
err := me.SaveBackgroundJob() err := me.SaveCheckJobs()
if err != nil { if err != nil {
return "CheckMessages: json.Marshal", err return "CheckMessages: json.Marshal", err
@@ -133,8 +136,12 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
} }
// check if invitation answer (step-2 answer waiting for the initiator) // check if invitation answer (step-2 answer waiting for the initiator)
if fromServerMessage.Invitation != nil { if fromServerMessage.Invitation != nil {
peer, _, invErr := invmsgs.Step3InitiatorFinalizesInviteeAndCreatesContactCard(fromServerMessage.Invitation) invBytes, marshalErr := proto.Marshal(fromServerMessage.Invitation)
if invErr == nil && peer != nil { if marshalErr == nil {
step3Bytes, invErr := invmsgs.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invBytes)
if invErr == nil && step3Bytes != nil {
peer := client.GetConfig().GetIdentity().Peers.GetFromInvitationId(fromServerMessage.Invitation.Uuid)
if peer != nil {
// Auto-send step-3 CC to invitee's servers. // Auto-send step-3 CC to invitee's servers.
msgs, sendErr := invsrv.Step3PostCard(peer.InvitationId) msgs, sendErr := invsrv.Step3PostCard(peer.InvitationId)
if sendErr == nil { if sendErr == nil {
@@ -146,14 +153,29 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
} }
} }
} }
}
}
// Chat messages // Chat messages
if len(fromServerMessage.Chat) > 0 { if len(fromServerMessage.Chat) > 0 {
// Sort by DR chain sequence number so messages are decrypted in ratchet order,
// regardless of server delivery order.
sort.SliceStable(fromServerMessage.Chat, func(i, j int) bool {
var hi, hj doubleratchet.MessageHeader
if err := json.Unmarshal(fromServerMessage.Chat[i].DrHeader, &hi); err != nil {
return false
}
if err := json.Unmarshal(fromServerMessage.Chat[j].DrHeader, &hj); err != nil {
return false
}
return hi.N < hj.N
})
for _, packedUserMessage := range fromServerMessage.Chat { for _, packedUserMessage := range fromServerMessage.Chat {
// find the peer with that lookup key // find the peer with that lookup key
peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination) peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination)
if peer == nil { if peer == nil {
return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message") logger.Error().Str("destination", packedUserMessage.Destination).Msg("ConsumeInboxFile: no visible peer for that message, skipping")
continue
} }
// Unpack the message — step-3 messages arrive before the initiator's identity // Unpack the message — step-3 messages arrive before the initiator's identity
// key is known, so skip signature verification for pending peers. // key is known, so skip signature verification for pending peers.
@@ -164,12 +186,15 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage) usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage)
} }
if err != nil { if err != nil {
return nil, nil, "ReadMessage: ProcessInboundUserMessage", err //return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
} logger.Error().Msg("ReadMessage: ProcessInboundUserMessage" + err.Error())
} else {
// Handle invitation step 3: initiator's full ContactCard arriving at the invitee. // Handle invitation step 3: initiator's full ContactCard arriving at the invitee.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 { if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(usermsg) invBytes, marshalErr := proto.Marshal(usermsg.Invitation)
if marshalErr == nil {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes)
if finalErr == nil && finalizedPeer != nil { if finalErr == nil && finalizedPeer != nil {
// Auto-send step-4 confirmation to initiator's servers. // Auto-send step-4 confirmation to initiator's servers.
step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId) step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId)
@@ -181,6 +206,7 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
} }
} }
} }
}
continue continue
} }
@@ -257,6 +283,8 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
} }
} }
}
err = os.Remove(messageFilename) err = os.Remove(messageFilename)
if err != nil { if err != nil {
return nil, nil, "ReadMessage: Remove", err return nil, nil, "ReadMessage: Remove", err
@@ -266,54 +294,33 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
return messagesOverview, filenames, "", nil return messagesOverview, filenames, "", nil
} }
// LongPollAllSerevrJobs checks for messages on a all servers defived in job file // LongPollAllServerJobs checks for messages on all servers defined in job file.
// It returns as soon as any server delivers at least one message, or 0 when all
// polls time out. resultChan is buffered so goroutines never block on write.
func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) { func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
// Channel to collect results
resultChan := make(chan int, len(jobs)) resultChan := make(chan int, len(jobs))
errChan := make(chan error, len(jobs))
// WaitGroup to sync goroutines
var wg sync.WaitGroup var wg sync.WaitGroup
// Loop through each job (server)
for _, job := range jobs { for _, job := range jobs {
wg.Add(1) wg.Add(1)
go func(job client.RequestsJob) { go func(job client.RequestsJob) {
defer wg.Done() defer wg.Done()
// Long-polling call to the server
cnt, _, err := PollServer(storage_path, &job, timeout, true) cnt, _, err := PollServer(storage_path, &job, timeout, true)
if err == nil && cnt > 0 { if err == nil && cnt > 0 {
select { resultChan <- cnt
case resultChan <- cnt:
default:
}
// Close the error channel to notify all goroutines
close(errChan)
} }
}(job) }(job)
} }
// Close the result channel when all workers are done
go func() { go func() {
wg.Wait() wg.Wait()
close(resultChan) close(resultChan)
}() }()
// Wait for the first message or all timeouts if cnt, ok := <-resultChan; ok {
select {
case cnt := <-resultChan:
return cnt, "", nil return cnt, "", nil
case <-errChan:
// If one fails and exitOnMessage is true
return 0, "", nil
} }
return 0, "", nil
} }
// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues // sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues
+11 -8
View File
@@ -21,33 +21,36 @@ const defaultPostTimeout = 200
// It creates and stores the user message, serialises the packed form to // It creates and stores the user message, serialises the packed form to
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in // storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
// storagePath/queues/{peerUid}. // storagePath/queues/{peerUid}.
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error { func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) (string, error) {
packedMsg, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist) packedMsg, dbFile, dbId, msgUuid, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", errTxt, err) return "", fmt.Errorf("%s: %w", errTxt, err)
} }
data, err := proto.Marshal(packedMsg) data, err := proto.Marshal(packedMsg)
if err != nil { if err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
} }
outboxDir := filepath.Join(storagePath, "outbox") outboxDir := filepath.Join(storagePath, "outbox")
if err := os.MkdirAll(outboxDir, 0700); err != nil { if err := os.MkdirAll(outboxDir, 0700); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
} }
outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId)) outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId))
if err := os.WriteFile(outboxFile, data, 0600); err != nil { if err := os.WriteFile(outboxFile, data, 0600); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err) return "", fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
} }
return client.PushSendJob(storagePath, &client.SendJob{ if err := client.PushSendJob(storagePath, &client.SendJob{
Queue: peerUid, Queue: peerUid,
File: outboxFile, File: outboxFile,
Servers: servers, Servers: servers,
Timeout: timeout, Timeout: timeout,
}) }); err != nil {
return "", err
}
return msgUuid, nil
} }
// ProcessSendQueues discovers every queue DB file under storagePath/queues/ // ProcessSendQueues discovers every queue DB file under storagePath/queues/
+2 -1
View File
@@ -202,7 +202,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
srv := newTestServer(t, "http://test-srv.example") srv := newTestServer(t, "http://test-srv.example")
err := CreateUserMessageAndSendJob( msgUuid, err := CreateUserMessageAndSendJob(
dir, dir,
"hello from integration", "hello from integration",
"peer-create-send", "peer-create-send",
@@ -212,6 +212,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
60, 60,
) )
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, msgUuid, "returned UUID must not be empty")
// A pending job must be in the queue. // A pending job must be in the queue.
job, _, err := client.PeekSendJob(dir, "peer-create-send") job, _, err := client.PeekSendJob(dir, "peer-create-send")
+10 -9
View File
@@ -42,7 +42,7 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) (
} }
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) { func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist) usermessage, _, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
if err != nil { if err != nil {
return nil, errtxt, err return nil, errtxt, err
} }
@@ -51,20 +51,20 @@ func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for // CreateAndStoreUserMessage creates, signs, and stores an outbound message for
// peer_uid. It returns the packed (encrypted) form ready for server transport, // peer_uid. It returns the packed (encrypted) form ready for server transport,
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context // the peer DB file UUID (dbFile), the SQLite row ID (dbId), the message UUID
// string, and any error. // (conversation_status uuid), an error context string, and any error.
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) { func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message // Creating User message
usermessage, err := peer.BuildSimpleUserMessage([]byte(message)) usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
if err != nil { if err != nil {
return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err return nil, "", 0, "", "PrepareServerMessage : BuildSimpleUserMessage", err
} }
for _, file := range filelist { for _, file := range filelist {
err = usermessage.AddFile(file, client.GetConfig().Chunksize) err = usermessage.AddFile(file, client.GetConfig().Chunksize)
if err != nil { if err != nil {
return nil, "", 0, "PrepareServerMessage : AddFile", err return nil, "", 0, "", "PrepareServerMessage : AddFile", err
} }
} }
usermessage.Status.Sent = uint64(time.Now().UTC().Unix()) usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
@@ -73,16 +73,17 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
// Store message // Store message
err = peer.StoreMessage(usermessage, nil) err = peer.StoreMessage(usermessage, nil)
if err != nil { if err != nil {
return nil, "", 0, "messageBuildPostprocess : StoreMessage", err return nil, "", 0, "", "messageBuildPostprocess : StoreMessage", err
} }
dbFile := peer.LastMessage.Dbfile dbFile := peer.LastMessage.Dbfile
dbId := peer.LastMessage.Dbid dbId := peer.LastMessage.Dbid
msgUuid := usermessage.Status.Uuid
// Prepare cyphered + packed user message // Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil { if err != nil {
return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err return nil, "", 0, "", "messageBuildPostprocess : ProcessOutboundUserMessage", err
} }
// Persist peer to save updated DR state (DrStateJson) // Persist peer to save updated DR state (DrStateJson)
@@ -92,7 +93,7 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
} }
} }
return packedMsg, dbFile, dbId, "", nil return packedMsg, dbFile, dbId, msgUuid, "", nil
} }
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) { func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
+4 -1
View File
@@ -425,7 +425,10 @@ func (id *Identity) GetRequestJobs() []RequestsJob {
return list return list
} }
func (id *Identity) SaveBackgroundJob() error { func (id *Identity) SaveCheckJobs() error {
if id.RootKp == nil {
return errors.New("identity not fully initialized: RootKp is nil")
}
var bj BackgroundJob var bj BackgroundJob
bj.Jobs = id.GetRequestJobs() bj.Jobs = id.GetRequestJobs()
bj.RootPublic = id.RootKp.Public bj.RootPublic = id.RootKp.Public
+17 -13
View File
@@ -3,28 +3,32 @@ package files
import ( import (
"os" "os"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client" "forge.redroom.link/yves/meowlib/client"
"forge.redroom.link/yves/meowlib/client/invitation/messages" "forge.redroom.link/yves/meowlib/client/invitation/messages"
"google.golang.org/protobuf/proto"
) )
// Step1Write creates a pending peer and writes the InvitationInitPayload to a file. // Step1Write creates a pending peer and writes the InvitationInitPayload to a file.
// format: "qr" writes a QR-code PNG; anything else writes a compressed binary .mwiv file. // format: "qr" writes a QR-code PNG; anything else writes a compressed binary .mwiv file.
func Step1Write(contactName string, myNickname string, invitationMessage string, serverUids []string, format string) (*client.Peer, error) { func Step1Write(contactName string, myNickname string, invitationMessage string, serverUids []string, format string) error {
payload, peer, err := messages.Step1InitiatorCreatesInviteeAndTempKey(contactName, myNickname, invitationMessage, serverUids) payloadBytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey(contactName, myNickname, invitationMessage, serverUids)
if err != nil { if err != nil {
return nil, err return err
}
var payload meowlib.InvitationInitPayload
if err := proto.Unmarshal(payloadBytes, &payload); err != nil {
return err
}
mynick := myNickname
if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname
} }
c := client.GetConfig() c := client.GetConfig()
if format == "qr" { if format == "qr" {
filename := c.StoragePath + string(os.PathSeparator) + peer.MyName + "-" + peer.Name + ".png" filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + contactName + ".png"
if err := payload.WriteQr(filename); err != nil { return payload.WriteQr(filename)
return nil, err
} }
} else { filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + contactName + ".mwiv"
filename := c.StoragePath + string(os.PathSeparator) + peer.MyName + "-" + peer.Name + ".mwiv" return payload.WriteCompressed(filename)
if err := payload.WriteCompressed(filename); err != nil {
return nil, err
}
}
return peer, nil
} }
+9 -21
View File
@@ -12,8 +12,8 @@ import (
) )
// Step2ReadAndAnswer reads an InvitationInitPayload from a .mwiv file, creates the // Step2ReadAndAnswer reads an InvitationInitPayload from a .mwiv file, creates the
// invitee's peer entry, and writes the encrypted ContactCard (PackedUserMessage) to a // invitee's peer entry, and writes the serialized Invitation (step=2) to a .mwiv file
// .mwiv file for the initiator to pick up and process in step 3. // for the initiator to pick up and process in step 3.
func Step2ReadAndAnswer(invitationFile string, nickname string, myNickname string, serverUids []string) error { func Step2ReadAndAnswer(invitationFile string, nickname string, myNickname string, serverUids []string) error {
if _, err := os.Stat(invitationFile); os.IsNotExist(err) { if _, err := os.Stat(invitationFile); os.IsNotExist(err) {
return err return err
@@ -29,35 +29,23 @@ func Step2ReadAndAnswer(invitationFile string, nickname string, myNickname strin
if err != nil { if err != nil {
return err return err
} }
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return err
}
mynick := myNickname mynick := myNickname
if mynick == "" { if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname mynick = client.GetConfig().GetIdentity().Nickname
} }
packed, peer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload, nickname, mynick, serverUids) // messages.Step2 returns a serialized Invitation ready to write directly to file.
if err != nil { invBytes, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payloadBytes, nickname, mynick, serverUids)
return err
}
// Wrap the PackedUserMessage in an Invitation so the initiator (step3) has the
// invitee's public key available for signature verification without an extra file.
packedBytes, err := proto.Marshal(packed)
if err != nil {
return err
}
invitation := &meowlib.Invitation{
Uuid: peer.InvitationId,
Step: 2,
From: peer.MyIdentity.Public,
Payload: packedBytes,
}
out, err := proto.Marshal(invitation)
if err != nil { if err != nil {
return err return err
} }
c := client.GetConfig() c := client.GetConfig()
filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + nickname + ".mwiv" filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + nickname + ".mwiv"
return os.WriteFile(filename, out, 0600) return os.WriteFile(filename, invBytes, 0600)
} }
+78 -84
View File
@@ -29,114 +29,108 @@ func setupIdentity(t *testing.T, nickname string) (*client.Identity, func()) {
return id, cleanup return id, cleanup
} }
// TestStep2ProducesPackedUserMessage verifies that Step2 returns a PackedUserMessage // TestStep1ReturnsBinaryPayload verifies that Step1 returns non-empty bytes that
// (not just a peer) and that the message is encrypted with the initiator's temp key // deserialise to a valid InvitationInitPayload, and that the pending peer is stored
// so Step3 can decrypt it. // with only a temp keypair (no real identity keys yet).
func TestStep2ProducesPackedUserMessage(t *testing.T) { func TestStep1ReturnsBinaryPayload(t *testing.T) {
cfg := client.GetConfig() cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck cfg.SetMemPass("testpass") //nolint:errcheck
// --- STEP 1: initiator creates temp keypair and payload ---
initiator, cleanInit := setupIdentity(t, "alice") initiator, cleanInit := setupIdentity(t, "alice")
defer cleanInit() defer cleanInit()
payload, initPeer, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "Hello!", nil) step1Bytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "Hello!", nil)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, payload) require.NotEmpty(t, step1Bytes)
var payload meowlib.InvitationInitPayload
require.NoError(t, proto.Unmarshal(step1Bytes, &payload))
assert.NotEmpty(t, payload.Uuid)
assert.NotEmpty(t, payload.PublicKey)
assert.Equal(t, "Alice", payload.Name)
assert.Equal(t, "Hello!", payload.InvitationMessage)
// Peer is saved with temp keypair only — no real identity keys yet.
initPeer := initiator.Peers.GetFromName("Bob")
require.NotNil(t, initPeer) require.NotNil(t, initPeer)
// Initiator has only the temp keypair at this stage.
assert.Nil(t, initPeer.MyIdentity) assert.Nil(t, initPeer.MyIdentity)
assert.NotNil(t, initPeer.InvitationKp) assert.NotNil(t, initPeer.InvitationKp)
}
// --- STEP 2: invitee receives payload, creates peer, returns packed message --- // TestFullInvitationFlow runs all four steps end-to-end, passing the binary output of
_, cleanInvitee := setupIdentity(t, "bob") // each step directly to the next, and verifies that both peers end up with each other's
// real keys after the exchange completes.
func TestFullInvitationFlow(t *testing.T) {
cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck
// --- STEP 1: initiator creates temp keypair, gets binary payload ---
initiator, cleanInit := setupIdentity(t, "alice2")
defer cleanInit()
step1Bytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "", nil)
require.NoError(t, err)
require.NotEmpty(t, step1Bytes)
// --- STEP 2: invitee creates peer, returns serialized Invitation (step=2) ---
invitee, cleanInvitee := setupIdentity(t, "bob2")
defer cleanInvitee() defer cleanInvitee()
packed, inviteePeer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard( step2Bytes, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(step1Bytes, "Alice", "Bob", nil)
payload, "Alice", "Bob", nil,
)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, packed, "step2 must return a PackedUserMessage, not just a peer") require.NotEmpty(t, step2Bytes, "step2 must return non-empty invitation bytes")
// Invitee now has a peer with full keypairs.
inviteePeer := invitee.Peers.GetFromName("Alice")
require.NotNil(t, inviteePeer) require.NotNil(t, inviteePeer)
// The packed message destination is the initiator's temp key (used as lookup key).
assert.Equal(t, payload.PublicKey, packed.Destination)
// The invitee peer has full keypairs now.
assert.NotNil(t, inviteePeer.MyIdentity) assert.NotNil(t, inviteePeer.MyIdentity)
assert.NotNil(t, inviteePeer.MyEncryptionKp) assert.NotNil(t, inviteePeer.MyEncryptionKp)
assert.NotNil(t, inviteePeer.MyLookupKp) assert.NotNil(t, inviteePeer.MyLookupKp)
// --- STEP 3: initiator decrypts invitee's packed message and finalises --- // The step-2 wire format is a serialized Invitation.
var inv2 meowlib.Invitation
require.NoError(t, proto.Unmarshal(step2Bytes, &inv2))
assert.EqualValues(t, 2, inv2.Step)
assert.NotEmpty(t, inv2.Uuid)
assert.Equal(t, inviteePeer.MyIdentity.Public, inv2.From)
assert.NotEmpty(t, inv2.Payload)
// --- STEP 3: initiator decrypts invitee's card, returns serialized Invitation (step=3) ---
cfg.SetIdentity(initiator) cfg.SetIdentity(initiator)
// Simulate how the server delivers the step-2 answer: marshal the PackedUserMessage step3Bytes, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(step2Bytes)
// into an Invitation.Payload.
packedBytes, err := proto.Marshal(packed)
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, step3Bytes)
invitation := &meowlib.Invitation{ // Initiator's peer must now hold invitee's real keys; temp keypair must be gone.
Uuid: payload.Uuid, initPeer := initiator.Peers.GetFromName("Bob")
Step: 2, require.NotNil(t, initPeer)
From: inviteePeer.MyIdentity.Public, assert.Equal(t, inviteePeer.MyIdentity.Public, initPeer.ContactPublicKey)
Payload: packedBytes, assert.Equal(t, inviteePeer.MyEncryptionKp.Public, initPeer.ContactEncryption)
} assert.Equal(t, inviteePeer.MyLookupKp.Public, initPeer.ContactLookupKey)
assert.Nil(t, initPeer.InvitationKp, "temp keypair must be cleared after step3")
assert.NotEmpty(t, initPeer.DrKpPublic)
assert.NotEmpty(t, initPeer.DrRootKey)
peer, myCC, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation) // The step-3 wire format is a serialized Invitation.
var inv3 meowlib.Invitation
require.NoError(t, proto.Unmarshal(step3Bytes, &inv3))
assert.EqualValues(t, 3, inv3.Step)
assert.NotEmpty(t, inv3.Uuid)
assert.NotEmpty(t, inv3.Payload)
// --- STEP 4: invitee finalises initiator ---
cfg.SetIdentity(invitee)
finalPeer, err := messages.Step4InviteeFinalizesInitiator(step3Bytes)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, peer) require.NotNil(t, finalPeer)
require.NotNil(t, myCC, "step3 must produce the initiator's ContactCard to send to invitee")
// Initiator's peer must now hold invitee's real keys. // Invitee's peer must now hold initiator's real keys and the invitation must be complete.
assert.Equal(t, inviteePeer.MyIdentity.Public, peer.ContactPublicKey) assert.Equal(t, initPeer.MyIdentity.Public, finalPeer.ContactPublicKey)
assert.Equal(t, inviteePeer.MyEncryptionKp.Public, peer.ContactEncryption) assert.Equal(t, initPeer.MyEncryptionKp.Public, finalPeer.ContactEncryption)
assert.Equal(t, inviteePeer.MyLookupKp.Public, peer.ContactLookupKey) assert.Equal(t, initPeer.MyLookupKp.Public, finalPeer.ContactLookupKey)
assert.Nil(t, peer.InvitationKp, "temp keypair must be cleared after step3") assert.False(t, finalPeer.InvitationPending(), "invitation must be fully finalized")
assert.NotEmpty(t, myCC.DrRootKey) assert.NotEmpty(t, finalPeer.DrRootKey)
assert.NotEmpty(t, myCC.DrPublicKey) assert.NotEmpty(t, finalPeer.ContactDrPublicKey)
}
// TestStep2Step3RoundTripPayload verifies that the PackedUserMessage produced by step2
// actually carries the invitee's ContactCard when decrypted by the initiator.
func TestStep2Step3RoundTripPayload(t *testing.T) {
cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck
initiator, cleanInit := setupIdentity(t, "alice2")
defer cleanInit()
payload, _, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "", nil)
require.NoError(t, err)
_, cleanInvitee := setupIdentity(t, "bob2")
defer cleanInvitee()
packed, inviteePeer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload, "Alice", "Bob", nil)
require.NoError(t, err)
// Confirm the message serialises cleanly (transport simulation).
packedBytes, err := proto.Marshal(packed)
require.NoError(t, err)
assert.NotEmpty(t, packedBytes)
// Switch back to initiator and run step3.
cfg.SetIdentity(initiator)
var roundTripped meowlib.PackedUserMessage
require.NoError(t, proto.Unmarshal(packedBytes, &roundTripped))
invitation := &meowlib.Invitation{
Uuid: payload.Uuid,
Step: 2,
From: inviteePeer.MyIdentity.Public,
Payload: packedBytes,
}
_, myCC, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation)
require.NoError(t, err)
require.NotNil(t, myCC)
// The initiator's CC must reference the invitee's invitation ID so the invitee
// can match it when step4 arrives.
assert.Equal(t, payload.Uuid, myCC.InvitationId)
} }
+8 -7
View File
@@ -1,22 +1,23 @@
package messages package messages
import ( import (
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client" "forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
) )
// Step1InitiatorCreatesInviteeAndTempKey creates a minimal pending peer and a temporary // Step1InitiatorCreatesInviteeAndTempKey creates a minimal pending peer and a temporary
// keypair, and returns the InvitationInitPayload to be transmitted to the invitee // keypair, and returns the serialized InvitationInitPayload bytes to be transmitted to
// via any transport (file, QR, server…). // the invitee via any transport (file, QR, server…). The peer is already persisted by
func Step1InitiatorCreatesInviteeAndTempKey(contactName string, myNickname string, invitationMessage string, serverUids []string) (*meowlib.InvitationInitPayload, *client.Peer, error) { // InvitationStep1 so no peer reference is returned.
func Step1InitiatorCreatesInviteeAndTempKey(contactName string, myNickname string, invitationMessage string, serverUids []string) ([]byte, error) {
mynick := myNickname mynick := myNickname
if mynick == "" { if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname mynick = client.GetConfig().GetIdentity().Nickname
} }
payload, peer, err := client.GetConfig().GetIdentity().InvitationStep1(mynick, contactName, serverUids, invitationMessage) payload, _, err := client.GetConfig().GetIdentity().InvitationStep1(mynick, contactName, serverUids, invitationMessage)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
client.GetConfig().GetIdentity().Save() client.GetConfig().GetIdentity().Save()
return payload, peer, nil return proto.Marshal(payload)
} }
+26 -12
View File
@@ -3,31 +3,45 @@ package messages
import ( import (
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client" "forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
) )
// Step2InviteeCreatesInitiatorAndEncryptedContactCard creates the invitee's peer entry // Step2InviteeCreatesInitiatorAndEncryptedContactCard deserialises the step-1 payload bytes,
// from an InvitationInitPayload, then builds the invitee's ContactCard and returns it // creates the invitee's peer entry, builds and encrypts the invitee's ContactCard, and returns
// as a PackedUserMessage asymmetrically encrypted with the initiator's temporary public // a serialized Invitation (step=2) whose Payload is the PackedUserMessage encrypted with the
// key. The packed message is ready to be transmitted to the initiator via any transport // initiator's temporary public key. The bytes are transport-ready and consumed directly by
// (file, QR, server…); Step3InitiatorFinalizesInviteeAndCreatesContactCard on the // Step3InitiatorFinalizesInviteeAndCreatesContactCard.
// initiator side will decrypt and process it. func Step2InviteeCreatesInitiatorAndEncryptedContactCard(payloadBytes []byte, nickname string, myNickname string, serverUids []string) ([]byte, error) {
func Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload *meowlib.InvitationInitPayload, nickname string, myNickname string, serverUids []string) (*meowlib.PackedUserMessage, *client.Peer, error) {
mynick := myNickname mynick := myNickname
if mynick == "" { if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname mynick = client.GetConfig().GetIdentity().Nickname
} }
peer, err := client.GetConfig().GetIdentity().InvitationStep2(mynick, nickname, serverUids, payload) var payload meowlib.InvitationInitPayload
if err := proto.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
peer, err := client.GetConfig().GetIdentity().InvitationStep2(mynick, nickname, serverUids, &payload)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
usermsg, err := peer.BuildInvitationStep2Message(peer.GetMyContact()) usermsg, err := peer.BuildInvitationStep2Message(peer.GetMyContact())
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
packed, err := peer.ProcessOutboundUserMessage(usermsg) packed, err := peer.ProcessOutboundUserMessage(usermsg)
if err != nil { if err != nil {
return nil, nil, err return nil, err
}
packedBytes, err := proto.Marshal(packed)
if err != nil {
return nil, err
}
inv := &meowlib.Invitation{
Uuid: payload.Uuid,
Step: 2,
From: peer.MyIdentity.Public,
Payload: packedBytes,
} }
client.GetConfig().GetIdentity().Save() client.GetConfig().GetIdentity().Save()
return packed, peer, nil return proto.Marshal(inv)
} }
+29 -13
View File
@@ -8,39 +8,55 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
// Step3InitiatorFinalizesInviteeAndCreatesContactCard is called by the initiator when a // Step3InitiatorFinalizesInviteeAndCreatesContactCard is called by the initiator when the
// step-2 answer (invitee's encrypted ContactCard) arrives. It decrypts the card, upgrades // step-2 answer (serialized Invitation bytes) arrives. It decrypts the invitee's ContactCard,
// the invitee's peer entry with the real keys, and returns the initiator's own ContactCard // upgrades the pending peer with the invitee's real keys, and returns a serialized Invitation
// ready to be sent to the invitee via any transport. // (step=3) whose Payload is the initiator's ContactCard, ready to be consumed directly by
func Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation *meowlib.Invitation) (*client.Peer, *meowlib.ContactCard, error) { // Step4InviteeFinalizesInitiator on the invitee side.
func Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitationBytes []byte) ([]byte, error) {
var invitation meowlib.Invitation
if err := proto.Unmarshal(invitationBytes, &invitation); err != nil {
return nil, err
}
var invitationAnswer meowlib.PackedUserMessage var invitationAnswer meowlib.PackedUserMessage
if err := proto.Unmarshal(invitation.Payload, &invitationAnswer); err != nil { if err := proto.Unmarshal(invitation.Payload, &invitationAnswer); err != nil {
return nil, nil, err return nil, err
} }
peer := client.GetConfig().GetIdentity().Peers.GetFromInvitationId(invitation.Uuid) peer := client.GetConfig().GetIdentity().Peers.GetFromInvitationId(invitation.Uuid)
if peer == nil { if peer == nil {
return nil, nil, errors.New("no peer for invitation uuid " + invitation.Uuid) return nil, errors.New("no peer for invitation uuid " + invitation.Uuid)
} }
// Guard against duplicate delivery (e.g., same answer from multiple servers). // Guard against duplicate delivery (e.g., same answer from multiple servers).
if peer.InvitationKp == nil { if peer.InvitationKp == nil {
return nil, nil, nil return nil, nil
} }
usermsg, err := peer.ProcessInboundStep2UserMessage(&invitationAnswer, invitation.From) usermsg, err := peer.ProcessInboundStep2UserMessage(&invitationAnswer, invitation.From)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
var inviteeCC meowlib.ContactCard var inviteeCC meowlib.ContactCard
if err := proto.Unmarshal(usermsg.Invitation.Payload, &inviteeCC); err != nil { if err := proto.Unmarshal(usermsg.Invitation.Payload, &inviteeCC); err != nil {
return nil, nil, err return nil, err
} }
myCC, peer, err := client.GetConfig().GetIdentity().InvitationStep3(&inviteeCC) myCC, _, err := client.GetConfig().GetIdentity().InvitationStep3(&inviteeCC)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
client.GetConfig().GetIdentity().Save() client.GetConfig().GetIdentity().Save()
return peer, myCC, nil
ccBytes, err := proto.Marshal(myCC)
if err != nil {
return nil, err
}
inv := &meowlib.Invitation{
Uuid: myCC.InvitationId,
Step: 3,
Payload: ccBytes,
}
return proto.Marshal(inv)
} }
+11 -13
View File
@@ -1,27 +1,25 @@
package messages package messages
import ( import (
"errors"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client" "forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
// Step4InviteeFinalizesInitiator is called by the invitee's message processor when a // Step4InviteeFinalizesInitiator is called by the invitee when the step-3 answer
// UserMessage with invitation.step == 3 arrives. It unmarshals the initiator's ContactCard // (serialized Invitation bytes) arrives. It unmarshals the initiator's ContactCard and
// and completes the invitee's peer entry with the initiator's real keys. // completes the invitee's peer entry with the initiator's real keys.
func Step4InviteeFinalizesInitiator(usermsg *meowlib.UserMessage) (*client.Peer, error) { func Step4InviteeFinalizesInitiator(invitationBytes []byte) (*client.Peer, error) {
if usermsg.Invitation == nil || usermsg.Invitation.Step != 3 { var inv meowlib.Invitation
return nil, errors.New("expected invitation step 3") if err := proto.Unmarshal(invitationBytes, &inv); err != nil {
} return nil, err
var initiatorCC meowlib.ContactCard }
if err := proto.Unmarshal(usermsg.Invitation.Payload, &initiatorCC); err != nil { var initiatorCC meowlib.ContactCard
if err := proto.Unmarshal(inv.Payload, &initiatorCC); err != nil {
return nil, err return nil, err
} }
// Patch the invitation ID from the outer message in case it was not set in the CC.
if initiatorCC.InvitationId == "" { if initiatorCC.InvitationId == "" {
initiatorCC.InvitationId = usermsg.Invitation.Uuid initiatorCC.InvitationId = inv.Uuid
} }
if err := client.GetConfig().GetIdentity().InvitationStep4(&initiatorCC); err != nil { if err := client.GetConfig().GetIdentity().InvitationStep4(&initiatorCC); err != nil {
return nil, err return nil, err
+157 -179
View File
@@ -6,6 +6,7 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/google/uuid" "github.com/google/uuid"
@@ -13,71 +14,82 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error { // One RWMutex per SQLite file path. Entries are never deleted (bounded by
var dbid string // peer count, which is small). RLock for reads, Lock for writes.
cfg := GetConfig() var dbFileMu sync.Map
identity := cfg.GetIdentity()
// If no db/no ID create DB + Tablz
// TODO : if file size > X new db
if len(peer.DbIds) == 0 {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
identity.Peers.StorePeer(peer) func getDbFileMutex(path string) *sync.RWMutex {
identity.CreateFolder() v, _ := dbFileMu.LoadOrStore(path, &sync.RWMutex{})
file, err := os.Create(filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) return v.(*sync.RWMutex)
if err != nil { }
return err
} func withDbWrite(path string, fn func(*sql.DB) error) error {
file.Close() mu := getDbFileMutex(path)
sqliteDatabase, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) mu.Lock()
if err != nil { defer mu.Unlock()
return err db, err := sql.Open("sqlite3", path)
}
defer sqliteDatabase.Close()
err = createMessageTable(sqliteDatabase)
if err != nil {
return err
}
sqliteDatabase.Close()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Open Db
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil { if err != nil {
return err return err
} }
defer db.Close() defer db.Close()
// Detach Files return fn(db)
}
func withDbRead(path string, fn func(*sql.DB) error) error {
mu := getDbFileMutex(path)
mu.RLock()
defer mu.RUnlock()
db, err := sql.Open("sqlite3", path)
if err != nil {
return err
}
defer db.Close()
return fn(db)
}
func dbPath(cfg *Config, identity *Identity, dbid string) string {
return filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
}
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error {
cfg := GetConfig()
identity := cfg.GetIdentity()
isNew := len(peer.DbIds) == 0
var dbid string
if isNew {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
identity.Peers.StorePeer(peer)
identity.CreateFolder()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Detach file attachments — no DB lock needed for file I/O.
hiddenFilenames := []string{} hiddenFilenames := []string{}
if len(usermessage.Files) > 0 { if len(usermessage.Files) > 0 {
secureDir := filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")
if _, err := os.Stat(secureDir); os.IsNotExist(err) {
if err = os.MkdirAll(secureDir, 0755); err != nil {
return err
}
}
for _, f := range usermessage.Files { for _, f := range usermessage.Files {
hiddenFilename := uuid.NewString() hiddenFilename := uuid.NewString()
// Cypher file
encData, err := meowlib.SymEncrypt(password, f.Data) encData, err := meowlib.SymEncrypt(password, f.Data)
if err != nil { if err != nil {
return err return err
} }
if _, err := os.Stat(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")); os.IsNotExist(err) { hidden := filepath.Join(secureDir, hiddenFilename)
err = os.MkdirAll(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles"), 0755) os.WriteFile(hidden, encData, 0600)
if err != nil { hiddenFilenames = append(hiddenFilenames, hidden)
return err f.Data = []byte(hidden)
} }
} }
os.WriteFile(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename), encData, 0600)
hiddenFilenames = append(hiddenFilenames, filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename)) outbound := usermessage.From != peer.ContactPublicKey
// replace f.Data by uuid filename
f.Data = []byte(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename))
}
}
outbound := true
if usermessage.From == peer.ContactPublicKey {
outbound = false
}
// Convert UserMessage to DbMessage
dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames) dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames)
// Encrypt message
out, err := proto.Marshal(dbm) out, err := proto.Marshal(dbm)
if err != nil { if err != nil {
return err return err
@@ -86,98 +98,94 @@ func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []stri
if err != nil { if err != nil {
return err return err
} }
// Insert message
insertMessageSQL := `INSERT INTO message(m) VALUES (?) RETURNING ID` var id int64
statement, err := db.Prepare(insertMessageSQL) // Prepare statement. path := dbPath(cfg, identity, dbid)
err = withDbWrite(path, func(db *sql.DB) error {
// SQLite creates the file on first Open; create the table if new DB.
if isNew {
if err := createMessageTable(db); err != nil {
return err
}
}
stmt, err := db.Prepare(`INSERT INTO message(m) VALUES (?) RETURNING ID`)
if err != nil { if err != nil {
return err return err
} }
result, err := statement.Exec(encData) result, err := stmt.Exec(encData)
if err != nil { if err != nil {
return err return err
} }
id, err := result.LastInsertId() id, err = result.LastInsertId()
return err
})
if err != nil { if err != nil {
return err return err
} }
ium := DbMessageToInternalUserMessage(id, dbid, dbm)
peer.LastMessage = ium peer.LastMessage = DbMessageToInternalUserMessage(id, dbid, dbm)
identity.Peers.StorePeer(peer) identity.Peers.StorePeer(peer)
return nil return nil
} }
// Get new messages from a peer
func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) { func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) {
var messages []*InternalUserMessage var messages []*InternalUserMessage
cfg := GetConfig() cfg := GetConfig()
identity := cfg.GetIdentity() identity := cfg.GetIdentity()
// handle no db yet
if len(peer.DbIds) == 0 { if len(peer.DbIds) == 0 {
return messages, nil return messages, nil
} }
fileidx := len(peer.DbIds) - 1 fileidx := len(peer.DbIds) - 1
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 { if lastDbId == 0 {
lastDbId = math.MaxInt64 lastDbId = math.MaxInt64
} }
err := withDbRead(dbPath(cfg, identity, peer.DbIds[fileidx]), func(db *sql.DB) error {
stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC") stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC")
if err != nil { if err != nil {
return nil, err return err
} }
defer stm.Close() defer stm.Close()
rows, err := stm.Query(lastDbId) rows, err := stm.Query(lastDbId)
if err != nil { if err != nil {
return nil, err return err
} }
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64 var id int64
var m []byte var m []byte
err = rows.Scan(&id, &m) if err = rows.Scan(&id, &m); err != nil {
if err != nil { return err
return nil, err
} }
decdata, err := meowlib.SymDecrypt(password, m) decdata, err := meowlib.SymDecrypt(password, m)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) var dbm meowlib.DbMessage
if err != nil { if err = proto.Unmarshal(decdata, &dbm); err != nil {
return nil, err return err
} }
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx] ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, ium) messages = append(messages, ium)
} }
return nil
})
// TODO DB overlap // TODO DB overlap
return messages, nil return messages, err
} }
// Get old messages from a peer
func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) { func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) {
var messages []InternalUserMessage var messages []InternalUserMessage
// handle no db yet cfg := GetConfig()
if len(peer.DbIds) == 0 { if len(peer.DbIds) == 0 {
return messages, nil return messages, nil
} }
fileidx := len(peer.DbIds) - 1 fileidx := len(peer.DbIds) - 1
// initialize count with last db message count
countStack, err := getMessageCount(peer.DbIds[fileidx]) countStack, err := getMessageCount(peer.DbIds[fileidx])
if err != nil { if err != nil {
return nil, err return nil, err
} }
// while the db message count < what we already have in app, step to next db file
for inAppMsgCount > countStack { for inAppMsgCount > countStack {
fileidx-- fileidx--
if fileidx < 0 { if fileidx < 0 {
@@ -189,92 +197,81 @@ func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore i
} }
countStack += newCount countStack += newCount
} }
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 { if lastDbId == 0 {
lastDbId = math.MaxInt64 lastDbId = math.MaxInt64
} }
err = withDbRead(filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, peer.DbIds[fileidx]+cfg.DbSuffix), func(db *sql.DB) error {
stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?") stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?")
if err != nil { if err != nil {
return nil, err return err
} }
defer stm.Close() defer stm.Close()
rows, err := stm.Query(lastDbId, wantMore) rows, err := stm.Query(lastDbId, wantMore)
if err != nil { if err != nil {
return nil, err return err
} }
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64 var id int64
var m []byte var m []byte
err = rows.Scan(&id, &m) if err = rows.Scan(&id, &m); err != nil {
if err != nil { return err
return nil, err
} }
decdata, err := meowlib.SymDecrypt(password, m) decdata, err := meowlib.SymDecrypt(password, m)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) var dbm meowlib.DbMessage
if err != nil { if err = proto.Unmarshal(decdata, &dbm); err != nil {
return nil, err return err
} }
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx] ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, *ium) messages = append(messages, *ium)
} }
return nil
})
// TODO DB overlap // TODO DB overlap
return messages, nil return messages, err
} }
func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) { func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) {
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB) cfg := GetConfig()
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
if err != nil { var dbm meowlib.DbMessage
return nil, err found := false
} err := withDbRead(path, func(db *sql.DB) error {
defer db.Close()
stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?") stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?")
if err != nil { if err != nil {
return nil, err return err
} }
defer stm.Close() defer stm.Close()
rows, err := stm.Query(dbId) rows, err := stm.Query(dbId)
if err != nil { if err != nil {
return nil, err return err
} }
defer rows.Close() defer rows.Close()
var dbm meowlib.DbMessage
found := false
for rows.Next() { for rows.Next() {
found = true found = true
var id int64 var id int64
var m []byte var m []byte
err = rows.Scan(&id, &m) if err = rows.Scan(&id, &m); err != nil {
if err != nil { return err
return nil, err
} }
decdata, err := meowlib.SymDecrypt(password, m) decdata, err := meowlib.SymDecrypt(password, m)
if err != nil { if err != nil {
return nil, err return err
} }
err = proto.Unmarshal(decdata, &dbm) if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
}
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
if !found { if !found {
return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile) return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile)
} }
@@ -282,12 +279,8 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag
} }
func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error { func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile cfg := GetConfig()
if err != nil { path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
return err
}
defer db.Close()
// Encrypt message
out, err := proto.Marshal(dbm) out, err := proto.Marshal(dbm)
if err != nil { if err != nil {
return err return err
@@ -296,20 +289,16 @@ func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password
if err != nil { if err != nil {
return err return err
} }
// Insert message return withDbWrite(path, func(db *sql.DB) error {
updateMessageSQL := `UPDATE message SET m=? WHERE id=?` stmt, err := db.Prepare(`UPDATE message SET m=? WHERE id=?`)
statement, err := db.Prepare(updateMessageSQL) // Prepare statement.
if err != nil { if err != nil {
return err return err
} }
_, err = statement.Exec(encData, dbId) _, err = stmt.Exec(encData, dbId)
if err != nil {
return err return err
} })
return nil
} }
// Get old messages from a peer
func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) { func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) {
dbm, err := GetDbMessage(dbFile, dbId, password) dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil { if err != nil {
@@ -318,24 +307,15 @@ func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, erro
return FilePreview(dbm.FilePaths[0], password) return FilePreview(dbm.FilePaths[0], password)
} }
// decrypt the a file and returns the raw content
func FilePreview(filename string, password string) ([]byte, error) { func FilePreview(filename string, password string) ([]byte, error) {
// get the hidden file
encData, err := os.ReadFile(filename) encData, err := os.ReadFile(filename)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// decrypt the file return meowlib.SymDecrypt(password, encData)
data, err := meowlib.SymDecrypt(password, encData)
if err != nil {
return nil, err
}
return data, nil
} }
// return the raw content from the files content (loads the first image, or build a more complex view)
func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) { func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) {
// get the hidden file name
if len(msg.FilePaths) == 0 { if len(msg.FilePaths) == 0 {
return nil, nil return nil, nil
} }
@@ -343,21 +323,16 @@ func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]by
} }
func getMessageCount(dbid string) (int, error) { func getMessageCount(dbid string) (int, error) {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File cfg := GetConfig()
if err != nil { path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbid+cfg.DbSuffix)
return 0, err
}
defer db.Close()
var count int var count int
query := "SELECT COUNT(*) FROM message" err := withDbRead(path, func(db *sql.DB) error {
err = db.QueryRow(query).Scan(&count) return db.QueryRow("SELECT COUNT(*) FROM message").Scan(&count)
if err != nil { })
return 0, err return count, err
}
return count, nil
} }
// SetMessageServerDelivery updates the server delivery UUID and timestamp for an existing stored message. // SetMessageServerDelivery updates the server delivery UUID and timestamp for a stored message.
func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error { func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error {
dbm, err := GetDbMessage(dbFile, dbId, password) dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil { if err != nil {
@@ -375,15 +350,16 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string,
identity := cfg.GetIdentity() identity := cfg.GetIdentity()
for i := len(peer.DbIds) - 1; i >= 0; i-- { for i := len(peer.DbIds) - 1; i >= 0; i-- {
dbid := peer.DbIds[i] dbid := peer.DbIds[i]
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) path := filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
if err != nil { var foundFile string
continue var foundId int64
} var foundMsg meowlib.DbMessage
err := withDbRead(path, func(db *sql.DB) error {
rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC") rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC")
if err != nil { if err != nil {
db.Close() return err
continue
} }
defer rows.Close()
for rows.Next() { for rows.Next() {
var id int64 var id int64
var m []byte var m []byte
@@ -399,13 +375,17 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string,
continue continue
} }
if dbm.Status != nil && dbm.Status.Uuid == messageUuid { if dbm.Status != nil && dbm.Status.Uuid == messageUuid {
rows.Close() foundFile = dbid
db.Close() foundId = id
return dbid, id, &dbm, nil foundMsg = dbm
return nil
} }
} }
rows.Close() return nil
db.Close() })
if err == nil && foundFile != "" {
return foundFile, foundId, &foundMsg, nil
}
} }
return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid) return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid)
} }
@@ -430,19 +410,18 @@ func UpdateMessageAck(peer *Peer, messageUuid string, receivedAt uint64, process
} }
func createMessageTable(db *sql.DB) error { func createMessageTable(db *sql.DB) error {
createMessageTableSQL := `CREATE TABLE message ( stmt, err := db.Prepare(`CREATE TABLE message (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"m" BLOB);` // SQL Statement for Create Table "m" BLOB)`)
statement, err := db.Prepare(createMessageTableSQL) // Prepare SQL Statement
if err != nil { if err != nil {
return err return err
} }
statement.Exec() // Execute SQL Statements stmt.Exec()
return nil return nil
} }
func createServerTable(db *sql.DB) error { func createServerTable(db *sql.DB) error {
createServerTableSQL := `CREATE TABLE servers ( stmt, err := db.Prepare(`CREATE TABLE servers (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"country" varchar(2), "country" varchar(2),
"public" bool, "public" bool,
@@ -453,11 +432,10 @@ func createServerTable(db *sql.DB) error {
"name" varchar(255); "name" varchar(255);
"description" varchar(5000) "description" varchar(5000)
"publickey" varchar(10000) "publickey" varchar(10000)
)` // SQL Statement for Create Table )`)
statement, err := db.Prepare(createServerTableSQL) // Prepare SQL Statement
if err != nil { if err != nil {
return err return err
} }
statement.Exec() // Execute SQL Statements stmt.Exec()
return nil return nil
} }
+58 -38
View File
@@ -9,6 +9,7 @@ import (
"errors" "errors"
"path/filepath" "path/filepath"
"sort" "sort"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
@@ -17,11 +18,12 @@ import (
type PeerStorage struct { type PeerStorage struct {
DbFile string `json:"db_file,omitempty"` DbFile string `json:"db_file,omitempty"`
mu sync.RWMutex
db *badger.DB db *badger.DB
cache map[string]*Peer cache map[string]*Peer
} }
// Open the badger database from struct PeerStorage // open opens the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) open() error { func (ps *PeerStorage) open() error {
if ps.DbFile == "" { if ps.DbFile == "" {
ps.DbFile = uuid.New().String() ps.DbFile = uuid.New().String()
@@ -34,20 +36,27 @@ func (ps *PeerStorage) open() error {
opts.Logger = nil opts.Logger = nil
var err error var err error
ps.db, err = badger.Open(opts) ps.db, err = badger.Open(opts)
if err != nil {
return err return err
}
return nil
} }
// Store function StorePeer stores a peer in the badger database with Peer.Uid as key // close closes the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) close() {
ps.db.Close()
}
// StorePeer stores a peer in the Badger database with Peer.Uid as key.
func (ps *PeerStorage) StorePeer(peer *Peer) error { func (ps *PeerStorage) StorePeer(peer *Peer) error {
err := ps.open() ps.mu.Lock()
if err != nil { defer ps.mu.Unlock()
return ps.storePeerLocked(peer)
}
// storePeerLocked is StorePeer without acquiring the lock. Caller must hold mu (write).
func (ps *PeerStorage) storePeerLocked(peer *Peer) error {
if err := ps.open(); err != nil {
return err return err
} }
defer ps.close() defer ps.close()
// first marshal the Peer to bytes with protobuf
jsonsrv, err := json.Marshal(peer) jsonsrv, err := json.Marshal(peer)
if err != nil { if err != nil {
return err return err
@@ -65,26 +74,24 @@ func (ps *PeerStorage) StorePeer(peer *Peer) error {
} }
shakey := sha256.Sum256([]byte(peer.Uid)) shakey := sha256.Sum256([]byte(peer.Uid))
key := shakey[:] key := shakey[:]
// add it to cache
ps.cache[peer.Uid] = peer ps.cache[peer.Uid] = peer
// then store it in the database
return ps.db.Update(func(txn *badger.Txn) error { return ps.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data) return txn.Set(key, data)
}) })
} }
// LoadPeer function loads a Peer from the badger database with Peer.GetUid() as key // LoadPeer loads a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) { func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peer Peer var peer Peer
err := ps.open() if err := ps.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ps.close() defer ps.close()
shakey := sha256.Sum256([]byte(uid)) shakey := sha256.Sum256([]byte(uid))
key := shakey[:] key := shakey[:]
err = ps.db.View(func(txn *badger.Txn) error { err := ps.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key) item, err := txn.Get(key)
if err != nil { if err != nil {
return err return err
@@ -100,29 +107,35 @@ func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
return &peer, err return &peer, err
} }
// DeletePeer function deletes a Peer from the badger database with Peer.GetUid() as key // DeletePeer deletes a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) DeletePeer(uid string) error { func (ps *PeerStorage) DeletePeer(uid string) error {
err := ps.open() ps.mu.Lock()
if err != nil { defer ps.mu.Unlock()
if err := ps.open(); err != nil {
return err return err
} }
defer ps.close() defer ps.close()
shakey := sha256.Sum256([]byte(uid)) shakey := sha256.Sum256([]byte(uid))
key := shakey[:] key := shakey[:]
return ps.db.Update(func(txn *badger.Txn) error { err := ps.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key) return txn.Delete(key)
}) })
if err == nil {
delete(ps.cache, uid)
}
return err
} }
// LoadPeers function loads Peers from the badger database with a specific password // LoadPeers loads all Peers from the Badger database and populates the cache.
func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) { func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []*Peer var peers []*Peer
err := ps.open() if err := ps.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ps.close() defer ps.close()
err = ps.db.View(func(txn *badger.Txn) error { err := ps.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10 opts.PrefetchSize = 10
it := txn.NewIterator(opts) it := txn.NewIterator(opts)
@@ -144,32 +157,29 @@ func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
} }
return nil return nil
}) })
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool { sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name return peers[i].Name < peers[j].Name
}) })
return peers, err return peers, err
} }
// GetPeers function returns all peers from the cache as a sorted array // GetPeers returns all peers from the cache as a sorted slice.
func (ps *PeerStorage) GetPeers() ([]*Peer, error) { func (ps *PeerStorage) GetPeers() ([]*Peer, error) {
ps.mu.RLock()
defer ps.mu.RUnlock()
peers := make([]*Peer, 0, len(ps.cache)) peers := make([]*Peer, 0, len(ps.cache))
for _, peer := range ps.cache { for _, peer := range ps.cache {
peers = append(peers, peer) peers = append(peers, peer)
} }
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool { sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name return peers[i].Name < peers[j].Name
}) })
return peers, nil return peers, nil
} }
// close the badger database
func (ps *PeerStorage) close() {
ps.db.Close()
}
func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer { func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.ContactPublicKey == publickey { if peer.ContactPublicKey == publickey {
return peer return peer
@@ -179,6 +189,8 @@ func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
} }
func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer { func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.InvitationId == invitationId { if peer.InvitationId == invitationId {
return peer return peer
@@ -188,6 +200,8 @@ func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
} }
func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer { func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.MyLookupKp.Public == publickey { if peer.MyLookupKp.Public == publickey {
return peer return peer
@@ -197,6 +211,8 @@ func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
} }
func (ps *PeerStorage) NameExists(name string) bool { func (ps *PeerStorage) NameExists(name string) bool {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.Name == name { if peer.Name == name {
return true return true
@@ -206,6 +222,8 @@ func (ps *PeerStorage) NameExists(name string) bool {
} }
func (ps *PeerStorage) GetFromName(name string) *Peer { func (ps *PeerStorage) GetFromName(name string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache { for _, peer := range ps.cache {
if peer.Name == name { if peer.Name == name {
return peer return peer
@@ -215,26 +233,29 @@ func (ps *PeerStorage) GetFromName(name string) *Peer {
} }
func (ps *PeerStorage) GetFromUid(uid string) *Peer { func (ps *PeerStorage) GetFromUid(uid string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.cache[uid] return ps.cache[uid]
} }
// Checks if the received contact card is an answer to an invitation, returns true if it is, and the proposed and received nicknames // CheckInvitation checks if the received contact card is an answer to an invitation.
func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) { func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) {
// invitation Id found, this is an answer to an invitation ps.mu.RLock()
defer ps.mu.RUnlock()
for _, p := range ps.cache { for _, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId { if p.InvitationId == ReceivedContact.InvitationId {
return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage
} }
} }
// it's an invitation
return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage
} }
// Finalizes an invitation, returns nil if successful // FinalizeInvitation completes an invitation handshake and persists the updated peer.
func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error { func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error {
ps.mu.Lock()
defer ps.mu.Unlock()
for i, p := range ps.cache { for i, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId { if p.InvitationId == ReceivedContact.InvitationId {
//id.Peers[i].Name = ReceivedContact.Name
ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey
ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey
ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey
@@ -246,8 +267,7 @@ func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard)
srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid()) srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid())
} }
ps.cache[i].ContactPullServers = srvs ps.cache[i].ContactPullServers = srvs
ps.StorePeer(ps.cache[i]) return ps.storePeerLocked(ps.cache[i])
return nil
} }
} }
return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId) return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId)
+58 -45
View File
@@ -7,6 +7,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/json" "encoding/json"
"path/filepath" "path/filepath"
"sync"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
@@ -14,30 +15,37 @@ import (
type ServerStorage struct { type ServerStorage struct {
DbFile string `json:"db_file,omitempty"` DbFile string `json:"db_file,omitempty"`
mu sync.Mutex
db *badger.DB db *badger.DB
} }
// Open a badger database from struct ServerStorage // open opens the Badger database. Caller must hold mu.
func (ss *ServerStorage) open() error { func (ss *ServerStorage) open() error {
opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile)) opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile))
opts.Logger = nil opts.Logger = nil
var err error var err error
ss.db, err = badger.Open(opts) ss.db, err = badger.Open(opts)
if err != nil {
return err return err
}
return nil
} }
// Store function StoreServer stores a server in a badger database with Server.GetUid() as key // close closes the Badger database. Caller must hold mu.
func (ss *ServerStorage) close() {
ss.db.Close()
}
// StoreServer stores a server in the Badger database with Server.GetUid() as key.
func (ss *ServerStorage) StoreServer(sc *Server) error { func (ss *ServerStorage) StoreServer(sc *Server) error {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
return ss.storeServerLocked(sc)
}
// storeServerLocked is StoreServer without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) storeServerLocked(sc *Server) error {
if err := ss.open(); err != nil {
return err return err
} }
defer ss.close() defer ss.close()
// first marshal the Server to bytes with protobuf
jsonsrv, err := json.Marshal(sc) jsonsrv, err := json.Marshal(sc)
if err != nil { if err != nil {
return err return err
@@ -52,51 +60,56 @@ func (ss *ServerStorage) StoreServer(sc *Server) error {
} }
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:] key := shakey[:]
// then store it in the database
return ss.db.Update(func(txn *badger.Txn) error { return ss.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data) return txn.Set(key, data)
}) })
} }
// Check if a server exists in a badger database with Server.GetUid() as key // ServerExists checks if a server exists in the Badger database.
func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) { func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
return ss.serverExistsLocked(sc)
}
// serverExistsLocked is ServerExists without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) serverExistsLocked(sc *Server) (bool, error) {
if err := ss.open(); err != nil {
return false, err return false, err
} }
defer ss.close() defer ss.close()
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid())) shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:] key := shakey[:]
// check if key exists in badger database err := ss.db.View(func(txn *badger.Txn) error {
err = ss.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key) _, err := txn.Get(key)
return err return err
}) // Add a comma here })
if err != nil { // key does not exist if err != nil {
return false, nil return false, nil
} }
return true, nil return true, nil
} }
// Store a server in a badger database with Server.GetUid() as key if it is not already there // StoreServerIfNotExists stores a server only if it is not already present.
func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error { func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error {
exists, err := ss.ServerExists(sc) ss.mu.Lock()
defer ss.mu.Unlock()
exists, err := ss.serverExistsLocked(sc)
if err != nil { if err != nil {
return err return err
} }
if !exists { if !exists {
return ss.StoreServer(sc) return ss.storeServerLocked(sc)
} }
return nil return nil
} }
// LoadServer function loads a Server from a badger database with Server.GetUid() as key // LoadServer loads a Server from the Badger database by uid.
func (ss *ServerStorage) LoadServer(uid string) (*Server, error) { func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var sc Server var sc Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -122,10 +135,11 @@ func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
return &sc, err return &sc, err
} }
// DeleteServer function deletes a Server from a badger database with Server.GetUid() as key // DeleteServer deletes a Server from the Badger database by uid.
func (ss *ServerStorage) DeleteServer(uid string) error { func (ss *ServerStorage) DeleteServer(uid string) error {
err := ss.open() ss.mu.Lock()
if err != nil { defer ss.mu.Unlock()
if err := ss.open(); err != nil {
return err return err
} }
defer ss.close() defer ss.close()
@@ -136,11 +150,12 @@ func (ss *ServerStorage) DeleteServer(uid string) error {
}) })
} }
// LoadAllServers function loads all Servers from a badger database // LoadAllServers loads all Servers from the Badger database.
func (ss *ServerStorage) LoadAllServers() ([]*Server, error) { func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server var scs []*Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -173,11 +188,12 @@ func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
return scs, err return scs, err
} }
// LoadAllServers function loads all ServersCards from a badger database // LoadAllServerCards loads all ServerCards from the Badger database.
func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) { func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard var scs []*meowlib.ServerCard
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -210,11 +226,12 @@ func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
return scs, err return scs, err
} }
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database // LoadServersFromUids loads Servers whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) { func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server var scs []*Server
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -248,11 +265,12 @@ func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
return scs, err return scs, err
} }
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database // LoadServerCardsFromUids loads ServerCards whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) { func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard var scs []*meowlib.ServerCard
err := ss.open() if err := ss.open(); err != nil {
if err != nil {
return nil, err return nil, err
} }
defer ss.close() defer ss.close()
@@ -285,8 +303,3 @@ func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.Serv
}) })
return scs, err return scs, err
} }
// close a badger database
func (ss *ServerStorage) close() {
ss.db.Close()
}
+5
View File
@@ -1,13 +1,18 @@
@startuml General Invitation Steps @startuml General Invitation Steps
InitiatingUser -> InitiatingUser : STEP_1 = Create InivitedUser_Id generate a public key, invitation uid & message for InvitedUser optionnally password protected InitiatingUser -> InitiatingUser : STEP_1 = Create InivitedUser_Id generate a public key, invitation uid & message for InvitedUser optionnally password protected
note right of InitiatingUser #Yellow: Invitee created, only temp key
InitiatingUser -> InvitedUser: STEP_1_SEND= transmit step 1 data (QR Code, Bluetooth, messaging, mail, ...) optional password being tranmitted through another channel InitiatingUser -> InvitedUser: STEP_1_SEND= transmit step 1 data (QR Code, Bluetooth, messaging, mail, ...) optional password being tranmitted through another channel
InvitedUser -> InvitedUser :Create InitatingUser_Id & InvitedUser ContactCard InvitedUser -> InvitedUser :Create InitatingUser_Id & InvitedUser ContactCard
note right of InvitedUser #Yellow: Initiator created, empty
InvitedUser -> InitiatingUser: STEP_2_SEND=transmit InvitedUser ContactCard (QR Codes, Bluetooth, messaging, mail, ...) encrypted with initiating user pub key InvitedUser -> InitiatingUser: STEP_2_SEND=transmit InvitedUser ContactCard (QR Codes, Bluetooth, messaging, mail, ...) encrypted with initiating user pub key
InitiatingUser -> InitiatingUser : STEP_3=InitiatingUser_Id Accept Invitation and create answer (Generate InitiatingUser ContactCard and create finalized InvitedUser contact) InitiatingUser -> InitiatingUser : STEP_3=InitiatingUser_Id Accept Invitation and create answer (Generate InitiatingUser ContactCard and create finalized InvitedUser contact)
note right of InitiatingUser #Lime: Invitee complete
InitiatingUser -> InvitedUser: STEP_3_SEND=Send answer through invited user's message servers from contact card InitiatingUser -> InvitedUser: STEP_3_SEND=Send answer through invited user's message servers from contact card
InvitedUser -> InvitedUser : Finalize InitiatingUser from its ContactCard InvitedUser -> InvitedUser : Finalize InitiatingUser from its ContactCard
note right of InvitedUser #Lime: Initiator complete
InvitedUser -> InitiatingUser: STEP_4= Send confirmation to InitiatingUser that communication is possible through initiating user's message servers from contact card InvitedUser -> InitiatingUser: STEP_4= Send confirmation to InitiatingUser that communication is possible through initiating user's message servers from contact card
@enduml @enduml
+9 -11
View File
@@ -1,11 +1,9 @@
module forge.redroom.link/yves/meowlib module forge.redroom.link/yves/meowlib
go 1.23.1 go 1.25.0
toolchain go1.24.2
require ( require (
github.com/ProtonMail/gopenpgp/v2 v2.8.3 github.com/ProtonMail/gopenpgp/v2 v2.10.0
github.com/awnumar/memguard v0.23.0 github.com/awnumar/memguard v0.23.0
github.com/dgraph-io/badger v1.6.2 github.com/dgraph-io/badger v1.6.2
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
@@ -16,18 +14,18 @@ require (
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.34.0 github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.11
) )
require ( require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/ProtonMail/go-crypto v1.2.0 // indirect github.com/ProtonMail/go-crypto v1.4.1 // indirect
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible // indirect github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/awnumar/memcall v0.4.0 // indirect github.com/awnumar/memcall v0.4.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect
github.com/cloudflare/circl v1.6.1 // indirect github.com/cloudflare/circl v1.6.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
@@ -43,11 +41,11 @@ require (
github.com/status-im/doubleratchet v3.0.0+incompatible // indirect github.com/status-im/doubleratchet v3.0.0+incompatible // indirect
github.com/twitchtv/twirp v8.1.3+incompatible // indirect github.com/twitchtv/twirp v8.1.3+incompatible // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/crypto v0.41.0 // indirect golang.org/x/crypto v0.50.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.42.0 // indirect golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.35.0 // indirect golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.28.0 // indirect golang.org/x/text v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect google.golang.org/grpc v1.62.0 // indirect
+18
View File
@@ -5,10 +5,14 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/ProtonMail/go-crypto v1.2.0 h1:+PhXXn4SPGd+qk76TlEePBfOfivE0zkWFenhGhFLzWs= github.com/ProtonMail/go-crypto v1.2.0 h1:+PhXXn4SPGd+qk76TlEePBfOfivE0zkWFenhGhFLzWs=
github.com/ProtonMail/go-crypto v1.2.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE= github.com/ProtonMail/go-crypto v1.2.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/ProtonMail/go-crypto v1.4.1 h1:9RfcZHqEQUvP8RzecWEUafnZVtEvrBVL9BiF67IQOfM=
github.com/ProtonMail/go-crypto v1.4.1/go.mod h1:e1OaTyu5SYVrO9gKOEhTc+5UcXtTUa+P3uLudwcgPqo=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw= github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw=
github.com/ProtonMail/gopenpgp/v2 v2.8.3 h1:1jHlELwCR00qovx2B50DkL/FjYwt/P91RnlsqeOp2Hs= github.com/ProtonMail/gopenpgp/v2 v2.8.3 h1:1jHlELwCR00qovx2B50DkL/FjYwt/P91RnlsqeOp2Hs=
github.com/ProtonMail/gopenpgp/v2 v2.8.3/go.mod h1:LiuOTbnJit8w9ZzOoLscj0kmdALY7hfoCVh5Qlb0bcg= github.com/ProtonMail/gopenpgp/v2 v2.8.3/go.mod h1:LiuOTbnJit8w9ZzOoLscj0kmdALY7hfoCVh5Qlb0bcg=
github.com/ProtonMail/gopenpgp/v2 v2.10.0 h1:llCzLvntC9+iH+if/na4AgKTef/Zm4vpaRrR3+JdKvo=
github.com/ProtonMail/gopenpgp/v2 v2.10.0/go.mod h1:dc0h9Pg3ftfN0U4pfRzujilfh61A2R52wgMkZWcWm2I=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
@@ -28,6 +32,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0= github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0=
github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs= github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs=
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -88,6 +94,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
@@ -248,6 +255,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -264,6 +273,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -271,6 +282,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -294,6 +306,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -308,6 +322,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -333,6 +349,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+6
View File
@@ -38,6 +38,12 @@ func HttpPostMessage(url string, msg []byte, timeout int) (response []byte, err
defer resp.Body.Close() defer resp.Body.Close()
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
// Server already accepted the request (2xx) — body truncation on our
// side doesn't mean the message wasn't stored. Return what we have so
// the caller doesn't retry and produce a duplicate.
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return body, nil
}
return nil, err return nil, err
} }
return body, nil return body, nil
+41 -48
View File
@@ -2,7 +2,6 @@ package server
import ( import (
"context" "context"
"sync"
"time" "time"
"forge.redroom.link/yves/meowlib" "forge.redroom.link/yves/meowlib"
@@ -37,7 +36,7 @@ func NewRedisRouter(server *Identity, redisUrl string, password string, db int,
return &r return &r
} }
func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { func (r *RedisRouter) Route(ctx context.Context, msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
var from_server *meowlib.FromServerMessage var from_server *meowlib.FromServerMessage
// update messages counter // update messages counter
err := r.Client.Incr("statistics:messages:total").Err() err := r.Client.Incr("statistics:messages:total").Err()
@@ -59,10 +58,9 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe
if err != nil { if err != nil {
return nil, err return nil, err
} }
if msg.Timeout > 0 { if msg.Timeout > 0 && len(from_server.Chat) == 0 && from_server.Invitation == nil {
logger.Info().Msg("long poll, subscribing for messages") logger.Info().Msg("long poll, subscribing for messages")
// set timeout for the lookup from_server, err = r.subscribe(ctx, msg, int(msg.Timeout))
from_server, err = r.subscribe(msg, int(msg.Timeout))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -206,60 +204,55 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
return &from_server, nil return &from_server, nil
} }
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
defer wg.Done() func (r *RedisRouter) subscribe(reqCtx context.Context, msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
pubsub := client.Subscribe("msgch:" + key) if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
return nil, err
}
channels := make([]string, 0, len(msg.PullRequest))
for _, rq := range msg.PullRequest {
channels = append(channels, "msgch:"+rq.LookupKey)
}
// Subscribe before re-checking: any publish that fires after Subscribe()
// returns is buffered in pubsub.Channel(), closing the store→publish→check race.
pubsub := r.Client.Subscribe(channels...)
defer pubsub.Close() defer pubsub.Close()
// Create a new channel for the messages from this subscription // Drain one subscribe-confirmation per channel.
myMessages := make(chan *redis.Message) for range len(channels) {
go func() { if _, err := pubsub.Receive(); err != nil {
for { return nil, err
msg, err := pubsub.ReceiveMessage()
if err != nil {
close(myMessages)
return
} }
myMessages <- msg
} }
}()
// Wait for a message or for the done signal // Re-check now that we are subscribed; catches messages that arrived
select { // between the caller's first checkForMessage and our Subscribe call.
case msg := <-myMessages: fromServer, err := r.checkForMessage(msg)
messages <- msg.Payload
case <-done:
return
}
}
func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
var from_server meowlib.FromServerMessage
// update messages counter
err := r.Client.Incr("statistics:messages:messagessubscription").Err()
if err != nil { if err != nil {
return nil, err return nil, err
} }
messages := make(chan string) if len(fromServer.Chat) > 0 || fromServer.Invitation != nil {
var wg sync.WaitGroup return fromServer, nil
done := make(chan struct{})
// extract lookup keys and subscribe
// iterate over pull requests
for _, rq := range msg.PullRequest {
wg.Add(1)
// subscribe to the lookup key
go goSubscribeAndListen(r.Client, rq.LookupKey, messages, &wg, done)
} }
// wait for timeout or message
ctx, cancel := context.WithTimeout(reqCtx, time.Duration(timeout)*time.Second)
defer cancel()
ch := pubsub.Channel()
select { select {
case <-messages: case <-ctx.Done():
close(done) if ctx.Err() == context.DeadlineExceeded {
return r.checkForMessage(msg) return fromServer, nil
case <-time.After(time.Duration(timeout) * time.Second): // 10 seconds timeout }
close(done) return nil, ctx.Err()
case _, ok := <-ch:
if !ok {
return fromServer, nil
}
return r.checkForMessage(msg)
} }
wg.Wait()
return &from_server, nil
} }
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
+14 -13
View File
@@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"testing" "testing"
"time" "time"
@@ -33,7 +34,7 @@ func newTestRouter(t *testing.T) (*RedisRouter, *miniredis.Miniredis) {
Addr: mr.Addr(), Addr: mr.Addr(),
}), }),
InvitationTimeout: 3600, InvitationTimeout: 3600,
Context: nil, Context: context.Background(),
} }
// seed the statistics:start key that NewRedisRouter normally sets // seed the statistics:start key that NewRedisRouter normally sets
router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0) router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0)
@@ -219,7 +220,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{Destination: dest, Payload: []byte("routed msg")}, {Destination: dest, Payload: []byte("routed msg")},
}, },
} }
resp, err := router.Route(storeReq) resp, err := router.Route(context.Background(),storeReq)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "route-store-uuid", resp.UuidAck) assert.Equal(t, "route-store-uuid", resp.UuidAck)
@@ -229,7 +230,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{LookupKey: dest}, {LookupKey: dest},
}, },
} }
resp, err = router.Route(pullReq) resp, err = router.Route(context.Background(),pullReq)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, resp.Chat, 1) assert.Len(t, resp.Chat, 1)
assert.Equal(t, []byte("routed msg"), resp.Chat[0].Payload) assert.Equal(t, []byte("routed msg"), resp.Chat[0].Payload)
@@ -240,7 +241,7 @@ func TestRouteEmptyMessage(t *testing.T) {
router, mr := newTestRouter(t) router, mr := newTestRouter(t)
defer mr.Close() defer mr.Close()
resp, err := router.Route(&meowlib.ToServerMessage{}) resp, err := router.Route(context.Background(),&meowlib.ToServerMessage{})
assert.NoError(t, err) assert.NoError(t, err)
assert.Nil(t, resp) assert.Nil(t, resp)
} }
@@ -250,9 +251,9 @@ func TestRouteIncrementsTotalCounter(t *testing.T) {
router, mr := newTestRouter(t) router, mr := newTestRouter(t)
defer mr.Close() defer mr.Close()
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{}) router.Route(context.Background(),&meowlib.ToServerMessage{})
val, err := router.Client.Get("statistics:messages:total").Int() val, err := router.Client.Get("statistics:messages:total").Int()
assert.NoError(t, err) assert.NoError(t, err)
@@ -553,7 +554,7 @@ func TestRouteMatriochka(t *testing.T) {
Data: []byte("wrapped"), Data: []byte("wrapped"),
}, },
} }
resp, err := router.Route(msg) resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "route-mtk", resp.UuidAck) assert.Equal(t, "route-mtk", resp.UuidAck)
@@ -577,7 +578,7 @@ func TestRouteInvitation(t *testing.T) {
ShortcodeLen: 6, ShortcodeLen: 6,
}, },
} }
resp, err := router.Route(msg) resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotEmpty(t, resp.Invitation.Shortcode) assert.NotEmpty(t, resp.Invitation.Shortcode)
assert.Len(t, resp.Invitation.Shortcode, 6) assert.Len(t, resp.Invitation.Shortcode, 6)
@@ -594,7 +595,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
dest := "stats-dest" dest := "stats-dest"
// one store increments usermessages // one store increments usermessages
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
Messages: []*meowlib.PackedUserMessage{ Messages: []*meowlib.PackedUserMessage{
{Destination: dest, Payload: []byte("x")}, {Destination: dest, Payload: []byte("x")},
}, },
@@ -603,7 +604,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one pull increments messagelookups // one pull increments messagelookups
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
PullRequest: []*meowlib.ConversationRequest{ PullRequest: []*meowlib.ConversationRequest{
{LookupKey: dest}, {LookupKey: dest},
}, },
@@ -612,14 +613,14 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one matriochka increments matriochka counter // one matriochka increments matriochka counter
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
MatriochkaMessage: &meowlib.Matriochka{Data: []byte("m")}, MatriochkaMessage: &meowlib.Matriochka{Data: []byte("m")},
}) })
val, _ = router.Client.Get("statistics:messages:matriochka").Int() val, _ = router.Client.Get("statistics:messages:matriochka").Int()
assert.Equal(t, 1, val) assert.Equal(t, 1, val)
// one invitation increments invitation counter // one invitation increments invitation counter
router.Route(&meowlib.ToServerMessage{ router.Route(context.Background(),&meowlib.ToServerMessage{
Invitation: &meowlib.Invitation{ Invitation: &meowlib.Invitation{
Step: 1, Step: 1,
Payload: []byte("i"), Payload: []byte("i"),