diff --git a/cmd/csi_driver/main.go b/cmd/csi_driver/main.go index b83b4d2f2..f0e8c4cb4 100644 --- a/cmd/csi_driver/main.go +++ b/cmd/csi_driver/main.go @@ -30,6 +30,7 @@ import ( "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/storage" driver "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/csi_driver" csimounter "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/csi_mounter" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" "k8s.io/klog/v2" "k8s.io/mount-utils" ) @@ -44,6 +45,7 @@ var ( identityProvider = flag.String("identity-provider", "", "The Identity Provider to authenticate with GCS API.") enableProfiling = flag.Bool("enable-profiling", false, "enable the golang pprof at port 6060") informerResyncDurationSec = flag.Int("informer-resync-duration-sec", 1800, "informer resync duration in seconds") + metricsEndpoint = flag.String("metrics-endpoint", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.") // These are set at compile time. version = "unknown" @@ -91,6 +93,7 @@ func main() { } var mounter mount.Interface + var mm metrics.Manager if *runNode { if *nodeID == "" { klog.Fatalf("NodeID cannot be empty for node service") @@ -102,6 +105,11 @@ func main() { if err != nil { klog.Fatalf("Failed to prepare CSI mounter: %v", err) } + + if *metricsEndpoint != "" { + mm = metrics.NewMetricsManager(*metricsEndpoint) + mm.InitializeHTTPHandler() + } } config := &driver.GCSDriverConfig{ @@ -114,6 +122,7 @@ func main() { TokenManager: tm, Mounter: mounter, K8sClients: clientset, + MetricsManager: mm, } gcfsDriver, err := driver.NewGCSDriver(config) diff --git a/deploy/base/node/node.yaml b/deploy/base/node/node.yaml index 90bedeef8..578fea93a 100755 --- a/deploy/base/node/node.yaml +++ b/deploy/base/node/node.yaml @@ -52,6 +52,10 @@ spec: - --nodeid=$(KUBE_NODE_NAME) - --node=true - --identity-provider=$(IDENTITY_PROVIDER) + - --metrics-endpoint=:9920 + ports: + - containerPort: 9920 + name: metrics resources: limits: cpu: 200m diff --git a/deploy/overlays/dev/node_pprof.yaml b/deploy/overlays/dev/node_pprof.yaml index 27ee4b389..6862500ca 100755 --- a/deploy/overlays/dev/node_pprof.yaml +++ b/deploy/overlays/dev/node_pprof.yaml @@ -27,6 +27,8 @@ spec: - --endpoint=unix:/csi/csi.sock - --nodeid=$(KUBE_NODE_NAME) - --node=true + - --identity-provider=$(IDENTITY_PROVIDER) + - --metrics-endpoint=:9920 - --enable-profiling=true ports: - containerPort: 6060 \ No newline at end of file diff --git a/docs/metrics/metrics.md b/docs/metrics/metrics.md new file mode 100644 index 000000000..ab79c8840 --- /dev/null +++ b/docs/metrics/metrics.md @@ -0,0 +1,130 @@ + + +# Metrics + +Google Cloud Storage FUSE offers client-side Prometheus metrics for monitoring file system operations and performance. These metrics, gathered by the sidecar container, are forwarded to the CSI driver node servers. You can access these metrics via a Prometheus server. This guide explains how to enable client-side metrics, set up a Prometheus server, and query the metrics for insights. + +## Enable metrics collection in your workload + +You don't need to set anything to enable the metrics collection. The metrics collection is enabled by default. + +To **disable** the metrics collection, set the volume attribute `disableMetrics: "true"`. + +For in-line ephemeral volumes: + +```yaml +... +spec: + volumes: + - name: gcs-fuse-csi-ephemeral + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: + disableMetrics: "true" +``` + +For `PersistentVolume` volumes: + +```yaml +apiVersion: v1 +kind: PersistentVolume +metadata: + name: gcs-fuse-csi-pv +spec: + ... + csi: + driver: gcsfuse.csi.storage.gke.io + volumeHandle: + volumeAttributes: + disableMetrics: "true" +``` + +## Install Helm + +The example uses Helm charts to manage Prometheus server. Follow the [Helm documentation](https://helm.sh/docs/intro/install/#from-script) to install Helm. + +## Install a Prometheus server to collect metrics + +Add Prometheus Helm repo and update the repo. + +```bash +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo update +``` + +Create a new Kubernetes namespace `prometheus`, and install a Prometheus server using Helm. Note that the following example only installs a Prometheus server without other auxiliary components. + +```bash +kubectl create namespace prometheus + +helm install prometheus prometheus-community/prometheus \ +--namespace prometheus \ +--values ./docs/metrics/prometheus-values.yaml +``` + +## Connect to the Prometheus server + +Create a new terminal session and run the following command to forward traffic from your local machine to the Prometheus server. + +```bash +export POD_NAME=$(kubectl get pods --namespace prometheus -l "app.kubernetes.io/name=prometheus,app.kubernetes.io/instance=prometheus" -o jsonpath="{.items[0].metadata.name}") +kubectl --namespace prometheus port-forward $POD_NAME 9920 +``` + +## Open Prometheus UI to query metrics + +Open a web browser and go to the following URL. The example shows a graph of the metric `fs_ops_count`. + +> + +Below is a list of supported Google Cloud Storage FUSE metrics: + +- fs_ops_count +- fs_ops_error_count +- fs_ops_latency +- gcs_download_bytes_count +- gcs_read_count +- gcs_read_bytes_count +- gcs_reader_count +- gcs_request_count +- gcs_request_latencies +- file_cache_read_count +- file_cache_read_bytes_count +- file_cache_read_latencies + +See the [Google Cloud Storage FUSE Metrics documentation](https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/docs/metrics.md) for detailed explanation. + +In the CSI driver, each metric record includes the following extra labels so that you can filter and aggregate metrics. + +- pod_name +- namespace_name +- volume_name +- bucket_name + +The Prometheus UI provides an easy interface to query and visualize metrics. See [Querying Prometheus documentation](https://prometheus.io/docs/prometheus/latest/querying/basics/) for details. + +## Clean up Prometheus server + +Run the following command to clean up the Prometheus server. + +Warning: the following command will clean up the PV and PVC storing Prometheus data. If you need to retain the metrics data, in the step [Install a Prometheus server to collect metrics](#install-a-prometheus-server-to-collect-metrics), create a `StorageClass` with `reclaimPolicy: Retain`, and set the helm parameter `server.persistentVolume.storageClass` using the new `StorageClass` name. + +```bash +helm uninstall prometheus --namespace prometheus +``` diff --git a/docs/metrics/prometheus-values.yaml b/docs/metrics/prometheus-values.yaml new file mode 100644 index 000000000..b6fa02c20 --- /dev/null +++ b/docs/metrics/prometheus-values.yaml @@ -0,0 +1,49 @@ +# Copyright 2018 The Kubernetes Authors. +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +server: + persistentVolume: + size: 20Gi +prometheus-pushgateway: + enabled: false +alertmanager: + enabled: false +kube-state-metrics: + enabled: false +prometheus-node-exporter: + enabled: false +serverFiles: + prometheus.yml: + scrape_configs: + - job_name: 'gcsfuse-csi-node-pods' + + scrape_interval: 10s + scrape_timeout: 2s + + kubernetes_sd_configs: + - role: pod + + relabel_configs: + - source_labels: [__meta_kubernetes_pod_label_k8s_app] + action: keep + regex: gcs-fuse-csi-driver + - source_labels: [__meta_kubernetes_pod_ip] + action: replace + regex: ((([0-9]+?)(\.|$)){4}) + replacement: $1:9920 + target_label: __address__ + - source_labels: [__meta_kubernetes_pod_node_name] + action: replace + target_label: node diff --git a/go.mod b/go.mod index 851afb21d..30f4cc75d 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,9 @@ require ( github.com/kubernetes-csi/csi-test/v5 v5.2.0 github.com/onsi/ginkgo/v2 v2.19.1 github.com/onsi/gomega v1.34.1 + github.com/prometheus/client_golang v1.18.0 + github.com/prometheus/client_model v0.6.0 + github.com/prometheus/common v0.46.0 golang.org/x/net v0.27.0 golang.org/x/oauth2 v0.22.0 golang.org/x/time v0.6.0 @@ -93,9 +96,6 @@ require ( github.com/opencontainers/selinux v1.11.0 // indirect github.com/pelletier/go-toml/v2 v2.2.1 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect - github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.46.0 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/pkg/csi_driver/gcs_fuse_driver.go b/pkg/csi_driver/gcs_fuse_driver.go index 235e0a71b..19dd71f7a 100644 --- a/pkg/csi_driver/gcs_fuse_driver.go +++ b/pkg/csi_driver/gcs_fuse_driver.go @@ -25,6 +25,7 @@ import ( "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/auth" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/clientset" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/storage" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog/v2" @@ -43,6 +44,7 @@ type GCSDriverConfig struct { TokenManager auth.TokenManager Mounter mount.Interface K8sClients clientset.Interface + MetricsManager metrics.Manager } type GCSDriver struct { diff --git a/pkg/csi_driver/gcs_fuse_driver_test.go b/pkg/csi_driver/gcs_fuse_driver_test.go index 6eccdcb02..e19c004dc 100755 --- a/pkg/csi_driver/gcs_fuse_driver_test.go +++ b/pkg/csi_driver/gcs_fuse_driver_test.go @@ -25,6 +25,7 @@ import ( "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/auth" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/clientset" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/storage" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" mount "k8s.io/mount-utils" ) @@ -40,6 +41,7 @@ func initTestDriver(t *testing.T, fm *mount.FakeMounter) *GCSDriver { TokenManager: auth.NewFakeTokenManager(), Mounter: fm, K8sClients: &clientset.FakeClientset{}, + MetricsManager: &metrics.FakeMetricsManager{}, } driver, err := NewGCSDriver(config) if err != nil { diff --git a/pkg/csi_driver/node.go b/pkg/csi_driver/node.go index eb36676da..895c564b2 100644 --- a/pkg/csi_driver/node.go +++ b/pkg/csi_driver/node.go @@ -89,7 +89,7 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish } // Validate arguments - targetPath, bucketName, fuseMountOptions, skipBucketAccessCheck, err := parseRequestArguments(req) + targetPath, bucketName, fuseMountOptions, skipBucketAccessCheck, disableMetricsCollection, err := parseRequestArguments(req) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -151,6 +151,13 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish return nil, status.Error(codes.FailedPrecondition, "failed to find the sidecar container in Pod spec") } + // Register metrics collecter. + // It is idempotent to register the same collector in node republish calls. + if s.driver.config.MetricsManager != nil && !disableMetricsCollection { + klog.V(6).Infof("NodePublishVolume enabling metrics collector for target path %q", targetPath) + s.driver.config.MetricsManager.RegisterMetricsCollector(targetPath, pod.Namespace, pod.Name, bucketName) + } + // Check if the sidecar container is still required, // if not, put an exit file to the emptyDir path to // notify the sidecar container to exit. @@ -220,6 +227,11 @@ func (s *nodeServer) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpubli } defer s.volumeLocks.Release(targetPath) + // Unregister metrics collecter. + if s.driver.config.MetricsManager != nil { + s.driver.config.MetricsManager.UnregisterMetricsCollector(targetPath) + } + delete(s.volumeStateStore, targetPath) // Check if the target path is already mounted diff --git a/pkg/csi_driver/utils.go b/pkg/csi_driver/utils.go index 1a3d34478..84b2102cc 100644 --- a/pkg/csi_driver/utils.go +++ b/pkg/csi_driver/utils.go @@ -51,6 +51,7 @@ const ( VolumeContextKeyMetadataCacheTTLSeconds = "metadataCacheTTLSeconds" VolumeContextKeyGcsfuseLoggingSeverity = "gcsfuseLoggingSeverity" VolumeContextKeySkipCSIBucketAccessCheck = "skipCSIBucketAccessCheck" + VolumeContextKeyDisableMetrics = "disableMetrics" //nolint:revive,stylecheck VolumeContextKeyMetadataCacheTtlSeconds = "metadataCacheTtlSeconds" @@ -178,14 +179,16 @@ var volumeAttributesToMountOptionsMapping = map[string]string{ VolumeContextKeyMetadataCacheTtlSeconds: "metadata-cache:ttl-secs:", VolumeContextKeyGcsfuseLoggingSeverity: "logging:severity:", VolumeContextKeySkipCSIBucketAccessCheck: "", + VolumeContextKeyDisableMetrics: util.DisableMetricsForGKE + ":", } // parseVolumeAttributes parses volume attributes and convert them to gcsfuse mount options. -func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]string) ([]string, bool, error) { +func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]string) ([]string, bool, bool, error) { if mountOptions, ok := volumeContext[VolumeContextKeyMountOptions]; ok { fuseMountOptions = joinMountOptions(fuseMountOptions, strings.Split(mountOptions, ",")) } skipCSIBucketAccessCheck := false + disableMetricsCollection := false for volumeAttribute, mountOption := range volumeAttributesToMountOptionsMapping { value, ok := volumeContext[volumeAttribute] if !ok { @@ -200,7 +203,7 @@ func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]s case VolumeContextKeyFileCacheCapacity, VolumeContextKeyMetadataStatCacheCapacity, VolumeContextKeyMetadataTypeCacheCapacity: quantity, err := resource.ParseQuantity(value) if err != nil { - return nil, skipCSIBucketAccessCheck, fmt.Errorf("volume attribute %v only accepts a valid Quantity value, got %q, error: %w", volumeAttribute, value, err) + return nil, skipCSIBucketAccessCheck, disableMetricsCollection, fmt.Errorf("volume attribute %v only accepts a valid Quantity value, got %q, error: %w", volumeAttribute, value, err) } megabytes := quantity.Value() @@ -216,7 +219,7 @@ func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]s mountOptionWithValue = mountOption + value // parse bool volume attributes - case VolumeContextKeyFileCacheForRangeRead, VolumeContextKeySkipCSIBucketAccessCheck: + case VolumeContextKeyFileCacheForRangeRead, VolumeContextKeySkipCSIBucketAccessCheck, VolumeContextKeyDisableMetrics: if boolVal, err := strconv.ParseBool(value); err == nil { if volumeAttribute == VolumeContextKeySkipCSIBucketAccessCheck { skipCSIBucketAccessCheck = boolVal @@ -226,9 +229,13 @@ func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]s continue } + if volumeAttribute == VolumeContextKeyDisableMetrics { + disableMetricsCollection = boolVal + } + mountOptionWithValue = mountOption + strconv.FormatBool(boolVal) } else { - return nil, skipCSIBucketAccessCheck, fmt.Errorf("volume attribute %v only accepts a valid bool value, got %q", volumeAttribute, value) + return nil, skipCSIBucketAccessCheck, disableMetricsCollection, fmt.Errorf("volume attribute %v only accepts a valid bool value, got %q", volumeAttribute, value) } // parse int volume attributes @@ -240,7 +247,7 @@ func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]s mountOptionWithValue = mountOption + strconv.Itoa(intVal) } else { - return nil, skipCSIBucketAccessCheck, fmt.Errorf("volume attribute %v only accepts a valid int value, got %q", volumeAttribute, value) + return nil, skipCSIBucketAccessCheck, disableMetricsCollection, fmt.Errorf("volume attribute %v only accepts a valid int value, got %q", volumeAttribute, value) } default: @@ -250,14 +257,14 @@ func parseVolumeAttributes(fuseMountOptions []string, volumeContext map[string]s fuseMountOptions = joinMountOptions(fuseMountOptions, []string{mountOptionWithValue}) } - return fuseMountOptions, skipCSIBucketAccessCheck, nil + return fuseMountOptions, skipCSIBucketAccessCheck, disableMetricsCollection, nil } // parseRequestArguments parses arguments from given NodePublishVolumeRequest. -func parseRequestArguments(req *csi.NodePublishVolumeRequest) (string, string, []string, bool, error) { +func parseRequestArguments(req *csi.NodePublishVolumeRequest) (string, string, []string, bool, bool, error) { targetPath := req.GetTargetPath() if len(targetPath) == 0 { - return "", "", nil, false, errors.New("NodePublishVolume target path must be provided") + return "", "", nil, false, false, errors.New("NodePublishVolume target path must be provided") } vc := req.GetVolumeContext() @@ -265,7 +272,7 @@ func parseRequestArguments(req *csi.NodePublishVolumeRequest) (string, string, [ if vc[VolumeContextKeyEphemeral] == util.TrueStr { bucketName = vc[VolumeContextKeyBucketName] if len(bucketName) == 0 { - return "", "", nil, false, fmt.Errorf("NodePublishVolume VolumeContext %q must be provided for ephemeral storage", VolumeContextKeyBucketName) + return "", "", nil, false, false, fmt.Errorf("NodePublishVolume VolumeContext %q must be provided for ephemeral storage", VolumeContextKeyBucketName) } } @@ -284,12 +291,12 @@ func parseRequestArguments(req *csi.NodePublishVolumeRequest) (string, string, [ fuseMountOptions = joinMountOptions(fuseMountOptions, capMount.GetMountFlags()) } - fuseMountOptions, skipCSIBucketAccessCheck, err := parseVolumeAttributes(fuseMountOptions, vc) + fuseMountOptions, skipCSIBucketAccessCheck, enableMetricsCollection, err := parseVolumeAttributes(fuseMountOptions, vc) if err != nil { - return "", "", nil, false, err + return "", "", nil, false, false, err } - return targetPath, bucketName, fuseMountOptions, skipCSIBucketAccessCheck, nil + return targetPath, bucketName, fuseMountOptions, skipCSIBucketAccessCheck, enableMetricsCollection, nil } func putExitFile(pod *corev1.Pod, targetPath string) error { diff --git a/pkg/csi_driver/utils_test.go b/pkg/csi_driver/utils_test.go index 9a6c0f93b..d2d27d6ea 100644 --- a/pkg/csi_driver/utils_test.go +++ b/pkg/csi_driver/utils_test.go @@ -70,11 +70,12 @@ func TestParseVolumeAttributes(t *testing.T) { t.Run("parsing volume attributes into mount options", func(t *testing.T) { t.Parallel() testCases := []struct { - name string - volumeContext map[string]string - expectedMountOptions []string - expectedSkipBucketAccessCheck bool - expectedErr bool + name string + volumeContext map[string]string + expectedMountOptions []string + expectedSkipBucketAccessCheck bool + expectedDisableMetricsCollection bool + expectedErr bool }{ { name: "should return correct fileCacheCapacity 1", @@ -372,11 +373,28 @@ func TestParseVolumeAttributes(t *testing.T) { volumeContext: map[string]string{VolumeContextKeySkipCSIBucketAccessCheck: util.FalseStr}, expectedMountOptions: []string{}, }, + { + name: "unexpected value for VolumeContextKeyDisableMetrics", + volumeContext: map[string]string{VolumeContextKeyDisableMetrics: "blah"}, + expectedErr: true, + }, + { + name: "value set to true for VolumeContextKeyDisableMetrics", + volumeContext: map[string]string{VolumeContextKeyDisableMetrics: util.TrueStr}, + expectedMountOptions: []string{volumeAttributesToMountOptionsMapping[VolumeContextKeyDisableMetrics] + util.TrueStr}, + expectedDisableMetricsCollection: true, + }, + { + name: "value set to false for VolumeContextKeyDisableMetrics", + volumeContext: map[string]string{VolumeContextKeyDisableMetrics: util.FalseStr}, + expectedMountOptions: []string{volumeAttributesToMountOptionsMapping[VolumeContextKeyDisableMetrics] + util.FalseStr}, + expectedDisableMetricsCollection: false, + }, } for _, tc := range testCases { t.Logf("test case: %s", tc.name) - output, skipCSIBucketAccessCheck, err := parseVolumeAttributes([]string{}, tc.volumeContext) + output, skipCSIBucketAccessCheck, disableMetricsCollection, err := parseVolumeAttributes([]string{}, tc.volumeContext) if (err != nil) != tc.expectedErr { t.Errorf("Got error %v, but expected error %v", err, tc.expectedErr) } @@ -387,6 +405,9 @@ func TestParseVolumeAttributes(t *testing.T) { if tc.expectedSkipBucketAccessCheck != skipCSIBucketAccessCheck { t.Errorf("Got skipBucketAccessCheck %v, but expected %v", skipCSIBucketAccessCheck, tc.expectedSkipBucketAccessCheck) } + if tc.expectedDisableMetricsCollection != disableMetricsCollection { + t.Errorf("Got disableMetricsCollection %v, but expected %v", disableMetricsCollection, tc.expectedDisableMetricsCollection) + } less := func(a, b string) bool { return a > b } if diff := cmp.Diff(output, tc.expectedMountOptions, cmpopts.SortSlices(less)); diff != "" { diff --git a/pkg/metrics/fake.go b/pkg/metrics/fake.go new file mode 100644 index 000000000..4ef6d2e4b --- /dev/null +++ b/pkg/metrics/fake.go @@ -0,0 +1,26 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +type FakeMetricsManager struct{} + +func (*FakeMetricsManager) InitializeHTTPHandler() {} + +func (*FakeMetricsManager) RegisterMetricsCollector(_, _, _, _ string) {} + +func (*FakeMetricsManager) UnregisterMetricsCollector(_ string) {} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 000000000..503ee5936 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,255 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "fmt" + "net/http" + "os" + "strings" + "time" + + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "k8s.io/klog/v2" +) + +const ( + metricsPath = "/metrics" + metricsFileName = "/metrics.prom" +) + +type Manager interface { + InitializeHTTPHandler() + RegisterMetricsCollector(targetPath, podNamespace, podName, bucketName string) + UnregisterMetricsCollector(targetPath string) +} + +type manager struct { + registry *prometheus.Registry + metricsEndpoint string +} + +func NewMetricsManager(metricsEndpoint string) Manager { + mm := &manager{ + registry: prometheus.NewRegistry(), + metricsEndpoint: metricsEndpoint, + } + + return mm +} + +// InitializeHTTPHandler sets up a server and creates a handler for metrics. +func (mm *manager) InitializeHTTPHandler() { + mux := http.NewServeMux() + mux.HandleFunc(metricsPath, promhttp.HandlerFor(mm.registry, promhttp.HandlerOpts{}).ServeHTTP) + + // Configure the http server and start it. + metricServer := &http.Server{ + Addr: mm.metricsEndpoint, + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + go func() { + klog.Infof("metric server listening at %q", mm.metricsEndpoint) + if err := metricServer.ListenAndServe(); err != nil { + klog.Errorf("failed to start metric server at specified endpoint %q and path %q: %v", mm.metricsEndpoint, metricsPath, err) + } + }() +} + +// RegisterMetricsCollector registers the metrics collector. It is idempotent to register the same collector. +func (mm *manager) RegisterMetricsCollector(targetPath, podNamespace, podName, bucketName string) { + emptyDirBasePath, err := util.PrepareEmptyDir(targetPath, false) + if err != nil { + klog.Errorf("failed to register metrics collector for pod %v/%v, bucket %q: %v", podNamespace, podName, bucketName, err) + + return + } + + podUID, volumeName, _ := util.ParsePodIDVolumeFromTargetpath(targetPath) + promFilePath := emptyDirBasePath + metricsFileName + c := NewTextFileCollector(promFilePath, podUID, volumeName, map[string]string{ + "pod_name": podName, + "namespace_name": podNamespace, + "volume_name": volumeName, + "bucket_name": bucketName, + }) + if err := mm.registry.Register(c); err != nil && !strings.Contains(err.Error(), prometheus.AlreadyRegisteredError{}.Error()) { + klog.Errorf("failed to register metrics collector for pod %v/%v, volume %q, bucket %q: %v", podNamespace, podName, volumeName, bucketName, err) + } +} + +// UnregisterMetricsCollector unregisters the metrics collector. +func (mm *manager) UnregisterMetricsCollector(targetPath string) { + podUID, volumeName, _ := util.ParsePodIDVolumeFromTargetpath(targetPath) + + // textFileCollector uses a hash of pod UID and volume name as an identifier. + c := NewTextFileCollector("", podUID, volumeName, nil) + if ok := mm.registry.Unregister(c); !ok { + klog.Infof("Unregister metrics collector for targetPath %q is not needed since the collector is not registered", targetPath) + } +} + +type textFileCollector struct { + path string + constLabels map[string]string + podUID string + volumeName string +} + +// NewTextFileCollector returns a new Collector exposing metrics read from the give path. +func NewTextFileCollector(path, podUID, volumeName string, labels map[string]string) prometheus.Collector { + c := &textFileCollector{ + path: path, + constLabels: labels, + podUID: podUID, + volumeName: volumeName, + } + + return c +} + +// Describe emits the description of metrics. +// Prometheus Registry relies on this func to identify collectors. +func (c *textFileCollector) Describe(ch chan<- *prometheus.Desc) { + // Collector id is a hash of the values of the ConstLabels and fqName. + ch <- prometheus.NewDesc("gke_gcsfuse_csi_metric", "GKE GCSFuse CSI metric.", nil, map[string]string{"pod_uid": c.podUID, "volume_name": c.volumeName}) +} + +// Collect emits metrics. +func (c *textFileCollector) Collect(ch chan<- prometheus.Metric) { + families, err := ProcessMetricsFile(c.path) + if err != nil { + klog.Errorf("failed to process metrics from metrics file: %v", err) + + return + } + + for _, mf := range families { + c.emitMetricFamily(mf, ch) + } +} + +// ProcessMetricsFile processes a metrics file that follows Prometheus text format: https://prometheus.io/docs/instrumenting/exposition_formats/, +// returning its MetricFamily. +func ProcessMetricsFile(path string) (map[string]*dto.MetricFamily, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open metrics file %q: %w", path, err) + } + defer f.Close() + + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(f) + if err != nil { + return nil, fmt.Errorf("failed to parse metrics file %q: %w", path, err) + } + + return metricFamilies, nil +} + +// emitMetricFamily iterates MetricFamily, converts metricFamily.Metric to prometheus.Metric, and emits the metric via the given chan. +func (c *textFileCollector) emitMetricFamily(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) { + var valType prometheus.ValueType + var val float64 + + for _, metric := range metricFamily.GetMetric() { + var LabelNames []string + var LabelValues []string + for _, label := range metric.GetLabel() { + LabelNames = append(LabelNames, label.GetName()) + LabelValues = append(LabelValues, label.GetValue()) + } + + for n, v := range c.constLabels { + LabelNames = append(LabelNames, n) + LabelValues = append(LabelValues, v) + } + + emitNewConstMetric := func() { + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + metricFamily.GetName(), + metricFamily.GetHelp(), + LabelNames, nil, + ), + valType, val, LabelValues..., + ) + } + + metricType := metricFamily.GetType() + switch metricType { + case dto.MetricType_COUNTER: + valType = prometheus.CounterValue + val = metric.GetCounter().GetValue() + emitNewConstMetric() + + case dto.MetricType_GAUGE: + valType = prometheus.GaugeValue + val = metric.GetGauge().GetValue() + emitNewConstMetric() + + case dto.MetricType_UNTYPED: + valType = prometheus.UntypedValue + val = metric.GetUntyped().GetValue() + emitNewConstMetric() + + case dto.MetricType_SUMMARY: + quantiles := map[float64]float64{} + for _, q := range metric.GetSummary().GetQuantile() { + quantiles[q.GetQuantile()] = q.GetValue() + } + ch <- prometheus.MustNewConstSummary( + prometheus.NewDesc( + metricFamily.GetName(), + metricFamily.GetHelp(), + LabelNames, nil, + ), + metric.GetSummary().GetSampleCount(), + metric.GetSummary().GetSampleSum(), + quantiles, LabelValues..., + ) + + case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: + buckets := map[float64]uint64{} + for _, b := range metric.GetHistogram().GetBucket() { + buckets[b.GetUpperBound()] = b.GetCumulativeCount() + } + ch <- prometheus.MustNewConstHistogram( + prometheus.NewDesc( + metricFamily.GetName(), + metricFamily.GetHelp(), + LabelNames, nil, + ), + metric.GetHistogram().GetSampleCount(), + metric.GetHistogram().GetSampleSum(), + buckets, LabelValues..., + ) + + default: + klog.Errorf("unknown metric type: %v", metricType) + } + } +} diff --git a/pkg/sidecar_mounter/sidecar_mounter.go b/pkg/sidecar_mounter/sidecar_mounter.go index 9b5410459..f363ffeaa 100644 --- a/pkg/sidecar_mounter/sidecar_mounter.go +++ b/pkg/sidecar_mounter/sidecar_mounter.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "io" + "net/http" "os" "os/exec" "path/filepath" @@ -110,6 +111,12 @@ func (m *Mounter) Mount(ctx context.Context, mc *MountConfig) error { go logVolumeUsage(ctx, mc.BufferDir, mc.CacheDir) } + promPort := mc.FlagMap["prometheus-port"] + if promPort != "0" { + klog.Infof("start to collect metrics from port %v for volume %q", promPort, mc.VolumeName) + go collectMetrics(ctx, promPort, mc.TempDir) + } + // Since the gcsfuse has taken over the file descriptor, // closing the file descriptor to avoid other process forking it. syscall.Close(mc.FileDescriptor) @@ -204,3 +211,64 @@ func logVolumeTotalSize(dirPath string) { klog.Infof("total volume size of %v: %v bytes", dirPath, totalSize) } } + +// collectMetrics collects metrics from the gcsfuse instance every 10 seconds. +func collectMetrics(ctx context.Context, port, dirPath string) { + metricEndpoint := "http://localhost:" + port + "/metrics" + outputPath := dirPath + "/metrics.prom" + ticker := time.NewTicker(10 * time.Second) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + newCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + scrapeMetrics(newCtx, metricEndpoint, outputPath) + cancel() + } + } +} + +// scrapeMetrics connects to the metrics endpoint, scrapes metrics, and save the metrics to the given file path. +func scrapeMetrics(ctx context.Context, metricEndpoint, outputPath string) { + // Make the HTTP GET request + req, err := http.NewRequestWithContext(ctx, http.MethodGet, metricEndpoint, nil) + if err != nil { + klog.Errorf("failed to create HTTP request to %q: %v", metricEndpoint, err) + + return + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + klog.Errorf("failed to make HTTP request to %q: %v", metricEndpoint, err) + + return + } + defer resp.Body.Close() // Ensure closure of response body + + // Check for a successful HTTP status code + if resp.StatusCode != http.StatusOK { + klog.Errorf("unexpected HTTP status: %v", resp.Status) + + return + } + + // Create the output file + out, err := os.Create(outputPath) + if err != nil { + klog.Errorf("error creating output file: %v", err) + + return + } + defer out.Close() // Ensure closure of output file + + // Copy the response body (file content) to our output file + _, err = io.Copy(out, resp.Body) + if err != nil { + klog.Errorf("error writing to output file: %v", err) + + return + } +} diff --git a/pkg/sidecar_mounter/sidecar_mounter_config.go b/pkg/sidecar_mounter/sidecar_mounter_config.go index 0dc881104..f1aa8075e 100644 --- a/pkg/sidecar_mounter/sidecar_mounter_config.go +++ b/pkg/sidecar_mounter/sidecar_mounter_config.go @@ -46,6 +46,7 @@ type MountConfig struct { BucketName string `json:"bucketName,omitempty"` BufferDir string `json:"-"` CacheDir string `json:"-"` + TempDir string `json:"-"` ConfigFile string `json:"-"` Options []string `json:"options,omitempty"` ErrWriter stderrWriterInterface `json:"-"` @@ -53,6 +54,8 @@ type MountConfig struct { ConfigFileFlagMap map[string]string `json:"-"` } +var prometheusPort = 8080 + var disallowedFlags = map[string]bool{ "temp-dir": true, "config-file": true, @@ -68,6 +71,7 @@ var disallowedFlags = map[string]bool{ "logging:log-rotate:compress": true, "cache-dir": true, "experimental-local-file-cache": true, + "prometheus-port": true, } var boolFlags = map[string]bool{ @@ -89,13 +93,15 @@ var boolFlags = map[string]bool{ // 4. Mount options passing to gcsfuse (passed by the csi mounter). func NewMountConfig(sp string) *MountConfig { // socket path pattern: /gcsfuse-tmp/.volumes//socket - volumeName := filepath.Base(filepath.Dir(sp)) + tempDir := filepath.Dir(sp) + volumeName := filepath.Base(tempDir) mc := MountConfig{ VolumeName: volumeName, BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName), CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName), + TempDir: tempDir, ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"), - ErrWriter: NewErrorWriter(filepath.Join(filepath.Dir(sp), "error")), + ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")), } klog.Infof("connecting to socket %q", sp) @@ -144,13 +150,15 @@ func NewMountConfig(sp string) *MountConfig { func (mc *MountConfig) prepareMountArgs() { flagMap := map[string]string{ - "app-name": GCSFuseAppName, - "temp-dir": mc.BufferDir + TempDir, - "config-file": mc.ConfigFile, - "foreground": "", - "uid": "0", - "gid": "0", + "app-name": GCSFuseAppName, + "temp-dir": mc.BufferDir + TempDir, + "config-file": mc.ConfigFile, + "foreground": "", + "uid": "0", + "gid": "0", + "prometheus-port": strconv.Itoa(prometheusPort), } + prometheusPort++ configFileFlagMap := map[string]string{ "logging:file-path": "/dev/fd/1", // redirect the output to cmd stdout @@ -165,6 +173,12 @@ func (mc *MountConfig) prepareMountArgs() { i := strings.LastIndex(arg, ":") f, v := arg[:i], arg[i+1:] + if f == util.DisableMetricsForGKE && v == util.TrueStr { + flagMap["prometheus-port"] = "0" + + continue + } + if disallowedFlags[f] { invalidArgs = append(invalidArgs, arg) } else { diff --git a/pkg/sidecar_mounter/sidecar_mounter_config_test.go b/pkg/sidecar_mounter/sidecar_mounter_config_test.go index c47164b83..841359a7e 100644 --- a/pkg/sidecar_mounter/sidecar_mounter_config_test.go +++ b/pkg/sidecar_mounter/sidecar_mounter_config_test.go @@ -20,8 +20,10 @@ package sidecarmounter import ( "os" "reflect" + "strconv" "testing" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/util" "gopkg.in/yaml.v3" ) @@ -211,11 +213,46 @@ func TestPrepareMountArgs(t *testing.T) { "file-cache:max-size-mb": "100", }, }, + { + name: "should return valid args when metrics is disabled", + mc: &MountConfig{ + BucketName: "test-bucket", + BufferDir: "test-buffer-dir", + CacheDir: "test-cache-dir", + ConfigFile: "test-config-file", + Options: []string{util.DisableMetricsForGKE + ":true"}, + }, + expectedArgs: map[string]string{ + "app-name": GCSFuseAppName, + "temp-dir": "test-buffer-dir/temp-dir", + "config-file": "test-config-file", + "foreground": "", + "uid": "0", + "gid": "0", + "prometheus-port": "0", + }, + expectedConfigMapArgs: defaultConfigFileFlagMap, + }, } + prometheusPort := 8080 for _, tc := range testCases { t.Logf("test case: %s", tc.name) + found := false + for _, o := range tc.mc.Options { + if o == util.DisableMetricsForGKE+":true" { + found = true + + break + } + } + + if !found { + tc.expectedArgs["prometheus-port"] = strconv.Itoa(prometheusPort) + prometheusPort++ + } + tc.mc.prepareMountArgs() if !reflect.DeepEqual(tc.mc.FlagMap, tc.expectedArgs) { t.Errorf("Got args %v, but expected %v", tc.mc.FlagMap, tc.expectedArgs) diff --git a/pkg/util/util.go b/pkg/util/util.go index de5cd4a57..a64d54ee5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -33,6 +33,9 @@ const ( TrueStr = "true" FalseStr = "false" + + // mount options that both CSI mounter and sidecar mounter should understand. + DisableMetricsForGKE = "disable-metrics-for-gke" ) var ( diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5deee2021..75cf7dfd5 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -109,6 +109,7 @@ var _ = ginkgo.Describe("E2E Test Suite", func() { testsuites.InitGcsFuseCSIFileCacheTestSuite, testsuites.InitGcsFuseCSIGCSFuseIntegrationFileCacheTestSuite, testsuites.InitGcsFuseCSIIstioTestSuite, + testsuites.InitGcsFuseCSIMetricsTestSuite, } testDriver := specs.InitGCSFuseCSITestDriver(c, m, *bucketLocation, *skipGcpSaTest) diff --git a/test/e2e/specs/specs.go b/test/e2e/specs/specs.go index 4cd97094b..57eff1837 100644 --- a/test/e2e/specs/specs.go +++ b/test/e2e/specs/specs.go @@ -159,6 +159,10 @@ func (t *TestPod) Create(ctx context.Context) { framework.ExpectNoError(err) } +func (t *TestPod) GetPodName() string { + return t.pod.Name +} + // VerifyExecInPodSucceed verifies shell cmd in target pod succeed. func (t *TestPod) VerifyExecInPodSucceed(f *framework.Framework, containerName, shExec string) { stdout, stderr, err := e2epod.ExecCommandInContainerWithFullOutput(f, t.pod.Name, containerName, "/bin/sh", "-c", shExec) @@ -344,6 +348,23 @@ func (t *TestPod) GetNode() string { return t.pod.Spec.NodeName } +func (t *TestPod) GetCISDriverNodePodIP(ctx context.Context) string { + node := t.GetNode() + daemonSetLabelSelector := "k8s-app=gcs-fuse-csi-driver" + + pods, err := t.client.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + FieldSelector: "spec.nodeName=" + node, + LabelSelector: daemonSetLabelSelector, + }) + framework.ExpectNoError(err) + gomega.Expect(pods.Items).To(gomega.HaveLen(1)) + + pod := pods.Items[0] + gomega.Expect(pod.Status).ToNot(gomega.BeNil()) + + return pod.Status.PodIP +} + func (t *TestPod) SetNodeAffinity(nodeName string, sameNode bool) { gomega.Expect(nodeName).ToNot(gomega.Equal("")) diff --git a/test/e2e/testsuites/metrics.go b/test/e2e/testsuites/metrics.go new file mode 100644 index 000000000..63ceb4f67 --- /dev/null +++ b/test/e2e/testsuites/metrics.go @@ -0,0 +1,207 @@ +/* +Copyright 2018 The Kubernetes Authors. +Copyright 2022 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testsuites + +import ( + "context" + "fmt" + "os" + "os/exec" + "time" + + "github.com/google/uuid" + metricspkg "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/test/e2e/specs" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + dto "github.com/prometheus/client_model/go" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/kubernetes/test/e2e/framework" + e2evolume "k8s.io/kubernetes/test/e2e/framework/volume" + storageframework "k8s.io/kubernetes/test/e2e/storage/framework" + admissionapi "k8s.io/pod-security-admission/api" +) + +var expectedMetricNames = map[string]int{ + "fs_ops_count": 15, + "fs_ops_error_count": 2, + "fs_ops_latency": 15, + "gcs_download_bytes_count": 1, + "gcs_read_count": 1, + "gcs_read_bytes_count": 1, + "gcs_reader_count": 2, + "gcs_request_count": 6, + "gcs_request_latencies": 6, + "file_cache_read_count": 1, + "file_cache_read_bytes_count": 1, + "file_cache_read_latencies": 1, +} + +type gcsFuseCSIMetricsTestSuite struct { + tsInfo storageframework.TestSuiteInfo +} + +// InitGcsFuseCSIMetricsTestSuite returns gcsFuseCSIMetricsTestSuite that implements TestSuite interface. +func InitGcsFuseCSIMetricsTestSuite() storageframework.TestSuite { + return &gcsFuseCSIMetricsTestSuite{ + tsInfo: storageframework.TestSuiteInfo{ + Name: "metrics", + TestPatterns: []storageframework.TestPattern{ + storageframework.DefaultFsCSIEphemeralVolume, + storageframework.DefaultFsPreprovisionedPV, + storageframework.DefaultFsDynamicPV, + }, + }, + } +} + +func (t *gcsFuseCSIMetricsTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo { + return t.tsInfo +} + +func (t *gcsFuseCSIMetricsTestSuite) SkipUnsupportedTests(_ storageframework.TestDriver, _ storageframework.TestPattern) { +} + +//nolint:maintidx +func (t *gcsFuseCSIMetricsTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) { + type local struct { + config *storageframework.PerTestConfig + volumeResource *storageframework.VolumeResource + artifactsDir string + } + var l local + ctx := context.Background() + + // Beware that it also registers an AfterEach which renders f unusable. Any code using + // f must run inside an It or Context callback. + f := framework.NewFrameworkWithCustomTimeouts("metrics", storageframework.GetDriverTimeouts(driver)) + f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged + + init := func(configPrefix ...string) { + l = local{} + l.config = driver.PrepareTest(ctx, f) + if len(configPrefix) > 0 { + l.config.Prefix = configPrefix[0] + } + l.volumeResource = storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, e2evolume.SizeRange{}) + + l.artifactsDir = "../../_artifacts" + if dir, ok := os.LookupEnv("ARTIFACTS"); ok { + l.artifactsDir = dir + } + } + + cleanup := func() { + var cleanUpErrs []error + cleanUpErrs = append(cleanUpErrs, l.volumeResource.CleanupResource(ctx)) + err := utilerrors.NewAggregate(cleanUpErrs) + framework.ExpectNoError(err, "while cleaning up") + } + + ginkgo.It("should emit metrics", func() { + init(specs.EnableFileCachePrefix) + defer cleanup() + + // The test driver uses config.Prefix to pass the bucket names back to the test suite. + bucketName := l.config.Prefix + + // Create files using gsutil + fileName := uuid.NewString() + specs.CreateTestFileInBucket(fileName, bucketName) + + ginkgo.By("Configuring the pod") + tPod := specs.NewTestPod(f.ClientSet, f.Namespace) + tPod.SetupVolume(l.volumeResource, volumeName, mountPath, false) + + ginkgo.By("Deploying the pod") + tPod.Create(ctx) + defer tPod.Cleanup(ctx) + + ginkgo.By("Checking that the pod is running") + tPod.WaitForRunning(ctx) + + ginkgo.By("Checking that the pod command exits with no error") + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("mount | grep %v | grep rw,", mountPath)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("cat %v/%v", mountPath, fileName)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("cat %v/%v", mountPath, fileName)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("touch %v/testfile", mountPath)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("ls %v", mountPath)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("echo 'hello world!' > %v/testfile", mountPath)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("cat %v/testfile", mountPath)) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("rm %v/testfile", mountPath)) + + ginkgo.By("Sleeping 20 seconds for metrics to be collected") + time.Sleep(20 * time.Second) + + ginkgo.By("Collecting Prometheus metrics from the CSI driver node server") + csiPodIP := tPod.GetCISDriverNodePodIP(ctx) + tPod.VerifyExecInPodSucceed(f, specs.TesterContainerName, fmt.Sprintf("wget -O %v/metrics.prom http://%v:9920/metrics", mountPath, csiPodIP)) + promFile := fmt.Sprintf("%v/%v/metrics.prom", l.artifactsDir, f.Namespace.Name) + + //nolint:gosec + if output, err := exec.Command("gsutil", "cp", fmt.Sprintf("gs://%v/metrics.prom", bucketName), promFile).CombinedOutput(); err != nil { + framework.Failf("Failed to download the Prometheus metrics data from GCS bucket %q: %v, output: %s", bucketName, err, output) + } + + ginkgo.By("Parsing Prometheus metrics") + families, err := metricspkg.ProcessMetricsFile(promFile) + framework.ExpectNoError(err) + + volume := volumeName + if l.volumeResource.Pv != nil { + volume = l.volumeResource.Pv.Name + } + podName := tPod.GetPodName() + + for metricName, metricCount := range expectedMetricNames { + metricsList := []*dto.Metric{} + metricFamily, ok := families[metricName] + if ok { + metricLoop: + for _, m := range metricFamily.GetMetric() { + for _, pair := range m.GetLabel() { + name, value := pair.GetName(), pair.GetValue() + switch name { + case "bucket_name": + if value != bucketName { + continue metricLoop + } + case "pod_name": + if value != podName { + continue metricLoop + } + case "volume_name": + if value != volume { + continue metricLoop + } + case "namespace_name": + if value != f.Namespace.Name { + continue metricLoop + } + } + } + + metricsList = append(metricsList, m) + } + } + + gomega.Expect(metricsList).To(gomega.HaveLen(metricCount), fmt.Sprintf("Found metric %q count: %v, expected count: %v", metricName, len(metricsList), metricCount)) + ginkgo.By(fmt.Sprintf("Found metric %q count: %v", metricName, len(metricsList))) + } + }) +} diff --git a/test/e2e/utils/handler.go b/test/e2e/utils/handler.go index ca01701d4..07040344b 100644 --- a/test/e2e/utils/handler.go +++ b/test/e2e/utils/handler.go @@ -217,6 +217,8 @@ func generateTestSkip(testParams *TestParameters) string { } if testParams.UseGKEManagedDriver { + skipTests = append(skipTests, "metrics") + // TODO(saikatroyc) remove this skip when GCSFuse CSI v1.4.3 is back-ported to the below GKE versions. if strings.HasPrefix(testParams.GkeClusterVersion, "1.27") || strings.HasPrefix(testParams.GkeClusterVersion, "1.28") { skipTests = append(skipTests, "csi-skip-bucket-access-check") diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 448cbcc2d..132b26066 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -25,6 +25,7 @@ import ( "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/clientset" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/cloud_provider/storage" driver "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/csi_driver" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" sanity "github.com/kubernetes-csi/csi-test/v5/pkg/sanity" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -69,6 +70,7 @@ func TestSanity(t *testing.T) { TokenManager: auth.NewFakeTokenManager(), Mounter: mount.NewFakeMounter([]mount.MountPoint{}), K8sClients: &clientset.FakeClientset{}, + MetricsManager: &metrics.FakeMetricsManager{}, } gcfsDriver, err := driver.NewGCSDriver(driverConfig)