
interp on chunked dimension can fail (oom) #10130


Open

abiasiol opened this issue Mar 14, 2025 · 3 comments

Comments


abiasiol commented Mar 14, 2025

What is your issue?

Hi,
I am using interp with linear method along a sorted chunked dimension.
From the Dask dashboard I see workers failing and restarting until the computation fails, with messages along the lines of:

distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:45829 (pid=65947) exceeded 95% memory budget. Restarting...
....
distributed.scheduler - ERROR - Task ('chunked_aware_interpnd-transpose-transpose-store-map-f31ac35773dcc2e7a00aa402e89cfe6b', 0, 4, 1) marked as failed because 4 workers died while trying to run it

I allocated 8GB per worker with the distributed scheduler, but you can reproduce the issue even with the default one.
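
For reference, this is roughly the cluster setup I am describing (a sketch only; worker count and limits are illustrative, and the issue also reproduces with a plain default Client):

from dask.distributed import Client, LocalCluster

# Local distributed cluster with ~8 GB per worker (illustrative numbers).
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="8GB")
client = Client(cluster)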

With xarray<=2024.11.0 I was able to make it work using a trick (shown below).

I can run it successfully with xarray 2024.11.0 and dask[distributed]<=2025.2.0, so the problem is "independent" of the Dask version.

I have seen a few discussions in PRs and issues, but could not work out what the best approach is in this case. Since my time axis is sorted and the interpolation is linear, it should be possible to avoid loading all chunks along that axis into memory. Any suggestions? Are there common ways of dealing with this?
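
To make the reasoning concrete, here is a toy NumPy illustration (made-up sizes, not my actual data) of why the whole source axis should not be needed: with a sorted coordinate and linear interpolation, each block of target points only depends on the source samples that bracket it.

import numpy as np

x = np.arange(0, 1000, 10.0)                  # sorted source "times"
y = np.sin(x / 50.0)                          # source values
x_new_block = np.linspace(200.0, 300.0, 50)   # one chunk of target times

# Only the source points bracketing this block are needed.
lo = np.searchsorted(x, x_new_block[0], side="right") - 1
hi = np.searchsorted(x, x_new_block[-1], side="left") + 1

full = np.interp(x_new_block, x, y)
windowed = np.interp(x_new_block, x[lo:hi], y[lo:hi])
assert np.allclose(full, windowed)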

1. Generate data

import pandas as pd
import xarray as xr
import dask.array as da

n_a = 5
n_b = 400
n_avg = 360
n_raw = 1_000_000
time1 = pd.date_range(start="2024-01-01", periods=n_avg, freq="8h")
time2 = pd.date_range(start="2023-12-28", periods=n_raw, freq="min")

data_avg = da.random.normal(size=(n_avg, n_a, n_b), chunks=(360, 1, 200))
data_raw = da.random.normal(size=(n_raw, n_a, n_b), chunks=(8_000, 1, 200))

da_avg = xr.DataArray(data_avg, dims=["time", "a", "b"], coords={"time": time1})
da_avg.to_zarr("da_avg.zarr", mode="w")

da_raw = xr.DataArray(data_raw, dims=["time", "a", "b"], coords={"time": time2})
da_raw.to_zarr("da_raw.zarr", mode="w")

da_raw = xr.open_dataarray(
    "da_raw.zarr",
    engine="zarr",
    chunks={"time": 8_000, "a": 1, "b": 200},
)
da_avg = xr.open_dataarray(
    "da_avg.zarr",
    engine="zarr",
    chunks={"time": 360, "a": 1, "b": 200},
)

# evaluate on this array, basically da_raw["time"] but chunked
time_eval = xr.DataArray(
    da_raw["time"],
    dims=["time"],
    coords={"time": da_raw["time"]},
    name="time_eval",
).chunk({"time": 8_000})

2. Interp

Now the following does NOT work (regardless of the xarray version); it runs into memory issues.

interp_da = da_avg.interp(
    time=time_eval,
    assume_sorted=True,
    method="linear",
    kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da.to_zarr("interp_da.zarr", mode="w")

3. Interp with trick to maintain the dask indexer

The following DOES work with xarray 2024.11.0, but not with 2025.1.2; I suspect this is because of the apply_ufunc change to interp.

interp_da = da_avg.interp(
    time=time_eval.expand_dims({"temp": 1}),
    assume_sorted=True,
    method="linear",
    kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da.to_zarr("interp_da.zarr", mode="w")

4. Sanity check vs scipy

Sanity check that the results I got with 2024.11.0 were indeed the expected ones:

import numpy as np
from scipy.interpolate import interp1d

y = da_avg.isel(a=0, b=0).to_numpy()
x = da_avg["time"].to_numpy().astype(int)
f = interp1d(x, y, kind="linear", fill_value="extrapolate", bounds_error=False)
y_eval = f(time_eval.to_numpy().astype(int))
y_xr = interp_da.isel(temp=0, a=0, b=0).to_numpy()
assert np.allclose(y_eval, y_xr)
abiasiol added the "needs triage" label on Mar 14, 2025
dcherian added the "upstream issue" label and removed the "needs triage" label on Mar 17, 2025
@dcherian (Contributor)

I believe this is dask/dask#11613 and has been fixed, but not released.

I don't understand why your trick works on the older versions.

@abiasiol (Author)

Thank you, @dcherian. I can only make it work by manual rechunking, which is a bit cumbersome.

With xarray==2025.1.2 I tried:

Test 1

Fails both on dask==2025.2.0 and dask==2025.2.0+49.gdf372bdc (which contains 11683).

interp_da = da_avg.interp(
    time=time_eval,
    assume_sorted=True,
    method="linear",
    kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da.to_zarr("interp_da.zarr", mode="w")
.../dask/array/gufunc.py:485: PerformanceWarning: Increasing number of chunks by factor of 10
  tmp = blockwise(
...
distributed.worker.memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS
distributed.worker.memory - WARNING - Worker is at 88% memory usage. Pausing worker.  Process memory: 6.59 GiB -- Worker memory limit: 7.45 GiB
...
distributed.worker - ERROR - Worker stream died during communication
...
KilledWorker: Attempted to run task ('interpnd-transpose-store-map-70b1c14b32cb76304e3b56617e0666d3', 0, 3, 0) on 4 different workers, but all those workers died while running it

Test 2

Manual rechunking works (both on dask==2025.2.0 and dask==2025.2.0+49.gdf372bdc). Memory is surprisingly well behaved; there are lots of worrying warnings, but I don't see anything visually wrong when looking at the dashboard (no reversing of progress, no workers dying).

interp_da = da_avg.chunk({"a": 1, "b": 5}).interp(
    time=time_eval,
    assume_sorted=True,
    method="linear",
    kwargs={"fill_value": "extrapolate", "bounds_error": False},
)
interp_da = interp_da.chunk({"time": 8_000, "a": 1, "b": 200})
interp_da.to_zarr("interp_da.zarr", mode="w")
.../dask/array/gufunc.py:458: PerformanceWarning: Increasing number of chunks by factor of 125
  tmp = blockwise(
distributed.shuffle._scheduler_plugin - WARNING - Shuffle c1dcafd4ae9808c50b34610dd56bd405 initialized by task ('interpnd-rechunk-transfer-f2dfcdb090be66243ab64f8bfba433b1', 0, 4, 1, 0, 4, 27) executed on worker
...
distributed.shuffle._scheduler_plugin - WARNING - Shuffle c1dcafd4ae9808c50b34610dd56bd405 deactivated due to stimulus 'task-finished-1742232630.492225'
...
distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:35115 (pid=22477) exceeded 95% memory budget. Restarting...
distributed.scheduler - ERROR - Task ('interpnd-transpose-store-map-70b1c14b32cb76304e3b56617e0666d3', 0, 3, 0) marked as failed because 4 workers died while trying to run it
distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35115' caused the cluster to lose already computed task(s), which will be recomputed elsewhere
2025-03-17 17:38:28,129 - distributed.nanny - WARNING - Restarting worker

Profile looks fine, I think.

[Screenshot: Dask dashboard profile]

@abiasiol (Author)

I can reproduce with Dask 2025.3.0, too.

@phofl what do you think about this issue? Thank you!
