Skip to content

Commit

Permalink
Merge pull request #76 from tomershafir/exporter-richer-queue-config
Browse files Browse the repository at this point in the history
Exporter richer queue config
  • Loading branch information
akvlad committed Mar 27, 2024
2 parents a7399e2 + 29ca160 commit 75892dc
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 57 deletions.
15 changes: 1 addition & 14 deletions exporter/clickhouseprofileexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,17 @@ import (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// DSN is the ClickHouse server Data Source Name.
// For tcp protocol reference: [ClickHouse/clickhouse-go#dsn](https://github.com/ClickHouse/clickhouse-go#dsn).
// For http protocol reference: [mailru/go-clickhouse/#dsn](https://github.com/mailru/go-clickhouse/#dsn).
Dsn string `mapstructure:"dsn"`
}

type QueueSettings struct {
// Length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

var _ component.Config = (*Config)(nil)

// Checks that the receiver configuration is valid
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforceQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
4 changes: 2 additions & 2 deletions exporter/clickhouseprofileexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
Dsn: defaultDsn,
}
Expand All @@ -37,7 +37,7 @@ func createLogsExporter(ctx context.Context, set exporter.CreateSettings, cfg co
cfg,
exp.send,
exporterhelper.WithShutdown(exp.Shutdown),
exporterhelper.WithQueue(c.enforceQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
Expand Down
18 changes: 1 addition & 17 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ const (
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// QueueSettings is a subset of exporterhelper.QueueSettings,
// because only QueueSize is user-settable.
QueueSettings QueueSettings `mapstructure:"sending_queue"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

Expand All @@ -45,12 +43,6 @@ type Config struct {
Metrics MetricsConfig `mapstructure:"metrics"`
}

// QueueSettings is a subset of exporterhelper.QueueSettings.
type QueueSettings struct {
// QueueSize set the length of the sending queue
QueueSize int `mapstructure:"queue_size"`
}

// LogsConfig holds the configuration for log data.
type LogsConfig struct {
// AttributeLabels is the string representing attribute labels.
Expand All @@ -74,11 +66,3 @@ var _ component.Config = (*Config)(nil)
func (cfg *Config) Validate() error {
return nil
}

func (cfg *Config) enforcedQueueSettings() exporterhelper.QueueSettings {
return exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 1,
QueueSize: cfg.QueueSettings.QueueSize,
}
}
7 changes: 5 additions & 2 deletions exporter/qrynexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ func TestLoadConfig(t *testing.T) {
RandomizationFactor: 0.5,
Multiplier: 1.5,
},
QueueSettings: QueueSettings{
QueueSize: 100,
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
QueueSize: 100,
NumConsumers: 10,
StorageID: nil,
},
},
},
Expand Down
18 changes: 9 additions & 9 deletions exporter/qrynexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewFactory() exporter.Factory {
func createDefaultConfig() component.Config {
return &Config{
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueSettings: QueueSettings{QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize},
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
DSN: defaultDSN,
}
Expand All @@ -52,23 +52,23 @@ func createDefaultConfig() component.Config {
// Traces are directly insert into clickhouse.
func createTracesExporter(
ctx context.Context,
params exporter.CreateSettings,
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
c := cfg.(*Config)
oce, err := newTracesExporter(params.Logger, c)
oce, err := newTracesExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn traces exporter: %w", err)
}

return exporterhelper.NewTracesExporter(
ctx,
params,
set,
cfg,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -81,7 +81,7 @@ func createLogsExporter(
cfg component.Config,
) (exporter.Logs, error) {
c := cfg.(*Config)
exporter, err := newLogsExporter(set.Logger, c)
exporter, err := newLogsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -93,7 +93,7 @@ func createLogsExporter(
exporter.pushLogsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
Expand All @@ -106,7 +106,7 @@ func createMetricsExporter(
cfg component.Config,
) (exporter.Metrics, error) {
c := cfg.(*Config)
exporter, err := newMetricsExporter(set.Logger, c)
exporter, err := newMetricsExporter(set.Logger, c, &set)
if err != nil {
return nil, fmt.Errorf("cannot configure qryn logs exporter: %w", err)
}
Expand All @@ -118,7 +118,7 @@ func createMetricsExporter(
exporter.pushMetricsData,
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithTimeout(c.TimeoutSettings),
exporterhelper.WithQueue(c.enforcedQueueSettings()),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}
20 changes: 15 additions & 5 deletions exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/go-logfmt/logfmt"
"github.com/prometheus/common/model"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -30,6 +32,7 @@ const (

type logsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

Expand All @@ -39,7 +42,7 @@ type logsExporter struct {
cluster bool
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
func newLogsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*logsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -48,14 +51,20 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
if err != nil {
return nil, err
}
return &logsExporter{
exp := &logsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
format: cfg.Logs.Format,
attributeLabels: cfg.Logs.AttributeLabels,
resourceLabels: cfg.Logs.ResourceLabels,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -433,11 +442,12 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeLogs)))
e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
20 changes: 15 additions & 5 deletions exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/value"
"go.opentelemetry.io/collector/exporter"
conventions "go.opentelemetry.io/collector/model/semconv/v1.5.0"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)

Expand All @@ -28,14 +30,15 @@ const (

type metricsExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn

namespace string
cluster bool
}

func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, error) {
func newMetricsExporter(logger *zap.Logger, cfg *Config, set *exporter.CreateSettings) (*metricsExporter, error) {
opts, err := clickhouse.ParseDSN(cfg.DSN)
if err != nil {
return nil, err
Expand All @@ -44,12 +47,18 @@ func newMetricsExporter(logger *zap.Logger, cfg *Config) (*metricsExporter, erro
if err != nil {
return nil, err
}
return &metricsExporter{
exp := &metricsExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
namespace: cfg.Metrics.Namespace,
cluster: cfg.ClusteredClickhouse,
}, nil
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
return exp, err
}
return exp, nil
}

// Shutdown will shutdown the exporter.
Expand Down Expand Up @@ -483,11 +492,12 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
}

otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess, dataTypeMetrics)))
e.logger.Debug("pushMetricsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

Expand Down
43 changes: 43 additions & 0 deletions exporter/qrynexporter/self_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package qrynexporter

import (
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
prefix = "exporter_qryn_"

errorCodeError = "1"
errorCodeSuccess = ""

dataTypeLogs = "logs"
dataTypeMetrics = "metrics"
dataTypeTraces = "traces"
)

var (
otelcolExporterQrynBatchInsertDurationMillis metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
var err error
if otelcolExporterQrynBatchInsertDurationMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "batch_insert_duration_millis"),
metric.WithDescription("Qryn exporter batch insert duration in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
return nil
}

func newOtelcolAttrSetBatch(errorCode string, dataType string) *attribute.Set {
s := attribute.NewSet(
attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)},
attribute.KeyValue{Key: "data_type", Value: attribute.StringValue(dataType)},
)
return &s
}
Loading

0 comments on commit 75892dc

Please sign in to comment.