This commit is contained in:
@@ -21,33 +21,36 @@ const defaultPostTimeout = 200
|
|||||||
// It creates and stores the user message, serialises the packed form to
|
// It creates and stores the user message, serialises the packed form to
|
||||||
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
|
// storagePath/outbox/{dbFile}_{dbId}, and enqueues a SendJob in
|
||||||
// storagePath/queues/{peerUid}.
|
// storagePath/queues/{peerUid}.
|
||||||
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) error {
|
func CreateUserMessageAndSendJob(storagePath, message, peerUid, replyToUid string, filelist []string, servers []client.Server, timeout int) (string, error) {
|
||||||
packedMsg, dbFile, dbId, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
|
packedMsg, dbFile, dbId, msgUuid, errTxt, err := CreateAndStoreUserMessage(message, peerUid, replyToUid, filelist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", errTxt, err)
|
return "", fmt.Errorf("%s: %w", errTxt, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := proto.Marshal(packedMsg)
|
data, err := proto.Marshal(packedMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
|
return "", fmt.Errorf("CreateUserMessageAndSendJob: proto.Marshal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
outboxDir := filepath.Join(storagePath, "outbox")
|
outboxDir := filepath.Join(storagePath, "outbox")
|
||||||
if err := os.MkdirAll(outboxDir, 0700); err != nil {
|
if err := os.MkdirAll(outboxDir, 0700); err != nil {
|
||||||
return fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
|
return "", fmt.Errorf("CreateUserMessageAndSendJob: MkdirAll: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId))
|
outboxFile := filepath.Join(outboxDir, fmt.Sprintf("%s_%d", dbFile, dbId))
|
||||||
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
|
if err := os.WriteFile(outboxFile, data, 0600); err != nil {
|
||||||
return fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
|
return "", fmt.Errorf("CreateUserMessageAndSendJob: WriteFile: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return client.PushSendJob(storagePath, &client.SendJob{
|
if err := client.PushSendJob(storagePath, &client.SendJob{
|
||||||
Queue: peerUid,
|
Queue: peerUid,
|
||||||
File: outboxFile,
|
File: outboxFile,
|
||||||
Servers: servers,
|
Servers: servers,
|
||||||
Timeout: timeout,
|
Timeout: timeout,
|
||||||
})
|
}); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return msgUuid, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessSendQueues discovers every queue DB file under storagePath/queues/
|
// ProcessSendQueues discovers every queue DB file under storagePath/queues/
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
|
|||||||
|
|
||||||
srv := newTestServer(t, "http://test-srv.example")
|
srv := newTestServer(t, "http://test-srv.example")
|
||||||
|
|
||||||
err := CreateUserMessageAndSendJob(
|
msgUuid, err := CreateUserMessageAndSendJob(
|
||||||
dir,
|
dir,
|
||||||
"hello from integration",
|
"hello from integration",
|
||||||
"peer-create-send",
|
"peer-create-send",
|
||||||
@@ -212,6 +212,7 @@ func TestCreateUserMessageAndSendJob(t *testing.T) {
|
|||||||
60,
|
60,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, msgUuid, "returned UUID must not be empty")
|
||||||
|
|
||||||
// A pending job must be in the queue.
|
// A pending job must be in the queue.
|
||||||
job, _, err := client.PeekSendJob(dir, "peer-create-send")
|
job, _, err := client.PeekSendJob(dir, "peer-create-send")
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ func PackMessageForServer(packedMsg *meowlib.PackedUserMessage, srvuid string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
|
func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid string, replyToUid string, filelist []string) ([]byte, string, error) {
|
||||||
usermessage, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
|
usermessage, _, _, _, errtxt, err := CreateAndStoreUserMessage(message, peer_uid, replyToUid, filelist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errtxt, err
|
return nil, errtxt, err
|
||||||
}
|
}
|
||||||
@@ -51,20 +51,20 @@ func CreateStorePackUserMessageForServer(message string, srvuid string, peer_uid
|
|||||||
|
|
||||||
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for
|
// CreateAndStoreUserMessage creates, signs, and stores an outbound message for
|
||||||
// peer_uid. It returns the packed (encrypted) form ready for server transport,
|
// peer_uid. It returns the packed (encrypted) form ready for server transport,
|
||||||
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), an error context
|
// the peer DB file UUID (dbFile), the SQLite row ID (dbId), the message UUID
|
||||||
// string, and any error.
|
// (conversation_status uuid), an error context string, and any error.
|
||||||
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, error) {
|
func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid string, filelist []string) (*meowlib.PackedUserMessage, string, int64, string, string, error) {
|
||||||
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
|
peer := client.GetConfig().GetIdentity().Peers.GetFromUid(peer_uid)
|
||||||
|
|
||||||
// Creating User message
|
// Creating User message
|
||||||
usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
|
usermessage, err := peer.BuildSimpleUserMessage([]byte(message))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", 0, "PrepareServerMessage : BuildSimpleUserMessage", err
|
return nil, "", 0, "", "PrepareServerMessage : BuildSimpleUserMessage", err
|
||||||
}
|
}
|
||||||
for _, file := range filelist {
|
for _, file := range filelist {
|
||||||
err = usermessage.AddFile(file, client.GetConfig().Chunksize)
|
err = usermessage.AddFile(file, client.GetConfig().Chunksize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", 0, "PrepareServerMessage : AddFile", err
|
return nil, "", 0, "", "PrepareServerMessage : AddFile", err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
|
usermessage.Status.Sent = uint64(time.Now().UTC().Unix())
|
||||||
@@ -73,16 +73,17 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
|
|||||||
// Store message
|
// Store message
|
||||||
err = peer.StoreMessage(usermessage, nil)
|
err = peer.StoreMessage(usermessage, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", 0, "messageBuildPostprocess : StoreMessage", err
|
return nil, "", 0, "", "messageBuildPostprocess : StoreMessage", err
|
||||||
}
|
}
|
||||||
|
|
||||||
dbFile := peer.LastMessage.Dbfile
|
dbFile := peer.LastMessage.Dbfile
|
||||||
dbId := peer.LastMessage.Dbid
|
dbId := peer.LastMessage.Dbid
|
||||||
|
msgUuid := usermessage.Status.Uuid
|
||||||
|
|
||||||
// Prepare cyphered + packed user message
|
// Prepare cyphered + packed user message
|
||||||
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
|
packedMsg, err := peer.ProcessOutboundUserMessage(usermessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", 0, "messageBuildPostprocess : ProcessOutboundUserMessage", err
|
return nil, "", 0, "", "messageBuildPostprocess : ProcessOutboundUserMessage", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Persist peer to save updated DR state (DrStateJson)
|
// Persist peer to save updated DR state (DrStateJson)
|
||||||
@@ -92,7 +93,7 @@ func CreateAndStoreUserMessage(message string, peer_uid string, replyToUid strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return packedMsg, dbFile, dbId, "", nil
|
return packedMsg, dbFile, dbId, msgUuid, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
|
func BuildReceivedMessage(messageUid string, peer_uid string, received int64) (*meowlib.PackedUserMessage, string, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user