Granule to STAC Item / Kerchunk Script #32

Open · wants to merge 8 commits into main

96,800 changes: 96,800 additions & 0 deletions src/kerchunk/data/urls.txt

Large diffs are not rendered by default.

200 changes: 200 additions & 0 deletions src/kerchunk/gran_to_cat.py
@@ -0,0 +1,200 @@
import base64
import copy
import functools
import json
import warnings
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Optional

import click
import fsspec
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import shapely
import xstac
import zarr
from kerchunk.hdf import SingleHdf5ToZarr
from netCDF4 import Dataset


STAC_TEMPLATE = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "properties": {},
    "links": [],
    "assets": {},
    "stac_extensions": [
        "https://stac-extensions.github.io/datacube/v2.2.0/schema.json"
    ],
}


def refs_from_granule(url: str, s3_url: Optional[str] = None) -> Dict[str, Any]:
    """Generate references from a single granule. The data is loaded
    directly into memory first."""
    if s3_url is None:
        s3_url = url
    data = fsspec.open(url).open().read()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        refs = SingleHdf5ToZarr(BytesIO(data), s3_url).translate()
    nc = Dataset("t", memory=data)
    attrs = json.loads(refs["refs"][".zattrs"])
    # Inline coordinate arrays directly using delta encoding, this way
    # we don't need to read them directly from netCDF in the future
    for c in ["x", "y"]:
        val = nc[c][:]
        z = zarr.array(val, compressor=zarr.Blosc(cname="zstd", clevel=5),
                       filters=[zarr.Delta(val.dtype)])
        refs["refs"][f"{c}/.zarray"] = z.store[".zarray"].decode()
        refs["refs"][f"{c}/0"] = "base64:" + base64.b64encode(z.store["0"]).decode()
    # Kerchunk can't handle these structs properly so these are converted to
    # global attributes
Comment on lines +52 to +53

Suggested change:

-    # Kerchunk can't handle these structs properly so these are converted to
+    # Kerchunk can't handle these structs properly so we convert them to
     # global attributes

    for variable in ["mapping", "img_pair_info"]:
        if variable in nc.variables:
            obj = nc[variable]
            for attr in obj.ncattrs():
                val = getattr(obj, attr)
                if hasattr(val, "item"):
                    val = val.item()
                attrs[attr] = val
            del refs["refs"][variable + "/0"]
            del refs["refs"][variable + "/.zattrs"]
            del refs["refs"][variable + "/.zarray"]
Comment on lines +54 to +64

Can we inject mapping into a dictionary, so the top-level metadata looks like:

    ...
    "mapping": {grid_mapping_keys...},
    ...

In conversation with the MPI-BGC folks, who also use quite a lot of geo-Zarr data, this is the format they've settled on, and it's the most compliant with the CF spec that we can get. In NetCDF, because you can't have nested metadata attributes, these were stored in empty variables. But Zarr doesn't have the concept of empty variables, so a single top-level key holding a JSON dictionary is as close as we can get.

Given that each file has a single CRS, we may also want a top-level spatial_epsg or spatial_ref key as well, just for safety's sake.
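
For reference, a minimal sketch of what that could look like inside refs_from_granule; the nested-"mapping" layout is the suggestion above, not what this diff implements:

    # Sketch only: collect the grid-mapping attrs into a nested dict
    # instead of flattening them into the top-level attributes.
    if "mapping" in nc.variables:
        obj = nc["mapping"]
        mapping_attrs = {}
        for attr in obj.ncattrs():
            val = getattr(obj, attr)
            mapping_attrs[attr] = val.item() if hasattr(val, "item") else val
        attrs["mapping"] = mapping_attrs
        # single-CRS safety net, as suggested above
        attrs["spatial_epsg"] = mapping_attrs.get("spatial_epsg")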


refs["refs"][".zattrs"] = json.dumps(attrs)
return refs
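
A minimal usage sketch for the function above (the granule URL is a placeholder, and xarray is assumed to be available):

    import xarray as xr

    refs = refs_from_granule("s3://example-bucket/some_granule.nc")  # placeholder URL
    fs = fsspec.filesystem("reference", fo=refs, remote_protocol="s3")
    ds = xr.open_dataset(fs.get_mapper(""), engine="zarr", consolidated=False)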


class DuckVariable:
    """Simple mock xarray variable."""
    def __init__(self, name, attrs, chunks, dims, shape):
        self.name = name
        self.attrs = attrs
        self.chunks = chunks
        self.dims = dims
        self.shape = shape

    @property
    def chunksize(self):
        return self.chunks

    @property
    def data(self):
        return self


class DuckDataset:
    """Mock lightweight dataset object built from kerchunk references."""
    def __init__(self, refs: Dict[str, Any]):
        r = refs["refs"]
        vnames = list(dict.fromkeys(k.split("/")[0] for k in r if "/" in k))
        self.variables = {}
        self.dims = ["y", "x"]
        self.coords = self.dims
        for v in vnames:
            attrs = json.loads(r.get(f"{v}/.zattrs", "{}"))
            zarray = json.loads(r[f"{v}/.zarray"])
            shape = zarray["shape"]
            chunks = zarray["chunks"]
            dims = attrs.get("_ARRAY_DIMENSIONS")
            if dims is None:
                continue
            dims = dict(zip(dims, shape))
            self.variables[v] = DuckVariable(v, attrs, chunks, dims, shape)

    def __getitem__(self, attr: str):
        return self.variables[attr]

    def __getattr__(self, attr: str):
        if attr in self.variables:
            return self.variables[attr]
        raise AttributeError(attr)
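
An illustration of the duck-typing trick (the values shown are made up): xstac only needs dims, shapes, chunks, and attrs, so the mock can be built from the references alone without opening any data:

    ds = DuckDataset(refs)
    ds["v"].dims        # e.g. {"y": 833, "x": 833}
    ds["v"].chunksize   # e.g. [512, 512]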


def coord_from_ref(ref: Dict[str, Any], dim: str):
    """Decode an inlined (base64 + blosc + delta) coordinate array."""
    data = ref["refs"][f"{dim}/0"].removeprefix("base64:")
    blosc = zarr.Blosc("zstd")
    delta = zarr.Delta(dtype=np.float64)
    decoders = [base64.b64decode, blosc.decode, delta.decode]
    for decode in decoders:
        data = decode(data)
    return data
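
A round-trip sketch showing that this decoder mirrors the inline encoding done in refs_from_granule (the coordinate values are illustrative):

    vals = np.arange(0.0, 1200.0, 120.0)
    z = zarr.array(vals, compressor=zarr.Blosc(cname="zstd", clevel=5),
                   filters=[zarr.Delta(vals.dtype)])
    ref = {"refs": {"x/0": "base64:" + base64.b64encode(z.store["0"]).decode()}}
    assert (coord_from_ref(ref, "x") == vals).all()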


def geom_from_refs(refs: Dict[str, Any]):
    """Build lon/lat bounding-box geometries for a mapping of named refs."""
    def ensure_scalar(g):
        # pandas may hand groupby keys back as 1-tuples
        if isinstance(g, tuple):
            return g[0]
        return g

    crs = []
    n = len(refs)
    bbox = dict(xmin=np.empty(n), ymin=np.empty(n),
                xmax=np.empty(n), ymax=np.empty(n))
    for i, ref in enumerate(refs.values()):
        attrs = json.loads(ref["refs"][".zattrs"])
        crs.append(attrs["crs_wkt"])
        coords = {dim: coord_from_ref(ref, dim) for dim in ["x", "y"]}
        for stat in ["min", "max"]:
            for dim, coord in coords.items():
                bbox[dim + stat][i] = getattr(coord, stat)()
    geoms = pd.Series(shapely.box(**bbox), index=list(refs.keys()))
    geoms = pd.concat([gpd.GeoSeries(geom, crs=ensure_scalar(g)).to_crs(epsg=4326)
                       for g, geom in geoms.groupby(crs)])
    return geoms


def refs_to_stac_item(ref: Dict[str, Any]) -> Dict[str, Any]:
    """Convert kerchunk references JSON to STAC item JSON."""
    @functools.lru_cache(maxsize=None)
    def _crs_json_from_epsg(epsg):
        return pyproj.CRS.from_epsg(epsg).to_json_dict()

    uri = ref["refs"]["v/0.0"][0]
    if not uri.startswith("s3://"):
        uri = "s3://" + uri
    name = Path(uri).name
    poly = geom_from_refs({name: ref}).values[0]
    attrs = json.loads(ref["refs"][".zattrs"])
    epsg = attrs["spatial_epsg"]
    time = attrs["date_center"]
    # dateutil can't parse ISO times ending in "."
    if time.endswith("."):
        time += "0"
    template = copy.deepcopy(STAC_TEMPLATE)
    template["properties"]["datetime"] = time
    template["id"] = name
    template["assets"] = {name: {"href": uri}}
    reference_system = _crs_json_from_epsg(epsg)

    x = coord_from_ref(ref, "x")
    y = coord_from_ref(ref, "y")
    item = xstac.xarray_to_stac(DuckDataset(ref),
                                template,
                                temporal_dimension=False,
                                x_dimension="x",
                                y_dimension="y",
                                validate=False,
                                reference_system=reference_system,
                                x_step=x[1] - x[0],
                                y_step=y[1] - y[0],
                                y_extent=[y.min(), y.max()],
                                x_extent=[x.min(), x.max()])

    item.geometry = json.loads(shapely.to_geojson(poly))
    item.bbox = list(poly.bounds)
    return item.to_dict()


@click.command()
@click.argument("path")
@click.option("--s3-path", type=str, default=None)
def make_granule_stac_kerchunk(path: str, s3_path: str):
    refs = refs_from_granule(path, s3_path)
    stac = refs_to_stac_item(refs)
    out_path = Path.cwd() / Path(path).name
    out_path.with_suffix(".refs.json").write_text(json.dumps(refs, indent=2))
    out_path.with_suffix(".stac.json").write_text(json.dumps(stac, indent=2))


if __name__ == "__main__":
    make_granule_stac_kerchunk()
108 changes: 108 additions & 0 deletions src/kerchunk/utils.py
@@ -0,0 +1,108 @@
from collections.abc import MutableMapping
from typing import Any, Dict, Sequence, Tuple

import pyarrow as pa
import zarr


class MultiRefs(MutableMapping):
    """Present multiple kerchunk reference sets as a single flat mapping,
    with keys of the form "<granule>/<ref key>"."""

    def __init__(self, mrefs: Dict[str, Dict[str, Any]], is_zarr=False):
        self.mrefs = mrefs
        self.is_zarr = is_zarr

    def _trans_key(self, key: str) -> Tuple[str, str]:
        pref, _, rkey = key.partition("/")
        return pref, rkey

    def __getitem__(self, key: str):
        if self.is_zarr and key == ".zgroup":
            return '{"zarr_format":2}'
        elif self.is_zarr and key.startswith("."):
            return self.mrefs[key]
        pref, rkey = self._trans_key(key)
        return self.mrefs[pref]["refs"][rkey]

    def __setitem__(self, key: str, value):
        if self.is_zarr and key.startswith("."):
            self.mrefs[key] = value
        else:
            pref, rkey = self._trans_key(key)
            self.mrefs[pref]["refs"][rkey] = value

    def __delitem__(self, key: str):
        pref, rkey = self._trans_key(key)
        del self.mrefs[pref]["refs"][rkey]

    def __iter__(self):
        for pref, refs in self.mrefs.items():
            if not isinstance(refs, dict) or "refs" not in refs:
                # top-level metadata entry stored directly (is_zarr mode)
                yield pref
                continue
            for rkey in refs["refs"]:
                yield pref + "/" + rkey
        if self.is_zarr:
            yield ".zgroup"

    def __len__(self):
        size = sum(
            len(refs["refs"]) if isinstance(refs, dict) and "refs" in refs else 1
            for refs in self.mrefs.values()
        )
        if self.is_zarr:
            size += 1
        return size
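
Hypothetical usage, with refs_a and refs_b standing in for two single-granule kerchunk reference dicts:

    mrefs = MultiRefs({"granule_a.nc": refs_a, "granule_b.nc": refs_b})
    mrefs["granule_a.nc/v/0.0"]   # -> ["s3://...", offset, size]
    list(mrefs)                   # flat keys like "granule_a.nc/.zattrs", ...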

class ConcLRUStoreCache(zarr.storage.LRUStoreCache):
    """LRU store cache that fetches uncached keys from the base store in a
    single concurrent getitems call instead of one request per key."""

    def getitems(self, keys: Sequence[str], contexts) -> Dict[str, Any]:
        cached_keys = set(keys).intersection(self._values_cache.keys())
        uncached_keys = set(keys).difference(cached_keys)
        items = super().getitems(cached_keys, contexts=contexts)
        if uncached_keys:
            uncached_items = self.base_store.getitems(
                list(uncached_keys),
                contexts=contexts
            )
            for key, value in uncached_items.items():
                if key not in self._values_cache:
                    self._cache_value(key, value)
            # super().getitems() already counted the cached keys as hits;
            # everything fetched from the base store is a miss
            self.misses += len(uncached_items)
            items.update(uncached_items)
        return items

    @property
    def base_store(self):
        return self._store

    @base_store.setter
    def base_store(self, store: MutableMapping):
        self._store = store
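
A sketch of wiring this cache in front of a kerchunk reference mapper (refs is assumed to be a reference dict like those produced above):

    import fsspec

    fs = fsspec.filesystem("reference", fo=refs, remote_protocol="s3")
    cached = ConcLRUStoreCache(fs.get_mapper(""), max_size=2**28)  # ~256 MiB cache
    group = zarr.open(cached, mode="r")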


def expand_struct_column(table: pa.Table, column: str) -> pa.Table:
    """Expand a (possibly nested) struct column into a flat table,
    descending through dotted names like "a.b"."""
    for key in column.split("."):
        table = pa.Table.from_struct_array(table[key])
    return table
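
A tiny illustration with a synthetic table (assumes a pyarrow version that provides Table.from_struct_array):

    t = pa.table({"kerchunk": pa.array([{"zarray": "{}", "zattrs": "{}"}])})
    flat = expand_struct_column(t, "kerchunk")
    flat.column_names   # -> ["zarray", "zattrs"]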


def stac_to_kerchunk(
    cube_stac: Dict[str, Any],
    dim_stac: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """Rebuild a kerchunk reference dict from STAC "kerchunk:*" metadata."""
    out = dict(version=1, refs={})
    refs = out["refs"]
    refs[".zgroup"] = '{"zarr_format": 2}'
    for stac in [cube_stac, dim_stac]:
        for v, r in stac.items():
            refs[f"{v}/.zarray"] = r["kerchunk:zarray"]
            refs[f"{v}/.zattrs"] = r["kerchunk:zattrs"]
            kv = r["kerchunk:value"]
            for i, key in enumerate(kv["key"]):
                ckey = f"{v}/{key}"
                if kv["path"][i] is not None:
                    refs[ckey] = [
                        "s3://" + kv["path"][i],
                        kv["offset"][i],
                        kv["size"][i]
                    ]
                else:
                    refs[ckey] = kv["raw"][i]
    return out
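
The per-variable structure the function expects, inferred directly from the reads above (all values are illustrative):

    cube_stac = {
        "v": {
            "kerchunk:zarray": '{"chunks": [512, 512], "shape": [833, 833]}',  # illustrative
            "kerchunk:zattrs": '{"_ARRAY_DIMENSIONS": ["y", "x"]}',
            "kerchunk:value": {
                "key": ["0.0"],
                "path": ["bucket/granule.nc"],  # None means inline raw data
                "offset": [4096],
                "size": [131072],
                "raw": [None],
            },
        },
    }
    refs = stac_to_kerchunk(cube_stac, dim_stac={})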