This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
package helpers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
@@ -9,6 +11,8 @@ import (
|
||||
|
||||
"forge.redroom.link/yves/meowlib"
|
||||
"forge.redroom.link/yves/meowlib/client"
|
||||
"github.com/google/uuid"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) ([]byte, string, error) {
|
||||
@@ -137,6 +141,78 @@ func ReadAckMessageResponse() {
|
||||
//! update the status in message store
|
||||
}
|
||||
|
||||
// MarkMessageProcessed stamps the stored message with a processed timestamp of
|
||||
// now(), persists the updated record, and — if the peer has SendProcessingAck
|
||||
// enabled and the message carries a UUID — enqueues a processed acknowledgment
|
||||
// to the peer's contact pull servers.
|
||||
func MarkMessageProcessed(peerUid string, dbFile string, dbId int64) error {
|
||||
password, _ := client.GetConfig().GetMemPass()
|
||||
processedAt := time.Now().UTC().Unix()
|
||||
|
||||
dbm, err := client.GetDbMessage(dbFile, dbId, password)
|
||||
if err != nil {
|
||||
return fmt.Errorf("MarkMessageProcessed: GetDbMessage: %w", err)
|
||||
}
|
||||
if dbm.Status == nil {
|
||||
dbm.Status = &meowlib.ConversationStatus{}
|
||||
}
|
||||
dbm.Status.Processed = uint64(processedAt)
|
||||
if err := client.UpdateDbMessage(dbm, dbFile, dbId, password); err != nil {
|
||||
return fmt.Errorf("MarkMessageProcessed: UpdateDbMessage: %w", err)
|
||||
}
|
||||
|
||||
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peerUid)
|
||||
if peer == nil || !peer.SendProcessingAck || dbm.Status.Uuid == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
identity := client.GetConfig().GetIdentity()
|
||||
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
|
||||
return sendProcessingAck(storagePath, peer, dbm.Status.Uuid, processedAt)
|
||||
}
|
||||
|
||||
// sendProcessingAck builds a processing acknowledgment for messageUuid and
|
||||
// enqueues it for sending to the peer's contact pull servers.
|
||||
func sendProcessingAck(storagePath string, peer *client.Peer, messageUuid string, processedAt int64) error {
|
||||
packedMsg, _, err := BuildProcessedMessage(messageUuid, peer.Uid, processedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sendProcessingAck: BuildProcessedMessage: %w", err)
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(packedMsg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("sendProcessingAck: proto.Marshal: %w", err)
|
||||
}
|
||||
|
||||
outboxDir := filepath.Join(storagePath, "outbox")
|
||||
if err := os.MkdirAll(outboxDir, 0700); err != nil {
|
||||
return fmt.Errorf("sendProcessingAck: MkdirAll: %w", err)
|
||||
}
|
||||
|
||||
outboxFile := filepath.Join(outboxDir, "ack_"+uuid.New().String())
|
||||
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
|
||||
return fmt.Errorf("sendProcessingAck: WriteFile: %w", err)
|
||||
}
|
||||
|
||||
var servers []client.Server
|
||||
for _, srvUid := range peer.ContactPullServers {
|
||||
srv, loadErr := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvUid)
|
||||
if loadErr == nil && srv != nil {
|
||||
servers = append(servers, *srv)
|
||||
}
|
||||
}
|
||||
if len(servers) == 0 {
|
||||
os.Remove(outboxFile)
|
||||
return errors.New("sendProcessingAck: no contact servers found")
|
||||
}
|
||||
|
||||
return client.PushSendJob(storagePath, &client.SendJob{
|
||||
Queue: peer.Uid,
|
||||
File: outboxFile,
|
||||
Servers: servers,
|
||||
})
|
||||
}
|
||||
|
||||
// ProcessSentMessages scans every send queue under storagePath/queues/, updates
|
||||
// the message storage entry with server delivery info for each sent job, then
|
||||
// removes the job from the queue. Returns the number of messages updated.
|
||||
|
||||
Reference in New Issue
Block a user