some adjustmentsfor server delivery and peer storage study
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
@ -119,8 +119,13 @@ func (r *RedisRouter) storeMessage(msg *meowlib.ToServerMessage) (*meowlib.FromS
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Client.ZAdd(usrmsg.Destination, redis.Z{Score: float64(time.Now().Unix()), Member: out})
|
||||
r.Client.Publish("ch:"+usrmsg.Destination, "!")
|
||||
r.Client.ZAdd("msg:"+usrmsg.Destination, redis.Z{Score: float64(time.Now().Unix()), Member: out})
|
||||
r.Client.Publish("msgch:"+usrmsg.Destination, "!")
|
||||
// if delivery tracking resquested, store the uid for the sender's key in delivery tracking
|
||||
if usrmsg.ServerDeliveryUuid != "" {
|
||||
r.Client.SAdd("dvyrq:"+usrmsg.ServerDeliveryUuid, redis.Z{Score: float64(time.Now().Unix()), Member: msg.From})
|
||||
}
|
||||
|
||||
}
|
||||
from_server.UuidAck = msg.Uuid
|
||||
return &from_server, nil
|
||||
@ -134,13 +139,15 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// todo check pull requests signature
|
||||
// iterate over pull requests
|
||||
for _, rq := range msg.PullRequest {
|
||||
// get messages from redis
|
||||
msgcnt, err := r.Client.ZCount(rq.LookupKey, "-inf", "+inf").Result()
|
||||
msgcnt, err := r.Client.ZCount("msg:"+rq.LookupKey, "-inf", "+inf").Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := r.Client.ZPopMin(rq.LookupKey, msgcnt).Result()
|
||||
res, err := r.Client.ZPopMin("msg:"+rq.LookupKey, msgcnt).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -156,8 +163,21 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
|
||||
}
|
||||
// add server timestamp
|
||||
usrmsg.ServerTimestamp = append(usrmsg.ServerTimestamp, int64(redismsg.Score))
|
||||
|
||||
from_server.Chat = append(from_server.Chat, &usrmsg)
|
||||
|
||||
// if delivery requested, create, store and publish delivery message
|
||||
res, err := r.Client.SPop("msg:" + usrmsg.ServerDeliveryUuid).Result()
|
||||
if err != nil {
|
||||
if err != redis.Nil { // exit only if real error
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err != redis.Nil {
|
||||
// create a delivery record
|
||||
r.Client.ZAdd("dvy:"+res, redis.Z{Score: float64(time.Now().Unix()), Member: usrmsg.ServerDeliveryUuid})
|
||||
// publish it in case of listener
|
||||
r.Client.Publish("dvych:"+usrmsg.ServerDeliveryUuid, "!")
|
||||
}
|
||||
}
|
||||
// if no messages check for invitationanswer payload
|
||||
if msgcnt == 0 {
|
||||
@ -182,7 +202,7 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
|
||||
|
||||
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
|
||||
defer wg.Done()
|
||||
pubsub := client.Subscribe("ch:" + key)
|
||||
pubsub := client.Subscribe("msgch:" + key)
|
||||
defer pubsub.Close()
|
||||
|
||||
// Create a new channel for the messages from this subscription
|
||||
@ -218,6 +238,7 @@ func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage, timeout int) (*meo
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
// extract lookup keys and subscribe
|
||||
// iterate over pull requests
|
||||
for _, rq := range msg.PullRequest {
|
||||
wg.Add(1)
|
||||
// subscribe to the lookup key
|
||||
|
Reference in New Issue
Block a user