Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ jobs:
if: matrix.install-profile == 'nlp'
run: |
pip install -e ".[test,cli,nlp]" -r requirements-test.txt
python -m spacy download en_core_web_sm
python -m spacy download en_core_web_lg

- name: Install dependencies (nlp-advanced)
if: matrix.install-profile == 'nlp-advanced'
run: |
pip install -e ".[test,cli,nlp,nlp-advanced]" -r requirements-test.txt
python -m spacy download en_core_web_sm
python -m spacy download en_core_web_lg
datafog download-model urchade/gliner_multi_pii-v1 --engine gliner

- name: Run tests (core)
if: matrix.install-profile == 'core'
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ jobs:
python -m pip install --upgrade pip
pip install -e ".[all,test]"
pip install -r requirements-test.txt
pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1.tar.gz
python -m spacy download en_core_web_lg
datafog download-model urchade/gliner_multi_pii-v1 --engine gliner

- name: Run tests with segfault protection
run: |
Expand Down
2 changes: 1 addition & 1 deletion datafog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def download_model(
Download a model for specified engine.

Examples:
spaCy: datafog download-model en_core_web_sm --engine spacy
spaCy: datafog download-model en_core_web_lg --engine spacy
GLiNER: datafog download-model urchade/gliner_multi_pii-v1 --engine gliner
"""
if engine == "spacy":
Expand Down
24 changes: 18 additions & 6 deletions datafog/models/spacy_nlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from .annotator import AnnotationResult, AnnotatorRequest

DEFAULT_SPACY_MODEL = "en_core_web_lg"


class SpacyAnnotator:
"""
Expand All @@ -22,14 +24,18 @@ class SpacyAnnotator:
Supports various NLP tasks including entity recognition and model management.
"""

def __init__(self, model_name: str = DEFAULT_SPACY_MODEL):
    """Create an annotator bound to *model_name*; the pipeline loads lazily."""
    self.model_name = model_name
    # Populated by load_model(); None means the pipeline is not loaded yet.
    self.nlp = None

def load_model(self):
    """Load the configured spaCy pipeline into ``self.nlp``.

    Raises:
        ImportError: if the model package is not installed locally. The
            message gives the explicit download command instead of silently
            downloading at runtime.
    """
    try:
        self.nlp = spacy.load(self.model_name)
    except OSError as exc:
        raise ImportError(
            f"spaCy model {self.model_name!r} is not installed. "
            f"Download it explicitly with: datafog download-model {self.model_name} --engine spacy"
        ) from exc

def annotate_text(self, text: str, language: str = "en") -> List[AnnotationResult]:
if not self.nlp:
Expand Down Expand Up @@ -72,6 +78,12 @@ def list_models() -> List[str]:
return spacy.util.get_installed_models()

@staticmethod
def list_entities(model_name: str = DEFAULT_SPACY_MODEL) -> List[str]:
    """Return the NER labels exposed by *model_name*.

    Args:
        model_name: spaCy pipeline package to inspect; defaults to
            ``DEFAULT_SPACY_MODEL``.

    Raises:
        ImportError: if the model is not installed locally, with the
            explicit download command in the message.
    """
    try:
        nlp = spacy.load(model_name)
    except OSError as exc:
        raise ImportError(
            f"spaCy model {model_name!r} is not installed. "
            f"Download it explicitly with: datafog download-model {model_name} --engine spacy"
        ) from exc
    return [ent for ent in nlp.pipe_labels["ner"]]
65 changes: 31 additions & 34 deletions datafog/processing/image_processing/donut_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@
from images of documents.
"""

import importlib
import importlib.util
import json
import logging
import os
import re
import subprocess
import sys
from typing import TYPE_CHECKING, Any

from .image_downloader import ImageDownloader
Expand Down Expand Up @@ -43,13 +39,12 @@ def __init__(self, model_path="naver-clova-ix/donut-base-finetuned-cord-v2"):
self.model_path = model_path
self.downloader = ImageDownloader()

@staticmethod
def _missing_dependency_message(package_name: str) -> str:
    """Build the error text shown when an optional Donut dependency is absent.

    Replaces the old ``ensure_installed`` helper that pip-installed packages
    at runtime; callers now raise with this guidance instead.
    """
    return (
        f"Donut OCR requires {package_name}. "
        "Install with: pip install datafog[nlp-advanced,ocr]"
    )

def preprocess_image(self, image: "Image.Image") -> Any:
import numpy as np
Expand Down Expand Up @@ -86,40 +81,40 @@ async def extract_text_from_image(self, image: "Image.Image") -> str:
"PYTEST_DONUT=yes is set, running actual OCR in test environment"
)

# Only import torch and transformers when actually needed and not in test environment
try:
# Check if torch is available before trying to import it
try:
# Try to find the module without importing it
spec = importlib.util.find_spec("torch")
if spec is None:
# If we're in a test that somehow bypassed the IN_TEST_ENV check,
# still return a mock result instead of failing
logging.warning("torch module not found, returning mock result")
return json.dumps({"text": "Mock OCR text (torch not available)"})

# Ensure dependencies are installed
self.ensure_installed("torch")
self.ensure_installed("transformers")
except ImportError:
# If importlib.util is not available, fall back to direct try/except
pass

# Import dependencies only when needed
try:
import torch
except ImportError as exc:
raise ImportError(self._missing_dependency_message("torch")) from exc

try:
from transformers import DonutProcessor as TransformersDonutProcessor
from transformers import VisionEncoderDecoderModel
except ImportError as e:
logging.warning(f"Import error: {e}, returning mock result")
return json.dumps({"text": f"Mock OCR text (import error: {e})"})
raise ImportError(
self._missing_dependency_message("transformers")
) from e

# Preprocess the image
image_np = self.preprocess_image(image)

# Initialize model components
processor = TransformersDonutProcessor.from_pretrained(self.model_path)
model = VisionEncoderDecoderModel.from_pretrained(self.model_path)
try:
processor = TransformersDonutProcessor.from_pretrained(
self.model_path,
local_files_only=True,
)
model = VisionEncoderDecoderModel.from_pretrained(
self.model_path,
local_files_only=True,
)
except OSError as exc:
raise RuntimeError(
f"Donut model {self.model_path!r} is not available locally. "
"Download it explicitly before using Donut OCR, or pass a local "
"model path."
) from exc

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
model.eval()
Expand Down Expand Up @@ -153,6 +148,8 @@ async def extract_text_from_image(self, image: "Image.Image") -> str:
result = processor.token2json(sequence)
return json.dumps(result)

except (ImportError, RuntimeError):
raise
except Exception as e:
logging.error(f"Error in extract_text_from_image: {e}")
# Return a placeholder in case of error
Expand Down
24 changes: 16 additions & 8 deletions datafog/processing/spark_processing/pyspark_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@
PySpark UDFs for PII annotation and related utilities.

This module provides functions for PII (Personally Identifiable Information) annotation
using SpaCy models in a PySpark environment. It includes utilities for installing
dependencies, creating and broadcasting PII annotator UDFs, and performing PII annotation
on text data.
using SpaCy models in a PySpark environment. It includes utilities for validating
dependencies, creating and broadcasting PII annotator UDFs, and performing PII
annotation on text data.
"""

import importlib
import subprocess
import sys

PII_ANNOTATION_LABELS = ["DATE_TIME", "LOC", "NRP", "ORG", "PER"]
MAXIMAL_STRING_SIZE = 1000000
DEFAULT_SPACY_MODEL = "en_core_web_lg"


def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]:
Expand Down Expand Up @@ -45,7 +44,7 @@ def pii_annotator(text: str, broadcasted_nlp) -> list[list[str]]:


def broadcast_pii_annotator_udf(
spark_session=None, spacy_model: str = "en_core_web_lg"
spark_session=None, spacy_model: str = DEFAULT_SPACY_MODEL
):
"""Broadcast PII annotator across Spark cluster and create UDF"""
ensure_installed("pyspark")
Expand All @@ -69,5 +68,14 @@ def broadcast_pii_annotator_udf(
def ensure_installed(package_name):
    """Validate that *package_name* is importable.

    This never pip-installs anything at runtime; it raises with the correct
    ``datafog`` extra for the user to install instead.

    Args:
        package_name: Importable module name (e.g. ``"pyspark"``).

    Raises:
        ImportError: if the module is missing, with install guidance.
    """
    try:
        importlib.import_module(package_name)
    except ImportError as exc:
        # Map known optional dependencies to the extra that provides them;
        # anything unrecognized falls back to the catch-all extra.
        extras = {"pyspark": "distributed", "spacy": "nlp"}
        extra = extras.get(package_name, "all")
        raise ImportError(
            f"{package_name} is required for Spark PII UDF support. "
            f"Install with: pip install datafog[{extra}]"
        ) from exc
8 changes: 6 additions & 2 deletions datafog/processing/text_processing/gliner_annotator.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,18 @@ def create(

try:
# Load the GLiNER model
model = GLiNER.from_pretrained(model_name)
model = GLiNER.from_pretrained(model_name, local_files_only=True)
logging.info(f"Successfully loaded GLiNER model: {model_name}")

return cls(model=model, entity_types=entity_types, model_name=model_name)

except Exception as e:
logging.error(f"Failed to load GLiNER model {model_name}: {str(e)}")
raise
raise RuntimeError(
f"GLiNER model {model_name!r} is not available locally. "
"Download it explicitly with: "
f"datafog download-model {model_name} --engine gliner"
) from e

def annotate(self, text: str) -> Dict[str, List[str]]:
"""
Expand Down
39 changes: 17 additions & 22 deletions datafog/processing/text_processing/spacy_pii_annotator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,34 @@
"WORK_OF_ART",
]
MAXIMAL_STRING_SIZE = 1000000
DEFAULT_SPACY_MODEL = "en_core_web_lg"


class SpacyPIIAnnotator(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

nlp: Any
model_name: str = DEFAULT_SPACY_MODEL

@classmethod
def create(cls, model_name: str = DEFAULT_SPACY_MODEL) -> "SpacyPIIAnnotator":
    """Build an annotator around an already-installed spaCy model.

    Args:
        model_name: spaCy pipeline package to load; defaults to
            ``DEFAULT_SPACY_MODEL``.

    Raises:
        ImportError: if spaCy itself is missing (install the ``nlp`` extra)
            or if the model package has not been downloaded yet. Models are
            never downloaded implicitly at runtime.
    """
    try:
        import spacy
    except ImportError as exc:
        raise ImportError(
            "SpaCy engine requires the nlp extra. "
            "Install with: pip install datafog[nlp]"
        ) from exc

    try:
        nlp = spacy.load(model_name)
    except OSError as exc:
        raise ImportError(
            f"spaCy model {model_name!r} is not installed. "
            f"Download it explicitly with: datafog download-model {model_name} --engine spacy"
        ) from exc

    return cls(nlp=nlp, model_name=model_name)

def annotate(self, text: str) -> Dict[str, List[str]]:
try:
Expand Down
29 changes: 9 additions & 20 deletions datafog/services/spark_service.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
"""
Spark service for data processing and analysis.

Provides a wrapper around PySpark functionality, including session creation,
JSON reading, and package management.
Provides a wrapper around PySpark functionality, including session creation and
JSON reading.
"""

import importlib
import os
import subprocess
import sys
from typing import List


class SparkService:
"""
Manages Spark operations and dependencies.

Initializes a Spark session, handles imports, and provides methods for
data reading and package installation.
Initializes a Spark session, handles imports, and provides methods for data
reading.
"""

def __init__(self, master=None):
self.master = master

# Ensure pyspark is installed first
self.ensure_installed("pyspark")

# Now import necessary modules after ensuring pyspark is installed
Expand Down Expand Up @@ -84,16 +81,8 @@ def read_json(self, path: str) -> List[dict]:
def ensure_installed(self, package_name):
    """Validate that *package_name* is importable.

    Args:
        package_name: Importable module name (e.g. ``"pyspark"``).

    Raises:
        ImportError: if the module is missing; the message points at the
            ``distributed`` extra rather than pip-installing at runtime.
    """
    try:
        importlib.import_module(package_name)
    except ImportError as exc:
        raise ImportError(
            f"{package_name} is required for Spark support. "
            "Install with: pip install datafog[distributed]"
        ) from exc
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
distributed_deps = [
"pandas>=2.0.0",
"numpy>=1.24.0",
"pyspark>=3.5.0",
]

web_deps = [
Expand Down
Loading
Loading