Skip to content

Commit

Permalink
Handle read while writing race condition.
Browse files Browse the repository at this point in the history
  • Loading branch information
hime committed Aug 14, 2024
1 parent d4bae3a commit 4f6a08b
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 14 deletions.
65 changes: 55 additions & 10 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ limitations under the License.
package metrics

import (
"bufio"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

Expand All @@ -33,8 +35,9 @@ import (
)

const (
metricsPath = "/metrics"
metricsFileName = "/metrics.prom"
metricsPath = "/metrics"
metricsFileNameTemplate = `/metrics_%d.prom`
generationFileName = "/generation.txt"
)

type Manager interface {
Expand Down Expand Up @@ -89,8 +92,7 @@ func (mm *manager) RegisterMetricsCollector(targetPath, podNamespace, podName, b
}

podUID, volumeName, _ := util.ParsePodIDVolumeFromTargetpath(targetPath)
promFilePath := emptyDirBasePath + metricsFileName
c := NewTextFileCollector(promFilePath, podUID, volumeName, map[string]string{
c := NewTextFileCollector(emptyDirBasePath, podUID, volumeName, map[string]string{
"pod_name": podName,
"namespace_name": podNamespace,
"volume_name": volumeName,
Expand Down Expand Up @@ -154,22 +156,65 @@ func (c *textFileCollector) Collect(ch chan<- prometheus.Metric) {

// ProcessMetricsFile processes a metrics file that follows Prometheus text format: https://prometheus.io/docs/instrumenting/exposition_formats/,
// returning its MetricFamily.
func ProcessMetricsFile(path string) (map[string]*dto.MetricFamily, error) {
f, err := os.Open(path)
func ProcessMetricsFile(directoryPath string) (map[string]*dto.MetricFamily, error) {
// Find latest file generation.
generationFilePath := directoryPath + generationFileName
genNumber, err := getGenerationNumber(generationFilePath)
if err != nil {
return nil, fmt.Errorf("failed to open metrics file %q: %w", path, err)
return nil, fmt.Errorf("could not get generation number from %s: %w", generationFilePath, err)
}
defer f.Close()

// Find latest metrics file name and open.
metricsFilePath := directoryPath + GetMetricsFileName(genNumber)
metricsFile, err := os.Open(metricsFilePath)
if err != nil {
return nil, fmt.Errorf("failed to open metrics file %q: %w", metricsFilePath, err)
}
defer metricsFile.Close()

var parser expfmt.TextParser
metricFamilies, err := parser.TextToMetricFamilies(f)
metricFamilies, err := parser.TextToMetricFamilies(metricsFile)
if err != nil {
return nil, fmt.Errorf("failed to parse metrics file %q: %w", path, err)
return nil, fmt.Errorf("failed to parse metrics file %q: %w", metricsFilePath, err)
}

return metricFamilies, nil
}

// GetMetricsFilePath creates the expected name for the latest metrics file.
func GetMetricsFileName(genNumber int) string {
return fmt.Sprintf(metricsFileNameTemplate, genNumber)
}

func GetGenerationFileName() string {
return generationFileName
}

// getGenerationNumber opens the generation.txt file and parses the payload into
// an integer. This integer represents the current generation of the metrics file.
func getGenerationNumber(filePath string) (int, error) {
genFile, err := os.Open(filePath)
if err != nil {
return -1, fmt.Errorf("failed to open generation file %q: %w", filePath, err)
}
defer genFile.Close()

// Create file reader
reader := bufio.NewReader(genFile)

// Read the first line.
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input) // Remove leading/trailing whitespace

// Convert to integer
num, err := strconv.Atoi(input)
if err != nil {
return -1, fmt.Errorf(`invalid input "%s" must be an integer: %w`, input, err)
}

return num, nil
}

// emitMetricFamily iterates MetricFamily, converts metricFamily.Metric to prometheus.Metric, and emits the metric via the given chan.
func (c *textFileCollector) emitMetricFamily(metricFamily *dto.MetricFamily, ch chan<- prometheus.Metric) {
var valType prometheus.ValueType
Expand Down
47 changes: 43 additions & 4 deletions pkg/sidecar_mounter/sidecar_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"syscall"
"time"

"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -214,8 +215,8 @@ func logVolumeTotalSize(dirPath string) {

// collectMetrics collects metrics from the gcsfuse instance every 10 seconds.
func collectMetrics(ctx context.Context, port, dirPath string) {
genNumber := 0
metricEndpoint := "http://localhost:" + port + "/metrics"
outputPath := dirPath + "/metrics.prom"
ticker := time.NewTicker(10 * time.Second)

for {
Expand All @@ -224,14 +225,18 @@ func collectMetrics(ctx context.Context, port, dirPath string) {
return
case <-ticker.C:
newCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
scrapeMetrics(newCtx, metricEndpoint, outputPath)
scrapeMetrics(newCtx, metricEndpoint, dirPath, genNumber)
deleteOutdatedFile(dirPath, genNumber-2)
cancel()
genNumber++
}
}
}

// scrapeMetrics connects to the metrics endpoint, scrapes metrics, and save the metrics to the given file path.
func scrapeMetrics(ctx context.Context, metricEndpoint, outputPath string) {
func scrapeMetrics(ctx context.Context, metricEndpoint, dirPath string, genNumber int) {
outputPath := dirPath + metrics.GetMetricsFileName(genNumber)

// Make the HTTP GET request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, metricEndpoint, nil)
if err != nil {
Expand Down Expand Up @@ -262,7 +267,6 @@ func scrapeMetrics(ctx context.Context, metricEndpoint, outputPath string) {

return
}
defer out.Close() // Ensure closure of output file

// Copy the response body (file content) to our output file
_, err = io.Copy(out, resp.Body)
Expand All @@ -271,4 +275,39 @@ func scrapeMetrics(ctx context.Context, metricEndpoint, outputPath string) {

return
}

// Ensure closure of output file.
out.Close()

// Update generation number.
generationFilePath := dirPath + metrics.GetGenerationFileName()
genFile, err := os.Create(generationFilePath)
if err != nil {
klog.Errorf("error getting file descriptor for metrics generation file: %v", err)

return
}
defer genFile.Close()

_, err = fmt.Fprintf(genFile, "%d\n", genNumber)
if err != nil {
klog.Infof("Error writing to file: %v", err)

return
}
}

func deleteOutdatedFile(dirPath string, genNumber int) {
if genNumber < 0 {
return
}

// Create path for file to delete.
outputPath := dirPath + metrics.GetMetricsFileName(genNumber)

// Delete the output file.
err := os.Remove(outputPath)
if err != nil {
klog.Errorf(`error deleting obsolete metrics file "%s": %v`, outputPath, err)
}
}

0 comments on commit 4f6a08b

Please sign in to comment.