Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 121 additions & 86 deletions dclimate_zarr_client/ipfs_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ def fetch_json_from_ipns(
last_error = StacCatalogError(
f"Unexpected error (nocache=true): {type(e).__name__}: {e}"
)

# --- Attempt 2: GET without nocache (if Attempt 1 failed) ---
logger.info(
f"Retrying fetch JSON via Gateway GET without nocache for: {ipns_name_for_url}"
Expand All @@ -204,20 +203,34 @@ def fetch_json_from_ipns(
allow_redirects=True,
) # Retry
response.raise_for_status()
json_content = response.json()
json_content = response.json() # This call may raise JSONDecodeError
logger.info(
f"Successfully fetched JSON from IPNS '{ipns_name}' (nocache=false)"
)
return json_content

except json.JSONDecodeError as e:
# Handle JSON decode errors explicitly on the retry attempt.
response_text = response.text[:500] if response else "[No Response]"
status_code = response.status_code if response else "[No Status]"
err_msg = (
f"Invalid JSON fetching IPNS '{ipns_name}' (retry) via Gateway {gateway_base}: {e}. "
f"Response text: {response_text[:100]}"
)
if last_error:
err_msg += f" | Initial error (nocache=true): {type(last_error).__name__}: {last_error}"
raise StacCatalogError(err_msg) from e

except requests.exceptions.ConnectionError as e:
raise IpfsConnectionError(
f"Connection error during IPNS fetch retry for '{ipns_name}' via Gateway {gateway_base}. Details: {e}"
) from e

except requests.exceptions.Timeout as e:
raise IpfsConnectionError(
f"Timeout ({timeout}s) during IPNS fetch retry for '{ipns_name}' via Gateway {gateway_base}."
) from e

except requests.exceptions.RequestException as e: # Includes HTTP errors on retry
err_msg = (
f"Error fetching IPNS '{ipns_name}' (retry) via Gateway {gateway_base}: {e}"
Expand All @@ -231,18 +244,12 @@ def fetch_json_from_ipns(
except Exception:
response_text = "[Could not read response text]"
err_msg += f" Status Code: {status_code}, Response: {response_text}"
if last_error:
err_msg += f" | Initial error (nocache=true): {type(last_error).__name__}: {last_error}"
raise StacCatalogError(
err_msg
) from e # Raise as StacCatalogError as it prevents catalog reading
except json.JSONDecodeError as e:
err_msg = f"Invalid JSON fetching IPNS '{ipns_name}' (retry) via Gateway {gateway_base}: {e}. Response text: {response.text[:500] if response else '[No Response]'}"
if last_error:
err_msg += f" | Initial error (nocache=true): {type(last_error).__name__}: {last_error}"
raise StacCatalogError(err_msg) from e
except Exception as e: # Catch any other unexpected error during retry
err_msg = f"Unexpected error during IPNS fetch retry for '{ipns_name}' via Gateway: {e}"

except Exception as e: # Catch any other unexpected exceptions
err_msg = f"Unexpected error during IPNS fetch retry for '{ipns_name}' via Gateway {gateway_base}: {e}"
if last_error:
err_msg += f" | Initial error (nocache=true): {type(last_error).__name__}: {last_error}"
raise StacCatalogError(err_msg) from e
Expand All @@ -253,8 +260,9 @@ def get_dataset_hamt_cid_from_stac(
root_catalog_ipns: str,
target_dataset_id: str,
gateway_uri_stem: str | None = None,
rpc_uri_stem: str
| None = None, # Keep rpc_uri_stem for IPFSStore config if needed by fetch_json_from_cid
rpc_uri_stem: (
str | None
) = None, # Keep rpc_uri_stem for IPFSStore config if needed by fetch_json_from_cid
) -> str:
"""
Traverses the dClimate STAC catalog starting from a root IPNS name
Expand Down Expand Up @@ -307,7 +315,7 @@ def get_dataset_hamt_cid_from_stac(
logger.warning(
f"Skipping child link with unexpected string href format (expected dict): {link}"
)
# If needed: collections_to_visit.append(href_obj[6:])
collections_to_visit.append(href_obj[6:])
else:
logger.warning(
f"Skipping invalid child link format in root catalog: {link}"
Expand All @@ -323,113 +331,132 @@ def get_dataset_hamt_cid_from_stac(
for collection_cid in collections_to_visit:
logger.debug(f"Fetching collection content for CID: {collection_cid}")
try:
# *** Use fetch_json_from_cid ***
# --- collection JSON ---
collection = fetch_json_from_cid(collection_cid, ipfs_store)
if (
not isinstance(collection, dict)
or collection.get("type") != "Collection"
):
logger.warning(
f"Skipping invalid collection format for CID {collection_cid}. Type: {collection.get('type')}"
f"Skipping invalid collection format for CID {collection_cid}. "
f"Type: {collection.get('type')}"
)
continue

items_found_in_collection = 0
for link in collection.get("links", []):
if link.get("rel") == "item" and link.get("type") == "application/json":
item_href_obj = link.get("href")
item_cid = None # Reset item_cid for each link
item_cid = None # reset each link

# *** MODIFIED: Handle dict href for item links ***
# --- handle IPLD or legacy /ipfs/ links ---
if isinstance(item_href_obj, dict):
item_cid = item_href_obj.get("/") # Extract item CID string
item_cid = item_href_obj.get("/") # IPLD dict
elif isinstance(item_href_obj, str) and item_href_obj.startswith(
"/ipfs/"
):
logger.warning(
f"Found item link with legacy string href format in {collection_cid}: {link}"
f"Found legacy string href in {collection_cid}: {link}"
)
item_cid = item_href_obj[6:]
item_cid = item_href_obj[6:] # strip "/ipfs/"
else:
logger.warning(
f"Skipping invalid item link format in collection {collection_cid}: {link}"
f"Skipping invalid item link in {collection_cid}: {link}"
)
continue # Skip this link if format is wrong
continue

if isinstance(item_cid, str):
items_found_in_collection += 1
# logger.debug(f"Fetching item content for CID: {item_cid}") # Can be verbose
try:
# *** Use fetch_json_from_cid with the extracted item CID string ***
item = fetch_json_from_cid(item_cid, ipfs_store)
if not isinstance(item_cid, str):
# already logged warning
continue

if (
not isinstance(item, dict)
or item.get("type") != "Feature"
):
logger.warning(
f"Skipping invalid item format for CID {item_cid}. Type: {item.get('type')}"
)
continue
items_found_in_collection += 1
item_id: str | None = None # keep in scope for except
try:
# --- item JSON ---
item = fetch_json_from_cid(item_cid, ipfs_store)

item_id = item.get("id")
if item_id == target_dataset_id:
logger.info(
f"Found matching item for '{target_dataset_id}' with CID {item_cid} in collection {collection_cid}"
)
hamt_asset = item.get("assets", {}).get("hamt-zarr", {})
hamt_cid_href = hamt_asset.get(
"href"
) # This should be the /ipfs/ string

if not isinstance(
hamt_cid_href, str
) or not hamt_cid_href.startswith("/ipfs/"):
raise StacCatalogError(
f"STAC Item '{item_id}' (CID: {item_cid}) is missing a valid string 'assets.hamt-zarr.href' starting with /ipfs/. Found: '{hamt_cid_href}' (type: {type(hamt_cid_href).__name__})"
)
if not isinstance(item, dict) or item.get("type") != "Feature":
logger.warning(
f"Skipping invalid item format for CID {item_cid}. "
f"Type: {item.get('type')}"
)
continue

hamt_cid_str = hamt_cid_href[
6:
] # Slice the /ipfs/ prefix
logger.info(
f"Successfully extracted HAMT CID for '{target_dataset_id}': {hamt_cid_str}"
)
_stac_hamt_cid_cache[target_dataset_id] = hamt_cid_str
return hamt_cid_str
item_id = item.get("id")
if item_id != target_dataset_id:
# not the dataset we're looking for
continue

except (StacCatalogError, IpfsConnectionError) as item_err:
# Log error but continue searching other items/collections
logger.error(
f"Error processing item {item_cid} in collection {collection_cid}, continuing search: {item_err}"
)
except (
Exception
) as item_err: # Catch unexpected errors during item processing
logger.error(
f"Unexpected error processing item {item_cid} in collection {collection_cid}, continuing search: {type(item_err).__name__}: {item_err}"
logger.info(
f"Found matching item for '{target_dataset_id}' "
f"(CID {item_cid}) in collection {collection_cid}"
)
hamt_asset = item.get("assets", {}).get("hamt-zarr", {})
hamt_cid_href = hamt_asset.get("href") # expected "/ipfs/<cid>"

if not isinstance(
hamt_cid_href, str
) or not hamt_cid_href.startswith("/ipfs/"):
raise StacCatalogError(
f"STAC Item '{item_id}' (CID: {item_cid}) is missing a "
f"valid string 'assets.hamt-zarr.href' starting with "
f"/ipfs/. Found: '{hamt_cid_href}' "
f"(type: {type(hamt_cid_href).__name__})"
)
# else: Invalid item CID extracted, already logged warning

hamt_cid_str = hamt_cid_href[6:] # drop "/ipfs/"
logger.info(
f"Successfully extracted HAMT CID for '{target_dataset_id}': "
f"{hamt_cid_str}"
)
_stac_hamt_cid_cache[target_dataset_id] = hamt_cid_str
return hamt_cid_str

# ── error handling ──────────────────────────────────────────
except StacCatalogError as item_err:
# If the failing item *is* the target dataset, bubble it up.
if item_id == target_dataset_id:
raise item_err
logger.error(
f"Error processing non-target item {item_cid} in collection "
f"{collection_cid}: {item_err}"
)
except IpfsConnectionError as item_err:
logger.error(
f"IPFS error processing item {item_cid} in collection "
f"{collection_cid}: {item_err}"
)
except Exception as item_err:
logger.error(
f"Unexpected error processing item {item_cid} in collection "
f"{collection_cid}: {type(item_err).__name__}: {item_err}"
)

logger.debug(
f"Finished searching {items_found_in_collection} items in collection {collection.get('id', collection_cid)}."
f"Finished searching {items_found_in_collection} items in collection "
f"{collection.get('id', collection_cid)}."
)

except (StacCatalogError, IpfsConnectionError) as col_err:
# Log error but continue searching other collections
except StacCatalogError as col_err:
# ← this is the error that means “the target dataset is malformed”
# → let it propagate to the caller so tests (and callers) can see it.
raise col_err
except IpfsConnectionError as col_err:
# ← still swallow network errors so other collections can be tried
logger.error(
f"Error processing collection {collection_cid}, continuing search: {col_err}"
f"IPFS error processing collection {collection_cid}, continuing search: "
f"{col_err}"
)
except (
Exception
) as col_err: # Catch unexpected errors during collection processing
except Exception as col_err:
logger.error(
f"Unexpected error processing collection {collection_cid}, continuing search: {type(col_err).__name__}: {col_err}"
f"Unexpected error processing collection {collection_cid}, continuing "
f"search: {type(col_err).__name__}: {col_err}"
)

# If loop completes without finding the dataset
# If the loop completes without returning
raise DatasetNotFoundError(
f"Dataset ID '{target_dataset_id}' not found after searching all collections in the STAC catalog rooted at IPNS '{root_catalog_ipns}'."
f"Dataset ID '{target_dataset_id}' not found after searching all collections "
f"in the STAC catalog rooted at IPNS '{root_catalog_ipns}'."
)


Expand Down Expand Up @@ -668,6 +695,9 @@ def _get_dataset_by_ipfs_cid(
raise StacCatalogError(
f"Zarr metadata not found at CID {ipfs_cid}. Is it a valid Zarr root? Error: {e}"
) from e
except ValueError:
# Let ValueErrors propagate, e.g. from invalid CID format
raise
except Exception as e:
# Catch other potential errors (e.g., Zarr format errors, py-hamt errors)
logger.error(
Expand Down Expand Up @@ -876,10 +906,15 @@ def list_datasets(
)
# else: Invalid item CID extracted, already logged warning

except (StacCatalogError, IpfsConnectionError) as col_err:
# Log and skip this specific collection if fetching/parsing fails
logger.warning(
f"Skipping collection {collection_cid} during list due to error: {col_err}"
# 1️⃣ propagate a StacCatalogError that bubbled up from the **target item**
except StacCatalogError as col_err:
raise col_err

# 2️⃣ still swallow IPFS/network problems so that other collections can be tried
except IpfsConnectionError as col_err:
logger.error(
f"IPFS error processing collection {collection_cid}, continuing search: "
f"{col_err}"
)
except Exception as col_err: # Catch unexpected errors
logger.warning(
Expand Down
32 changes: 32 additions & 0 deletions tests/test_geotemporal_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import numpy as np
import pytest

from dclimate_zarr_client.geotemporal_data import _haversine, _check_input_parameters
from dclimate_zarr_client import dclimate_zarr_errors as errors


def test_haversine_single_points():
dist = _haversine(0, 0, 0, 1)
assert dist == pytest.approx(111.195, rel=1e-3)

dist = _haversine(36.12, -86.67, 33.94, -118.40)
assert dist == pytest.approx(2886.44, rel=1e-2)


def test_haversine_arrays():
lats1 = np.array([0, 10])
lons1 = np.array([0, 0])
lats2 = np.array([0, 20])
lons2 = np.array([1, 0])
dists = _haversine(lats1, lons1, lats2, lons2)
assert np.allclose(dists, [111.195, 1111.95], rtol=1e-3)


def test_check_input_parameters_invalid_period():
with pytest.raises(errors.InvalidTimePeriodError):
_check_input_parameters(time_period="decade")


def test_check_input_parameters_invalid_method():
with pytest.raises(errors.InvalidAggregationMethodError):
_check_input_parameters(agg_method="average")
Loading
Loading