import json
from itertools import groupby
import reversion
from autoslug.fields import AutoSlugField
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from feder.domains.models import Domain
from feder.main.utils import (
FormattedDatetimeMixin,
RenderBooleanFieldMixin,
render_normalized_response_html_table,
)
from feder.teryt.models import JST
from .validators import validate_nested_lists, validate_template_syntax
_("Monitorings index")
_("Can add Monitoring")
_("Can change Monitoring")
_("Can delete Monitoring")
NOTIFY_HELP = _("Notify about new alerts person who can view alerts")
[docs]
class MonitoringQuerySet(FormattedDatetimeMixin, models.QuerySet):
def with_case_count(self):
return self.annotate(case_count=models.Count("case"))
[docs]
def with_case_confirmation_received_count(self):
"""
function to annotate with case count
when case.confirmation_received field is True
"""
return self.annotate(
case_confirmation_received_count=models.Count(
"case", filter=models.Q(case__confirmation_received=True)
)
)
[docs]
def with_case_response_received_count(self):
"""
function to annotate with case count
when case.response_received field is True
"""
return self.annotate(
case_response_received_count=models.Count(
"case", filter=models.Q(case__response_received=True)
)
)
[docs]
def with_case_quarantined_count(self):
"""
function to annotate with case count
when case.is_quarantined field is True
"""
return self.annotate(
case_quarantined_count=models.Count(
"case", filter=models.Q(case__is_quarantined=True)
)
)
def area(self, jst):
return self.filter(
case__institution__jst__tree_id=jst.tree_id,
case__institution__jst__lft__range=(jst.lft, jst.rght),
)
def with_feed_item(self):
return self.select_related("user")
def for_user(self, user):
if user.is_anonymous:
return self.filter(is_public=True)
if user.is_superuser:
return self
any_permission = models.Q(monitoringuserobjectpermission__user=user)
public_only = models.Q(is_public=True)
return self.filter(any_permission | public_only).distinct()
[docs]
@reversion.register()
class Monitoring(RenderBooleanFieldMixin, TimeStampedModel):
perm_model = "monitoringuserobjectpermission"
name = models.CharField(verbose_name=_("Name"), max_length=100)
slug = AutoSlugField(
populate_from="name", max_length=110, verbose_name=_("Slug"), unique=True
)
user = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.PROTECT, verbose_name=_("User")
)
description = models.TextField(verbose_name=_("Description"), blank=True)
subject = models.CharField(verbose_name=_("Subject"), max_length=100)
hide_new_cases = models.BooleanField(
default=False, verbose_name=_("Hide new cases when assigning?")
)
template = models.TextField(
verbose_name=_("Template"),
help_text=_("Use {{EMAIL}} for insert reply address"),
validators=[validate_template_syntax, validate_nested_lists],
)
use_llm = models.BooleanField(
default=False,
verbose_name=_("Use LLM"),
help_text=_("Use LLM to evaluate responses"),
)
responses_chat_context = JSONField(
verbose_name=_("Responses chat context"),
null=True,
blank=True,
help_text=_("Monitoring responses context for AI chat"),
)
normalized_response_template = JSONField(
verbose_name=_("Normalized response template"),
null=True,
blank=True,
)
results = models.TextField(
default="",
verbose_name=_("Results"),
help_text=_("Resulrs of monitoring and received responses"),
blank=True,
)
email_footer = models.TextField(
default="",
verbose_name=_("Email footer"),
help_text=_("Footer for sent mail and replies"),
)
notify_alert = models.BooleanField(
default=True, verbose_name=_("Notify about alerts"), help_text=NOTIFY_HELP
)
objects = MonitoringQuerySet.as_manager()
is_public = models.BooleanField(default=True, verbose_name=_("Is public visible?"))
domain = models.ForeignKey(
to=Domain, help_text=_("Domain used to sends emails"), on_delete=models.PROTECT
)
class Meta:
verbose_name = _("Monitoring")
verbose_name_plural = _("Monitoring")
ordering = ["created"]
permissions = (
("add_case", _("Can add case")),
("change_case", _("Can change case")),
("delete_case", _("Can delete case")),
("view_quarantined_case", _("Can view quarantine cases")),
("add_letter", _("Can add letter")),
("reply", _("Can reply")),
("add_draft", _("Add reply draft")),
("change_letter", _("Can change letter")),
("delete_letter", _("Can delete letter")),
("view_alert", _("Can view alert")),
("change_alert", _("Can change alert")),
("delete_alert", _("Can delete alert")),
("manage_perm", _("Can manage perms")),
("view_log", _("Can view logs")),
("spam_mark", _("Can mark spam")),
("add_parcelpost", _("Can add parcel post")),
("change_parcelpost", _("Can change parcel post")),
("delete_parcelpost", _("Can delete parcel post")),
("view_email_address", _("Can view e-mail address")),
("view_tag", _("Can view tag")),
("change_tag", _("Can change tag")),
("delete_tag", _("Can delete tag")),
("view_report", _("Can view report")),
)
def __str__(self):
return self.name
def get_users_with_perm(self, perm=None):
qs = get_user_model().objects.filter(
**{self.perm_model + "__content_object": self}
)
if perm:
qs = qs.filter(**{self.perm_model + "__permission__codename": perm})
return qs.distinct().all()
def get_absolute_url(self):
return reverse("monitorings:details", kwargs={"slug": self.slug})
def render_monitoring_link(self):
url = self.get_absolute_url()
label = self.name
bold_start = "" if not self.is_public else "<b>"
bold_end = "" if not self.is_public else "</b>"
return f'{bold_start}<a href="{url}">{label}</a>{bold_end}'
def get_monitoring_cases_table_url(self):
return reverse(
"monitorings:monitoring_cases_table",
kwargs={"slug": self.slug},
)
def render_monitoring_cases_table_link(self):
url = self.get_monitoring_cases_table_url()
label = self.name
bold_start = "" if not self.is_public else "<b>"
bold_end = "" if not self.is_public else "</b>"
return f'{bold_start}<a href="{url}">{label}</a>{bold_end}'
[docs]
def generate_voivodeship_table(self):
"""
Generate html table with monitoring voivodeships and their
institutions and cases counts
"""
voivodeship_list = JST.objects.filter(category__level=1).all().order_by("name")
table = """
<table class="table table-bordered compact" style="width: 100%">
"""
table += """
<tr>
<th>Województwo</th>
<th>Liczba spraw</th>
<th>Liczba spraw w kwarantannie</th>
</tr>"""
for voivodeship in voivodeship_list:
table += (
"<tr><td>"
+ voivodeship.name
+ "</td><td>"
+ str(self.case_set.area(voivodeship).count())
+ "</td><td>"
+ str(
self.case_set.filter(is_quarantined=True).area(voivodeship).count()
)
+ "</td></tr>"
)
table += "</table>"
return table
def permission_map(self):
dataset = (
self.monitoringuserobjectpermission_set.select_related("permission", "user")
.order_by("permission")
.all()
)
user_list = {x.user for x in dataset}
def index_generate():
grouped = groupby(dataset, lambda x: x.permission)
for perm, users in grouped:
user_perm_list = [x.user for x in users]
yield perm, [(perm, (user in user_perm_list)) for user in user_list]
return user_list, index_generate()
def get_normalized_response_html_table(self):
if self.normalized_response_template:
return render_normalized_response_html_table(
self.normalized_response_template
)
return ""
def get_normalized_responses_data(self, user):
if not self.use_llm:
return []
from feder.letters.models import Letter
def validate_json(j):
try:
return json.loads(j)
except json.JSONDecodeError:
return {}
resp_letters = (
Letter.objects.filter(record__case__monitoring=self)
.filter(ai_evaluation__contains="A) email jest odpowiedzią")
.for_user(user)
.annotate(
case_name=models.F("record__case__name"),
case_id=models.F("record__case__id"),
institution_name=models.F("record__case__institution__name"),
institution_id=models.F("record__case__institution__id"),
institution_email=models.F("record__case__institution__email"),
jst=models.F("record__case__institution__jst__name"),
jst_category=models.F("record__case__institution__jst__category__name"),
jst_code=models.F("record__case__institution__jst__id"),
jst_level=models.F("record__case__institution__jst__category__level"),
jst_parent=models.F("record__case__institution__jst__parent__name"),
jst_parent_parent=models.F(
"record__case__institution__jst__parent__parent__name"
),
)
.order_by(
"record__case__institution__jst__parent__parent__name",
"record__case__institution__jst__parent__name",
"record__case__institution__jst__name",
"record__case__institution__name",
)
)
resp_data = [
{
"case_name": x.case_name,
"case_id": x.case_id,
"institution_name": x.institution_name,
"institution_id": x.institution_id,
"institution_email": x.institution_email,
"jst": x.jst,
"jst_category": x.jst_category,
"jst_code": x.jst_code,
"voivodship": (
x.jst
if x.jst_level == 1
else x.jst_parent if x.jst_level == 2 else x.jst_parent_parent
),
"county": (
x.jst
if x.jst_level == 2
else x.jst_parent if x.jst_level == 3 else ""
),
"community": (x.jst if x.jst_level == 3 else ""),
"jst_full_name": (
(f"{x.jst_parent_parent} / " if x.jst_parent_parent else "")
+ (f"{x.jst_parent} / " if x.jst_parent else "")
+ f"{x.jst} ({x.jst_code}, {x.jst_category})"
),
"received_on": x.created.astimezone(
timezone.get_default_timezone()
).strftime("%Y-%m-%d %H:%M:%S"),
"normalized_response": validate_json(x.normalized_response),
}
for x in resp_letters
]
return resp_data
[docs]
class MonitoringUserObjectPermission(UserObjectPermissionBase):
content_object = models.ForeignKey(Monitoring, on_delete=models.PROTECT)
[docs]
class MonitoringGroupObjectPermission(GroupObjectPermissionBase):
content_object = models.ForeignKey(Monitoring, on_delete=models.PROTECT)