-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgmail_lag_alert_notification.py
More file actions
149 lines (118 loc) · 5.12 KB
/
gmail_lag_alert_notification.py
File metadata and controls
149 lines (118 loc) · 5.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Author: Hevo
File : gmail_lag_alert_notification.py
Purpose:
--------
This script queries the Hevo API for the position of each configured pipeline,
computes how far that position lags behind the current time, and emails the
resulting lag report over SMTP (Gmail by default).
Usage Documentation:
------
https://api-docs.hevodata.com/reference/introduction
License:
--------
This script has no license. It is provided "as-is" without any warranty. Feel free to use
and modify it for any purpose.
"""
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime, timedelta
import pytz
import json
from dateutil import parser
import requests
# Email configuration
# https://docs.google.com/document/d/14TPpmngKsquoGDXT71flLZ7dQW3_DspAxeuSr07-BoE/edit?usp=sharing
SMTP_SERVER = 'smtp.gmail.com'  # For outlook use smtp.office365.com
SMTP_PORT = 587  # port would be same for outlook
SMTP_USER = '<your_email_id>'  # Add your email id
SMTP_PASSWORD = '<your_password>'  # Add your password

# Dictionary of primary recipients and their CC recipients.
# Each key is a primary ("To") address; its value is the list of CC addresses.
EMAIL_DICT = {
    'to@email.com': [  # CC recipients
        'cc1@email.com', 'cc2@email.com',
    ],
}

# Base URL and headers for the API request
base_url = 'https://<region>.hevodata.com/api/public/v2.0/pipelines'
headers = {
    'accept': 'application/json',
    'authorization': 'Basic <REPLACE_WITH_YOUR_TOKEN>',
}

# Mapping of timezone abbreviations to UTC offsets in seconds, passed to
# dateutil's parser. The offsets must be ints: dateutil raises TypeError for
# any other numeric type, so the half-hour IST offset is written without
# floating-point arithmetic.
tzinfos = {
    'EDT': -4 * 3600,  # EDT is UTC-4
    'EST': -5 * 3600,  # EST is UTC-5
    'IST': 5 * 3600 + 30 * 60,  # IST is UTC+5:30
    # Add other timezones if needed
}
def get_pipeline_position(pipeline_id, timeout=30):
    """Fetch the display position of a pipeline from the API.

    Args:
        pipeline_id: Identifier of the pipeline; interpolated into the
            ``<base_url>/<pipeline_id>/position`` endpoint.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive API cannot hang the script indefinitely.

    Returns:
        The ``display_position`` value found under ``data`` in the JSON
        response.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
        ValueError: If the JSON response has no ``data`` key.
    """
    url = f'{base_url}/{pipeline_id}/position'
    response = requests.get(url, headers=headers, timeout=timeout)
    # Surface HTTP errors (bad token, unknown pipeline, ...) with their
    # status code instead of a misleading "missing 'data'" error later on.
    response.raise_for_status()
    # Parse the body once and reuse it for both the debug print and lookup.
    response_data = response.json()
    # Debugging: Print the entire response
    print(f"API Response: {response_data}")
    # Check if 'data' is in the response
    if 'data' not in response_data:
        raise ValueError("API response does not contain 'data' key")
    return response_data['data']['display_position']
def clean_timestamp(timestamp_str):
    """Return the part of *timestamp_str* that precedes the first "IST," marker.

    Surrounding whitespace is stripped. When the marker does not occur, the
    whole string is returned (stripped).
    """
    leading_part, _marker, _remainder = timestamp_str.partition("IST,")
    return leading_part.strip()
def parse_timestamp(timestamp_str):
    """Parse *timestamp_str* into a datetime with dateutil.

    Timezone abbreviations found in the string are resolved through the
    module-level ``tzinfos`` mapping.
    """
    parsed = parser.parse(timestamp_str, tzinfos=tzinfos)
    return parsed
def check_lag(pipeline_id):
    """Build a human-readable lag report for a single pipeline.

    The pipeline's display position is fetched, cleaned and parsed into a
    timestamp, then compared with the current time in the Asia/Kolkata
    timezone. A parsed timestamp without timezone information is treated as
    Asia/Kolkata local time.

    Args:
        pipeline_id: Identifier of the pipeline to check.

    Returns:
        A multi-line report string. It carries a warning when the lag exceeds
        12 hours (reported in hours) or 70 minutes (reported in minutes), and
        a "running smoothly" line otherwise. Any failure along the way is
        reported as an "Error processing pipeline ..." string rather than
        raised, so one bad pipeline does not abort the whole run.
    """
    try:
        # Build the timezone once instead of on every use.
        kolkata = pytz.timezone('Asia/Kolkata')

        display_position = get_pipeline_position(pipeline_id)
        print(f"Display Position: {display_position}")  # Debugging line
        timestamp_str = clean_timestamp(display_position)
        print(f"Cleaned Timestamp: {timestamp_str}")  # Debugging line
        timestamp = parse_timestamp(timestamp_str)

        # Ensure the parsed timestamp is timezone-aware
        if timestamp.tzinfo is None:
            timestamp = kolkata.localize(timestamp)
        else:
            timestamp = timestamp.astimezone(kolkata)

        current_time = datetime.now(kolkata)
        lag = current_time - timestamp
        lag_minutes = lag.total_seconds() / 60
        lag_hours = lag.total_seconds() / 3600

        # Report the lag as decimal hours; interpolating the raw timedelta
        # produced text such as "1 day, 2:00:00.123456 hours".
        result = (
            f"Pipeline ID: {pipeline_id}\n"
            f"Timestamp: {timestamp_str}\n"
            f"Current Time: {current_time}\n"
            f"Lag: {lag_hours:.2f} hours\n"
        )
        if lag > timedelta(hours=12):
            result += f"Warning: Pipeline {pipeline_id} has a lag of {lag_hours:.2f} hours.\n"
        elif lag > timedelta(minutes=70):
            result += f"Warning: Pipeline {pipeline_id} has a lag of {lag_minutes:.2f} minutes.\n"
        else:
            result += f"Pipeline {pipeline_id} is running smoothly with a lag of {lag_minutes:.2f} minutes.\n"
        return result
    except Exception as e:
        # Deliberate best-effort: turn any failure into a report line.
        return f"Error processing pipeline {pipeline_id}: {e}\n"
def main(pipeline_ids):
    """Run the lag check for each pipeline and combine the reports.

    Args:
        pipeline_ids: Iterable of pipeline identifiers, checked in order.

    Returns:
        The individual reports joined with a newline between them.
    """
    return "\n".join(check_lag(pid) for pid in pipeline_ids)
def send_email(to_email, cc_list, subject, body, timeout=30):
    """Send a plain-text email to one primary recipient plus CC recipients.

    The message is sent from SMTP_USER through SMTP_SERVER:SMTP_PORT using
    STARTTLS and a login with SMTP_USER / SMTP_PASSWORD.

    Args:
        to_email: Address placed in the "To" header.
        cc_list: Iterable of addresses for the "Cc" header; may be empty or
            None, in which case no "Cc" header is added.
        subject: Subject line of the message.
        body: Plain-text body of the message.
        timeout: Seconds to wait on SMTP connection operations, so an
            unreachable server cannot block the script indefinitely.

    Returns:
        None. Success and failure are both reported with a printed message;
        exceptions are not propagated, so one failed delivery does not stop
        the remaining notifications.
    """
    try:
        # Accept None or any iterable (tuple, set, ...) as the CC list.
        cc_list = list(cc_list or [])

        msg = MIMEMultipart()
        msg['From'] = SMTP_USER
        msg['To'] = to_email
        # Only add the header when there are CC recipients; an empty "Cc:"
        # header is not a valid address list.
        if cc_list:
            msg['Cc'] = ', '.join(cc_list)
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # Combine primary recipient and CC recipients
        all_recipients = [to_email] + cc_list

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=timeout) as server:
            server.starttls()
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.sendmail(SMTP_USER, all_recipients, msg.as_string())
        print(f"Notification sent to {to_email} with CC to {', '.join(cc_list)}")
    except Exception as e:
        # Deliberate best-effort: report the failure and carry on.
        print(f"Failed to send email to {to_email}: {e}")
if __name__ == "__main__":
    # Script entry point: build the lag report once, then mail the same
    # report to every primary recipient together with their CC list.
    pipeline_ids = [683]  # Example pipeline IDs
    results = main(pipeline_ids)

    subject = "Hevo Lag-Alert Notification System Results"
    body = f"Please find below the results of the Hevo Lag-Alert Notification System:\n\n{results}"

    for recipient, cc_recipients in EMAIL_DICT.items():
        send_email(recipient, cc_recipients, subject, body)