Skip to content

Commit 3d16ec0

Browse files
authored
Merge pull request #52 from ARMmaster17/46-better-pipeline-logging
Console MetricsExporter module
2 parents 3d2e10d + 588e67c commit 3d16ec0

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## [Unreleased]
44

55
### Added
6+
- `ConsoleMetricsExporter` for locally debugging pipelines without an APM service.
67

78
### Changed
89
- Bumped redis dependency to 4.2.2.

test/console_me_tests.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import unittest
2+
3+
from watergrid.context import DataContext
4+
from watergrid.metrics.ConsoleMetricsExporter import ConsoleMetricsExporter
5+
from watergrid.pipelines.pipeline import Pipeline
6+
from watergrid.steps import Step
7+
8+
9+
class MockStep(Step):
10+
def __init__(self, throw_exception=False):
11+
self.throw_exception = throw_exception
12+
super().__init__(self.__class__.__name__)
13+
14+
def run(self, context: DataContext):
15+
if self.throw_exception:
16+
raise Exception("MockStep failed")
17+
18+
19+
class ConsoleMetricsExporterTestCase(unittest.TestCase):
20+
def test_outputs_pipeline_start(self):
21+
pipeline = Pipeline("test_pipeline")
22+
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
23+
with self.assertLogs() as captured:
24+
pipeline.run()
25+
self.assertIn("INFO:root:Starting pipeline: test_pipeline", captured.output)
26+
27+
def test_outputs_pipeline_end(self):
28+
pipeline = Pipeline("test_pipeline")
29+
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
30+
with self.assertLogs() as captured:
31+
pipeline.run()
32+
self.assertIn("INFO:root:Ending pipeline", captured.output)
33+
34+
def test_outputs_pipeline_end_with_error(self):
35+
pipeline = Pipeline("test_pipeline")
36+
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
37+
pipeline.add_step(MockStep(throw_exception=True))
38+
with self.assertLogs() as captured:
39+
pipeline.run()
40+
self.assertIn("ERROR:root:Exception: MockStep failed", captured.output)
41+
42+
def test_outputs_step_start(self):
43+
pipeline = Pipeline("test_pipeline")
44+
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
45+
pipeline.add_step(MockStep())
46+
with self.assertLogs() as captured:
47+
pipeline.run()
48+
self.assertIn("INFO:root:Starting step: MockStep", captured.output)
49+
50+
def test_outputs_step_end(self):
51+
pipeline = Pipeline("test_pipeline")
52+
pipeline.add_metrics_exporter(ConsoleMetricsExporter())
53+
pipeline.add_step(MockStep())
54+
with self.assertLogs() as captured:
55+
pipeline.run()
56+
self.assertIn("INFO:root:Ending step", captured.output)
57+
58+
59+
if __name__ == "__main__":
60+
unittest.main()
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import logging
2+
3+
from watergrid.metrics.MetricsExporter import MetricsExporter
4+
5+
6+
class ConsoleMetricsExporter(MetricsExporter):
7+
def __init__(self):
8+
super().__init__()
9+
logging.basicConfig(level=logging.INFO)
10+
self._logger = logging.getLogger(__name__)
11+
12+
def start_pipeline(self, pipeline_name):
13+
logging.info("Starting pipeline: " + pipeline_name)
14+
15+
def end_pipeline(self):
16+
logging.info("Ending pipeline")
17+
18+
def start_step(self, step_name):
19+
logging.info("Starting step: " + step_name)
20+
21+
def end_step(self):
22+
logging.info("Ending step")
23+
24+
def capture_exception(self, exception: Exception):
25+
logging.error("Exception: " + str(exception))

0 commit comments

Comments
 (0)