long poll exit fix
This commit is contained in:
@ -1,7 +1,6 @@
|
|||||||
package helpers
|
package helpers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
@ -194,46 +193,49 @@ func ReadMessage(messageFilename string) ([]string, []string, string, error) {
|
|||||||
// CheckForMessages checks for messages on a single server
|
// CheckForMessages checks for messages on a single server
|
||||||
func LongPollForMessages(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
|
func LongPollForMessages(storage_path string, jobs []client.RequestsJob, timeout int, longPoll bool) (int, string, error) {
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
// Channel to collect results
|
||||||
defer cancel()
|
resultChan := make(chan int, len(jobs))
|
||||||
|
errChan := make(chan error, len(jobs))
|
||||||
resultChan := make(chan int, len(jobs)) // We'll just send counts here
|
|
||||||
|
|
||||||
|
// WaitGroup to sync goroutines
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// Loop through each job (server)
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func(job client.RequestsJob) {
|
go func(job client.RequestsJob) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
for {
|
// Long-polling call to the server
|
||||||
select {
|
cnt, _, err := CheckForMessages(storage_path, &job, timeout, true)
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
cnt, _, err := CheckForMessages(storage_path, &job, timeout, true)
|
|
||||||
if err != nil {
|
|
||||||
continue // Optionally handle/log error
|
|
||||||
}
|
|
||||||
if cnt > 0 {
|
|
||||||
select {
|
|
||||||
case resultChan <- cnt:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
return
|
if err == nil && cnt > 0 {
|
||||||
}
|
select {
|
||||||
|
case resultChan <- cnt:
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close the error channel to notify all goroutines
|
||||||
|
close(errChan)
|
||||||
|
|
||||||
}
|
}
|
||||||
}(job)
|
}(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for a result or context timeout
|
// Close the result channel when all workers are done
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(resultChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the first message or all timeouts
|
||||||
select {
|
select {
|
||||||
case cnt := <-resultChan:
|
case cnt := <-resultChan:
|
||||||
return cnt, "", nil
|
return cnt, "", nil
|
||||||
case <-ctx.Done():
|
case <-errChan:
|
||||||
|
// If one fails and exitOnMessage is true
|
||||||
return 0, "", nil
|
return 0, "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user