Extreme Slowness/Timeouts with Large Dataset (Chunked, Multi-Band Zarr Store) #3085

Open
mickyals opened this issue May 23, 2025 · 5 comments

@mickyals

mickyals commented May 23, 2025

Zarr version

v3.0.6

Environment

OS: Linux Ubuntu

Python: 3.11.7

Dependencies: numcodecs[pcodec]==0.15.1, xarray==2025.3.1, dask==2024.2.0

Description

A Zarr store containing GOES-16 satellite data (dimensions: t:8741, lat:8133, lon:8130, chunks (24, 512, 512)) exhibits severe performance issues:

  • Operations like .compute(), rolling-window means, and plotting take orders of magnitude longer than expected.
  • Jupyter kernels time out with IOStream.flush timed out during computation/plotting.

The final dataset of all 2023 GOES imagery is about 11TB with 6 more years of data left to be processed.

Steps to Reproduce

The minimal code logic for reproduction is below

import zarr
from numcodecs.zarr3 import PCodec
import numpy as np

# Creating store with similar structure
store = zarr.DirectoryStore("test_store.zarr")
root = zarr.group(store=store)

# Simulate a single band (repeat for multiple bands)
chunks = (24, 512, 512)
shape = (8741, 8133, 8130)
band = root.create_dataset(
    name="CMI_C07",
    shape=shape,
    chunks=chunks,
    dtype="uint16",
    compressor=zarr.Zstd(level=9),
    serializer=PCodec(level=9)
)

# Simulate data appending 
for i in range(0, shape[0], chunks[0]):
    band[i:i+chunks[0]] = np.random.randint(0, 65535, (chunks[0],) + chunks[1:], dtype="uint16")

# Consolidate metadata
zarr.consolidate_metadata(store)

Questions for Zarr Devs

  1. Are there known performance bottlenecks with high-dimensional, multi-band Zarr stores?
  2. Could chunking/sharding interact poorly with dask/xarray for time-series operations?
  3. Where could the bottlenecks be, and what strategies would improve performance?
  4. What recommendations and considerations apply when using Zarr to generate very large public datasets?

Additional Context

Full Dataset creation code: https://github.com/mickyals/goes2zarr/blob/main/convert_goes_to_zarr.py

@ianhi
Contributor

ianhi commented May 23, 2025

Hi @mickyals two follow up questions

  1. Are you sure this is running on zarr 3? DirectoryStore is zarr 2 only (see https://zarr.readthedocs.io/en/stable/user-guide/v3_migration.html#store-import-paths); the Zarr 3 equivalent store setup is sketched after this list.
  2. Please add a line to the reproducer that demonstrates the slow step. It seems that your reproducer only creates the zarr but does not use it.
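For reference, the rough Zarr 3 equivalent of the store setup would be something like this (a minimal sketch, not tested against your data):

import zarr

# Zarr 3 replaces DirectoryStore with zarr.storage.LocalStore
store = zarr.storage.LocalStore("test_store.zarr")
root = zarr.open_group(store=store)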

@rabernat
Contributor

This code does not work for me with Zarr Python 3.0.8 for several reasons. What version of Zarr are you using?

But most critically, this code

for i in range(0, shape[0], chunks[0]):
    band[i:i+chunks[0]] = np.random.randint(0, 65535, (chunks[0],) + chunks[1:], dtype="uint16")

seems impossible. You're trying to assign to a region of shape (24, 8133, 8130) with an array of shape (24,512,512).

I would also not bother using Zstd together with PCodec. Just PCodec is enough; Zstd does not buy you any additional compression.
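For what it's worth, a shape-consistent version of the reproducer might look roughly like this (a sketch against the Zarr 3 create_array API, PCodec alone, untested at full scale):

import numpy as np
import zarr
from numcodecs.zarr3 import PCodec

shape = (8741, 8133, 8130)
chunks = (24, 512, 512)

store = zarr.storage.LocalStore("test_store.zarr")
band = zarr.create_array(
    store,
    name="CMI_C07",
    shape=shape,
    chunks=chunks,
    dtype="uint16",
    compressors=None,          # drop the extra Zstd layer; PCodec alone is enough
    serializer=PCodec(level=9),
)

# Write full time slabs so the assigned region and the data have matching shapes
# (each slab here is ~3 GB of random uint16, so this is illustrative only)
for i in range(0, shape[0], chunks[0]):
    nt = min(chunks[0], shape[0] - i)
    band[i:i + nt] = np.random.randint(0, 65535, (nt, shape[1], shape[2]), dtype="uint16")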

@mickyals
Author

mickyals commented May 23, 2025

Honestly, sorry about that. The full pipeline is very long, so I tried to summarise the ~400 lines into a simpler method with DeepSeek to make it easier for others to understand, as I'm not great at articulating issues. Here is the version from my own code.

Some variables referenced in the code snippet are not shown at initialization, but their absence should not affect the overall logical flow when reading the code.

def initialize_zarr_store(store_name: str, metadata: dict):
        """Initialize Zarr storage with metadata"""
        zarr_store = zarr.storage.LocalStore(store_name) # base store
        root_group = zarr.open_group(store=self.zarr_store) # base group

# not very relevant to the issue
        for k, v in metadata.items(): 
            root_group.attrs[k]= v


    def process_band(self, band, ds, chunks, mask, shards):
        da = ds[band]
        ds_regrid = self.regridder(da).chunk(chunks)

        # Apply scale and offset, since GOES files have somewhat dynamic scale/offset values per file
        scale_factor = config.UNIVERSAL_BAND_METADATA[band]['scale_factor']
        add_offset = config.UNIVERSAL_BAND_METADATA[band]['add_offset']
        ds_regrid = np.rint((ds_regrid.values - add_offset) / scale_factor)
        # Apply mask: keep values where mask is True, set to UINT16_MAX elsewhere
        ds_regrid = np.where(mask, ds_regrid, UINT16_MAX)
        
        #check for the presence of current band store
        if band in root_group:
            za = zarr.open_array(self.zarr_store, path=band)
            za.append(ds_regrid)
        else:
            za = zarr.create_array(zarr_store, name=band, shape=ds_regrid.shape, dtype=np.uint16,
                                    attributes=config.UNIVERSAL_BAND_METADATA[band],
                                    chunks=chunks, dimension_names=['t', 'lat', 'lon'],
                                    compressors=compressors, serializer=serializer, shards=shards)
            za[:] = ds_regrid

     def process_files(file_paths: list[str], store_name: str, target_grid_file: str,
                      regridder_weights: str = None, satellite: str = 'west',
                      chunks: tuple[int] = (24, 512, 512), shards: int = None,
                      dt_units: str = "seconds since 2000-01-01T12:00:00", dt_calendar: str = "proleptic_gregorian"):

        # Initialize Zarr store
        initialize_zarr_store(store_name, config.GOES18_METADATA if 'west' in satellite else config.GOES16_METADATA)

        # Create coordinate arrays
        if 'lat' not in self.root_group:
            lat = zarr.create_array(zarr_store, name='lat', shape=sample_ds['lat'].values.shape, dtype=np.float32,
                                    attributes=config.GOES18_LAT_LON_METADATA['lat'] if 'west' in satellite else   config.GOES16_LAT_LON_METADATA['lat'],
                                    dimension_names=['lat'])
            lat[:] = sample_ds['lat'].values

        if 'lon' not in self.root_group:
            lon = zarr.create_array(zarr_store, name='lon', shape=sample_ds['lon'].values.shape, dtype=np.float32,
                                   attributes=config.GOES18_LAT_LON_METADATA['lon'] if 'west' in satellite else config.GOES16_LAT_LON_METADATA['lon'],
                                   dimension_names=['lon'])
            lon[:] = sample_ds['lon'].values

        batch_size = chunks[0]
        # Process in batches
        # iterate through each batch
        for i in range(0, len(file_paths), batch_size):
            # Open and concatenate files
            batch_end = min(len(file_paths), i + batch_size)

            ds = xr.open_mfdataset(
                file_paths[i: batch_end],
                combine="nested",
                concat_dim="t",
                chunks='auto',
                drop_variables=config.UNUSED_DS_VARIABLES
            )

            # Encode time values
            encoded_time, _, _ = xr.coding.times.encode_cf_datetime(ds['t'].values, units=dt_units,
                                                                    calendar=dt_calendar, dtype=np.float64)
            # Write time values to Zarr
            if 't' in root_group:
                ta = zarr.open_array(self.zarr_store, path="t")
                ta.append(encoded_time)
            else:
                ta = zarr.create_array(zarr_store, name="t", shape=encoded_time.shape, dtype=np.float64,
                                       attributes={"units": dt_units, "calendar": dt_calendar, "standard_name": "time"},
                                       dimension_names=['t'])
                ta[:] = encoded_time

            # Process bands using concurrent.futures to parallelize processing of the bands
            with concurrent.futures.ThreadPoolExecutor() as executor:
                executor.map(partial(process_band, ds=ds, chunks=chunks, mask=mask, shards=shards),
                             config.BANDS)
        # Consolidate metadata
        zarr.consolidate_metadata(zarr_store)

I suspect the issue stems from the size of the dataset and the chunking selection. I opted for 24 in the time dimension since higher values resulted in less efficient compression, and 512x512 for lat/lon since our experiments showed the spatial dimensions impacted compression the least. The initial focus was simply on not ballooning the size of the GOES NetCDFs when regridding from the geostationary projection to a lat/lon grid. I'd like to make this data publicly accessible, given that GOES data currently comes as discrete NetCDF files, but that goal is hindered by the slowness of operations on the data.
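For a rough sense of scale, this chunk layout implies on the order of 93k chunks per band for the 2023 store (quick arithmetic check):

import math

shape = (8741, 8133, 8130)
chunks = (24, 512, 512)

n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
print(n_chunks)  # 365 * 16 * 16 = 93,440 chunks per band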

# Thredds stored copy for dev access,
# other version stored locally on device

goes2023= xr.open_zarr("https://redoak.cs.toronto.edu/twitcher/ows/proxy/thredds/fileServer/datasets/WORLDCLIM/test_worldclim.zarr")
goes2023

# account for prime meridian
goes2023 = goes2023.assign_coords(
    lon=((goes2023.lon + 180) % 360 - 180)
)

#calculate rolling mean
roll_mean = goes2023.rolling(t=10, center=True).mean()
roll_mean
CPU times: user 1min 31s, sys: 162 ms, total: 1min 31s
Wall time: 1min 31s

# rolling mean calculation


# prep for video of single band for a year
%%time 
var = 'CMI_C07'
data = goes2023[var].compute()
lat = goes2023.coords['lat']
lon = goes2023.coords['lon']
time = goes2023.coords['t']
vmin, vmax = data.values.min(), data.values.max()

IOStream.flush timed out
IOStream.flush timed out

# timed out without receiving timing.

Also, the compression combination used proved to be a significant reducer of the overall storage footprint: without PCodec the size balloons, and the same holds, though to a lesser extent, for Zstd. This specific combination was the only one I found that kept the final dataset size comparable to the original combined NetCDF file size.
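A simple way to compare on-disk footprints for different codec combinations is something along these lines (the store paths here are just placeholders):

from pathlib import Path

def store_size_gb(path: str) -> float:
    # Sum the sizes of all chunk and metadata files under the store directory
    return sum(f.stat().st_size for f in Path(path).rglob("*") if f.is_file()) / 1e9

print(store_size_gb("goes_pcodec_zstd.zarr"))   # PCodec + Zstd
print(store_size_gb("goes_pcodec_only.zarr"))   # PCodec only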

Again, really sorry if I confused things more. I'm working mostly on my own on this project and have been using Zarr for less than 3 months. Let me know if more detail is needed @ianhi @rabernat

@ianhi
Contributor

ianhi commented May 23, 2025

@mickyals It's quite difficult to help if we aren't able to run the code. That means that in the example, every variable the code uses needs to be defined. Please also check that it runs on its own and reproduces the error. A great way to do this is to define all the script's dependencies according to pep-723 (https://peps.python.org/pep-0723/#example).
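For example, a header along these lines at the top of the script (the version pins are just illustrative):

# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "zarr==3.0.6",
#     "xarray",
#     "dask",
#     "numcodecs[pcodec]",
#     "matplotlib",
# ]
# ///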

See also: https://stackoverflow.com/help/minimal-reproducible-example

In your smaller code snippet I can open the zarr (which is very helpful for debugging!) but the assign_coords line fails with AttributeError: 'Dataset' object has no attribute 'lon'.

Please make sure that the script runs on its own without other files, does so from a fresh Python environment, and demonstrates the slowness. Then we can help narrow down the issue.

@mickyals
Author

@ianhi

Thank you so much for that. Below is the code to generate a dummy version of another dataset where I encountered a relatively drastic change in execution times, GeoTIFF vs Zarr in this case. The original dataset, WorldClim 2.1, comes as discrete GeoTIFFs.

import numpy as np
import xarray as xr
import zarr
from numcodecs.zarr3 import PCodec
import matplotlib.pyplot as plt

# Generation of a dummy version of a single WorldClim variable
def generate_dummy_data(height: int = 21600, width: int = 43200):
    
    data = np.random.normal(loc=0.0, scale=30.0, size=(height, width))
    data = np.clip(data, -100, 100).astype(np.int16)
    
    y = np.linspace(-90, 90, height).astype(np.float64)
    x = np.linspace(-180, 180, width).astype(np.float64)
    
    ds = xr.Dataset(
    data_vars={
        "dummy_var": (("y", "x"), data)
    },
    coords={
        "y": y,
        "x": x
    },
    attrs={
        "description": "Synthetic dataset for testing reflecting the smaller worldclim dataset for single variable",
        "units": "arbitrary"
    })
    
    return ds

#--------------
# Main zarr conversion logic used in all works so far
#------------

# initialise the store
def initialize_zarr_store(store_name: str):
    zarr_store = zarr.storage.LocalStore(store_name)
    root_group = zarr.open_group(store=zarr_store)
    
    return zarr_store, root_group


# process a single variable of a multi variable dataset
def process_variable(ds, time_steps, t, chunks, zarr_store, root_group):
    
    var_name = list(ds.data_vars.keys())[0]
    data = ds[var_name].values
    y_len, x_len = data.shape
    attrs = ds.attrs

    if var_name in root_group:
        za = zarr.open_array(zarr_store, path=var_name)
        za[t, :, :] = data

    else:
        za = zarr.create_array(zarr_store,
                                name=var_name,
                                shape=(time_steps, y_len, x_len),
                                dtype = np.float32, 
                                attributes = attrs, 
                                fill_value=-999,
                                chunks=chunks,
                                dimension_names=['t', 'y', 'x'],
                                compressors=zarr.codecs.ZstdCodec(level=9),
                                serializer=PCodec(level=9)
                                  )
        za[t, :, :] = data

# processing all files and variables
def process_all(time_steps: int, store_name: str, chunks: tuple[int]):
    
    # Use any example GeoTIFF to extract x/y coordinates
    first = generate_dummy_data()
    
    
    zarr_store, root_group = initialize_zarr_store(store_name)
    
    if 'x' not in root_group:
        x_coords = first.coords['x'].values
        zarr.create_array(
            store=zarr_store,
            name='x',
            shape=x_coords.shape,
            dtype=x_coords.dtype,
            dimension_names=['x']
        )[:] = x_coords
        
    if 'y' not in root_group:
        y_coords = first.coords['y'].values
        zarr.create_array(
            store=zarr_store,
            name='y',
            shape=y_coords.shape,
            dtype=y_coords.dtype,
            dimension_names=['y']
        )[:] = y_coords
    
    if 't' not in root_group:
        t_vals = np.arange(1, time_steps+1, dtype=np.int32)
        zarr.create_array(store=zarr_store,
                          name='t',
                          shape=(time_steps,),
                          dtype=t_vals.dtype,
                          dimension_names=['t'])[...] = t_vals
        
        
    # Process each variable independently
    for t in range(time_steps):
        ds = generate_dummy_data()
        process_variable(ds, time_steps, t, chunks, zarr_store, root_group) 
        
    zarr.consolidate_metadata(zarr_store)

# process the files into zarr store with desired chunk size
process_all(12,'dummy_var.zarr', (1,1024,1024))

The above code converts discrete files fine and is not the issue here. The issue arises for example when attempting to plot variables from the newly created zarr store.

# in jupyter
%%timeit

# opening zarr - this is also fine
tmp = xr.open_zarr('dummy_var.zarr')
tmp

>>> 15.9 ms ± 748 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

# attempting to plot, for example, from the zarr store
%%time
tmp.dummy_var.sel(t=1).plot()
plt.show()

>>> CPU times: user 4min 44s, sys: 14.9 s, total: 4min 59s
>>> Wall time: 4min 50s

# attempting to plot, for example, as non-zarr (in-memory) data
ds = generate_dummy_data()

%%time 
ds.dummy_var.plot()
plt.show()

The question really is whether there is anything on the Zarr end of my pipeline that I can do to improve the speed of downstream operations like rendering plots, which take the longest. Execution is still quite slow even after attempting precomputation to speed up other tasks:

var = 'CMI_C07'
data = goes2023[var].compute()

That snippet is from the GOES dataset, which can be recreated from the above code by changing height=8133, width=8130 and time_steps=8741, but given storage constraints I do not recommend building a dummy version of GOES.
