This commit is contained in:
@@ -2,6 +2,7 @@ package helpers
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
@@ -179,8 +180,15 @@ func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error
|
|||||||
}
|
}
|
||||||
filenames = []string{}
|
filenames = []string{}
|
||||||
|
|
||||||
|
// Send delivery ack if the peer requested it
|
||||||
|
if peer.SendDeliveryAck && usermsg.Status != nil && usermsg.Status.Uuid != "" {
|
||||||
|
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
|
||||||
|
if ackErr := sendDeliveryAck(storagePath, peer, usermsg.Status.Uuid); ackErr != nil {
|
||||||
|
logger.Warn().Err(ackErr).Str("peer", peer.Uid).Msg("ConsumeInboxFile: sendDeliveryAck")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = os.Remove(messageFilename)
|
err = os.Remove(messageFilename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -240,3 +248,45 @@ func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeo
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sendDeliveryAck builds a delivery acknowledgment for messageUuid and enqueues
|
||||||
|
// it for sending to the peer's contact pull servers.
|
||||||
|
func sendDeliveryAck(storagePath string, peer *client.Peer, messageUuid string) error {
|
||||||
|
packedMsg, _, err := BuildReceivedMessage(messageUuid, peer.Uid, time.Now().UTC().Unix())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("sendDeliveryAck: BuildReceivedMessage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := proto.Marshal(packedMsg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("sendDeliveryAck: proto.Marshal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
outboxDir := filepath.Join(storagePath, "outbox")
|
||||||
|
if err := os.MkdirAll(outboxDir, 0700); err != nil {
|
||||||
|
return fmt.Errorf("sendDeliveryAck: MkdirAll: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
outboxFile := filepath.Join(outboxDir, "ack_"+uuid.New().String())
|
||||||
|
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
|
||||||
|
return fmt.Errorf("sendDeliveryAck: WriteFile: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var servers []client.Server
|
||||||
|
for _, srvUid := range peer.ContactPullServers {
|
||||||
|
srv, loadErr := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvUid)
|
||||||
|
if loadErr == nil && srv != nil {
|
||||||
|
servers = append(servers, *srv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(servers) == 0 {
|
||||||
|
os.Remove(outboxFile)
|
||||||
|
return errors.New("sendDeliveryAck: no contact servers found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.PushSendJob(storagePath, &client.SendJob{
|
||||||
|
Queue: peer.Uid,
|
||||||
|
File: outboxFile,
|
||||||
|
Servers: servers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -84,34 +84,38 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
|
|||||||
return packedMsg, dbFile, dbId, "", nil
|
return packedMsg, dbFile, dbId, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildAckMessage(messageUid string, srvuid string, peer_uid string, received int64, processed int64) ([]byte, string, error) {
|
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
|
||||||
|
|
||||||
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
|
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
|
||||||
srv, err := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvuid)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "PrepareServerMessage : LoadServer", err
|
|
||||||
}
|
|
||||||
// Creating User message
|
// Creating User message
|
||||||
usermessage, err := peer.BuildSimpleUserMessage(nil)
|
usermessage, err := peer.BuildSimpleUserMessage(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "PrepareServerMessage : BuildSimpleUserMessage", err
|
return nil, "BuildReceivedMessage : BuildSimpleUserMessage", err
|
||||||
}
|
}
|
||||||
usermessage.Status.Uuid = messageUid
|
usermessage.Status.Uuid = messageUid
|
||||||
usermessage.Status.Received = uint64(received)
|
usermessage.Status.Received = uint64(received)
|
||||||
|
// Prepare cyphered + packed user message
|
||||||
|
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "BuildReceivedMessage : ProcessOutboundUserMessage", err
|
||||||
|
}
|
||||||
|
return packedMsg, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func BuildProcessedMessage(messageUid string, peer_uid string, processed int64) (*meowlib.PackedUserMessage, string, error) {
|
||||||
|
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
|
||||||
|
// Creating User message
|
||||||
|
usermessage, err := peer.BuildSimpleUserMessage(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "BuildProcessedMessage : BuildSimpleUserMessage", err
|
||||||
|
}
|
||||||
|
usermessage.Status.Uuid = messageUid
|
||||||
usermessage.Status.Processed = uint64(processed)
|
usermessage.Status.Processed = uint64(processed)
|
||||||
// Prepare cyphered + packed user message
|
// Prepare cyphered + packed user message
|
||||||
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
|
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "PrepareServerMessage : ProcessOutboundUserMessage", err
|
return nil, "BuildProcessedMessage : ProcessOutboundUserMessage", err
|
||||||
}
|
}
|
||||||
// Creating Server message for transporting the user message
|
return packedMsg, "", nil
|
||||||
toServerMessage := srv.BuildToServerMessageFromUserMessage(packedMsg)
|
|
||||||
data, err := srv.ProcessOutboundMessage(toServerMessage)
|
|
||||||
if err != nil {
|
|
||||||
return nil, "PrepareServerMessage : ProcessOutboundMessage", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return data, "", nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReadAckMessageResponse() {
|
func ReadAckMessageResponse() {
|
||||||
|
|||||||
Reference in New Issue
Block a user