This commit is contained in:
+40
-46
@@ -2,7 +2,6 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"forge.redroom.link/yves/meowlib"
|
||||
@@ -205,60 +204,55 @@ func (r *RedisRouter) checkForMessage(msg *meowlib.ToServerMessage) (*meowlib.Fr
|
||||
return &from_server, nil
|
||||
}
|
||||
|
||||
func goSubscribeAndListen(client *redis.Client, key string, messages chan<- string, wg *sync.WaitGroup, done <-chan struct{}) {
|
||||
defer wg.Done()
|
||||
pubsub := client.Subscribe("msgch:" + key)
|
||||
defer pubsub.Close()
|
||||
|
||||
// Create a new channel for the messages from this subscription
|
||||
myMessages := make(chan *redis.Message)
|
||||
go func() {
|
||||
for {
|
||||
msg, err := pubsub.ReceiveMessage()
|
||||
if err != nil {
|
||||
close(myMessages)
|
||||
return
|
||||
}
|
||||
myMessages <- msg
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for a message or for the done signal
|
||||
select {
|
||||
case msg := <-myMessages:
|
||||
messages <- msg.Payload
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RedisRouter) subscribe(msg *meowlib.ToServerMessage, timeout int) (*meowlib.FromServerMessage, error) {
|
||||
var from_server meowlib.FromServerMessage
|
||||
// update messages counter
|
||||
err := r.Client.Incr("statistics:messages:messagessubscription").Err()
|
||||
if err := r.Client.Incr("statistics:messages:messagessubscription").Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels := make([]string, 0, len(msg.PullRequest))
|
||||
for _, rq := range msg.PullRequest {
|
||||
channels = append(channels, "msgch:"+rq.LookupKey)
|
||||
}
|
||||
|
||||
// Subscribe before re-checking: any publish that fires after Subscribe()
|
||||
// returns is buffered in pubsub.Channel(), closing the store→publish→check race.
|
||||
pubsub := r.Client.Subscribe(channels...)
|
||||
defer pubsub.Close()
|
||||
|
||||
// Drain one subscribe-confirmation per channel.
|
||||
for range len(channels) {
|
||||
if _, err := pubsub.Receive(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Re-check now that we are subscribed; catches messages that arrived
|
||||
// between the caller's first checkForMessage and our Subscribe call.
|
||||
fromServer, err := r.checkForMessage(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
messages := make(chan string)
|
||||
var wg sync.WaitGroup
|
||||
done := make(chan struct{})
|
||||
// extract lookup keys and subscribe
|
||||
// iterate over pull requests
|
||||
for _, rq := range msg.PullRequest {
|
||||
wg.Add(1)
|
||||
// subscribe to the lookup key
|
||||
go goSubscribeAndListen(r.Client, rq.LookupKey, messages, &wg, done)
|
||||
if len(fromServer.Chat) > 0 || fromServer.Invitation != nil {
|
||||
return fromServer, nil
|
||||
}
|
||||
// wait for timeout or message
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
select {
|
||||
case <-messages:
|
||||
close(done)
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
return fromServer, nil
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
return fromServer, nil
|
||||
}
|
||||
return r.checkForMessage(msg)
|
||||
case <-time.After(time.Duration(timeout) * time.Second): // 10 seconds timeout
|
||||
close(done)
|
||||
}
|
||||
wg.Wait()
|
||||
return &from_server, nil
|
||||
}
|
||||
|
||||
func (r *RedisRouter) handleInvitation(msg *meowlib.ToServerMessage) (*meowlib.FromServerMessage, error) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -33,7 +34,7 @@ func newTestRouter(t *testing.T) (*RedisRouter, *miniredis.Miniredis) {
|
||||
Addr: mr.Addr(),
|
||||
}),
|
||||
InvitationTimeout: 3600,
|
||||
Context: nil,
|
||||
Context: context.Background(),
|
||||
}
|
||||
// seed the statistics:start key that NewRedisRouter normally sets
|
||||
router.Client.Set("statistics:start", time.Now().UTC().Format(time.RFC3339), 0)
|
||||
|
||||
Reference in New Issue
Block a user