Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 rebased #220

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/deimos.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'karafka'

require 'deimos/version'
require 'deimos/logging'
require 'deimos/config/configuration'
require 'deimos/producer'
require 'deimos/active_record_producer'
Expand Down
35 changes: 2 additions & 33 deletions lib/deimos/backends/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class << self
# @param messages [Array<Deimos::Message>]
# @return [void]
def publish(producer_class:, messages:)
Deimos.config.logger.info(log_message(messages))
message = ::Deimos::Logging.messages_log_text(producer_class.karafka_config.payload_log, messages)
Deimos::Logging.log_info({message: 'Publishing Messages:'}.merge(message))
execute(producer_class: producer_class, messages: messages)
end

Expand All @@ -22,38 +23,6 @@ def execute(producer_class:, messages:)

private

def log_message(messages)
log_message = {
message: 'Publishing messages',
topic: messages.first&.topic
}

case Deimos.config.payload_log
when :keys
log_message.merge!(
payload_keys: messages.map(&:key)
)
when :count
log_message.merge!(
payloads_count: messages.count
)
when :headers
log_message.merge!(
payload_headers: messages.map(&:headers)
)
else
log_message.merge!(
payloads: messages.map do |message|
{
payload: message.payload,
key: message.key
}
end
)
end

log_message
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _handle_batch_error(exception, payloads, metadata)
status:batch_error
topic:#{metadata[:topic]}
))
Deimos.config.logger.warn(
Deimos::Logging.log_warn(
message: 'Error consuming message batch',
handler: self.class.name,
metadata: metadata.except(:keys),
Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/consume/message_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def consume(_payload, _metadata)
private

def _received_message(payload, metadata)
Deimos.config.logger.info(
Deimos::Logging.log_info(
message: 'Got Kafka event',
payload: payload,
metadata: metadata
Expand Down Expand Up @@ -84,7 +84,7 @@ def _handle_success(time_taken, payload, metadata)
status:success
topic:#{metadata[:topic]}
))
Deimos.config.logger.info(
Deimos::Logging.log_info(
message: 'Finished processing Kafka event',
payload: payload,
time_elapsed: time_taken,
Expand Down
71 changes: 71 additions & 0 deletions lib/deimos/logging.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
module Deimos
module Logging
class << self

def log_add(method, msg)
Karafka.logger.tagged('Deimos') do |logger|
logger.send(method, msg.to_json)
end

end

def log_info(*args)
log_add(:info, *args)
end

def log_debug(*args)
log_add(:debug, *args)
end

def log_error(*args)
log_add(:error, *args)
end

def log_warn(*args)
log_add(:warn, *args)
end

def metadata_log_text(metadata)
metadata.to_h.slice(:timestamp, :offset, :first_offset, :last_offset, :partition, :topic, :size)
end

def _payloads(messages)

end

def messages_log_text(payload_log, messages)
log_message = {}

case payload_log
when :keys
keys = messages.map do |m|
m.respond_to?(:payload) ? m.key || m.payload['message_id'] : m[:key] || m[:payload]['message_id']
end
log_message.merge!(
payload_keys: keys
)
when :count
log_message.merge!(
payloads_count: messages.count
)
when :headers
log_message.merge!(
payload_headers: messages.map { |m| m.respond_to?(:headers) ? m.headers : m[:headers] }
)
else
log_message.merge!(
payloads: messages.map do |m|
{
payload: m.respond_to?(:payload) ? m.payload : m[:payload],
key: m.respond_to?(:payload) ? m.key : m[:key]
}
end
)
end

log_message
end

end
end
end
2 changes: 1 addition & 1 deletion lib/deimos/schema_backends/avro_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def avro_turf_messaging
user: Deimos.config.schema.user,
password: Deimos.config.schema.password,
namespace: @namespace,
logger: Deimos.config.logger
logger: Karafka.logger
)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/utils/db_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def self.start!
end
executor = Sigurd::Executor.new(pollers,
sleep_seconds: 5,
logger: Deimos.config.logger)
logger: Karafka.logger)
signal_handler = Sigurd::SignalHandler.new(executor)
signal_handler.run!
end
Expand Down
14 changes: 7 additions & 7 deletions lib/deimos/utils/db_poller/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ def start
if Deimos.config.producers.backend == :kafka_async
Deimos.config.producers.backend = :kafka
end
Deimos.config.logger.info('Starting...')
Deimos::Logging.log_info('Starting...')
@signal_to_stop = false
ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive?

retrieve_poll_info
loop do
if @signal_to_stop
Deimos.config.logger.info('Shutting down')
Deimos::Logging.log_info('Shutting down')
break
end
process_updates if should_run?
Expand Down Expand Up @@ -95,7 +95,7 @@ def should_run?
# Stop the poll.
# @return [void]
def stop
Deimos.config.logger.info('Received signal to stop')
Deimos::Logging.log_info('Received signal to stop')
@signal_to_stop = true
end

Expand All @@ -111,9 +111,9 @@ def process_updates
# @param span [Object]
# @return [Boolean]
def handle_message_too_large(exception, batch, status, span)
Deimos.config.logger.error("Error publishing through DB Poller: #{exception.message}")
Deimos::Logging.log_error("Error publishing through DB Poller: #{exception.message}")
if @config.skip_too_large_messages
Deimos.config.logger.error("Skipping messages #{batch.map(&:id).join(', ')} since they are too large")
Deimos::Logging.log_error("Skipping messages #{batch.map(&:id).join(', ')} since they are too large")
Deimos.config.tracer&.set_error(span, exception)
status.batches_errored += 1
true
Expand Down Expand Up @@ -145,13 +145,13 @@ def process_batch_with_span(batch, status)
sleep(0.5)
retry
rescue StandardError => e
Deimos.config.logger.error("Error publishing through DB poller: #{e.message}}")
Deimos::Logging.log_error("Error publishing through DB poller: #{e.message}}")
if @config.retries.nil? || retries < @config.retries
retries += 1
sleep(0.5)
retry
else
Deimos.config.logger.error('Retries exceeded, moving on to next batch')
Deimos::Logging.log_error('Retries exceeded, moving on to next batch')
Deimos.config.tracer&.set_error(span, e)
status.batches_errored += 1
return false
Expand Down
6 changes: 3 additions & 3 deletions lib/deimos/utils/db_poller/state_based.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ class StateBased < Base
# Send messages for updated data.
# @return [void]
def process_updates
Deimos.config.logger.info("Polling #{log_identifier}")
Deimos::Logging.log_info("Polling #{log_identifier}")
status = PollStatus.new(0, 0, 0)
first_batch = true

# poll_query gets all the relevant data from the database, as defined
# by the producer itself.
loop do
Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
Deimos::Logging.log_debug("Polling #{log_identifier}, batch #{status.current_batch}")
batch = fetch_results.to_a
break if batch.empty?

Expand All @@ -29,7 +29,7 @@ def process_updates
# If there were no results at all, we update last_sent so that we still get a wait
# before the next poll.
@info.touch(:last_sent) if first_batch
Deimos.config.logger.info("Poll #{log_identifier} complete (#{status.report}")
Deimos::Logging.log_info("Poll #{log_identifier} complete (#{status.report}")
end

# @return [ActiveRecord::Relation]
Expand Down
6 changes: 3 additions & 3 deletions lib/deimos/utils/db_poller/time_based.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ def process_and_touch_info(batch, status)
def process_updates
time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
time_to = Time.zone.now - @config.delay_time
Deimos.config.logger.info("Polling #{log_identifier} from #{time_from} to #{time_to}")
Deimos::Logging.log_info("Polling #{log_identifier} from #{time_from} to #{time_to}")
status = PollStatus.new(0, 0, 0)
first_batch = true

# poll_query gets all the relevant data from the database, as defined
# by the producer itself.
loop do
Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
Deimos::Logging.log_debug("Polling #{log_identifier}, batch #{status.current_batch}")
batch = fetch_results(time_from, time_to).to_a
break if batch.empty?

Expand All @@ -47,7 +47,7 @@ def process_updates
# If there were no results at all, we update last_sent so that we still get a wait
# before the next poll.
@info.touch(:last_sent) if first_batch
Deimos.config.logger.info("Poll #{log_identifier} complete at #{time_to} (#{status.report})")
Deimos::Logging.log_info("Poll #{log_identifier} complete at #{time_to} (#{status.report})")
end

# @param time_from [ActiveSupport::TimeWithZone]
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/utils/deadlock_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def wrap(tags=[])
# Reraise if all retries exhausted
raise if count <= 0

Deimos.config.logger.warn(
Deimos::Logging.log_warn(
message: 'Deadlock encountered when trying to execute query. '\
"Retrying. #{count} attempt(s) remaining",
tags: tags
Expand Down
23 changes: 0 additions & 23 deletions spec/backends/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,4 @@
described_class.publish(producer_class: MyProducer, messages: messages)
end

describe 'payload_log method' do
it 'should return whole payload (default behavior)' do
log_message = described_class.send(:log_message, messages)
expect(log_message[:payloads].count).to eq(3)
expect(log_message[:payloads].first[:payload]).to eq({ 'foo' => 1 })
expect(log_message[:payloads].first[:key]).to eq('foo1')
end

it 'should return only keys of messages' do
Deimos.config.payload_log = :keys
log_message = described_class.send(:log_message, messages)
expect(log_message[:payload_keys].count).to eq(3)
expect(log_message[:payload_keys]).to be_a(Array)
expect(log_message[:payload_keys].first).to eq('foo1')
end

it 'should return only messages count' do
Deimos.config.payload_log = :count
log_message = described_class.send(:log_message, messages)
expect(log_message[:payloads_count]).to be_a(Integer)
expect(log_message[:payloads_count]).to eq(3)
end
end
end
17 changes: 5 additions & 12 deletions spec/batch_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,11 @@ def consume_batch(_payloads, _metadata)
'timestamp' => 2.minutes.ago.to_s, 'message_id' => 'two' }
]

allow(Deimos.config.logger).
to receive(:info)

expect(Deimos.config.logger).
to receive(:info).
with(hash_including(
message_ids: [
{ key: 1, message_id: 'one' },
{ key: 2, message_id: 'two' }
]
)).
twice
allow(Deimos::Logging).to receive(:log_info)

expect(Deimos::Logging).
to receive(:log_info).
with(hash_including(payload_keys: ["1", "2"]))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it 'should log message identifiers'. The expectation does not seem to test the message_id part and removes it.

I think the title should change if you say that this is the correct way in v2.


test_consume_batch('my_batch_consume_topic', batch_with_message_id, keys: [1, 2])
end
Expand Down
25 changes: 25 additions & 0 deletions spec/logging_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
RSpec.describe Deimos::Logging do
include_context 'with publish_backend'
describe '#messages_log_text' do
it 'should return whole payload (default behavior)' do
log_message = described_class.messages_log_text(:payloads, messages)
expect(log_message[:payloads].count).to eq(3)
expect(log_message[:payloads].first[:payload]).to eq({ some_int: 1, test_id: 'foo1' })
expect(log_message[:payloads].first[:key]).to eq('foo1')
end

it 'should return only keys of messages' do
log_message = described_class.messages_log_text(:keys, messages)
expect(log_message[:payload_keys].count).to eq(3)
expect(log_message[:payload_keys]).to be_a(Array)
expect(log_message[:payload_keys].first).to eq('foo1')
end

it 'should return only messages count' do
log_message = described_class.messages_log_text(:count, messages)
expect(log_message[:payloads_count]).to be_a(Integer)
expect(log_message[:payloads_count]).to eq(3)
end
end

end