feat: add Prometheus support (#74)
* docs: doc around metrics

* feat: added prometheus dependency

* define metric instrument interface

* feat: init prometheus collector

* feat: implement stubs for metric instrument

* feat: interface established for metrics

* update gauge to type interface for less type casting

* update server.go and handler.go to use new instrumentation scheme

* feat: update to make instrumentation static

* changes in metric instrument signature

* feat: add cast dependency for converting values reliably.

* feat: implement interface methods

* feat: add cast dependency update

* feat: implement prometheus methods

* feat: run server in a go routine

* fix: correct error return format

* fix: add missing label for count metric

* feat: added log for metric server shut down

* feat: convert decrement counter to a different metric for calculation during observation

* feat: removed unused decrement counter

* feat: bump raccoon to version 1.15

* update sample env files to include new metric config

* feat: bump to version 1.16 for golang

* bump golang version to 1.18

* fix: remove usage of errors.Join since it requires go >= 1.20

* feat: bump docker golang version to 1.20

* revert: docker changes for protoc

* feat: bump golang version to 1.18 in go.mod

* fix: label inconsistencies

* feat: update setup-go version

* feat: set prometheus as the default for metrics in test

* feat: remove telegraf dependency from docker compose

* feat: added setup go

* run in same image

* feat: added config values documentation

* feat: upgrade running image to debian:bookworm

* feat: added buckets according to approximate metric ranges

* update docs with missing metrics

* refactor: set unused locally rather than using mute options

* refactor: formatting and uniformity changes

* feat: add logging for any error encountered during metric scrape

* fix: early return while registering metrics

* feat: remove 1 as bucket resolution due to low probability of the bucket filling

* feat: add interface abstraction for mocking

* feat: added prometheus tests

* introduce delay in metric server initialisation

* feat: added metric tests

* feat: add metric test

* remove test for statsD setup

* feat: move to config instead of loading from env

* refactor: refactor statsD implementation

* refactor: cleanup metrics.md and fix issues

* added documentation

* feat: improved text on logging and help of metrics

* feat: update go.mod

* feat: add common config for recording runtime stats

* record err in case of casting error

* feat: add support for error in case of unable to cast. Added tests for the same

* fix: metric name fetch
punit-kulal committed Sep 12, 2023
1 parent db7f808 commit 158a8d8
Showing 29 changed files with 1,381 additions and 324 deletions.
6 changes: 5 additions & 1 deletion .env.sample
@@ -17,6 +17,8 @@ SERVER_CORS_ALLOWED_HEADERS=""

 SERVER_GRPC_PORT=8081

+
+
 WORKER_BUFFER_CHANNEL_SIZE=5
 WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000
 WORKER_POOL_SIZE=5
@@ -32,7 +34,9 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000

+METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS=1000
+METRIC_PROMETHEUS_ENABLED="true"
 METRIC_STATSD_ADDRESS=":8125"
-METRIC_STATSD_FLUSH_PERIOD_MS=100
+METRIC_STATSD_FLUSH_PERIOD_MS=1000

 LOG_LEVEL="info"
4 changes: 3 additions & 1 deletion .env.test
@@ -33,7 +33,9 @@ PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
 PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
 PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000

+METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS=1000
+METRIC_PROMETHEUS_ENABLED="true"
 METRIC_STATSD_ADDRESS=":8125"
-METRIC_STATSD_FLUSH_PERIOD_MS=100
+METRIC_STATSD_FLUSH_PERIOD_MS=1000

 LOG_LEVEL="info"
8 changes: 4 additions & 4 deletions .github/workflows/build.yaml
@@ -9,9 +9,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Setup Go
-        uses: actions/setup-go@v2.1.3
+        uses: actions/setup-go@v3
         with:
-          go-version: "1.14"
+          go-version: "1.18"
       - name: Checkout repo
         uses: actions/checkout@v2
         with:
@@ -25,9 +25,9 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Setup Go
-        uses: actions/setup-go@v2.1.3
+        uses: actions/setup-go@v3
         with:
-          go-version: "1.14"
+          go-version: "1.18"
       - name: Install Protoc
         uses: arduino/setup-protoc@v1
       - uses: actions/checkout@v2
4 changes: 4 additions & 0 deletions .github/workflows/integration-test.yaml
@@ -14,6 +14,10 @@ jobs:
         uses: arduino/setup-protoc@v1
       - name: Checkout repo
         uses: actions/checkout@v2
+      - name: Setup Go
+        uses: actions/setup-go@v3
+        with:
+          go-version: "1.18"
       - name: Copy integration config
         run: cp .env.test .env
       - name: Run Raccoon
5 changes: 3 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.14
+FROM golang:1.18

 WORKDIR /app
 RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
@@ -10,7 +10,8 @@ RUN PROTOC_ZIP=protoc-3.17.3-linux-x86_64.zip && \
 COPY . .
 RUN make build

-FROM debian:buster-slim
+
+FROM debian:bookworm-slim
 WORKDIR /app
 COPY --from=0 /app/raccoon ./raccoon
 COPY . .
2 changes: 1 addition & 1 deletion README.md
@@ -77,7 +77,7 @@ You can consume the published events from the host machine by using `localhost:9

 Prerequisite:

-- You need to have [GO](https://golang.org/) 1.14 or above installed
+- You need to have [GO](https://golang.org/) 1.18 or above installed
 - You need `protoc` [installed](https://github.com/protocolbuffers/protobuf#protocol-compiler-installation)

 ```sh
23 changes: 11 additions & 12 deletions app/server.go
@@ -63,7 +63,7 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
 			Until then we fall back to approximation */
 			eventsInChannel := len(bufferChannel) * 7
 			logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
-			metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
+			metrics.Count("kafka_messages_delivered_total", int64(eventsInChannel+eventsInProducer), map[string]string{"success": "false", "conn_group": "NA", "event_type": "NA"})
 			logger.Info("Exiting server")
 			cancel()
 		default:
@@ -73,20 +73,19 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
 }

 func reportProcMetrics() {
-	t := time.Tick(config.MetricStatsd.FlushPeriodMs)
+	t := time.Tick(config.MetricInfo.RuntimeStatsRecordInterval)
 	m := &runtime.MemStats{}
 	for {
 		<-t
-		metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), "")
-
+		metrics.Gauge("server_go_routines_count_current", runtime.NumGoroutine(), map[string]string{})
 		runtime.ReadMemStats(m)
-		metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, "")
-		metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, "")
-		metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, "")
-		metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, "")
-		metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, "")
-		metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, "")
-		metrics.Gauge("server_mem_gc_count_current", m.NumGC, "")
-		metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, "")
+		metrics.Gauge("server_mem_heap_alloc_bytes_current", m.HeapAlloc, map[string]string{})
+		metrics.Gauge("server_mem_heap_inuse_bytes_current", m.HeapInuse, map[string]string{})
+		metrics.Gauge("server_mem_heap_objects_total_current", m.HeapObjects, map[string]string{})
+		metrics.Gauge("server_mem_stack_inuse_bytes_current", m.StackInuse, map[string]string{})
+		metrics.Gauge("server_mem_gc_triggered_current", m.LastGC/1000, map[string]string{})
+		metrics.Gauge("server_mem_gc_pauseNs_current", m.PauseNs[(m.NumGC+255)%256]/1000, map[string]string{})
+		metrics.Gauge("server_mem_gc_count_current", m.NumGC, map[string]string{})
+		metrics.Gauge("server_mem_gc_pauseTotalNs_current", m.PauseTotalNs, map[string]string{})
 	}
 }
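The hunk above replaces the old string-tag signatures (`"success=false"`) with label maps. The instrument abstraction itself lives in files not shown on this page, so the following is only a minimal Go sketch of such an interface (all names hypothetical), including the kind of in-memory implementation the "add interface abstraction for mocking" commit suggests:

```go
package main

import "fmt"

// MetricInstrument sketches a backend-agnostic instrument that both the
// StatsD and Prometheus collectors could implement, and tests can mock.
type MetricInstrument interface {
	Count(name string, value int64, labels map[string]string) error
	Gauge(name string, value interface{}, labels map[string]string) error
}

// memoryInstrument is a trivial in-memory implementation, useful as a mock.
type memoryInstrument struct {
	counts map[string]int64
}

func newMemoryInstrument() *memoryInstrument {
	return &memoryInstrument{counts: map[string]int64{}}
}

// key flattens the metric name and labels into a single map key.
// Go prints maps with sorted keys, so the key is deterministic.
func key(name string, labels map[string]string) string {
	return fmt.Sprintf("%s%v", name, labels)
}

func (m *memoryInstrument) Count(name string, value int64, labels map[string]string) error {
	m.counts[key(name, labels)] += value
	return nil
}

func (m *memoryInstrument) Gauge(name string, value interface{}, labels map[string]string) error {
	return nil // a real backend would record the observed value
}

func main() {
	var instr MetricInstrument = newMemoryInstrument()
	instr.Count("kafka_messages_delivered_total", 3,
		map[string]string{"success": "false", "conn_group": "NA", "event_type": "NA"})
	fmt.Println(instr.(*memoryInstrument).counts)
}
```

Taking `value interface{}` on `Gauge` mirrors the "update gauge to type interface for less type casting" commit: callers can pass `uint64`, `int`, or `uint32` fields from `runtime.MemStats` directly.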
3 changes: 3 additions & 0 deletions config/load.go
@@ -21,13 +21,16 @@ func Load() {
 	viper.ReadInConfig()

 	logConfigLoader()
+
 	publisherKafkaConfigLoader()
 	serverConfigLoader()
 	serverWsConfigLoader()
 	serverGRPCConfigLoader()
 	serverCorsConfigLoader()
 	workerConfigLoader()
+	metricCommonConfigLoader()
 	metricStatsdConfigLoader()
+	metricPrometheusConfigLoader()
 	eventDistributionConfigLoader()
 	eventConfigLoader()
 }
33 changes: 33 additions & 0 deletions config/metric.go
@@ -9,17 +9,50 @@ import (
 )

 var MetricStatsd metricStatsdCfg
+var MetricPrometheus metricPrometheusCfg
+var MetricInfo metricInfoCfg

 type metricStatsdCfg struct {
 	Enabled       bool
 	Address       string
 	FlushPeriodMs time.Duration
 }

+type metricPrometheusCfg struct {
+	Enabled bool
+	Port    int
+	Path    string
+}
+
+type metricInfoCfg struct {
+	RuntimeStatsRecordInterval time.Duration
+}
+
 func metricStatsdConfigLoader() {
 	viper.SetDefault("METRIC_STATSD_ENABLED", false)
 	viper.SetDefault("METRIC_STATSD_ADDRESS", ":8125")
 	viper.SetDefault("METRIC_STATSD_FLUSH_PERIOD_MS", 10000)
 	MetricStatsd = metricStatsdCfg{
 		Enabled:       util.MustGetBool("METRIC_STATSD_ENABLED"),
 		Address:       util.MustGetString("METRIC_STATSD_ADDRESS"),
 		FlushPeriodMs: util.MustGetDuration("METRIC_STATSD_FLUSH_PERIOD_MS", time.Millisecond),
 	}
 }

+func metricPrometheusConfigLoader() {
+	viper.SetDefault("METRIC_PROMETHEUS_ENABLED", false)
+	viper.SetDefault("METRIC_PROMETHEUS_PORT", 9090)
+	viper.SetDefault("METRIC_PROMETHEUS_PATH", "/metrics")
+	MetricPrometheus = metricPrometheusCfg{
+		Enabled: util.MustGetBool("METRIC_PROMETHEUS_ENABLED"),
+		Port:    util.MustGetInt("METRIC_PROMETHEUS_PORT"),
+		Path:    util.MustGetString("METRIC_PROMETHEUS_PATH"),
+	}
+}
+
+func metricCommonConfigLoader() {
+	viper.SetDefault("METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS", 10000)
+	MetricInfo = metricInfoCfg{
+		RuntimeStatsRecordInterval: util.MustGetDuration("METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS", time.Millisecond),
+	}
+}
22 changes: 12 additions & 10 deletions docker-compose.yml
@@ -3,6 +3,7 @@ version: '3.9'
 networks:
   cs-network:

+
 services:
   zookeeper:
     image: confluentinc/cp-zookeeper:5.1.2
@@ -41,14 +42,14 @@ services:
   cs:
     build:
       context: .
-    command: ["/bin/sh", "-c", "./raccoon"]
+    command: [ "/bin/sh", "-c", "./raccoon" ]
     hostname: cs
     container_name: cs
     stdin_open: true
     tty: true
     depends_on:
       - kafka
-      - telegraf
+      # - telegraf
     environment:
       SERVER_WEBSOCKET_PORT: "8080"
       SERVER_WEBSOCKET_CHECK_ORIGIN: "true"
@@ -74,6 +75,7 @@
       PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS: 5000
       PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES: 100000
       PUBLISHER_KAFKA_FLUSH_INTERVAL_MS: 1000
+      METRIC_PROMETHEUS_ENABLED: "true"
       METRIC_STATSD_ADDRESS: "telegraf:8125"
       METRIC_STATSD_FLUSH_PERIOD_MS: 100
       LOG_LEVEL: "info"
@@ -82,11 +84,11 @@
     ports:
       - "8081:8081"
     networks:
       - cs-network
-  telegraf:
-    image: telegraf
-    volumes:
-      - ./.telegraf.sample.conf:/etc/telegraf/telegraf.conf:ro
-    ports:
-      - "8125:8125"
-    networks:
-      - cs-network
+  # telegraf:
+  #   image: telegraf
+  #   volumes:
+  #     - ./.telegraf.sample.conf:/etc/telegraf/telegraf.conf:ro
+  #   ports:
+  #     - "8125:8125"
+  #   networks:
+  #     - cs-network
35 changes: 35 additions & 0 deletions docs/docs/reference/configurations.md
@@ -249,6 +249,20 @@ Upon shutdown, the publisher will try to finish processing events in buffer befo

 ## Metric

+### `METRIC_RUNTIME_STATS_RECORD_INTERVAL_MS`
+
+The interval at which the application's runtime stats are recorded by the instrumentation. It is recommended to match this value to the flush interval when using StatsD, or to your collector's scrape interval when using Prometheus.
+
+- Type `Optional`
+- Default Value: `10000`
+
+### `METRIC_STATSD_ENABLED`
+
+Flag to enable export of StatsD metrics.
+
+- Type `Optional`
+- Default value: `false`
+
 ### `METRIC_STATSD_ADDRESS`

 Address to report the service metrics.
@@ -263,6 +277,27 @@ Interval for the service to push metrics.
 - Type `Optional`
 - Default value: `10000`

+### `METRIC_PROMETHEUS_ENABLED`
+
+Flag to enable an HTTP server that exposes metrics for Prometheus to scrape.
+
+- Type `Optional`
+- Default value: `false`
+
+### `METRIC_PROMETHEUS_PATH`
+
+The path at which the Prometheus server serves metrics.
+
+- Type `Optional`
+- Default value: `/metrics`
+
+### `METRIC_PROMETHEUS_PORT`
+
+The port on which the Prometheus server listens for metric scraping requests.
+
+- Type `Optional`
+- Default value: `9090`
+
 ## Log

 ### `LOG_LEVEL`
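Raccoon's metric server is built on the Prometheus client library added in this PR, so the following is only a stdlib sketch of what `METRIC_PROMETHEUS_PATH` and `METRIC_PROMETHEUS_PORT` control: an HTTP handler mounted at the configured path that answers scrapes in the text exposition format. The handler body is illustrative, not Raccoon's code; only the metric name is borrowed from its docs.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"runtime"
)

// renderMetrics writes one gauge in Prometheus' text exposition format.
// Raccoon registers real collectors through the prometheus client library;
// this hand-rolled version only shows what a scrape response looks like.
func renderMetrics() string {
	return fmt.Sprintf(
		"# HELP server_go_routines_count_current Number of goroutines.\n"+
			"# TYPE server_go_routines_count_current gauge\n"+
			"server_go_routines_count_current %d\n",
		runtime.NumGoroutine(),
	)
}

func main() {
	path := "/metrics" // METRIC_PROMETHEUS_PATH default
	http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, renderMetrics())
	})
	// In production this would be served with something like
	// go http.ListenAndServe(":9090", nil), the port coming from
	// METRIC_PROMETHEUS_PORT; here we simulate a single scrape instead.
	req := httptest.NewRequest("GET", path, nil)
	rec := httptest.NewRecorder()
	http.DefaultServeMux.ServeHTTP(rec, req)
	fmt.Print(rec.Body.String())
}
```

Running the listener in its own goroutine matches the "run server in a go routine" commit: the metric endpoint must not block Raccoon's main event-serving loop.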
47 changes: 46 additions & 1 deletion docs/docs/reference/metrics.md
@@ -53,15 +53,30 @@ Duration of alive connection per session per connection
 - Type: `Timing`
 - Tags: `conn_group=*`

+### `conn_close_err_count`
+
+Number of connection close errors encountered.
+
+- Type: `Count`
+- Tags: NA
+
 ## Kafka Publisher

 ### `kafka_messages_delivered_total`

-Number of delivered events to Kafka
+Number of events delivered to Kafka. This metric also contains false increments; to obtain the true count, take the difference between this metric and `kafka_messages_undelivered_total` for the same set of tags/labels.

 - Type: `Count`
 - Tags: `success=false` `success=true` `conn_group=*` `event_type=*`

+### `kafka_messages_undelivered_total`
+
+The count of false increments made by `kafka_messages_delivered_total`. Use it in conjunction with the former for accurate delivery counts.
+
+- Type: `Count`
+- Tags: `success=false` `success=true` `conn_group=*` `event_type=*`
+
 ### `kafka_unknown_topic_failure_total`

 Number of delivery failures caused by the topic not existing in Kafka.
@@ -102,6 +117,29 @@ Broker latency / round-trip time in microseconds
 - Type: `Gauge`
 - Tags: `broker=broker_nodes`

+### `ack_event_rtt_ms`
+
+Time taken from the moment the ack function is called by the Kafka producer until it is processed by the ack handler.
+
+- Type: `Timing`
+- Tags: NA
+
+### `event_rtt_ms`
+
+Time taken from the moment an event is consumed from the queue until it is acked by the ack handler.
+
+- Type: `Timing`
+- Tags: NA
+
+### `kafka_producebulk_tt_ms`
+
+Response time of the produce-batch method of the Kafka producer.
+
+- Type `Timing`
+- Tags: NA
+
 ## Resource Usage

 ### `server_mem_gc_triggered_current`
@@ -178,6 +216,13 @@ Number of events received in requests
 - Type: `Count`
 - Tags: `conn_group=*` `event_type=*`

+### `events_duplicate_total`
+
+Number of duplicate events.
+
+- Type: `Count`
+- Tags: `conn_group=*` `reason=*`
+
 ### `batches_read_total`

 Request count
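The delivered/undelivered pairing documented in metrics.md can be sketched as follows. This is a simplified model, not Raccoon's actual producer code: the delivered counter is incremented optimistically when an event is produced, and the undelivered counter records the increments that later turn out to be false, so the true delivered count is the difference of the two.

```go
package main

import "fmt"

// deliveryCounter models the two-counter scheme: since Prometheus counters
// cannot be decremented, a failed delivery is recorded as a second counter
// instead of decrementing the first (see the "convert decrement counter"
// commit in this PR).
type deliveryCounter struct {
	delivered   int64
	undelivered int64
}

func (c *deliveryCounter) onProduce() {
	c.delivered++ // optimistic increment at produce time
}

func (c *deliveryCounter) onDeliveryReport(succeeded bool) {
	if !succeeded {
		c.undelivered++ // cancels out one optimistic increment
	}
}

// trueDelivered is the value observers should compute at query time.
func (c *deliveryCounter) trueDelivered() int64 {
	return c.delivered - c.undelivered
}

func main() {
	c := &deliveryCounter{}
	for i := 0; i < 5; i++ {
		c.onProduce()
	}
	c.onDeliveryReport(false) // one event failed
	c.onDeliveryReport(true)
	fmt.Println(c.trueDelivered()) // 4
}
```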