package server

import (
	"context"
	"fmt"
	"sync"
	"time"

	"forge.redroom.link/yves/meowlib"
	"github.com/go-redis/redis"
	"google.golang.org/protobuf/proto"
)

// RedisRouter routes incoming ToServerMessage requests using Redis as the
// backing store for queued user messages, invitations and statistics
// counters.
type RedisRouter struct {
	// Name is a label for this router; NewRedisRouter sets it to "Redis".
	Name string
	// ServerIdentity is the identity handed to NewRedisRouter.
	ServerIdentity *Identity
	// Client is the Redis connection used for every operation.
	Client *redis.Client
	// InvitationTimeout is forwarded to StoreInvitation and
	// StoreAnswerToInvitation.
	// NOTE(review): its unit is not visible in this file - confirm.
	InvitationTimeout int
	// Context is initialised to context.Background(); it is not otherwise
	// used in this file.
	Context context.Context
}

// NewRedisRouter builds a RedisRouter connected to redisUrl (using password
// and database index db) and records the start time under the
// "statistics:start" key.
//
// It panics when that initial write fails, because the signature offers no
// error return.
func NewRedisRouter(server *Identity, redisUrl string, password string, db int, invitationTimeout int) *RedisRouter {
	r := &RedisRouter{
		Name:           "Redis",
		ServerIdentity: server,
		Client: redis.NewClient(&redis.Options{
			Addr:     redisUrl,
			Password: password,
			DB:       db,
		}),
		InvitationTimeout: invitationTimeout,
		Context:           context.Background(),
	}
	// set start for uptime
	if err := r.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0).Err(); err != nil {
		panic(err)
	}
	return r
}

// Route dispatches msg to the handlers matching the parts it carries (user
// messages, pull requests, matriochka message, invitation) and returns the
// response produced by the last handler that ran. The response is nil when
// msg carries none of those parts.
//
// Any handler error, including a failure to update the statistics counter,
// is returned to the caller.
func (r *RedisRouter) Route(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var fromServer *meowlib.FromServerMessage
	// update messages counter
	err := r.Client.Incr("statistics:messages:total").Err()
	if err != nil {
		return nil, err
	}
	// user message => store
	if len(msg.Messages) > 0 {
		fromServer, err = r.storeMessage(msg)
		if err != nil {
			return nil, err
		}
	}
	// check for messages
	if len(msg.PullRequest) > 0 {
		fromServer, err = r.checkForMessage(msg)
		if err != nil {
			return nil, err
		}
		// Wait for a notification only when the immediate lookup came back
		// empty. The result of subscribe must be kept: it pops messages from
		// Redis, so dropping it would lose them.
		if msg.Timeout > 0 && len(fromServer.Chat) == 0 && fromServer.Invitation == nil {
			fromServer, err = r.subscribe(msg)
			if err != nil {
				return nil, err
			}
		}
	}
	// manage Matriochka
	if msg.MatriochkaMessage != nil {
		fromServer, err = r.handleMatriochka(msg)
		if err != nil {
			return nil, err
		}
	}
	// Server list exchange
	if len(msg.KnownServers) > 0 {
		// TODO: server list exchange is not implemented yet.
	}
	// Through server invitation process
	if msg.Invitation != nil {
		fromServer, err = r.handleInvitation(msg)
		if err != nil {
			return nil, err
		}
	}
	/*
		Legacy message-type dispatch sketch, not implemented:
			case "s": // servers list
			case "m": // matriochka
			case "b": // broadcast
			case "a": // admin
	*/
	return fromServer, nil
}

// storeMessage serialises every user message in msg and adds it to the Redis
// sorted set named after the message destination, scored with the current
// Unix time. The returned response acknowledges msg.Uuid.
//
// It stops at the first marshalling or Redis error, so the request is never
// acknowledged when a message could not be stored.
func (r *RedisRouter) storeMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:usermessages").Err(); err != nil {
		return nil, err
	}
	for _, usrmsg := range msg.Messages {
		// serialize the message to store it as byte array into redis
		out, err := proto.Marshal(usrmsg)
		if err != nil {
			return nil, err
		}
		entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
		if err := r.Client.ZAdd(usrmsg.Destination, entry).Err(); err != nil {
			return nil, err
		}
	}
	fromServer.UuidAck = msg.Uuid
	return &fromServer, nil
}

// checkForMessage pops every queued message for each lookup key in
// msg.PullRequest and appends them to the response, stamping each with its
// sorted-set score as a server timestamp.
//
// When a key has no queued message, a stored invitation answer is looked up
// for that key instead. The first answer found ends the scan, since a
// response carries a single invitation.
func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:messagelookups").Err(); err != nil {
		return nil, err
	}
	for _, rq := range msg.PullRequest {
		// get messages from redis
		msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result()
		if err != nil {
			return nil, err
		}
		popped, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result()
		if err != nil {
			return nil, err
		}
		// iterate over messages
		for _, redismsg := range popped {
			raw, ok := redismsg.Member.(string)
			if !ok {
				return nil, fmt.Errorf("lookup key %q: unexpected member type %T", rq.LookupKey, redismsg.Member)
			}
			var usrmsg meowlib.PackedUserMessage
			if err := proto.Unmarshal([]byte(raw), &usrmsg); err != nil {
				return nil, err
			}
			// add server timestamp
			usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score))
			fromServer.Chat = append(fromServer.Chat, &usrmsg)
		}
		// if no messages check for invitation answer payload
		if msgcnt != 0 {
			continue
		}
		// NOTE(review): the second return value is ignored as in the original
		// code; its type is not visible here - confirm whether it is an error
		// that should be surfaced.
		storedAnswer, _ := r.GetAnswerToInvitation(rq.LookupKey)
		if storedAnswer == nil {
			continue
		}
		answer := &meowlib.Invitation{}
		if err := proto.Unmarshal(storedAnswer, answer); err != nil {
			// Report the corruption in a fresh invitation rather than
			// forwarding a half-decoded one.
			answer = &meowlib.Invitation{Payload: []byte("invitation answer corrupted")}
		}
		fromServer.Invitation = answer
		// exit loop if invitation found, cannot store several in a message
		return &fromServer, nil
	}
	return &fromServer, nil
}

// goSubscribeAndListen subscribes to the Redis pub/sub channel key and
// forwards the payload of the first message received to messages. It returns
// without forwarding anything when done is closed first or when the
// subscription fails. wg.Done is called on return.
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
	defer wg.Done()
	pubsub := client.Subscribe(key)
	defer pubsub.Close()

	// Buffered so the receiving goroutine can always hand over its single
	// message and exit, even if this function has already returned.
	received := make(chan *redis.Message, 1)
	go func() {
		defer close(received)
		msg, err := pubsub.ReceiveMessage()
		if err != nil {
			return
		}
		received <- msg
	}()

	// Wait for a message or for the done signal
	select {
	case msg, ok := <-received:
		if !ok {
			// Receiving failed; there is nothing to forward.
			return
		}
		// The consumer may already be gone, so never block on the send.
		select {
		case messages <- msg.Payload:
		case <-done:
		}
	case <-done:
	}
}

// subscribe listens on every lookup key of msg.PullRequest and, as soon as
// one of them is notified, returns the result of checkForMessage. After ten
// seconds without a notification it returns an empty response.
//
// NOTE(review): the wait is fixed at ten seconds regardless of msg.Timeout -
// confirm whether msg.Timeout should drive it.
func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	const waitTimeout = 10 * time.Second

	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
		return nil, err
	}
	messages := make(chan string)
	done := make(chan struct{})
	var wg sync.WaitGroup
	// extract lookup keys and subscribe
	for _, rq := range msg.PullRequest {
		wg.Add(1)
		// subscribe to the lookup key
		go goSubscribeAndListen(r.Client, rq.LookupKey, messages, &wg, done)
	}
	// wait for timeout or message
	notified := false
	select {
	case <-messages:
		notified = true
	case <-time.After(waitTimeout):
	}
	// Stop every listener and wait for them so no subscription outlives the
	// call.
	close(done)
	wg.Wait()
	if notified {
		return r.checkForMessage(msg)
	}
	return &fromServer, nil
}

// handleInvitation processes the invitation carried by msg according to its
// Step:
//
//	1: store the invitation and reply with its shortcode and expiry;
//	2: fetch a stored invitation by shortcode and password and reply with it,
//	   or with an explanatory payload when the lookup fails;
//	3: store the accepted invitation for its initiator and reply with the
//	   expiry.
//
// Any other step yields a response without invitation.
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:invitation").Err(); err != nil {
		return nil, err
	}
	invite := msg.Invitation
	switch invite.Step {
	// create invitation => provide shortcode and expiry
	case 1:
		url, expiry := r.StoreInvitation(invite.Payload, int(invite.Timeout), invite.Password, r.InvitationTimeout, int(invite.ShortcodeLen))
		fromServer.Invitation = &meowlib.Invitation{}
		fromServer.Invitation.Shortcode = url
		fromServer.Invitation.Expiry = expiry.UTC().Unix()
	// get invitation => retrieve invitation from redis and send
	case 2:
		fromServer.Invitation = &meowlib.Invitation{}
		invitation, err := r.GetInvitation(invite.Shortcode, invite.Password)
		switch {
		case err == nil:
			fromServer.Invitation.Payload = invitation // protobuf invitation
		case err.Error() == "auth failed":
			fromServer.Invitation.Payload = []byte("authentication failure")
		default:
			fromServer.Invitation.Payload = []byte("invitation expired")
		}
	// accept invitation => store accepted invitation for initiator
	case 3:
		var usermsg meowlib.PackedUserMessage
		if err := proto.Unmarshal(invite.Payload, &usermsg); err != nil {
			return nil, err
		}
		data, err := proto.Marshal(invite)
		if err != nil {
			return nil, err
		}
		expiry := r.StoreAnswerToInvitation(usermsg.Destination, int(invite.Timeout), data, r.InvitationTimeout)
		fromServer.Invitation = &meowlib.Invitation{}
		fromServer.Invitation.Expiry = expiry.UTC().Unix()
	}
	return &fromServer, nil
}

// handleMatriochka serialises the whole request and adds it to the "mtk"
// sorted set, scored with the current Unix time. The returned response
// acknowledges msg.Uuid.
//
// A Redis write failure is returned instead of being acknowledged.
func (r *RedisRouter) handleMatriochka(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var fromServer meowlib.FromServerMessage
	// update messages counter
	if err := r.Client.Incr("statistics:messages:matriochka").Err(); err != nil {
		return nil, err
	}
	out, err := proto.Marshal(msg)
	if err != nil {
		return nil, err
	}
	entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
	if err := r.Client.ZAdd("mtk", entry).Err(); err != nil {
		return nil, err
	}
	if msg.MatriochkaMessage.LookupKey != "" {
		// TODO: tracking is not implemented yet; the original sketch was
		//r.Client.ZAdd("trk:" + msg.MatriochkaMessage.Next.Uuid,{})
	}
	fromServer.UuidAck = msg.Uuid
	return &fromServer, nil
}