Skip to content

Commit

Permalink
Merge pull request #195 from Kuadrant/memleak-fix
Browse files Browse the repository at this point in the history
Store qualified counters in dedicated capacity limited cache
  • Loading branch information
alexsnaps authored Aug 7, 2023
2 parents 0987560 + c82a258 commit 317037d
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 185 deletions.
269 changes: 229 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ notify = "6.0.1"
const_format = "0.2.31"
lazy_static = "1.4.0"
clap = "4.3"
sysinfo = "0.29.7"

[build-dependencies]
tonic-build = "0.9.2"
11 changes: 9 additions & 2 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ impl Default for Configuration {
fn default() -> Self {
Configuration {
limits_file: "".to_string(),
storage: StorageConfiguration::InMemory,
storage: StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: Some(10_000),
}),
rls_host: "".to_string(),
rls_port: 0,
http_host: "".to_string(),
Expand All @@ -125,13 +127,18 @@ impl Default for Configuration {

#[derive(PartialEq, Eq, Debug)]
pub enum StorageConfiguration {
InMemory,
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "infinispan")]
Infinispan(InfinispanStorageConfiguration),
}

#[derive(PartialEq, Eq, Debug)]
pub struct InMemoryStorageConfiguration {
pub cache_size: Option<u64>,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
Expand Down
8 changes: 4 additions & 4 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod tests {
vec!["app_id"],
);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down Expand Up @@ -366,7 +366,7 @@ mod tests {

#[tokio::test]
async fn test_takes_into_account_all_the_descriptors() {
let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);

let namespace = "test_namespace";

Expand Down Expand Up @@ -434,7 +434,7 @@ mod tests {
let namespace = "test_namespace";
let limit = Limit::new(namespace, 10, 60, vec!["x == '1'"], vec!["y"]);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down Expand Up @@ -499,7 +499,7 @@ mod tests {
let namespace = "test_namespace";
let limit = Limit::new(namespace, 1, 60, vec!["x == '1'"], vec!["y"]);

let limiter = RateLimiter::default();
let limiter = RateLimiter::new(10_000);
limiter.add_limit(limit);

let rate_limiter = MyRateLimiter::new(
Expand Down
52 changes: 42 additions & 10 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ extern crate clap;
#[cfg(feature = "infinispan")]
use crate::config::InfinispanStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, RedisStorageCacheConfiguration,
RedisStorageConfiguration, StorageConfiguration,
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use env_logger::Builder;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
use limitador::storage::disk::DiskStorage;
Expand All @@ -38,6 +39,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, process};
use sysinfo::{RefreshKind, System, SystemExt};
use thiserror::Error;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -82,7 +84,9 @@ impl Limiter {
StorageConfiguration::Infinispan(cfg) => {
Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
}
StorageConfiguration::InMemory => Self::in_memory_limiter(config),
StorageConfiguration::InMemory(cfg) => {
Self::in_memory_limiter(cfg, config.limit_name_in_labels)
}
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
};

Expand Down Expand Up @@ -210,7 +214,7 @@ impl Limiter {
}
};
let mut rate_limiter_builder =
RateLimiterBuilder::new().storage(Storage::with_counter_storage(Box::new(storage)));
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
Expand All @@ -219,10 +223,11 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

fn in_memory_limiter(cfg: Configuration) -> Self {
let mut rate_limiter_builder = RateLimiterBuilder::new();
fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self {
let mut rate_limiter_builder =
RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap());

if cfg.limit_name_in_labels {
if limit_name_in_labels {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
}

Expand Down Expand Up @@ -513,7 +518,16 @@ fn create_config() -> (Configuration, &'static str) {
.subcommand(
Command::new("memory")
.display_order(1)
.about("Counters are held in Limitador (ephemeral)"),
.about("Counters are held in Limitador (ephemeral)")
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(1)
.help("Sets the size of the cache for 'qualified counters'"),
),
)
.subcommand(
Command::new("disk")
Expand Down Expand Up @@ -698,7 +712,9 @@ fn create_config() -> (Configuration, &'static str) {
consistency: Some(sub.get_one::<String>("consistency").unwrap().to_string()),
})
}
Some(("memory", _sub)) => StorageConfiguration::InMemory,
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
None => match storage_config_from_env() {
Ok(storage_cfg) => storage_cfg,
Err(_) => {
Expand Down Expand Up @@ -785,10 +801,26 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
consistency: env::var("INFINISPAN_COUNTERS_CONSISTENCY").ok(),
},
)),
_ => Ok(StorageConfiguration::InMemory),
_ => Ok(StorageConfiguration::InMemory(
InMemoryStorageConfiguration { cache_size: None },
)),
}
}

fn guess_cache_size() -> Option<u64> {
let sys = System::new_with_specifics(RefreshKind::new().with_memory());
let free_mem = sys.available_memory();
let memory = free_mem as f64 * 0.7;
let size = (memory
/ (std::mem::size_of::<Counter>() + 16/* size_of::<AtomicExpiringValue>() */) as f64)
as u64;
warn!(
"No cache size provided, aiming at 70% of {}MB, i.e. {size} entries",
free_mem / 1024 / 1024
);
Some(size)
}

fn env_option_is_enabled(env_name: &str) -> bool {
match env::var(env_name) {
Ok(value) => value == "1",
Expand Down
2 changes: 2 additions & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ infinispan_storage = ["infinispan", "reqwest", "base64"]
lenient_conditions = []

[dependencies]
moka = "0.11.2"
getrandom = { version = "0.2", features = ["js"] }
ttl_cache = "0.5"
serde = { version = "1", features = ["derive"] }
postcard = { version = "1.0.4", features = ["use-std"] }
Expand Down
2 changes: 1 addition & 1 deletion limitador/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn bench_in_mem(c: &mut Criterion) {
#[cfg(feature = "disk_storage")]
fn bench_disk(c: &mut Criterion) {
let mut group = c.benchmark_group("Disk");
for (index, scenario) in TEST_SCENARIOS.iter().enumerate() {
for scenario in TEST_SCENARIOS.iter() {
group.bench_with_input(
BenchmarkId::new("is_rate_limited", scenario),
scenario,
Expand Down
4 changes: 4 additions & 0 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl Counter {
self.expires_in = Some(duration)
}

pub fn is_qualified(&self) -> bool {
!self.set_variables.is_empty()
}

#[cfg(feature = "disk_storage")]
pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> {
let mut variables = Vec::with_capacity(self.set_variables.len());
Expand Down
37 changes: 17 additions & 20 deletions limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
//! Limitador. Storing the limits in Redis is slower, but they can be shared
//! between instances.
//!
//! By default, the rate limiter is configured to store the counters in memory:
//! By default, the rate limiter is configured to store the counters in memory.
//! It'll store only a limited amount of "qualified counters", specified as a
//! `u64` value in the constructor.
//! ```
//! use limitador::RateLimiter;
//! let rate_limiter = RateLimiter::default();
//! let rate_limiter = RateLimiter::new(1000);
//! ```
//!
//! To use Redis:
Expand Down Expand Up @@ -72,7 +74,7 @@
//! vec!["req.method == 'GET'"],
//! vec!["user_id"],
//! );
//! let mut rate_limiter = RateLimiter::default();
//! let mut rate_limiter = RateLimiter::new(1000);
//!
//! // Add a limit
//! rate_limiter.add_limit(limit.clone());
Expand All @@ -95,7 +97,7 @@
//! use limitador::limit::Limit;
//! use std::collections::HashMap;
//!
//! let mut rate_limiter = RateLimiter::default();
//! let mut rate_limiter = RateLimiter::new(1000);
//!
//! let limit = Limit::new(
//! "my_namespace",
Expand Down Expand Up @@ -236,9 +238,16 @@ impl From<CheckResult> for bool {
}

impl RateLimiterBuilder {
pub fn new() -> Self {
pub fn with_storage(storage: Storage) -> Self {
Self {
storage: Storage::new(),
storage,
prometheus_limit_name_labels_enabled: false,
}
}

pub fn new(cache_size: u64) -> Self {
Self {
storage: Storage::new(cache_size),
prometheus_limit_name_labels_enabled: false,
}
}
Expand Down Expand Up @@ -267,12 +276,6 @@ impl RateLimiterBuilder {
}
}

impl Default for RateLimiterBuilder {
fn default() -> Self {
Self::new()
}
}

pub struct AsyncRateLimiterBuilder {
storage: AsyncStorage,
prometheus_limit_name_labels_enabled: bool,
Expand Down Expand Up @@ -306,9 +309,9 @@ impl AsyncRateLimiterBuilder {
}

impl RateLimiter {
pub fn new() -> Self {
pub fn new(cache_size: u64) -> Self {
Self {
storage: Storage::new(),
storage: Storage::new(cache_size),
prometheus_metrics: PrometheusMetrics::new(),
}
}
Expand Down Expand Up @@ -492,12 +495,6 @@ impl RateLimiter {
}
}

impl Default for RateLimiter {
fn default() -> Self {
Self::new()
}
}

// TODO: the code of this implementation is almost identical to the blocking
// one. The only exception is that the functions defined are "async" and all the
// calls to the storage need to include ".await". We'll need to think about how
Expand Down
7 changes: 7 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,11 @@ mod tests {
});
assert!([2i64, 3i64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst)));
}

#[test]
fn size_of_struct() {
// This is ugly, but we don't have access to `AtomicExpiringValue` in the server,
// so this is hardcoded in main.rs
assert_eq!(16, std::mem::size_of::<AtomicExpiringValue>());
}
}
Loading

0 comments on commit 317037d

Please sign in to comment.