Skip to content

Commit

Permalink
[Flink 32701] [cep] Fix CEP Operator Memory Leak Issue (apache#24084)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdream17 authored Mar 21, 2024
1 parent 3f4a809 commit a9cde49
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ void advanceTime(long timestamp) throws Exception {
iterator.remove();
}
}

// memory leak resolution
if (eventsCount.isEmpty()) {
eventsCount.clear();
}
}

EventId registerEvent(V value, long timestamp) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exceptio

// STEP 4
updateNFA(nfaState);

// In order to remove dangling partial matches.
if (nfaState.getPartialMatches().size() == 1 && nfaState.getCompletedMatches().isEmpty()) {
computationStates.clear();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void testComplexBranchingAfterZeroOrMore() throws Exception {
nfaTestHarness.consumeRecords(inputEvents);

assertEquals(58, sharedBuffer.getStateReads());
assertEquals(33, sharedBuffer.getStateWrites());
assertEquals(91, sharedBuffer.getStateAccesses());
assertEquals(41, sharedBuffer.getStateWrites());
assertEquals(99, sharedBuffer.getStateAccesses());
}

@Test
Expand Down Expand Up @@ -149,7 +149,7 @@ public boolean filter(Event value, Context<Event> ctx)
nfaTestHarness.consumeRecords(inputEvents);

assertEquals(90, sharedBuffer.getStateReads());
assertEquals(31, sharedBuffer.getStateWrites());
assertEquals(121, sharedBuffer.getStateAccesses());
assertEquals(34, sharedBuffer.getStateWrites());
assertEquals(124, sharedBuffer.getStateAccesses());
}
}

0 comments on commit a9cde49

Please sign in to comment.