Skip to content

Commit

Permalink
Merge pull request #61 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad committed Feb 8, 2024
2 parents 145e994 + ce96d49 commit 07da0ba
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
4 changes: 2 additions & 2 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type QueueSettings struct {

// LogsConfig holds the configuration for log data.
type LogsConfig struct {
// AttritubeLabels is the string representing attribute labels.
AttritubeLabels string `mapstructure:"attritube_labels"`
// AttributeLabels is the string representing attribute labels.
AttributeLabels string `mapstructure:"attribute_labels"`
// ResourceLabels is the string representing resource labels.
ResourceLabels string `mapstructure:"resource_labels"`
// Format is the string representing the format.
Expand Down
18 changes: 13 additions & 5 deletions exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type logsExporter struct {

db clickhouse.Conn

attritubeLabels string
attributeLabels string
resourceLabels string
format string
cluster bool
Expand All @@ -52,7 +52,7 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
logger: logger,
db: db,
format: cfg.Logs.Format,
attritubeLabels: cfg.Logs.AttritubeLabels,
attributeLabels: cfg.Logs.AttributeLabels,
resourceLabels: cfg.Logs.ResourceLabels,
cluster: cfg.ClusteredClickhouse,
}, nil
Expand Down Expand Up @@ -107,13 +107,13 @@ func (e *logsExporter) convertAttributesAndMerge(logAttrs pcommon.Map, resAttrs
out = out.Merge(labels)
}

if e.attritubeLabels != "" {
if e.attributeLabels != "" {
labels := convertSelectedAttributesToLabels(logAttrs, pcommon.NewValueStr(e.resourceLabels))
out = out.Merge(labels)
}

if e.resourceLabels != "" {
labels := convertSelectedAttributesToLabels(resAttrs, pcommon.NewValueStr(e.attritubeLabels))
labels := convertSelectedAttributesToLabels(resAttrs, pcommon.NewValueStr(e.attributeLabels))
out = out.Merge(labels)
}

Expand Down Expand Up @@ -389,6 +389,8 @@ func convertLogToTimeSerie(fingerprint model.Fingerprint, log plog.LogRecord, la
}

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
start := time.Now()

var (
samples []Sample
timeSeries []TimeSerie
Expand Down Expand Up @@ -430,7 +432,13 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

return batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries)
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
return err
}

e.logger.Debug("pushLogsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

func batchSamplesAndTimeSeries(ctx context.Context, db clickhouse.Conn, samples []Sample, timeSeries []TimeSerie) error {
Expand Down
11 changes: 10 additions & 1 deletion exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strconv"
"strings"
"time"
"unicode"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -461,6 +462,8 @@ func (e *metricsExporter) collectFromMetric(metric pmetric.Metric, resource pcom
}

func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metrics) error {
start := time.Now()

// for collect samples and timeSeries
var (
samples []Sample
Expand All @@ -479,7 +482,13 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

return batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries)
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
return err
}

e.logger.Debug("pushMetricsData", zap.Int("samples", len(samples)), zap.Int("timeseries", len(timeSeries)), zap.String("cost", time.Since(start).String()))

return nil
}

// isValidAggregationTemporality checks whether an OTel metric has a valid
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) er
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
return err
}
e.logger.Info("pushTraceData", zap.Int("spanCount", td.SpanCount()), zap.String("cost", time.Since(start).String()))
e.logger.Debug("pushTraceData", zap.Int("spanCount", td.SpanCount()), zap.String("cost", time.Since(start).String()))
return nil
}

Expand Down

0 comments on commit 07da0ba

Please sign in to comment.