Files
meowlib/client/helpers/messageHelper.go
ycc b722a916a9
Some checks failed
continuous-integration/drone/push Build is failing
change impl of last fix
2026-02-28 21:22:15 +01:00

178 lines
6.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package helpers
import (
"errors"
"os"
"path/filepath"
"time"
"forge.redroom.link/yves/meowlib"
"forge.redroom.link/yves/meowlib/client"
)
func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) ([]byte, string, error) {
// Get the message server
srv, err := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvuid)
if err != nil {
return nil, "messageBuildPostprocess : LoadServer", err
}
// Creating Server message for transporting the user message
toServerMessage := srv.BuildToServerMessageFromUserMessage(packedMsg)
data, err := srv.ProcessOutboundMessage(toServerMessage)
if err != nil {
return nil, "messageBuildPostprocess : ProcessOutboundMessage", err
}
return data, "", nil
}
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
usermessage, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
if err != nil {
return nil, errtxt, err
}
return PackMessageForServer(usermessage, srvuid)
}
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
if err != nil {
return nil, "PrepareServerMessage : BuildSimpleUserMessage", err
}
for _, file := range filelist {
err = usermessage.AddFile(file, client.GetConfig().Chunksize)
if err != nil {
return nil, "PrepareServerMessage : AddFile", err
}
}
usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
usermessage.Status.AnswerToUuid = replyToUid
// Store message
err = peer.StoreMessage(usermessage, nil)
if err != nil {
return nil, "messageBuildPostprocess : StoreMessage", err
}
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "messageBuildPostprocess : ProcessOutboundUserMessage", err
}
return packedMsg, "", nil
}
func BuildAckMessage(messageUid string, srvuid string, peer_uid string, received int64, processed int64) ([]byte, string, error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
srv, err := client.GetConfig().GetIdentity().MessageServers.LoadServer(srvuid)
if err != nil {
return nil, "PrepareServerMessage : LoadServer", err
}
// Creating User message
usermessage, err := peer.BuildSimpleUserMessage(nil)
if err != nil {
return nil, "PrepareServerMessage : BuildSimpleUserMessage", err
}
usermessage.Status.Uuid = messageUid
usermessage.Status.Received = uint64(received)
usermessage.Status.Processed = uint64(processed)
// Prepare cyphered + packed user message
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
if err != nil {
return nil, "PrepareServerMessage : ProcessOutboundUserMessage", err
}
// Creating Server message for transporting the user message
toServerMessage := srv.BuildToServerMessageFromUserMessage(packedMsg)
data, err := srv.ProcessOutboundMessage(toServerMessage)
if err != nil {
return nil, "PrepareServerMessage : ProcessOutboundMessage", err
}
return data, "", nil
}
func ReadAckMessageResponse() {
//! update the status in message store
}
// GetPeerLastMessageDbInfo returns the DB location of the most recently stored
// message for the given peer. Call this immediately after CreateAndStoreUserMessage
// to get the values needed for SendJob.MessageDbFile and SendJob.MessageDbId.
func GetPeerLastMessageDbInfo(peer_uid string) (dbFile string, dbId int64, errTxt string, err error) {
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
if peer == nil {
return "", 0, "GetPeerLastMessageDbInfo: peer not found", errors.New("peer not found")
}
if peer.LastMessage == nil {
return "", 0, "GetPeerLastMessageDbInfo: no message stored yet", errors.New("no message stored yet for this peer")
}
return peer.LastMessage.Dbfile, peer.LastMessage.Dbid, "", nil
}
// ProcessSentMessages scans every send queue under storagePath/queues/, updates
// the message storage entry with server delivery info for each sent job, then
// removes the job from the queue. Returns the number of messages updated.
//
// Each SendJob must have MessageDbFile and MessageDbId set (populated via
// GetPeerLastMessageDbInfo right after the message is stored).
func ProcessSentMessages(storagePath string) int {
password, _ := client.GetConfig().GetMemPass()
queueDir := filepath.Join(storagePath, "queues")
entries, err := os.ReadDir(queueDir)
if err != nil {
logger.Warn().Err(err).Str("dir", queueDir).Msg("ProcessSentMessages: ReadDir")
return 0
}
updated := 0
for _, entry := range entries {
if entry.IsDir() {
continue
}
queue := entry.Name()
jobs, err := client.GetSentJobs(storagePath, queue)
if err != nil {
logger.Error().Err(err).Str("queue", queue).Msg("ProcessSentMessages: GetSentJobs")
continue
}
for _, job := range jobs {
if job.SuccessfulServer == nil || job.SentAt == nil {
// No delivery info discard the job so it doesn't block the queue
if err := client.DeleteSendJob(storagePath, queue, job.ID); err != nil {
logger.Error().Err(err).Int64("id", job.ID).Msg("ProcessSentMessages: DeleteSendJob (incomplete)")
}
continue
}
if job.MessageDbFile == "" || job.MessageDbId == 0 {
logger.Error().Int64("id", job.ID).Str("queue", queue).
Msg("ProcessSentMessages: job missing MessageDbFile/MessageDbId — set them via GetPeerLastMessageDbInfo when building the SendJob")
continue
}
serverUid := job.Servers[*job.SuccessfulServer].GetUid()
receiveTime := uint64(job.SentAt.Unix())
if err := client.SetMessageServerDelivery(job.MessageDbFile, job.MessageDbId, serverUid, receiveTime, password); err != nil {
logger.Error().Err(err).Str("queue", queue).
Str("dbFile", job.MessageDbFile).Int64("dbId", job.MessageDbId).
Msg("ProcessSentMessages: SetMessageServerDelivery")
continue
}
if err := client.DeleteSendJob(storagePath, queue, job.ID); err != nil {
logger.Error().Err(err).Int64("id", job.ID).Msg("ProcessSentMessages: DeleteSendJob")
}
updated++
}
}
return updated
}