Skip to content

Commit 3312f1f

Browse files
authored
Fixed unacknowledged messages. (#82)
1 parent e664b84 commit 3312f1f

File tree

1 file changed

+34
-1
lines changed

1 file changed

+34
-1
lines changed

Diff for: taskiq_redis/redis_broker.py

+34-1
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,10 @@ def __init__(
164164
consumer_name: Optional[str] = None,
165165
consumer_id: str = "$",
166166
mkstream: bool = True,
167-
xread_block: int = 10000,
167+
xread_block: int = 2000,
168168
maxlen: Optional[int] = None,
169+
idle_timeout: int = 600000, # 10 minutes
170+
unacknowledged_batch_size: int = 100,
169171
additional_streams: Optional[Dict[str, str]] = None,
170172
**connection_kwargs: Any,
171173
) -> None:
@@ -189,6 +191,8 @@ def __init__(
189191
trims (the old values of) the stream each time a new element is added
190192
:param additional_streams: additional streams to read from.
191193
Each key is a stream name, value is a consumer id.
194+
:param redeliver_timeout: time in ms to wait before redelivering a message.
195+
:param unacknowledged_batch_size: number of unacknowledged messages to fetch.
192196
"""
193197
super().__init__(
194198
url,
@@ -205,6 +209,8 @@ def __init__(
205209
self.block = xread_block
206210
self.maxlen = maxlen
207211
self.additional_streams = additional_streams or {}
212+
self.idle_timeout = idle_timeout
213+
self.unacknowledged_batch_size = unacknowledged_batch_size
208214

209215
async def _declare_consumer_group(self) -> None:
210216
"""
@@ -260,6 +266,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
260266
"""Listen to incoming messages."""
261267
async with Redis(connection_pool=self.connection_pool) as redis_conn:
262268
while True:
269+
logger.debug("Starting fetching new messages")
263270
fetched = await redis_conn.xreadgroup(
264271
self.consumer_group_name,
265272
self.consumer_name,
@@ -277,3 +284,29 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
277284
data=msg[b"data"],
278285
ack=self._ack_generator(msg_id),
279286
)
287+
logger.debug("Starting fetching unacknowledged messages")
288+
for stream in [self.queue_name, *self.additional_streams.keys()]:
289+
lock = redis_conn.lock(
290+
f"autoclaim:{self.consumer_group_name}:{stream}",
291+
)
292+
if await lock.locked():
293+
continue
294+
async with lock:
295+
pending = await redis_conn.xautoclaim(
296+
name=stream,
297+
groupname=self.consumer_group_name,
298+
consumername=self.consumer_name,
299+
min_idle_time=self.idle_timeout,
300+
count=self.unacknowledged_batch_size,
301+
)
302+
logger.debug(
303+
"Found %d pending messages in stream %s",
304+
len(pending),
305+
stream,
306+
)
307+
for msg_id, msg in pending[1]:
308+
logger.debug("Received message: %s", msg)
309+
yield AckableMessage(
310+
data=msg[b"data"],
311+
ack=self._ack_generator(msg_id),
312+
)

0 commit comments

Comments
 (0)