diff --git a/lokiwriter.go b/lokiwriter.go index 6422425..e276b53 100644 --- a/lokiwriter.go +++ b/lokiwriter.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "sync" "time" ) @@ -13,6 +14,7 @@ type LokiWriter struct { url string labels map[string]string httpClient *http.Client + disabled bool // Circuit breaker fields mu sync.RWMutex @@ -38,15 +40,29 @@ const ( warningInterval = 1 * time.Minute // Minimum time between warning messages ) -func NewLokiWriter(url string, labels map[string]string) *LokiWriter { +func NewLokiWriter(rawURL string, labels map[string]string) *LokiWriter { + disabled := false + if rawURL == "" { + disabled = true + } else { + u, err := url.ParseRequestURI(rawURL) + if err != nil || (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" { + disabled = true + } + } return &LokiWriter{ - url: url, + url: rawURL, labels: labels, httpClient: &http.Client{}, + disabled: disabled, } } func (w *LokiWriter) Write(p []byte) (n int, err error) { + if w.disabled { + return len(p), nil + } + // Check circuit breaker status w.mu.RLock() if w.circuitOpen { diff --git a/lokiwriter_test.go b/lokiwriter_test.go new file mode 100644 index 0000000..7685a45 --- /dev/null +++ b/lokiwriter_test.go @@ -0,0 +1,191 @@ +package meowlib + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewLokiWriterDisabledOnEmptyURL(t *testing.T) { + w := NewLokiWriter("", map[string]string{"app": "test"}) + assert.True(t, w.disabled) +} + +func TestNewLokiWriterDisabledOnInvalidURL(t *testing.T) { + cases := []string{ + "not-a-url", + "ftp://example.com/loki", + "://missing-scheme", + "http://", + "justtext", + } + for _, u := range cases { + w := NewLokiWriter(u, map[string]string{"app": "test"}) + assert.True(t, w.disabled, "expected disabled for URL: %s", u) + } +} + +func TestNewLokiWriterEnabledOnValidURL(t *testing.T) { + cases := []string{ + "http://localhost:3100/loki/api/v1/push", + "https://log.redroom.link/loki/api/v1/push", + } + for _, u := range cases { + w := NewLokiWriter(u, map[string]string{"app": "test"}) + assert.False(t, w.disabled, "expected enabled for URL: %s", u) + } +} + +func TestWriteDisabledReturnsLength(t *testing.T) { + w := NewLokiWriter("", map[string]string{"app": "test"}) + msg := []byte(`{"level":"info","message":"should be discarded"}`) + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) +} + +func TestWriteToMockLoki(t *testing.T) { + var received LokiPayload + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + err := json.NewDecoder(r.Body).Decode(&received) + assert.NoError(t, err) + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + labels := map[string]string{"app": "meowlib", "env": "test"} + w := NewLokiWriter(server.URL, labels) + + msg := []byte(`{"level":"info","message":"hello from test"}`) + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) + + // Verify payload structure + assert.Len(t, received.Streams, 1) + assert.Equal(t, "meowlib", received.Streams[0].Stream["app"]) + assert.Equal(t, "test", received.Streams[0].Stream["env"]) + assert.Equal(t, "info", received.Streams[0].Stream["level"]) + assert.Len(t, received.Streams[0].Values, 1) + assert.Equal(t, "hello from test", received.Streams[0].Values[0][1]) +} + +func TestWriteInvalidJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Fatal("server should not be called for invalid JSON input") + })) + defer server.Close() + + w := NewLokiWriter(server.URL, map[string]string{"app": "test"}) + msg := []byte(`not json at all`) + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) +} + +func TestCircuitBreakerOpensAfterFailures(t *testing.T) { + callCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + w := NewLokiWriter(server.URL, map[string]string{"app": "test"}) + msg := []byte(`{"level":"error","message":"fail test"}`) + + // Send maxFailures requests to trip the circuit breaker + for i := 0; i < maxFailures; i++ { + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) + } + assert.True(t, w.circuitOpen) + + // Next write should be silently discarded (no server call) + prevCount := callCount + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) + assert.Equal(t, prevCount, callCount, "no HTTP call should be made while circuit is open") +} + +func TestCircuitBreakerResetsAfterSuccess(t *testing.T) { + failFirst := true + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if failFirst { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusNoContent) + } + })) + defer server.Close() + + w := NewLokiWriter(server.URL, map[string]string{"app": "test"}) + msg := []byte(`{"level":"info","message":"recovery test"}`) + + // Cause some failures (but not enough to open circuit) + w.Write(msg) + w.Write(msg) + + assert.False(t, w.circuitOpen) + assert.Equal(t, 2, w.failureCount) + + // Now succeed + failFirst = false + w.Write(msg) + assert.Equal(t, 0, w.failureCount) +} + +func TestCircuitBreakerRetriesAfterTimeout(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) + })) + defer server.Close() + + w := NewLokiWriter(server.URL, map[string]string{"app": "test"}) + + // Manually open the circuit with an old failure time + w.mu.Lock() + w.circuitOpen = true + w.failureCount = maxFailures + w.lastFailureTime = time.Now().Add(-circuitOpenTime - time.Second) + w.mu.Unlock() + + msg := []byte(`{"level":"info","message":"retry test"}`) + n, err := w.Write(msg) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) + + // Circuit should now be closed after successful retry + assert.False(t, w.circuitOpen) + assert.Equal(t, 0, w.failureCount) +} + +func TestWriteToRealLoki(t *testing.T) { + lokiURL := "https://log.redroom.link/loki/api/v1/push" + + // Quick connectivity check + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get("https://log.redroom.link/ready") + if err != nil || resp.StatusCode != http.StatusOK { + t.Skip("Loki not reachable, skipping live test") + } + resp.Body.Close() + + labels := map[string]string{"app": "meowlib", "env": "test"} + w := NewLokiWriter(lokiURL, labels) + assert.False(t, w.disabled) + + msg := fmt.Sprintf(`{"level":"info","message":"lokiwriter_test at %s"}`, time.Now().Format(time.RFC3339)) + n, err := w.Write([]byte(msg)) + assert.NoError(t, err) + assert.Equal(t, len(msg), n) + assert.Equal(t, 0, w.failureCount) +}