From 6521942960bf38f3a7d40b3e459eb8af3e825410 Mon Sep 17 00:00:00 2001 From: Nadia Santalla Date: Tue, 1 Oct 2024 16:27:43 +0200 Subject: [PATCH] k6runner: split local and http runners to different files --- internal/k6runner/http.go | 208 +++++++++++ internal/k6runner/http_test.go | 554 +++++++++++++++++++++++++++++ internal/k6runner/k6runner.go | 338 +----------------- internal/k6runner/k6runner_test.go | 549 +--------------------------- internal/k6runner/local.go | 149 ++++++++ 5 files changed, 917 insertions(+), 881 deletions(-) create mode 100644 internal/k6runner/http.go create mode 100644 internal/k6runner/http_test.go create mode 100644 internal/k6runner/local.go diff --git a/internal/k6runner/http.go b/internal/k6runner/http.go new file mode 100644 index 00000000..e3dfd6b8 --- /dev/null +++ b/internal/k6runner/http.go @@ -0,0 +1,208 @@ +package k6runner + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + "github.com/rs/zerolog" + "golang.org/x/exp/rand" +) + +type HttpRunner struct { + url string + logger *zerolog.Logger + // backoff sets the minimum amount of time to wait before retrying a request. nth attempt waits n times this value, + // plus some jitter. + backoff time.Duration + // graceTime tells the HttpRunner how much time to add to the script timeout to form the request timeout. 
+ graceTime time.Duration +} + +const ( + defaultBackoff = 10 * time.Second + defaultGraceTime = 20 * time.Second +) + +type requestError struct { + Err string `json:"error"` + Message string `json:"msg"` +} + +var _ error = requestError{} + +func (r requestError) Error() string { + return fmt.Sprintf("%s: %s", r.Err, r.Message) +} + +// HTTPRunRequest +type HTTPRunRequest struct { + Script `json:",inline"` + NotAfter time.Time `json:"notAfter"` +} + +type RunResponse struct { + Error string `json:"error,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + Metrics []byte `json:"metrics"` + Logs []byte `json:"logs"` +} + +func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner { + r.logger = logger + + return r +} + +var ErrUnexpectedStatus = errors.New("unexpected status code") + +func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { + if r.backoff == 0 { + panic("zero backoff, runner is misconfigured, refusing to DoS") + } + + if deadline, hasDeadline := ctx.Deadline(); !hasDeadline { + defaultAllRetriesTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond * 2 + r.logger.Error(). + Dur("allRetriesTimeout", defaultAllRetriesTimeout). + Msg("k6 runner does not have a deadline for all retries. This is a bug. Defaulting to twice the timeout to avoid retrying forever") + + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, defaultAllRetriesTimeout) + defer cancel() + } else if tud := time.Until(deadline); tud < time.Duration(script.Settings.Timeout)*time.Millisecond*2 { + r.logger.Debug(). + Str("timeUntilNext", tud.String()). + Str("timeout", (time.Duration(script.Settings.Timeout) * time.Millisecond).String()). 
+ Msg("time until next execution is too close to script timeout, there might not be room for retries") + } + + wait := r.backoff + var response *RunResponse + for { + start := time.Now() + + var err error + response, err = r.request(ctx, script) + if err == nil { + r.logger.Debug().Bytes("metrics", response.Metrics).Bytes("logs", response.Logs).Msg("script result") + return response, nil + } + + if !errors.Is(err, errRetryable) { + // TODO: Log the returned error in the Processor instead. + r.logger.Error().Err(err).Msg("non-retryable error running k6") + return nil, err + } + + // Wait, but subtract the amount of time we've already waited as part of the request timeout. + // We do this because these requests have huge timeouts, and by the nature of the system running these requests, + // we expect the most common error to be a timeout, so we avoid waiting even more on top of an already large + // value. + waitRemaining := max(0, wait-time.Since(start)) + r.logger.Warn().Err(err).Dur("after", waitRemaining).Msg("retrying retryable error") + + waitTimer := time.NewTimer(waitRemaining) + select { + case <-ctx.Done(): + waitTimer.Stop() + // TODO: Log the returned error in the Processor instead. + r.logger.Error().Err(err).Msg("retries exhausted") + return nil, fmt.Errorf("cannot retry further: %w", errors.Join(err, ctx.Err())) + case <-waitTimer.C: + } + + // Backoff linearly, adding some jitter. + wait += r.backoff + time.Duration(rand.Intn(int(r.backoff))) + } +} + +// errRetryable indicates that an error is retryable. It is typically joined with another error. 
+var errRetryable = errors.New("retryable") + +func (r HttpRunner) request(ctx context.Context, script Script) (*RunResponse, error) { + checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond + if checkTimeout == 0 { + return nil, ErrNoTimeout + } + + // requestTimeout should be noticeably larger than [Script.Settings.Timeout], to account for added latencies in the + // system such as network, i/o, seralization, queue wait time, etc. that take place after and before the script is + // ran. + // t0 t1 t2 t3 + // |--- Queue wait ---|-------------- k6 run -----------------|--- Response ---| + // checkTimeout = t2 - t1 + // requestTimeout = t3 - t0 + requestTimeout := checkTimeout + r.graceTime + notAfter := time.Now().Add(requestTimeout) + + ctx, cancel := context.WithDeadline(ctx, notAfter) + defer cancel() + + // Decorate the script request with the NotAfter hint. + // NotAfter hints runners when we're about to drop this request. Runners will refuse to start to run a script if + // this time is in the past, as it is guaranteed that we, the client, have already given up on the request. + // This allows runners to not waste time running scripts which will not complete before the client gives up on the + // request. + runRequest := HTTPRunRequest{ + Script: script, + NotAfter: notAfter, + } + + reqBody, err := json.Marshal(runRequest) + if err != nil { + return nil, fmt.Errorf("encoding script: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(reqBody)) + if err != nil { + return nil, fmt.Errorf("building request: %w", err) + } + + req.Header.Add("content-type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + r.logger.Error().Err(err).Msg("sending request") + + // Any error making a request is retryable. 
+ return nil, errors.Join(errRetryable, fmt.Errorf("making request: %w", err)) + } + + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK, http.StatusRequestTimeout, http.StatusUnprocessableEntity, http.StatusInternalServerError: + // These are status code that we assume come with a machine-readable response. The response may contain an error, which is + // handled later. + // See: https://github.com/grafana/sm-k6-runner/blob/main/internal/mq/proxy.go#L215 + + case http.StatusBadRequest: + // These are status codes that do not come with a machine readable response, and are not retryable. + // + // There might be an argument to be made to retry 500s, as they can be produced by panic recovering mechanisms which + // _can_ be seen as a transient error. However, it is also possible for a 500 to be returned by a script that failed + // and also needed a lot of time to complete. For this reason, we choose to not retry 500 for the time being. + return nil, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode) + + default: + // Statuses not returned by the proxy directly are assumed to be infrastructure (e.g. ingress, k8s) related and + // thus marked as retriable. + // Runners may also return http.StatusServiceUnavailable if the browser session manager cannot be reached. We want + // to retry those errors, so we let the "default" block catch them. 
+ return nil, errors.Join(errRetryable, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode)) + } + + var response RunResponse + err = json.NewDecoder(resp.Body).Decode(&response) + if err != nil { + r.logger.Error().Err(err).Msg("decoding script result") + return nil, fmt.Errorf("decoding script result: %w", err) + } + + return &response, nil +} diff --git a/internal/k6runner/http_test.go b/internal/k6runner/http_test.go new file mode 100644 index 00000000..26db583c --- /dev/null +++ b/internal/k6runner/http_test.go @@ -0,0 +1,554 @@ +package k6runner + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/http/httptest" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/go-logfmt/logfmt" + "github.com/grafana/synthetic-monitoring-agent/internal/testhelper" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" +) + +func TestHttpRunnerRun(t *testing.T) { + t.Parallel() + + script := Script{ + Script: testhelper.MustReadFile(t, "testdata/test.js"), + Settings: Settings{ + Timeout: 1000, + }, + } + + mux := http.NewServeMux() + mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "application/json", r.Header.Get("Content-Type")) + + var req struct { + Script []byte `json:"script"` + Settings Settings `json:"settings"` + NotAfter time.Time `json:"notAfter"` + } + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + t.Logf("decoding body: %v", err) + t.Fail() + w.WriteHeader(400) // Use 400 as the client won't retry this failure. + return + } + + if time.Since(req.NotAfter) > time.Hour || time.Until(req.NotAfter) > time.Hour { + t.Log("unexpected value for NotAfter too far from the present") + t.Fail() + w.WriteHeader(400) // Use 400 as the client won't retry this failure. 
+ return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + rr := &RunResponse{ + Metrics: testhelper.MustReadFile(t, "testdata/test.out"), + Logs: testhelper.MustReadFile(t, "testdata/test.log"), + } + _ = json.NewEncoder(w).Encode(rr) + }) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"msg": "bad request"}`)) + }) + + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + runner := New(RunnerOpts{Uri: srv.URL + "/run"}) + require.IsType(t, &HttpRunner{}, runner) + + ctx := context.Background() + ctx, cancel := testhelper.Context(ctx, t) + t.Cleanup(cancel) + + _, err := runner.Run(ctx, script) + require.NoError(t, err) +} + +func TestHttpRunnerRunError(t *testing.T) { + t.Parallel() + + script := Script{ + Script: testhelper.MustReadFile(t, "testdata/test.js"), + Settings: Settings{ + Timeout: 1000, + }, + } + + mux := http.NewServeMux() + mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(http.StatusBadRequest) + resp := requestError{ + Err: http.StatusText(http.StatusBadRequest), + Message: "test error", + } + _ = json.NewEncoder(w).Encode(resp) + }) + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + t.Log("http runner called the wrong endpoint") + t.Fail() + + w.WriteHeader(http.StatusBadRequest) + resp := requestError{ + Err: http.StatusText(http.StatusBadRequest), + Message: "unexpected request to " + r.URL.Path, + } + _ = json.NewEncoder(w).Encode(resp) + }) + + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + runner := New(RunnerOpts{Uri: srv.URL + "/run"}) + require.IsType(t, &HttpRunner{}, runner) + + // HTTPRunner will retry until the context deadline is met, so we set a short one. 
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(cancel) + + ctx, cancel = testhelper.Context(ctx, t) + t.Cleanup(cancel) + + _, err := runner.Run(ctx, script) + require.ErrorIs(t, err, ErrUnexpectedStatus) +} + +// TestScriptHTTPRun tests that Script reports what it should depending on the status code and responses of the HTTP +// runner. +func TestScriptHTTPRun(t *testing.T) { + t.Parallel() + + var ( + testMetrics = testhelper.MustReadFile(t, "testdata/test.out") + // testLogs is a file containing a bunch of log lines. All lines but one have level=debug, which are discarded + // by the loki submitter implementation. + testLogs = testhelper.MustReadFile(t, "testdata/test.log") + // nonkdebugLogLine is the only line on testLogs that does not have level=debug, therefore the only one that is + // actually submitted. We use it in the test table below as a sentinel to assert whether logs have been + // submitted or not. + nonDebugLogLine = `time="2023-06-01T13:40:26-06:00" level="test" msg="Non-debug message, for testing!"` + "\n" + ) + + for _, tc := range []struct { + name string + response *RunResponse + delay time.Duration + statusCode int + expectSuccess bool + expectError error + expectErrorAs any // To accommodate return of unnamed errors. If set, expectError is ignored. + expectLogs string + }{ + { + name: "all good", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + }, + statusCode: http.StatusOK, + expectSuccess: true, + expectError: nil, + expectLogs: nonDebugLogLine, + }, + { + // HTTP runner should report failure and an error when the upstream status is not recognized. + name: "unexpected status", + response: &RunResponse{}, + statusCode: 999, + expectSuccess: false, + expectError: ErrUnexpectedStatus, + }, + { + // HTTP runner should report failure and an error when the error is unknown. 
+ name: "non-user error", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + Error: "something went wrong", + ErrorCode: "something-wrong", + }, + statusCode: http.StatusOK, + expectSuccess: false, + expectError: ErrFromRunner, + expectLogs: nonDebugLogLine + fmt.Sprintf( + "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", + "something went wrong", + "something-wrong", + ), + }, + { + // HTTP runner should report failure but no error when the error is unknown. + name: "user error", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + Error: "syntax error somewhere or something", + ErrorCode: "user", + }, + statusCode: http.StatusOK, + expectSuccess: false, + expectError: nil, + expectLogs: nonDebugLogLine + fmt.Sprintf( + "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", + "syntax error somewhere or something", + "user", + ), + }, + { + name: "borked logs are sent best-effort", + response: &RunResponse{ + Metrics: testMetrics, + Logs: []byte(`level=error foo="b` + "\n"), + Error: "we killed k6", + ErrorCode: "user", + }, + statusCode: http.StatusUnprocessableEntity, + expectSuccess: false, + expectErrorAs: &logfmt.SyntaxError{}, + expectLogs: `level="error"` + "\n" + fmt.Sprintf( + "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", + "we killed k6", + "user", + ), + }, + { + name: "logs are sent on borked metrics", + response: &RunResponse{ + Metrics: []byte("probe_succ{"), + Logs: testLogs, + Error: "we killed k6", + ErrorCode: "user", + }, + statusCode: http.StatusUnprocessableEntity, + expectSuccess: false, + expectErrorAs: expfmt.ParseError{}, + expectLogs: nonDebugLogLine + fmt.Sprintf( + "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", + "we killed k6", + "user", + ), + }, + { + name: "inconsistent runner response A", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + Error: "set", + ErrorCode: "", + }, + statusCode: 
http.StatusInternalServerError, + expectSuccess: false, + expectError: ErrBuggyRunner, + expectLogs: "", + }, + { + name: "inconsistent runner response B", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + Error: "", + ErrorCode: "set", + }, + statusCode: http.StatusInternalServerError, + expectSuccess: false, + expectError: ErrBuggyRunner, + expectLogs: "", + }, + { + name: "request timeout", + response: &RunResponse{ + Metrics: testMetrics, + Logs: testLogs, + }, + delay: 3 * time.Second, // Beyond timeout + graceTime. + statusCode: http.StatusInternalServerError, + expectSuccess: false, + expectError: context.DeadlineExceeded, + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + mux := http.NewServeMux() + mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + time.Sleep(tc.delay) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(tc.statusCode) + _ = json.NewEncoder(w).Encode(tc.response) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + runner := HttpRunner{url: srv.URL + "/run", graceTime: time.Second, backoff: time.Second} + script, err := NewProcessor(Script{ + Script: []byte("tee-hee"), + Settings: Settings{ + Timeout: 1000, + }, + }, runner) + require.NoError(t, err) + + baseCtx, baseCancel := context.WithTimeout(context.Background(), 4*time.Second) + t.Cleanup(baseCancel) + ctx, cancel := testhelper.Context(baseCtx, t) + t.Cleanup(cancel) + + var ( + registry = prometheus.NewRegistry() + logBuf = bytes.Buffer{} + logger = recordingLogger{buf: &logBuf} + zlogger = zerolog.Nop() + ) + + success, err := script.Run(ctx, registry, logger, zlogger) + require.Equal(t, tc.expectSuccess, success) + require.Equal(t, tc.expectLogs, logger.buf.String()) + if tc.expectErrorAs == nil { + require.ErrorIs(t, err, tc.expectError) + } else { + require.ErrorAs(t, err, &tc.expectErrorAs) + } + }) + } +} + +func TestHTTPProcessorRetries(t *testing.T) { + t.Parallel() + + 
t.Run("status codes", func(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + handler http.Handler + scriptTimeout time.Duration + graceTime time.Duration + globalTimeout time.Duration + expectRequests int64 + expectError error + }{ + { + name: "no retries needed", + handler: emptyJSON(http.StatusOK), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 1, + expectError: nil, + }, + { + name: "does not retry 400", + handler: emptyJSON(http.StatusBadRequest), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 1, + expectError: ErrUnexpectedStatus, + }, + { + name: "does not retry 422", + handler: emptyJSON(http.StatusUnprocessableEntity), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 1, + expectError: nil, + }, + { + name: "does not retry 428", + handler: emptyJSON(http.StatusRequestTimeout), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 1, + expectError: nil, + }, + { + name: "does not retry 500", + handler: emptyJSON(http.StatusInternalServerError), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 1, + expectError: nil, + }, + { + name: "retries 503", + handler: afterAttempts(emptyJSON(http.StatusServiceUnavailable), 1, emptyJSON(http.StatusOK)), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 2, + expectError: nil, + }, + { + name: "retries 504", + handler: afterAttempts(emptyJSON(http.StatusGatewayTimeout), 1, emptyJSON(http.StatusOK)), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 2, + expectError: nil, + }, + { + name: "retries more than once", + handler: afterAttempts(emptyJSON(http.StatusGatewayTimeout), 2, emptyJSON(http.StatusOK)), + 
scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + expectRequests: 3, + expectError: nil, + }, + { + name: "gives up eventually", + handler: emptyJSON(http.StatusServiceUnavailable), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + // Context is forced to timeout after 5 seconds. This means 3 requests, with delays of 0, [1-2), [2-3), as + // the fourth request would need to wait [3-4) seconds after [3-5) have passed, thus guaranteed to + // go beyond the 5s deadline. + expectRequests: 3, + expectError: context.DeadlineExceeded, + }, + { + name: "gives up eventually when server hangs", + handler: delay(10*time.Second, emptyJSON(http.StatusServiceUnavailable)), + scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, + // Requests have 2s timeout and 0s backoff after that (backoff includes timeout in + // this implementation), so that means requests are made at the 0s, 2s and 4s marks. A fourth request + // would happen beyond the 5s timeline. + expectRequests: 3, + expectError: context.DeadlineExceeded, + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Use atomic as we write to this in a handler and read from in on the test. + // go test -race trips if we use a regular int here. 
+ var requests atomic.Int64 + + mux := http.NewServeMux() + mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { + requests.Add(1) + tc.handler.ServeHTTP(w, r) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + runner := HttpRunner{url: srv.URL + "/run", graceTime: tc.graceTime, backoff: time.Second} + processor, err := NewProcessor(Script{Script: nil, Settings: Settings{tc.scriptTimeout.Milliseconds()}}, runner) + require.NoError(t, err) + + baseCtx, baseCancel := context.WithTimeout(context.Background(), tc.globalTimeout) + t.Cleanup(baseCancel) + ctx, cancel := testhelper.Context(baseCtx, t) + t.Cleanup(cancel) + + var ( + registry = prometheus.NewRegistry() + logger testLogger + zlogger = zerolog.New(io.Discard) + ) + success, err := processor.Run(ctx, registry, &logger, zlogger) + require.ErrorIs(t, err, tc.expectError) + require.Equal(t, tc.expectError == nil, success) + require.Equal(t, tc.expectRequests, requests.Load()) + }) + } + }) + + t.Run("retries network errors", func(t *testing.T) { + t.Parallel() + + mux := http.NewServeMux() + mux.Handle("/run", emptyJSON(http.StatusOK)) + + // TODO: Hand-picking a random port instead of letting the OS allocate one is terrible practice. However, + // I haven't found a way to do this if we really want to know the address before something is listening on it. + addr := net.JoinHostPort("localhost", strconv.Itoa(30000+rand.Intn(35535))) + go func() { + time.Sleep(time.Second) + + listener, err := net.Listen("tcp4", addr) + if err != nil { + t.Logf("failed to set up listener in a random port. You were really unlucky, run the test again. 
%v", err) + t.Fail() + } + + err = http.Serve(listener, mux) + require.NoError(t, err) + t.Cleanup(func() { + listener.Close() + }) + }() + + runner := HttpRunner{url: "http://" + addr + "/run", graceTime: time.Second, backoff: time.Second} + processor, err := NewProcessor(Script{Script: nil, Settings: Settings{Timeout: 1000}}, runner) + require.NoError(t, err) + + baseCtx, baseCancel := context.WithTimeout(context.Background(), 5*time.Second) + t.Cleanup(baseCancel) + ctx, cancel := testhelper.Context(baseCtx, t) + t.Cleanup(cancel) + + var ( + registry = prometheus.NewRegistry() + logger testLogger + zlogger = zerolog.New(io.Discard) + ) + success, err := processor.Run(ctx, registry, &logger, zlogger) + require.NoError(t, err) + require.True(t, success) + }) +} + +func emptyJSON(status int) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _, _ = w.Write([]byte("{}")) + }) +} + +// afterAttempts invokes the first handler if strictly less than [afterAttempts] requests have been made before to it. +// afterAttempts(a, 0, b) always invokes b. +// afterAttempts(a, 1, b) always invokes a for the first request, then b for the subsequent ones. +func afterAttempts(a http.Handler, afterAttempts int, b http.Handler) http.Handler { + pastAttempts := 0 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if pastAttempts >= afterAttempts { + b.ServeHTTP(w, r) + return + } + + a.ServeHTTP(w, r) + pastAttempts++ + }) +} + +// delay calls next after some time has passed. It watches for incoming request context cancellation to avoid leaking +// connections. +func delay(d time.Duration, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Fully consume the request body before waiting. 
If we do not do this, cancelling the request context will not + // close the network connection, and httptest.Server will complain. + _, _ = io.Copy(io.Discard, r.Body) + + select { + case <-r.Context().Done(): + // Abort waiting if the client closes the request. Again, if we don't, httptest.Server will complain. + case <-time.After(d): + next.ServeHTTP(w, r) + } + }) +} diff --git a/internal/k6runner/k6runner.go b/internal/k6runner/k6runner.go index aa0b2d0e..0ec4c222 100644 --- a/internal/k6runner/k6runner.go +++ b/internal/k6runner/k6runner.go @@ -3,15 +3,10 @@ package k6runner import ( "bytes" "context" - "encoding/json" "errors" "fmt" "io" - "math/rand" - "net/http" - "os/exec" "strings" - "time" "github.com/go-logfmt/logfmt" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" @@ -63,7 +58,7 @@ func New(opts RunnerOpts) Runner { backoff: defaultBackoff, } } else { - r = LocalRunner{ + r = Local{ k6path: opts.Uri, logger: &logger, blacklistedIP: opts.BlacklistedIP, @@ -324,334 +319,3 @@ NEXT_RECORD: return dec.Err() } - -type HttpRunner struct { - url string - logger *zerolog.Logger - // backoff sets the minimum amount of time to wait before retrying a request. nth attempt waits n times this value, - // plus some jitter. - backoff time.Duration - // graceTime tells the HttpRunner how much time to add to the script timeout to form the request timeout. 
- graceTime time.Duration -} - -const ( - defaultBackoff = 10 * time.Second - defaultGraceTime = 20 * time.Second -) - -type requestError struct { - Err string `json:"error"` - Message string `json:"msg"` -} - -var _ error = requestError{} - -func (r requestError) Error() string { - return fmt.Sprintf("%s: %s", r.Err, r.Message) -} - -// HTTPRunRequest -type HTTPRunRequest struct { - Script `json:",inline"` - NotAfter time.Time `json:"notAfter"` -} - -type RunResponse struct { - Error string `json:"error,omitempty"` - ErrorCode string `json:"errorCode,omitempty"` - Metrics []byte `json:"metrics"` - Logs []byte `json:"logs"` -} - -func (r HttpRunner) WithLogger(logger *zerolog.Logger) Runner { - r.logger = logger - - return r -} - -var ErrUnexpectedStatus = errors.New("unexpected status code") - -func (r HttpRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { - if r.backoff == 0 { - panic("zero backoff, runner is misconfigured, refusing to DoS") - } - - if deadline, hasDeadline := ctx.Deadline(); !hasDeadline { - defaultAllRetriesTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond * 2 - r.logger.Error(). - Dur("allRetriesTimeout", defaultAllRetriesTimeout). - Msg("k6 runner does not have a deadline for all retries. This is a bug. Defaulting to twice the timeout to avoid retrying forever") - - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, defaultAllRetriesTimeout) - defer cancel() - } else if tud := time.Until(deadline); tud < time.Duration(script.Settings.Timeout)*time.Millisecond*2 { - r.logger.Debug(). - Str("timeUntilNext", tud.String()). - Str("timeout", (time.Duration(script.Settings.Timeout) * time.Millisecond).String()). 
- Msg("time until next execution is too close to script timeout, there might not be room for retries") - } - - wait := r.backoff - var response *RunResponse - for { - start := time.Now() - - var err error - response, err = r.request(ctx, script) - if err == nil { - r.logger.Debug().Bytes("metrics", response.Metrics).Bytes("logs", response.Logs).Msg("script result") - return response, nil - } - - if !errors.Is(err, errRetryable) { - // TODO: Log the returned error in the Processor instead. - r.logger.Error().Err(err).Msg("non-retryable error running k6") - return nil, err - } - - // Wait, but subtract the amount of time we've already waited as part of the request timeout. - // We do this because these requests have huge timeouts, and by the nature of the system running these requests, - // we expect the most common error to be a timeout, so we avoid waiting even more on top of an already large - // value. - waitRemaining := max(0, wait-time.Since(start)) - r.logger.Warn().Err(err).Dur("after", waitRemaining).Msg("retrying retryable error") - - waitTimer := time.NewTimer(waitRemaining) - select { - case <-ctx.Done(): - waitTimer.Stop() - // TODO: Log the returned error in the Processor instead. - r.logger.Error().Err(err).Msg("retries exhausted") - return nil, fmt.Errorf("cannot retry further: %w", errors.Join(err, ctx.Err())) - case <-waitTimer.C: - } - - // Backoff linearly, adding some jitter. - wait += r.backoff + time.Duration(rand.Intn(int(r.backoff))) - } -} - -// errRetryable indicates that an error is retryable. It is typically joined with another error. 
-var errRetryable = errors.New("retryable") - -func (r HttpRunner) request(ctx context.Context, script Script) (*RunResponse, error) { - checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond - if checkTimeout == 0 { - return nil, ErrNoTimeout - } - - // requestTimeout should be noticeably larger than [Script.Settings.Timeout], to account for added latencies in the - // system such as network, i/o, seralization, queue wait time, etc. that take place after and before the script is - // ran. - // t0 t1 t2 t3 - // |--- Queue wait ---|-------------- k6 run -----------------|--- Response ---| - // checkTimeout = t2 - t1 - // requestTimeout = t3 - t0 - requestTimeout := checkTimeout + r.graceTime - notAfter := time.Now().Add(requestTimeout) - - ctx, cancel := context.WithDeadline(ctx, notAfter) - defer cancel() - - // Decorate the script request with the NotAfter hint. - // NotAfter hints runners when we're about to drop this request. Runners will refuse to start to run a script if - // this time is in the past, as it is guaranteed that we, the client, have already given up on the request. - // This allows runners to not waste time running scripts which will not complete before the client gives up on the - // request. - runRequest := HTTPRunRequest{ - Script: script, - NotAfter: notAfter, - } - - reqBody, err := json.Marshal(runRequest) - if err != nil { - return nil, fmt.Errorf("encoding script: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.url, bytes.NewReader(reqBody)) - if err != nil { - return nil, fmt.Errorf("building request: %w", err) - } - - req.Header.Add("content-type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - r.logger.Error().Err(err).Msg("sending request") - - // Any error making a request is retryable. 
- return nil, errors.Join(errRetryable, fmt.Errorf("making request: %w", err)) - } - - defer resp.Body.Close() - - switch resp.StatusCode { - case http.StatusOK, http.StatusRequestTimeout, http.StatusUnprocessableEntity, http.StatusInternalServerError: - // These are status code that we assume come with a machine-readable response. The response may contain an error, which is - // handled later. - // See: https://github.com/grafana/sm-k6-runner/blob/main/internal/mq/proxy.go#L215 - - case http.StatusBadRequest: - // These are status codes that do not come with a machine readable response, and are not retryable. - // - // There might be an argument to be made to retry 500s, as they can be produced by panic recovering mechanisms which - // _can_ be seen as a transient error. However, it is also possible for a 500 to be returned by a script that failed - // and also needed a lot of time to complete. For this reason, we choose to not retry 500 for the time being. - return nil, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode) - - default: - // Statuses not returned by the proxy directly are assumed to be infrastructure (e.g. ingress, k8s) related and - // thus marked as retriable. - // Runners may also return http.StatusServiceUnavailable if the browser session manager cannot be reached. We want - // to retry those errors, so we let the "default" block catch them. 
- return nil, errors.Join(errRetryable, fmt.Errorf("%w %d", ErrUnexpectedStatus, resp.StatusCode)) - } - - var response RunResponse - err = json.NewDecoder(resp.Body).Decode(&response) - if err != nil { - r.logger.Error().Err(err).Msg("decoding script result") - return nil, fmt.Errorf("decoding script result: %w", err) - } - - return &response, nil -} - -type LocalRunner struct { - k6path string - logger *zerolog.Logger - fs afero.Fs - blacklistedIP string -} - -func (r LocalRunner) WithLogger(logger *zerolog.Logger) Runner { - r.logger = logger - return r -} - -func (r LocalRunner) Run(ctx context.Context, script Script) (*RunResponse, error) { - afs := afero.Afero{Fs: r.fs} - - checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond - if checkTimeout == 0 { - return nil, ErrNoTimeout - } - - workdir, err := afs.TempDir("", "k6-runner") - if err != nil { - return nil, fmt.Errorf("cannot create temporary directory: %w", err) - } - - defer func() { - if err := r.fs.RemoveAll(workdir); err != nil { - r.logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory") - } - }() - - metricsFn, err := mktemp(r.fs, workdir, "*.json") - if err != nil { - return nil, fmt.Errorf("cannot obtain temporary metrics filename: %w", err) - } - - logsFn, err := mktemp(r.fs, workdir, "*.log") - if err != nil { - return nil, fmt.Errorf("cannot obtain temporary logs filename: %w", err) - } - - scriptFn, err := mktemp(r.fs, workdir, "*.js") - if err != nil { - return nil, fmt.Errorf("cannot obtain temporary script filename: %w", err) - } - - if err := afs.WriteFile(scriptFn, script.Script, 0o644); err != nil { - return nil, fmt.Errorf("cannot write temporary script file: %w", err) - } - - k6Path, err := exec.LookPath(r.k6path) - if err != nil { - return nil, fmt.Errorf("cannot find k6 executable: %w", err) - } - - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, checkTimeout) - defer cancel() - - // #nosec G204 -- the 
variables are not user-controlled - cmd := exec.CommandContext( - ctx, - k6Path, - "run", - "--out", "sm="+metricsFn, - "--log-format", "logfmt", - "--log-output", "file="+logsFn, - "--vus", "1", - "--iterations", "1", - "--max-redirects", "10", - "--batch", "10", - "--batch-per-host", "4", - "--no-connection-reuse", - "--blacklist-ip", r.blacklistedIP, - "--block-hostnames", "*.cluster.local", // TODO(mem): make this configurable - "--summary-time-unit", "s", - // "--discard-response-bodies", // TODO(mem): make this configurable - "--dns", "ttl=30s,select=random,policy=preferIPv4", // TODO(mem): this needs fixing, preferIPv4 is probably not what we want - "--no-thresholds", - "--no-usage-report", - "--no-color", - "--no-summary", - "--verbose", - scriptFn, - ) - - var stdout, stderr bytes.Buffer - - cmd.Dir = workdir - cmd.Stdin = nil - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - start := time.Now() - - r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script") - - if err := cmd.Run(); err != nil { - r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error") - return nil, fmt.Errorf("cannot execute k6 script: %w", err) - } - - duration := time.Since(start) - - r.logger.Debug().Str("stdout", stdout.String()).Str("stderr", stderr.String()).Dur("duration", duration).Msg("ran k6 script") - r.logger.Info().Dur("duration", duration).Msg("ran k6 script") - - var result RunResponse - - result.Metrics, err = afs.ReadFile(metricsFn) - if err != nil { - r.logger.Error().Err(err).Str("filename", metricsFn).Msg("cannot read metrics file") - return nil, fmt.Errorf("cannot read metrics: %w", err) - } - - result.Logs, err = afs.ReadFile(logsFn) - if err != nil { - r.logger.Error().Err(err).Str("filename", logsFn).Msg("cannot read metrics file") - return nil, fmt.Errorf("cannot read logs: %w", err) - } - - r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", 
result.Logs).Msg("k6 result") - - return &result, nil -} - -func mktemp(fs afero.Fs, dir, pattern string) (string, error) { - f, err := afero.TempFile(fs, dir, pattern) - if err != nil { - return "", fmt.Errorf("cannot create temporary file: %w", err) - } - if err := f.Close(); err != nil { - return "", fmt.Errorf("cannot close temporary file: %w", err) - } - return f.Name(), nil -} diff --git a/internal/k6runner/k6runner_test.go b/internal/k6runner/k6runner_test.go index d44ba926..e5b1c613 100644 --- a/internal/k6runner/k6runner_test.go +++ b/internal/k6runner/k6runner_test.go @@ -3,22 +3,13 @@ package k6runner import ( "bytes" "context" - "encoding/json" "errors" "fmt" "io" - "math/rand" - "net" - "net/http" - "net/http/httptest" "sort" - "strconv" "strings" - "sync/atomic" "testing" - "time" - "github.com/go-logfmt/logfmt" "github.com/grafana/synthetic-monitoring-agent/internal/prober/logger" "github.com/grafana/synthetic-monitoring-agent/internal/testhelper" "github.com/prometheus/client_golang/prometheus" @@ -32,16 +23,16 @@ func TestNew(t *testing.T) { t.Parallel() r1 := New(RunnerOpts{Uri: "k6"}) - require.IsType(t, LocalRunner{}, r1) - require.Equal(t, "", r1.(LocalRunner).blacklistedIP) + require.IsType(t, Local{}, r1) + require.Equal(t, "", r1.(Local).blacklistedIP) r2 := New(RunnerOpts{Uri: "/usr/bin/k6", BlacklistedIP: "192.168.4.0/24"}) - require.IsType(t, LocalRunner{}, r2) - require.Equal(t, "192.168.4.0/24", r2.(LocalRunner).blacklistedIP) + require.IsType(t, Local{}, r2) + require.Equal(t, "192.168.4.0/24", r2.(Local).blacklistedIP) // Ensure WithLogger preserves config. 
zl := zerolog.New(io.Discard) r2 = r2.WithLogger(&zl) - require.Equal(t, "192.168.4.0/24", r2.(LocalRunner).blacklistedIP) + require.Equal(t, "192.168.4.0/24", r2.(Local).blacklistedIP) r3 := New(RunnerOpts{Uri: "http://localhost:6565"}) require.IsType(t, &HttpRunner{}, r3) @@ -102,536 +93,6 @@ func TestScriptRun(t *testing.T) { require.True(t, success) } -func TestHttpRunnerRun(t *testing.T) { - t.Parallel() - - script := Script{ - Script: testhelper.MustReadFile(t, "testdata/test.js"), - Settings: Settings{ - Timeout: 1000, - }, - } - - mux := http.NewServeMux() - mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodPost, r.Method) - require.Equal(t, "application/json", r.Header.Get("Content-Type")) - - var req struct { - Script []byte `json:"script"` - Settings Settings `json:"settings"` - NotAfter time.Time `json:"notAfter"` - } - - err := json.NewDecoder(r.Body).Decode(&req) - if err != nil { - t.Logf("decoding body: %v", err) - t.Fail() - w.WriteHeader(400) // Use 400 as the client won't retry this failure. - return - } - - if time.Since(req.NotAfter) > time.Hour || time.Until(req.NotAfter) > time.Hour { - t.Log("unexpected value for NotAfter too far from the present") - t.Fail() - w.WriteHeader(400) // Use 400 as the client won't retry this failure. 
- return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - rr := &RunResponse{ - Metrics: testhelper.MustReadFile(t, "testdata/test.out"), - Logs: testhelper.MustReadFile(t, "testdata/test.log"), - } - _ = json.NewEncoder(w).Encode(rr) - }) - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - _, _ = w.Write([]byte(`{"msg": "bad request"}`)) - }) - - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - - runner := New(RunnerOpts{Uri: srv.URL + "/run"}) - require.IsType(t, &HttpRunner{}, runner) - - ctx := context.Background() - ctx, cancel := testhelper.Context(ctx, t) - t.Cleanup(cancel) - - _, err := runner.Run(ctx, script) - require.NoError(t, err) -} - -func TestHttpRunnerRunError(t *testing.T) { - t.Parallel() - - script := Script{ - Script: testhelper.MustReadFile(t, "testdata/test.js"), - Settings: Settings{ - Timeout: 1000, - }, - } - - mux := http.NewServeMux() - mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.WriteHeader(http.StatusBadRequest) - resp := requestError{ - Err: http.StatusText(http.StatusBadRequest), - Message: "test error", - } - _ = json.NewEncoder(w).Encode(resp) - }) - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - t.Log("http runner called the wrong endpoint") - t.Fail() - - w.WriteHeader(http.StatusBadRequest) - resp := requestError{ - Err: http.StatusText(http.StatusBadRequest), - Message: "unexpected request to " + r.URL.Path, - } - _ = json.NewEncoder(w).Encode(resp) - }) - - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - - runner := New(RunnerOpts{Uri: srv.URL + "/run"}) - require.IsType(t, &HttpRunner{}, runner) - - // HTTPRunner will retry until the context deadline is met, so we set a short one. 
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - t.Cleanup(cancel) - - ctx, cancel = testhelper.Context(ctx, t) - t.Cleanup(cancel) - - _, err := runner.Run(ctx, script) - require.ErrorIs(t, err, ErrUnexpectedStatus) -} - -// TestScriptHTTPRun tests that Script reports what it should depending on the status code and responses of the HTTP -// runner. -func TestScriptHTTPRun(t *testing.T) { - t.Parallel() - - var ( - testMetrics = testhelper.MustReadFile(t, "testdata/test.out") - // testLogs is a file containing a bunch of log lines. All lines but one have level=debug, which are discarded - // by the loki submitter implementation. - testLogs = testhelper.MustReadFile(t, "testdata/test.log") - // nonkdebugLogLine is the only line on testLogs that does not have level=debug, therefore the only one that is - // actually submitted. We use it in the test table below as a sentinel to assert whether logs have been - // submitted or not. - nonDebugLogLine = `time="2023-06-01T13:40:26-06:00" level="test" msg="Non-debug message, for testing!"` + "\n" - ) - - for _, tc := range []struct { - name string - response *RunResponse - delay time.Duration - statusCode int - expectSuccess bool - expectError error - expectErrorAs any // To accommodate return of unnamed errors. If set, expectError is ignored. - expectLogs string - }{ - { - name: "all good", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - }, - statusCode: http.StatusOK, - expectSuccess: true, - expectError: nil, - expectLogs: nonDebugLogLine, - }, - { - // HTTP runner should report failure and an error when the upstream status is not recognized. - name: "unexpected status", - response: &RunResponse{}, - statusCode: 999, - expectSuccess: false, - expectError: ErrUnexpectedStatus, - }, - { - // HTTP runner should report failure and an error when the error is unknown. 
- name: "non-user error", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - Error: "something went wrong", - ErrorCode: "something-wrong", - }, - statusCode: http.StatusOK, - expectSuccess: false, - expectError: ErrFromRunner, - expectLogs: nonDebugLogLine + fmt.Sprintf( - "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", - "something went wrong", - "something-wrong", - ), - }, - { - // HTTP runner should report failure but no error when the error is unknown. - name: "user error", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - Error: "syntax error somewhere or something", - ErrorCode: "user", - }, - statusCode: http.StatusOK, - expectSuccess: false, - expectError: nil, - expectLogs: nonDebugLogLine + fmt.Sprintf( - "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", - "syntax error somewhere or something", - "user", - ), - }, - { - name: "borked logs are sent best-effort", - response: &RunResponse{ - Metrics: testMetrics, - Logs: []byte(`level=error foo="b` + "\n"), - Error: "we killed k6", - ErrorCode: "user", - }, - statusCode: http.StatusUnprocessableEntity, - expectSuccess: false, - expectErrorAs: &logfmt.SyntaxError{}, - expectLogs: `level="error"` + "\n" + fmt.Sprintf( - "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", - "we killed k6", - "user", - ), - }, - { - name: "logs are sent on borked metrics", - response: &RunResponse{ - Metrics: []byte("probe_succ{"), - Logs: testLogs, - Error: "we killed k6", - ErrorCode: "user", - }, - statusCode: http.StatusUnprocessableEntity, - expectSuccess: false, - expectErrorAs: expfmt.ParseError{}, - expectLogs: nonDebugLogLine + fmt.Sprintf( - "msg=\"script did not execute successfully\" error=%q errorCode=%q\n", - "we killed k6", - "user", - ), - }, - { - name: "inconsistent runner response A", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - Error: "set", - ErrorCode: "", - }, - statusCode: 
http.StatusInternalServerError, - expectSuccess: false, - expectError: ErrBuggyRunner, - expectLogs: "", - }, - { - name: "inconsistent runner response B", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - Error: "", - ErrorCode: "set", - }, - statusCode: http.StatusInternalServerError, - expectSuccess: false, - expectError: ErrBuggyRunner, - expectLogs: "", - }, - { - name: "request timeout", - response: &RunResponse{ - Metrics: testMetrics, - Logs: testLogs, - }, - delay: 3 * time.Second, // Beyond timeout + graceTime. - statusCode: http.StatusInternalServerError, - expectSuccess: false, - expectError: context.DeadlineExceeded, - }, - } { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - mux := http.NewServeMux() - mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { - time.Sleep(tc.delay) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(tc.statusCode) - _ = json.NewEncoder(w).Encode(tc.response) - }) - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - - runner := HttpRunner{url: srv.URL + "/run", graceTime: time.Second, backoff: time.Second} - script, err := NewProcessor(Script{ - Script: []byte("tee-hee"), - Settings: Settings{ - Timeout: 1000, - }, - }, runner) - require.NoError(t, err) - - baseCtx, baseCancel := context.WithTimeout(context.Background(), 4*time.Second) - t.Cleanup(baseCancel) - ctx, cancel := testhelper.Context(baseCtx, t) - t.Cleanup(cancel) - - var ( - registry = prometheus.NewRegistry() - logBuf = bytes.Buffer{} - logger = recordingLogger{buf: &logBuf} - zlogger = zerolog.Nop() - ) - - success, err := script.Run(ctx, registry, logger, zlogger) - require.Equal(t, tc.expectSuccess, success) - require.Equal(t, tc.expectLogs, logger.buf.String()) - if tc.expectErrorAs == nil { - require.ErrorIs(t, err, tc.expectError) - } else { - require.ErrorAs(t, err, &tc.expectErrorAs) - } - }) - } -} - -func TestHTTPProcessorRetries(t *testing.T) { - t.Parallel() - - 
t.Run("status codes", func(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - handler http.Handler - scriptTimeout time.Duration - graceTime time.Duration - globalTimeout time.Duration - expectRequests int64 - expectError error - }{ - { - name: "no retries needed", - handler: emptyJSON(http.StatusOK), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 1, - expectError: nil, - }, - { - name: "does not retry 400", - handler: emptyJSON(http.StatusBadRequest), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 1, - expectError: ErrUnexpectedStatus, - }, - { - name: "does not retry 422", - handler: emptyJSON(http.StatusUnprocessableEntity), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 1, - expectError: nil, - }, - { - name: "does not retry 428", - handler: emptyJSON(http.StatusRequestTimeout), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 1, - expectError: nil, - }, - { - name: "does not retry 500", - handler: emptyJSON(http.StatusInternalServerError), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 1, - expectError: nil, - }, - { - name: "retries 503", - handler: afterAttempts(emptyJSON(http.StatusServiceUnavailable), 1, emptyJSON(http.StatusOK)), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 2, - expectError: nil, - }, - { - name: "retries 504", - handler: afterAttempts(emptyJSON(http.StatusGatewayTimeout), 1, emptyJSON(http.StatusOK)), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 2, - expectError: nil, - }, - { - name: "retries more than once", - handler: afterAttempts(emptyJSON(http.StatusGatewayTimeout), 2, emptyJSON(http.StatusOK)), - 
scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - expectRequests: 3, - expectError: nil, - }, - { - name: "gives up eventually", - handler: emptyJSON(http.StatusServiceUnavailable), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - // Context is forced to timeout after 5 seconds. This means 3 requests, with delays of 0, [1-2), [2-3), as - // the fourth request would need to wait [3-4) seconds after [3-5) have passed, thus guaranteed to - // go beyond the 5s deadline. - expectRequests: 3, - expectError: context.DeadlineExceeded, - }, - { - name: "gives up eventually when server hangs", - handler: delay(10*time.Second, emptyJSON(http.StatusServiceUnavailable)), - scriptTimeout: time.Second, graceTime: time.Second, globalTimeout: 5 * time.Second, - // Requests have 2s timeout and 0s backoff after that (backoff includes timeout in - // this implementation), so that means requests are made at the 0s, 2s and 4s marks. A fourth request - // would happen beyond the 5s timeline. - expectRequests: 3, - expectError: context.DeadlineExceeded, - }, - } { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - // Use atomic as we write to this in a handler and read from in on the test. - // go test -race trips if we use a regular int here. 
- var requests atomic.Int64 - - mux := http.NewServeMux() - mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) { - requests.Add(1) - tc.handler.ServeHTTP(w, r) - }) - srv := httptest.NewServer(mux) - t.Cleanup(srv.Close) - - runner := HttpRunner{url: srv.URL + "/run", graceTime: tc.graceTime, backoff: time.Second} - processor, err := NewProcessor(Script{Script: nil, Settings: Settings{tc.scriptTimeout.Milliseconds()}}, runner) - require.NoError(t, err) - - baseCtx, baseCancel := context.WithTimeout(context.Background(), tc.globalTimeout) - t.Cleanup(baseCancel) - ctx, cancel := testhelper.Context(baseCtx, t) - t.Cleanup(cancel) - - var ( - registry = prometheus.NewRegistry() - logger testLogger - zlogger = zerolog.New(io.Discard) - ) - success, err := processor.Run(ctx, registry, &logger, zlogger) - require.ErrorIs(t, err, tc.expectError) - require.Equal(t, tc.expectError == nil, success) - require.Equal(t, tc.expectRequests, requests.Load()) - }) - } - }) - - t.Run("retries network errors", func(t *testing.T) { - t.Parallel() - - mux := http.NewServeMux() - mux.Handle("/run", emptyJSON(http.StatusOK)) - - // TODO: Hand-picking a random port instead of letting the OS allocate one is terrible practice. However, - // I haven't found a way to do this if we really want to know the address before something is listening on it. - addr := net.JoinHostPort("localhost", strconv.Itoa(30000+rand.Intn(35535))) - go func() { - time.Sleep(time.Second) - - listener, err := net.Listen("tcp4", addr) - if err != nil { - t.Logf("failed to set up listener in a random port. You were really unlucky, run the test again. 
%v", err) - t.Fail() - } - - err = http.Serve(listener, mux) - require.NoError(t, err) - t.Cleanup(func() { - listener.Close() - }) - }() - - runner := HttpRunner{url: "http://" + addr + "/run", graceTime: time.Second, backoff: time.Second} - processor, err := NewProcessor(Script{Script: nil, Settings: Settings{Timeout: 1000}}, runner) - require.NoError(t, err) - - baseCtx, baseCancel := context.WithTimeout(context.Background(), 5*time.Second) - t.Cleanup(baseCancel) - ctx, cancel := testhelper.Context(baseCtx, t) - t.Cleanup(cancel) - - var ( - registry = prometheus.NewRegistry() - logger testLogger - zlogger = zerolog.New(io.Discard) - ) - success, err := processor.Run(ctx, registry, &logger, zlogger) - require.NoError(t, err) - require.True(t, success) - }) -} - -func emptyJSON(status int) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - _, _ = w.Write([]byte("{}")) - }) -} - -// afterAttempts invokes the first handler if strictly less than [afterAttempts] requests have been made before to it. -// afterAttempts(a, 0, b) always invokes b. -// afterAttempts(a, 1, b) always invokes a for the first request, then b for the subsequent ones. -func afterAttempts(a http.Handler, afterAttempts int, b http.Handler) http.Handler { - pastAttempts := 0 - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if pastAttempts >= afterAttempts { - b.ServeHTTP(w, r) - return - } - - a.ServeHTTP(w, r) - pastAttempts++ - }) -} - -// delay calls next after some time has passed. It watches for incoming request context cancellation to avoid leaking -// connections. -func delay(d time.Duration, next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Fully consume the request body before waiting. 
If we do not do this, cancelling the request context will not - // close the network connection, and httptest.Server will complain. - _, _ = io.Copy(io.Discard, r.Body) - - select { - case <-r.Context().Done(): - // Abort waiting if the client closes the request. Again, if we don't, httptest.Server will complain. - case <-time.After(d): - next.ServeHTTP(w, r) - } - }) -} - type testRunner struct { metrics []byte logs []byte diff --git a/internal/k6runner/local.go b/internal/k6runner/local.go new file mode 100644 index 00000000..dbbaeb1f --- /dev/null +++ b/internal/k6runner/local.go @@ -0,0 +1,149 @@ +package k6runner + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "time" + + "github.com/rs/zerolog" + "github.com/spf13/afero" +) + +type Local struct { + k6path string + logger *zerolog.Logger + fs afero.Fs + blacklistedIP string +} + +func (r Local) WithLogger(logger *zerolog.Logger) Runner { + r.logger = logger + return r +} + +func (r Local) Run(ctx context.Context, script Script) (*RunResponse, error) { + afs := afero.Afero{Fs: r.fs} + + checkTimeout := time.Duration(script.Settings.Timeout) * time.Millisecond + if checkTimeout == 0 { + return nil, ErrNoTimeout + } + + workdir, err := afs.TempDir("", "k6-runner") + if err != nil { + return nil, fmt.Errorf("cannot create temporary directory: %w", err) + } + + defer func() { + if err := r.fs.RemoveAll(workdir); err != nil { + r.logger.Error().Err(err).Str("severity", "critical").Msg("cannot remove temporary directory") + } + }() + + metricsFn, err := mktemp(r.fs, workdir, "*.json") + if err != nil { + return nil, fmt.Errorf("cannot obtain temporary metrics filename: %w", err) + } + + logsFn, err := mktemp(r.fs, workdir, "*.log") + if err != nil { + return nil, fmt.Errorf("cannot obtain temporary logs filename: %w", err) + } + + scriptFn, err := mktemp(r.fs, workdir, "*.js") + if err != nil { + return nil, fmt.Errorf("cannot obtain temporary script filename: %w", err) + } + + if err := 
afs.WriteFile(scriptFn, script.Script, 0o644); err != nil { + return nil, fmt.Errorf("cannot write temporary script file: %w", err) + } + + k6Path, err := exec.LookPath(r.k6path) + if err != nil { + return nil, fmt.Errorf("cannot find k6 executable: %w", err) + } + + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, checkTimeout) + defer cancel() + + // #nosec G204 -- the variables are not user-controlled + cmd := exec.CommandContext( + ctx, + k6Path, + "run", + "--out", "sm="+metricsFn, + "--log-format", "logfmt", + "--log-output", "file="+logsFn, + "--vus", "1", + "--iterations", "1", + "--max-redirects", "10", + "--batch", "10", + "--batch-per-host", "4", + "--no-connection-reuse", + "--blacklist-ip", r.blacklistedIP, + "--block-hostnames", "*.cluster.local", // TODO(mem): make this configurable + "--summary-time-unit", "s", + // "--discard-response-bodies", // TODO(mem): make this configurable + "--dns", "ttl=30s,select=random,policy=preferIPv4", // TODO(mem): this needs fixing, preferIPv4 is probably not what we want + "--no-thresholds", + "--no-usage-report", + "--no-color", + "--no-summary", + "--verbose", + scriptFn, + ) + + var stdout, stderr bytes.Buffer + + cmd.Dir = workdir + cmd.Stdin = nil + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + start := time.Now() + + r.logger.Info().Str("command", cmd.String()).Bytes("script", script.Script).Msg("running k6 script") + + if err := cmd.Run(); err != nil { + r.logger.Error().Err(err).Str("stdout", stdout.String()).Str("stderr", stderr.String()).Msg("k6 exited with error") + return nil, fmt.Errorf("cannot execute k6 script: %w", err) + } + + duration := time.Since(start) + + r.logger.Debug().Str("stdout", stdout.String()).Str("stderr", stderr.String()).Dur("duration", duration).Msg("ran k6 script") + r.logger.Info().Dur("duration", duration).Msg("ran k6 script") + + var result RunResponse + + result.Metrics, err = afs.ReadFile(metricsFn) + if err != nil { + 
r.logger.Error().Err(err).Str("filename", metricsFn).Msg("cannot read metrics file") + return nil, fmt.Errorf("cannot read metrics: %w", err) + } + + result.Logs, err = afs.ReadFile(logsFn) + if err != nil { + r.logger.Error().Err(err).Str("filename", logsFn).Msg("cannot read logs file") + return nil, fmt.Errorf("cannot read logs: %w", err) + } + + r.logger.Debug().Bytes("metrics", result.Metrics).Bytes("logs", result.Logs).Msg("k6 result") + + return &result, nil +} + +func mktemp(fs afero.Fs, dir, pattern string) (string, error) { + f, err := afero.TempFile(fs, dir, pattern) + if err != nil { + return "", fmt.Errorf("cannot create temporary file: %w", err) + } + if err := f.Close(); err != nil { + return "", fmt.Errorf("cannot close temporary file: %w", err) + } + return f.Name(), nil +}