Skip to content

Commit

Permalink
Merge pull request #112 from xing/v4.x
Browse files Browse the repository at this point in the history
Merge 4.x changes into master
  • Loading branch information
jojahner committed Sep 3, 2024
2 parents 5f55e85 + 0918ab4 commit edf1d54
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
matrix:
ruby-version: [3.1.6, 3.2.5, 3.3.4]
redis-version: [4, 5]
rails-version: [6.1.7.8, 7.0.8.4, 7.1.3.4]
rails-version: [6.1.7.8, 7.0.8.4, 7.1.4, 7.2.1]

steps:
- uses: actions/checkout@v3
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ gemfiles/*
/master-client.txt
/master-server.txt
/redis-master-rcc
/gems
/bundler
4 changes: 2 additions & 2 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
rails_versions = [
"6.1.7.8",
"7.0.8.4",
"7.1.3.4",
"7.2.0"
"7.1.4",
"7.2.1"
]
rails_versions.each do |rails_version|
appraise "redis_4_rails_#{rails_version}" do
Expand Down
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ source 'https://rubygems.org'
gemspec

gem "hiredis-client"

# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"
9 changes: 9 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Release Notes

## Version 4.0.0.rc2
* The Beetle::Configuration.server_connection_options now take true and false for the SSL configuration.

## Version 4.0.0.rc1
* [breaking] Remove Beetle::Configuration.api_port without replacement. The port is now always derived from server port
* Allow to specify connection options on a per server basis using a new setting Beetle::Configuration.server_connection_options.
This can be used to selectively overwrite credentials and ssl settings for specific servers.
The latter change is backwards-compatible.

## Version 3.5.7
* Require bunny versions (`~> 0.7.13`) that work for Ruby 3.2 out of
the box.
Expand Down
1 change: 1 addition & 0 deletions beetle.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |s|
s.require_paths = ["lib"]
s.test_files = Dir['test/**/*.rb']
s.metadata = {
'allowed_push_host' => "https://gems.xing.com",
"changelog_uri" => "https://github.com/xing/beetle/blob/master/RELEASE_NOTES.rdoc"
}

Expand Down
5 changes: 5 additions & 0 deletions lib/beetle/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def current_port
@server =~ /:(\d+)$/ ? $1.to_i : 5672
end


def connection_options_for_server(server)
@client.config.connection_options_for_server(server)
end

def set_current_server(s)
@server = s
end
Expand Down
33 changes: 29 additions & 4 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ class Configuration

# list of amqp servers to use (defaults to <tt>"localhost:5672"</tt>)
attr_accessor :servers

# list of additional amqp servers to use for subscribers (defaults to <tt>""</tt>)
attr_accessor :additional_subscription_servers

# a hash mapping a server name to a hash of connection options for that server or additional subscription server
attr_accessor :server_connection_options

# the virtual host to use on the AMQP servers (defaults to <tt>"/"</tt>)
attr_accessor :vhost
# the AMQP user to use when connecting to the AMQP servers (defaults to <tt>"guest"</tt>)
Expand Down Expand Up @@ -118,9 +123,6 @@ class Configuration
# Write timeout for http requests to RabbitMQ HTTP API
attr_accessor :rabbitmq_api_write_timeout

# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
attr_accessor :publishing_timeout
Expand Down Expand Up @@ -177,11 +179,11 @@ def initialize #:nodoc:
self.redis_configuration_client_ids = ""

self.servers = "localhost:5672"
self.server_connection_options = {}
self.additional_subscription_servers = ""
self.vhost = "/"
self.user = "guest"
self.password = "guest"
self.api_port = 15672
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
Expand Down Expand Up @@ -237,7 +239,30 @@ def redis_options
}
end

# Returns a hash of connection options for the given server.
# If no server specific options are set, it constructs defaults which
# use the global user, password and vhost settings.
def connection_options_for_server(server)
overrides = server_connection_options[server] || {}

default_server_connection_options(server).merge(overrides)
end

private

def default_server_connection_options(server)
host, port = server.split(':')
port ||= 5672

{
host: host,
port: port.to_i,
user: user,
pass: password,
vhost: vhost,
}
end

def load_config
raw = ERB.new(IO.read(config_file)).result
hash = if config_file =~ /\.json$/
Expand Down
4 changes: 2 additions & 2 deletions lib/beetle/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ def run_handler(handler)
Timer.timeout(@timeout.to_f) { @handler_result = handler.call(self) }
RC::OK
rescue Exception => @exception
ActiveRecord::Base.clear_all_connections! if defined?(ActiveRecord)
ActiveRecord::Base.connection_handler.clear_all_connections! if defined?(ActiveRecord)
Beetle::reraise_expectation_errors!
logger.debug "Beetle: message handler crashed on #{msg_id}"
RC::HandlerCrash
ensure
ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord)
ActiveRecord::Base.connection_handler.clear_active_connections! if defined?(ActiveRecord)
end

def run_handler!(handler)
Expand Down
17 changes: 10 additions & 7 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,21 @@ def bunny?
end

def new_bunny
options = connection_options_for_server(@server)

b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
:pass => @client.config.password,
:vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl] || false,
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:spec => '09',
:logging => !!@options[:logging])
b.start
b
end
Expand Down
62 changes: 26 additions & 36 deletions lib/beetle/queue_properties.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ def set_queue_policy!(server, queue_name, options={})

# no need to worry that the server has the port 5672. Net:HTTP will take care of this. See below.
policy_name = "#{queue_name}_policy"
request_url = URI("http://#{server}/api/policies/#{vhost}/#{policy_name}")
get_request = Net::HTTP::Get.new(request_url)
put_request = Net::HTTP::Put.new(request_url)
delete_request = Net::HTTP::Delete.new(request_url)
request_path = "/api/policies/#{vhost}/#{policy_name}"

# set up queue policy
definition = {}
Expand All @@ -70,29 +67,24 @@ def set_queue_policy!(server, queue_name, options={})

is_default_policy = definition == config.broker_default_policy

get_response = run_rabbit_http_request(request_url, get_request) do |http|
http.request(get_request, nil)
end
get_response = run_api_request(server, Net::HTTP::Get, request_path)

case get_response.code
when "200"
response_body = JSON.parse(get_response.body) rescue {}
same_policy = put_request_body.all? { |k,v| response_body[k] == v }
if same_policy
if is_default_policy
run_rabbit_http_request(request_url, delete_request) do |http|
http.request(get_request, nil)
end
run_api_request(server, Net::HTTP::Delete, request_path)
end

return :ok
end
when "404"
return :ok if is_default_policy
end

put_response = run_rabbit_http_request(request_url, put_request) do |http|
http.request(put_request, put_request_body.to_json)
end
put_response = run_api_request(server, Net::HTTP::Put, request_path, put_request_body.to_json)

unless %w(200 201 204).include?(put_response.code)
log_error("Failed to create policy for queue #{queue_name}", put_response)
Expand Down Expand Up @@ -125,12 +117,7 @@ def remove_obsolete_bindings(server, queue_name, bindings)
end

def retrieve_bindings(server, queue_name)
request_url = URI("http://#{server}/api/queues/#{vhost}/#{queue_name}/bindings")
request = Net::HTTP::Get.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Get, "/api/queues/#{vhost}/#{queue_name}/bindings")

unless response.code == "200"
log_error("Failed to retrieve bindings for queue #{queue_name}", response)
Expand All @@ -141,34 +128,37 @@ def retrieve_bindings(server, queue_name)
end

def remove_binding(server, queue_name, exchange, properties_key)
request_url = URI("http://#{server}/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")
request = Net::HTTP::Delete.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Delete, "/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")

unless %w(200 201 204).include?(response.code)
log_error("Failed to remove obsolete binding for queue #{queue_name}", response)
raise FailedRabbitRequest.new("Could not retrieve queue bindings")
end
end

def run_rabbit_http_request(uri, request, &block)
request.basic_auth(config.user, config.password)
case request.class::METHOD
when 'GET'
request["Accept"] = "application/json"
when 'PUT'
request["Content-Type"] = "application/json"
def run_api_request(server, request_const, path, *request_args)
connection_options = config.connection_options_for_server(server)

derived_api_port = "1#{connection_options[:port]}".to_i
request_url = URI("http://#{connection_options[:host]}:#{derived_api_port}#{path}")

request = request_const.new(request_url).tap do |req|
req.basic_auth(connection_options[:user], connection_options[:pass])
case request_const::METHOD
when 'GET'
req["Accept"] = "application/json"
when 'PUT'
req["Content-Type"] = "application/json"
end
end
http = Net::HTTP.new(uri.hostname, config.api_port)

http = Net::HTTP.new(connection_options[:host], derived_api_port)
http.use_ssl = connection_options[:ssl]
http.read_timeout = config.rabbitmq_api_read_timeout
http.write_timeout = config.rabbitmq_api_write_timeout if http.respond_to?(:write_timeout=)
# don't do this in production:
# http.set_debug_output(logger.instance_eval{ @logdev.dev })

http.start do |instance|
block.call(instance) if block_given?
instance.request(request, *request_args)
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/beetle/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def initialize(client, options = {}) #:nodoc:
def listen_queues(queues) #:nodoc:
@listened_queues = queues
@exchanges_for_queues = exchanges_for_queues(queues)

EM.run do
each_server_sorted_randomly do
connect_server connection_settings
Expand Down Expand Up @@ -229,9 +230,15 @@ def bind_queue!(queue, exchange_name, binding_options)
end

def connection_settings
options = connection_options_for_server(@server)
{
:host => current_host, :port => current_port, :logging => false,
:user => @client.config.user, :pass => @client.config.password, :vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:logging => false,
:on_tcp_connection_failure => on_tcp_connection_failure,
:on_possible_authentication_failure => on_possible_authentication_failure,
}
Expand Down
2 changes: 1 addition & 1 deletion lib/beetle/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Beetle
VERSION = "3.5.7"
VERSION = "4.0.0.rc2"
end
1 change: 0 additions & 1 deletion test/beetle/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,5 @@ def setup
@client.expects(:update_queue_properties!).with(options.merge(:server => "localhost:5672"))
@bs.__send__(:publish_policy_options, options)
end

end
end
47 changes: 47 additions & 0 deletions test/beetle/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,51 @@ class ConfigurationTest < Minitest::Test
assert_equal "10.0.0.1:3001", config.additional_subscription_servers
end
end

class ConnectionOptionsForServerTest < Minitest::Test

test "returns the options for the server provided" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = {host: 'localhost', port: 5672, user: "john", pass: "doe", vhost: "test", ssl: false}

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "john", options[:user]
assert_equal "doe", options[:pass]
assert_equal "test", options[:vhost]
assert_equal false, options[:ssl]
end
end

test "returns default options if no specific options are set for the server" do
config = Configuration.new
config.servers = 'localhost:5672'

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "guest", options[:pass]
assert_equal "/", options[:vhost]
assert_nil options[:ssl]
end
end

test "allows to set specific options while retaining defaults for the rest" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { pass: "another_pass", ssl: true }

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "another_pass", options[:pass]
assert_equal "/", options[:vhost]
assert_equal true, options[:ssl]
end
end
end
end
Loading

0 comments on commit edf1d54

Please sign in to comment.