
Add open_virtual_mfdataset #349


Merged

merged 27 commits into develop from open_virtual_mfdataset on Mar 29, 2025

Conversation

@TomNicholas (Member) commented on Dec 16, 2024:

Here I have copied the code from xr.open_mfdataset, changed it to use open_virtual_dataset, and added an option to parallelize with lithops as an alternative to using dask.delayed.

I haven't even tried to run this yet, but I think this is the right approach, @tomwhite? I realised we don't need cubed's blockwise, because xarray.open_mfdataset already has internal logic to turn the N-dimensional concat into a 1D map, so lithops.map should be fine?

Also, I think based on our conversation we should be able to use lithops.map instead of lithops.map_reduce (as @thodson-usgs did in #203), because the tiny size of the virtual datasets being returned to the client means we should be able to get away with a single reduction step on the client even at large scale? (See also #104 for justification that we only need to send back kB-sized objects.)
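
For orientation, here is a minimal sketch of the intended usage, mirroring xr.open_mfdataset. The import path and the combine/concat kwargs are assumptions rather than the final API (a later commit in this PR removes open_virtual_mfdataset from the public API for now):

from virtualizarr import open_virtual_mfdataset  # import path assumed

# hypothetical file listing
urls = [f"s3://my-bucket/output_{i}.nc" for i in range(100)]

# parallel may be False (serial), "dask" (via dask.delayed), or "lithops"
# (serverless), as discussed above
vds = open_virtual_mfdataset(
    urls, combine="nested", concat_dim="time", parallel="lithops"
)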

    datasets, closers = dask.compute(datasets, closers)
elif parallel == "lithops":

    def generate_refs(path):
@TomNicholas (Member Author):
This is the equivalent of @thodson-usgs's map_references function.
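
Filling in around this excerpt, the worker-side pattern is presumably something like the following sketch (the import path, paths list, and FunctionExecutor setup are hypothetical simplifications):

import lithops

from virtualizarr import open_virtual_dataset  # import path assumed

paths = ["s3://my-bucket/a.nc", "s3://my-bucket/b.nc"]  # hypothetical

def generate_refs(path):
    # runs on a serverless worker; returns only the virtual dataset
    # (chunk manifests plus metadata), never the underlying array bytes
    return open_virtual_dataset(path)

fn_exec = lithops.FunctionExecutor()
futures = fn_exec.map(generate_refs, paths)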


# wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
completed_futures, _ = fn_exec.wait(futures, download_results=True)
virtual_datasets = [future.get_result() for future in completed_futures]
@TomNicholas (Member Author):
IIUC this will cause every serverless worker to send its small virtual dataset back to the client process over the network.
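
Continuing that sketch: once the per-file virtual datasets are back on the client, the single reduction step is just an ordinary xarray combine over kB-sized objects (the concat dimension here is hypothetical):

import xarray as xr

# virtual_datasets as gathered in the excerpt above; each element is tiny,
# so combining on the client stays cheap even for thousands of files
combined_vds = xr.combine_nested(virtual_datasets, concat_dim="time")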

Comment on lines 18 to 20
from xarray.backends.api import _multi_file_closer
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine
@TomNicholas (Member Author):
I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).
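
For reference, the alternative route via #35 would look roughly like this sketch of an xarray backend entrypoint (the class and its wiring are hypothetical, not part of this PR):

from xarray.backends import BackendEntrypoint

from virtualizarr import open_virtual_dataset  # import path assumed

class VirtualiZarrBackendEntrypoint(BackendEntrypoint):
    def open_dataset(self, filename_or_obj, *, drop_variables=None):
        # delegate to the virtual opener instead of reading array data
        return open_virtual_dataset(filename_or_obj, drop_variables=drop_variables)

# xr.open_mfdataset(paths, engine=VirtualiZarrBackendEntrypoint) would then
# reuse open_mfdataset's own concat/combine logic with no private imports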

Comment on lines 351 to 352
# lithops doesn't have a delayed primitive
open_ = open_virtual_dataset
@TomNicholas (Member Author) commented on Dec 16, 2024:
I think the code would be more straightforward if the parallel primitive we used for lithops was the same as the one we used for dask.
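
Concretely, the asymmetry is something like this sketch (paths and the import are hypothetical): dask wraps the opener in a lazy primitive and evaluates everything in one compute call, while lithops has no equivalent, forcing a second codepath:

import dask

from virtualizarr import open_virtual_dataset  # import path assumed

paths = ["a.nc", "b.nc"]  # hypothetical

# dask branch: defer each open, then evaluate all of them at once
open_ = dask.delayed(open_virtual_dataset)
virtual_datasets = dask.compute(*[open_(p) for p in paths])

# lithops branch: no delayed primitive, so open_ stays the plain function
# (as in the excerpt above) and fn_exec.map(...) provides the parallelism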


Comment on lines 344 to 345
elif parallel == "lithops":
    import lithops
@TomNicholas (Member Author):
I believe all of this could also be useful upstream in xr.open_mfdataset


@tomwhite (Collaborator) replied:

> I realised we don't need cubed's blockwise, because xarray.open_mfdataset already has internal logic to turn the N-dimensional concat into a 1D map, so lithops.map should be fine?

Yes, that should work fine. We may want to loosen/generalize blockwise slightly in Cubed so that it can return arbitrary objects, which would let this be done with Cubed as well - but that can be done later.

> Also, I think based on our conversation we should be able to use lithops.map instead of lithops.map_reduce (as @thodson-usgs did in #203), because the tiny size of the virtual datasets being returned to the client means we should be able to get away with a single reduction step on the client even at large scale? (See also #104 for justification that we only need to send back kB-sized objects.)

Agreed - it will be interesting to see this for large datasets. (It's also similar to the approach I've taken for storing data in Icechunk where the changesets are returned to the client - again, small kB-sized UUIDs.)

@TomNicholas TomNicholas changed the base branch from main to develop March 24, 2025 15:45
TomNicholas and others added 2 commits March 25, 2025 13:26
* need latest version of xarray to import internals correctly

* Fix metadata equality for nan fill value (#502)

* add check that works for fill_values too

* note about removing once merged upstream

* type hint

* regression test

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Remove accidental changes to pyproject.toml

* Update pyproject.toml

* ignore mypy

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* Setup intersphinx mapping for docs (#503)

* Setup intersphinx mapping for docs

---------

Co-authored-by: Kyle Barron <[email protected]>

* Change default loadable_variables (and indexes) to match xarray's behaviour (#477)

* draft refactor

* sketch of simplified handling of loadable_variables

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* get at least some tests working

* separate VirtualBackend api definition from common utilities

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove indexes={} everywhere in tests

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* stop passing through loadable_variables to where it isn't used

* implement logic to load 1D dimension coords by default

* remove more instances of indexes={}

* remove more indexes={}

* refactor logic for choosing loadable_variables

* fix more tests

* xfail Aimee's test that I don't understand

* xfail test that explicitly specifies no indexes

* made a bunch more stuff pass

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix netcdf3 reader

* fix bad import in FITS reader

* fix import in tiff reader

* fix import in icechunk test

* release note

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update docstring

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix fits reader

* xfail on empty dict for indexes

* linting

* actually test new expected behaviour

* fix logic for setting loadable_variables

* update docs page to reflect new behaviour

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix expected behaviour in another tests

* additional assert

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* use encode_dataset_coordinates in kerchunk writer

* Encode zarr vars

* fix some mypy errors

* move drop_variables implementation to the end of every reader

* override loadable_variables and raise warning

* fix failing test by not creating loadable variables that would get inlined by default

* improve error message

* remove some more occurrences of indexes={}

* skip slow test

* slay mypy errors

* docs typos

* should fix dmrpp test

* Delete commented-out code

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* remove unnecessary test skip

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Max Jones <[email protected]>

* Update pyproject.toml deps (#504)

* re-add icechunk to upstream tests

* add pytest-asyncio to test envs

* passing serial open_virtual_mfdataset test

* passes with lithops but only for the HDF backend

* add test for dask

* refactored serial and lithops codepaths to use an executor pattern

* xfail lithops

* consolidate tests by parametrizing over parallel kwarg

* re-enable lithops test

* remove unneeded get_executor function

* add test for using dask distributed to parallelize

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Max Jones <[email protected]>
Co-authored-by: Kyle Barron <[email protected]>
@TomNicholas TomNicholas marked this pull request as ready for review March 25, 2025 20:28
TomNicholas and others added 2 commits March 25, 2025 14:39

* Add ManifestStore for loading data from ManifestArrays (#490)

* Draft ManifestStore implementation

---------

Co-authored-by: Tom Nicholas <[email protected]>
Co-authored-by: Kyle Barron <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* make it work for dask delayed

* correct docstring

---------

Co-authored-by: Max Jones <[email protected]>
Co-authored-by: Kyle Barron <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* added compliant executor for lithops

* add links to lithops issues

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Comment on lines 524 to 525
pytest.mark.xfail(
    reason="Lithops bug - see https://github.com/lithops-cloud/lithops/issues/1428"
@TomNicholas (Member Author):

I was able to dodge this by not using functools.partial, but I'm a bit worried that my newer approach of using a closure won't work properly in a remote execution context.
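
A sketch of the two variants (the kwargs are hypothetical): the functools.partial form that tripped lithops-cloud/lithops#1428, versus the closure now used, whose captured state still has to serialize cleanly when shipped to remote workers:

import functools

from virtualizarr import open_virtual_dataset  # import path assumed

kwargs = {"drop_variables": ["junk"]}  # hypothetical opener options

# variant 1: functools.partial, which triggered the lithops bug above
open_partial = functools.partial(open_virtual_dataset, **kwargs)

# variant 2: a closure over kwargs, which dodges the bug; the open question
# is whether this (and everything it captures) pickles correctly for
# remote execution
def open_(path):
    return open_virtual_dataset(path, **kwargs)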

Comment on lines +187 to +192
class LithopsEagerFunctionExecutor(Executor):
"""
Lithops-based function executor which follows the concurrent.futures.Executor API.

Only required because lithops doesn't follow the concurrent.futures.Executor API, see https://github.com/lithops-cloud/lithops/issues/1427.
"""


* specify dask and lithops executors with a string again

* fix easy typing stuff

* fix typing errors by aligning executor signatures

* remove open_virtual_mfdataset from public API for now

* release note

* refactor construction of expected result

* implement preprocess arg, and dodge lithops bug

* update comment

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
@TomNicholas merged commit 810d4d0 into develop on Mar 29, 2025
11 checks passed
@TomNicholas deleted the open_virtual_mfdataset branch on March 29, 2025 at 18:31
Labels: enhancement (New feature or request)