duplicate messages send fixes
continuous-integration/drone/push Build is failing

This commit is contained in:
yc
2026-04-21 15:53:56 +02:00
parent 4e78ce5799
commit 8b106db52f
6 changed files with 437 additions and 441 deletions
+91 -108
View File
@@ -15,8 +15,8 @@ import (
"forge.redroom.link/yves/meowlib/client"
invmsgs "forge.redroom.link/yves/meowlib/client/invitation/messages"
invsrv "forge.redroom.link/yves/meowlib/client/invitation/server"
doubleratchet "github.com/status-im/doubleratchet"
"github.com/google/uuid"
doubleratchet "github.com/status-im/doubleratchet"
"google.golang.org/protobuf/proto"
)
@@ -109,7 +109,7 @@ func PollServer(storage_path string, job *client.RequestsJob, timeout int, longP
// SaveCheckJobs
func SaveCheckJobs() (string, error) {
me := client.GetConfig().GetIdentity()
err := me.SaveBackgroundJob()
err := me.SaveCheckJobs()
if err != nil {
return "CheckMessages: json.Marshal", err
@@ -174,7 +174,8 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
// find the peer with that lookup key
peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination)
if peer == nil {
return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message")
logger.Error().Str("destination", packedUserMessage.Destination).Msg("ConsumeInboxFile: no visible peer for that message, skipping")
continue
}
// Unpack the message — step-3 messages arrive before the initiator's identity
// key is known, so skip signature verification for pending peers.
@@ -185,100 +186,103 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
usermsg, err = peer.ProcessInboundUserMessage(packedUserMessage)
}
if err != nil {
return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
}
//return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
logger.Error().Msg("ReadMessage: ProcessInboundUserMessage" + err.Error())
} else {
// Handle invitation step 3: initiator's full ContactCard arriving at the invitee.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 {
invBytes, marshalErr := proto.Marshal(usermsg.Invitation)
if marshalErr == nil {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes)
if finalErr == nil && finalizedPeer != nil {
// Auto-send step-4 confirmation to initiator's servers.
step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId)
if sendErr == nil {
for i, bytemsg := range step4msgs {
if i < len(finalizedPeer.ContactPullServers) {
meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut)
// Handle invitation step 3: initiator's full ContactCard arriving at the invitee.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 3 {
invBytes, marshalErr := proto.Marshal(usermsg.Invitation)
if marshalErr == nil {
finalizedPeer, finalErr := invmsgs.Step4InviteeFinalizesInitiator(invBytes)
if finalErr == nil && finalizedPeer != nil {
// Auto-send step-4 confirmation to initiator's servers.
step4msgs, sendErr := invsrv.Step4PostConfirmation(finalizedPeer.InvitationId)
if sendErr == nil {
for i, bytemsg := range step4msgs {
if i < len(finalizedPeer.ContactPullServers) {
meowlib.HttpPostMessage(finalizedPeer.ContactPullServers[i], bytemsg, client.GetConfig().HttpTimeOut)
}
}
}
}
}
continue
}
continue
}
// Handle invitation step 4: invitee's confirmation arriving at the initiator.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 {
// Contact is fully active — nothing more to do on the initiator side.
continue
}
// Check for received or processed already filled => it's an ack for one of our sent messages
if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" &&
(usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) {
password, _ := client.GetConfig().GetMemPass()
if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil {
logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck")
// Handle invitation step 4: invitee's confirmation arriving at the initiator.
if usermsg.Invitation != nil && usermsg.Invitation.Step == 4 {
// Contact is fully active — nothing more to do on the initiator side.
continue
}
continue
}
//fmt.Println("From:", usermsg.From)
//jsonUserMessage, _ := json.Marshal(usermsg)
//fmt.Println(string(jsonUserMessage))
//peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From)
// Check for received or processed already filled => it's an ack for one of our sent messages
if len(usermsg.Data) == 0 && usermsg.Status != nil && usermsg.Status.Uuid != "" &&
(usermsg.Status.Received != 0 || usermsg.Status.Processed != 0) {
password, _ := client.GetConfig().GetMemPass()
if ackErr := client.UpdateMessageAck(peer, usermsg.Status.Uuid, usermsg.Status.Received, usermsg.Status.Processed, password); ackErr != nil {
logger.Warn().Err(ackErr).Str("uuid", usermsg.Status.Uuid).Msg("ConsumeInboxFile: UpdateMessageAck")
}
continue
}
// detach files
if usermsg.Files != nil {
// create files folder
if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) {
err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700)
if err != nil {
return nil, nil, "ReadMessage: MkdirAll", err
//fmt.Println("From:", usermsg.From)
//jsonUserMessage, _ := json.Marshal(usermsg)
//fmt.Println(string(jsonUserMessage))
//peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From)
// detach files
if usermsg.Files != nil {
// create files folder
if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) {
err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700)
if err != nil {
return nil, nil, "ReadMessage: MkdirAll", err
}
}
for _, file := range usermsg.Files {
filename := uuid.New().String() + "_" + file.Filename
filenames = append(filenames, peer.Name+" sent: "+filename)
// detach file
os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600)
}
//? result["invitation finalized"] = peer.Name
}
// user message
messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data))
// stamp the received time before storing
receivedAt := time.Now().UTC().Unix()
if usermsg.Status == nil {
usermsg.Status = &meowlib.ConversationStatus{}
}
usermsg.Status.Received = uint64(receivedAt)
// add message to storage
err = peer.StoreMessage(usermsg, filenames)
if err != nil {
return nil, nil, "ReadMessage: StoreMessage", err
}
filenames = []string{}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
if storeErr := identity.Peers.StorePeer(peer); storeErr != nil {
logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)")
}
}
for _, file := range usermsg.Files {
filename := uuid.New().String() + "_" + file.Filename
filenames = append(filenames, peer.Name+" sent: "+filename)
// detach file
os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600)
}
//? result["invitation finalized"] = peer.Name
}
// user message
messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data))
// stamp the received time before storing
receivedAt := time.Now().UTC().Unix()
if usermsg.Status == nil {
usermsg.Status = &meowlib.ConversationStatus{}
}
usermsg.Status.Received = uint64(receivedAt)
// add message to storage
err = peer.StoreMessage(usermsg, filenames)
if err != nil {
return nil, nil, "ReadMessage: StoreMessage", err
}
filenames = []string{}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
if storeErr := identity.Peers.StorePeer(peer); storeErr != nil {
logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: StorePeer (DR state)")
}
}
// Send delivery ack if the peer requested it
if peer.SendDeliveryAck && usermsg.Status.Uuid != "" {
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil {
logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck")
// Send delivery ack if the peer requested it
if peer.SendDeliveryAck && usermsg.Status.Uuid != "" {
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid, receivedAt); ackErr != nil {
logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck")
}
}
}
}
}
err = os.Remove(messageFilename)
@@ -290,54 +294,33 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
return messagesOverview, filenames, "", nil
}
// LongPollAllSerevrJobs checks for messages on a all servers defived in job file
// LongPollAllServerJobs checks for messages on all servers defined in job file.
// It returns as soon as any server delivers at least one message, or 0 when all
// polls time out. resultChan is buffered so goroutines never block on write.
func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
// Channel to collect results
resultChan := make(chan int, len(jobs))
errChan := make(chan error, len(jobs))
// WaitGroup to sync goroutines
var wg sync.WaitGroup
// Loop through each job (server)
for _, job := range jobs {
wg.Add(1)
go func(job client.RequestsJob) {
defer wg.Done()
// Long-polling call to the server
cnt, _, err := PollServer(storage_path, &job, timeout, true)
if err == nil && cnt > 0 {
select {
case resultChan <- cnt:
default:
}
// Close the error channel to notify all goroutines
close(errChan)
resultChan <- cnt
}
}(job)
}
// Close the result channel when all workers are done
go func() {
wg.Wait()
close(resultChan)
}()
// Wait for the first message or all timeouts
select {
case cnt := <-resultChan:
if cnt, ok := <-resultChan; ok {
return cnt, "", nil
case <-errChan:
// If one fails and exitOnMessage is true
return 0, "", nil
}
return 0, "", nil
}
// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues