-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcopy_to_zarr.py
More file actions
148 lines (123 loc) · 4.34 KB
/
copy_to_zarr.py
File metadata and controls
148 lines (123 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""
Copy an HDF5 file (or group) into a Zarr store.
Examples
--------
# Basic usage (defaults to LZ4 compression)
python copy_to_zarr.py input.h5 output.zarr
# Overwrite an existing Zarr store
python copy_to_zarr.py input.h5 output.zarr --overwrite
# Choose compressor
python copy_to_zarr.py input.h5 output.zarr --compressor zstd --clevel 7 --shuffle bit
"""
import argparse
import sys
import h5py
import zarr
try:
from numcodecs import Blosc
except Exception:
Blosc = None
def make_compressor(name: str, clevel: int, shuffle: str):
    """
    Turn command-line style options into a numcodecs Blosc codec.

    ``name`` and ``shuffle`` are both compared case-insensitively.

    Returns ``None`` when ``name`` is 'none', and also (after a warning on
    stderr) when ``numcodecs.Blosc`` could not be imported. Otherwise
    returns a ``Blosc`` instance configured with the requested codec,
    level and shuffle mode.

    Raises ValueError for an unrecognised compressor or shuffle name.
    """
    codec_key = name.lower()
    shuffle_key = shuffle.lower()

    # Explicit opt-out: no codec at all.
    if codec_key == "none":
        return None

    # Optional dependency missing: degrade to uncompressed output.
    if Blosc is None:
        print(
            "[warn] numcodecs.Blosc not available; proceeding with no compression.",
            file=sys.stderr,
        )
        return None

    if codec_key not in ("lz4", "zstd", "zlib"):
        raise ValueError(
            f"Unsupported compressor: {codec_key} (choose: none, lz4, zstd, zlib)"
        )

    shuffle_modes = {
        "none": Blosc.NOSHUFFLE,
        "byte": Blosc.SHUFFLE,
        "bit": Blosc.BITSHUFFLE,
    }
    if shuffle_key not in shuffle_modes:
        raise ValueError("shuffle must be one of: none, byte, bit")

    return Blosc(
        cname=codec_key,
        clevel=int(clevel),
        shuffle=shuffle_modes[shuffle_key],
    )
def _plain_attr_value(value):
    """Best-effort conversion of an attribute value to plain Python types."""
    # Array-like objects (anything exposing ``tolist``) become nested lists
    # or plain scalars.
    if hasattr(value, "tolist"):
        value = value.tolist()
    # Byte strings are decoded strictly; undecodable bytes raise and the
    # caller then skips the attribute.
    if isinstance(value, bytes):
        return value.decode("utf-8")
    if isinstance(value, (list, tuple)):
        return [_plain_attr_value(element) for element in value]
    return value


def copy_attrs(src, dst):
    """
    Copy every entry of ``src.attrs`` into ``dst.attrs``.

    Each value is first assigned unchanged. If the destination rejects it,
    the assignment is retried once with a plain-Python rendering of the
    value (lists instead of array-likes, ``str`` instead of UTF-8 bytes).
    Attributes that still cannot be stored are skipped with a warning on
    stderr, so one awkward attribute never aborts the copy.
    """
    for k, v in src.attrs.items():
        try:
            try:
                dst.attrs[k] = v
            except Exception:
                # Presumably the destination could not serialise the raw
                # value; retry with plain Python types instead of dropping it.
                dst.attrs[k] = _plain_attr_value(v)
        except Exception as e:
            # Attribute types can occasionally be problematic; skip with a note.
            print(f"[warn] Skipping attribute {k!r}: {e}", file=sys.stderr)
def copy_h5_to_zarr(h5_group: h5py.Group, zarr_group: zarr.Group, compressor=None):
    """
    Mirror an HDF5 group into a Zarr group, descending into sub-groups.

    The group's own attributes are copied first. Each member is then
    handled by type: sub-groups are created in the Zarr group and copied
    recursively, datasets are created with the same name and the given
    ``compressor``, and anything else is reported on stderr and skipped.
    """
    # Attributes attached to the group itself.
    copy_attrs(h5_group, zarr_group)

    for name in h5_group:
        node = h5_group[name]

        if isinstance(node, h5py.Group):
            # Matching Zarr sub-group, then descend into it.
            child = zarr_group.create_group(name)
            copy_h5_to_zarr(node, child, compressor=compressor)
            continue

        if not isinstance(node, h5py.Dataset):
            print(f"[warn] Unknown item type: {name} ({type(node)})", file=sys.stderr)
            continue

        # Carry over the HDF5 chunk shape when one is set (may be None).
        h5_chunks = node.chunks
        # The whole dataset is read at once, so it must fit in memory.
        values = node[()]
        target = zarr_group.create_dataset(
            name=name,
            data=values,
            chunks=h5_chunks,
            compressor=compressor,
        )
        # Attributes attached to the dataset.
        copy_attrs(node, target)
def parse_args():
    """
    Parse the command line (``sys.argv``) for the HDF5-to-Zarr copy tool.

    Returns an ``argparse.Namespace`` with ``input_h5``, ``output_zarr``,
    ``overwrite``, ``compressor``, ``clevel`` and ``shuffle``.

    ``--clevel`` is restricted to 0-9, so an out-of-range level is
    rejected here with a normal usage error (exit status 2) instead of
    failing later when the compressor is built.
    """
    p = argparse.ArgumentParser(
        description="Recursively copy an HDF5 file into a Zarr store."
    )
    p.add_argument("input_h5", help="Path to input HDF5 file")
    p.add_argument("output_zarr", help="Path to output Zarr store (directory or .zarr)")
    p.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite the output Zarr store if it exists",
    )
    p.add_argument(
        "--compressor",
        default="lz4",
        choices=["none", "lz4", "zstd", "zlib"],
        help="Compressor to use for Zarr datasets (default: lz4)",
    )
    p.add_argument(
        "--clevel",
        type=int,
        default=5,
        choices=range(10),
        # An explicit metavar keeps usage/help from listing all ten choices.
        metavar="LEVEL",
        help="Compression level for Blosc compressors, 0-9 (default: 5)",
    )
    p.add_argument(
        "--shuffle",
        default="bit",
        choices=["none", "byte", "bit"],
        help="Shuffle mode for Blosc (default: bit)",
    )
    return p.parse_args()
def main():
    """
    Command-line entry point: copy the input HDF5 file into a Zarr store.

    Parses the arguments, builds the compressor, opens the HDF5 file
    read-only and the Zarr store for writing, then copies everything
    across. Without ``--overwrite`` an existing output store is left
    untouched: an error is printed to stderr and the process exits with
    status 1.
    """
    # Local import keeps this function self-contained. zarr reports an
    # already-populated target with these classes rather than
    # FileExistsError.
    from zarr.errors import ContainsArrayError, ContainsGroupError

    args = parse_args()
    compressor = make_compressor(args.compressor, args.clevel, args.shuffle)
    # "w-" is zarr's documented "create, fail if exists" mode.
    mode = "w" if args.overwrite else "w-"
    with h5py.File(args.input_h5, "r") as h5_file:
        # Only the open call is guarded, so errors raised during the copy
        # itself are never misreported as "already exists".
        try:
            zarr_store = zarr.open(args.output_zarr, mode=mode)
        except (FileExistsError, ContainsArrayError, ContainsGroupError):
            print(
                f"[error] Output Zarr store already exists: {args.output_zarr}. "
                f"Use --overwrite to replace.",
                file=sys.stderr,
            )
            sys.exit(1)
        copy_h5_to_zarr(h5_file, zarr_store, compressor=compressor)
if __name__ == "__main__":
main()