Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to set connection options on a per server basis #107

Merged
merged 25 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
578044b
Use connection options to create bunny in publisher
david-krentzlin Aug 7, 2024
90ea9a7
Move per server options into configuration class
david-krentzlin Aug 7, 2024
82d37f6
Use connection options in subscriber
david-krentzlin Aug 7, 2024
8c979aa
Replace docker-compose with docker compose
david-krentzlin Aug 7, 2024
dfdd263
Add tests for configuration
david-krentzlin Aug 7, 2024
abbac65
Add tests that verifies custom options are used
david-krentzlin Aug 7, 2024
a692388
Add subscriber test with per server settings
david-krentzlin Aug 7, 2024
4344ec6
Also use correct admin connection options
david-krentzlin Aug 9, 2024
d2ef005
add tests
david-krentzlin Aug 9, 2024
4d8fdaf
sketch per server API base url
david-krentzlin Aug 9, 2024
daf442f
Configure ssl for admin connections
david-krentzlin Aug 12, 2024
623cddd
Log connection options
david-krentzlin Aug 12, 2024
e5effef
Set proper default port
david-krentzlin Aug 12, 2024
bcc3895
clean up
danielgoncharov Aug 13, 2024
9272748
wip
david-krentzlin Aug 13, 2024
f33c421
Add tests for api requests in queue properties
david-krentzlin Aug 13, 2024
db859ca
Overhaul API requests in queue_properties
david-krentzlin Aug 13, 2024
2f19c64
Update Gemfile
danielgoncharov Aug 15, 2024
89c84d0
naming
danielgoncharov Aug 15, 2024
9a24cc1
Revert "naming"
danielgoncharov Aug 15, 2024
c4c0980
naming
danielgoncharov Aug 15, 2024
d0e34c5
update tests
danielgoncharov Aug 15, 2024
6533727
change ssl to integer
arte7 Aug 15, 2024
c672059
adapt workflow to also run with v4.x branch
david-krentzlin Aug 15, 2024
01a016f
Merge branch 'per-server-config' of github.com:xing/beetle into per-s…
david-krentzlin Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
run: sudo apt-get install redis

- name: Start required services
run: docker-compose up -d
run: docker compose up -d

- name: Install gems
run: bundle install && bundle exec appraisal install
Expand All @@ -53,5 +53,5 @@ jobs:
BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile

- name: Stop services
run: docker-compose down
run: docker compose down
if: always()
1 change: 0 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ source 'https://rubygems.org'
gemspec

gem "hiredis-client"

# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"

# Use patched appraisal gem until it is fixed upstream.
Expand Down
5 changes: 5 additions & 0 deletions lib/beetle/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def current_port
@server =~ /:(\d+)$/ ? $1.to_i : 5672
end


def connection_options_for_server(server)
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
@client.config.connection_options_for_server(server)
end

def set_current_server(s)
@server = s
end
Expand Down
18 changes: 18 additions & 0 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ class Configuration

# list of amqp servers to use (defaults to <tt>"localhost:5672"</tt>)
attr_accessor :servers

# list of additional amqp servers to use for subscribers (defaults to <tt>""</tt>)
attr_accessor :additional_subscription_servers
david-krentzlin marked this conversation as resolved.
Show resolved Hide resolved

# a hash mapping a server name to a hash of connection options for that server or additional subscription server
attr_accessor :server_connection_options

# the virtual host to use on the AMQP servers (defaults to <tt>"/"</tt>)
attr_accessor :vhost
# the AMQP user to use when connecting to the AMQP servers (defaults to <tt>"guest"</tt>)
Expand Down Expand Up @@ -177,6 +182,7 @@ def initialize #:nodoc:
self.redis_configuration_client_ids = ""

self.servers = "localhost:5672"
self.server_connection_options = {}
self.additional_subscription_servers = ""
self.vhost = "/"
self.user = "guest"
Expand Down Expand Up @@ -237,7 +243,19 @@ def redis_options
}
end

def connection_options_for_server(server)
default_server_connection_options(server).merge(server_connection_options[server] || {})
end

private

def default_server_connection_options(server)
host, port = server.split(':')
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
port ||= 5672

{ host: host, port: port.to_i, user: user, pass: password, vhost: vhost }
end

def load_config
raw = ERB.new(IO.read(config_file)).result
hash = if config_file =~ /\.json$/
Expand Down
17 changes: 10 additions & 7 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,21 @@ def bunny?
end

def new_bunny
options = connection_options_for_server(@server)

b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
:pass => @client.config.password,
:vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:spec => '09',
:logging => !!@options[:logging])
b.start
b
end
Expand Down
11 changes: 9 additions & 2 deletions lib/beetle/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def initialize(client, options = {}) #:nodoc:
def listen_queues(queues) #:nodoc:
@listened_queues = queues
@exchanges_for_queues = exchanges_for_queues(queues)

EM.run do
each_server_sorted_randomly do
connect_server connection_settings
Expand Down Expand Up @@ -229,9 +230,15 @@ def bind_queue!(queue, exchange_name, binding_options)
end

def connection_settings
options = connection_options_for_server(@server)
{
:host => current_host, :port => current_port, :logging => false,
:user => @client.config.user, :pass => @client.config.password, :vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:logging => false,
:on_tcp_connection_failure => on_tcp_connection_failure,
:on_possible_authentication_failure => on_possible_authentication_failure,
}
Expand Down
46 changes: 46 additions & 0 deletions test/beetle/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,50 @@ class ConfigurationTest < Minitest::Test
assert_equal "10.0.0.1:3001", config.additional_subscription_servers
end
end

class ConnectionOptionsForServerTest < Minitest::Test
test "returns the options for the server provided" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = {host: 'localhost', port: 5672, user: "john", pass: "doe", vhost: "test", ssl: "0"}
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "john", options[:user]
assert_equal "doe", options[:pass]
assert_equal "test", options[:vhost]
assert_equal "0", options[:ssl]
end
end

test "returns default options if no options are set for the server" do
config = Configuration.new
config.servers = 'localhost:5672'

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "guest", options[:pass]
assert_equal "/", options[:vhost]
assert_nil options[:ssl]
end
end

test "allows to set specific options while retaining defaults for the rest" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { pass: "another_pass", ssl: "1" }

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "another_pass", options[:pass]
assert_equal "/", options[:vhost]
assert_equal "1", options[:ssl]
end
end
end
end
29 changes: 29 additions & 0 deletions test/beetle/publisher_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def setup
:user => "guest",
:pass => "guest",
:vhost => "/",
:ssl => nil,
:socket_timeout => 0,
:connect_timeout => 5,
:frame_max => 131072,
Expand All @@ -34,6 +35,34 @@ def setup
assert_equal m, @pub.send(:new_bunny)
end

test "new bunnies should be created using custom connection options and they should be started" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { user: "john", pass: "doe", vhost: "test", ssl: "0" }
client = Client.new(config)
pub = Publisher.new(client)

m = mock("dummy")
expected_bunny_options = {
host: "localhost",
port: 5672,
user: "john",
pass: "doe",
vhost: "test",
ssl: "0",
socket_timeout: 0,
connect_timeout: 5,
frame_max: 131072,
channel_max: 2047,
spec: '09',
logging: false
}

Bunny.expects(:new).with(expected_bunny_options).returns(m)
m.expects(:start)
assert_equal m, pub.send(:new_bunny)
end

test "initially there should be no bunnies" do
assert_equal({}, @pub.instance_variable_get("@bunnies"))
end
Expand Down
34 changes: 29 additions & 5 deletions test/beetle/subscriber_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def setup
@sub.stubs(:queues).returns("a" => q)
@sub.__send__(:resume, "a")
end

end

class AdditionalSubscriptionServersTest < Minitest::Test
Expand Down Expand Up @@ -415,12 +414,15 @@ def setup
assert_equal({:ack => true}, opts)
assert_equal 42, block.call(1)
end

end

class ConnectionTest < Minitest::Test
def setup
@client = Client.new
@config = Beetle::Configuration.new
@config.servers = "mickey:42"
@config.server_connection_options["mickey:42"] = { user: "john", pass: "doe", vhost: "test", ssl: "0" }

@client = Client.new(@config)
@sub = @client.send(:subscriber)
@sub.send(:set_current_server, "mickey:42")
@settings = @sub.send(:connection_settings)
Expand Down Expand Up @@ -470,6 +472,30 @@ def setup
assert_equal connection, @sub.instance_variable_get("@connections")["mickey:42"]
end

test "uses server connection options" do
@client.register_exchange(:an_exchange)
@client.register_queue(:a_queue, :exchange => :an_exchange)
@client.register_message(:a_message, :key => "foo", :exchange => :an_exchange)

connection = mock("connection")

connection.expects(:on_tcp_connection_loss)
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
connection.expects(:next_channel_id).returns(1)
connection.expects(:auto_recovering?).returns(true)
connection.expects(:open?).returns(true)
connection.expects(:channel_max)
connection.expects(:on_connection)


EM.expects(:run).yields
EM.expects(:reactor_running?).returns(true)

AMQP.expects(:connect).once.with(has_entries(host: "mickey", port: 42, user: "john", pass: "doe", ssl: "0")).yields(connection)

@sub.listen_queues(["a_queue"])
end


test "channel opening, exchange creation, queue bindings and subscription" do
connection = mock("connection")
channel = mock("channel")
Expand All @@ -482,7 +508,5 @@ def setup
@sub.send(:open_channel_and_subscribe, connection, @settings)
assert_equal channel, @sub.instance_variable_get("@channels")["mickey:42"]
end

end

end
Loading