-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CCOL-2039: Post process valid and invalid records from batch consumption #207
Changes from 25 commits
6553167
ca6a1e3
3b7461e
8d347e4
4a688a0
385e58b
4661b4f
8813fa4
d6181fd
2d9e8dd
2cb9d4d
6a750c8
eae600e
2b8aa2d
ac27466
1a5b7db
ea099a3
1ea3069
1e5d3a5
74364f6
90b5cf7
ceff055
b61a403
97bd093
1ecee75
2b4dfc2
1c4e6b4
4e4f774
e21bded
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,8 +19,17 @@ class Consumer | |
class << self | ||
# @return [Deimos::SchemaBackends::Base] | ||
def decoder | ||
@decoder ||= Deimos.schema_backend(schema: config[:schema], | ||
namespace: config[:namespace]) | ||
return @decoder if @decoder | ||
|
||
@decoder = if Utils::SchemaClass.use?(config.to_h) | ||
# Initialize an instance of the provided schema | ||
# in the event the schema class is an override, the inherited | ||
# schema and namespace will be applied | ||
schema_class = "Schemas::#{config[:schema]}".constantize.new | ||
Deimos.schema_backend(schema: schema_class.schema, namespace: schema_class.namespace) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be moved into the |
||
else | ||
Deimos.schema_backend(schema: config[:schema], namespace: config[:namespace]) | ||
end | ||
end | ||
|
||
# @return [Deimos::SchemaBackends::Base] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,15 +11,12 @@ def initialize(config) | |
raise 'Tracing config must specify service_name' if config[:service_name].nil? | ||
|
||
@service = config[:service_name] | ||
@tracer = ::Datadog.respond_to?(:tracer) ? ::Datadog.tracer : ::Datadog::Tracing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels more like a method than an instance variable. It isn't actually state that needs to be stored. |
||
end | ||
|
||
# :nodoc: | ||
def start(span_name, options={}) | ||
span = if ::Datadog.respond_to?(:tracer) | ||
::Datadog.tracer.trace(span_name) | ||
else | ||
::Datadog::Tracing.trace(span_name) | ||
end | ||
span = @tracer.trace(span_name) | ||
span.service = @service | ||
span.resource = options[:resource] | ||
span | ||
|
@@ -32,7 +29,7 @@ def finish(span) | |
|
||
# :nodoc: | ||
def active_span | ||
::Datadog.tracer.active_span | ||
@tracer.active_span | ||
end | ||
|
||
# :nodoc: | ||
|
@@ -45,6 +42,11 @@ def set_tag(tag, value, span=nil) | |
(span || active_span).set_tag(tag, value) | ||
end | ||
|
||
# :nodoc: | ||
def get_tag(tag) | ||
active_span.get_tag(tag) | ||
end | ||
|
||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,12 @@ def set_tag(tag, value, span=nil) | |
raise NotImplementedError | ||
end | ||
|
||
# Get a tag from a span with the specified tag. | ||
# @param tag [String] | ||
def get_tag(tag) | ||
raise NotImplementedError | ||
end | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to implement this for datadog. |
||
|
||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be added to the README.