diff --git a/faust/streams.py b/faust/streams.py index 3cdd8adbd..2e6fa2769 100644 --- a/faust/streams.py +++ b/faust/streams.py @@ -1202,6 +1202,7 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: sensor_state = None # reduce using processors + processor = None try: for processor in processors: with trace(f"processor-{_shortlabel(processor)}"): @@ -1213,8 +1214,12 @@ async def _py_aiter(self) -> AsyncIterator[T_co]: value = skipped_value except StopAsyncIteration: raise - except Exception: + except Exception as exc: value = skipped_value + self.log.exception( + f"Error in processor {_shortlabel(processor)}: {exc}" + ) + try: if value is not skipped_value: self.events_total += 1 diff --git a/faust/transport/consumer.py b/faust/transport/consumer.py index e8ae4a524..c000d5de4 100644 --- a/faust/transport/consumer.py +++ b/faust/transport/consumer.py @@ -749,10 +749,17 @@ async def getmany(self, timeout: float) -> AsyncIterator[Tuple[TP, Message]]: or tp in active_partitions or tp in self._buffered_partitions ): - highwater_mark = self.highwater(tp) - self.app.monitor.track_tp_end_offset(tp, highwater_mark) - # convert timestamp to seconds from int milliseconds. - yield tp, to_message(tp, record) + try: + highwater_mark = self.highwater(tp) + self.app.monitor.track_tp_end_offset(tp, highwater_mark) + # convert timestamp to seconds from int milliseconds. + yield tp, to_message(tp, record) + except StopAsyncIteration: + raise + except Exception as exc: + self.log.exception( + "Error while processing message", exc_info=exc + ) else: self.log.dev( "getmany called while flow not active. Seek back to committed offsets." @@ -1118,10 +1125,7 @@ def _new_offset(self, tp: TP) -> Optional[int]: # the return value will be None (the same as 31) if self._committed_offset[tp]: if min(acked) - self._committed_offset[tp] > 1: - new_acked = list(range(self._committed_offset[tp] + 1, min(acked))) - self.log.dev(f"insert {new_acked=}") - # return None - acked = new_acked + acked + return None # Note: acked is always kept sorted. # find first list of consecutive numbers