Skip to content

Commit 41da01e

Browse files
authored
ListShceduleSource now always try to fetch tasks from the past. (#83)
1 parent 3312f1f commit 41da01e

File tree

3 files changed

+25
-5
lines changed

3 files changed

+25
-5
lines changed

Diff for: taskiq_redis/list_schedule_source.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ def __init__(
2525
skip_past_schedules: bool = False,
2626
**connection_kwargs: Any,
2727
) -> None:
28+
"""
29+
Create a new schedule source.
30+
31+
:param url: Redis URL
32+
:param prefix: Prefix for all the keys
33+
:param max_connection_pool_size: Maximum size of the connection pool
34+
:param serializer: Serializer to use for the schedules
35+
:param buffer_size: Buffer size for getting schedules
36+
:param skip_past_schedules: Skip schedules that are in the past.
37+
:param connection_kwargs: Additional connection kwargs
38+
"""
2839
super().__init__()
2940
self._prefix = prefix
3041
self._buffer_size = buffer_size
@@ -179,7 +190,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
179190
current_time = datetime.datetime.now(datetime.timezone.utc)
180191
timed: list[bytes] = []
181192
# Only during first run, we need to get previous time schedules
182-
if self._is_first_run and not self._skip_past_schedules:
193+
if not self._skip_past_schedules:
183194
timed = await self._get_previous_time_schedules()
184195
self._is_first_run = False
185196
async with Redis(connection_pool=self._connection_pool) as redis:

Diff for: taskiq_redis/schedule_source.py

+7
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import sys
2+
import warnings
23
from contextlib import asynccontextmanager
34
from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional, Tuple
45

@@ -53,6 +54,12 @@ def __init__(
5354
serializer: Optional[TaskiqSerializer] = None,
5455
**connection_kwargs: Any,
5556
) -> None:
57+
warnings.warn(
58+
"RedisScheduleSource is deprecated. "
59+
"Please switch to ListRedisScheduleSource",
60+
DeprecationWarning,
61+
stacklevel=2,
62+
)
5663
self.prefix = prefix
5764
self.connection_pool: _BlockingConnectionPool = BlockingConnectionPool.from_url(
5865
url=url,

Diff for: tests/test_list_schedule_source.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ async def test_schedule_from_past(redis_url: str) -> None:
4444
await source.add_schedule(schedule)
4545
# When running for the first time, the scheduler will get all the
4646
# schedules that are in the past.
47-
scehdules = await source.get_schedules()
48-
assert scehdules == [schedule]
47+
schedules = await source.get_schedules()
48+
assert schedules == [schedule]
49+
for schedule in schedules:
50+
await source.post_send(schedule)
4951
# After getting the schedules for the second time,
5052
# all the schedules in the past are ignored.
51-
scehdules = await source.get_schedules()
52-
assert scehdules == []
53+
schedules = await source.get_schedules()
54+
assert schedules == []
5355

5456

5557
@pytest.mark.anyio

0 commit comments

Comments
 (0)