import json
from datetime import datetime
from itertools import groupby
import reversion
from autoslug.fields import AutoSlugField
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase
from jsonfield import JSONField
from model_utils.models import TimeStampedModel
from feder.domains.models import Domain
from feder.llm_evaluation.prompts import (
EMAIL_IS_ANSWER,
answer_categorization,
letter_response_normalization,
monitoring_response_normalized_template,
)
from feder.main.utils import (
FormattedDatetimeMixin,
RenderBooleanFieldMixin,
render_normalized_response_html_table,
)
from feder.teryt.models import JST
from .validators import validate_nested_lists, validate_template_syntax
_("Monitorings index")
_("Can add Monitoring")
_("Can change Monitoring")
_("Can delete Monitoring")
NOTIFY_HELP = _("Notify about new alerts person who can view alerts")
def validate_json(j):
try:
return json.loads(j or "")
except json.JSONDecodeError:
return {}
[docs]
class MonitoringQuerySet(FormattedDatetimeMixin, models.QuerySet):
def with_case_count(self):
return self.annotate(case_count=models.Count("case"))
[docs]
def with_case_confirmation_received_count(self):
"""
function to annotate with case count
when case.confirmation_received field is True
"""
return self.annotate(
case_confirmation_received_count=models.Count(
"case", filter=models.Q(case__confirmation_received=True)
)
)
[docs]
def with_case_response_received_count(self):
"""
function to annotate with case count
when case.response_received field is True
"""
return self.annotate(
case_response_received_count=models.Count(
"case", filter=models.Q(case__response_received=True)
)
)
[docs]
def with_case_quarantined_count(self):
"""
function to annotate with case count
when case.is_quarantined field is True
"""
return self.annotate(
case_quarantined_count=models.Count(
"case", filter=models.Q(case__is_quarantined=True)
)
)
def area(self, jst):
# Using Exists() instead of a join avoids a cartesian product with
# annotate(Count('case')) in with_case_count() and removes the need
# for .distinct() to deduplicate the multiplied rows.
from django.db.models import Exists, OuterRef
from feder.cases.models import Case
return self.filter(
Exists(
Case.objects.filter(
monitoring=OuterRef("pk"),
institution__jst__tree_id=jst.tree_id,
institution__jst__lft__range=(jst.lft, jst.rght),
)
)
)
def with_feed_item(self):
return self.select_related("user")
def for_user(self, user):
if user.is_anonymous:
return self.filter(is_public=True)
if user.is_superuser:
return self
any_permission = models.Q(monitoringuserobjectpermission__user=user)
public_only = models.Q(is_public=True)
return self.filter(any_permission | public_only).distinct()
[docs]
@reversion.register()
class Monitoring(RenderBooleanFieldMixin, TimeStampedModel):
perm_model = "monitoringuserobjectpermission"
name = models.CharField(verbose_name=_("Name"), max_length=100)
slug = AutoSlugField(
populate_from="name", max_length=110, verbose_name=_("Slug"), unique=True
)
user = models.ForeignKey(
settings.AUTH_USER_MODEL, on_delete=models.PROTECT, verbose_name=_("User")
)
description = models.TextField(verbose_name=_("Description"), blank=True)
subject = models.CharField(verbose_name=_("Subject"), max_length=100)
hide_new_cases = models.BooleanField(
default=False, verbose_name=_("Hide new cases when assigning?")
)
template = models.TextField(
verbose_name=_("Template"),
help_text=_(
"Use: {{EMAIL}} to insert reply address,"
" {{ADRESAT}} to insert addressee name"
),
validators=[validate_template_syntax, validate_nested_lists],
)
use_llm = models.BooleanField(
default=False,
verbose_name=_("Use LLM"),
help_text=_("Use LLM to evaluate responses"),
)
responses_chat_context = JSONField(
verbose_name=_("Responses chat context"),
null=True,
blank=True,
help_text=_("Monitoring responses context for AI chat"),
)
normalized_response_template = JSONField(
verbose_name=_("Normalized response template"),
null=True,
blank=True,
)
normalized_response_answers_categories = JSONField(
verbose_name=_("Normalized response answers categories"),
null=True,
blank=True,
)
letter_normalization_prompt_extension = models.TextField(
verbose_name=_("Letter normalization prompt extension"),
null=True,
blank=True,
)
letter_normalization_prompt_extension_modified = models.DateTimeField(
verbose_name=_("Letter normalization prompt extension modified"),
null=True,
blank=True,
)
results = models.TextField(
default="",
verbose_name=_("Results"),
help_text=_("Resulrs of monitoring and received responses"),
blank=True,
)
email_footer = models.TextField(
default="",
verbose_name=_("Email footer"),
help_text=_("Footer for sent mail and replies"),
)
notify_alert = models.BooleanField(
default=True, verbose_name=_("Notify about alerts"), help_text=NOTIFY_HELP
)
objects = MonitoringQuerySet.as_manager()
is_public = models.BooleanField(default=True, verbose_name=_("Is public visible?"))
domain = models.ForeignKey(
to=Domain, help_text=_("Domain used to sends emails"), on_delete=models.PROTECT
)
class Meta:
verbose_name = _("Monitoring")
verbose_name_plural = _("Monitoring")
ordering = ["created"]
permissions = (
("add_case", _("Can add case")),
("change_case", _("Can change case")),
("delete_case", _("Can delete case")),
("view_quarantined_case", _("Can view quarantine cases")),
("add_letter", _("Can add letter")),
("reply", _("Can reply")),
("add_draft", _("Add reply draft")),
("change_letter", _("Can change letter")),
("delete_letter", _("Can delete letter")),
("view_alert", _("Can view alert")),
("change_alert", _("Can change alert")),
("delete_alert", _("Can delete alert")),
("manage_perm", _("Can manage perms")),
("view_log", _("Can view logs")),
("spam_mark", _("Can mark spam")),
("add_parcelpost", _("Can add parcel post")),
("change_parcelpost", _("Can change parcel post")),
("delete_parcelpost", _("Can delete parcel post")),
("view_email_address", _("Can view e-mail address")),
("view_tag", _("Can view tag")),
("change_tag", _("Can change tag")),
("delete_tag", _("Can delete tag")),
("view_report", _("Can view report")),
)
def __str__(self):
return self.name
def get_users_with_perm(self, perm=None):
qs = get_user_model().objects.filter(
**{self.perm_model + "__content_object": self}
)
if perm:
qs = qs.filter(**{self.perm_model + "__permission__codename": perm})
return qs.distinct().all()
def get_absolute_url(self):
return reverse("monitorings:details", kwargs={"slug": self.slug})
def render_monitoring_link(self):
url = self.get_absolute_url()
label = self.name
bold_start = "" if not self.is_public else "<b>"
bold_end = "" if not self.is_public else "</b>"
return f'{bold_start}<a href="{url}">{label}</a>{bold_end}'
def render_monitoring_id_link(self):
url = self.get_absolute_url()
bold_start = "" if not self.is_public else "<b>"
bold_end = "" if not self.is_public else "</b>"
return f'{bold_start}<a href="{url}">{self.pk}</a>{bold_end}'
def get_monitoring_cases_table_url(self):
return reverse(
"monitorings:monitoring_cases_table",
kwargs={"slug": self.slug},
)
def render_monitoring_cases_table_link(self):
url = self.get_monitoring_cases_table_url()
label = self.name
bold_start = "" if not self.is_public else "<b>"
bold_end = "" if not self.is_public else "</b>"
return f'{bold_start}<a href="{url}">{label}</a>{bold_end}'
[docs]
def generate_voivodeship_table(self):
"""
Generate html table with monitoring voivodeships and their
institutions and cases counts
"""
voivodeship_list = JST.objects.filter(category__level=1).all().order_by("name")
table = """
<table class="table table-bordered compact" style="width: 100%">
"""
table += """
<tr>
<th>Województwo</th>
<th>Liczba spraw</th>
<th>Liczba spraw z potw. odbioru</th>
<th>Liczba spraw z odpowiedzią</th>
</tr>"""
for voivodeship in voivodeship_list:
table += (
"<tr><td>"
+ voivodeship.name
+ "</td><td class='text-right'>"
+ str(self.case_set.area(voivodeship).count())
+ "</td><td class='text-right'>"
+ str(
self.case_set.filter(confirmation_received=True)
.area(voivodeship)
.count()
)
+ "</td><td class='text-right'>"
+ str(
self.case_set.filter(response_received=True)
.area(voivodeship)
.count()
)
+ "</td></tr>"
)
table += (
"<tr><td><b>Wszystkie</b></td><td class='text-right'><b>"
+ str(str(self.case_set.all().count()))
+ "</b></td><td class='text-right'><b>"
+ str(self.case_set.filter(confirmation_received=True).all().count())
+ "</b></td><td class='text-right'><b>"
+ str(self.case_set.filter(response_received=True).all().count())
+ "</b></td></tr>"
)
table += "</table>"
return mark_safe(table)
def permission_map(self):
dataset = (
self.monitoringuserobjectpermission_set.select_related("permission", "user")
.order_by("permission")
.all()
)
user_list = {x.user for x in dataset}
def index_generate():
grouped = groupby(dataset, lambda x: x.permission)
for perm, users in grouped:
user_perm_list = [x.user for x in users]
yield perm, [(perm, (user in user_perm_list)) for user in user_list]
return user_list, index_generate()
def get_normalized_response_html_table(self):
if self.normalized_response_template:
return render_normalized_response_html_table(
self.normalized_response_template
)
return ""
def get_normalized_responses_data(self, user):
if not self.use_llm:
return []
from feder.letters.models import Letter
resp_letters = (
Letter.objects.filter(record__case__monitoring=self)
.filter(ai_evaluation__contains=EMAIL_IS_ANSWER)
.for_user(user)
.annotate(
case_name=models.F("record__case__name"),
case_id=models.F("record__case__id"),
case_slug=models.F("record__case__slug"),
institution_name=models.F("record__case__institution__name"),
institution_id=models.F("record__case__institution__id"),
institution_email=models.F("record__case__institution__email"),
institution_tags=models.F("record__case__institution__tags__name"),
jst=models.F("record__case__institution__jst__name"),
jst_category=models.F("record__case__institution__jst__category__name"),
jst_code=models.F("record__case__institution__jst__id"),
jst_level=models.F("record__case__institution__jst__category__level"),
jst_parent=models.F("record__case__institution__jst__parent__name"),
jst_parent_parent=models.F(
"record__case__institution__jst__parent__parent__name"
),
)
.order_by(
"record__case__institution__jst__parent__parent__name",
"record__case__institution__jst__parent__name",
"record__case__institution__jst__name",
"record__case__institution__name",
)
)
resp_data = [
{
"case_name": x.case_name,
"case_id": x.case_id,
"case_slug": x.case_slug,
"case_url": "", # url placeholder to add in view
"institution_name": x.institution_name,
"institution_id": x.institution_id,
"institution_email": x.institution_email,
"institution_tags": x.institution_tags,
"jst": x.jst,
"jst_category": x.jst_category,
"jst_code": x.jst_code,
"voivodship": (
x.jst
if x.jst_level == 1
else x.jst_parent if x.jst_level == 2 else x.jst_parent_parent
),
"county": (
x.jst
if x.jst_level == 2
else x.jst_parent if x.jst_level == 3 else ""
),
"community": (x.jst if x.jst_level == 3 else ""),
"jst_full_name": (
(f"{x.jst_parent_parent} / " if x.jst_parent_parent else "")
+ (f"{x.jst_parent} / " if x.jst_parent else "")
+ f"{x.jst} ({x.jst_code}, {x.jst_category})"
),
"letter_id": x.id,
"letter_url": "", # url placeholder to add in view
"received_on": x.created.astimezone(
timezone.get_default_timezone()
).strftime("%Y-%m-%d %H:%M:%S"),
"normalized_response": validate_json(x.normalized_response),
}
for x in resp_letters
]
return resp_data
def refresh_normalized_response_answers_categories(self):
nr_answers_categories = validate_json(
self.normalized_response_answers_categories
)
nr_template_dict = validate_json(self.normalized_response_template)
if len(nr_template_dict.keys()) > 0:
for key in nr_template_dict.keys():
if key not in nr_answers_categories:
nr_answers_categories[key] = {
"question": nr_template_dict[key].get("Pytanie", ""),
"answer_categories": "",
}
else:
nr_answers_categories[key]["question"] = nr_template_dict[key].get(
"Pytanie", ""
)
for key in nr_answers_categories.keys():
if key not in nr_template_dict:
del nr_answers_categories[key]
self.normalized_response_answers_categories = json.dumps(
nr_answers_categories, indent=4
)
self.save()
def get_normalized_response_answers_categories_dict(self):
self.refresh_normalized_response_answers_categories()
return validate_json(self.normalized_response_answers_categories)
def set_answer_categories_for_question(self, question_number, answer_categories):
nr_answers_categories = self.get_normalized_response_answers_categories_dict()
nr_answers_categories[question_number]["answer_categories"] = answer_categories
nr_answers_categories[question_number][
"update_time"
] = timezone.localtime().strftime("%Y-%m-%d %H:%M:%S %Z%z")
self.normalized_response_answers_categories = json.dumps(
nr_answers_categories, indent=4
)
self.save()
def get_answer_categories_for_question(self, question_number):
answers_categorization_dict = (
self.get_normalized_response_answers_categories_dict()
)
if question_number in answers_categorization_dict:
answer_categories = answers_categorization_dict[question_number][
"answer_categories"
]
return answer_categories
return ""
def get_categories_update_time_for_question(self, question_number):
answers_categorization_dict = (
self.get_normalized_response_answers_categories_dict()
)
if question_number in answers_categorization_dict:
update_time = answers_categorization_dict[question_number].get(
"update_time", ""
)
try:
return datetime.strptime(update_time, "%Y-%m-%d %H:%M:%S %Z%z")
except ValueError:
return None
return None
def get_answer_categorization_prompt_sample(self, question_number):
answers_categorization_dict = (
self.get_normalized_response_answers_categories_dict()
)
if question_number in answers_categorization_dict:
question_text = answers_categorization_dict[question_number]["question"]
answer_categories = answers_categorization_dict[question_number][
"answer_categories"
]
if not answer_categories:
return _(
"Response categories have not been defined, so the LLM request"
+ " for response categories will not be sent."
)
return answer_categorization.format(
institution=_("INSTITUTION"),
question=question_text,
answer=_("INSTITUTION RESPONSE"),
answer_categories=answer_categories,
).replace(" ", "")
else:
return _(
f'There is no question "{question_number}", so the LLM query for'
+ " answer categories will not be sent."
)
def get_template_normalization_prompt_sample(self):
return monitoring_response_normalized_template.format(
monitoring_template=self.template,
)
@property
def normalized_response_template_is_up_to_date(self):
if not self.normalized_response_template or not self.use_llm:
return False
from feder.llm_evaluation.models import LlmMonitoringRequest
return LlmMonitoringRequest.objects.filter(
name="get_response_normalized_template",
evaluated_monitoring=self,
request_prompt=self.get_template_normalization_prompt_sample(),
).exists()
@property
def normalized_response_template_created(self):
if not self.normalized_response_template or not self.use_llm:
return None
from feder.llm_evaluation.models import LlmMonitoringRequest
llm_request = (
LlmMonitoringRequest.objects.filter(
name="get_response_normalized_template",
evaluated_monitoring=self,
)
.order_by("created")
.last()
)
return llm_request.created if llm_request else None
def get_letter_normalization_prompt_sample(self):
if not self.use_llm:
return _(
"The LLM has not been enabled, so the LLM request for normalization"
+ " will not be sent."
)
if not self.normalized_response_template:
return _(
"The normalization template has not been defined, so the LLM request"
+ " for normalization will not be sent."
)
return letter_response_normalization.format(
institution=_("INSTITUTION"),
normalized_questions=self.normalized_response_template,
monitoring_response=_("INSTITUTION RESPONSE"),
prompt_instruction_extension=self.letter_normalization_prompt_extension
or _("PROMPT INSTRUCTION EXTENSION"),
)
[docs]
class MonitoringUserObjectPermission(UserObjectPermissionBase):
content_object = models.ForeignKey(Monitoring, on_delete=models.PROTECT)
[docs]
class MonitoringGroupObjectPermission(GroupObjectPermissionBase):
content_object = models.ForeignKey(Monitoring, on_delete=models.PROTECT)