Newer
Older
from django.contrib import messages
from django.utils.dateparse import parse_datetime
from django_cron import CronJobBase, Schedule
from django_cron.models import CronJobLog
from notifications import get_filter_locations, get_today_midnight_date
from web.models import ConfigurationItem, Language, Worker
from web.models.constants import KIT_EMAIL_HOUR_CONFIGURATION_TYPE, \
KIT_EMAIL_DAY_OF_WEEK_CONFIGURATION_TYPE, CRON_JOB_TIMEOUT
from web.models.constants import KIT_RECIPIENT_EMAIL_CONFIGURATION_TYPE
from . import wrap_response
from ..forms import KitRequestForm
from ..models import AppointmentType, Appointment
def _coerce_to_datetime(value):
    """
    Turn a textual date into a ``datetime``; return anything else unchanged.

    ``str`` values are handed to ``parse_datetime``; ``unicode`` values are
    parsed with the ``%Y-%m-%d`` format. ``None`` and ``datetime`` objects
    match neither check and are passed through as they are.

    :param value: ``None``, a ``datetime``, or a ``str``/``unicode`` date
    :return: the converted (or untouched) value
    """
    if isinstance(value, str):
        # NOTE(review): parse_datetime is documented to return None for text
        # it cannot parse - confirm callers never pass such input.
        value = parse_datetime(value)
    if isinstance(value, unicode):
        value = datetime.datetime.strptime(value, '%Y-%m-%d')
    return value


def get_kit_requests(user, start_date=None, end_date=None):
    """
    Collect scheduled appointments that require disposable equipment.

    :param user: passed to ``get_filter_locations`` to limit the locations
        taken into account
    :param start_date: exclusive lower bound for ``datetime_when``; may be a
        ``datetime`` or a ``str``/``unicode`` date. When ``None`` the period
        starts at ``get_today_midnight_date()`` plus one day and lasts 7 days.
    :param end_date: exclusive upper bound for ``datetime_when`` (same
        accepted types as ``start_date``); ``None`` means no upper bound.
        Ignored when ``start_date`` is ``None``.
    :return: dict with the keys ``'start_date'``, ``'end_date'`` and
        ``'appointments'`` (a queryset ordered by ``datetime_when``)
    """
    if start_date is None:
        # Default period; any end_date supplied by the caller is replaced.
        start_date = get_today_midnight_date() + datetime.timedelta(days=1)
        end_date = start_date + datetime.timedelta(days=7)
    else:
        start_date = _coerce_to_datetime(start_date)
        end_date = _coerce_to_datetime(end_date)

    appointment_types = AppointmentType.objects.filter(required_equipment__disposable=True)

    appointments = Appointment.objects.filter(
        appointment_types__in=appointment_types,
        datetime_when__gt=start_date,
        location__in=get_filter_locations(user),
        status=Appointment.APPOINTMENT_STATUS_SCHEDULED,
    ).distinct().order_by('datetime_when')  # distinct(): the type join can repeat rows

    if end_date is not None:
        appointments = appointments.filter(datetime_when__lt=end_date)

    return {
        'start_date': start_date,
        'end_date': end_date,
        'appointments': appointments,
    }
def get_kit_requests_data(request, start_date=None, end_date=None):
    """
    Build the template context for the kit requests page.

    On a POST request the submitted ``KitRequestForm`` is bound and, when it
    validates, its ``start_date``/``end_date`` values replace the arguments.

    :param request: current HTTP request (``method``, ``POST`` and ``user``
        are read)
    :param start_date: period start forwarded to ``get_kit_requests``
    :param end_date: period end forwarded to ``get_kit_requests``
    :return: the dict produced by ``get_kit_requests`` extended with the
        ``'form'`` entry
    """
    form = KitRequestForm()
    submitted = request.method == 'POST'
    if submitted:
        form = KitRequestForm(request.POST)
        if form.is_valid():
            cleaned = form.cleaned_data
            start_date = cleaned.get('start_date')
            end_date = cleaned.get('end_date')

    context = get_kit_requests(request.user, start_date, end_date)
    context['form'] = form
    return context
def kit_requests(request):
    """
    Render the kit requests page for the given request.

    :param request: current HTTP request
    :return: whatever ``wrap_response`` returns for the kit requests template
    """
    template_name = 'equipment_and_rooms/kit_requests/kit_requests.html'
    context = get_kit_requests_data(request)
    return wrap_response(request, template_name, context)
# Builds an HTML e-mail listing the appointments found in `data` (keys read
# here: "start_date", "end_date", "appointments") and sends it through
# EmailSender to the configured kit recipient address.
# NOTE(review): the enclosing `def` line and the original indentation are not
# visible in this view - confirm the signature before editing.
end_date_str = " end of time"
# NOTE(review): as shown, this assignment always replaces the default above and
# would fail when data["end_date"] is None; a guard on the end date presumably
# belongs here - confirm against the full file.
end_date_str = data["end_date"].strftime('%Y-%m-%d')
# The title is used both as the e-mail subject and as the <h1> heading.
title = "Samples between " + data["start_date"].strftime('%Y-%m-%d') + " and " + end_date_str
# Inline CSS repeated on every <td> of the table.
cell_style = "padding: 8px; line-height: 1.42857143; vertical-align: top; " \
"font-size: 14px; font-family: 'Source Sans Pro','Helvetica Neue',Helvetica,Arial,sans-serif;"
email_body = "<h1>" + title + "</h1>"
# Table header: one column per value written in the loop below.
email_body += '<table style="border: 1px solid #f4f4f4;border-spacing: 0;border-collapse: collapse;">' \
'<thead><tr>' \
'<th>Date</th>' \
'<th>ND number</th>' \
'<th>Samples</th>' \
'<th>Location</th>' \
'<th>Person responsible</th>' \
'</tr></thead>'
email_body += "<tbody>"
# One table row per appointment; rows alternate background colour.
for appointment in data["appointments"]:
row_style = ""
# NOTE(review): `even` is toggled here but no initial assignment is visible
# in this view.
even = not even
if even:
row_style = ' background-color: #f9f9f9;'
email_body += "<tr style='" + row_style + "'>"
# NOTE(review): values are concatenated into the HTML without escaping.
email_body += "<td style='" + cell_style + "'>" + appointment.datetime_when.strftime('%Y-%m-%d %H:%M') + "</td>"
email_body += "<td style='" + cell_style + "'>" + appointment.visit.subject.nd_number + "</td>"
email_body += "<td style='" + cell_style + "'>"
# "Samples" cell: names of the disposable equipment items of every appointment
# type, each followed by ", " (the last one included).
# NOTE(review): the loop variable `type` shadows the builtin of the same name.
for type in appointment.appointment_types.all():
for item in type.required_equipment.all():
if item.disposable:
email_body += item.name + ", "
email_body += "</td>"
# NOTE(review): `location` is extended here without a visible initial
# assignment; presumably it starts from the appointment's location and the
# flying team is appended only when one is set - confirm.
location += " (" + unicode(appointment.flying_team) + ")"
email_body += "<td style='" + cell_style + "'>" + location + "</td>"
email_body += "<td style='" + cell_style + "'>" + unicode(appointment.worker_assigned) + "</td>"
email_body += "</tr>"
email_body += "</tbody></table>"
# Recipient address(es) come from the configuration table; no CC is used.
recipients = ConfigurationItem.objects.get(type=KIT_RECIPIENT_EMAIL_CONFIGURATION_TYPE).value
cc_recipients = []
EmailSender().send_email(title, email_body, recipients, cc_recipients)
# View that reports the outcome of sending the kit request e-mail through a
# flash message and then renders the kit requests page.
#   request    - current HTTP request
#   start_date - start of the period, forwarded to get_kit_requests_data
#   end_date   - optional end of the period, forwarded likewise
# NOTE(review): the `except:` below has no `try:` in the visible lines and
# `data` is not used by any visible statement - the `try:` and the call that
# actually sends the e-mail appear to be missing from this view; confirm
# against the full file.
def kit_requests_send_mail(request, start_date, end_date=None):
# Appointment data (plus form) for the requested period.
data = get_kit_requests_data(request, start_date, end_date)
messages.add_message(request, messages.SUCCESS, 'Mail sent')
# Bare except: every failure is reported to the user as the same generic error.
except:
messages.add_message(request, messages.ERROR, 'There was problem with sending email')
# The page is rendered for the default period (no dates are passed on here).
return wrap_response(request, 'equipment_and_rooms/kit_requests/kit_requests.html', get_kit_requests_data(request))
# Cron job (django_cron) for the kit request e-mail. It is scheduled every
# minute; `do` decides on each run whether the e-mail is due, and its return
# value is a short status text ("mail sent" is what the duplicate check in
# `do` looks for in CronJobLog).
# NOTE(review): indentation and several lines of this class are not visible
# in this view; the notes below mark the gaps.
class KitRequestEmailSendJob(CronJobBase):
RUN_EVERY_MINUTES = 1
schedule = Schedule(run_every_mins=RUN_EVERY_MINUTES)
code = 'web.kit_request_weekly_email' # a unique code
# Execution is limited by CRON_JOB_TIMEOUT.
@timeout_decorator.timeout(CRON_JOB_TIMEOUT)
def do(self):
now = datetime.datetime.utcnow()
# The configured send time is stored as text and split on ":" into
# hour (first part) and minute (second part).
hour = int(ConfigurationItem.objects.get(
type=KIT_EMAIL_HOUR_CONFIGURATION_TYPE).value.split(":")[0])
minute = int(ConfigurationItem.objects.get(
type=KIT_EMAIL_HOUR_CONFIGURATION_TYPE).value.split(":")[1])
# NOTE(review): `date` is used below without a visible assignment;
# presumably it is built from `now`, `hour` and `minute` - confirm.
# check if we sent email this day already
# TODO it's a hack assuming that we are in CEST
# 122 minutes are subtracted and the result is marked as UTC.
date = pytz.utc.localize(date - datetime.timedelta(minutes=122))
# Successful runs of this job logged at or after `date`.
jobs = CronJobLog.objects.filter(code=KitRequestEmailSendJob.code, message="mail sent", start_time__gte=date)
if jobs.count() == 0:
if pytz.utc.localize(datetime.datetime.utcnow()) > date:
# NOTE(review): a new Worker is created on every pass through this branch
# and handed to get_kit_requests as the user - confirm this is intended.
worker = Worker.objects.create()
data = get_kit_requests(worker)
# NOTE(review): no call that sends `data` is visible before this return.
return "mail sent"
# NOTE(review): with indentation lost it is not visible which `if` this
# `else` belongs to; the message suggests a match_day_of_week() check that
# is not shown here - confirm.
else:
return "day of week doesn't match"
# Returns True when the configured day-of-week name corresponds to today.
def match_day_of_week(self):
user_day_of_week = ConfigurationItem.objects.get(type=KIT_EMAIL_DAY_OF_WEEK_CONFIGURATION_TYPE).value
# NOTE(review): `language` has no visible assignment in this view.
locale_name = language.locale
if platform.system() == 'Windows':
locale_name = language.windows_locale_name
# The time locale is switched so that '%A' below can parse the day name in
# that language; a failure is only logged (bare except).
try:
locale.setlocale(locale.LC_TIME, locale_name)
except:
logger.error("Problem with setting locale: " + locale_name)
# NOTE(review): tm_wday counts Monday as 0, so this yields 1 (Monday) to
# 7 (Sunday), while "%w" below yields 0 (Sunday) to 6 (Saturday); a
# configured Sunday (7) would therefore never equal the current Sunday (0) -
# confirm and fix if so.
user_day_of_week_int = int(time.strptime(user_day_of_week, '%A').tm_wday) + 1
current_day_of_week_int = int(datetime.datetime.now().strftime("%w"))
return user_day_of_week_int == current_day_of_week_int