Skip to content

Commit 3d9ebe3

Browse files
authored
Add rechunking notebook for reference (#22)
1 parent b69a5e8 commit 3d9ebe3

File tree

2 files changed

+222
-0
lines changed

2 files changed

+222
-0
lines changed

rechunking/environment.yml

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
name: rechunking
2+
channels:
3+
- conda-forge
4+
- nodefaults
5+
dependencies:
6+
- python=3.12
7+
- jupyterlab
8+
- xarray
9+
- zarr
10+
- dask
11+
- dask-jobqueue

rechunking/era5_rechunking.ipynb

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "153c98aa-2f72-4cb4-a96a-c01374d84930",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"# Dask imports\n",
11+
"\n",
12+
"from dask_jobqueue import PBSCluster\n",
13+
"from dask.distributed import Client"
14+
]
15+
},
16+
{
17+
"cell_type": "code",
18+
"execution_count": null,
19+
"id": "338d23f5-92d2-422b-bbe4-01955aceff50",
20+
"metadata": {},
21+
"outputs": [],
22+
"source": [
23+
"# Dask cluster config\n",
24+
"\n",
25+
"cluster = PBSCluster(\n",
26+
" # Basic job directives\n",
27+
" job_name = 'hackathon-rechunk',\n",
28+
" queue = 'casper',\n",
29+
" walltime = '120:00',\n",
30+
" # Make sure you change the project code if running this notebook!!\n",
31+
" account = 'UCSG0002',\n",
32+
" log_directory = 'dask-logs',\n",
33+
" # These settings impact the resources assigned to the job\n",
34+
" cores = 1,\n",
35+
" memory = '10GiB',\n",
36+
" resource_spec = 'select=1:ncpus=1:mem=10GB',\n",
37+
" # These settings define the resources assigned to a worker\n",
38+
" processes = 1,\n",
39+
" # This controls where Dask will write data to disk if memory is exhausted\n",
40+
" local_directory = '/local_scratch/pbs.$PBS_JOBID/dask/spill',\n",
41+
" # This specifies which network interface the cluster will use\n",
42+
" interface = 'ext'\n",
43+
")"
44+
]
45+
},
46+
{
47+
"cell_type": "code",
48+
"execution_count": null,
49+
"id": "0d4322e9-cc4f-4b45-815c-9b8228eb03a2",
50+
"metadata": {},
51+
"outputs": [],
52+
"source": [
53+
"# Create the client to load the Dashboard\n",
54+
"client = Client(cluster)\n",
55+
"\n",
56+
"# Display the client repr\n",
57+
"client"
58+
]
59+
},
60+
{
61+
"cell_type": "code",
62+
"execution_count": null,
63+
"id": "0c737b08-9cf2-4e90-9646-2013641815b7",
64+
"metadata": {},
65+
"outputs": [],
66+
"source": [
67+
"# Scale and wait for workers\n",
68+
"\n",
69+
"cluster.scale(40)\n",
70+
"client.wait_for_workers(40)"
71+
]
72+
},
73+
{
74+
"cell_type": "code",
75+
"execution_count": null,
76+
"id": "9d7d7583-c695-43c9-86a8-12f20b5d432d",
77+
"metadata": {
78+
"scrolled": true
79+
},
80+
"outputs": [],
81+
"source": [
82+
"import xarray as xr\n",
83+
"import pandas as pd\n",
84+
"import dask\n",
85+
"\n",
86+
"# Read in files\n",
87+
"ds = xr.open_mfdataset('/glade/derecho/scratch/ksha/CREDIT_data/ERA5_mlevel_arXiv/SixHourly_y_TOTAL_202*.zarr',\n",
88+
" engine = 'zarr',\n",
89+
" consolidated=True,\n",
90+
" data_vars='minimal',\n",
91+
" coords='minimal',\n",
92+
" compat='override',\n",
93+
" parallel=True)\n",
94+
"\n",
95+
"# Rechunk the data\n",
96+
"ds = ds.chunk({\"time\": 1, \"level\": 1, \"latitude\": 640, \"longitude\": 1280})\n",
97+
"\n",
98+
"# Remove the old encoding info and set compression to none\n",
99+
"for k, v in ds.variables.items():\n",
100+
" v.encoding['compressors'] = None\n",
101+
" del v.encoding['chunks']\n",
102+
" del v.encoding['preferred_chunks']\n",
103+
"\n",
104+
"# Alternative to the loop above: remove the old encoding info entirely (default compression will then apply when written to Zarr)\n",
105+
"# for k, v in ds.variables.items():\n",
106+
"# del v.encoding['compressors']\n",
107+
"# del v.encoding['chunks']\n",
108+
"# del v.encoding['preferred_chunks']\n"
109+
]
110+
},
111+
{
112+
"cell_type": "code",
113+
"execution_count": null,
114+
"id": "53fd0270-d21e-4f2f-a769-1701900f66f4",
115+
"metadata": {},
116+
"outputs": [],
117+
"source": [
118+
"# Some not particularly polished data wrangling to combine the arrays\n",
119+
"# Skip this cell to write the variables as separate arrays (in that case, write ds rather than combined in the Zarr cell below)\n",
120+
"\n",
121+
"full_variables = ['Q', 'T', 'U', 'V']\n",
122+
"single_level_variables = ['Q500', 'T500', 'U500', 'V500', 'Z500', 't2m', 'SP']\n",
123+
"\n",
124+
"ds1 = xr.concat([ds[x] for x in single_level_variables],\n",
125+
" pd.Index(single_level_variables,\n",
126+
" name='channel')).transpose('time',\n",
127+
" 'channel',\n",
128+
" 'latitude',\n",
129+
" 'longitude')\n",
130+
"\n",
131+
"c = xr.concat([ds[x] for x in full_variables], dim=full_variables)\n",
132+
"\n",
133+
"s = c.stack(channel = ('concat_dim','level')).transpose('time',\n",
134+
" 'channel',\n",
135+
" 'latitude',\n",
136+
" 'longitude').reset_index('channel')\n",
137+
"\n",
138+
"s['channel'] = s['concat_dim'] + s['level'].astype('str')\n",
139+
"\n",
140+
"ds2 = s.drop_vars(['level', 'concat_dim'])\n",
141+
"\n",
142+
"combined = xr.concat([ds1, ds2], dim='channel').rename('combined')\n",
143+
"\n",
144+
"combined.encoding"
145+
]
146+
},
147+
{
148+
"cell_type": "code",
149+
"execution_count": null,
150+
"id": "a3b394cc-9186-4a83-8a5d-2fedc3f10825",
151+
"metadata": {
152+
"scrolled": true
153+
},
154+
"outputs": [],
155+
"source": [
156+
"# Write to Zarr v3 with consolidated metadata\n",
157+
"\n",
158+
"combined.to_zarr('/glade/derecho/scratch/katelynw/era5/rechunked_stacked_uncompressed_test.zarr',\n",
159+
" zarr_version=3,\n",
160+
" consolidated=True)"
161+
]
162+
},
163+
{
164+
"cell_type": "code",
165+
"execution_count": null,
166+
"id": "cd075006-9d9c-43b4-82ce-9cb1a7d1c576",
167+
"metadata": {},
168+
"outputs": [],
169+
"source": [
170+
"# Shutdown the cluster\n",
171+
"\n",
172+
"client.shutdown()"
173+
]
174+
},
175+
{
176+
"cell_type": "code",
177+
"execution_count": null,
178+
"id": "9eedf120-3afa-4e26-a345-f58cbdc032a7",
179+
"metadata": {},
180+
"outputs": [],
181+
"source": [
182+
"# Open up the new dataset and check the encoding\n",
183+
"\n",
184+
"ds_new = xr.open_dataset('/glade/derecho/scratch/katelynw/era5/rechunked_stacked_uncompressed_test.zarr')\n",
185+
"\n",
186+
"ds_new.combined.encoding"
187+
]
188+
}
189+
],
190+
"metadata": {
191+
"kernelspec": {
192+
"display_name": "Python [conda env:my-env]",
193+
"language": "python",
194+
"name": "conda-env-my-env-py"
195+
},
196+
"language_info": {
197+
"codemirror_mode": {
198+
"name": "ipython",
199+
"version": 3
200+
},
201+
"file_extension": ".py",
202+
"mimetype": "text/x-python",
203+
"name": "python",
204+
"nbconvert_exporter": "python",
205+
"pygments_lexer": "ipython3",
206+
"version": "3.12.9"
207+
}
208+
},
209+
"nbformat": 4,
210+
"nbformat_minor": 5
211+
}

0 commit comments

Comments
 (0)