From 5aec7b3ad4f3b8fafe57e33908212f19a32ba5d6 Mon Sep 17 00:00:00 2001 From: ycc Date: Wed, 10 Apr 2024 11:18:42 +0200 Subject: [PATCH] refactor router --- server/router.go | 294 +++++++++++++++++++++++++---------------------- 1 file changed, 157 insertions(+), 137 deletions(-) diff --git a/server/router.go b/server/router.go index 5afc072..6066898 100644 --- a/server/router.go +++ b/server/router.go @@ -37,7 +37,7 @@ func NewRedisRouter(server *Identity, redisUrl string, password string, db int, } func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { - var from_server meowlib.FromServerMessage + var from_server *meowlib.FromServerMessage // update messages counter err := r.Client.Incr("statistics:messages:total").Err() if err != nil { @@ -45,89 +45,25 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe } // user message => store if len(msg.Messages) > 0 { - // update messages counter - err := r.Client.Incr("statistics:messages:usermessages").Err() - if err != nil { - panic(err) - } - for _, usrmsg := range msg.Messages { - // serialize the message to store it as byte array into redis - out, err := proto.Marshal(usrmsg) - if err != nil { - return nil, err - } - r.Client.ZAdd(usrmsg.Destination, redis.Z{Score: float64(time.Now().Unix()), Member: out}) - } - from_server.UuidAck = msg.Uuid - } - // check for messages - if len(msg.PullRequest) > 0 { - //dataFound := false - // update messages counter - err := r.Client.Incr("statistics:messages:messagelookups").Err() - if err != nil { - panic(err) - } - for _, rq := range msg.PullRequest { - // get messages from redis - msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result() - if err != nil { - return nil, err - } - res, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result() - if err != nil { - return nil, err - } - // iterate over messages - for _, redismsg := range res { - //println(redismsg.Score) - val := redismsg.Member - test := val.(string) - var usrmsg meowlib.PackedUserMessage - err := proto.Unmarshal([]byte(test), &usrmsg) - if err != nil { - return nil, err - } - // add server timestamp - usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score)) - - from_server.Chat = append(from_server.Chat, &usrmsg) - } - // if no messages check for invitationanswer payload - if msgcnt == 0 { - // get invitation answer - var answer meowlib.Invitation - storedAnswer, _ := r.GetAnswerToInvitation(rq.LookupKey) - if storedAnswer != nil { - err := proto.Unmarshal(storedAnswer, &answer) - if err != nil { - from_server.Invitation.Payload = []byte("invitation answer corrupted") - } - from_server.Invitation = &answer - // exit loop if invitation found, cannot store several in a message - return &from_server, nil - } - // add invitation answer to the response - } - - } - } - // manage Matriochka - if msg.MatriochkaMessage != nil { - // update messages counter - err := r.Client.Incr("statistics:messages:matriochka").Err() - if err != nil { - panic(err) - } - out, err := proto.Marshal(msg) + from_server, err = r.storeMessage(msg) if err != nil { return nil, err } - r.Client.ZAdd("mtk", redis.Z{Score: float64(time.Now().Unix()), Member: out}) - if msg.MatriochkaMessage.LookupKey != "" { - //r.Client.ZAdd("trk:" + msg.MatriochkaMessage.Next.Uuid,{}) + } + // check for messages + if len(msg.PullRequest) > 0 { + from_server, err = r.checkForMessage(msg) + if err != nil { + return nil, err + } + + } + // manage Matriochka + if msg.MatriochkaMessage != nil { + from_server, err = r.handleMatriochka(msg) + if err != nil { + return nil, err } - from_server.UuidAck = msg.Uuid } // Server list exchange if len(msg.KnownServers) > 0 { @@ -135,64 +71,9 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe } // Through server invitation process if msg.Invitation != nil { - // update messages counter - err := r.Client.Incr("statistics:messages:invitation").Err() + from_server, err = r.handleInvitation(msg) if err != nil { - panic(err) - } - switch msg.Invitation.Step { - // create invitation => provide shortcode and expiry - case 1: - url, expiry := r.StoreInvitation(msg.Invitation.Payload, int(msg.Invitation.Timeout), msg.Invitation.Password, r.InvitationTimeout, int(msg.Invitation.ShortcodeLen)) - from_server.Invitation = &meowlib.Invitation{} - from_server.Invitation.Shortcode = url - from_server.Invitation.Expiry = expiry.UTC().Unix() - // get invitation => retrieve invitation from redis and send - case 2: - from_server.Invitation = &meowlib.Invitation{} - invitation, err := r.GetInvitation(msg.Invitation.Shortcode, msg.Invitation.Password) - - if err != nil { - if err.Error() == "auth failed" { - from_server.Invitation.Payload = []byte("authentication failure") - } else { - from_server.Invitation.Payload = []byte("invitation expired") - } - } else { - from_server.Invitation.Payload = invitation // protobuf invitation - } - - // accept invitation => store accepted invitation for initiator - case 3: - var usermsg meowlib.PackedUserMessage - err := proto.Unmarshal(msg.Invitation.Payload, &usermsg) - if err != nil { - return nil, err - } - data, err := proto.Marshal(msg.Invitation) - if err != nil { - return nil, err - } - expiry := r.StoreAnswerToInvitation(usermsg.Destination, int(msg.Invitation.Timeout), data, r.InvitationTimeout) - from_server.Invitation = &meowlib.Invitation{} - from_server.Invitation.Expiry = expiry.UTC().Unix() - - // DONE IN NORMAL MESSAGE FLOW - // get accepted invitation => send accepted invitation to initiator - /* case 4: - from_server.Invitation = &meowlib.Invitation{} - var answer meowlib.Invitation - storedAanswer, err := r.GetAnswerToInvitation(msg.Invitation.Uuid) - if err != nil { - from_server.Invitation.Payload = []byte("invitation answer not found") - } else { - err := proto.Unmarshal(storedAanswer, &answer) - if err != nil { - from_server.Invitation.Payload = []byte("invitation answer corrupted") - } - from_server.Invitation = &answer - } - */ + return nil, err } } /* @@ -208,5 +89,144 @@ func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMe break } */ + return from_server, nil +} + +func (r *RedisRouter) storeMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { + var from_server meowlib.FromServerMessage + // update messages counter + err := r.Client.Incr("statistics:messages:usermessages").Err() + if err != nil { + return nil, err + } + for _, usrmsg := range msg.Messages { + // serialize the message to store it as byte array into redis + out, err := proto.Marshal(usrmsg) + if err != nil { + return nil, err + } + r.Client.ZAdd(usrmsg.Destination, redis.Z{Score: float64(time.Now().Unix()), Member: out}) + } + from_server.UuidAck = msg.Uuid + return &from_server, nil +} + +func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { + var from_server meowlib.FromServerMessage + //dataFound := false + // update messages counter + err := r.Client.Incr("statistics:messages:messagelookups").Err() + if err != nil { + return nil, err + } + for _, rq := range msg.PullRequest { + // get messages from redis + msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result() + if err != nil { + return nil, err + } + res, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result() + if err != nil { + return nil, err + } + // iterate over messages + for _, redismsg := range res { + //println(redismsg.Score) + val := redismsg.Member + test := val.(string) + var usrmsg meowlib.PackedUserMessage + err := proto.Unmarshal([]byte(test), &usrmsg) + if err != nil { + return nil, err + } + // add server timestamp + usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score)) + + from_server.Chat = append(from_server.Chat, &usrmsg) + } + // if no messages check for invitationanswer payload + if msgcnt == 0 { + // get invitation answer + var answer meowlib.Invitation + storedAnswer, _ := r.GetAnswerToInvitation(rq.LookupKey) + if storedAnswer != nil { + err := proto.Unmarshal(storedAnswer, &answer) + if err != nil { + from_server.Invitation.Payload = []byte("invitation answer corrupted") + } + from_server.Invitation = &answer + // exit loop if invitation found, cannot store several in a message + return &from_server, nil + } + // add invitation answer to the response + } + + } + return &from_server, nil +} + +func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { + var from_server meowlib.FromServerMessage + // update messages counter + err := r.Client.Incr("statistics:messages:invitation").Err() + if err != nil { + return nil, err + } + switch msg.Invitation.Step { + // create invitation => provide shortcode and expiry + case 1: + url, expiry := r.StoreInvitation(msg.Invitation.Payload, int(msg.Invitation.Timeout), msg.Invitation.Password, r.InvitationTimeout, int(msg.Invitation.ShortcodeLen)) + from_server.Invitation = &meowlib.Invitation{} + from_server.Invitation.Shortcode = url + from_server.Invitation.Expiry = expiry.UTC().Unix() + // get invitation => retrieve invitation from redis and send + case 2: + from_server.Invitation = &meowlib.Invitation{} + invitation, err := r.GetInvitation(msg.Invitation.Shortcode, msg.Invitation.Password) + + if err != nil { + if err.Error() == "auth failed" { + from_server.Invitation.Payload = []byte("authentication failure") + } else { + from_server.Invitation.Payload = []byte("invitation expired") + } + } else { + from_server.Invitation.Payload = invitation // protobuf invitation + } + + // accept invitation => store accepted invitation for initiator + case 3: + var usermsg meowlib.PackedUserMessage + err := proto.Unmarshal(msg.Invitation.Payload, &usermsg) + if err != nil { + return nil, err + } + data, err := proto.Marshal(msg.Invitation) + if err != nil { + return nil, err + } + expiry := r.StoreAnswerToInvitation(usermsg.Destination, int(msg.Invitation.Timeout), data, r.InvitationTimeout) + from_server.Invitation = &meowlib.Invitation{} + from_server.Invitation.Expiry = expiry.UTC().Unix() + } + return &from_server, nil +} + +func (r *RedisRouter) handleMatriochka(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { + var from_server meowlib.FromServerMessage + // update messages counter + err := r.Client.Incr("statistics:messages:matriochka").Err() + if err != nil { + return nil, err + } + out, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + r.Client.ZAdd("mtk", redis.Z{Score: float64(time.Now().Unix()), Member: out}) + if msg.MatriochkaMessage.LookupKey != "" { + //r.Client.ZAdd("trk:" + msg.MatriochkaMessage.Next.Uuid,{}) + } + from_server.UuidAck = msg.Uuid return &from_server, nil }