package server

import (
	"context"
	"time"

	"forge.redroom.link/yves/meowlib"
	"github.com/go-redis/redis"
	"google.golang.org/protobuf/proto"
)

// RedisRouter routes incoming ToServerMessages using Redis as a backend:
// user messages are queued in per-destination sorted sets ("msg:<key>"),
// pull requests drain those sets, and pub/sub channels ("msgch:<key>") wake
// up clients that are long polling.
type RedisRouter struct {
	// Name is the router name; NewRedisRouter sets it to "Redis".
	Name string
	// ServerIdentity is the identity of this server; it gives access to the
	// video server used by handleVideo.
	ServerIdentity *Identity
	// Client is the Redis connection used for all storage and pub/sub.
	Client *redis.Client
	// InvitationTimeout is forwarded to StoreInvitation and
	// StoreAnswerToInvitation.
	// NOTE(review): unit and exact meaning are defined by those helpers — confirm.
	InvitationTimeout int
	// Context is initialised to context.Background() by NewRedisRouter.
	Context context.Context
}

// NewRedisRouter builds a RedisRouter connected to the Redis instance at
// redisUrl (database db, authenticated with password) and records the start
// time under "statistics:start" so that uptime can be derived.
//
// It panics if the start time cannot be written to Redis, since the
// signature offers no way to report the error to the caller.
func NewRedisRouter(server *Identity, redisUrl string, password string, db int, invitationTimeout int) *RedisRouter {
	r := &RedisRouter{
		Name:           "Redis",
		ServerIdentity: server,
		Client: redis.NewClient(&redis.Options{
			Addr:     redisUrl,
			Password: password,
			DB:       db,
		}),
		InvitationTimeout: invitationTimeout,
		Context:           context.Background(),
	}
	// set start for uptime
	started := time.Now().UTC().Format(time.RFC3339)
	if err := r.Client.Set("statistics:start", started, 0).Err(); err != nil {
		panic(err)
	}
	return r
}

// Route dispatches msg to the handlers matching the sections it carries
// (user messages, pull requests, video data, matriochka, invitation).
//
// Handlers run in a fixed order and each one overwrites the response of the
// previous one, so the response of the last matching handler is returned.
// If no section matches, Route returns (nil, nil). A failure to update the
// global message counter is returned as an error.
func (r *RedisRouter) Route(ctx context.Context, msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	var (
		fromServer *meowlib.FromServerMessage
		err        error
	)
	// update messages counter
	if err = r.Client.Incr("statistics:messages:total").Err(); err != nil {
		return nil, err
	}
	// user message => store
	if len(msg.Messages) > 0 {
		logger.Info().Msg("storing message")
		if fromServer, err = r.storeMessage(msg); err != nil {
			return nil, err
		}
	}
	// check for messages
	if len(msg.PullRequest) > 0 {
		logger.Info().Msg("checking for messages")
		if fromServer, err = r.checkForMessage(msg); err != nil {
			return nil, err
		}
		// Nothing available right now and the client asked to wait: long poll.
		if msg.Timeout > 0 && len(fromServer.Chat) == 0 && fromServer.Invitation == nil {
			logger.Info().Msg("long poll, subscribing for messages")
			if fromServer, err = r.subscribe(ctx, msg, int(msg.Timeout)); err != nil {
				return nil, err
			}
		}
	}
	// initiate video
	if msg.VideoData != nil {
		logger.Info().Msg("handling video")
		if fromServer, err = r.handleVideo(msg); err != nil {
			return nil, err
		}
	}
	// manage Matriochka
	if msg.MatriochkaMessage != nil {
		logger.Info().Msg("handling matriochka")
		if fromServer, err = r.handleMatriochka(msg); err != nil {
			return nil, err
		}
	}
	// Server list exchange
	if len(msg.KnownServers) > 0 {
		// TODO: not implemented yet.
	}
	// Through server invitation process
	if msg.Invitation != nil {
		logger.Info().Msg("handling invitation")
		if fromServer, err = r.handleInvitation(msg); err != nil {
			return nil, err
		}
	}
	// TODO: servers list, broadcast and admin messages are not handled yet.
	return fromServer, nil
}

// storeMessage queues every user message of msg in the sorted set
// "msg:<destination>" (scored with the current Unix time), notifies listeners
// on "msgch:<destination>", and returns a response acknowledging msg.Uuid.
//
// A message that cannot be marshalled or stored aborts the call with an
// error, so the acknowledgement is only sent once everything is queued.
func (r *RedisRouter) storeMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	// update messages counter
	if err := r.Client.Incr("statistics:messages:usermessages").Err(); err != nil {
		return nil, err
	}
	for _, usrmsg := range msg.Messages {
		// serialize the message to store it as byte array into redis
		out, err := proto.Marshal(usrmsg)
		if err != nil {
			return nil, err
		}
		entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
		if err := r.Client.ZAdd("msg:"+usrmsg.Destination, entry).Err(); err != nil {
			return nil, err
		}
		// The notification is best effort: the message is already stored and
		// will be picked up by the next pull request.
		if err := r.Client.Publish("msgch:"+usrmsg.Destination, "!").Err(); err != nil {
			logger.Info().Msg("publishing message notification failed: " + err.Error())
		}
		// If delivery tracking is requested, remember the sender under the
		// delivery uuid. The set member must be a plain value: go-redis
		// cannot marshal a redis.Z struct as a set member.
		if usrmsg.ServerDeliveryUuid != "" {
			if err := r.Client.SAdd("dvyrq:"+usrmsg.ServerDeliveryUuid, msg.From).Err(); err != nil {
				logger.Info().Msg("storing delivery tracking request failed: " + err.Error())
			}
		}
	}
	return &meowlib.FromServerMessage{UuidAck: msg.Uuid}, nil
}

// checkForMessage drains the queued messages of every pull request in msg
// and returns them in the Chat field of the response, each one stamped with
// the server timestamp it was stored with.
//
// For messages with delivery tracking, a delivery record is created for the
// sender registered by storeMessage. When a lookup key has no queued message,
// a stored invitation answer is looked up instead; the first one found ends
// the lookup, since a response can carry only one invitation.
func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	fromServer := &meowlib.FromServerMessage{}
	// update messages counter
	if err := r.Client.Incr("statistics:messages:messagelookups").Err(); err != nil {
		return nil, err
	}
	// todo check pull requests signature
	// iterate over pull requests
	for _, rq := range msg.PullRequest {
		queue := "msg:" + rq.LookupKey
		// get messages from redis
		msgcnt, err := r.Client.ZCount(queue, "-inf", "+inf").Result()
		if err != nil {
			return nil, err
		}
		res, err := r.Client.ZPopMin(queue, msgcnt).Result()
		if err != nil {
			return nil, err
		}
		// iterate over messages
		for _, redismsg := range res {
			raw, ok := redismsg.Member.(string)
			if !ok {
				// Not a serialized message: skip it instead of panicking.
				logger.Info().Msg("skipping queued message with unexpected member type")
				continue
			}
			usrmsg := &meowlib.PackedUserMessage{}
			if err := proto.Unmarshal([]byte(raw), usrmsg); err != nil {
				return nil, err
			}
			// add server timestamp
			usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score))
			fromServer.Chat = append(fromServer.Chat, usrmsg)

			// if delivery for that pick up requested, create, store and publish delivery message
			if usrmsg.ServerDeliveryUuid == "" {
				continue
			}
			// Same "dvyrq:" key that storeMessage writes the requester to.
			deliveryRequester, err := r.Client.SPop("dvyrq:" + usrmsg.ServerDeliveryUuid).Result()
			if err == redis.Nil {
				// nobody asked for a delivery receipt
				continue
			}
			if err != nil {
				// exit only if real error
				return nil, err
			}
			// create a delivery record
			record := redis.Z{Score: float64(time.Now().Unix()), Member: usrmsg.ServerDeliveryUuid}
			if err := r.Client.ZAdd("dvy:"+deliveryRequester, record).Err(); err != nil {
				logger.Info().Msg("storing delivery record failed: " + err.Error())
			}
			// publish it in case of listener
			if err := r.Client.Publish("dvych:"+usrmsg.ServerDeliveryUuid, "!").Err(); err != nil {
				logger.Info().Msg("publishing delivery notification failed: " + err.Error())
			}
		}
		// if no messages check for invitation answer payload
		if msgcnt == 0 {
			// NOTE(review): the lookup error is discarded as before; presumably
			// a missing answer is reported as an error — confirm against
			// GetAnswerToInvitation.
			storedAnswer, _ := r.GetAnswerToInvitation(rq.LookupKey)
			if storedAnswer != nil {
				answer := &meowlib.Invitation{}
				if err := proto.Unmarshal(storedAnswer, answer); err != nil {
					// Report the corruption to the client in a fresh invitation.
					answer = &meowlib.Invitation{Payload: []byte("invitation answer corrupted")}
				}
				fromServer.Invitation = answer
				// exit loop if invitation found, cannot store several in a message
				return fromServer, nil
			}
		}
	}
	return fromServer, nil
}

// subscribe implements long polling: it waits up to timeout seconds for a
// notification on the "msgch:<key>" channel of any pull request in msg, then
// returns whatever checkForMessage finds.
//
// It returns an empty response when the timeout expires or the pub/sub
// channel is closed, and the context error when reqCtx is cancelled.
func (r *RedisRouter) subscribe(reqCtx context.Context, msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
	if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
		return nil, err
	}

	channels := make([]string, 0, len(msg.PullRequest))
	for _, rq := range msg.PullRequest {
		channels = append(channels, "msgch:"+rq.LookupKey)
	}

	// Subscribe before re-checking: any publish that fires after Subscribe()
	// returns is buffered in pubsub.Channel(), closing the store→publish→check race.
	pubsub := r.Client.Subscribe(channels...)
	defer pubsub.Close()

	// Drain one subscribe-confirmation per channel.
	for range len(channels) {
		if _, err := pubsub.Receive(); err != nil {
			return nil, err
		}
	}

	// Re-check now that we are subscribed; catches messages that arrived
	// between the caller's first checkForMessage and our Subscribe call.
	fromServer, err := r.checkForMessage(msg)
	if err != nil {
		return nil, err
	}
	if len(fromServer.Chat) > 0 || fromServer.Invitation != nil {
		return fromServer, nil
	}

	ctx, cancel := context.WithTimeout(reqCtx, time.Duration(timeout)*time.Second)
	defer cancel()

	ch := pubsub.Channel()
	select {
	case <-ctx.Done():
		// A plain timeout is not an error: answer with the empty response.
		if ctx.Err() == context.DeadlineExceeded {
			return fromServer, nil
		}
		return nil, ctx.Err()
	case _, ok := <-ch:
		if !ok {
			return fromServer, nil
		}
		return r.checkForMessage(msg)
	}
}

// handleInvitation processes the invitation section of msg according to its
// step:
//
//   - 1: create an invitation; the response carries its shortcode and expiry.
//   - 2: fetch an invitation by shortcode and password; on failure the
//     response payload holds "authentication failure" or "invitation expired".
//   - 3: store the answer to an invitation for its initiator; the response
//     carries the expiry of the stored answer.
//
// Any other step yields an empty response.
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	fromServer := &meowlib.FromServerMessage{}
	// update messages counter
	if err := r.Client.Incr("statistics:messages:invitation").Err(); err != nil {
		return nil, err
	}
	inv := msg.Invitation
	switch inv.Step {
	// create invitation => provide shortcode and expiry
	case 1:
		url, expiry, err := r.StoreInvitation(inv.Payload, int(inv.Timeout), inv.Password, r.InvitationTimeout, int(inv.ShortcodeLen))
		if err != nil {
			return nil, err
		}
		fromServer.Invitation = &meowlib.Invitation{
			Shortcode: url,
			Expiry:    expiry.UTC().Unix(),
		}
	// get invitation => retrieve invitation from redis and send
	case 2:
		fromServer.Invitation = &meowlib.Invitation{}
		invitation, err := r.GetInvitation(inv.Shortcode, inv.Password)
		switch {
		case err == nil:
			fromServer.Invitation.Payload = invitation // protobuf invitation
		case err.Error() == "auth failed":
			fromServer.Invitation.Payload = []byte("authentication failure")
		default:
			fromServer.Invitation.Payload = []byte("invitation expired")
		}
	// accept invitation => store accepted invitation for initiator
	case 3:
		// The payload is a packed user message; its destination identifies
		// the initiator the answer is stored for.
		var usermsg meowlib.PackedUserMessage
		if err := proto.Unmarshal(inv.Payload, &usermsg); err != nil {
			return nil, err
		}
		data, err := proto.Marshal(inv)
		if err != nil {
			return nil, err
		}
		expiry := r.StoreAnswerToInvitation(usermsg.Destination, int(inv.Timeout), data, r.InvitationTimeout)
		fromServer.Invitation = &meowlib.Invitation{Expiry: expiry.UTC().Unix()}
	}
	return fromServer, nil
}

// handleVideo forwards the video data of msg to the video server of the
// server identity and returns the updated video data together with an
// acknowledgement of msg.Uuid.
func (r *RedisRouter) handleVideo(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	// update messages counter
	if err := r.Client.Incr("statistics:messages:video").Err(); err != nil {
		return nil, err
	}
	videoData, err := r.ServerIdentity.VideoServer.UpdateVideoData(msg.VideoData)
	if err != nil {
		return nil, err
	}
	return &meowlib.FromServerMessage{
		VideoData: videoData,
		UuidAck:   msg.Uuid,
	}, nil
}

// handleMatriochka serializes the whole msg and queues it in the "mtk"
// sorted set (scored with the current Unix time), then returns a response
// acknowledging msg.Uuid. The acknowledgement is only sent when the message
// was actually queued.
func (r *RedisRouter) handleMatriochka(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
	// update messages counter
	if err := r.Client.Incr("statistics:messages:matriochka").Err(); err != nil {
		return nil, err
	}
	out, err := proto.Marshal(msg)
	if err != nil {
		return nil, err
	}
	entry := redis.Z{Score: float64(time.Now().Unix()), Member: out}
	if err := r.Client.ZAdd("mtk", entry).Err(); err != nil {
		return nil, err
	}
	if msg.MatriochkaMessage.LookupKey != "" {
		// TODO: tracking is not implemented yet.
		//r.Client.ZAdd("trk:" + msg.MatriochkaMessage.Next.Uuid,{})
	}
	return &meowlib.FromServerMessage{UuidAck: msg.Uuid}, nil
}