Files
meowlib/client/helpers/messageHelper.go
ycc f6531e344e
Some checks failed
continuous-integration/drone/push Build is failing
MarkMessageProcessed added
2026-03-05 21:33:03 +01:00

289 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package helpers
import (
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
)
func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) ([]byte, string, error) {
// Get the message server
srv, err := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvuid)
if err != nil {
return nil, "PackMessageForServer : LoadServer", err
}
// Fetch and persist the server public key if it was never stored
// (servers added via invitation finalization only have a UserKp, no PublicKey)
if srv.PublicKey == "" {
srvdata, err := meowlib.HttpGetId(srv.Url)
if err != nil {
return nil, "PackMessageForServer : HttpGetId", err
}
srv.PublicKey = srvdata["publicKey"]
client.GetConfig().GetIdentity().MessageServers.StoreServer(srv)
}
// Creating Server message for transporting the user message
toServerMessage := srv.BuildToServerMessageFromUserMessage(packedMsg)
data, err := srv.ProcessOutboundMessage(toServerMessage)
if err != nil {
return nil, "PackMessageForServer : ProcessOutboundMessage", err
}
return data, "", nil
}
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
if err != nil {
return nil, errtxt, err
}
return PackMessageForServer(usermessage, srvuid)
}
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for
// peer_uid. It returns the packed (encrypted) form ready for server transport,
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context
// string, and any error.
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
if err != nil {
return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err
}
for _, file := range filelist {
err = usermessage.AddFile(file, client.GetConfig().Chunksize)
if err != nil {
return nil, "", 0, "PrepareServerMessage : AddFile", err
}
}
usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
usermessage.Status.ReplyToUuid = replyToUid
// Store message
err = peer.StoreMessage(usermessage, nil)
if err != nil {
return nil, "", 0, "messageBuildPostprocess : StoreMessage", err
}
dbFile := peer.LastMessage.Dbfile
dbId := peer.LastMessage.Dbid
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err
}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
if storeErr := client.GetConfig().GetIdentity().Peers.StorePeer(peer); storeErr != nil {
logger.Warn().Err(storeErr).Str("peer", peer.Uid).Msg("messageBuildPostprocess: StorePeer (DR state)")
}
}
return packedMsg, dbFile, dbId, "", nil
}
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage(nil)
if err != nil {
return nil, "BuildReceivedMessage : BuildSimpleUserMessage", err
}
usermessage.Status.Uuid = messageUid
usermessage.Status.Received = uint64(received)
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "BuildReceivedMessage : ProcessOutboundUserMessage", err
}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
client.GetConfig().GetIdentity().Peers.StorePeer(peer)
}
return packedMsg, "", nil
}
func BuildProcessedMessage(messageUid string, peer_uid string, processed int64) (*meowlib.PackedUserMessage, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage(nil)
if err != nil {
return nil, "BuildProcessedMessage : BuildSimpleUserMessage", err
}
usermessage.Status.Uuid = messageUid
usermessage.Status.Processed = uint64(processed)
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "BuildProcessedMessage : ProcessOutboundUserMessage", err
}
// Persist peer to save updated DR state (DrStateJson)
if peer.DrRootKey != "" {
client.GetConfig().GetIdentity().Peers.StorePeer(peer)
}
return packedMsg, "", nil
}
func ReadAckMessageResponse() {
//! update the status in message store
}
// MarkMessageProcessed stamps the stored message with a processed timestamp of
// now(), persists the updated record, and — if the peer has SendProcessingAck
// enabled and the message carries a UUID — enqueues a processed acknowledgment
// to the peer's contact pull servers.
func MarkMessageProcessed(peerUid string, dbFile string, dbId int64) error {
password, _ := client.GetConfig().GetMemPass()
processedAt := time.Now().UTC().Unix()
dbm, err := client.GetDbMessage(dbFile, dbId, password)
if err != nil {
return fmt.Errorf("MarkMessageProcessed: GetDbMessage: %w", err)
}
if dbm.Status == nil {
dbm.Status = &meowlib.ConversationStatus{}
}
dbm.Status.Processed = uint64(processedAt)
if err := client.UpdateDbMessage(dbm, dbFile, dbId, password); err != nil {
return fmt.Errorf("MarkMessageProcessed: UpdateDbMessage: %w", err)
}
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peerUid)
if peer == nil || !peer.SendProcessingAck || dbm.Status.Uuid == "" {
return nil
}
identity := client.GetConfig().GetIdentity()
storagePath := filepath.Join(client.GetConfig().StoragePath, identity.Uuid)
return sendProcessingAck(storagePath, peer, dbm.Status.Uuid, processedAt)
}
// sendProcessingAck builds a processing acknowledgment for messageUuid and
// enqueues it for sending to the peer's contact pull servers.
func sendProcessingAck(storagePath string, peer *client.Peer, messageUuid string, processedAt int64) error {
packedMsg, _, err := BuildProcessedMessage(messageUuid, peer.Uid, processedAt)
if err != nil {
return fmt.Errorf("sendProcessingAck: BuildProcessedMessage: %w", err)
}
data, err := proto.Marshal(packedMsg)
if err != nil {
return fmt.Errorf("sendProcessingAck: proto.Marshal: %w", err)
}
outboxDir := filepath.Join(storagePath, "outbox")
if err := os.MkdirAll(outboxDir, 0700); err != nil {
return fmt.Errorf("sendProcessingAck: MkdirAll: %w", err)
}
outboxFile := filepath.Join(outboxDir, "ack_"+uuid.New().String())
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
return fmt.Errorf("sendProcessingAck: WriteFile: %w", err)
}
var servers []client.Server
for _, srvUid := range peer.ContactPullServers {
srv, loadErr := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvUid)
if loadErr == nil && srv != nil {
servers = append(servers, *srv)
}
}
if len(servers) == 0 {
os.Remove(outboxFile)
return errors.New("sendProcessingAck: no contact servers found")
}
return client.PushSendJob(storagePath, &client.SendJob{
Queue: peer.Uid,
File: outboxFile,
Servers: servers,
})
}
// ProcessSentMessages scans every send queue under storagePath/queues/, updates
// the message storage entry with server delivery info for each sent job, then
// removes the job from the queue. Returns the number of messages updated.
//
// The message DB location is recovered from the job's File basename, which must
// follow the naming convention produced by CreateUserMessageAndSendJob:
//
// outbox/{dbFile}_{dbId}
func ProcessSentMessages(storagePath string) int {
password, _ := client.GetConfig().GetMemPass()
queueDir := filepath.Join(storagePath, "queues")
entries, err := os.ReadDir(queueDir)
if err != nil {
logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSentMessages: ReadDir")
return 0
}
updated := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
queue := entry.Name()
jobs, err := client.GetSentJobs(storagePath, queue)
if err != nil {
logger.Error().Err(err).Str("queue", queue).Msg("ProcessSentMessages: GetSentJobs")
continue
}
for _, job := range jobs {
if job.SuccessfulServer == nil || job.SentAt == nil {
// No delivery info discard the job so it doesn't block the queue
if err := client.DeleteSendJob(storagePath, queue, job.ID); err != nil {
logger.Error().Err(err).Int64("id", job.ID).Msg("ProcessSentMessages: DeleteSendJob (incomplete)")
}
continue
}
// Recover dbFile and dbId from the outbox filename: {dbFile}_{dbId}
base := filepath.Base(job.File)
sep := strings.LastIndex(base, "_")
if sep <= 0 {
logger.Error().Int64("id", job.ID).Str("file", job.File).
Msg("ProcessSentMessages: cannot parse dbFile/dbId from job filename — use CreateUserMessageAndSendJob to build jobs")
continue
}
dbFile := base[:sep]
dbId, parseErr := strconv.ParseInt(base[sep+1:], 10, 64)
if parseErr != nil || dbFile == "" || dbId == 0 {
logger.Error().Int64("id", job.ID).Str("file", job.File).
Msg("ProcessSentMessages: invalid dbFile/dbId in job filename")
continue
}
serverUid := job.Servers[*job.SuccessfulServer].GetUid()
receiveTime := uint64(job.SentAt.Unix())
if err := client.SetMessageServerDelivery(dbFile, dbId, serverUid, receiveTime, password); err != nil {
logger.Error().Err(err).Str("queue", queue).
Str("dbFile", dbFile).Int64("dbId", dbId).
Msg("ProcessSentMessages: SetMessageServerDelivery")
continue
}
if err := client.DeleteSendJob(storagePath, queue, job.ID); err != nil {
logger.Error().Err(err).Int64("id", job.ID).Msg("ProcessSentMessages: DeleteSendJob")
}
updated++
}
}
return updated
}