package server import ( "context" "time" "forge.redroom.link/yves/meowlib" "github.com/go-redis/redis" "google.golang.org/protobuf/proto" ) type RedisRouter struct { Name string ServerIdentity *Identity Client *redis.Client InvitationTimeout int Context context.Context } func NewRedisRouter(server *Identity, redisUrl string, password string, db int, invitationTimeout int) *RedisRouter { var r RedisRouter r.ServerIdentity = server r.Name = "Redis" r.Client = redis.NewClient(&redis.Options{ Addr: redisUrl, Password: password, DB: db, }) r.InvitationTimeout = invitationTimeout r.Context = context.Background() return &r } func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) { var from_server meowlib.FromServerMessage // user message if len(msg.Messages) > 0 { for _, usrmsg := range msg.Messages { // serialize the message to store it as byte array into redis out, err := proto.Marshal(usrmsg) if err != nil { return nil, err } r.Client.ZAdd(usrmsg.Destination, redis.Z{Score: float64(time.Now().Unix()), Member: out}) } from_server.UuidAck = msg.Uuid } // check for messages if len(msg.PullRequest) > 0 { for _, rq := range msg.PullRequest { msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result() if err != nil { return nil, err } res, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result() if err != nil { return nil, err } for _, redismsg := range res { //println(redismsg.Score) val := redismsg.Member test := val.(string) var usrmsg meowlib.PackedUserMessage err := proto.Unmarshal([]byte(test), &usrmsg) if err != nil { return nil, err } // add server timestamp usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score)) from_server.Chat = append(from_server.Chat, &usrmsg) } } } // manage Matriochka if msg.MatriochkaMessage != nil { out, err := proto.Marshal(msg) if err != nil { return nil, err } r.Client.ZAdd("mtk", redis.Z{Score: float64(time.Now().Unix()), Member: out}) if msg.MatriochkaMessage.LookupKey != "" { //r.Client.ZAdd("trk:" + msg.MatriochkaMessage.Next.Uuid,{}) } from_server.UuidAck = msg.Uuid } // Server list exchange if len(msg.KnownServers) > 0 { } // Through server invitation process if msg.Invitation != nil { switch msg.Invitation.Step { case 1: // create invitation url, expiry := r.CreateInvitation(msg.Invitation.Payload, int(msg.Invitation.Timeout), msg.Invitation.Password, r.InvitationTimeout, int(msg.Invitation.ShortcodeLen)) from_server.Invitation.Shortcode = url from_server.Invitation.Expiry = expiry.UTC().Unix() case 2: // get invitation invitation, err := r.GetInvitation(msg.Invitation.Shortcode, msg.Invitation.Password) if err != nil { if err.Error() == "auth failed" { from_server.Invitation.Payload = []byte("authentication failure") } else { from_server.Invitation.Payload = []byte("invitation expired") } } else { from_server.Invitation.Payload = invitation } /* should not happen case 3: // answer invitation expiry := r.AnswerInvitation(msg.Invitation.Id, int(msg.Invitation.Timeout), msg.Invitation.Payload, r.InvitationTimeout) from_server.Invitation.Expiry = expiry.UTC().Unix() case 4: // get answer answer, err := r.GetInvitationAnswer(msg.Invitation.Id) if err != nil { from_server.Invitation.Payload = []byte("invitation expired") } else { from_server.Invitation.Payload = answer } */ } } /* case "s": // servers list breakmsgs case "m": // matriochka break case "b": // broadcast break case "a": // admin break } */ return &from_server, nil }