Open
Description
Celery tasks:
# Celery application shared by the beat scheduler and the worker tasks below.
# The broker is a local AMQP server reached through the pyamqp transport, and
# schedules are interpreted in the Canada/Eastern time zone.
celeryApp = Celery(
    "celerytasks",
    broker="pyamqp://localhost//",
    timezone="Canada/Eastern",
)
@celeryApp.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register one periodic ``defect_task_call`` per stored notification.

    Connected to the app's ``on_after_configure`` signal. Every entry returned
    by ``Notification.get_all_notification()`` becomes a periodic task whose
    schedule is ``notification_time(<time>)`` and whose schedule name is the
    notification id.

    Args:
        sender: Object exposing ``add_periodic_task`` (the Celery app that
            emitted the signal).
        **kwargs: Extra signal arguments; unused.
    """
    notifications = Notification.get_all_notification()
    print("Celery Initiated")

    # The table is addressed column-first and then by position
    # (``notifications[column][row]``), so rows are walked by integer position
    # rather than by iterating the object itself.
    for row in range(len(notifications)):

        def cell(column):
            """Return the value stored in ``column`` for the current row."""
            return notifications[column][row]

        schedule = notification_time(cell("time"))

        # Task signature carrying everything the worker needs for this rule;
        # id and value are passed as strings.
        signature = defect_task_call.s(
            str(cell("id")),
            cell("defect"),
            cell("email"),
            cell("time"),
            cell("condition"),
            str(cell("value")),
            cell("line"),
            cell("directory"),
        )

        sender.add_periodic_task(schedule, signature, name=cell("id"))
@celeryApp.task(name="alert")
def defect_task_call(
    id: str,
    defect_type: str,
    receiver_email: str,
    time: str,
    condition: str,
    value: str,
    line: str,
    directory: str,
):
    """Evaluate one notification rule and e-mail an alert when it holds.

    The approved parts of ``defect_type`` recorded on ``line`` during the
    period described by ``time`` are counted, and the count is compared with
    ``value`` through ``notification_condition``. When the comparison holds,
    an HTML mail summarising the rule is sent to ``receiver_email``.

    Args:
        id: ID of the notification (only printed).
        defect_type: Defect type to count; underscores are shown as spaces
            and the text is title-cased in the mail.
        receiver_email: Notification receiver.
        time: Time schedule; must contain "M" (minute window) or "H" (hour
            window). When both are present the hour window is used.
        condition: Comparison to apply, as understood by
            ``notification_condition``.
        value: Alert threshold as an integer string.
        line: Line identifier passed to the query as ``line_sub_id``.
        directory: Directory shown in the mail.

    Returns:
        None.

    Raises:
        ValueError: If ``value`` is not an integer string, or ``time``
            contains neither "M" nor "H".
    """
    value = int(value)
    print("Wrk ID", celeryApp.current_worker_task.request.id)
    print(defect_type)
    print(id)

    # Hourly or minute check. "H" is tested first because the hourly query
    # used to overwrite the minute one whenever both letters were present.
    if "H" in time:
        parts = Part.get_last_n_hour_types(
            line_sub_id=line,
            hour=parts_time(time),
            defect=defect_type,
            approved=True,
        )
    elif "M" in time:
        parts = Part.get_last_n_minute_types(
            line_sub_id=line,
            minute=parts_time(time),
            defect=defect_type,
            approved=True,
        )
    else:
        # Previously this fell through and failed later on an unbound name.
        raise ValueError(
            f"Unsupported time schedule {time!r}: expected 'M' or 'H'"
        )

    # The condition is evaluated for every query result, including an empty
    # one (count 0). It used to be evaluated only when the query returned no
    # rows, so rules whose defect had actually occurred never sent a mail.
    defect_count = parts.count()["Defect Type"]
    if not notification_condition(condition, defect_count, value):
        return

    # email notification for alerts
    defect_label = str(defect_type).replace("_", " ").title()
    mail_subject = f"Alert Mail for Defect Type: {defect_label}"
    body_content = f"Threshold reached alert of Defect: ' {defect_label} '"
    # Header row plus one data row; the surrounding <table> element is added
    # by the mail helper. The literal is kept flush-left so that the emitted
    # text is unchanged.
    table_content = f"""
<tr>
<th>Defect Type</th>
<th>Time Period</th>
<th>Current Count</th>
<th>Condition</th>
<th>Alert Value</th>
<th>Directory</th>
<th>Line</th>
</tr>
<tr>
<td>{defect_label}</td>
<td>{email_time(time)}</td>
<td>{defect_count}</td>
<td>{email_condition(condition)}</td>
<td>{value}</td>
<td>{directory}</td>
<td>{line}</td>
</tr>
"""
    email_notification._email_notification(
        mail_subject=mail_subject,
        body_content=body_content,
        receiver_email=receiver_email,
        table_content=table_content,
    )
    print("Mail Sent")
email_notification.py
def _email_notification(
    mail_subject: str, body_content: str, receiver_email: str, table_content=""
):
    """Send an HTML mail through the SMTP account in ``config["email"]``.

    Args:
        mail_subject: Subject line; a test subject is used when empty.
        body_content: Text shown above the table; a placeholder sentence is
            used when empty.
        receiver_email: Recipient address (a list of addresses is also
            accepted); the configured sender is used when empty.
        table_content: HTML rows inserted verbatim inside the mail's
            ``<table>`` element.

    Raises:
        smtplib.SMTPException, OSError: If connecting, STARTTLS or login
            fails. Errors raised while the message is being sent are printed
            and not re-raised (best effort).
    """
    subject = mail_subject or "Testing Cendiant Mail Bot"
    intro = body_content or "I'm Cendiant, The Mail Bot."
    body = (
        """
<html>
<head>
<style>
#mail {
width: 70%;
border: 15px solid green;
padding: 50px;
margin: 20px;
}
table {
font-family: arial, sans-serif;
border-collapse: collapse;
width: 100%;
}
td, th {
border: 1px solid #dddddd;
text-align: left;
padding: 8px;
}
tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<body>
<div id="mail">
<b><i>Hi There,</i></b><br>
"""
        + intro
        + """<br>
<br>
<table>
"""
        + table_content
        + """
</table>
<br><br>
Thank you<br>
</div>
</body>
</html>
"""
    )

    email_port = config["email"]["port"]
    email_domain = config["email"]["domain"]
    email_id = config["email"]["sender"]
    app_password = config["email"]["app_password"]
    recipient = receiver_email or config["email"]["sender"]

    email_content = MIMEMultipart("alternative")
    email_content["Subject"] = subject
    # A message without From/To headers is malformed and is frequently
    # rejected or filed as spam by the receiving server.
    email_content["From"] = email_id
    email_content["To"] = (
        recipient if isinstance(recipient, str) else ", ".join(recipient)
    )
    email_content.attach(MIMEText(body, "html"))

    # Without a timeout a stalled server would block the calling worker
    # indefinitely.
    smtp_timeout_seconds = 30
    email_server = smtplib.SMTP(
        email_domain, email_port, timeout=smtp_timeout_seconds
    )
    try:
        email_server.starttls()
        email_server.login(email_id, app_password)
        # Sending Email
        try:
            # NOTE(review): the reason for this pause is not visible in this
            # file; it is kept unchanged.
            time.sleep(5)
            refused = email_server.sendmail(
                email_id, recipient, email_content.as_string()
            )
            if refused:
                # sendmail() reports partially refused recipients through its
                # return value instead of raising.
                print("Recipients refused: " + str(refused))
            email_server.quit()
        except Exception as es:
            # Best-effort boundary: a failed send is reported, not raised.
            print(str(es))
    finally:
        # Release the socket on every path; close() is harmless after quit().
        email_server.close()
Issue:
Some mails are not sent by smtplib,
even though the tasks are received and triggered.