Skip to content

Commit

Permalink
Merge pull request #302 from chirino/async-bench
Browse files Browse the repository at this point in the history
Adding benchmarks for the async rate limiters.
  • Loading branch information
alexsnaps authored May 21, 2024
2 parents f6f8f18 + 352158d commit 5368e72
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 72 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Limiter {
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
cfg.broadcast,
Some(cfg.broadcast),
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));
Expand Down
2 changes: 1 addition & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ tokio = { version = "1", optional = true, features = [

[dev-dependencies]
serial_test = "3.0"
criterion = { version = "0.5.1", features = ["html_reports"] }
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
redis-test = { version = "0.4.0", features = ["aio"] }
paste = "1"
rand = "0.8"
Expand Down
291 changes: 273 additions & 18 deletions limitador/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::time::Instant;

use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId, Criterion};
use rand::seq::SliceRandom;
use rand::SeedableRng;

use limitador::limit::Limit;
#[cfg(feature = "disk_storage")]
use limitador::storage::disk::{DiskStorage, OptimizeFor};
#[cfg(feature = "distributed_storage")]
use limitador::storage::distributed::CrInMemoryStorage;
use limitador::storage::in_memory::InMemoryStorage;
use limitador::storage::CounterStorage;
use limitador::RateLimiter;
use rand::SeedableRng;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use limitador::storage::redis::CachedRedisStorageBuilder;
use limitador::storage::{AsyncCounterStorage, CounterStorage};
use limitador::{AsyncRateLimiter, RateLimiter};

const SEED: u64 = 42;

Expand All @@ -18,9 +24,32 @@ criterion_group!(benches, bench_in_mem);
#[cfg(all(feature = "disk_storage", not(feature = "redis_storage")))]
criterion_group!(benches, bench_in_mem, bench_disk);
#[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))]
criterion_group!(benches, bench_in_mem, bench_redis);
#[cfg(all(feature = "disk_storage", feature = "redis_storage"))]
criterion_group!(benches, bench_in_mem, bench_disk, bench_redis);
criterion_group!(benches, bench_in_mem, bench_redis, bench_cached_redis);
#[cfg(all(
feature = "disk_storage",
feature = "redis_storage",
not(feature = "distributed_storage")
))]
criterion_group!(
benches,
bench_in_mem,
bench_disk,
bench_redis,
bench_cached_redis
);
#[cfg(all(
feature = "disk_storage",
feature = "redis_storage",
feature = "distributed_storage"
))]
criterion_group!(
benches,
bench_in_mem,
bench_disk,
bench_redis,
bench_cached_redis,
bench_distributed,
);

criterion_main!(benches);

Expand Down Expand Up @@ -70,7 +99,7 @@ impl Display for TestScenario {
}

fn bench_in_mem(c: &mut Criterion) {
let mut group = c.benchmark_group("In memory");
let mut group = c.benchmark_group("Memory");
for scenario in TEST_SCENARIOS {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
Expand Down Expand Up @@ -100,6 +129,63 @@ fn bench_in_mem(c: &mut Criterion) {
group.finish();
}

#[cfg(feature = "distributed_storage")]
fn bench_distributed(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let mut group = c.benchmark_group("Distributed");
for scenario in TEST_SCENARIOS {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_is_rate_limited(b, test_scenario, storage);
})
},
);
group.bench_with_input(
BenchmarkId::new("update_counters", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_update_counters(b, test_scenario, storage);
})
},
);
group.bench_with_input(
BenchmarkId::new("check_rate_limited_and_update", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
runtime.block_on(async move {
let storage = Box::new(CrInMemoryStorage::new(
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
));
bench_check_rate_limited_and_update(b, test_scenario, storage);
})
},
);
}
group.finish();
}
#[cfg(feature = "disk_storage")]
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
Expand Down Expand Up @@ -138,6 +224,55 @@ fn bench_disk(c: &mut Criterion) {
group.finish();
}

#[cfg(feature = "redis_storage")]
fn bench_cached_redis(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

async fn create_storage() -> Box<dyn AsyncCounterStorage> {
let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379");
let storage = storage_builder
.build()
.await
.expect("We need a Redis running locally");
storage.clear().await.unwrap();
Box::new(storage)
}

let mut group = c.benchmark_group("CachedRedis");
for scenario in TEST_SCENARIOS {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
async_bench_is_rate_limited(&runtime, b, test_scenario, create_storage);
},
);
group.bench_with_input(
BenchmarkId::new("update_counters", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
async_bench_update_counters(&runtime, b, test_scenario, create_storage);
},
);
group.bench_with_input(
BenchmarkId::new("check_rate_limited_and_update", scenario),
scenario,
|b: &mut Bencher, test_scenario: &&TestScenario| {
async_bench_check_rate_limited_and_update(
&runtime,
b,
test_scenario,
create_storage,
);
},
);
}
group.finish();
}

#[cfg(feature = "redis_storage")]
fn bench_redis(c: &mut Criterion) {
let mut group = c.benchmark_group("Redis");
Expand Down Expand Up @@ -195,6 +330,37 @@ fn bench_is_rate_limited(
})
}

fn async_bench_is_rate_limited<F>(
runtime: &tokio::runtime::Runtime,
b: &mut Bencher,
test_scenario: &TestScenario,
storage: fn() -> F,
) where
F: Future<Output = Box<dyn AsyncCounterStorage>>,
{
b.to_async(runtime).iter_custom(|iters| async move {
let storage = storage().await;
let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);

let start = Instant::now();
for _i in 0..iters {
black_box({
let params = call_params.choose(rng).unwrap();
rate_limiter
.is_rate_limited(
&params.namespace.to_owned().into(),
&params.values,
params.delta,
)
.await
.unwrap()
});
}
start.elapsed()
})
}

fn bench_update_counters(
b: &mut Bencher,
test_scenario: &TestScenario,
Expand All @@ -214,7 +380,37 @@ fn bench_update_counters(
params.delta,
)
.unwrap();
black_box(())
})
}

fn async_bench_update_counters<F>(
runtime: &tokio::runtime::Runtime,
b: &mut Bencher,
test_scenario: &TestScenario,
storage: fn() -> F,
) where
F: Future<Output = Box<dyn AsyncCounterStorage>>,
{
b.to_async(runtime).iter_custom(|iters| async move {
let storage = storage().await;
let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);

let start = Instant::now();
for _i in 0..iters {
black_box({
let params = call_params.choose(rng).unwrap();
rate_limiter
.update_counters(
&params.namespace.to_owned().into(),
&params.values,
params.delta,
)
.await
})
.unwrap();
}
start.elapsed()
})
}

Expand Down Expand Up @@ -243,6 +439,39 @@ fn bench_check_rate_limited_and_update(
})
}

fn async_bench_check_rate_limited_and_update<F>(
runtime: &tokio::runtime::Runtime,
b: &mut Bencher,
test_scenario: &TestScenario,
storage: fn() -> F,
) where
F: Future<Output = Box<dyn AsyncCounterStorage>>,
{
b.to_async(runtime).iter_custom(|iters| async move {
let storage = storage().await;
let (rate_limiter, call_params) = generate_async_test_data(test_scenario, storage);
let rng = &mut rand::rngs::StdRng::seed_from_u64(SEED);

let start = Instant::now();
for _i in 0..iters {
black_box({
let params = call_params.choose(rng).unwrap();

rate_limiter
.check_rate_limited_and_update(
&params.namespace.to_owned().into(),
&params.values,
params.delta,
false,
)
.await
.unwrap()
});
}
start.elapsed()
})
}

// Notice that this function creates all the limits with the same conditions and
// variables. Also, all the conditions have the same format: "cond_x == 1".
// That's to simplify things, those are not the aspects that should have the
Expand All @@ -255,6 +484,39 @@ fn generate_test_data(
scenario: &TestScenario,
storage: Box<dyn CounterStorage>,
) -> (RateLimiter, Vec<TestCallParams>) {
let rate_limiter = RateLimiter::new_with_storage(storage);

let (test_limits, call_params) = generate_test_limits(scenario);
for limit in test_limits {
rate_limiter.add_limit(limit);
}

(rate_limiter, call_params)
}

// Notice that this function creates all the limits with the same conditions and
// variables. Also, all the conditions have the same format: "cond_x == 1".
// That's to simplify things, those are not the aspects that should have the
// greatest impact on performance.
// The limits generated are big enough to avoid being rate-limited during the
// benchmark.
// Note that with this test data each request only increases one counter, we can
// that as another variable in the future.
fn generate_async_test_data(
scenario: &TestScenario,
storage: Box<dyn AsyncCounterStorage>,
) -> (AsyncRateLimiter, Vec<TestCallParams>) {
let rate_limiter = AsyncRateLimiter::new_with_storage(storage);

let (test_limits, call_params) = generate_test_limits(scenario);
for limit in test_limits {
rate_limiter.add_limit(limit);
}

(rate_limiter, call_params)
}

fn generate_test_limits(scenario: &TestScenario) -> (Vec<Limit>, Vec<TestCallParams>) {
let mut test_values: HashMap<String, String> = HashMap::new();

let mut conditions = vec![];
Expand Down Expand Up @@ -293,12 +555,5 @@ fn generate_test_data(
delta: 1,
});
}

let rate_limiter = RateLimiter::new_with_storage(storage);

for limit in test_limits {
rate_limiter.add_limit(limit);
}

(rate_limiter, call_params)
(test_limits, call_params)
}
Loading

0 comments on commit 5368e72

Please sign in to comment.