diff --git a/default.nix b/default.nix index fead9ed7..b2494937 100644 --- a/default.nix +++ b/default.nix @@ -2,7 +2,7 @@ buildGoModule { pname = "oplogtoredis"; - version = "3.5.1"; + version = "3.6.0"; src = builtins.path { path = ./.; }; postInstall = '' diff --git a/lib/redispub/publisher.go b/lib/redispub/publisher.go index 217bd17e..57a595f3 100644 --- a/lib/redispub/publisher.go +++ b/lib/redispub/publisher.go @@ -55,12 +55,13 @@ var metricTemporaryFailures = promauto.NewCounter(prometheus.CounterOpts{ Help: "Number of failures encountered when trying to send a message. We automatically retry, and only register a permanent failure (in otr_redispub_processed_messages) after 30 failures.", }) -var metricLastCommandDuration = promauto.NewGauge(prometheus.GaugeOpts{ +var redisCommandDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "otr", Subsystem: "redispub", - Name: "last_command_duration_seconds", - Help: "The round trip time in seconds of the most recent write to Redis.", -}) + Name: "redis_command_duration_seconds", + Help: "A histogram recording the duration in seconds of round trips to redis.", + Buckets: []float64{0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5}, +}, []string{"ordinal"}) var metricStalenessPreRetries = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "otr", @@ -76,6 +77,14 @@ var metricLastOplogEntryStaleness = promauto.NewGaugeVec(prometheus.GaugeOpts{ Help: "Gauge recording the difference between this server's clock and the timestamp on the last published oplog entry.", }, []string{"ordinal"}) +var metricOplogEntryStaleness = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "otr", + Subsystem: "redispub", + Name: "entry_staleness_seconds", + Help: "Histogram recording the difference between this server's clock and the timestamp of each processed oplog entry.", + Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50, 100}, +}, []string{"ordinal"}) + // PublishStream reads Publications from the given channel and publishes them // to Redis. func PublishStream(clients []redis.UniversalClient, in <-chan *Publication, opts *PublishOpts, stop <-chan bool, ordinal int) { @@ -170,7 +179,10 @@ func publishSingleMessageWithRetries(p *Publication, maxRetries int, sleepTime t func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix string, dedupeExpirationSeconds int, ordinal int) error { start := time.Now() - metricLastOplogEntryStaleness.WithLabelValues(strconv.Itoa(ordinal)).Set(float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds())) + ordinalStr := strconv.Itoa(ordinal) + staleness := float64(time.Since(time.Unix(int64(p.OplogTimestamp.T), 0)).Seconds()) + metricLastOplogEntryStaleness.WithLabelValues(ordinalStr).Set(staleness) + metricOplogEntryStaleness.WithLabelValues(ordinalStr).Observe(staleness) _, err := publishDedupe.Run( context.Background(), @@ -191,7 +203,7 @@ func publishSingleMessage(p *Publication, client redis.UniversalClient, prefix s strings.Join(p.Channels, "$"), // ARGV[3], channels ).Result() - metricLastCommandDuration.Set(time.Since(start).Seconds()) + redisCommandDuration.WithLabelValues(ordinalStr).Observe(time.Since(start).Seconds()) return err }