@@ -165,6 +165,7 @@ def __init__(
165
165
consumer_id : str = "$" ,
166
166
mkstream : bool = True ,
167
167
xread_block : int = 10000 ,
168
+ maxlen : Optional [int ] = None ,
168
169
additional_streams : Optional [Dict [str , str ]] = None ,
169
170
** connection_kwargs : Any ,
170
171
) -> None :
@@ -184,6 +185,8 @@ def __init__(
184
185
:param mkstream: create stream if it does not exist.
185
186
:param xread_block: block time in ms for xreadgroup.
186
187
Better to set it to a bigger value, to avoid unnecessary calls.
188
+ :param maxlen: sets the maximum length of the stream
189
+ trims (the old values of) the stream each time a new element is added
187
190
:param additional_streams: additional streams to read from.
188
191
Each key is a stream name, value is a consumer id.
189
192
"""
@@ -200,6 +203,7 @@ def __init__(
200
203
self .consumer_id = consumer_id
201
204
self .mkstream = mkstream
202
205
self .block = xread_block
206
+ self .maxlen = maxlen
203
207
self .additional_streams = additional_streams or {}
204
208
205
209
async def _declare_consumer_group (self ) -> None :
@@ -235,7 +239,11 @@ async def kick(self, message: BrokerMessage) -> None:
235
239
:param message: message to append.
236
240
"""
237
241
async with Redis (connection_pool = self .connection_pool ) as redis_conn :
238
- await redis_conn .xadd (self .queue_name , {b"data" : message .message })
242
+ await redis_conn .xadd (
243
+ self .queue_name ,
244
+ {b"data" : message .message },
245
+ maxlen = self .maxlen ,
246
+ )
239
247
240
248
def _ack_generator (self , id : str ) -> Callable [[], Awaitable [None ]]:
241
249
async def _ack () -> None :
0 commit comments