diff --git a/client/helpers/bgPollHelper.go b/client/helpers/bgPollHelper.go index b7b5f92..931016f 100644 --- a/client/helpers/bgPollHelper.go +++ b/client/helpers/bgPollHelper.go @@ -2,6 +2,7 @@ package helpers import ( "errors" + "fmt" "os" "path/filepath" "strconv" @@ -179,8 +180,15 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error } filenames = []string{} + // Send delivery ack if the peer requested it + if peer.SendDeliveryAck && usermsg.Status != nil && usermsg.Status.Uuid != "" { + storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid) + if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid); ackErr != nil { + logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck") + } } } + } err = os.Remove(messageFilename) if err != nil { @@ -240,3 +248,45 @@ func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeo } } + +// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues +// it for sending to the peer's contact pull servers. +func sendDeliveryAck(storagePath string, peer *client.Peer, messageUuid string) error { + packedMsg, _, err := BuildReceivedMessage(messageUuid, peer.Uid, time.Now().UTC().Unix()) + if err != nil { + return fmt.Errorf("sendDeliveryAck: BuildReceivedMessage: %w", err) + } + + data, err := proto.Marshal(packedMsg) + if err != nil { + return fmt.Errorf("sendDeliveryAck: proto.Marshal: %w", err) + } + + outboxDir := filepath.Join(storagePath, "outbox") + if err := os.MkdirAll(outboxDir, 0700); err != nil { + return fmt.Errorf("sendDeliveryAck: MkdirAll: %w", err) + } + + outboxFile := filepath.Join(outboxDir, "ack_"+uuid.New().String()) + if err := os.WriteFile(outboxFile, data, 0600); err != nil { + return fmt.Errorf("sendDeliveryAck: WriteFile: %w", err) + } + + var servers []client.Server + for _, srvUid := range peer.ContactPullServers { + srv, loadErr := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvUid) + if loadErr == nil && srv != nil { + servers = append(servers, *srv) + } + } + if len(servers) == 0 { + os.Remove(outboxFile) + return errors.New("sendDeliveryAck: no contact servers found") + } + + return client.PushSendJob(storagePath, &client.SendJob{ + Queue: peer.Uid, + File: outboxFile, + Servers: servers, + }) +} diff --git a/client/helpers/messageHelper.go b/client/helpers/messageHelper.go index e888a28..1372c46 100644 --- a/client/helpers/messageHelper.go +++ b/client/helpers/messageHelper.go @@ -84,34 +84,38 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin return packedMsg, dbFile, dbId, "", nil } -func BuildAckMessage(messageUid string, srvuid string, peer_uid string, received int64, processed int64) ([]byte, string, error) { - +func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) { peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) - srv, err := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvuid) - if err != nil { - return nil, "PrepareServerMessage : LoadServer", err - } // Creating User message usermessage, err := peer.BuildSimpleUserMessage(nil) if err != nil { - return nil, "PrepareServerMessage : BuildSimpleUserMessage", err + return nil, "BuildReceivedMessage : BuildSimpleUserMessage", err } usermessage.Status.Uuid = messageUid usermessage.Status.Received = uint64(received) + // Prepare cyphered + packed user message + packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) + if err != nil { + return nil, "BuildReceivedMessage : ProcessOutboundUserMessage", err + } + return packedMsg, "", nil +} + +func BuildProcessedMessage(messageUid string, peer_uid string, processed int64) (*meowlib.PackedUserMessage, string, error) { + peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid) + // Creating User message + usermessage, err := peer.BuildSimpleUserMessage(nil) + if err != nil { + return nil, "BuildProcessedMessage : BuildSimpleUserMessage", err + } + usermessage.Status.Uuid = messageUid usermessage.Status.Processed = uint64(processed) // Prepare cyphered + packed user message packedMsg, err := peer.ProcessOutboundUserMessage(usermessage) if err != nil { - return nil, "PrepareServerMessage : ProcessOutboundUserMessage", err + return nil, "BuildProcessedMessage : ProcessOutboundUserMessage", err } - // Creating Server message for transporting the user message - toServerMessage := srv.BuildToServerMessageFromUserMessage(packedMsg) - data, err := srv.ProcessOutboundMessage(toServerMessage) - if err != nil { - return nil, "PrepareServerMessage : ProcessOutboundMessage", err - } - - return data, "", nil + return packedMsg, "", nil } func ReadAckMessageResponse() {