Skip to content

1184.feature - Add 'singlecollect' distribution mode #1184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/1180.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added new 'singlecollect' distribution mode that only collects tests once on a single worker and skips collection verification on other workers. This can significantly improve startup time for test suites with expensive collection.
3 changes: 3 additions & 0 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from xdist.scheduler import LoadScheduling
from xdist.scheduler import LoadScopeScheduling
from xdist.scheduler import Scheduling
from xdist.scheduler import SingleCollectScheduling
from xdist.scheduler import WorkStealingScheduling
from xdist.workermanage import NodeManager
from xdist.workermanage import WorkerController
Expand Down Expand Up @@ -123,6 +124,8 @@ def pytest_xdist_make_scheduler(
return LoadGroupScheduling(config, log)
if dist == "worksteal":
return WorkStealingScheduling(config, log)
if dist == "singlecollect":
return SingleCollectScheduling(config, log)
return None

@pytest.hookimpl
Expand Down
2 changes: 2 additions & 0 deletions src/xdist/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadfile",
"loadgroup",
"worksteal",
"singlecollect",
"no",
],
dest="dist",
Expand All @@ -124,6 +125,7 @@ def pytest_addoption(parser: pytest.Parser) -> None:
"loadgroup: Like 'load', but sends tests marked with 'xdist_group' to the same worker.\n\n"
"worksteal: Split the test suite between available environments,"
" then re-balance when any worker runs out of tests.\n\n"
"singlecollect: Only collect tests once on a single worker and skip collection verification.\n\n"
"(default) no: Run tests inprocess, don't distribute."
),
)
Expand Down
3 changes: 3 additions & 0 deletions src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
from xdist.scheduler.loadgroup import LoadGroupScheduling as LoadGroupScheduling
from xdist.scheduler.loadscope import LoadScopeScheduling as LoadScopeScheduling
from xdist.scheduler.protocol import Scheduling as Scheduling
from xdist.scheduler.singlecollect import (
SingleCollectScheduling as SingleCollectScheduling,
)
from xdist.scheduler.worksteal import WorkStealingScheduling as WorkStealingScheduling
213 changes: 213 additions & 0 deletions src/xdist/scheduler/singlecollect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
from __future__ import annotations

from collections.abc import Sequence
from itertools import cycle

import pytest

from xdist.remote import Producer
from xdist.workermanage import parse_tx_spec_config
from xdist.workermanage import WorkerController


class SingleCollectScheduling:
"""Implement scheduling with a single test collection phase.

This differs from LoadScheduling by:
1. Only collecting tests on the first node
2. Skipping collection on other nodes
3. Not checking for collection equality

This can significantly improve startup time by avoiding redundant collection
and collection verification across multiple worker processes.
"""

def __init__(self, config: pytest.Config, log: Producer | None = None) -> None:
self.numnodes = len(parse_tx_spec_config(config))
self.node2pending: dict[WorkerController, list[int]] = {}
self.pending: list[int] = []
self.collection: list[str] | None = None
self.first_node: WorkerController | None = None
if log is None:
self.log = Producer("singlecollect")
else:
self.log = log.singlecollect
self.config = config
self.maxschedchunk = self.config.getoption("maxschedchunk")
self.collection_done = False

@property
def nodes(self) -> list[WorkerController]:
"""A list of all nodes in the scheduler."""
return list(self.node2pending.keys())

@property
def collection_is_completed(self) -> bool:
"""Return True once we have collected tests from the first node."""
return self.collection_done

@property
def tests_finished(self) -> bool:
"""Return True if all tests have been executed by the nodes."""
if not self.collection_is_completed:
return False
if self.pending:
return False
for pending in self.node2pending.values():
if len(pending) >= 2:
return False
return True

@property
def has_pending(self) -> bool:
"""Return True if there are pending test items."""
if self.pending:
return True
for pending in self.node2pending.values():
if pending:
return True
return False

def add_node(self, node: WorkerController) -> None:
"""Add a new node to the scheduler."""
assert node not in self.node2pending
self.node2pending[node] = []

# Remember the first node as our collector
if self.first_node is None:
self.first_node = node
self.log(f"Using {node.gateway.id} as collection node")

def add_node_collection(
self, node: WorkerController, collection: Sequence[str]
) -> None:
"""Only use collection from the first node."""
# We only care about collection from the first node
if node == self.first_node:
self.log(f"Received collection from first node {node.gateway.id}")
self.collection = list(collection)
self.collection_done = True
else:
# Skip collection verification for other nodes
self.log(f"Ignoring collection from node {node.gateway.id}")

def mark_test_complete(
self, node: WorkerController, item_index: int | str, duration: float = 0
) -> None:
"""Mark test item as completed by node."""
self.node2pending[node].remove(
int(item_index) if isinstance(item_index, str) else item_index
)
self.check_schedule(node, duration=duration)

def mark_test_pending(self, item: str) -> None:
assert self.collection is not None
self.pending.insert(
0,
self.collection.index(item),
)
for node in self.node2pending:
self.check_schedule(node)

def remove_pending_tests_from_node(
self,
node: WorkerController,
indices: Sequence[int],
) -> None:
raise NotImplementedError()

def check_schedule(self, node: WorkerController, duration: float = 0) -> None:
"""Maybe schedule new items on the node."""
if node.shutting_down:
return

if self.pending:
# how many nodes do we have?
num_nodes = len(self.node2pending)
# if our node goes below a heuristic minimum, fill it out to
# heuristic maximum
items_per_node_min = max(2, len(self.pending) // num_nodes // 4)
items_per_node_max = max(2, len(self.pending) // num_nodes // 2)
node_pending = self.node2pending[node]
if len(node_pending) < items_per_node_min:
if duration >= 0.1 and len(node_pending) >= 2:
# seems the node is doing long-running tests
# and has enough items to continue
# so let's rather wait with sending new items
return
num_send = items_per_node_max - len(node_pending)
# keep at least 2 tests pending even if --maxschedchunk=1
maxschedchunk = max(2 - len(node_pending), self.maxschedchunk)
self._send_tests(node, min(num_send, maxschedchunk))
else:
node.shutdown()

self.log("num items waiting for node:", len(self.pending))

def remove_node(self, node: WorkerController) -> str | None:
"""Remove a node from the scheduler."""
pending = self.node2pending.pop(node)

# If this is the first node (collector), reset it
if node == self.first_node:
self.first_node = None

if not pending:
return None

# Reassign pending items if the node had any
assert self.collection is not None
crashitem = self.collection[pending.pop(0)]
self.pending.extend(pending)
for node in self.node2pending:
self.check_schedule(node)
return crashitem

def schedule(self) -> None:
"""Initiate distribution of the test collection."""
assert self.collection_is_completed

# Initial distribution already happened, reschedule on all nodes
if self.pending:
for node in self.nodes:
self.check_schedule(node)
return

# Initialize the index of pending items
assert self.collection is not None
self.pending[:] = range(len(self.collection))
if not self.collection:
return

if self.maxschedchunk is None:
self.maxschedchunk = len(self.collection)

# Send a batch of tests to run. If we don't have at least two
# tests per node, we have to send them all so that we can send
# shutdown signals and get all nodes working.
if len(self.pending) < 2 * len(self.nodes):
# Distribute tests round-robin
nodes = cycle(self.nodes)
for _ in range(len(self.pending)):
self._send_tests(next(nodes), 1)
else:
# how many items per node do we have about?
items_per_node = len(self.collection) // len(self.node2pending)
# take a fraction of tests for initial distribution
node_chunksize = min(items_per_node // 4, self.maxschedchunk)
node_chunksize = max(node_chunksize, 2)
# and initialize each node with a chunk of tests
for node in self.nodes:
self._send_tests(node, node_chunksize)

if not self.pending:
# initial distribution sent all tests, start node shutdown
for node in self.nodes:
node.shutdown()

def _send_tests(self, node: WorkerController, num: int) -> None:
tests_per_node = self.pending[:num]
if tests_per_node:
del self.pending[:num]
self.node2pending[node].extend(tests_per_node)
node.send_runtest_some(tests_per_node)
63 changes: 63 additions & 0 deletions testing/acceptance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,69 @@
import xdist


class TestSingleCollectScheduling:
def test_singlecollect_mode(self, pytester: pytest.Pytester) -> None:
"""Test that the singlecollect distribution mode works."""
# Create a simple test file
p1 = pytester.makepyfile(
"""
def test_ok():
pass
"""
)
result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
assert result.ret == 0
result.stdout.fnmatch_lines(["*1 passed*"])
# Make sure the tests are correctly distributed
result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"])

def test_singlecollect_many_tests(self, pytester: pytest.Pytester) -> None:
"""Test that the singlecollect mode correctly distributes many tests."""
# Create test file with multiple tests
p1 = pytester.makepyfile(
"""
import pytest
@pytest.mark.parametrize("x", range(10))
def test_ok(x):
assert True
"""
)
result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
assert result.ret == 0
result.stdout.fnmatch_lines(["*passed*"])
# Make sure the tests are correctly distributed
result.stdout.fnmatch_lines(["*scheduling tests via SingleCollectScheduling*"])

def test_singlecollect_failure(self, pytester: pytest.Pytester) -> None:
"""Test that failures are correctly reported with singlecollect mode."""
p1 = pytester.makepyfile(
"""
def test_fail():
assert 0
"""
)
result = pytester.runpytest(p1, "-n2", "--dist=singlecollect", "-v")
assert result.ret == 1
result.stdout.fnmatch_lines(["*1 failed*"])

def test_singlecollect_handles_fixtures(self, pytester: pytest.Pytester) -> None:
"""Test that fixtures work correctly with singlecollect mode."""
pytester.makepyfile(
"""
import pytest

@pytest.fixture
def my_fixture():
return 42
def test_with_fixture(my_fixture):
assert my_fixture == 42
"""
)
result = pytester.runpytest("-n2", "--dist=singlecollect", "-v")
assert result.ret == 0
result.stdout.fnmatch_lines(["*1 passed*"])


class TestDistribution:
def test_n1_pass(self, pytester: pytest.Pytester) -> None:
p1 = pytester.makepyfile(
Expand Down
Loading