import email
import logging
import uuid
from email.utils import getaddresses
import requests
from atom.models import AttachmentBase
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.fields import GenericRelation
from django.contrib.sites.shortcuts import get_current_site
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.core.mail.message import EmailMultiAlternatives, make_msgid
from django.core.validators import validate_email
from django.db import models
from django.db.models.manager import BaseManager
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import force_str
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from django_extensions.db.models import TimeStampedModel
from jsonfield import JSONField
from model_utils import Choices
from feder.cases.models import Case, enforce_quarantined_queryset
from feder.domains.models import Domain
from feder.institutions.models import Institution
from feder.llm_evaluation.prompts import letter_categorization
from feder.main.exceptions import FederValueError
from feder.main.utils import get_email_domain, render_normalized_response_html_table
from feder.records.models import AbstractRecord, AbstractRecordQuerySet, Record
from ..virus_scan.models import Request as ScanRequest
from .logs.tasks import update_sent_letter_status
from .utils import (
html_email_wrapper,
html_to_text,
is_formatted_html,
normalize_msg_id,
text_email_wrapper,
text_to_html,
)
logger = logging.getLogger(__name__)
class LetterQuerySet(AbstractRecordQuerySet):
    """Chainable query helpers for :class:`Letter`."""

    def attachment_count(self):
        """Annotate every letter with ``attachment_count``."""
        return self.annotate(attachment_count=models.Count("attachment"))

    def with_author(self):
        """Join the user and institution author rows in the same query."""
        return self.select_related("author_user", "author_institution")

    def for_milestone(self):
        """Load attachments (with scan requests) and authors."""
        return self.with_attachment().with_author()

    def for_api(self):
        """Same as :meth:`for_milestone`, plus the related ``emaillog``."""
        return self.for_milestone().select_related("emaillog")

    def is_draft(self):
        """Keep only outgoing letters still flagged as drafts."""
        return self.filter(is_draft=True).is_outgoing()

    def is_outgoing(self):
        """Keep only letters that have a user author."""
        return self.filter(author_user__isnull=False)

    def is_incoming(self):
        """Keep only letters that have no user author."""
        return self.filter(author_user__isnull=True)

    def recent(self):
        """Keep only letters created within the last 7 days."""
        return self.filter(created__gt=timezone.now() - timezone.timedelta(days=7))

    def with_feed_items(self):
        """Load authors, case/monitoring relations and attachments."""
        return (
            self.with_author()
            .select_related(
                "record__case__institution__jst", "record__case__monitoring"
            )
            .with_attachment()
        )

    def with_attachment(self):
        """Prefetch attachments together with their scan requests."""
        return self.prefetch_related("attachment_set").prefetch_related(
            "attachment_set__scan_request"
        )

    def exclude_spam(self):
        """Drop letters marked as ``SPAM.spam`` (other spam states are kept)."""
        return self.exclude(is_spam=Letter.SPAM.spam)

    def filter_automatic(self):
        """Keep only letters whose type is in ``Letter.MESSAGE_TYPES_AUTO``."""
        return self.filter(message_type__in=[i[0] for i in Letter.MESSAGE_TYPES_AUTO])

    def exclude_automatic(self):
        """Drop letters whose type is in ``Letter.MESSAGE_TYPES_AUTO``."""
        return self.exclude(message_type__in=[i[0] for i in Letter.MESSAGE_TYPES_AUTO])

    def for_user(self, user):
        """
        Restrict the queryset to what ``user`` may see.

        Authenticated users and superusers get the queryset unchanged.
        Everyone else only gets letters from non-quarantined cases of public
        monitorings.

        Always returns a queryset: a user object that is neither anonymous
        nor authenticated used to yield ``None``; it now falls back to the
        restricted, public-only view.
        """
        if not user.is_anonymous and (user.is_superuser or user.is_authenticated):
            return self
        return self.filter(
            record__case__is_quarantined=False,
            record__case__monitoring__is_public=True,
        )
class LetterManager(BaseManager.from_queryset(LetterQuerySet)):
    """Manager for :class:`Letter` built from :class:`LetterQuerySet`."""

    def get_queryset(self):
        """Return the parent queryset as-is; no spam filtering happens here."""
        # TODO use this filter in particular views only
        # .filter(is_spam__in=[Letter.SPAM.unknown, Letter.SPAM.non_spam])
        base_queryset = super().get_queryset()
        return base_queryset
class Letter(AbstractRecord):
    """
    A single letter attached to a case record.

    A letter with ``author_user`` set is treated as outgoing; one without it
    is treated as incoming (see :attr:`is_outgoing` / :attr:`is_incoming`).
    """

    SPAM = Choices(
        (0, "unknown", _("Unknown")),
        (1, "non_spam", _("Non-spam")),
        (2, "spam", _("Spam")),
        (3, "probable_spam", _("Probable spam")),
    )
    MESSAGE_TYPES = Choices(
        (0, "unknown", _("Unknown")),
        (1, "regular", _("Regular")),
        (2, "disposition_notification", _("Disposition notification")),
        (3, "vacation_reply", _("Vacation reply")),
        (4, "mass_draft", _("Mass message draft")),
    )
    # Message types considered automatically generated replies.
    MESSAGE_TYPES_AUTO = MESSAGE_TYPES.subset(
        "disposition_notification", "vacation_reply"
    )

    author_user = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        on_delete=models.PROTECT,
        verbose_name=_("Author (if user)"),
        null=True,
        blank=True,
    )
    author_institution = models.ForeignKey(
        Institution,
        on_delete=models.PROTECT,
        verbose_name=_("Author (if institution)"),
        null=True,
        blank=True,
    )
    title = models.CharField(verbose_name=_("Subject"), max_length=200)
    body = models.TextField(verbose_name=_("Text"))
    html_body = models.TextField(verbose_name=_("Text in HTML"), blank=True)
    quote = models.TextField(verbose_name=_("Quote"), blank=True)
    html_quote = models.TextField(verbose_name=_("Quote in HTML"), blank=True)
    email = models.EmailField(verbose_name=_("E-mail"), max_length=100, blank=True)
    email_from = models.EmailField(
        verbose_name=_("From email address"), max_length=100, blank=True, null=True
    )
    email_to = models.EmailField(
        verbose_name=_("To email address"), max_length=100, blank=True, null=True
    )
    note = models.TextField(verbose_name=_("Comments from editor"), blank=True)
    ai_evaluation = models.TextField(
        verbose_name=_("Letter AI evaluation"), blank=True, null=True
    )
    normalized_response = JSONField(
        verbose_name=_("Normalized monitoring response"),
        null=True,
        blank=True,
    )
    is_spam = models.IntegerField(
        verbose_name=_("Is SPAM?"), choices=SPAM, default=SPAM.unknown, db_index=True
    )
    is_draft = models.BooleanField(verbose_name=_("Is draft?"), default=True)
    message_type = models.IntegerField(
        verbose_name=_("Message type"),
        choices=MESSAGE_TYPES,
        default=MESSAGE_TYPES.unknown,
    )
    mark_spam_by = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
        verbose_name=_("Spam marker"),
        help_text=_("The person who marked it as spam"),
        related_name="letter_mark_spam_by",
    )
    mark_spam_at = models.DateTimeField(
        null=True,
        blank=True,
        verbose_name=_("Time of mark as spam"),
        help_text=_("Time when letter was marked as spam"),
    )
    message_id_header = models.CharField(
        blank=True,
        verbose_name=_('ID of sent email message "Message-ID"'),
        max_length=500,
    )
    eml = models.FileField(
        upload_to="messages/%Y/%m/%d", verbose_name=_("File"), null=True, blank=True
    )

    objects = LetterManager()
    objects_with_spam = LetterQuerySet.as_manager()

    def is_spam_validated(self):
        """Return True when the spam flag is a definite ``spam``/``non_spam``."""
        return self.is_spam in (Letter.SPAM.spam, Letter.SPAM.non_spam)

    def is_mass_draft(self):
        """Return True for a draft whose message type is ``mass_draft``."""
        return self.is_draft and self.message_type == self.MESSAGE_TYPES.mass_draft

    class Meta:
        verbose_name = _("Letter")
        verbose_name_plural = _("Letters")
        ordering = ["created"]
        indexes = AbstractRecord.Meta.indexes
        permissions = (
            ("can_filter_eml", _("Can filter eml")),
            ("recognize_letter", _("Can recognize letter")),
        )

    def delete(self, *args, **kwargs):
        """
        Delete the associated ``Record`` and then the letter itself.

        Returns whatever the parent ``delete()`` returns, so callers can
        inspect the deletion result as with any other model.
        """
        self.record.delete()  # Delete the associated Record instance
        return super().delete(*args, **kwargs)

    @property
    def is_incoming(self):
        """True when the letter has no user author."""
        return not bool(self.author_user_id)

    @property
    def is_outgoing(self):
        """True when the letter has a user author."""
        return bool(self.author_user_id)

    def get_title(self):
        """Return the subject, or a "(no subject)" placeholder when blank."""
        if self.title and self.title.strip():
            return self.title
        return _("(no subject)")

    def __str__(self):
        return force_str(self.get_title())

    def get_absolute_url(self):
        """
        Return the details URL for letters with a case (or mass drafts),
        otherwise the URL of the assignment view.
        """
        route = (
            "letters:details"
            if self.case or self.is_mass_draft()
            else "letters:assign"
        )
        return reverse(route, kwargs={"pk": self.pk})

    def get_eml_url(self):
        """Return the download URL of the stored eml file, or None if absent."""
        if not self.eml:
            return None
        return reverse("letters:download", kwargs={"pk": self.pk})

    @property
    def author(self):
        """The user author when set, otherwise the institution author."""
        return self.author_user if self.author_user_id else self.author_institution

    @author.setter
    def author(self, value):
        """
        Assign the author, clearing the other author field.

        Raises:
            ValueError: if ``value`` is neither an Institution nor a user.
        """
        if isinstance(value, Institution):
            self.author_user = None
            self.author_institution = value
        elif isinstance(value, get_user_model()):
            self.author_institution = None
            self.author_user = value
        else:
            raise ValueError(
                "Only User and Institution is allowed for attribute author"
            )

    @classmethod
    def send_new_case(cls, case):
        """
        Create, save and send the first letter of ``case``.

        The body and footer come from the monitoring template and e-mail
        footer; each is rendered both as HTML and as plain text, converting
        between the two depending on whether the source is formatted HTML.
        Afterwards a sent-letter status update is scheduled.

        Returns:
            The newly created and sent letter.
        """
        template = case.monitoring.template
        footer = case.monitoring.email_footer
        template_is_html = is_formatted_html(template)
        footer_is_html = is_formatted_html(footer)
        context = {
            "html_body": mark_safe(
                template if template_is_html else text_to_html(template)
            ),
            "text_body": mark_safe(
                html_to_text(template) if template_is_html else template
            ),
            "html_footer": mark_safe(
                footer if footer_is_html else text_to_html(footer)
            ),
            "text_footer": mark_safe(
                html_to_text(footer) if footer_is_html else footer
            ),
        }
        letter = cls(
            author_user=case.user,
            email_from=str(case.get_email_address()),
            record=Record.objects.create(case=case),
            title=case.monitoring.subject,
            html_body=render_to_string("letters/_letter_reply_body.html", context),
            body=render_to_string("letters/_letter_reply_body.txt", context),
        )
        letter.save()
        letter.send(commit=True, only_email=False)
        update_sent_letter_status(schedule=(3 * 60))
        return letter

    def _email_context(self):
        """
        Build the template context for the outgoing e-mail.

        Every ``{{EMAIL}}`` placeholder in the body and quote (text and HTML
        variants) is replaced with the case e-mail address.
        """
        case_email = self.case.email
        body = self.body.replace("{{EMAIL}}", case_email)
        html_body = self.html_body.replace("{{EMAIL}}", case_email)
        quote = self.quote.replace("{{EMAIL}}", case_email)
        html_quote = self.html_quote.replace("{{EMAIL}}", case_email)
        context = {
            "html_body": mark_safe(html_body),
            "text_body": mark_safe(body),
            # "html_footer": mark_safe(self.case.monitoring.email_footer),
            # "text_footer": mark_safe(html_to_text(self.case.monitoring.email_footer)),
            "text_quote": mark_safe(text_email_wrapper(quote)),
            "html_quote": mark_safe(html_email_wrapper(html_quote)),
        }
        return context

    def html_body_with_footer(self):
        """Render the HTML body together with the monitoring e-mail footer."""
        context = {
            "html_body": mark_safe(self.html_body),
            "html_footer": mark_safe(self.case.monitoring.email_footer),
        }
        return render_to_string("letters/_letter_reply_body.html", context)

    def email_body(self):
        """Return ``(html_content, txt_content)`` rendered for the e-mail."""
        context = self._email_context()
        html_content = render_to_string("letters/_letter_reply_body.html", context)
        txt_content = render_to_string("letters/_letter_reply_body.txt", context)
        return html_content, txt_content

    def _construct_message(self, msg_id=None):
        """
        Build the ``EmailMultiAlternatives`` message for this letter.

        Args:
            msg_id: optional value for the ``Message-ID`` header.

        The message is addressed to the case institution, replies go to the
        case address, and receipt/disposition notifications are requested
        for the case address. Mass drafts use the monitoring subject instead
        of the letter title.
        """
        headers = {
            "Return-Receipt-To": self.case.email,
            "Disposition-Notification-To": self.case.email,
        }
        if msg_id:
            headers["Message-ID"] = msg_id
        html_content, txt_content = self.email_body()
        msg = EmailMultiAlternatives(
            subject=(
                self.case.monitoring.subject if self.is_mass_draft() else self.title
            ),
            from_email=str(self.case.get_email_address()),
            reply_to=[self.case.email],
            to=[self.case.institution.email],
            body=txt_content,
            headers=headers,
            attachments=[
                (att.filename, att.attachment.file.read(), "application/octet-stream")
                for att in self.attachment_set.all()
            ],
        )
        msg.attach_alternative(html_content, "text/html")
        return msg

    def generate_mass_letters(self):
        """
        Use this letter as a template for a mass message.

        The letter has to be a draft of the "mass draft" message type. One
        regular, non-draft letter (with its own record and copies of all
        attachments) is created for every case returned by
        ``self.mass_draft.determine_cases()``.

        Returns:
            The list of generated letters, ready for sending.

        Raises:
            FederValueError: if this letter is not a mass draft.
        """
        if not self.is_mass_draft():
            raise FederValueError(
                'mass_send method can only be executed for "mass_draft" message type.'
            )
        # preparing letter content
        copied_fields = (
            "author_user",
            "title",
            "html_body",
            "html_quote",
            "body",
            "quote",
            "note",
        )
        letter_data = {name: getattr(self, name) for name in copied_fields}
        letter_data["is_draft"] = False
        letter_data["message_type"] = self.MESSAGE_TYPES.regular

        # Read each attachment once, up front. Re-reading the same file
        # object for every case returns empty content once the file position
        # is at the end, and leaves the file handles open.
        attachment_payloads = []
        for attachment in self.attachment_set.all():
            with attachment.attachment.open("rb") as source_file:
                attachment_payloads.append(
                    (attachment.attachment.name, source_file.read())
                )

        letters = []
        for case in self.mass_draft.determine_cases():
            letter = Letter(**letter_data)
            letter.record = Record.objects.create(case=case)
            letter.save()
            # Copying attachments
            for file_name, payload in attachment_payloads:
                attachment_copy = Attachment(letter=letter)
                file_copy = ContentFile(payload)
                file_copy.name = file_name
                attachment_copy.attachment = file_copy
                attachment_copy.save()
            letters.append(letter)
        return letters

    def send(self, commit=True, only_email=False):
        """
        Send the letter to the case institution and store the sent message.

        Args:
            commit: when true, save the letter and register it on the case as
                its first request (if none yet) or as its last request.
            only_email: when true, ``save()`` is limited to the ``eml`` and
                ``email`` fields.

        Returns:
            The result of sending the constructed e-mail message.

        Raises:
            FederValueError: if the letter is a mass draft.
        """
        if self.is_mass_draft():
            raise FederValueError(
                'send method can not be executed for "mass_draft" message type.'
            )
        self.case.update_email()
        msg_id = make_msgid(domain=self.case.email.split("@", 2)[1])
        message = self._construct_message(msg_id=msg_id)
        text = message.message().as_bytes()
        self.email = self.case.institution.email
        self.message_id_header = normalize_msg_id(msg_id)
        self.eml.save(f"{uuid.uuid4()}.eml", ContentFile(text), save=False)
        self.is_draft = False
        if commit:
            # NOTE(review): with only_email=True the message_id_header and
            # is_draft changes above are not persisted - confirm intended.
            self.save(update_fields=["eml", "email"] if only_email else None)
            if self.case.first_request is None:
                self.case.first_request = self
            else:
                self.case.last_request = self
            self.case.save()
        return message.send()

    def get_recipients(self):
        """
        Return all e-mail addresses found in the "To" and "Cc" headers of the
        letter's eml file (an empty list when there is no eml file).
        """
        if not self.eml:
            return []
        with self.eml.open(mode="rb") as f:
            msg = email.message_from_binary_file(f)
        header_values = msg.get_all("To", []) + msg.get_all("Cc", [])
        return [addr for name, addr in getaddresses(header_values)]

    @property
    def allowed_recipient(self):
        """
        True if the domain of any valid recipient address returned by
        :meth:`get_recipients` is one of the ``Domain`` names.
        """
        recipients = self.get_recipients()
        if not recipients:
            return False
        monitoring_domains = set(Domain.objects.all().values_list("name", flat=True))
        for recipient in recipients:
            try:
                validate_email(recipient)
            except ValidationError:
                continue
            # Split on the LAST "@": a quoted local part may itself contain
            # "@", in which case the first split would not yield the domain.
            if recipient.rsplit("@", 1)[1] in monitoring_domains:
                return True
        return False

    def spam_check(self):
        """
        Mark the letter as probable spam (and save it) when it has no allowed
        recipient, no sender address, or a sender domain flagged as spammer.
        Otherwise the letter is left untouched.
        """
        if not self.allowed_recipient:
            self.is_spam = Letter.SPAM.probable_spam
            self.save()
            return
        from_domain = None
        if self.email_from is not None and "@" in self.email_from:
            from_domain = LetterEmailDomain.objects.filter(
                domain_name=get_email_domain(self.email_from)
            ).first()
        # Disabled condition kept for reference:
        # (self.email_to not in self.body) or
        sender_missing = self.email_from is None or self.email_from == ""
        sender_is_spammer = from_domain is not None and from_domain.is_spammer_domain
        if sender_missing or sender_is_spammer:
            self.is_spam = Letter.SPAM.probable_spam
            self.save()
            return

    def get_full_content(self):
        """
        Return the letter body followed by the text content of attachments.

        Only attachments whose text extraction result is "Processed" and
        whose text is non-empty contribute; others add an empty line.
        """
        attachments_text_content = "\n".join(
            (
                attachment.text_content
                if attachment.text_content_update_result == "Processed"
                and attachment.text_content
                else ""
            )
            for attachment in self.attachment_set.all()
        )
        return self.body + "\n" + attachments_text_content

    def ai_prompt_help(self):
        """Return help text listing the options of the categorization prompt."""
        prompt = letter_categorization.format(
            intro="",
            institution=self.case.institution.name if self.case else "???",
            monitoring_response="",
        )
        # The options are the part of the prompt right after the first "```".
        return (
            "Ocena wykonana za pomocą Azure OpenAI. Wszystkie możliwe opcje: \n"
            + prompt.split("```")[1]
        )

    def get_normalized_response_html_table(self):
        """
        Return the normalized response rendered as an HTML table, preceded by
        the local creation time; an empty string when there is no response.
        """
        if not self.normalized_response:
            return ""
        response_received = timezone.localtime(self.created).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return mark_safe(
            f"<p> Otrzymano: {response_received} </p>"
            + render_normalized_response_html_table(self.normalized_response)
        )
class LetterEmailDomain(TimeStampedModel):
    """Per-domain flags and usage counters for letter e-mail addresses."""

    domain_name = models.CharField(
        verbose_name=_("Email address domain"), max_length=100, blank=True, null=True
    )
    is_trusted_domain = models.BooleanField(
        verbose_name=_("Is trusted (own or partner) domain?"), default=False
    )
    is_monitoring_email_to_domain = models.BooleanField(
        verbose_name=_("Is monitoring Email To domain?"), default=False
    )
    is_spammer_domain = models.BooleanField(
        verbose_name=_("Is spammer domain?"), default=False
    )
    is_non_spammer_domain = models.BooleanField(
        verbose_name=_("Is non spammer domain?"), default=False
    )
    email_to_count = models.IntegerField(
        verbose_name=_("Email To addres counter"), default=0
    )
    email_from_count = models.IntegerField(
        verbose_name=_("Email From addres counter"), default=0
    )

    class Meta:
        verbose_name = _("Letter Email domain")
        verbose_name_plural = _("Letter Email domains")

    def save(self, *args, **kwargs):
        """
        Save the domain, clearing the spammer flag first whenever the domain
        is a monitoring "to" domain, trusted, or explicitly non-spammer.
        """
        if (
            self.is_monitoring_email_to_domain
            or self.is_trusted_domain
            or self.is_non_spammer_domain
        ):
            self.is_spammer_domain = False
        super().save(*args, **kwargs)

    def add_email_to_letter(self):
        """Increment the "to" counter and save."""
        self.email_to_count += 1
        self.save()

    def add_email_from_letter(self):
        """Increment the "from" counter and save."""
        self.email_from_count += 1
        self.save()

    @classmethod
    def register_letter_email_domains(cls, letter: Letter):
        """
        Create or update the domain rows for the letter's sender and
        recipient addresses and bump their counters.

        A domain is trusted when its name is among the ``Domain`` names. The
        recipient domain is flagged as a monitoring "to" domain when the
        letter is outgoing or was sent from the service's own domain.
        Addresses that are empty or contain no "@" are skipped.
        """
        trusted_domains = Domain.objects.all().values_list("name", flat=True)
        # email_from is nullable: fall back to "" so the substring test below
        # cannot raise TypeError for incoming letters without a sender.
        email_from = letter.email_from or ""
        is_outgoing = (
            letter.is_outgoing or "fedrowanie.siecobywatelska.pl" in email_from
        )
        if letter.email_from and "@" in letter.email_from:
            from_domain_name = get_email_domain(letter.email_from)
            from_domain, _created = cls.objects.get_or_create(
                domain_name=from_domain_name
            )
            from_domain.is_trusted_domain = from_domain.domain_name in trusted_domains
            from_domain.save()
            from_domain.add_email_from_letter()
        if letter.email_to and "@" in letter.email_to:
            to_domain_name = get_email_domain(letter.email_to)
            to_domain, _created = cls.objects.get_or_create(domain_name=to_domain_name)
            to_domain.is_trusted_domain = to_domain.domain_name in trusted_domains
            to_domain.is_monitoring_email_to_domain = is_outgoing
            to_domain.save()
            to_domain.add_email_to_letter()
def validate_tld_name(value):
    """
    Validator accepting only values made up entirely of alphabetic characters.

    Raises:
        ValidationError: with code ``invalid`` for any other value.
    """
    if value.isalpha():
        return
    raise ValidationError(_("TLD name must be a single word"), code="invalid")
class ReputableLetterEmailTLD(TimeStampedModel):
    """A unique top-level domain name, validated by ``validate_tld_name``."""

    name = models.CharField(
        verbose_name=_("Email address repurable TLD"),
        max_length=100,
        unique=True,
        null=False,
        blank=False,
        validators=[validate_tld_name],
    )

    class Meta:
        verbose_name_plural = _("Reputable Letter Email TLDs")
        verbose_name = _("Reputable Letter Email TLD")
class MassMessageDraft(TimeStampedModel):
    """Links a draft letter with a monitoring and the tags selecting its recipients."""

    letter = models.OneToOneField(
        to=Letter,
        verbose_name=_("Letter"),
        related_name="mass_draft",
        on_delete=models.CASCADE,
    )
    monitoring = models.ForeignKey(
        to="monitorings.Monitoring",
        verbose_name=_("Monitoring"),
        on_delete=models.PROTECT,
    )
    recipients_tags = models.ManyToManyField(
        to="cases_tags.Tag",
        verbose_name=_("Recipient tags"),
        help_text=_("Used to determine recipients by case tags."),
        blank=True,
    )

    class Meta:
        verbose_name = _("Mass message draft")
        verbose_name_plural = _("Mass message drafts")

    def __str__(self):
        return f"Mass draft for {self.letter}"

    def determine_cases(self):
        """
        Return the cases of this draft's monitoring that carry at least one
        of the recipient tags and whose institution is not archival.

        Filtering through the many-to-many ``tags`` relation yields one row
        per matching tag, so ``distinct()`` is applied to return every case
        only once even when it has several of the selected tags.
        """
        return (
            Case.objects.filter(
                monitoring=self.monitoring, tags__in=self.recipients_tags.all()
            )
            .filter(institution__archival=False)
            .distinct()
        )
class AttachmentQuerySet(models.QuerySet):
    """Chainable query helpers for :class:`Attachment`."""

    def _enforce_quarantine(self, user):
        """Delegate to ``enforce_quarantined_queryset`` along the letter's case path."""
        return enforce_quarantined_queryset(self, user, "letter__record__case")

    def for_user(self, user):
        """
        Return everything for superusers; for anyone else keep only
        attachments of letters with ``unknown``/``non_spam`` status and then
        apply the quarantine restriction.
        """
        if user.is_superuser:
            return self
        allowed_spam_states = [Letter.SPAM.unknown, Letter.SPAM.non_spam]
        not_spam = self.filter(letter__is_spam__in=allowed_spam_states)
        return not_spam._enforce_quarantine(user)

    def with_scan_result(self):
        """Prefetch the related scan requests."""
        return self.prefetch_related("scan_request")
class Attachment(AttachmentBase):
    """A file attached to a :class:`Letter`, with scan and text-extraction data."""

    letter = models.ForeignKey(Letter, on_delete=models.CASCADE)
    objects = AttachmentQuerySet.as_manager()
    scan_request = GenericRelation(ScanRequest, verbose_name=_("Virus scan request"))
    text_content = models.TextField(
        verbose_name=_("Text content"), blank=True, null=True
    )
    text_content_update_result = models.TextField(
        verbose_name=_("Text content update result"), blank=True, null=True
    )

    def current_scan_request(self):
        """Return the first related scan request, or None when there is none."""
        scans = self.scan_request.all()
        if scans:
            return scans[0]
        return None

    def scan_status(self):
        """Return the status of the current scan request, or None without one."""
        scan = self.current_scan_request()
        if scan:
            return scan.status
        return None

    def is_infected(self):
        """Return True when the current scan status is ``infected``."""
        return self.scan_status() == ScanRequest.STATUS.infected

    def __str__(self):
        if self.attachment:
            return f"{self.filename}"
        return "None"

    def get_absolute_url(self):
        return reverse(
            "letters:attachment", kwargs={"pk": self.pk, "letter_pk": self.letter_id}
        )

    def get_full_url(self):
        """Return the absolute https URL on the current site's domain."""
        return "".join(
            ["https://", get_current_site(None).domain, self.get_absolute_url()]
        )

    def update_text_content(self):
        """
        Send the file to the file-to-text service and store the outcome.

        On HTTP 200 the returned ``text`` and ``message`` are saved in
        ``text_content`` and ``text_content_update_result``. On any other
        status, or on any exception, a description of the failure is saved
        in ``text_content_update_result`` instead.

        The request timeout is read from the optional
        ``settings.FILE_TO_TEXT_TIMEOUT``; when the setting is absent no
        timeout is applied, which is the previous behaviour.

        Returns:
            True when the text content was updated, False otherwise.
        """
        try:
            logger.info(
                f"Updating text content for att. {self.pk}: {self.attachment.name}"
            )
            # Read through a context manager so the file handle is released.
            with self.attachment.open("rb") as source_file:
                file_bytes = source_file.read()
            response = requests.post(
                settings.FILE_TO_TEXT_URL,
                files={
                    "file": (
                        self.attachment.name.split("/")[-1],
                        file_bytes,
                    )
                },
                headers={"Authorization": f"JWT {settings.FILE_TO_TEXT_TOKEN}"},
                timeout=getattr(settings, "FILE_TO_TEXT_TIMEOUT", None),
            )
            if response.status_code != 200:
                self.text_content_update_result = (
                    f"status_code: {response.status_code}, content: {response.content}"
                )
                # save update_fields does not work with MySQL 5.7
                # self.save(update_fields=["text_content_update_result"])
                self.save()
                return False
            # Parse the body once and reuse it below.
            payload = response.json()
            # Log everything except the (potentially large) extracted text.
            log_message_dict = payload.copy()
            log_message_dict.pop("text")
            logger.info(
                f"File to text API response:{response.status_code}, {log_message_dict}"
            )
            self.text_content = payload["text"]
            self.text_content_update_result = payload["message"]
            # save update_fields does not work with MySQL 5.7
            # self.save(update_fields=["text_content", "text_content_update_result"])
            self.save()
            return True
        except Exception as e:
            # Best-effort by design: record the failure on the attachment
            # instead of propagating it; the traceback goes to the log.
            logger.exception(e)
            self.text_content_update_result = str(e)
            # save update_fields does not work with MySQL 5.7
            # self.save(update_fields=["text_content_update_result"])
            self.save()
            return False

    @property
    def text_content_warning(self):
        """Warning shown to users about machine-read attachment text."""
        warning = """
Uwaga: treść załączników została odczytana maszynowo, więc może
zawierać błędy związane z nieprawidłowym odczytaniem znaków,
a także błędną interpretacji układu tekstu na stronie.
Jeśli nie masz stosownych uprawnień i potrzebujesz dostępu
do oryginału, skontaktuj się z biurem SOWP.
"""
        return warning  # " ".join(warning.split())