Skip to content

Commit

Permalink
Merge pull request #334 from Kuadrant/ttl_expiry
Browse files Browse the repository at this point in the history
Ttl expiry
  • Loading branch information
alexsnaps authored May 22, 2024
2 parents dfc5093 + 709fca1 commit 0a70dff
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 135 deletions.
2 changes: 1 addition & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub fn to_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
2 changes: 1 addition & 1 deletion limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn add_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
5 changes: 3 additions & 2 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Counter {
}
}

#[cfg(any(feature = "redis_storage", feature = "disk_storage"))]
pub(crate) fn key(&self) -> Self {
Self {
limit: self.limit.clone(),
Expand Down Expand Up @@ -68,8 +69,8 @@ impl Counter {
false
}

pub fn seconds(&self) -> u64 {
self.limit.seconds()
pub fn window(&self) -> Duration {
Duration::from_secs(self.limit.seconds())
}

pub fn namespace(&self) -> &Namespace {
Expand Down
35 changes: 17 additions & 18 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ impl AtomicExpiringValue {
self.value_at(SystemTime::now())
}

#[allow(dead_code)]
pub fn add_and_set_expiry(&self, delta: u64, expire_at: SystemTime) -> u64 {
self.expiry.update(expire_at);
#[cfg(feature = "redis_storage")]
pub fn add_and_set_expiry(&self, delta: u64, expiry: SystemTime) -> u64 {
self.expiry.update(expiry);
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn update(&self, delta: u64, ttl: u64, when: SystemTime) -> u64 {
pub fn update(&self, delta: u64, ttl: Duration, when: SystemTime) -> u64 {
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
return delta;
Expand All @@ -42,7 +42,7 @@ impl AtomicExpiringValue {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}
}

Expand All @@ -59,18 +59,13 @@ impl AtomicExpiryTime {
}
}

#[allow(dead_code)]
pub fn from_now(ttl: Duration) -> Self {
Self::new(SystemTime::now() + ttl)
}

fn since_epoch(when: SystemTime) -> u64 {
when.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_micros() as u64
}

pub fn duration(&self) -> Duration {
pub fn ttl(&self) -> Duration {
let expiry =
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst));
expiry
Expand All @@ -83,14 +78,14 @@ impl AtomicExpiryTime {
self.expiry.load(Ordering::SeqCst) <= when
}

#[allow(dead_code)]
#[cfg(feature = "redis_storage")]
pub fn update(&self, expiry: SystemTime) {
self.expiry
.store(Self::since_epoch(expiry), Ordering::SeqCst);
}

pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
let ttl_micros = ttl * 1_000_000;
pub fn update_if_expired(&self, ttl: Duration, when: SystemTime) -> bool {
let ttl_micros = u64::try_from(ttl.as_micros()).expect("Wow! The future is here!");
let when_micros = Self::since_epoch(when);
let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when_micros {
Expand Down Expand Up @@ -208,7 +203,7 @@ mod tests {
fn updates_when_valid() {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now + Duration::from_secs(1));
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -217,7 +212,7 @@ mod tests {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand All @@ -228,10 +223,14 @@ mod tests {

thread::scope(|s| {
s.spawn(|| {
atomic_expiring_value.update(1, 1, now);
atomic_expiring_value.update(1, Duration::from_secs(1), now);
});
s.spawn(|| {
atomic_expiring_value.update(2, 1, now + Duration::from_secs(11));
atomic_expiring_value.update(
2,
Duration::from_secs(1),
now + Duration::from_secs(11),
);
});
});
assert!([2u64, 3u64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst)));
Expand Down
12 changes: 8 additions & 4 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ impl ExpiringValue {
}

#[must_use]
pub fn update(self, delta: u64, ttl: u64, now: SystemTime) -> Self {
pub fn update(self, delta: u64, ttl: Duration, now: SystemTime) -> Self {
let expiry = if self.expiry <= now {
now + Duration::from_secs(ttl)
now + ttl
} else {
self.expiry
};
Expand Down Expand Up @@ -132,7 +132,11 @@ mod tests {
#[test]
fn updates_when_valid() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(
3,
Duration::from_secs(10),
now,
);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -141,7 +145,7 @@ mod tests {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
let val = val.update(3, 10, now);
let val = val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl RocksDbStorage {
let _entered = span.enter();
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
return Ok(value.update(delta, counter.window(), now));
}
Ok(value)
}
Expand Down
13 changes: 5 additions & 8 deletions limitador/src/storage/distributed/cr_counter_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<A: Ord> CrCounterValue<A> {
ourselves: actor,
value: Default::default(),
others: RwLock::default(),
expiry: AtomicExpiryTime::from_now(time_window),
expiry: AtomicExpiryTime::new(SystemTime::now() + time_window),
}
}

Expand All @@ -43,7 +43,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) {
if self.expiry.update_if_expired(time_window.as_secs(), when) {
if self.expiry.update_if_expired(time_window, when) {
self.value.store(increment, Ordering::SeqCst);
} else {
self.value.fetch_add(increment, Ordering::SeqCst);
Expand All @@ -59,10 +59,7 @@ impl<A: Ord> CrCounterValue<A> {
self.inc_at(increment, time_window, when);
} else {
let mut guard = self.others.write().unwrap();
if self
.expiry
.update_if_expired(time_window.as_micros() as u64, when)
{
if self.expiry.update_if_expired(time_window, when) {
guard.insert(actor, increment);
} else {
*guard.entry(actor).or_insert(0) += increment;
Expand Down Expand Up @@ -109,7 +106,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}

pub fn expiry(&self) -> SystemTime {
Expand Down Expand Up @@ -282,6 +279,6 @@ mod tests {
a.inc(3, later);
b.inc(2, later);
a.merge(b);
assert!(a.expiry.duration() < sooner);
assert!(a.expiry.ttl() < sooner);
}
}
17 changes: 8 additions & 9 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand All @@ -82,16 +82,16 @@ impl CounterStorage for CrInMemoryStorage {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
let duration = Duration::from_secs(counter.seconds());
let counter_val = CrCounterValue::new(self.identifier.clone(), duration);
let counter_val =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_val, delta, now);
limits.insert(counter.limit().clone(), counter_val);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
let duration = Duration::from_secs(counter.seconds());
let counter_value = CrCounterValue::new(self.identifier.clone(), duration);
let counter_value =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_value, delta, now);
v.insert(counter_value);
}
Expand Down Expand Up @@ -158,7 +158,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand Down Expand Up @@ -338,8 +338,7 @@ impl CrInMemoryStorage {
delta: u64,
when: SystemTime,
) {
counter.inc_at(delta, Duration::from_secs(key.seconds()), when);

counter.inc_at(delta, key.window(), when);
let counter = counter.clone();
let (expiry, values) = counter.into_inner();
let key: CounterKey = key.into();
Expand All @@ -366,7 +365,7 @@ impl From<Counter> for CounterKey {
fn from(value: Counter) -> Self {
Self {
namespace: value.namespace().clone(),
seconds: value.seconds(),
seconds: value.window().as_secs(),
variables: value.limit().variables(),
conditions: value.limit().conditions(),
vars: value.set_variables().clone(),
Expand Down
32 changes: 10 additions & 22 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,27 @@ impl CounterStorage for InMemoryStorage {
if counter.is_qualified() {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
value.update(delta, counter.seconds(), now);
value.update(delta, counter.window(), now);
} else {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
limits.insert(
counter.limit().clone(),
AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
),
AtomicExpiringValue::new(delta, now + counter.window()),
);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
v.insert(AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
));
v.insert(AtomicExpiringValue::new(delta, now + counter.window()));
}
Entry::Occupied(o) => {
o.get().update(delta, counter.seconds(), now);
o.get().update(delta, counter.window(), now);
}
},
}
Expand All @@ -102,8 +93,8 @@ impl CounterStorage for InMemoryStorage {
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.limits_for_namespace.read().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, Duration)> =
Vec::new();
let now = SystemTime::now();

Expand Down Expand Up @@ -138,17 +129,14 @@ impl CounterStorage for InMemoryStorage {
return Ok(limited);
}
}
counter_values_to_update.push((atomic_expiring_value, counter.seconds()));
counter_values_to_update.push((atomic_expiring_value, counter.window()));
}

// Process qualified counters
for counter in counters.iter_mut().filter(|c| c.is_qualified()) {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
Expand All @@ -159,7 +147,7 @@ impl CounterStorage for InMemoryStorage {
}
}

qualified_counter_values_to_updated.push((value, counter.seconds()));
qualified_counter_values_to_updated.push((value, counter.window()));
}

if let Some(limited) = first_limited {
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub mod bin {

CounterKey {
ns: counter.namespace().as_ref(),
seconds: counter.seconds(),
seconds: counter.window().as_secs(),
conditions,
variables: counter.variables_for_key(),
}
Expand Down
Loading

0 comments on commit 0a70dff

Please sign in to comment.