
Add open_virtual_mfdataset #349


Merged
27 commits merged on Mar 29, 2025

Changes from 2 commits
Commits (27)
a48e8a4
copy implementation from xarray
TomNicholas Dec 15, 2024
75c7da3
sketch idea for lithops parallelization
TomNicholas Dec 15, 2024
ce5a096
standardize naming of variables
TomNicholas Dec 16, 2024
bcf1b70
add to public API
TomNicholas Dec 16, 2024
61f0f32
fix errors caused by trying to import xarray types
TomNicholas Dec 16, 2024
5317207
start writing tests
TomNicholas Dec 16, 2024
cd54328
passing test for combining in serial
TomNicholas Dec 17, 2024
323904c
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 17, 2024
c229c06
requires_kerchunk
TomNicholas Dec 17, 2024
f296ef9
test for lithops with default LocalHost executor
TomNicholas Dec 17, 2024
542f063
notes on confusing AssertionError
TomNicholas Dec 17, 2024
a013b2c
ensure lithops is installed
TomNicholas Dec 17, 2024
f5123cf
remove unneeded fixture
TomNicholas Dec 17, 2024
f134644
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
a2c64d0
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
86f2daf
Merge branch 'main' into open_virtual_mfdataset
TomNicholas Dec 18, 2024
ae8d31d
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 24, 2025
b5f382e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 24, 2025
1507378
Additions to `open_virtual_mfdataset` (#508)
TomNicholas Mar 25, 2025
f4dcdf6
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 25, 2025
72aa697
Additions to `open_virtual_mfdataset` (#509)
TomNicholas Mar 25, 2025
6418c82
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 25, 2025
abf46d5
More open_virtual_mfdataset (#510)
TomNicholas Mar 25, 2025
5e851f3
Merge branch 'develop' into open_virtual_mfdataset
TomNicholas Mar 29, 2025
bba378b
Final fixes for open_virtual_mfdataset (#517)
TomNicholas Mar 29, 2025
92f03a6
Apply suggestions from code review; remove new deps
TomNicholas Mar 29, 2025
1e0643e
remove rogue print statement
TomNicholas Mar 29, 2025
229 changes: 228 additions & 1 deletion virtualizarr/backend.py
@@ -1,13 +1,23 @@
import os
import warnings
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from functools import partial
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Literal,
Optional,
Sequence,
cast,
)

from xarray import Dataset, Index
from xarray import DataArray, Dataset, Index, combine_by_coords
from xarray.backends.api import _multi_file_closer
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine
TomNicholas (Member, Author):
I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).
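
For reference, a rough standalone illustration (my own sketch, not xarray's actual code) of the tuple "ids" mapping that _infer_concat_order_from_positions is expected to produce for a nested list of paths:

def infer_ids(paths, position=()):
    """Flatten a nested list of paths into {position-tuple: path} pairs."""
    if isinstance(paths, str):
        return {position: paths}
    ids = {}
    for i, item in enumerate(paths):
        ids.update(infer_ids(item, position + (i,)))
    return ids

# e.g. a 2x2 grid of files to be concatenated along two dimensions:
infer_ids([["t0_x0.nc", "t0_x1.nc"], ["t1_x0.nc", "t1_x1.nc"]])
# -> {(0, 0): 't0_x0.nc', (0, 1): 't0_x1.nc', (1, 0): 't1_x0.nc', (1, 1): 't1_x1.nc'}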


from virtualizarr.manifests import ManifestArray
from virtualizarr.readers import (
@@ -22,6 +32,15 @@
from virtualizarr.readers.common import VirtualBackend
from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions

if TYPE_CHECKING:
from xarray.core.types import (
CombineAttrsOptions,
CompatOptions,
JoinOptions,
NestedSequence,
)


# TODO add entrypoint to allow external libraries to add to this mapping
VIRTUAL_BACKENDS = {
"kerchunk": KerchunkVirtualBackend,
@@ -209,3 +228,211 @@ def open_virtual_dataset(
)

return vds


def open_virtual_mfdataset(
paths: str | Sequence[str | os.PathLike] | NestedSequence[str | os.PathLike],
concat_dim: (
str
| DataArray
| Index
| Sequence[str]
| Sequence[DataArray]
| Sequence[Index]
| None
) = None,
compat: CompatOptions = "no_conflicts",
preprocess: Callable[[Dataset], Dataset] | None = None,
data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
coords="different",
combine: Literal["by_coords", "nested"] = "by_coords",
parallel: Literal["lithops", "dask", False] = False,
join: JoinOptions = "outer",
attrs_file: str | os.PathLike | None = None,
combine_attrs: CombineAttrsOptions = "override",
**kwargs,
) -> Dataset:
"""Open multiple files as a single virtual dataset

If combine='by_coords' then the function ``combine_by_coords`` is used to combine
the datasets into one before returning the result, and if combine='nested' then
``combine_nested`` is used. The filepaths must be structured according to which
combining function is used, the details of which are given in the documentation for
``combine_by_coords`` and ``combine_nested``. By default ``combine='by_coords'``
will be used. Global attributes from the ``attrs_file`` are used
for the combined dataset.

Parameters
----------
paths
Same as in xarray.open_mfdataset
concat_dim
Same as in xarray.open_mfdataset
compat
Same as in xarray.open_mfdataset
preprocess
Same as in xarray.open_mfdataset
data_vars
Same as in xarray.open_mfdataset
coords
Same as in xarray.open_mfdataset
combine
Same as in xarray.open_mfdataset
parallel : 'dask', 'lithops', or False
Specify whether the open and preprocess steps of this function will be
performed in parallel using ``dask.delayed``, in parallel using ``lithops.map``, or in serial.
Default is False.
join
Same as in xarray.open_mfdataset
attrs_file
Same as in xarray.open_mfdataset
combine_attrs
Same as in xarray.open_mfdataset
**kwargs : optional
Additional arguments passed on to :py:func:`virtualizarr.open_virtual_dataset`. For an
overview of some of the possible options, see the documentation of
:py:func:`virtualizarr.open_virtual_dataset`.

Returns
-------
xarray.Dataset

Notes
-----
The results of opening each virtual dataset in parallel are sent back to the client process, so they must not be too large.
"""

# TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray?

# TODO add options passed to open_virtual_dataset explicitly?

paths = _find_absolute_paths(paths)

if not paths:
raise OSError("no files to open")

paths1d: list[str]
if combine == "nested":
if isinstance(concat_dim, str | DataArray) or concat_dim is None:
concat_dim = [concat_dim] # type: ignore[assignment]

# This creates a flat list which is easier to iterate over, whilst
# encoding the originally-supplied structure as "ids".
# The "ids" are not used at all if combine='by_coords`.
combined_ids_paths = _infer_concat_order_from_positions(paths)
ids, paths1d = (
list(combined_ids_paths.keys()),
list(combined_ids_paths.values()),
)
elif concat_dim is not None:
raise ValueError(
"When combine='by_coords', passing a value for `concat_dim` has no "
"effect. To manually combine along a specific dimension you should "
"instead specify combine='nested' along with a value for `concat_dim`.",
)
else:
paths1d = paths # type: ignore[assignment]

if parallel == "dask":
import dask

# wrap the open_dataset, getattr, and preprocess with delayed
open_ = dask.delayed(open_virtual_dataset)
getattr_ = dask.delayed(getattr)
if preprocess is not None:
preprocess = dask.delayed(preprocess)
elif parallel == "lithops":
import lithops
TomNicholas (Member, Author):
I believe all of this could also be useful upstream in xr.open_mfdataset

# TODO use RetryingFunctionExecutor instead?
# TODO what's the easiest way to pass the lithops config in?
fn_exec = lithops.FunctionExecutor()

# lithops doesn't have a delayed primitive
open_ = open_virtual_dataset
TomNicholas (Member, Author), Dec 16, 2024:
I think the code would be more straightforward if the parallel primitive we used for lithops was the same as the one we used for dask.
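
A hypothetical sketch (not part of this PR; LithopsDelayed and compute_with_lithops are invented names) of what a dask.delayed-style shim over lithops could look like, so both parallel backends would share the same structure:

import lithops

class LithopsDelayed:
    """Record a function call without executing it yet (analogous to dask.delayed)."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def compute(self):
        return self.func(*self.args, **self.kwargs)

def compute_with_lithops(tasks, fn_exec=None):
    """Run the recorded calls on lithops workers and gather their results."""
    fn_exec = fn_exec or lithops.FunctionExecutor()
    futures = fn_exec.map(LithopsDelayed.compute, tasks)
    completed, _ = fn_exec.wait(futures, download_results=True)
    return [f.get_result() for f in completed]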

# TODO I don't know how best to chain this with the getattr, or if that closing stuff is even necessary for virtual datasets
# getattr_ = getattr
elif parallel is not False:
raise ValueError(
f"{parallel} is an invalid option for the keyword argument ``parallel``"
)
else:
open_ = open_virtual_dataset
getattr_ = getattr

if parallel == "dask":
virtual_datasets = [open_(p, **kwargs) for p in paths1d]
closers = [getattr_(ds, "_close") for ds in virtual_datasets]
if preprocess is not None:
virtual_datasets = [preprocess(ds) for ds in virtual_datasets]

# calling compute here will return the datasets/file_objs lists,
# the underlying datasets will still be stored as dask arrays
virtual_datasets, closers = dask.compute(virtual_datasets, closers)
elif parallel == "lithops":

def generate_refs(path):
TomNicholas (Member, Author):
This is the equivalent of @thodson-usgs 's map_references function

# allows passing the open_virtual_dataset function to lithops without evaluating it
vds = open_(path, **kwargs)
# TODO perhaps we should just load the loadable_vars here and close before returning?
return vds

futures = fn_exec.map(generate_refs, paths1d)

# wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
completed_futures, _ = fn_exec.wait(futures, download_results=True)
virtual_datasets = [future.get_result() for future in completed_futures]
# no delayed objects in the lithops path, so collect closers directly here
# (mirrors the other branches) to keep set_close below from failing
closers = [getattr(vds, "_close") for vds in virtual_datasets]
TomNicholas (Member, Author):
IIUC this will cause every serverless worker to send a small virtual dataset back to the client process over the internet somehow
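
Not part of the PR, but a rough way to sanity-check how much each worker would ship back to the client, assuming the virtual dataset pickles cleanly (the file name here is hypothetical):

import pickle

from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset("t0_x0.nc")  # open one file locally as a virtual dataset
print(f"~{len(pickle.dumps(vds)) / 1e6:.2f} MB per virtual dataset sent back over the network")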

elif parallel is False:
virtual_datasets = [open_(p, **kwargs) for p in paths1d]
closers = [getattr_(ds, "_close") for ds in virtual_datasets]
if preprocess is not None:
virtual_datasets = [preprocess(ds) for ds in virtual_datasets]

# Combine all datasets, closing them in case of a ValueError
try:
if combine == "nested":
# Combined nested list by successive concat and merge operations
# along each dimension, using structure given by "ids"
combined = _nested_combine(
virtual_datasets,
concat_dims=concat_dim,
compat=compat,
data_vars=data_vars,
coords=coords,
ids=ids,
join=join,
combine_attrs=combine_attrs,
)
elif combine == "by_coords":
# Redo ordering from coordinates, ignoring how they were ordered
# previously
combined = combine_by_coords(
virtual_datasets,
compat=compat,
data_vars=data_vars,
coords=coords,
join=join,
combine_attrs=combine_attrs,
)
else:
raise ValueError(
f"{combine} is an invalid option for the keyword argument"
" ``combine``"
)
except ValueError:
for ds in virtual_datasets:
ds.close()
raise

combined.set_close(partial(_multi_file_closer, closers))

# read global attributes from the attrs_file or from the first dataset
if attrs_file is not None:
if isinstance(attrs_file, os.PathLike):
attrs_file = cast(str, os.fspath(attrs_file))
combined.attrs = virtual_datasets[paths1d.index(attrs_file)].attrs

# TODO should we just immediately close everything?
# TODO We should have already read everything we're ever going to read into memory at this point

return combined
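
For context, calling the new function might look something like the following (file names and the concat dimension are illustrative; this assumes open_virtual_mfdataset is exported at the package top level, as the "add to public API" commit suggests):

from virtualizarr import open_virtual_mfdataset

vds = open_virtual_mfdataset(
    ["air_2023.nc", "air_2024.nc"],  # hypothetical file paths
    combine="nested",
    concat_dim="time",
    parallel=False,  # or "dask" / "lithops"
)
print(vds)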