Skip to content

Commit 165b60e

Browse files
committed
implement limited WorkerPool size
1 parent 116637a commit 165b60e

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

Diff for: execnet/gateway_base.py

+25-3
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,29 @@ def reraise(cls, val, tb):
6464
# def log_extra(*msg):
6565
# f.write(" ".join([str(x) for x in msg]) + "\n")
6666

67+
if sys.version_info >= (3, 7):
68+
from contextlib import nullcontext
69+
else:
70+
class nullcontext(object):
71+
"""Context manager that does no additional processing.
72+
Used as a stand-in for a normal context manager, when a particular
73+
block of code is only sometimes used with a normal context manager:
74+
cm = optional_cm if condition else nullcontext()
75+
with cm:
76+
# Perform operation, using optional_cm if condition is True
77+
"""
78+
79+
def __init__(self, enter_result=None):
80+
self.enter_result = enter_result
81+
82+
def __enter__(self):
83+
return self.enter_result
6784

68-
class EmptySemaphore:
85+
def __exit__(self, *excinfo):
86+
pass
87+
88+
89+
class EmptySemaphore(nullcontext):
6990
acquire = release = lambda self: None
7091

7192

@@ -238,13 +259,14 @@ class WorkerPool(object):
238259
when the pool received a trigger_shutdown().
239260
"""
240261

241-
def __init__(self, execmodel, hasprimary=False):
262+
def __init__(self, execmodel, hasprimary=False, size=None):
242263
""" by default allow unlimited number of spawns. """
243264
self.execmodel = execmodel
244265
self._running_lock = self.execmodel.Lock()
245266
self._running = set()
246267
self._shuttingdown = False
247268
self._waitall_events = []
269+
self._semaphore = self.execmodel.Semaphore(size)
248270
if hasprimary:
249271
if self.execmodel.backend != "thread":
250272
raise ValueError("hasprimary=True requires thread model")
@@ -307,7 +329,7 @@ def spawn(self, func, *args, **kwargs):
307329
of the given func(*args, **kwargs).
308330
"""
309331
reply = Reply((func, args, kwargs), self.execmodel)
310-
with self._running_lock:
332+
with self._semaphore, self._running_lock:
311333
if self._shuttingdown:
312334
raise ValueError("pool is shutting down")
313335
self._running.add(reply)

Diff for: testing/test_threadpool.py

-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ def test_waitfinish_on_reply(pool):
7474
pytest.raises(ZeroDivisionError, reply.get)
7575

7676

77-
@pytest.mark.xfail(reason="WorkerPool does not implement limited size")
7877
def test_limited_size(execmodel):
7978
pool = WorkerPool(execmodel, size=1)
8079
q = execmodel.queue.Queue()

0 commit comments

Comments
 (0)