package server

import (
	"context"
	"fmt"
	"time"

	"forge.redroom.link/yves/meowlib"
	"github.com/go-redis/redis"
	"google.golang.org/protobuf/proto"
)

// RedisRouter processes incoming server messages using Redis as the backing
// store for queued user messages, matriochka messages, invitations and
// usage statistics.
type RedisRouter struct {
	// Name is set to "Redis" by NewRedisRouter.
	Name string

	// ServerIdentity is the identity handed to NewRedisRouter.
	ServerIdentity *Identity

	// Client is the Redis connection used for every storage operation.
	Client *redis.Client

	// InvitationTimeout is forwarded to StoreInvitation and
	// StoreAnswerToInvitation.
	// NOTE(review): presumably a server-side cap on invitation lifetime;
	// unit is not visible here - confirm against those helpers.
	InvitationTimeout int

	// Context is initialised to context.Background() by NewRedisRouter.
	Context context.Context
}

// NewRedisRouter builds a RedisRouter connected to the Redis instance at
// redisUrl (using password and database index db) and records the start
// time under the "statistics:start" key.
//
// It panics when that initial write fails, since the signature offers no
// way to report the error to the caller.
func NewRedisRouter(server *Identity, redisUrl string, password string, db int, invitationTimeout int) *RedisRouter {
	r := &RedisRouter{
		Name:           "Redis",
		ServerIdentity: server,
		Client: redis.NewClient(&redis.Options{
			Addr:     redisUrl,
			Password: password,
			DB:       db,
		}),
		InvitationTimeout: invitationTimeout,
		Context:           context.Background(),
	}
	// set start for uptime
	start := time.Now().UTC().Format(time.RFC3339)
	if err := r.Client.Set("statistics:start", start, 0).Err(); err != nil {
		panic(err)
	}
	return r
}

// Route handles one client-to-server message and builds the response.
//
// Depending on which parts of msg are populated it, in order:
//   - stores user messages in the sorted set named after their destination,
//   - pops pending messages (or a stored invitation answer) for each pull request,
//   - stores a matriochka message in the "mtk" sorted set,
//   - runs the invitation step requested by msg.Invitation.Step.
//
// Any Redis or protobuf failure is returned as an error together with a nil
// response; a nil msg is rejected the same way.
func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	if msg == nil {
		return nil, fmt.Errorf("route: nil message")
	}
	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:total").Err(); err != nil {
		return nil, err
	}
	// user message
	if len(msg.Messages) > 0 {
		if err := r.routeUserMessages(msg, &fromServer); err != nil {
			return nil, err
		}
	}
	// check for messages
	if len(msg.PullRequest) > 0 {
		if err := r.routePullRequests(msg, &fromServer); err != nil {
			return nil, err
		}
	}
	// manage Matriochka
	if msg.MatriochkaMessage != nil {
		if err := r.routeMatriochka(msg, &fromServer); err != nil {
			return nil, err
		}
	}
	// Server list exchange (msg.KnownServers) is not implemented yet.

	// Through server invitation process
	if msg.Invitation != nil {
		if err := r.routeInvitation(msg, &fromServer); err != nil {
			return nil, err
		}
	}
	return &fromServer, nil
}

// routeUserMessages queues every user message of msg for its destination and
// acknowledges msg once all of them have been stored.
func (r *RedisRouter) routeUserMessages(msg *meowlib.ToServerMessage, fromServer *meowlib.FromServerMessage) error {
	if err := r.Client.Incr("statistics:messages:usermessages").Err(); err != nil {
		return err
	}
	for _, usrmsg := range msg.Messages {
		// serialize the message to store it as byte array into redis
		out, err := proto.Marshal(usrmsg)
		if err != nil {
			return err
		}
		entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
		// Do not acknowledge a message that could not be stored.
		if err := r.Client.ZAdd(usrmsg.Destination, entry).Err(); err != nil {
			return err
		}
	}
	fromServer.UuidAck = msg.Uuid
	return nil
}

// routePullRequests pops the pending messages of every lookup key in msg into
// fromServer.Chat; when a key has no pending message it attaches the stored
// invitation answer for that key instead, if there is one.
func (r *RedisRouter) routePullRequests(msg *meowlib.ToServerMessage, fromServer *meowlib.FromServerMessage) error {
	if err := r.Client.Incr("statistics:messages:messagelookups").Err(); err != nil {
		return err
	}
	for _, rq := range msg.PullRequest {
		// get messages from redis
		msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result()
		if err != nil {
			return err
		}
		res, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result()
		if err != nil {
			return err
		}
		// iterate over messages
		for _, redismsg := range res {
			raw, ok := redismsg.Member.(string)
			if !ok {
				return fmt.Errorf("unexpected member type %T in sorted set %q", redismsg.Member, rq.LookupKey)
			}
			var usrmsg meowlib.PackedUserMessage
			if err := proto.Unmarshal([]byte(raw), &usrmsg); err != nil {
				return err
			}
			// add server timestamp (the score the message was stored with)
			usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score))
			fromServer.Chat = append(fromServer.Chat, &usrmsg)
		}
		// if no messages check for invitation answer payload
		if msgcnt != 0 {
			continue
		}
		storedAnswer, err := r.GetAnswerToInvitation(rq.LookupKey)
		if err != nil || len(storedAnswer) == 0 {
			// No answer stored for this key: nothing to attach.
			continue
		}
		var answer meowlib.Invitation
		if err := proto.Unmarshal(storedAnswer, &answer); err != nil {
			fromServer.Invitation = &meowlib.Invitation{Payload: []byte("invitation answer corrupted")}
			continue
		}
		// add invitation answer to the response
		fromServer.Invitation = &answer
	}
	return nil
}

// routeMatriochka stores the whole serialized msg in the "mtk" sorted set and
// acknowledges it.
func (r *RedisRouter) routeMatriochka(msg *meowlib.ToServerMessage, fromServer *meowlib.FromServerMessage) error {
	if err := r.Client.Incr("statistics:messages:matriochka").Err(); err != nil {
		return err
	}
	out, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
	// Do not acknowledge a message that could not be stored.
	if err := r.Client.ZAdd("mtk", entry).Err(); err != nil {
		return err
	}
	// TODO: tracking for messages carrying a MatriochkaMessage.LookupKey is
	// not implemented (a "trk:"-prefixed sorted set was sketched for it).
	fromServer.UuidAck = msg.Uuid
	return nil
}

// routeInvitation runs the invitation step selected by msg.Invitation.Step:
//
//	1: create invitation   => respond with shortcode and expiry
//	2: get invitation      => respond with the stored invitation payload
//	3: accept invitation   => store the answer for the initiator
//
// Retrieval of the accepted invitation by the initiator is done in the
// normal pull-request flow. Unknown steps are ignored.
func (r *RedisRouter) routeInvitation(msg *meowlib.ToServerMessage, fromServer *meowlib.FromServerMessage) error {
	if err := r.Client.Incr("statistics:messages:invitation").Err(); err != nil {
		return err
	}
	switch msg.Invitation.Step {
	// create invitation => provide shortcode and expiry
	case 1:
		url, expiry := r.StoreInvitation(msg.Invitation.Payload, int(msg.Invitation.Timeout), msg.Invitation.Password, r.InvitationTimeout, int(msg.Invitation.ShortcodeLen))
		fromServer.Invitation = &meowlib.Invitation{}
		fromServer.Invitation.Shortcode = url
		fromServer.Invitation.Expiry = expiry.UTC().Unix()
	// get invitation => retrieve invitation from redis and send
	case 2:
		fromServer.Invitation = &meowlib.Invitation{}
		invitation, err := r.GetInvitation(msg.Invitation.Shortcode, msg.Invitation.Password)
		switch {
		case err == nil:
			fromServer.Invitation.Payload = invitation // protobuf invitation
		case err.Error() == "auth failed":
			fromServer.Invitation.Payload = []byte("authentication failure")
		default:
			fromServer.Invitation.Payload = []byte("invitation expired")
		}
	// accept invitation => store accepted invitation for initiator
	case 3:
		var usermsg meowlib.PackedUserMessage
		if err := proto.Unmarshal(msg.Invitation.Payload, &usermsg); err != nil {
			return err
		}
		data, err := proto.Marshal(msg.Invitation)
		if err != nil {
			return err
		}
		expiry := r.StoreAnswerToInvitation(usermsg.Destination, int(msg.Invitation.Timeout), data, r.InvitationTimeout)
		fromServer.Invitation = &meowlib.Invitation{}
		fromServer.Invitation.Expiry = expiry.UTC().Unix()
	}
	return nil
}