Compare commits

..

8 Commits

Author SHA1 Message Date
yves 9037a7b3c7 subscriptions cleanup
continuous-integration/drone/push Build is failing
2026-04-21 20:07:35 +02:00
yves ac305eaae0 redis subscriptions fix
continuous-integration/drone/push Build is failing
2026-04-21 19:21:16 +02:00
yves 5d12e0f869 long poll fix
continuous-integration/drone/push Build is failing
2026-04-21 17:00:58 +02:00
yves 8b106db52f duplicate messages send fixes
continuous-integration/drone/push Build is failing
2026-04-21 15:53:56 +02:00
yves 4e78ce5799 reorder received messages
continuous-integration/drone/push Build is failing
2026-04-19 18:36:44 +02:00
yves b9a1233e0e uuid return on create ne message
continuous-integration/drone/push Build is failing
2026-04-18 21:46:28 +02:00
yves a5dd6cd73f delete peer cache fix
continuous-integration/drone/push Build is failing
2026-04-18 20:40:23 +02:00
yves 00e4e6b046 cleaner invitation step messages 2026-04-14 19:12:09 +02:00
21 changed files with 760 additions and 698 deletions
+41 -34
View File
@@ -1,10 +1,12 @@
package helpers
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
@@ -14,6 +16,7 @@ import (
invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages"
invsrv "forge.redroom.link/yves/meowlib/client/invitation/server"
"github.com/google/uuid"
doubleratchet "github.com/status-im/doubleratchet"
"google.golang.org/protobuf/proto"
)
@@ -106,7 +109,7 @@ func PollServer(storage_path string, job *client.RequestsJob, timeout int, longP
// SaveCheckJobs
func SaveCheckJobs() (string, error) {
me := client.GetConfig().GetIdentity()
err := me.SaveBackgroundJob()
err := me.SaveCheckJobs()
if err != nil {
return "CheckMessages: json.Marshal", err
@@ -133,8 +136,12 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
}
// check if invitation answer (step-2 answer waiting for the initiator)
if fromServerMessage.Invitation != nil {
peer, _, invErr := invmsgs.Step3InitiatorFinalizesInviteeAndCreatesContactCard(fromServerMessage.Invitation)
if invErr == nil && peer != nil {
invBytes, marshalErr := proto.Marshal(fromServerMessage.Invitation)
if marshalErr == nil {
step3Bytes, invErr := invmsgs.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invBytes)
if invErr == nil && step3Bytes != nil {
peer := client.GetConfig().GetIdentity().Peers.GetFromInvitationId(fromServerMessage.Invitation.Uuid)
if peer != nil {
// Auto-send step-3 CC to invitee's servers.
msgs, sendErr := invsrv.Step3PostCard(peer.InvitationId)
if sendErr == nil {
@@ -146,14 +153,29 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
}
}
}
}
}
// Chat messages
if len(fromServerMessage.Chat) > 0 {
// Sort by DR chain sequence number so messages are decrypted in ratchet order,
// regardless of server delivery order.
sort.SliceStable(fromServerMessage.Chat, func(i, j int) bool {
var hi, hj doubleratchet.MessageHeader
if err := json.Unmarshal(fromServerMessage.Chat[i].DrHeader, &hi); err != nil {
return false
}
if err := json.Unmarshal(fromServerMessage.Chat[j].DrHeader, &hj); err != nil {
return false
}
return hi.N < hj.N
})
for _, packedUserMessage := range fromServerMessage.Chat {
// find the peer with that lookup key
peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination)
if peer == nil {
return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message")
logger.Error().Str("destination", packedUserMessage.Destination).Msg("ConsumeInboxFile: no visible peer for that message, skipping")
continue
}
// Unpack the message — step-3 messages arrive before the initiator's identity
// key is known, so skip signature verification for pending peers.
@@ -164,12 +186,15 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage)
}
if err != nil {
return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
}
//return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
logger.Error().Msg("ReadMessage: ProcessInboundUserMessage" + err.Error())
} else {
// Handle invitation step 3: initiator's full ContactCard arriving at the invitee.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(usermsg)
invBytes, marshalErr := proto.Marshal(usermsg.Invitation)
if marshalErr == nil {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes)
if finalErr == nil && finalizedPeer != nil {
// Auto-send step-4 confirmation to initiator's servers.
step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId)
@@ -181,6 +206,7 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
}
}
}
}
continue
}
@@ -257,6 +283,8 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
}
}
}
err = os.Remove(messageFilename)
if err != nil {
return nil, nil, "ReadMessage: Remove", err
@@ -266,54 +294,33 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
return messagesOverview, filenames, "", nil
}
// LongPollAllSerevrJobs checks for messages on a all servers defived in job file
// LongPollAllServerJobs checks for messages on all servers defined in job file.
// It returns as soon as any server delivers at least one message, or 0 when all
// polls time out. resultChan is buffered so goroutines never block on write.
func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
// Channel to collect results
resultChan := make(chan int, len(jobs))
errChan := make(chan error, len(jobs))
// WaitGroup to sync goroutines
var wg sync.WaitGroup
// Loop through each job (server)
for _, job := range jobs {
wg.Add(1)
go func(job client.RequestsJob) {
defer wg.Done()
// Long-polling call to the server
cnt, _, err := PollServer(storage_path, &job, timeout, true)
if err == nil && cnt > 0 {
select {
case resultChan <- cnt:
default:
}
// Close the error channel to notify all goroutines
close(errChan)
resultChan <- cnt
}
}(job)
}
// Close the result channel when all workers are done
go func() {
wg.Wait()
close(resultChan)
}()
// Wait for the first message or all timeouts
select {
case cnt := <-resultChan:
if cnt, ok := <-resultChan; ok {
return cnt, "", nil
case <-errChan:
// If one fails and exitOnMessage is true
return 0, "", nil
}
return 0, "", nil
}
// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues
+11 -8
View File
@@ -21,33 +21,36 @@ const defaultPostTimeout = 200
// It creates and stores the user message, serialises the packed form to
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
// storagePath/queues/{peerUid}.
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error {
packedMsg, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) (string, error) {
packedMsg, dbFile, dbId, msgUuid, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
if err != nil {
return fmt.Errorf("%s: %w", errTxt, err)
return "", fmt.Errorf("%s: %w", errTxt, err)
}
data, err := proto.Marshal(packedMsg)
if err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
return "", fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
}
outboxDir := filepath.Join(storagePath, "outbox")
if err := os.MkdirAll(outboxDir, 0700); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
return "", fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
}
outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId))
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
return "", fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
}
return client.PushSendJob(storagePath, &client.SendJob{
if err := client.PushSendJob(storagePath, &client.SendJob{
Queue: peerUid,
File: outboxFile,
Servers: servers,
Timeout: timeout,
})
}); err != nil {
return "", err
}
return msgUuid, nil
}
// ProcessSendQueues discovers every queue DB file under storagePath/queues/
+2 -1
View File
@@ -202,7 +202,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
srv := newTestServer(t, "http://test-srv.example")
err := CreateUserMessageAndSendJob(
msgUuid, err := CreateUserMessageAndSendJob(
dir,
"hello from integration",
"peer-create-send",
@@ -212,6 +212,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
60,
)
require.NoError(t, err)
assert.NotEmpty(t, msgUuid, "returned UUID must not be empty")
// A pending job must be in the queue.
job, _, err := client.PeekSendJob(dir, "peer-create-send")
+10 -9
View File
@@ -42,7 +42,7 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) (
}
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
usermessage, _, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
if err != nil {
return nil, errtxt, err
}
@@ -51,20 +51,20 @@ func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for
// peer_uid. It returns the packed (encrypted) form ready for server transport,
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context
// string, and any error.
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) {
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), the message UUID
// (conversation_status uuid), an error context string, and any error.
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
if err != nil {
return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err
return nil, "", 0, "", "PrepareServerMessage : BuildSimpleUserMessage", err
}
for _, file := range filelist {
err = usermessage.AddFile(file, client.GetConfig().Chunksize)
if err != nil {
return nil, "", 0, "PrepareServerMessage : AddFile", err
return nil, "", 0, "", "PrepareServerMessage : AddFile", err
}
}
usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
@@ -73,16 +73,17 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
// Store message
err = peer.StoreMessage(usermessage, nil)
if err != nil {
return nil, "", 0, "messageBuildPostprocess : StoreMessage", err
return nil, "", 0, "", "messageBuildPostprocess : StoreMessage", err
}
dbFile := peer.LastMessage.Dbfile
dbId := peer.LastMessage.Dbid
msgUuid := usermessage.Status.Uuid
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err
return nil, "", 0, "", "messageBuildPostprocess : ProcessOutboundUserMessage", err
}
// Persist peer to save updated DR state (DrStateJson)
@@ -92,7 +93,7 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
}
}
return packedMsg, dbFile, dbId, "", nil
return packedMsg, dbFile, dbId, msgUuid, "", nil
}
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
+4 -1
View File
@@ -425,7 +425,10 @@ func (id *Identity) GetRequestJobs() []RequestsJob {
return list
}
func (id *Identity) SaveBackgroundJob() error {
func (id *Identity) SaveCheckJobs() error {
if id.RootKp == nil {
return errors.New("identity not fully initialized: RootKp is nil")
}
var bj BackgroundJob
bj.Jobs = id.GetRequestJobs()
bj.RootPublic = id.RootKp.Public
+17 -13
View File
@@ -3,28 +3,32 @@ package files
import (
"os"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"forge.redroom.link/yves/meowlib/client/invitation/messages"
"google.golang.org/protobuf/proto"
)
// Step1Write creates a pending peer and writes the InvitationInitPayload to a file.
// format: "qr" writes a QR-code PNG; anything else writes a compressed binary .mwiv file.
func Step1Write(contactName string, myNickname string, invitationMessage string, serverUids []string, format string) (*client.Peer, error) {
payload, peer, err := messages.Step1InitiatorCreatesInviteeAndTempKey(contactName, myNickname, invitationMessage, serverUids)
func Step1Write(contactName string, myNickname string, invitationMessage string, serverUids []string, format string) error {
payloadBytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey(contactName, myNickname, invitationMessage, serverUids)
if err != nil {
return nil, err
return err
}
var payload meowlib.InvitationInitPayload
if err := proto.Unmarshal(payloadBytes, &payload); err != nil {
return err
}
mynick := myNickname
if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname
}
c := client.GetConfig()
if format == "qr" {
filename := c.StoragePath + string(os.PathSeparator) + peer.MyName + "-" + peer.Name + ".png"
if err := payload.WriteQr(filename); err != nil {
return nil, err
filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + contactName + ".png"
return payload.WriteQr(filename)
}
} else {
filename := c.StoragePath + string(os.PathSeparator) + peer.MyName + "-" + peer.Name + ".mwiv"
if err := payload.WriteCompressed(filename); err != nil {
return nil, err
}
}
return peer, nil
filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + contactName + ".mwiv"
return payload.WriteCompressed(filename)
}
+9 -21
View File
@@ -12,8 +12,8 @@ import (
)
// Step2ReadAndAnswer reads an InvitationInitPayload from a .mwiv file, creates the
// invitee's peer entry, and writes the encrypted ContactCard (PackedUserMessage) to a
// .mwiv file for the initiator to pick up and process in step 3.
// invitee's peer entry, and writes the serialized Invitation (step=2) to a .mwiv file
// for the initiator to pick up and process in step 3.
func Step2ReadAndAnswer(invitationFile string, nickname string, myNickname string, serverUids []string) error {
if _, err := os.Stat(invitationFile); os.IsNotExist(err) {
return err
@@ -29,35 +29,23 @@ func Step2ReadAndAnswer(invitationFile string, nickname string, myNickname strin
if err != nil {
return err
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return err
}
mynick := myNickname
if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname
}
packed, peer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload, nickname, mynick, serverUids)
if err != nil {
return err
}
// Wrap the PackedUserMessage in an Invitation so the initiator (step3) has the
// invitee's public key available for signature verification without an extra file.
packedBytes, err := proto.Marshal(packed)
if err != nil {
return err
}
invitation := &meowlib.Invitation{
Uuid: peer.InvitationId,
Step: 2,
From: peer.MyIdentity.Public,
Payload: packedBytes,
}
out, err := proto.Marshal(invitation)
// messages.Step2 returns a serialized Invitation ready to write directly to file.
invBytes, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payloadBytes, nickname, mynick, serverUids)
if err != nil {
return err
}
c := client.GetConfig()
filename := c.StoragePath + string(os.PathSeparator) + mynick + "-" + nickname + ".mwiv"
return os.WriteFile(filename, out, 0600)
return os.WriteFile(filename, invBytes, 0600)
}
+81 -87
View File
@@ -29,114 +29,108 @@ func setupIdentity(t *testing.T, nickname string) (*client.Identity, func()) {
return id, cleanup
}
// TestStep2ProducesPackedUserMessage verifies that Step2 returns a PackedUserMessage
// (not just a peer) and that the message is encrypted with the initiator's temp key
// so Step3 can decrypt it.
func TestStep2ProducesPackedUserMessage(t *testing.T) {
// TestStep1ReturnsBinaryPayload verifies that Step1 returns non-empty bytes that
// deserialise to a valid InvitationInitPayload, and that the pending peer is stored
// with only a temp keypair (no real identity keys yet).
func TestStep1ReturnsBinaryPayload(t *testing.T) {
cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck
// --- STEP 1: initiator creates temp keypair and payload ---
initiator, cleanInit := setupIdentity(t, "alice")
defer cleanInit()
payload, initPeer, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "Hello!", nil)
step1Bytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "Hello!", nil)
require.NoError(t, err)
require.NotNil(t, payload)
require.NotEmpty(t, step1Bytes)
var payload meowlib.InvitationInitPayload
require.NoError(t, proto.Unmarshal(step1Bytes, &payload))
assert.NotEmpty(t, payload.Uuid)
assert.NotEmpty(t, payload.PublicKey)
assert.Equal(t, "Alice", payload.Name)
assert.Equal(t, "Hello!", payload.InvitationMessage)
// Peer is saved with temp keypair only — no real identity keys yet.
initPeer := initiator.Peers.GetFromName("Bob")
require.NotNil(t, initPeer)
// Initiator has only the temp keypair at this stage.
assert.Nil(t, initPeer.MyIdentity)
assert.NotNil(t, initPeer.InvitationKp)
}
// --- STEP 2: invitee receives payload, creates peer, returns packed message ---
_, cleanInvitee := setupIdentity(t, "bob")
// TestFullInvitationFlow runs all four steps end-to-end, passing the binary output of
// each step directly to the next, and verifies that both peers end up with each other's
// real keys after the exchange completes.
func TestFullInvitationFlow(t *testing.T) {
cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck
// --- STEP 1: initiator creates temp keypair, gets binary payload ---
initiator, cleanInit := setupIdentity(t, "alice2")
defer cleanInit()
step1Bytes, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "", nil)
require.NoError(t, err)
require.NotEmpty(t, step1Bytes)
// --- STEP 2: invitee creates peer, returns serialized Invitation (step=2) ---
invitee, cleanInvitee := setupIdentity(t, "bob2")
defer cleanInvitee()
packed, inviteePeer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(
payload, "Alice", "Bob", nil,
)
step2Bytes, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(step1Bytes, "Alice", "Bob", nil)
require.NoError(t, err)
require.NotNil(t, packed, "step2 must return a PackedUserMessage, not just a peer")
require.NotEmpty(t, step2Bytes, "step2 must return non-empty invitation bytes")
// Invitee now has a peer with full keypairs.
inviteePeer := invitee.Peers.GetFromName("Alice")
require.NotNil(t, inviteePeer)
// The packed message destination is the initiator's temp key (used as lookup key).
assert.Equal(t, payload.PublicKey, packed.Destination)
// The invitee peer has full keypairs now.
assert.NotNil(t, inviteePeer.MyIdentity)
assert.NotNil(t, inviteePeer.MyEncryptionKp)
assert.NotNil(t, inviteePeer.MyLookupKp)
// --- STEP 3: initiator decrypts invitee's packed message and finalises ---
// The step-2 wire format is a serialized Invitation.
var inv2 meowlib.Invitation
require.NoError(t, proto.Unmarshal(step2Bytes, &inv2))
assert.EqualValues(t, 2, inv2.Step)
assert.NotEmpty(t, inv2.Uuid)
assert.Equal(t, inviteePeer.MyIdentity.Public, inv2.From)
assert.NotEmpty(t, inv2.Payload)
// --- STEP 3: initiator decrypts invitee's card, returns serialized Invitation (step=3) ---
cfg.SetIdentity(initiator)
// Simulate how the server delivers the step-2 answer: marshal the PackedUserMessage
// into an Invitation.Payload.
packedBytes, err := proto.Marshal(packed)
step3Bytes, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(step2Bytes)
require.NoError(t, err)
require.NotEmpty(t, step3Bytes)
invitation := &meowlib.Invitation{
Uuid: payload.Uuid,
Step: 2,
From: inviteePeer.MyIdentity.Public,
Payload: packedBytes,
}
peer, myCC, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation)
require.NoError(t, err)
require.NotNil(t, peer)
require.NotNil(t, myCC, "step3 must produce the initiator's ContactCard to send to invitee")
// Initiator's peer must now hold invitee's real keys.
assert.Equal(t, inviteePeer.MyIdentity.Public, peer.ContactPublicKey)
assert.Equal(t, inviteePeer.MyEncryptionKp.Public, peer.ContactEncryption)
assert.Equal(t, inviteePeer.MyLookupKp.Public, peer.ContactLookupKey)
assert.Nil(t, peer.InvitationKp, "temp keypair must be cleared after step3")
assert.NotEmpty(t, myCC.DrRootKey)
assert.NotEmpty(t, myCC.DrPublicKey)
}
// TestStep2Step3RoundTripPayload verifies that the PackedUserMessage produced by step2
// actually carries the invitee's ContactCard when decrypted by the initiator.
func TestStep2Step3RoundTripPayload(t *testing.T) {
cfg := client.GetConfig()
cfg.SetMemPass("testpass") //nolint:errcheck
initiator, cleanInit := setupIdentity(t, "alice2")
defer cleanInit()
payload, _, err := messages.Step1InitiatorCreatesInviteeAndTempKey("Bob", "Alice", "", nil)
require.NoError(t, err)
_, cleanInvitee := setupIdentity(t, "bob2")
defer cleanInvitee()
packed, inviteePeer, err := messages.Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload, "Alice", "Bob", nil)
require.NoError(t, err)
// Confirm the message serialises cleanly (transport simulation).
packedBytes, err := proto.Marshal(packed)
require.NoError(t, err)
assert.NotEmpty(t, packedBytes)
// Switch back to initiator and run step3.
cfg.SetIdentity(initiator)
var roundTripped meowlib.PackedUserMessage
require.NoError(t, proto.Unmarshal(packedBytes, &roundTripped))
invitation := &meowlib.Invitation{
Uuid: payload.Uuid,
Step: 2,
From: inviteePeer.MyIdentity.Public,
Payload: packedBytes,
}
_, myCC, err := messages.Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation)
require.NoError(t, err)
require.NotNil(t, myCC)
// The initiator's CC must reference the invitee's invitation ID so the invitee
// can match it when step4 arrives.
assert.Equal(t, payload.Uuid, myCC.InvitationId)
// Initiator's peer must now hold invitee's real keys; temp keypair must be gone.
initPeer := initiator.Peers.GetFromName("Bob")
require.NotNil(t, initPeer)
assert.Equal(t, inviteePeer.MyIdentity.Public, initPeer.ContactPublicKey)
assert.Equal(t, inviteePeer.MyEncryptionKp.Public, initPeer.ContactEncryption)
assert.Equal(t, inviteePeer.MyLookupKp.Public, initPeer.ContactLookupKey)
assert.Nil(t, initPeer.InvitationKp, "temp keypair must be cleared after step3")
assert.NotEmpty(t, initPeer.DrKpPublic)
assert.NotEmpty(t, initPeer.DrRootKey)
// The step-3 wire format is a serialized Invitation.
var inv3 meowlib.Invitation
require.NoError(t, proto.Unmarshal(step3Bytes, &inv3))
assert.EqualValues(t, 3, inv3.Step)
assert.NotEmpty(t, inv3.Uuid)
assert.NotEmpty(t, inv3.Payload)
// --- STEP 4: invitee finalises initiator ---
cfg.SetIdentity(invitee)
finalPeer, err := messages.Step4InviteeFinalizesInitiator(step3Bytes)
require.NoError(t, err)
require.NotNil(t, finalPeer)
// Invitee's peer must now hold initiator's real keys and the invitation must be complete.
assert.Equal(t, initPeer.MyIdentity.Public, finalPeer.ContactPublicKey)
assert.Equal(t, initPeer.MyEncryptionKp.Public, finalPeer.ContactEncryption)
assert.Equal(t, initPeer.MyLookupKp.Public, finalPeer.ContactLookupKey)
assert.False(t, finalPeer.InvitationPending(), "invitation must be fully finalized")
assert.NotEmpty(t, finalPeer.DrRootKey)
assert.NotEmpty(t, finalPeer.ContactDrPublicKey)
}
+8 -7
View File
@@ -1,22 +1,23 @@
package messages
import (
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
)
// Step1InitiatorCreatesInviteeAndTempKey creates a minimal pending peer and a temporary
// keypair, and returns the InvitationInitPayload to be transmitted to the invitee
// via any transport (file, QR, server…).
func Step1InitiatorCreatesInviteeAndTempKey(contactName string, myNickname string, invitationMessage string, serverUids []string) (*meowlib.InvitationInitPayload, *client.Peer, error) {
// keypair, and returns the serialized InvitationInitPayload bytes to be transmitted to
// the invitee via any transport (file, QR, server…). The peer is already persisted by
// InvitationStep1 so no peer reference is returned.
func Step1InitiatorCreatesInviteeAndTempKey(contactName string, myNickname string, invitationMessage string, serverUids []string) ([]byte, error) {
mynick := myNickname
if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname
}
payload, peer, err := client.GetConfig().GetIdentity().InvitationStep1(mynick, contactName, serverUids, invitationMessage)
payload, _, err := client.GetConfig().GetIdentity().InvitationStep1(mynick, contactName, serverUids, invitationMessage)
if err != nil {
return nil, nil, err
return nil, err
}
client.GetConfig().GetIdentity().Save()
return payload, peer, nil
return proto.Marshal(payload)
}
+26 -12
View File
@@ -3,31 +3,45 @@ package messages
import (
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
)
// Step2InviteeCreatesInitiatorAndEncryptedContactCard creates the invitee's peer entry
// from an InvitationInitPayload, then builds the invitee's ContactCard and returns it
// as a PackedUserMessage asymmetrically encrypted with the initiator's temporary public
// key. The packed message is ready to be transmitted to the initiator via any transport
// (file, QR, server…); Step3InitiatorFinalizesInviteeAndCreatesContactCard on the
// initiator side will decrypt and process it.
func Step2InviteeCreatesInitiatorAndEncryptedContactCard(payload *meowlib.InvitationInitPayload, nickname string, myNickname string, serverUids []string) (*meowlib.PackedUserMessage, *client.Peer, error) {
// Step2InviteeCreatesInitiatorAndEncryptedContactCard deserialises the step-1 payload bytes,
// creates the invitee's peer entry, builds and encrypts the invitee's ContactCard, and returns
// a serialized Invitation (step=2) whose Payload is the PackedUserMessage encrypted with the
// initiator's temporary public key. The bytes are transport-ready and consumed directly by
// Step3InitiatorFinalizesInviteeAndCreatesContactCard.
func Step2InviteeCreatesInitiatorAndEncryptedContactCard(payloadBytes []byte, nickname string, myNickname string, serverUids []string) ([]byte, error) {
mynick := myNickname
if mynick == "" {
mynick = client.GetConfig().GetIdentity().Nickname
}
peer, err := client.GetConfig().GetIdentity().InvitationStep2(mynick, nickname, serverUids, payload)
var payload meowlib.InvitationInitPayload
if err := proto.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
peer, err := client.GetConfig().GetIdentity().InvitationStep2(mynick, nickname, serverUids, &payload)
if err != nil {
return nil, nil, err
return nil, err
}
usermsg, err := peer.BuildInvitationStep2Message(peer.GetMyContact())
if err != nil {
return nil, nil, err
return nil, err
}
packed, err := peer.ProcessOutboundUserMessage(usermsg)
if err != nil {
return nil, nil, err
return nil, err
}
packedBytes, err := proto.Marshal(packed)
if err != nil {
return nil, err
}
inv := &meowlib.Invitation{
Uuid: payload.Uuid,
Step: 2,
From: peer.MyIdentity.Public,
Payload: packedBytes,
}
client.GetConfig().GetIdentity().Save()
return packed, peer, nil
return proto.Marshal(inv)
}
+29 -13
View File
@@ -8,39 +8,55 @@ import (
"google.golang.org/protobuf/proto"
)
// Step3InitiatorFinalizesInviteeAndCreatesContactCard is called by the initiator when a
// step-2 answer (invitee's encrypted ContactCard) arrives. It decrypts the card, upgrades
// the invitee's peer entry with the real keys, and returns the initiator's own ContactCard
// ready to be sent to the invitee via any transport.
func Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitation *meowlib.Invitation) (*client.Peer, *meowlib.ContactCard, error) {
// Step3InitiatorFinalizesInviteeAndCreatesContactCard is called by the initiator when the
// step-2 answer (serialized Invitation bytes) arrives. It decrypts the invitee's ContactCard,
// upgrades the pending peer with the invitee's real keys, and returns a serialized Invitation
// (step=3) whose Payload is the initiator's ContactCard, ready to be consumed directly by
// Step4InviteeFinalizesInitiator on the invitee side.
func Step3InitiatorFinalizesInviteeAndCreatesContactCard(invitationBytes []byte) ([]byte, error) {
var invitation meowlib.Invitation
if err := proto.Unmarshal(invitationBytes, &invitation); err != nil {
return nil, err
}
var invitationAnswer meowlib.PackedUserMessage
if err := proto.Unmarshal(invitation.Payload, &invitationAnswer); err != nil {
return nil, nil, err
return nil, err
}
peer := client.GetConfig().GetIdentity().Peers.GetFromInvitationId(invitation.Uuid)
if peer == nil {
return nil, nil, errors.New("no peer for invitation uuid " + invitation.Uuid)
return nil, errors.New("no peer for invitation uuid " + invitation.Uuid)
}
// Guard against duplicate delivery (e.g., same answer from multiple servers).
if peer.InvitationKp == nil {
return nil, nil, nil
return nil, nil
}
usermsg, err := peer.ProcessInboundStep2UserMessage(&invitationAnswer, invitation.From)
if err != nil {
return nil, nil, err
return nil, err
}
var inviteeCC meowlib.ContactCard
if err := proto.Unmarshal(usermsg.Invitation.Payload, &inviteeCC); err != nil {
return nil, nil, err
return nil, err
}
myCC, peer, err := client.GetConfig().GetIdentity().InvitationStep3(&inviteeCC)
myCC, _, err := client.GetConfig().GetIdentity().InvitationStep3(&inviteeCC)
if err != nil {
return nil, nil, err
return nil, err
}
client.GetConfig().GetIdentity().Save()
return peer, myCC, nil
ccBytes, err := proto.Marshal(myCC)
if err != nil {
return nil, err
}
inv := &meowlib.Invitation{
Uuid: myCC.InvitationId,
Step: 3,
Payload: ccBytes,
}
return proto.Marshal(inv)
}
+11 -13
View File
@@ -1,27 +1,25 @@
package messages
import (
"errors"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"google.golang.org/protobuf/proto"
)
// Step4InviteeFinalizesInitiator is called by the invitee's message processor when a
// UserMessage with invitation.step == 3 arrives. It unmarshals the initiator's ContactCard
// and completes the invitee's peer entry with the initiator's real keys.
func Step4InviteeFinalizesInitiator(usermsg *meowlib.UserMessage) (*client.Peer, error) {
if usermsg.Invitation == nil || usermsg.Invitation.Step != 3 {
return nil, errors.New("expected invitation step 3")
}
var initiatorCC meowlib.ContactCard
if err := proto.Unmarshal(usermsg.Invitation.Payload, &initiatorCC); err != nil {
// Step4InviteeFinalizesInitiator is called by the invitee when the step-3 answer
// (serialized Invitation bytes) arrives. It unmarshals the initiator's ContactCard and
// completes the invitee's peer entry with the initiator's real keys.
func Step4InviteeFinalizesInitiator(invitationBytes []byte) (*client.Peer, error) {
var inv meowlib.Invitation
if err := proto.Unmarshal(invitationBytes, &inv); err != nil {
return nil, err
}
var initiatorCC meowlib.ContactCard
if err := proto.Unmarshal(inv.Payload, &initiatorCC); err != nil {
return nil, err
}
// Patch the invitation ID from the outer message in case it was not set in the CC.
if initiatorCC.InvitationId == "" {
initiatorCC.InvitationId = usermsg.Invitation.Uuid
initiatorCC.InvitationId = inv.Uuid
}
if err := client.GetConfig().GetIdentity().InvitationStep4(&initiatorCC); err != nil {
return nil, err
+156 -178
View File
@@ -6,6 +6,7 @@ import (
"math"
"os"
"path/filepath"
"sync"
"forge.redroom.link/yves/meowlib"
"github.com/google/uuid"
@@ -13,71 +14,82 @@ import (
"google.golang.org/protobuf/proto"
)
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error {
var dbid string
cfg := GetConfig()
identity := cfg.GetIdentity()
// If no db/no ID create DB + Tablz
// TODO : if file size > X new db
if len(peer.DbIds) == 0 {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
// One RWMutex per SQLite file path. Entries are never deleted (bounded by
// peer count, which is small). RLock for reads, Lock for writes.
var dbFileMu sync.Map
identity.Peers.StorePeer(peer)
identity.CreateFolder()
file, err := os.Create(filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix))
if err != nil {
return err
func getDbFileMutex(path string) *sync.RWMutex {
v, _ := dbFileMu.LoadOrStore(path, &sync.RWMutex{})
return v.(*sync.RWMutex)
}
file.Close()
sqliteDatabase, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix))
if err != nil {
return err
}
defer sqliteDatabase.Close()
err = createMessageTable(sqliteDatabase)
if err != nil {
return err
}
sqliteDatabase.Close()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Open Db
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File
func withDbWrite(path string, fn func(*sql.DB) error) error {
mu := getDbFileMutex(path)
mu.Lock()
defer mu.Unlock()
db, err := sql.Open("sqlite3", path)
if err != nil {
return err
}
defer db.Close()
// Detach Files
return fn(db)
}
func withDbRead(path string, fn func(*sql.DB) error) error {
mu := getDbFileMutex(path)
mu.RLock()
defer mu.RUnlock()
db, err := sql.Open("sqlite3", path)
if err != nil {
return err
}
defer db.Close()
return fn(db)
}
func dbPath(cfg *Config, identity *Identity, dbid string) string {
return filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
}
func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []string, password string) error {
cfg := GetConfig()
identity := cfg.GetIdentity()
isNew := len(peer.DbIds) == 0
var dbid string
if isNew {
dbid = uuid.NewString()
peer.DbIds = []string{dbid}
identity.Peers.StorePeer(peer)
identity.CreateFolder()
} else {
dbid = peer.DbIds[len(peer.DbIds)-1]
}
// Detach file attachments — no DB lock needed for file I/O.
hiddenFilenames := []string{}
if len(usermessage.Files) > 0 {
secureDir := filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")
if _, err := os.Stat(secureDir); os.IsNotExist(err) {
if err = os.MkdirAll(secureDir, 0755); err != nil {
return err
}
}
for _, f := range usermessage.Files {
hiddenFilename := uuid.NewString()
// Cypher file
encData, err := meowlib.SymEncrypt(password, f.Data)
if err != nil {
return err
}
if _, err := os.Stat(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles")); os.IsNotExist(err) {
err = os.MkdirAll(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles"), 0755)
if err != nil {
return err
hidden := filepath.Join(secureDir, hiddenFilename)
os.WriteFile(hidden, encData, 0600)
hiddenFilenames = append(hiddenFilenames, hidden)
f.Data = []byte(hidden)
}
}
os.WriteFile(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename), encData, 0600)
hiddenFilenames = append(hiddenFilenames, filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename))
// replace f.Data by uuid filename
f.Data = []byte(filepath.Join(cfg.StoragePath, identity.Uuid, "securefiles", hiddenFilename))
}
}
outbound := true
if usermessage.From == peer.ContactPublicKey {
outbound = false
}
// Convert UserMessage to DbMessage
outbound := usermessage.From != peer.ContactPublicKey
dbm := UserMessageToDbMessage(outbound, usermessage, hiddenFilenames)
// Encrypt message
out, err := proto.Marshal(dbm)
if err != nil {
return err
@@ -86,98 +98,94 @@ func storeMessage(peer *Peer, usermessage *meowlib.UserMessage, filenames []stri
if err != nil {
return err
}
// Insert message
insertMessageSQL := `INSERT INTO message(m) VALUES (?) RETURNING ID`
statement, err := db.Prepare(insertMessageSQL) // Prepare statement.
var id int64
path := dbPath(cfg, identity, dbid)
err = withDbWrite(path, func(db *sql.DB) error {
// SQLite creates the file on first Open; create the table if new DB.
if isNew {
if err := createMessageTable(db); err != nil {
return err
}
}
stmt, err := db.Prepare(`INSERT INTO message(m) VALUES (?) RETURNING ID`)
if err != nil {
return err
}
result, err := statement.Exec(encData)
result, err := stmt.Exec(encData)
if err != nil {
return err
}
id, err := result.LastInsertId()
id, err = result.LastInsertId()
return err
})
if err != nil {
return err
}
ium := DbMessageToInternalUserMessage(id, dbid, dbm)
peer.LastMessage = ium
peer.LastMessage = DbMessageToInternalUserMessage(id, dbid, dbm)
identity.Peers.StorePeer(peer)
return nil
}
// Get new messages from a peer
func loadNewMessages(peer *Peer, lastDbId int, password string) ([]*InternalUserMessage, error) {
var messages []*InternalUserMessage
cfg := GetConfig()
identity := cfg.GetIdentity()
// handle no db yet
if len(peer.DbIds) == 0 {
return messages, nil
}
fileidx := len(peer.DbIds) - 1
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 {
lastDbId = math.MaxInt64
}
err := withDbRead(dbPath(cfg, identity, peer.DbIds[fileidx]), func(db *sql.DB) error {
stm, err := db.Prepare("SELECT id, m FROM message WHERE id > ? ORDER BY id DESC")
if err != nil {
return nil, err
return err
}
defer stm.Close()
rows, err := stm.Query(lastDbId)
if err != nil {
return nil, err
return err
}
defer rows.Close()
for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil {
return nil, err
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return nil, err
return err
}
err = proto.Unmarshal(decdata, &dbm)
if err != nil {
return nil, err
var dbm meowlib.DbMessage
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, ium)
}
return nil
})
// TODO DB overlap
return messages, nil
return messages, err
}
// Get old messages from a peer
func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore int, password string) ([]InternalUserMessage, error) {
var messages []InternalUserMessage
// handle no db yet
cfg := GetConfig()
if len(peer.DbIds) == 0 {
return messages, nil
}
fileidx := len(peer.DbIds) - 1
// initialize count with last db message count
countStack, err := getMessageCount(peer.DbIds[fileidx])
if err != nil {
return nil, err
}
// while the db message count < what we already have in app, step to next db file
for inAppMsgCount > countStack {
fileidx--
if fileidx < 0 {
@@ -189,92 +197,81 @@ func loadMessagesHistory(peer *Peer, inAppMsgCount int, lastDbId int, wantMore i
}
countStack += newCount
}
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, peer.DbIds[fileidx]+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return nil, err
}
defer db.Close()
// if it's first app query, it won't hold a lastIndex, so let's start from end
if lastDbId == 0 {
lastDbId = math.MaxInt64
}
err = withDbRead(filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, peer.DbIds[fileidx]+cfg.DbSuffix), func(db *sql.DB) error {
stm, err := db.Prepare("SELECT id, m FROM message WHERE id < ? ORDER BY id DESC LIMIT ?")
if err != nil {
return nil, err
return err
}
defer stm.Close()
rows, err := stm.Query(lastDbId, wantMore)
if err != nil {
return nil, err
return err
}
defer rows.Close()
for rows.Next() {
var ium *InternalUserMessage
var dbm meowlib.DbMessage
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil {
return nil, err
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return nil, err
return err
}
err = proto.Unmarshal(decdata, &dbm)
if err != nil {
return nil, err
var dbm meowlib.DbMessage
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
ium = DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium := DbMessageToInternalUserMessage(id, peer.DbIds[fileidx], &dbm)
ium.Dbid = id
ium.Dbfile = peer.DbIds[fileidx]
messages = append(messages, *ium)
}
return nil
})
// TODO DB overlap
return messages, nil
return messages, err
}
func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessage, error) {
// There fileidx should provide the db that we need (unless wantMore overlaps the next DB)
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile
if err != nil {
return nil, err
}
defer db.Close()
cfg := GetConfig()
path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
var dbm meowlib.DbMessage
found := false
err := withDbRead(path, func(db *sql.DB) error {
stm, err := db.Prepare("SELECT id, m FROM message WHERE id=?")
if err != nil {
return nil, err
return err
}
defer stm.Close()
rows, err := stm.Query(dbId)
if err != nil {
return nil, err
return err
}
defer rows.Close()
var dbm meowlib.DbMessage
found := false
for rows.Next() {
found = true
var id int64
var m []byte
err = rows.Scan(&id, &m)
if err != nil {
return nil, err
if err = rows.Scan(&id, &m); err != nil {
return err
}
decdata, err := meowlib.SymDecrypt(password, m)
if err != nil {
return nil, err
return err
}
err = proto.Unmarshal(decdata, &dbm)
if err = proto.Unmarshal(decdata, &dbm); err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}
}
if !found {
return nil, fmt.Errorf("message row %d not found in %s", dbId, dbFile)
}
@@ -282,12 +279,8 @@ func GetDbMessage(dbFile string, dbId int64, password string) (*meowlib.DbMessag
}
func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password string) error {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbFile+GetConfig().DbSuffix)) // Open the created SQLite dbFile
if err != nil {
return err
}
defer db.Close()
// Encrypt message
cfg := GetConfig()
path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbFile+cfg.DbSuffix)
out, err := proto.Marshal(dbm)
if err != nil {
return err
@@ -296,20 +289,16 @@ func UpdateDbMessage(dbm *meowlib.DbMessage, dbFile string, dbId int64, password
if err != nil {
return err
}
// Insert message
updateMessageSQL := `UPDATE message SET m=? WHERE id=?`
statement, err := db.Prepare(updateMessageSQL) // Prepare statement.
return withDbWrite(path, func(db *sql.DB) error {
stmt, err := db.Prepare(`UPDATE message SET m=? WHERE id=?`)
if err != nil {
return err
}
_, err = statement.Exec(encData, dbId)
if err != nil {
_, err = stmt.Exec(encData, dbId)
return err
}
return nil
})
}
// Get old messages from a peer
func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, error) {
dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil {
@@ -318,24 +307,15 @@ func GetMessagePreview(dbFile string, dbId int64, password string) ([]byte, erro
return FilePreview(dbm.FilePaths[0], password)
}
// decrypt the a file and returns the raw content
func FilePreview(filename string, password string) ([]byte, error) {
// get the hidden file
encData, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
// decrypt the file
data, err := meowlib.SymDecrypt(password, encData)
if err != nil {
return nil, err
}
return data, nil
return meowlib.SymDecrypt(password, encData)
}
// return the raw content from the files content (loads the first image, or build a more complex view)
func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]byte, error) {
// get the hidden file name
if len(msg.FilePaths) == 0 {
return nil, nil
}
@@ -343,21 +323,16 @@ func InternalUserMessagePreview(msg *InternalUserMessage, password string) ([]by
}
func getMessageCount(dbid string) (int, error) {
db, err := sql.Open("sqlite3", filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, dbid+GetConfig().DbSuffix)) // Open the created SQLite File
if err != nil {
return 0, err
}
defer db.Close()
cfg := GetConfig()
path := filepath.Join(cfg.StoragePath, cfg.GetIdentity().Uuid, dbid+cfg.DbSuffix)
var count int
query := "SELECT COUNT(*) FROM message"
err = db.QueryRow(query).Scan(&count)
if err != nil {
return 0, err
}
return count, nil
err := withDbRead(path, func(db *sql.DB) error {
return db.QueryRow("SELECT COUNT(*) FROM message").Scan(&count)
})
return count, err
}
// SetMessageServerDelivery updates the server delivery UUID and timestamp for an existing stored message.
// SetMessageServerDelivery updates the server delivery UUID and timestamp for a stored message.
func SetMessageServerDelivery(dbFile string, dbId int64, serverUid string, receiveTime uint64, password string) error {
dbm, err := GetDbMessage(dbFile, dbId, password)
if err != nil {
@@ -375,15 +350,16 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string,
identity := cfg.GetIdentity()
for i := len(peer.DbIds) - 1; i >= 0; i-- {
dbid := peer.DbIds[i]
db, err := sql.Open("sqlite3", filepath.Join(cfg.StoragePath, identity.Uuid, dbid+GetConfig().DbSuffix))
if err != nil {
continue
}
path := filepath.Join(cfg.StoragePath, identity.Uuid, dbid+cfg.DbSuffix)
var foundFile string
var foundId int64
var foundMsg meowlib.DbMessage
err := withDbRead(path, func(db *sql.DB) error {
rows, err := db.Query("SELECT id, m FROM message ORDER BY id DESC")
if err != nil {
db.Close()
continue
return err
}
defer rows.Close()
for rows.Next() {
var id int64
var m []byte
@@ -399,13 +375,17 @@ func FindMessageByUuid(peer *Peer, messageUuid string, password string) (string,
continue
}
if dbm.Status != nil && dbm.Status.Uuid == messageUuid {
rows.Close()
db.Close()
return dbid, id, &dbm, nil
foundFile = dbid
foundId = id
foundMsg = dbm
return nil
}
}
rows.Close()
db.Close()
return nil
})
if err == nil && foundFile != "" {
return foundFile, foundId, &foundMsg, nil
}
}
return "", 0, nil, fmt.Errorf("message with UUID %s not found", messageUuid)
}
@@ -430,19 +410,18 @@ func UpdateMessageAck(peer *Peer, messageUuid string, receivedAt uint64, process
}
func createMessageTable(db *sql.DB) error {
createMessageTableSQL := `CREATE TABLE message (
stmt, err := db.Prepare(`CREATE TABLE message (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"m" BLOB);` // SQL Statement for Create Table
statement, err := db.Prepare(createMessageTableSQL) // Prepare SQL Statement
"m" BLOB)`)
if err != nil {
return err
}
statement.Exec() // Execute SQL Statements
stmt.Exec()
return nil
}
func createServerTable(db *sql.DB) error {
createServerTableSQL := `CREATE TABLE servers (
stmt, err := db.Prepare(`CREATE TABLE servers (
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
"country" varchar(2),
"public" bool,
@@ -453,11 +432,10 @@ func createServerTable(db *sql.DB) error {
"name" varchar(255);
"description" varchar(5000)
"publickey" varchar(10000)
)` // SQL Statement for Create Table
statement, err := db.Prepare(createServerTableSQL) // Prepare SQL Statement
)`)
if err != nil {
return err
}
statement.Exec() // Execute SQL Statements
stmt.Exec()
return nil
}
+57 -37
View File
@@ -9,6 +9,7 @@ import (
"errors"
"path/filepath"
"sort"
"sync"
"forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger"
@@ -17,11 +18,12 @@ import (
type PeerStorage struct {
DbFile string `json:"db_file,omitempty"`
mu sync.RWMutex
db *badger.DB
cache map[string]*Peer
}
// Open the badger database from struct PeerStorage
// open opens the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) open() error {
if ps.DbFile == "" {
ps.DbFile = uuid.New().String()
@@ -34,20 +36,27 @@ func (ps *PeerStorage) open() error {
opts.Logger = nil
var err error
ps.db, err = badger.Open(opts)
if err != nil {
return err
}
return nil
// close closes the Badger database. Caller must hold mu (write).
func (ps *PeerStorage) close() {
ps.db.Close()
}
// Store function StorePeer stores a peer in the badger database with Peer.Uid as key
// StorePeer stores a peer in the Badger database with Peer.Uid as key.
func (ps *PeerStorage) StorePeer(peer *Peer) error {
err := ps.open()
if err != nil {
ps.mu.Lock()
defer ps.mu.Unlock()
return ps.storePeerLocked(peer)
}
// storePeerLocked is StorePeer without acquiring the lock. Caller must hold mu (write).
func (ps *PeerStorage) storePeerLocked(peer *Peer) error {
if err := ps.open(); err != nil {
return err
}
defer ps.close()
// first marshal the Peer to bytes with protobuf
jsonsrv, err := json.Marshal(peer)
if err != nil {
return err
@@ -65,26 +74,24 @@ func (ps *PeerStorage) StorePeer(peer *Peer) error {
}
shakey := sha256.Sum256([]byte(peer.Uid))
key := shakey[:]
// add it to cache
ps.cache[peer.Uid] = peer
// then store it in the database
return ps.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data)
})
}
// LoadPeer function loads a Peer from the badger database with Peer.GetUid() as key
// LoadPeer loads a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peer Peer
err := ps.open()
if err != nil {
if err := ps.open(); err != nil {
return nil, err
}
defer ps.close()
shakey := sha256.Sum256([]byte(uid))
key := shakey[:]
err = ps.db.View(func(txn *badger.Txn) error {
err := ps.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
@@ -100,29 +107,35 @@ func (ps *PeerStorage) LoadPeer(uid string, password string) (*Peer, error) {
return &peer, err
}
// DeletePeer function deletes a Peer from the badger database with Peer.GetUid() as key
// DeletePeer deletes a Peer from the Badger database with Peer.GetUid() as key.
func (ps *PeerStorage) DeletePeer(uid string) error {
err := ps.open()
if err != nil {
ps.mu.Lock()
defer ps.mu.Unlock()
if err := ps.open(); err != nil {
return err
}
defer ps.close()
shakey := sha256.Sum256([]byte(uid))
key := shakey[:]
return ps.db.Update(func(txn *badger.Txn) error {
err := ps.db.Update(func(txn *badger.Txn) error {
return txn.Delete(key)
})
if err == nil {
delete(ps.cache, uid)
}
return err
}
// LoadPeers function loads Peers from the badger database with a specific password
// LoadPeers loads all Peers from the Badger database and populates the cache.
func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []*Peer
err := ps.open()
if err != nil {
if err := ps.open(); err != nil {
return nil, err
}
defer ps.close()
err = ps.db.View(func(txn *badger.Txn) error {
err := ps.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
@@ -144,32 +157,29 @@ func (ps *PeerStorage) LoadPeers(password string) ([]*Peer, error) {
}
return nil
})
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name
})
return peers, err
}
// GetPeers function returns all peers from the cache as a sorted array
// GetPeers returns all peers from the cache as a sorted slice.
func (ps *PeerStorage) GetPeers() ([]*Peer, error) {
ps.mu.RLock()
defer ps.mu.RUnlock()
peers := make([]*Peer, 0, len(ps.cache))
for _, peer := range ps.cache {
peers = append(peers, peer)
}
// Sort peers based on peer.Name
sort.Slice(peers, func(i, j int) bool {
return peers[i].Name < peers[j].Name
})
return peers, nil
}
// close the badger database
func (ps *PeerStorage) close() {
ps.db.Close()
}
func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache {
if peer.ContactPublicKey == publickey {
return peer
@@ -179,6 +189,8 @@ func (ps *PeerStorage) GetFromPublicKey(publickey string) *Peer {
}
func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache {
if peer.InvitationId == invitationId {
return peer
@@ -188,6 +200,8 @@ func (ps *PeerStorage) GetFromInvitationId(invitationId string) *Peer {
}
func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache {
if peer.MyLookupKp.Public == publickey {
return peer
@@ -197,6 +211,8 @@ func (ps *PeerStorage) GetFromMyLookupKey(publickey string) *Peer {
}
func (ps *PeerStorage) NameExists(name string) bool {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache {
if peer.Name == name {
return true
@@ -206,6 +222,8 @@ func (ps *PeerStorage) NameExists(name string) bool {
}
func (ps *PeerStorage) GetFromName(name string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, peer := range ps.cache {
if peer.Name == name {
return peer
@@ -215,26 +233,29 @@ func (ps *PeerStorage) GetFromName(name string) *Peer {
}
func (ps *PeerStorage) GetFromUid(uid string) *Peer {
ps.mu.RLock()
defer ps.mu.RUnlock()
return ps.cache[uid]
}
// Checks if the received contact card is an answer to an invitation, returns true if it is, and the proposed and received nicknames
// CheckInvitation checks if the received contact card is an answer to an invitation.
func (ps *PeerStorage) CheckInvitation(ReceivedContact *meowlib.ContactCard) (isAnswer bool, proposedNick string, receivedNick string, invitationMessage string) {
// invitation Id found, this is an answer to an invitation
ps.mu.RLock()
defer ps.mu.RUnlock()
for _, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId {
return true, p.Name, ReceivedContact.Name, ReceivedContact.InvitationMessage
}
}
// it's an invitation
return false, "", ReceivedContact.Name, ReceivedContact.InvitationMessage
}
// Finalizes an invitation, returns nil if successful
// FinalizeInvitation completes an invitation handshake and persists the updated peer.
func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard) error {
ps.mu.Lock()
defer ps.mu.Unlock()
for i, p := range ps.cache {
if p.InvitationId == ReceivedContact.InvitationId {
//id.Peers[i].Name = ReceivedContact.Name
ps.cache[i].ContactEncryption = ReceivedContact.EncryptionPublicKey
ps.cache[i].ContactLookupKey = ReceivedContact.LookupPublicKey
ps.cache[i].ContactPublicKey = ReceivedContact.ContactPublicKey
@@ -246,8 +267,7 @@ func (ps *PeerStorage) FinalizeInvitation(ReceivedContact *meowlib.ContactCard)
srvs = append(srvs, ReceivedContact.PullServers[srv].GetUid())
}
ps.cache[i].ContactPullServers = srvs
ps.StorePeer(ps.cache[i])
return nil
return ps.storePeerLocked(ps.cache[i])
}
}
return errors.New("no matching contact found for invitationId " + ReceivedContact.InvitationId)
+57 -44
View File
@@ -7,6 +7,7 @@ import (
"crypto/sha256"
"encoding/json"
"path/filepath"
"sync"
"forge.redroom.link/yves/meowlib"
"github.com/dgraph-io/badger"
@@ -14,30 +15,37 @@ import (
type ServerStorage struct {
DbFile string `json:"db_file,omitempty"`
mu sync.Mutex
db *badger.DB
}
// Open a badger database from struct ServerStorage
// open opens the Badger database. Caller must hold mu.
func (ss *ServerStorage) open() error {
opts := badger.DefaultOptions(filepath.Join(GetConfig().StoragePath, GetConfig().GetIdentity().Uuid, ss.DbFile))
opts.Logger = nil
var err error
ss.db, err = badger.Open(opts)
if err != nil {
return err
}
return nil
// close closes the Badger database. Caller must hold mu.
func (ss *ServerStorage) close() {
ss.db.Close()
}
// Store function StoreServer stores a server in a badger database with Server.GetUid() as key
// StoreServer stores a server in the Badger database with Server.GetUid() as key.
func (ss *ServerStorage) StoreServer(sc *Server) error {
err := ss.open()
if err != nil {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.storeServerLocked(sc)
}
// storeServerLocked is StoreServer without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) storeServerLocked(sc *Server) error {
if err := ss.open(); err != nil {
return err
}
defer ss.close()
// first marshal the Server to bytes with protobuf
jsonsrv, err := json.Marshal(sc)
if err != nil {
return err
@@ -52,51 +60,56 @@ func (ss *ServerStorage) StoreServer(sc *Server) error {
}
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:]
// then store it in the database
return ss.db.Update(func(txn *badger.Txn) error {
return txn.Set(key, data)
})
}
// Check if a server exists in a badger database with Server.GetUid() as key
// ServerExists checks if a server exists in the Badger database.
func (ss *ServerStorage) ServerExists(sc *Server) (bool, error) {
err := ss.open()
if err != nil {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.serverExistsLocked(sc)
}
// serverExistsLocked is ServerExists without acquiring the lock. Caller must hold mu.
func (ss *ServerStorage) serverExistsLocked(sc *Server) (bool, error) {
if err := ss.open(); err != nil {
return false, err
}
defer ss.close()
shakey := sha256.Sum256([]byte(sc.GetServerCard().GetUid()))
key := shakey[:]
// check if key exists in badger database
err = ss.db.View(func(txn *badger.Txn) error {
err := ss.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
return err
}) // Add a comma here
if err != nil { // key does not exist
})
if err != nil {
return false, nil
}
return true, nil
}
// Store a server in a badger database with Server.GetUid() as key if it is not already there
// StoreServerIfNotExists stores a server only if it is not already present.
func (ss *ServerStorage) StoreServerIfNotExists(sc *Server) error {
exists, err := ss.ServerExists(sc)
ss.mu.Lock()
defer ss.mu.Unlock()
exists, err := ss.serverExistsLocked(sc)
if err != nil {
return err
}
if !exists {
return ss.StoreServer(sc)
return ss.storeServerLocked(sc)
}
return nil
}
// LoadServer function loads a Server from a badger database with Server.GetUid() as key
// LoadServer loads a Server from the Badger database by uid.
func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var sc Server
err := ss.open()
if err != nil {
if err := ss.open(); err != nil {
return nil, err
}
defer ss.close()
@@ -122,10 +135,11 @@ func (ss *ServerStorage) LoadServer(uid string) (*Server, error) {
return &sc, err
}
// DeleteServer function deletes a Server from a badger database with Server.GetUid() as key
// DeleteServer deletes a Server from the Badger database by uid.
func (ss *ServerStorage) DeleteServer(uid string) error {
err := ss.open()
if err != nil {
ss.mu.Lock()
defer ss.mu.Unlock()
if err := ss.open(); err != nil {
return err
}
defer ss.close()
@@ -136,11 +150,12 @@ func (ss *ServerStorage) DeleteServer(uid string) error {
})
}
// LoadAllServers function loads all Servers from a badger database
// LoadAllServers loads all Servers from the Badger database.
func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server
err := ss.open()
if err != nil {
if err := ss.open(); err != nil {
return nil, err
}
defer ss.close()
@@ -173,11 +188,12 @@ func (ss *ServerStorage) LoadAllServers() ([]*Server, error) {
return scs, err
}
// LoadAllServers function loads all ServersCards from a badger database
// LoadAllServerCards loads all ServerCards from the Badger database.
func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard
err := ss.open()
if err != nil {
if err := ss.open(); err != nil {
return nil, err
}
defer ss.close()
@@ -210,11 +226,12 @@ func (ss *ServerStorage) LoadAllServerCards() ([]*meowlib.ServerCard, error) {
return scs, err
}
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database
// LoadServersFromUids loads Servers whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*Server
err := ss.open()
if err != nil {
if err := ss.open(); err != nil {
return nil, err
}
defer ss.close()
@@ -248,11 +265,12 @@ func (ss *ServerStorage) LoadServersFromUids(uids []string) ([]*Server, error) {
return scs, err
}
// LoadServersFromUids function loads Servers with id in []Uid parameter from a badger database
// LoadServerCardsFromUids loads ServerCards whose UIDs are in the provided slice.
func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.ServerCard, error) {
ss.mu.Lock()
defer ss.mu.Unlock()
var scs []*meowlib.ServerCard
err := ss.open()
if err != nil {
if err := ss.open(); err != nil {
return nil, err
}
defer ss.close()
@@ -285,8 +303,3 @@ func (ss *ServerStorage) LoadServerCardsFromUids(uids []string) ([]*meowlib.Serv
})
return scs, err
}
// close a badger database
func (ss *ServerStorage) close() {
ss.db.Close()
}
+5
View File
@@ -1,13 +1,18 @@
@startuml General Invitation Steps
InitiatingUser -> InitiatingUser : STEP_1 = Create InivitedUser_Id generate a public key, invitation uid & message for InvitedUser optionnally password protected
note right of InitiatingUser #Yellow: Invitee created, only temp key
InitiatingUser -> InvitedUser: STEP_1_SEND= transmit step 1 data (QR Code, Bluetooth, messaging, mail, ...) optional password being tranmitted through another channel
InvitedUser -> InvitedUser :Create InitatingUser_Id & InvitedUser ContactCard
note right of InvitedUser #Yellow: Initiator created, empty
InvitedUser -> InitiatingUser: STEP_2_SEND=transmit InvitedUser ContactCard (QR Codes, Bluetooth, messaging, mail, ...) encrypted with initiating user pub key
InitiatingUser -> InitiatingUser : STEP_3=InitiatingUser_Id Accept Invitation and create answer (Generate InitiatingUser ContactCard and create finalized InvitedUser contact)
note right of InitiatingUser #Lime: Invitee complete
InitiatingUser -> InvitedUser: STEP_3_SEND=Send answer through invited user's message servers from contact card
InvitedUser -> InvitedUser : Finalize InitiatingUser from its ContactCard
note right of InvitedUser #Lime: Initiator complete
InvitedUser -> InitiatingUser: STEP_4= Send confirmation to InitiatingUser that communication is possible through initiating user's message servers from contact card
@enduml
+9 -11
View File
@@ -1,11 +1,9 @@
module forge.redroom.link/yves/meowlib
go 1.23.1
toolchain go1.24.2
go 1.25.0
require (
github.com/ProtonMail/gopenpgp/v2 v2.8.3
github.com/ProtonMail/gopenpgp/v2 v2.10.0
github.com/awnumar/memguard v0.23.0
github.com/dgraph-io/badger v1.6.2
github.com/go-redis/redis v6.15.9+incompatible
@@ -16,18 +14,18 @@ require (
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.34.0
github.com/stretchr/testify v1.9.0
google.golang.org/protobuf v1.36.6
google.golang.org/protobuf v1.36.11
)
require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/ProtonMail/go-crypto v1.2.0 // indirect
github.com/ProtonMail/go-crypto v1.4.1 // indirect
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/awnumar/memcall v0.4.0 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cloudflare/circl v1.6.1 // indirect
github.com/cloudflare/circl v1.6.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
@@ -43,11 +41,11 @@ require (
github.com/status-im/doubleratchet v3.0.0+incompatible // indirect
github.com/twitchtv/twirp v8.1.3+incompatible // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
+18
View File
@@ -5,10 +5,14 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/ProtonMail/go-crypto v1.2.0 h1:+PhXXn4SPGd+qk76TlEePBfOfivE0zkWFenhGhFLzWs=
github.com/ProtonMail/go-crypto v1.2.0/go.mod h1:9whxjD8Rbs29b4XWbB8irEcE8KHMqaR2e7GWU1R+/PE=
github.com/ProtonMail/go-crypto v1.4.1 h1:9RfcZHqEQUvP8RzecWEUafnZVtEvrBVL9BiF67IQOfM=
github.com/ProtonMail/go-crypto v1.4.1/go.mod h1:e1OaTyu5SYVrO9gKOEhTc+5UcXtTUa+P3uLudwcgPqo=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f h1:tCbYj7/299ekTTXpdwKYF8eBlsYsDVoggDAuAjoK66k=
github.com/ProtonMail/go-mime v0.0.0-20230322103455-7d82a3887f2f/go.mod h1:gcr0kNtGBqin9zDW9GOHcVntrwnjrK+qdJ06mWYBybw=
github.com/ProtonMail/gopenpgp/v2 v2.8.3 h1:1jHlELwCR00qovx2B50DkL/FjYwt/P91RnlsqeOp2Hs=
github.com/ProtonMail/gopenpgp/v2 v2.8.3/go.mod h1:LiuOTbnJit8w9ZzOoLscj0kmdALY7hfoCVh5Qlb0bcg=
github.com/ProtonMail/gopenpgp/v2 v2.10.0 h1:llCzLvntC9+iH+if/na4AgKTef/Zm4vpaRrR3+JdKvo=
github.com/ProtonMail/gopenpgp/v2 v2.10.0/go.mod h1:dc0h9Pg3ftfN0U4pfRzujilfh61A2R52wgMkZWcWm2I=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
@@ -28,6 +32,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0=
github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs=
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -88,6 +94,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
@@ -248,6 +255,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ=
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -264,6 +273,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -271,6 +282,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -294,6 +306,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -308,6 +322,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -333,6 +349,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+6
View File
@@ -38,6 +38,12 @@ func HttpPostMessage(url string, msg []byte, timeout int) (response []byte, err
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
// Server already accepted the request (2xx) — body truncation on our
// side doesn't mean the message wasn't stored. Return what we have so
// the caller doesn't retry and produce a duplicate.
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
return body, nil
}
return nil, err
}
return body, nil
+41 -48
View File
@@ -2,7 +2,6 @@ package server
import (
"context"
"sync"
"time"
"forge.redroom.link/yves/meowlib"
@@ -37,7 +36,7 @@ func NewRedisRouter(server *Identity, redisUrl string, password string, db int,
return &r
}
func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
func (r *RedisRouter) Route(ctx context.Context, msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
var from_server *meowlib.FromServerMessage
// update messages counter
err := r.Client.Incr("statistics:messages:total").Err()
@@ -59,10 +58,9 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe
if err != nil {
return nil, err
}
if msg.Timeout > 0 {
if msg.Timeout > 0 && len(from_server.Chat) == 0 && from_server.Invitation == nil {
logger.Info().Msg("long poll, subscribing for messages")
// set timeout for the lookup
from_server, err = r.subscribe(msg, int(msg.Timeout))
from_server, err = r.subscribe(ctx, msg, int(msg.Timeout))
if err != nil {
return nil, err
}
@@ -206,60 +204,55 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
return &from_server, nil
}
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
defer wg.Done()
pubsub := client.Subscribe("msgch:" + key)
func (r *RedisRouter) subscribe(reqCtx context.Context, msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
return nil, err
}
channels := make([]string, 0, len(msg.PullRequest))
for _, rq := range msg.PullRequest {
channels = append(channels, "msgch:"+rq.LookupKey)
}
// Subscribe before re-checking: any publish that fires after Subscribe()
// returns is buffered in pubsub.Channel(), closing the store→publish→check race.
pubsub := r.Client.Subscribe(channels...)
defer pubsub.Close()
// Create a new channel for the messages from this subscription
myMessages := make(chan *redis.Message)
go func() {
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
close(myMessages)
return
}
myMessages <- msg
}
}()
// Wait for a message or for the done signal
select {
case msg := <-myMessages:
messages <- msg.Payload
case <-done:
return
// Drain one subscribe-confirmation per channel.
for range len(channels) {
if _, err := pubsub.Receive(); err != nil {
return nil, err
}
}
func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
var from_server meowlib.FromServerMessage
// update messages counter
err := r.Client.Incr("statistics:messages:messagessubscription").Err()
// Re-check now that we are subscribed; catches messages that arrived
// between the caller's first checkForMessage and our Subscribe call.
fromServer, err := r.checkForMessage(msg)
if err != nil {
return nil, err
}
messages := make(chan string)
var wg sync.WaitGroup
done := make(chan struct{})
// extract lookup keys and subscribe
// iterate over pull requests
for _, rq := range msg.PullRequest {
wg.Add(1)
// subscribe to the lookup key
go goSubscribeAndListen(r.Client, rq.LookupKey, messages, &wg, done)
if len(fromServer.Chat) > 0 || fromServer.Invitation != nil {
return fromServer, nil
}
// wait for timeout or message
ctx, cancel := context.WithTimeout(reqCtx, time.Duration(timeout)*time.Second)
defer cancel()
ch := pubsub.Channel()
select {
case <-messages:
close(done)
return r.checkForMessage(msg)
case <-time.After(time.Duration(timeout) * time.Second): // 10 seconds timeout
close(done)
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
return fromServer, nil
}
return nil, ctx.Err()
case _, ok := <-ch:
if !ok {
return fromServer, nil
}
return r.checkForMessage(msg)
}
wg.Wait()
return &from_server, nil
}
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
+14 -13
View File
@@ -1,6 +1,7 @@
package server
import (
"context"
"testing"
"time"
@@ -33,7 +34,7 @@ func newTestRouter(t *testing.T) (*RedisRouter, *miniredis.Miniredis) {
Addr: mr.Addr(),
}),
InvitationTimeout: 3600,
Context: nil,
Context: context.Background(),
}
// seed the statistics:start key that NewRedisRouter normally sets
router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0)
@@ -219,7 +220,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{Destination: dest, Payload: []byte("routed msg")},
},
}
resp, err := router.Route(storeReq)
resp, err := router.Route(context.Background(),storeReq)
assert.NoError(t, err)
assert.Equal(t, "route-store-uuid", resp.UuidAck)
@@ -229,7 +230,7 @@ func TestRouteDispatchesStoreAndCheck(t *testing.T) {
{LookupKey: dest},
},
}
resp, err = router.Route(pullReq)
resp, err = router.Route(context.Background(),pullReq)
assert.NoError(t, err)
assert.Len(t, resp.Chat, 1)
assert.Equal(t, []byte("routed msg"), resp.Chat[0].Payload)
@@ -240,7 +241,7 @@ func TestRouteEmptyMessage(t *testing.T) {
router, mr := newTestRouter(t)
defer mr.Close()
resp, err := router.Route(&meowlib.ToServerMessage{})
resp, err := router.Route(context.Background(),&meowlib.ToServerMessage{})
assert.NoError(t, err)
assert.Nil(t, resp)
}
@@ -250,9 +251,9 @@ func TestRouteIncrementsTotalCounter(t *testing.T) {
router, mr := newTestRouter(t)
defer mr.Close()
router.Route(&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{})
router.Route(&meowlib.ToServerMessage{})
router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(context.Background(),&meowlib.ToServerMessage{})
router.Route(context.Background(),&meowlib.ToServerMessage{})
val, err := router.Client.Get("statistics:messages:total").Int()
assert.NoError(t, err)
@@ -553,7 +554,7 @@ func TestRouteMatriochka(t *testing.T) {
Data: []byte("wrapped"),
},
}
resp, err := router.Route(msg)
resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err)
assert.Equal(t, "route-mtk", resp.UuidAck)
@@ -577,7 +578,7 @@ func TestRouteInvitation(t *testing.T) {
ShortcodeLen: 6,
},
}
resp, err := router.Route(msg)
resp, err := router.Route(context.Background(),msg)
assert.NoError(t, err)
assert.NotEmpty(t, resp.Invitation.Shortcode)
assert.Len(t, resp.Invitation.Shortcode, 6)
@@ -594,7 +595,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
dest := "stats-dest"
// one store increments usermessages
router.Route(&meowlib.ToServerMessage{
router.Route(context.Background(),&meowlib.ToServerMessage{
Messages: []*meowlib.PackedUserMessage{
{Destination: dest, Payload: []byte("x")},
},
@@ -603,7 +604,7 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val)
// one pull increments messagelookups
router.Route(&meowlib.ToServerMessage{
router.Route(context.Background(),&meowlib.ToServerMessage{
PullRequest: []*meowlib.ConversationRequest{
{LookupKey: dest},
},
@@ -612,14 +613,14 @@ func TestStatisticsCountersIncrement(t *testing.T) {
assert.Equal(t, 1, val)
// one matriochka increments matriochka counter
router.Route(&meowlib.ToServerMessage{
router.Route(context.Background(),&meowlib.ToServerMessage{
MatriochkaMessage: &meowlib.Matriochka{Data: []byte("m")},
})
val, _ = router.Client.Get("statistics:messages:matriochka").Int()
assert.Equal(t, 1, val)
// one invitation increments invitation counter
router.Route(&meowlib.ToServerMessage{
router.Route(context.Background(),&meowlib.ToServerMessage{
Invitation: &meowlib.Invitation{
Step: 1,
Payload: []byte("i"),