
Add open_virtual_mfdataset #349

Merged: 27 commits, merged Mar 29, 2025

Changes from 23 commits

Commits
a48e8a4
copy implementation from xarray
TomNicholas Dec 15, 2024
75c7da3
sketch idea for lithops parallelization
TomNicholas Dec 15, 2024
ce5a096
standardize naming of variables
TomNicholas Dec 16, 2024
bcf1b70
add to public API
TomNicholas Dec 16, 2024
61f0f32
fix errors caused by trying to import xarray types
TomNicholas Dec 16, 2024
5317207
start writing tests
TomNicholas Dec 16, 2024
cd54328
passing test for combining in serial
TomNicholas Dec 17, 2024
323904c
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 17, 2024
c229c06
requires_kerchunk
TomNicholas Dec 17, 2024
f296ef9
test for lithops with default LocalHost executor
TomNicholas Dec 17, 2024
542f063
notes on confusing AssertionError
TomNicholas Dec 17, 2024
a013b2c
ensure lithops is installed
TomNicholas Dec 17, 2024
f5123cf
remove uneeded fixture
TomNicholas Dec 17, 2024
f134644
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
a2c64d0
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
86f2daf
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
ae8d31d
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 24, 2025
b5f382e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 24, 2025
1507378
Additions to `open_virtual_mfdataset` (#508)
TomNicholas Mar 25, 2025
f4dcdf6
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 25, 2025
72aa697
Additions to `open_virtual_mfdataset` (#509)
TomNicholas Mar 25, 2025
6418c82
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 25, 2025
abf46d5
More open_virtual_mfdataset (#510)
TomNicholas Mar 25, 2025
5e851f3
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 29, 2025
bba378b
Final fixes for open_virtual_mfdataset (#517)
TomNicholas Mar 29, 2025
92f03a6
Apply suggestions from code review; remove new deps
TomNicholas Mar 29, 2025
1e0643e
remove rogue print statement
TomNicholas Mar 29, 2025
1 change: 1 addition & 0 deletions docs/api.rst
@@ -19,6 +19,7 @@ Reading
:toctree: generated/

open_virtual_dataset
open_virtual_mfdataset
TomNicholas (Member, Author) commented:

Note to self: the docs and especially the readme should be rewritten to put this function front and center.


Serialization
-------------
8 changes: 7 additions & 1 deletion pyproject.toml
@@ -20,7 +20,7 @@ classifiers = [
requires-python = ">=3.11"
dynamic = ["version"]
dependencies = [
"xarray>=2025.1.1",
"xarray>=2025.3.0",
"numpy>=2.0.0",
"universal-pathlib",
"numcodecs>=0.15.1",
@@ -119,9 +119,15 @@ dev = [
"pytest-asyncio",
"pytest-cov",
"pytest-mypy",
"pytest",
"pytest-asyncio",
"pytest-xdist",
"ruff",
"s3fs",
"scipy",
"lithops",
"dask",
"virtualizarr[hdf]"
]

[project.urls]
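One likely reason for the stricter xarray pin (an inference, not stated in the PR) is that the virtualizarr/backend.py changes below import private combine helpers from `xarray.structure.combine`, a module path that only exists in recent xarray releases. A defensive sketch of how that import could be guarded if older versions needed supporting; this fallback is not part of the PR:

```python
# Hypothetical fallback, not in this PR: older xarray releases kept these
# private helpers under xarray.core.combine rather than xarray.structure.combine.
try:
    from xarray.structure.combine import (
        _infer_concat_order_from_positions,
        _nested_combine,
    )
except ImportError:
    from xarray.core.combine import (
        _infer_concat_order_from_positions,
        _nested_combine,
    )
```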
2 changes: 1 addition & 1 deletion virtualizarr/__init__.py
@@ -1,6 +1,6 @@
from virtualizarr.manifests import ChunkManifest, ManifestArray # type: ignore # noqa
from virtualizarr.accessor import VirtualiZarrDatasetAccessor # type: ignore # noqa
from virtualizarr.backend import open_virtual_dataset # noqa: F401
from virtualizarr.backend import open_virtual_dataset, open_virtual_mfdataset # noqa: F401

from importlib.metadata import version as _version

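With this re-export the new function is importable straight from the package root. A minimal serial sketch follows; the file names and the `time` coordinate are hypothetical, and `loadable_variables` is a keyword forwarded to `open_virtual_dataset` so that `combine_by_coords` has concrete coordinate values to order by:

```python
# Minimal serial sketch: open two hypothetical files and let xarray order them
# by their loaded "time" coordinate. parallel=False (the default) runs the
# per-file opens through the package's SerialExecutor.
from virtualizarr import open_virtual_mfdataset

vds = open_virtual_mfdataset(
    ["air_2023.nc", "air_2024.nc"],  # hypothetical file paths
    combine="by_coords",
    loadable_variables=["time"],  # forwarded to open_virtual_dataset
)
print(vds)
```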
195 changes: 194 additions & 1 deletion virtualizarr/backend.py
@@ -1,14 +1,25 @@
import functools
import os
import warnings
from collections.abc import Iterable, Mapping
from concurrent.futures import Executor
from enum import Enum, auto
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Literal,
Optional,
Sequence,
cast,
)

from xarray import Dataset, Index
from xarray import DataArray, Dataset, Index, combine_by_coords
from xarray.backends.common import _find_absolute_paths
from xarray.structure.combine import _infer_concat_order_from_positions, _nested_combine

from virtualizarr.parallel import SerialExecutor
from virtualizarr.readers import (
DMRPPVirtualBackend,
FITSVirtualBackend,
@@ -20,6 +31,15 @@
from virtualizarr.readers.api import VirtualBackend
from virtualizarr.utils import _FsspecFSFromFilepath

if TYPE_CHECKING:
from xarray.core.types import (
CombineAttrsOptions,
CompatOptions,
JoinOptions,
NestedSequence,
)


# TODO add entrypoint to allow external libraries to add to this mapping
VIRTUAL_BACKENDS = {
"kerchunk": KerchunkVirtualBackend,
@@ -66,6 +86,7 @@ def automatically_determine_filetype(
# TODO this should ideally handle every filetype that we have a reader for, not just kerchunk

# TODO how do we handle kerchunk json / parquet here?
print(f"{filepath=}")
if Path(filepath).suffix == ".zarr":
# TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one...
raise NotImplementedError()
@@ -197,3 +218,175 @@ def open_virtual_dataset(
)

return vds


def open_virtual_mfdataset(
paths: str
| os.PathLike
| Sequence[str | os.PathLike]
| "NestedSequence[str | os.PathLike]",
concat_dim: (
str
| DataArray
| Index
| Sequence[str]
| Sequence[DataArray]
| Sequence[Index]
| None
) = None,
compat: "CompatOptions" = "no_conflicts",
preprocess: Callable[[Dataset], Dataset] | None = None,
data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
coords="different",
combine: Literal["by_coords", "nested"] = "by_coords",
parallel: Literal[False] | Executor = False,
join: "JoinOptions" = "outer",
attrs_file: str | os.PathLike | None = None,
combine_attrs: "CombineAttrsOptions" = "override",
**kwargs,
) -> Dataset:
"""Open multiple files as a single virtual dataset.

If combine='by_coords' then the function ``combine_by_coords`` is used to combine
the datasets into one before returning the result, and if combine='nested' then
``combine_nested`` is used. The filepaths must be structured according to which
combining function is used, the details of which are given in the documentation for
``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
will be used. Global attributes from the ``attrs_file`` are used
for the combined dataset.

Parameters
----------
paths
Same as in xarray.open_mfdataset
concat_dim
Same as in xarray.open_mfdataset
compat
Same as in xarray.open_mfdataset
preprocess
Same as in xarray.open_mfdataset
data_vars
Same as in xarray.open_mfdataset
coords
Same as in xarray.open_mfdataset
combine
Same as in xarray.open_mfdataset
parallel : subclass of ``concurrent.futures.Executor``, or False
Specify whether the open and preprocess steps of this function will be
performed in parallel using any executor compatible with the ``concurrent.futures`` interface
(such as those provided by Lithops), or in serial.
Default is False, which will execute these steps in serial.
join
Same as in xarray.open_mfdataset
attrs_file
Same as in xarray.open_mfdataset
combine_attrs
Same as in xarray.open_mfdataset
**kwargs : optional
Additional arguments passed on to :py:func:`virtualizarr.open_virtual_dataset`. For an
overview of some of the possible options, see the documentation of
:py:func:`virtualizarr.open_virtual_dataset`.

Returns
-------
xarray.Dataset

Notes
-----
The results of opening each virtual dataset in parallel are sent back to the client process, so must not be too large.
"""

# TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray?

# TODO list kwargs passed to open_virtual_dataset explicitly in docstring?

paths = _find_absolute_paths(paths)

if not paths:
raise OSError("no files to open")

if preprocess:
# TODO
raise NotImplementedError

paths1d: list[str]
if combine == "nested":
if isinstance(concat_dim, str | DataArray) or concat_dim is None:
concat_dim = [concat_dim] # type: ignore[assignment]

# This creates a flat list which is easier to iterate over, whilst
# encoding the originally-supplied structure as "ids".
# The "ids" are not used at all if combine='by_coords`.
combined_ids_paths = _infer_concat_order_from_positions(paths)
ids, paths1d = (
list(combined_ids_paths.keys()),
list(combined_ids_paths.values()),
)
elif concat_dim is not None:
raise ValueError(
"When combine='by_coords', passing a value for `concat_dim` has no "
"effect. To manually combine along a specific dimension you should "
"instead specify combine='nested' along with a value for `concat_dim`.",
)
else:
paths1d = paths # type: ignore[assignment]

executor: Executor = SerialExecutor if parallel is False else parallel
with executor() as exec:
# wait for all the workers to finish, and send their resulting virtual datasets back to the client for concatenation there
virtual_datasets = list(
exec.map(
functools.partial(open_virtual_dataset, **kwargs),
paths1d,
)
)

# TODO add file closers

# Combine all datasets, closing them in case of a ValueError
try:
if combine == "nested":
# Combined nested list by successive concat and merge operations
# along each dimension, using structure given by "ids"
combined_vds = _nested_combine(
virtual_datasets,
concat_dims=concat_dim,
compat=compat,
data_vars=data_vars,
coords=coords,
ids=ids,
join=join,
combine_attrs=combine_attrs,
)
elif combine == "by_coords":
# Redo ordering from coordinates, ignoring how they were ordered
# previously
combined_vds = combine_by_coords(
virtual_datasets,
compat=compat,
data_vars=data_vars,
coords=coords,
join=join,
combine_attrs=combine_attrs,
)
else:
raise ValueError(
f"{combine} is an invalid option for the keyword argument ``combine``"
)
except ValueError:
for vds in virtual_datasets:
vds.close()
raise

# combined_vds.set_close(partial(_multi_file_closer, closers))

# read global attributes from the attrs_file or from the first dataset
if attrs_file is not None:
if isinstance(attrs_file, os.PathLike):
attrs_file = cast(str, os.fspath(attrs_file))
combined_vds.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs

# TODO should we just immediately close everything?
# TODO We should have already read everything we're ever going to read into memory at this point

return combined_vds
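Taken together, the new function maps `open_virtual_dataset` over the supplied paths on an executor and then combines the per-file virtual datasets on the client. A usage sketch of the parallel, nested-combine path; the file names, the `time` dimension, and the choice of `ThreadPoolExecutor` are illustrative assumptions, and the `coords`/`compat` overrides are only there to avoid element-wise comparison of manifest-backed coordinates:

```python
# Parallel sketch: ``parallel`` receives the executor *class* (the function
# instantiates it via ``executor()``), so it must be constructible with no
# arguments. Any concurrent.futures-compatible executor class should work here.
from concurrent.futures import ThreadPoolExecutor

from virtualizarr import open_virtual_mfdataset

combined_vds = open_virtual_mfdataset(
    ["day_001.nc", "day_002.nc", "day_003.nc"],  # hypothetical inputs
    combine="nested",
    concat_dim="time",
    coords="minimal",         # only concatenate coords that contain the concat dim
    compat="override",        # take non-concatenated variables from the first dataset
    combine_attrs="override",
    parallel=ThreadPoolExecutor,
    loadable_variables=["time"],  # forwarded to open_virtual_dataset
)
```

As the Notes section warns, every per-file virtual dataset is shipped back to the client process before combination, so this pattern suits many small reference sets rather than large in-memory results.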