This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -13,6 +14,7 @@ type LokiWriter struct {
|
||||
url string
|
||||
labels map[string]string
|
||||
httpClient *http.Client
|
||||
disabled bool
|
||||
|
||||
// Circuit breaker fields
|
||||
mu sync.RWMutex
|
||||
@@ -38,15 +40,29 @@ const (
|
||||
warningInterval = 1 * time.Minute // Minimum time between warning messages
|
||||
)
|
||||
|
||||
func NewLokiWriter(url string, labels map[string]string) *LokiWriter {
|
||||
func NewLokiWriter(rawURL string, labels map[string]string) *LokiWriter {
|
||||
disabled := false
|
||||
if rawURL == "" {
|
||||
disabled = true
|
||||
} else {
|
||||
u, err := url.ParseRequestURI(rawURL)
|
||||
if err != nil || (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
|
||||
disabled = true
|
||||
}
|
||||
}
|
||||
return &LokiWriter{
|
||||
url: url,
|
||||
url: rawURL,
|
||||
labels: labels,
|
||||
httpClient: &http.Client{},
|
||||
disabled: disabled,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *LokiWriter) Write(p []byte) (n int, err error) {
|
||||
if w.disabled {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Check circuit breaker status
|
||||
w.mu.RLock()
|
||||
if w.circuitOpen {
|
||||
|
||||
191
lokiwriter_test.go
Normal file
191
lokiwriter_test.go
Normal file
@@ -0,0 +1,191 @@
|
||||
package meowlib
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewLokiWriterDisabledOnEmptyURL(t *testing.T) {
|
||||
w := NewLokiWriter("", map[string]string{"app": "test"})
|
||||
assert.True(t, w.disabled)
|
||||
}
|
||||
|
||||
func TestNewLokiWriterDisabledOnInvalidURL(t *testing.T) {
|
||||
cases := []string{
|
||||
"not-a-url",
|
||||
"ftp://example.com/loki",
|
||||
"://missing-scheme",
|
||||
"http://",
|
||||
"justtext",
|
||||
}
|
||||
for _, u := range cases {
|
||||
w := NewLokiWriter(u, map[string]string{"app": "test"})
|
||||
assert.True(t, w.disabled, "expected disabled for URL: %s", u)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewLokiWriterEnabledOnValidURL(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://localhost:3100/loki/api/v1/push",
|
||||
"https://log.redroom.link/loki/api/v1/push",
|
||||
}
|
||||
for _, u := range cases {
|
||||
w := NewLokiWriter(u, map[string]string{"app": "test"})
|
||||
assert.False(t, w.disabled, "expected enabled for URL: %s", u)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteDisabledReturnsLength(t *testing.T) {
|
||||
w := NewLokiWriter("", map[string]string{"app": "test"})
|
||||
msg := []byte(`{"level":"info","message":"should be discarded"}`)
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
}
|
||||
|
||||
func TestWriteToMockLoki(t *testing.T) {
|
||||
var received LokiPayload
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
|
||||
err := json.NewDecoder(r.Body).Decode(&received)
|
||||
assert.NoError(t, err)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
labels := map[string]string{"app": "meowlib", "env": "test"}
|
||||
w := NewLokiWriter(server.URL, labels)
|
||||
|
||||
msg := []byte(`{"level":"info","message":"hello from test"}`)
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
|
||||
// Verify payload structure
|
||||
assert.Len(t, received.Streams, 1)
|
||||
assert.Equal(t, "meowlib", received.Streams[0].Stream["app"])
|
||||
assert.Equal(t, "test", received.Streams[0].Stream["env"])
|
||||
assert.Equal(t, "info", received.Streams[0].Stream["level"])
|
||||
assert.Len(t, received.Streams[0].Values, 1)
|
||||
assert.Equal(t, "hello from test", received.Streams[0].Values[0][1])
|
||||
}
|
||||
|
||||
func TestWriteInvalidJSON(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
t.Fatal("server should not be called for invalid JSON input")
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
w := NewLokiWriter(server.URL, map[string]string{"app": "test"})
|
||||
msg := []byte(`not json at all`)
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
}
|
||||
|
||||
func TestCircuitBreakerOpensAfterFailures(t *testing.T) {
|
||||
callCount := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
callCount++
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
w := NewLokiWriter(server.URL, map[string]string{"app": "test"})
|
||||
msg := []byte(`{"level":"error","message":"fail test"}`)
|
||||
|
||||
// Send maxFailures requests to trip the circuit breaker
|
||||
for i := 0; i < maxFailures; i++ {
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
}
|
||||
assert.True(t, w.circuitOpen)
|
||||
|
||||
// Next write should be silently discarded (no server call)
|
||||
prevCount := callCount
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
assert.Equal(t, prevCount, callCount, "no HTTP call should be made while circuit is open")
|
||||
}
|
||||
|
||||
func TestCircuitBreakerResetsAfterSuccess(t *testing.T) {
|
||||
failFirst := true
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if failFirst {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
w := NewLokiWriter(server.URL, map[string]string{"app": "test"})
|
||||
msg := []byte(`{"level":"info","message":"recovery test"}`)
|
||||
|
||||
// Cause some failures (but not enough to open circuit)
|
||||
w.Write(msg)
|
||||
w.Write(msg)
|
||||
|
||||
assert.False(t, w.circuitOpen)
|
||||
assert.Equal(t, 2, w.failureCount)
|
||||
|
||||
// Now succeed
|
||||
failFirst = false
|
||||
w.Write(msg)
|
||||
assert.Equal(t, 0, w.failureCount)
|
||||
}
|
||||
|
||||
func TestCircuitBreakerRetriesAfterTimeout(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
w := NewLokiWriter(server.URL, map[string]string{"app": "test"})
|
||||
|
||||
// Manually open the circuit with an old failure time
|
||||
w.mu.Lock()
|
||||
w.circuitOpen = true
|
||||
w.failureCount = maxFailures
|
||||
w.lastFailureTime = time.Now().Add(-circuitOpenTime - time.Second)
|
||||
w.mu.Unlock()
|
||||
|
||||
msg := []byte(`{"level":"info","message":"retry test"}`)
|
||||
n, err := w.Write(msg)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
|
||||
// Circuit should now be closed after successful retry
|
||||
assert.False(t, w.circuitOpen)
|
||||
assert.Equal(t, 0, w.failureCount)
|
||||
}
|
||||
|
||||
func TestWriteToRealLoki(t *testing.T) {
|
||||
lokiURL := "https://log.redroom.link/loki/api/v1/push"
|
||||
|
||||
// Quick connectivity check
|
||||
client := &http.Client{Timeout: 5 * time.Second}
|
||||
resp, err := client.Get("https://log.redroom.link/ready")
|
||||
if err != nil || resp.StatusCode != http.StatusOK {
|
||||
t.Skip("Loki not reachable, skipping live test")
|
||||
}
|
||||
resp.Body.Close()
|
||||
|
||||
labels := map[string]string{"app": "meowlib", "env": "test"}
|
||||
w := NewLokiWriter(lokiURL, labels)
|
||||
assert.False(t, w.disabled)
|
||||
|
||||
msg := fmt.Sprintf(`{"level":"info","message":"lokiwriter_test at %s"}`, time.Now().Format(time.RFC3339))
|
||||
n, err := w.Write([]byte(msg))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(msg), n)
|
||||
assert.Equal(t, 0, w.failureCount)
|
||||
}
|
||||
Reference in New Issue
Block a user