From 88283f8e537dc4448b102566bb6cc4527ffead5a Mon Sep 17 00:00:00 2001 From: Mateusz Urbanek Date: Sat, 30 Sep 2023 22:24:09 +0200 Subject: [PATCH] feat!: introduce health check feature (#49) Signed-off-by: Mateusz Urbanek --- cmd/bootstrap/template/Dockerfile.tpl | 9 ++-- cmd/bootstrap/template/main.go.tpl | 67 ++++++++++++++++++----- errors.go | 28 ++++++++++ go.mod | 1 + go.sum | 6 +-- gocosi.go | 62 ++++++++++++++++++---- healthz.go | 57 ++++++++++++++++++++ healthz_test.go | 76 +++++++++++++++++++++++++++ options.go | 23 ++++++++ 9 files changed, 299 insertions(+), 30 deletions(-) create mode 100644 errors.go create mode 100644 healthz.go create mode 100644 healthz_test.go diff --git a/cmd/bootstrap/template/Dockerfile.tpl b/cmd/bootstrap/template/Dockerfile.tpl index 0f584e7..90174db 100644 --- a/cmd/bootstrap/template/Dockerfile.tpl +++ b/cmd/bootstrap/template/Dockerfile.tpl @@ -41,11 +41,12 @@ WORKDIR /cosi # Set volume mount point for COSI socket. VOLUME [ "/var/lib/cosi" ] -# TODO: add metrics and healthceck port; -# EXPOSE 80 +# Expose the healthcheck port. +EXPOSE 8080 -# TODO: add healthcheck command; -HEALTHCHECK NONE +# Define a healthcheck for the container. +HEALTHCHECK --interval=30s --timeout=15s --retries=3 \ + CMD [ "/usr/bin/cosi-osp", "--healthcheck" ] # Set the default environment. ENV COSI_ENDPOINT="unix:///var/lib/cosi/cosi.sock" diff --git a/cmd/bootstrap/template/main.go.tpl b/cmd/bootstrap/template/main.go.tpl index cbf6ebf..3859fd4 100644 --- a/cmd/bootstrap/template/main.go.tpl +++ b/cmd/bootstrap/template/main.go.tpl @@ -2,12 +2,17 @@ package main import ( "context" + "flag" + "fmt" stdlog "log" "os" + "os/signal" + "syscall" "github.com/doomshrine/gocosi" "github.com/go-logr/logr" "github.com/go-logr/stdr" + "github.com/hellofresh/health-go/v5" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" // FIXME: this might need manual update @@ -16,15 +21,20 @@ import ( ) var ( - driverName = "cosi.example.com" // FIXME: replace with your own driver name - driverVersion = "v0.1.0" // FIXME: replace with your own driver version + ospName = "cosi.example.com" // FIXME: replace with your own OSP name + ospVersion = "v0.1.0" // FIXME: replace with your own OSP version exporterKind = gocosi.HTTPExporter log logr.Logger + + healthcheck bool ) -func init() { +func main() { + flag.BoolVar(&healthcheck, "healthcheck", false, "") + flag.Parse() + // Setup your logger here. // You can use one of multiple available implementation, like: // - https://github.com/kubernetes/klog/tree/main/klogr @@ -33,35 +43,66 @@ func init() { // - https://github.com/bombsimon/logrusr stdr.SetVerbosity(10) log = stdr.New(stdlog.New(os.Stdout, "", stdlog.LstdFlags)) -} -func main() { gocosi.SetLogger(log) + if err := realMain(context.Background()); err != nil { + log.Error(err, "critical failure") + os.Exit(1) + } +} + +func realMain(ctx context.Context) error { + ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + if healthcheck { + return runHealthcheck(ctx) + } + + return runOSP(ctx) +} + +func runHealthcheck(ctx context.Context) error { + err := gocosi.HealthcheckFunc(ctx, gocosi.HealthcheckAddr) + if err != nil { + return fmt.Errorf("healthcheck call failed: %w", err) + } + + return nil +} + +func runOSP(ctx context.Context) error { res := resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceName(driverName), - semconv.ServiceVersion(driverVersion), + semconv.ServiceName(ospName), + semconv.ServiceVersion(ospVersion), ) // If there is any additional confifuration needed for your COSI Driver, // put it below this line. driver, err := gocosi.New( - identity.New(driverName, log), + identity.New(ospName, log), provisioner.New(log), res, + gocosi.WithHealthcheck( + health.WithComponent(health.Component{ + Name: ospName, + Version: ospVersion, + }), + ), gocosi.WithDefaultGRPCOptions(), gocosi.WithDefaultMetricExporter(exporterKind), gocosi.WithDefaultTraceExporter(exporterKind), ) if err != nil { - log.Error(err, "failed to create COSI Driver") - os.Exit(1) + return fmt.Errorf("failed to create COSI OSP: %w", err) } - if err := driver.Run(context.Background()); err != nil { - log.Error(err, "failed to run COSI Driver") - os.Exit(1) + if err := driver.Run(ctx); err != nil { + return fmt.Errorf("failed to run COSI OSP: %w", err) } + + return nil } diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..e42a24e --- /dev/null +++ b/errors.go @@ -0,0 +1,28 @@ +package gocosi + +import ( + "errors" + "fmt" + "strings" +) + +var ( + ErrNilMux = errors.New("nil mux") + ErrHealthcheckStatusUnknown = errors.New("healthcheck status unknown") +) + +type ErrHealthCheckFailure struct { + failures map[string]string +} + +var _ error = (*ErrHealthCheckFailure)(nil) + +func (err *ErrHealthCheckFailure) Error() string { + reasons := []string{} + + for service, reason := range err.failures { + reasons = append(reasons, fmt.Sprintf("%s (reason: '%s')", service, reason)) + } + + return "healthcheck failed: " + strings.Join(reasons, ", ") +} diff --git a/go.mod b/go.mod index f183ada..11606ba 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/doomshrine/testcontext v1.0.0 github.com/go-logr/logr v1.2.4 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 + github.com/hellofresh/health-go/v5 v5.3.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.44.0 go.opentelemetry.io/otel v1.18.0 diff --git a/go.sum b/go.sum index 436c625..c1349b9 100644 --- a/go.sum +++ b/go.sum @@ -28,12 +28,12 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0 h1:2cz5kSrxzMYHiWOBbKj8itQm+nRykkB8aMv4ThcHYHA= -github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hellofresh/health-go/v5 v5.3.0 h1:T0tapAAuqVIiagRn0YQzFoIPAQek120/vQYPxpMMJ9M= +github.com/hellofresh/health-go/v5 v5.3.0/go.mod h1:N6MLoACjLHjQQhQh+m2S1rXj1PuSBs/5uI32JKBzwf8= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -95,8 +95,6 @@ google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1: google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20230731193218-e0aa005b6bdf h1:guOdSPaeFgN+jEJwTo1dQ71hdBm+yKSCCKuTRkJzcVo= google.golang.org/genproto/googleapis/rpc v0.0.0-20230731193218-e0aa005b6bdf/go.mod h1:zBEcrKX2ZOcEkHWxBPAIvYUWOKKMIhYcmNiUIu2ji3I= -google.golang.org/grpc v1.58.1 h1:OL+Vz23DTtrrldqHK49FUOPHyY75rvFqJfXC84NYW58= -google.golang.org/grpc v1.58.1/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/gocosi.go b/gocosi.go index 99ee04f..53e78b3 100644 --- a/gocosi.go +++ b/gocosi.go @@ -18,12 +18,13 @@ import ( "context" "errors" "fmt" + "net/http" "net/url" - "os/signal" - "syscall" + "time" "github.com/doomshrine/must" "github.com/go-logr/logr" + "github.com/hellofresh/health-go/v5" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc" cosi "sigs.k8s.io/container-object-storage-interface-spec" @@ -44,6 +45,10 @@ type Driver struct { endpoint *Endpoint grpcOptions []grpc.ServerOption + server *http.Server + mux *http.ServeMux + healthz *health.Health + logger logr.Logger otelCollector string } @@ -53,12 +58,24 @@ type Option func(*Driver) error // New creates a new instance of the COSI driver. func New(identity cosi.IdentityServer, provisioner cosi.ProvisionerServer, res *resource.Resource, opts ...Option) (*Driver, error) { + mux := http.NewServeMux() + p := &Driver{ identity: identity, provisioner: provisioner, resource: res, + mux: mux, + server: &http.Server{ + Addr: ":8080", + Handler: mux, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 2 * time.Second, + }, + endpoint: &Endpoint{ permissions: 0o755, address: must.Do(url.Parse(cosiSocket)), @@ -83,9 +100,6 @@ func SetLogger(l logr.Logger) { // Run starts the COSI driver and serves requests. func (d *Driver) Run(ctx context.Context) error { - ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) - defer cancel() - lis, err := d.endpoint.Listener(ctx) if err != nil { return fmt.Errorf("listener creation failed: %w", err) @@ -97,10 +111,9 @@ func (d *Driver) Run(ctx context.Context) error { return fmt.Errorf("gRPC server creation failed: %w", err) } - go func() { - <-ctx.Done() - srv.GracefulStop() - }() + go d.serveHTTP() + + go shutdown(ctx, srv, d.server) log.V(4).Info("starting driver", "address", lis.Addr()) @@ -112,6 +125,37 @@ func (d *Driver) Run(ctx context.Context) error { return nil } +func shutdown(ctx context.Context, g *grpc.Server, h *http.Server) { + log.V(8).Info("shutdown watcher started") + <-ctx.Done() + log.Info("starting shutdown") + + if g != nil { + go g.GracefulStop() + } + + if h != nil { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + go func() { + err := h.Shutdown(shutdownCtx) + if err != nil { + log.Error(err, "error during HTTP server shutdown") + } + }() + } +} + +func (d *Driver) serveHTTP() { + log.V(8).Info("http server started", "address", d.server.Addr) + + err := d.server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error(err, "failed to serve HTTP server", "address", d.server.Addr) + } +} + func (d *Driver) grpcServer() (*grpc.Server, error) { server := grpc.NewServer(d.grpcOptions...) diff --git a/healthz.go b/healthz.go new file mode 100644 index 0000000..a54a27e --- /dev/null +++ b/healthz.go @@ -0,0 +1,57 @@ +package gocosi + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/hellofresh/health-go/v5" +) + +const ( + // HealthcheckEndpoint is the HTTP endpoint path for the healthcheck service. + HealthcheckEndpoint = "/healthz" + + // HealthcheckAddr. + HealthcheckAddr = "http://localhost:8080" + HealthcheckEndpoint +) + +// HealthcheckFunc. +func HealthcheckFunc(ctx context.Context, addr string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil) + if err != nil { + return fmt.Errorf("unable to create new request: %w", err) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("healthcheck failed: %w", err) + } + defer res.Body.Close() + + c := &health.Check{} + + err = json.NewDecoder(res.Body).Decode(c) + if err != nil { + return fmt.Errorf("unable to decode healthcheck response: %w", err) + } + + log.Info("healthcheck finished", + "status", c.Status, + "system", c.System, + "failures", c.Failures, + "component", c.Component, + ) + + switch res.StatusCode { + case http.StatusOK: + return nil + + case http.StatusServiceUnavailable: + return &ErrHealthCheckFailure{failures: c.Failures} + + default: + return ErrHealthcheckStatusUnknown + } +} diff --git a/healthz_test.go b/healthz_test.go new file mode 100644 index 0000000..b14da28 --- /dev/null +++ b/healthz_test.go @@ -0,0 +1,76 @@ +package gocosi + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/doomshrine/must" + "github.com/doomshrine/testcontext" + "github.com/hellofresh/health-go/v5" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHealthcheckProbe(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + assertion func(assert.TestingT, error, ...interface{}) bool + port int + healthz *health.Health + }{ + { + name: "default", + assertion: assert.NoError, + port: 30001, + healthz: must.Do(health.New()), + }, + { + name: "error", + assertion: assert.Error, + port: 30002, + healthz: must.Do(health.New(health.WithChecks( + health.Config{ + Name: "test", + Timeout: time.Second, + Check: func(ctx context.Context) error { + return fmt.Errorf("forced error") + }, + }, + ))), + }, + } { + tc := tc + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := testcontext.FromTimeout(context.Background(), t, time.Second) + defer cancel() + + mux := http.NewServeMux() + mux.Handle(HealthcheckEndpoint, tc.healthz.Handler()) + + server := &http.Server{ + Addr: fmt.Sprintf(":%d", tc.port), + Handler: mux, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 2 * time.Second, + } + defer server.Shutdown(ctx) //nolint:errcheck + + go func() { + err := server.ListenAndServe() + require.ErrorIs(t, err, http.ErrServerClosed) + }() + + err := HealthcheckFunc(ctx, fmt.Sprintf("http://localhost:%d/healthz", tc.port)) + tc.assertion(t, err) + }) + } +} diff --git a/options.go b/options.go index be57455..60f0459 100644 --- a/options.go +++ b/options.go @@ -26,6 +26,7 @@ import ( grpclog "github.com/doomshrine/gocosi/grpc/log" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "github.com/hellofresh/health-go/v5" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" @@ -253,6 +254,28 @@ func WithGRPCTraceExporter(opt ...otlptracegrpc.Option) Option { } } +// WithHealthcheck returns an Option function that sets up a healthcheck service for the driver. +// It accepts options for configuring the healthcheck service. +func WithHealthcheck(options ...health.Option) Option { + return func(d *Driver) error { + h, err := health.New(options...) + if err != nil { + return fmt.Errorf("unable to create new healthcheck service: %w", err) + } + + d.healthz = h + + if d.mux == nil { + // This should not occur, but just for my sanity... + return ErrNilMux + } + + d.mux.Handle(HealthcheckEndpoint, d.healthz.Handler()) + + return nil + } +} + func registerTraceExporter(res *resource.Resource, exporter sdktrace.SpanExporter) (func(context.Context) error, error) { bsp := sdktrace.NewBatchSpanProcessor(exporter)