Granule to STAC Item / Kerchunk Script #32
base: main
Changes from all commits
4d3280b
5c1d97e
8fe0e88
46b5372
dfd995b
4e3bd2b
81bff49
d46f64a
@@ -0,0 +1,200 @@
import base64
import functools
import copy
import warnings
from io import BytesIO
from pathlib import Path
from typing import Any, Dict

import click
import fsspec
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import shapely
import json
import xstac
import zarr
from kerchunk.hdf import SingleHdf5ToZarr
from netCDF4 import Dataset


STAC_TEMPLATE = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "properties": {},
    "links": [],
    "assets": {},
    "stac_extensions": ["https://stac-extensions.github.io/datacube/v2.2.0/schema.json"]
}


def refs_from_granule(url: str, s3_url: str = None) -> Dict[str, Any]:
    """Generate references from a single granule. Data is loaded directly
    into memory first."""
    if s3_url is None:
        s3_url = url
    data = fsspec.open(url).open().read()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        refs = SingleHdf5ToZarr(BytesIO(data), s3_url).translate()
    nc = Dataset("t", memory=data)
    attrs = json.loads(refs["refs"][".zattrs"])
    # Inline coordinate arrays directly using delta encoding, this way
    # we don't need to read them directly from netCDF in the future
    for c in ["x", "y"]:
        val = nc[c][:]
        z = zarr.array(val, compressor=zarr.Blosc(cname="zstd", clevel=5),
                       filters=[zarr.Delta(val.dtype)])
        refs["refs"][f"{c}/.zarray"] = z.store[".zarray"].decode()
        refs["refs"][f"{c}/0"] = "base64:" + base64.b64encode(z.store["0"]).decode()
    # Kerchunk can't handle these structs properly so these are converted to
    # global attributes
    for variable in ["mapping", "img_pair_info"]:
        if variable in nc.variables:
            obj = nc[variable]
            for attr in obj.ncattrs():
                val = getattr(obj, attr)
                if hasattr(val, "item"):
                    val = val.item()
                attrs[attr] = val
            del refs["refs"][variable+"/0"]
            del refs["refs"][variable+"/.zattrs"]
            del refs["refs"][variable+"/.zarray"]
Comment on lines +54 to +64:

Can we inject

    ...
    "mapping": {grid_mapping_keys...},
    ...

In conversation with the MPI-BGC folks, who also use quite a lot of geo-Zarr data, this is the format they've settled on and it's the most compliant with the CF spec that we can get. In NetCDF, because you can't have nested metadata attributes, these were stored in empty variables. But Zarr doesn't have the concept of empty variables, so a single top-level key holding a JSON dictionary is as close as we can get. Given that each file has a single CRS, we may also want to have a top-level …
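A rough sketch of what the reviewer's suggestion might look like (not part of this PR; the loop mirrors the struct-variable handling above and the nested-key layout is an assumption based on the comment):

    # Sketch only: keep each struct variable's attributes nested under a single
    # top-level key instead of flattening them into the global attributes.
    for variable in ["mapping", "img_pair_info"]:
        if variable in nc.variables:
            obj = nc[variable]
            nested = {}
            for attr in obj.ncattrs():
                val = getattr(obj, attr)
                nested[attr] = val.item() if hasattr(val, "item") else val
            attrs[variable] = nested  # e.g. attrs["mapping"] = {grid_mapping keys...}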
    refs["refs"][".zattrs"] = json.dumps(attrs)
    return refs


class DuckVariable:
    """Simple mock xarray variable"""
    def __init__(self, name, attrs, chunks, dims, shape):
        self.name = name
        self.attrs = attrs
        self.chunks = chunks
        self.dims = dims
        self.shape = shape

    @property
    def chunksize(self):
        return self.chunks

    @property
    def data(self):
        return self


class DuckDataset:
    """Mock lightweight dataset object from kerchunk"""
    def __init__(self, refs: Dict[str, Any]):
        r = refs["refs"]
        vnames = [k.split("/")[0] for k in r.keys() if "/" in k]
        self.variables = {}
        self.dims = ["y", "x"]
        self.coords = self.dims
        for v in vnames:
            attrs = json.loads(r.get(f"{v}/.zattrs", "{}"))
            zarray = json.loads(r[f"{v}/.zarray"])
            shape = zarray["shape"]
            chunks = zarray["chunks"]
            dims = attrs.get("_ARRAY_DIMENSIONS")
            if dims is None:
                continue
            dims = dict(zip(dims, zarray["shape"]))
            self.variables[v] = DuckVariable(v, attrs, chunks, dims, shape)

    def __getitem__(self, attr: str):
        return self.variables[attr]

    def __getattr__(self, attr: str):
        if attr in self.variables:
            return self.variables[attr]
        raise AttributeError(attr)


def coord_from_ref(ref: Dict[str, Any], dim: str):
    """Decode an inlined, base64/blosc/delta-encoded coordinate array."""
    data = ref["refs"][f"{dim}/0"][6:]  # strip the "base64:" prefix
    blosc = zarr.Blosc("zstd")
    delta = zarr.Delta(dtype=np.float64)
    decoders = [base64.b64decode, blosc.decode, delta.decode]
    for decode in decoders:
        data = decode(data)
    return data


def geom_from_refs(refs: Dict[str, Any]):
    """Build EPSG:4326 footprint polygons from a mapping of granule name to references."""
    def ensure_scalar(g):
        if isinstance(g, tuple):
            return g[0]
        return g
    crs = []
    n = len(refs)
    bbox = dict(xmin=np.empty(n), ymin=np.empty(n),
                xmax=np.empty(n), ymax=np.empty(n))
    for i, ref in enumerate(refs.values()):
        attrs = json.loads(ref["refs"][".zattrs"])
        crs.append(attrs["crs_wkt"])
        coords = {dim: coord_from_ref(ref, dim) for dim in ["x", "y"]}
        for stat in ["min", "max"]:
            for dim, coord in coords.items():
                bbox[dim+stat][i] = getattr(coord, stat)()
    geoms = pd.Series(shapely.box(**bbox), index=list(refs.keys()))
    geoms = pd.concat([gpd.GeoSeries(geom, crs=ensure_scalar(g)).to_crs(epsg=4326)
                       for g, geom in geoms.groupby(crs)])
    return geoms


def refs_to_stac_item(ref: Dict[str, Any]) -> Dict[str, Any]:
    """Convert kerchunk references JSON to STAC item JSON"""
    @functools.lru_cache(maxsize=None)
    def _crs_json_from_epsg(epsg):
        return pyproj.CRS.from_epsg(epsg).to_json_dict()

    uri = ref["refs"]["v/0.0"][0]
    if not uri.startswith("s3://"):
        uri = "s3://" + uri
    name = Path(uri).name
    poly = geom_from_refs({name: ref}).values[0]
    attrs = json.loads(ref["refs"][".zattrs"])
    epsg = attrs["spatial_epsg"]
    time = attrs["date_center"]
    # dateutil can't parse ISO times ending in "."
    if time.endswith("."):
        time += "0"
    template = copy.deepcopy(STAC_TEMPLATE)
    template["properties"]["datetime"] = time
    template["id"] = name
    template["assets"] = {name: {"href": uri}}
    reference_system = _crs_json_from_epsg(epsg)

    x = coord_from_ref(ref, "x")
    y = coord_from_ref(ref, "y")
    item = xstac.xarray_to_stac(DuckDataset(ref),
                                template,
                                temporal_dimension=False,
                                x_dimension="x",
                                y_dimension="y",
                                validate=False,
                                reference_system=reference_system,
                                x_step=x[1]-x[0],
                                y_step=y[1]-y[0],
                                y_extent=[y.min(), y.max()],
                                x_extent=[x.min(), x.max()])

    item.geometry = json.loads(shapely.to_geojson(poly))
    item.bbox = list(poly.bounds)
    return item.to_dict()


@click.command()
@click.argument("path")
@click.option("--s3-path", type=str, default=None)
def make_granule_stac_kerchunk(path: str, s3_path: str):
    refs = refs_from_granule(path, s3_path)
    stac = refs_to_stac_item(refs)
    out_path = Path.cwd() / Path(path).name
    out_path.with_suffix('.refs.json').write_text(json.dumps(refs, indent=2))
    out_path.with_suffix('.stac.json').write_text(json.dumps(stac, indent=2))


if __name__ == '__main__':
    make_granule_stac_kerchunk()
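For orientation, a hypothetical run of the script's two steps (the granule URL and bucket below are placeholders, not taken from this PR) might look like the following; the click command wraps the same calls and writes the .refs.json and .stac.json files into the current working directory:

    # Placeholder inputs; refs_to_stac_item assumes the granule exposes a "v" variable.
    refs = refs_from_granule("https://example-bucket.s3.amazonaws.com/granule.nc",
                             s3_url="s3://example-bucket/granule.nc")
    item = refs_to_stac_item(refs)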
@@ -0,0 +1,108 @@
import pyarrow as pa
import ujson
import zarr
from typing import Sequence, Any, Dict, Tuple
from collections.abc import MutableMapping


class MultiRefs(MutableMapping):
    """Expose several kerchunk reference sets as a single mapping keyed by '<prefix>/<ref key>'."""
    def __init__(self, mrefs: Dict[str, Dict[str, Any]], is_zarr=False):
        self.mrefs = mrefs
        self.is_zarr = is_zarr

    def _trans_key(self, key: str) -> Tuple[str, str]:
        kdata = key.split("/")
        pref = kdata[0]
        rkey = "/".join(kdata[1:])
        return pref, rkey

    def __getitem__(self, key: str):
        if self.is_zarr and key == ".zgroup":
            return '{"zarr_format":2}'
        elif self.is_zarr and key.startswith("."):
            return self.mrefs[key]
        pref, rkey = self._trans_key(key)
        return self.mrefs[pref]["refs"][rkey]

    def __setitem__(self, key: str, value):
        if self.is_zarr and key.startswith("."):
            self.mrefs[key] = value
        else:
            pref, rkey = self._trans_key(key)
            self.mrefs[pref]["refs"][rkey] = value

    def __delitem__(self, key: str):
        pref, rkey = self._trans_key(key)
        del self.mrefs[pref]["refs"][rkey]

    def __iter__(self):
        for pref, refs in self.mrefs.items():
            if not hasattr(refs, "__len__"):
                yield refs
            for rkey in refs["refs"]:
                yield pref + "/" + rkey
        if self.is_zarr:
            yield ".zgroup"

    def __len__(self):
        size = sum([len(refs) for refs in self.mrefs.values()])
        if self.is_zarr:
            size += 1
        return size


class ConcLRUStoreCache(zarr.storage.LRUStoreCache):
    """LRU store cache that fetches all uncached keys from the base store in one getitems call."""
    def getitems(self, keys: Sequence[str], contexts) -> Dict[str, Any]:
        cached_keys = set(keys).intersection(self._values_cache.keys())
        uncached_keys = set(keys).difference(cached_keys)
        items = super().getitems(cached_keys, contexts=contexts)
        if uncached_keys:
            uncached_items = self.base_store.getitems(
                list(uncached_keys),
                contexts=contexts
            )
            for key, value in uncached_items.items():
                if key not in self._values_cache:
                    self._cache_value(key, value)
            self.hits += len(uncached_items)
            self.misses += len(keys) - len(uncached_items)
            items.update(uncached_items)
        return items

    @property
    def base_store(self):
        return self._store

    @base_store.setter
    def base_store(self, store: MutableMapping):
        self._store = store


def expand_struct_column(table: pa.Table, column: str) -> pa.Table:
    """Expand a (possibly dotted) struct column of a pyarrow Table into its own Table."""
    keys = column.split(".")
    for key in keys:
        table = pa.Table.from_struct_array(table[key])
    return table


def stac_to_kerchunk(
    cube_stac: Dict[str, Any],
    dim_stac: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """Rebuild kerchunk reference JSON from STAC entries carrying kerchunk:* properties."""
    out = dict(version=1, refs={})
    refs = out["refs"]
    refs[".zgroup"] = '{"zarr_format": 2}'
    for stac in [cube_stac, dim_stac]:
        for v, r in stac.items():
            refs[f"{v}/.zarray"] = r["kerchunk:zarray"]
            refs[f"{v}/.zattrs"] = r["kerchunk:zattrs"]
            kv = r["kerchunk:value"]
            for i, key in enumerate(kv["key"]):
                ckey = f"{v}/{key}"
                if kv["path"][i] is not None:
                    refs[ckey] = [
                        "s3://" + kv["path"][i],
                        kv["offset"][i],
                        kv["size"][i]
                    ]
                else:
                    refs[ckey] = kv["raw"][i]
    return out
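As a rough sketch of how the reconstructed references might be consumed (assumptions: cube_stac and dim_stac already hold the kerchunk:zarray / kerchunk:zattrs / kerchunk:value entries this function expects, and the chunks live in a public S3 bucket; none of this is part of the PR):

    # Sketch only: open the rebuilt references as a read-only Zarr group.
    import fsspec

    refs = stac_to_kerchunk(cube_stac, dim_stac)
    fs = fsspec.filesystem("reference", fo=refs,
                           remote_protocol="s3", remote_options={"anon": True})
    group = zarr.open_group(fs.get_mapper(""), mode="r")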