This commit is contained in:
242
client/helpers/bgPollHelper.go
Normal file
242
client/helpers/bgPollHelper.go
Normal file
@@ -0,0 +1,242 @@
|
||||
package helpers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.redroom.link/yves/meowlib"
|
||||
"forge.redroom.link/yves/meowlib/client"
|
||||
"github.com/google/uuid"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type ReceivedMessage struct {
|
||||
Text string
|
||||
files []string
|
||||
Server string
|
||||
Sent uint64
|
||||
Received uint64
|
||||
LocalUuid string
|
||||
LocalSequence uint64
|
||||
AppData string
|
||||
Location meowlib.Location
|
||||
}
|
||||
|
||||
// PollServer checks for messages on a single server
|
||||
func PollServer(storage_path string, job *client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
|
||||
|
||||
count := 0
|
||||
|
||||
// if folder does not exist, create it
|
||||
if _, err := os.Stat(filepath.Join(storage_path, "inbox")); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(filepath.Join(storage_path, "inbox"), 0700)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: MkdirAll", err
|
||||
}
|
||||
}
|
||||
//convert server to a server object
|
||||
|
||||
var crl []*meowlib.ConversationRequest
|
||||
// build conversation requests
|
||||
if job.LookupKeys != nil {
|
||||
for _, key := range job.LookupKeys {
|
||||
var cr meowlib.ConversationRequest
|
||||
cr.LookupKey = key.Public
|
||||
cr.SendTimestamp = time.Now().UTC().Unix()
|
||||
// todo sign it
|
||||
//cr.LookupSignature =
|
||||
crl = append(crl, &cr)
|
||||
}
|
||||
// get server public key
|
||||
if job.Server.PublicKey == "" {
|
||||
key, err := meowlib.HttpGetId(job.Server.Url)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: HttpGetId", err
|
||||
}
|
||||
job.Server.PublicKey = key["publicKey"]
|
||||
}
|
||||
// build server message
|
||||
var toSrv meowlib.ToServerMessage
|
||||
toSrv.PullRequest = crl
|
||||
toSrv.From = job.Server.UserKp.Public
|
||||
|
||||
if longPoll {
|
||||
toSrv.Timeout = int64(timeout)
|
||||
}
|
||||
|
||||
data, err := job.Server.ProcessOutboundMessage(&toSrv)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: ProcessOutboundMessage", err
|
||||
}
|
||||
|
||||
response, err := meowlib.HttpPostMessage(job.Server.Url, data, timeout)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: httpPostMessage", err
|
||||
}
|
||||
fs_msg, err := job.Server.ProcessInboundServerResponse(response)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: ProcessInboundServerResponse", err
|
||||
}
|
||||
if len(fs_msg.Chat) > 0 || (fs_msg.Invitation != nil && fs_msg.Invitation.Step == 3) {
|
||||
// chat or invitation answer => save the server message
|
||||
|
||||
out, err := proto.Marshal(fs_msg)
|
||||
if err != nil {
|
||||
return -1, "CheckMessages: protobuf marshal", err
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(storage_path, "inbox", strconv.FormatInt(time.Now().UTC().UnixNano(), 10)), out, 0644); err != nil {
|
||||
return -1, "CheckMessages: WriteFile", err
|
||||
}
|
||||
}
|
||||
count = len(fs_msg.Chat)
|
||||
} else {
|
||||
// manage non uszer messages like devices or server
|
||||
}
|
||||
|
||||
return count, "", nil
|
||||
}
|
||||
|
||||
// SaveCheckJobs
|
||||
func SaveCheckJobs() (string, error) {
|
||||
me := client.GetConfig().GetIdentity()
|
||||
err := me.SaveBackgroundJob()
|
||||
if err != nil {
|
||||
|
||||
return "CheckMessages: json.Marshal", err
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// ConsumeInboxFile
|
||||
func ConsumeInboxFile(messageFilename string) ([]string, []string, string, error) {
|
||||
|
||||
messagesOverview := []string{}
|
||||
filenames := []string{}
|
||||
identity := client.GetConfig().GetIdentity()
|
||||
// read message file
|
||||
msg, err := os.ReadFile(messageFilename)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: ReadFile", err
|
||||
}
|
||||
// protobuf unmarshal message
|
||||
var fromServerMessage meowlib.FromServerMessage
|
||||
err = proto.Unmarshal(msg, &fromServerMessage)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: Unmarshal FromServerMessage", err
|
||||
}
|
||||
// check if invitation answer
|
||||
if fromServerMessage.Invitation != nil {
|
||||
invitationGetAnswerReadResponse(fromServerMessage.Invitation)
|
||||
}
|
||||
// Chat messages
|
||||
if len(fromServerMessage.Chat) > 0 {
|
||||
for _, packedUserMessage := range fromServerMessage.Chat {
|
||||
|
||||
// find the peer with that lookup key
|
||||
peer := identity.Peers.GetFromMyLookupKey(packedUserMessage.Destination)
|
||||
if peer == nil {
|
||||
return nil, nil, "ReadMessage: GetFromMyLookupKey", errors.New("no visible peer for that message")
|
||||
}
|
||||
// Unpack the message
|
||||
usermsg, err := peer.ProcessInboundUserMessage(packedUserMessage.Payload, packedUserMessage.Signature)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: ProcessInboundUserMessage", err
|
||||
}
|
||||
|
||||
//fmt.Println("From:", usermsg.From)
|
||||
//jsonUserMessage, _ := json.Marshal(usermsg)
|
||||
//fmt.Println(string(jsonUserMessage))
|
||||
//peer = client.GetConfig().GetIdentity().Peers.GetFromPublicKey(usermsg.From)
|
||||
|
||||
// detach files
|
||||
if usermsg.Files != nil {
|
||||
// create files folder
|
||||
if _, err := os.Stat(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files")); os.IsNotExist(err) {
|
||||
err = os.MkdirAll(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files"), 0700)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: MkdirAll", err
|
||||
}
|
||||
}
|
||||
for _, file := range usermsg.Files {
|
||||
filename := uuid.New().String() + "_" + file.Filename
|
||||
filenames = append(filenames, peer.Name+" sent: "+filename)
|
||||
// detach file
|
||||
os.WriteFile(filepath.Join(client.GetConfig().StoragePath, identity.Uuid, "files", filename), file.Data, 0600)
|
||||
}
|
||||
//? result["invitation finalized"] = peer.Name
|
||||
}
|
||||
// user message
|
||||
|
||||
messagesOverview = append(messagesOverview, peer.Name+" > "+string(usermsg.Data))
|
||||
// add message to storage
|
||||
err = peer.StoreMessage(usermsg, filenames)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: StoreMessage", err
|
||||
}
|
||||
filenames = []string{}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
err = os.Remove(messageFilename)
|
||||
if err != nil {
|
||||
return nil, nil, "ReadMessage: Remove", err
|
||||
}
|
||||
|
||||
// list of messages & detached files
|
||||
return messagesOverview, filenames, "", nil
|
||||
}
|
||||
|
||||
// LongPollAllSerevrJobs checks for messages on a all servers defived in job file
|
||||
func LongPollAllServerJobs(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
|
||||
|
||||
// Channel to collect results
|
||||
resultChan := make(chan int, len(jobs))
|
||||
errChan := make(chan error, len(jobs))
|
||||
|
||||
// WaitGroup to sync goroutines
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Loop through each job (server)
|
||||
for _, job := range jobs {
|
||||
wg.Add(1)
|
||||
|
||||
go func(job client.RequestsJob) {
|
||||
defer wg.Done()
|
||||
|
||||
// Long-polling call to the server
|
||||
cnt, _, err := PollServer(storage_path, &job, timeout, true)
|
||||
|
||||
if err == nil && cnt > 0 {
|
||||
select {
|
||||
case resultChan <- cnt:
|
||||
default:
|
||||
}
|
||||
|
||||
// Close the error channel to notify all goroutines
|
||||
close(errChan)
|
||||
|
||||
}
|
||||
}(job)
|
||||
}
|
||||
|
||||
// Close the result channel when all workers are done
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(resultChan)
|
||||
}()
|
||||
|
||||
// Wait for the first message or all timeouts
|
||||
select {
|
||||
case cnt := <-resultChan:
|
||||
return cnt, "", nil
|
||||
case <-errChan:
|
||||
// If one fails and exitOnMessage is true
|
||||
return 0, "", nil
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user