-
Piotr Gawron authoredPiotr Gawron authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
daily_planning.py 11.04 KiB
import datetime
import json
import random
from operator import itemgetter
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from ..models import Appointment, AppointmentTypeLink, Worker, Availability, Holiday
from ..views.notifications import get_filter_locations
# Tint that every random channel is averaged with.
# Alternative tint kept for reference: (0, 166, 90).
mix_color = (155, 155, 155)


def get_random_color():
    """Return a random colour as an upper-case ``#RRGGBB`` hex string.

    Each channel is a random value in 0..255 averaged with the matching
    channel of ``mix_color``.
    """
    # Floor division keeps every channel an integer: '%X' rejects floats,
    # which is what true division ('/') produces on Python 3.
    channels = tuple((random.randint(0, 255) + tint) // 2 for tint in mix_color)
    return '#%02X%02X%02X' % channels


# Fixed palette of 20 colours, generated once when the module is imported.
RANDOM_COLORS = [get_random_color() for _ in range(20)]
def build_duration(duration):
    """Format a duration given in whole minutes as a zero-padded ``HH:MM`` string."""
    hours, leftover_minutes = divmod(duration, 60)
    return "{0:02d}:{1:02d}".format(hours, leftover_minutes)
def get_holidays(worker, date):
    """Build one event dict per holiday of ``worker`` that overlaps ``date``.

    ``date`` is a ``YYYY-MM-DD`` string.  Each returned dict carries the keys
    ``duration``, ``link_when``, ``link_who`` and ``link_end``.
    """
    day = datetime.datetime.strptime(date, '%Y-%m-%d')
    today_start = day.replace(hour=0, minute=0)
    today_end = day.replace(hour=23, minute=59)

    def as_event(holiday):
        # The duration covers the whole holiday, not just the part inside this day.
        length = holiday.datetime_end - holiday.datetime_start
        minutes = int(length.total_seconds() / 60)
        # hack for time zones: only the time-of-day of each bound is kept
        # and re-attached to the requested day
        return {
            'duration': build_duration(minutes),
            'link_when': datetime.datetime.combine(today_start, holiday.datetime_start.time()),
            'link_who': worker.id,
            'link_end': datetime.datetime.combine(today_start, holiday.datetime_end.time()),
        }

    overlapping = Holiday.objects.filter(
        person=worker,
        datetime_start__lte=today_end,
        datetime_end__gte=today_start)
    return [as_event(holiday) for holiday in overlapping]
def remove_holidays(availability, worker, date):
    """Split an availability window around the worker's holidays on one day.

    ``availability`` supplies ``available_from``, ``available_till`` and
    ``day_number``; ``date`` is a ``YYYY-MM-DD`` string.  Returns a list of
    new (unsaved) ``Availability`` objects covering the parts of the window
    that are not inside a holiday.

    Raises ``TypeError`` if a timestamp with an unknown type is encountered.
    """
    day = datetime.datetime.strptime(date, '%Y-%m-%d')
    today_start = day.replace(hour=0, minute=0)
    today_end = day.replace(hour=23, minute=59)

    # Holidays already running when the day begins.
    holidays_starting_before = Holiday.objects.filter(person=worker, datetime_start__lte=today_start,
                                                      datetime_end__gte=today_start)
    # Strictly after the start of the day: a holiday beginning exactly at
    # ``today_start`` is already counted in ``holidays_starting_before``, and
    # counting it again would keep the counter below zero after it ends.
    holidays_starting_today = Holiday.objects.filter(person=worker, datetime_start__lte=today_end,
                                                     datetime_start__gt=today_start)
    holidays_ending_today = Holiday.objects.filter(person=worker, datetime_end__lte=today_end,
                                                   datetime_end__gte=today_start)

    # The availability opens a window ("start") and closes it ("stop"); a
    # holiday does the opposite: its beginning is a "stop", its end a "start".
    timestamps = [
        {"time": availability.available_from, "type": "start"},
        {"time": availability.available_till, "type": "stop"},
    ]
    for holiday in holidays_starting_today:
        timestamps.append({
            "time": holiday.datetime_start.time(),
            "type": "stop"
        })
    for holiday in holidays_ending_today:
        timestamps.append({
            "time": holiday.datetime_end.time(),
            "type": "start"
        })

    # Every holiday already in progress at the start of the day pushes the
    # counter below zero until its matching "start" (holiday end) is seen.
    direction = -holidays_starting_before.count()

    result = []
    start_date = None
    for timestamp in sorted(timestamps, key=itemgetter('time')):
        kind = timestamp.get("type", None)
        if kind == "stop":
            direction -= 1
            stop_date = timestamp["time"]
            # Dropping back to exactly zero closes a free window.
            if direction == 0:
                result.append(Availability(person=worker, day_number=availability.day_number,
                                           available_from=start_date, available_till=stop_date))
        elif kind == "start":
            direction += 1
            start_date = timestamp["time"]
        else:
            raise TypeError("Unknown type: " + str(kind))

    # A window still open after the last timestamp runs to the end of the day.
    if direction > 0:
        result.append(Availability(person=worker, day_number=availability.day_number,
                                   available_from=start_date, available_till=today_end.time()))
    return result
def get_availabilities(worker, date):
    """Build event dicts for the holiday-free availability of ``worker`` on ``date``.

    ``date`` is a ``YYYY-MM-DD`` string.  Each returned dict carries the keys
    ``duration``, ``link_when``, ``link_who`` and ``link_end``.
    """
    today = datetime.datetime.strptime(date, '%Y-%m-%d')
    # ``weekday()`` is 0 for Monday; ``day_number`` is matched against the 1-based value.
    weekday = today.weekday() + 1

    def as_event(window):
        begins = datetime.datetime.combine(today, window.available_from)
        finishes = datetime.datetime.combine(today, window.available_till)
        minutes = int((finishes - begins).total_seconds() / 60)
        return {
            'duration': build_duration(minutes),
            'link_when': begins,
            'link_who': worker.id,
            'link_end': finishes,
        }

    return [
        as_event(window)
        for availability in Availability.objects.filter(person=worker, day_number=weekday)
        for window in remove_holidays(availability, worker, date)
    ]
def get_conflicts(worker, date):
    """List already-booked slots on ``date`` for the locations visible to ``worker``.

    Scheduled appointments that have a visit are scanned.  An appointment with
    an assigned worker contributes one event, and every appointment-type link
    that has a date contributes another.  Each event is a dict with the keys
    ``title``, ``duration``, ``appointment_id``, ``link_when``, ``link_who``,
    ``link_end`` and ``location``.
    """
    conflicts = []
    scheduled = Appointment.objects.filter(
        datetime_when__date=date,
        location__in=get_filter_locations(worker),
        status=Appointment.APPOINTMENT_STATUS_SCHEDULED,
        visit__isnull=False).all()

    for appointment in scheduled:
        if appointment.worker_assigned is not None:
            # Time zone info is dropped so that only naive datetimes are emitted.
            begins = appointment.datetime_when.replace(tzinfo=None)
            finishes = begins + datetime.timedelta(minutes=appointment.length)
            conflicts.append({
                'title': appointment.title(),
                # Raw length here; link events below use the formatted duration.
                'duration': appointment.length,
                'appointment_id': appointment.id,
                'link_when': begins,
                'link_who': appointment.worker_assigned.id,
                'link_end': finishes,
                'location': str(appointment.location),
            })

        for link in AppointmentTypeLink.objects.filter(appointment=appointment).all():
            if link.date_when is None:
                # Links without a date do not occupy any time slot.
                continue
            begins = link.date_when.replace(tzinfo=None)
            finishes = begins + datetime.timedelta(minutes=link.appointment_type.default_duration)
            conflicts.append({
                'title': link.appointment_type.description,
                'duration': build_duration(link.appointment_type.default_duration),
                'appointment_id': appointment.id,
                'link_when': begins,
                'link_who': link.worker_id,
                'link_end': finishes,
                'location': str(appointment.location),
            })
    return conflicts
@login_required
def events(request, date):
    """Return the day's scheduled appointments grouped by subject, as JSON.

    The response has three keys: ``data`` (one entry per subject, each with
    its list of ``events``), ``availabilities`` and ``holidays`` (collected
    for every non-secretary worker at the locations visible to the user).
    """
    appointments = Appointment.objects.filter(
        datetime_when__date=date,
        location__in=get_filter_locations(request.user),
        status=Appointment.APPOINTMENT_STATUS_SCHEDULED,
        visit__isnull=False).all()

    subjects = {}
    for i, appointment in enumerate(appointments):
        appointment_subject = appointment.visit.subject
        if appointment_subject.id not in subjects:
            # create subject
            subjects[appointment_subject.id] = {
                'name': unicode(appointment_subject),
                'id': appointment_subject.id,
                # The palette is finite; wrap around instead of raising
                # IndexError when there are more appointments than colours.
                'color': RANDOM_COLORS[i % len(RANDOM_COLORS)],
                'start': appointment.datetime_when.replace(tzinfo=None).strftime("%H:%M:00"),
                # this indicates only location of the first appointment
                # (there is small chance to have two appointments in two different places at the same day)
                'location': str(appointment.location),
                'events': []
            }
        subject_events = subjects[appointment_subject.id]['events']

        links = AppointmentTypeLink.objects.filter(appointment=appointment).all()
        for j, link in enumerate(links):
            # Links that have no date yet are still emitted, with
            # ``link_when`` and ``link_end`` left as None.
            link_when = link.date_when
            link_end = None
            if link_when is not None:
                link_when = link_when.replace(tzinfo=None)
                link_end = link_when + datetime.timedelta(minutes=link.appointment_type.default_duration)
            subject_events.append({
                'title': link.appointment_type.description,
                'duration': build_duration(link.appointment_type.default_duration),
                'subject': unicode(appointment_subject),
                'id': '{}-{}'.format(i, j),
                'link_id': link.id,
                'link_when': link_when,
                'link_who': link.worker_id,
                'link_end': link_end,
                'location': str(appointment.location),
                'flying_team_location': unicode(appointment.flying_team),
                'appointment_start': appointment.datetime_when.replace(tzinfo=None),
                # The end of the appointment is fixed at 19:00 on the same day.
                'appointment_end': appointment.datetime_when.replace(tzinfo=None, hour=19, minute=0),
            })

    worker_availabilities = []
    worker_holidays = []
    workers = Worker.objects.filter(locations__in=get_filter_locations(request.user)).exclude(
        role=Worker.ROLE_CHOICES_SECRETARY).distinct()
    for worker in workers:
        worker_availabilities = worker_availabilities + get_availabilities(worker, date)
        worker_holidays = worker_holidays + get_holidays(worker, date)

    return JsonResponse({
        "data": subjects.values(),
        'availabilities': worker_availabilities,
        'holidays': worker_holidays
    })
@login_required
def availabilities(request, date):
    """Return availabilities, holidays and conflicts for ``date`` as JSON.

    The data is collected for every non-secretary worker at the locations
    visible to the requesting user.  The view reads ``request.user``, so it
    requires an authenticated user like the other views in this module.
    """
    worker_availabilities = []
    worker_holidays = []
    worker_conflicts = []
    workers = Worker.objects.filter(locations__in=get_filter_locations(request.user)).exclude(
        role=Worker.ROLE_CHOICES_SECRETARY).distinct()
    for worker in workers:
        worker_availabilities = worker_availabilities + get_availabilities(worker, date)
        worker_holidays = worker_holidays + get_holidays(worker, date)
        worker_conflicts = worker_conflicts + get_conflicts(worker, date)
    return JsonResponse({
        "conflicts": worker_conflicts,
        'availabilities': worker_availabilities,
        'holidays': worker_holidays
    })
@login_required
def events_persist(request):
    """Store or clear the worker/time assignment of appointment-type links.

    Reads two JSON-encoded POST fields:

    * ``events_to_persist`` -- list of objects with ``start``, ``link_who``
      and ``link_id``; the link gets that worker and start time.
    * ``events_to_clear`` -- list of link ids whose worker and time are reset
      to None.

    A field that is missing or empty is treated as an empty list.  Responds
    with an empty JSON object and status 201; an unknown link id results in
    a 404.
    """
    # ``or '[]'`` covers both an absent field (None) and an empty string,
    # neither of which ``json.loads`` accepts.
    event_links = json.loads(request.POST.get('events_to_persist') or '[]')
    for event_link in event_links:
        raw_start = event_link['start']
        try:
            when = datetime.datetime.strptime(raw_start, '%Y-%m-%dT%H:%M:00')
        except ValueError:
            # Retry without the last six characters -- presumably a trailing
            # UTC offset such as "+02:00"; TODO confirm against the client.
            when = datetime.datetime.strptime(raw_start[0:-6], '%Y-%m-%dT%H:%M:00')
        appointment_type_link = get_object_or_404(AppointmentTypeLink, pk=event_link['link_id'])
        appointment_type_link.worker_id = event_link['link_who']
        appointment_type_link.date_when = when
        appointment_type_link.save()

    events_to_clear = json.loads(request.POST.get('events_to_clear') or '[]')
    for link_id in events_to_clear:
        appointment_type_link = get_object_or_404(AppointmentTypeLink, pk=link_id)
        appointment_type_link.worker_id = None
        appointment_type_link.date_when = None
        appointment_type_link.save()

    return JsonResponse({}, status=201)