import json
import logging
import uuid
from os import path
from atom.ext.django_filters.views import UserKwargFilterSetMixin
from atom.views import (
ActionMessageMixin,
ActionView,
CreateMessageMixin,
DeleteMessageMixin,
UpdateMessageMixin,
)
from braces.views import (
FormValidMessageMixin,
MessageMixin,
PrefetchRelatedMixin,
SelectRelatedMixin,
UserFormKwargsMixin,
)
from cached_property import cached_property
from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.syndication.views import Feed
from django.core.exceptions import PermissionDenied
from django.core.files.base import ContentFile
from django.db.models import Q
from django.http import HttpResponseBadRequest, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import reverse_lazy
from django.utils.datetime_safe import datetime
from django.utils.encoding import force_str
from django.utils.feedgenerator import Atom1Feed
from django.utils.translation import gettext_lazy as _
from django.views import View
from django.views.generic import CreateView, DeleteView, DetailView, FormView
from django_filters.views import FilterView
from extra_views import CreateWithInlinesView, UpdateWithInlinesView
from guardian.shortcuts import get_anonymous_user
from feder.alerts.models import Alert
from feder.cases.models import Case
from feder.letters.formsets import AttachmentInline
from feder.letters.settings import LETTER_RECEIVE_SECRET
from feder.llm_evaluation.tasks import categorize_letter_in_background
from feder.main.mixins import (
AttrPermissionRequiredMixin,
BaseXSendFileView,
RaisePermissionRequiredMixin,
)
from feder.monitorings.models import Monitoring
from feder.monitorings.tasks import send_mass_draft
from feder.records.models import Record
from feder.virus_scan.models import Request as ScanRequest
from .filters import LetterFilter
from .forms import AssignLetterForm, LetterForm, ReplyForm
from .logs.tasks import update_sent_letter_status
from .mixins import LetterObjectFeedMixin, LetterSummaryTableMixin
from .models import Attachment, Letter, LetterEmailDomain
from .tasks import update_letter_attachments_text_content
_("Letters index")
logger = logging.getLogger(__file__)
class MixinGzipXSendFile:
def get_sendfile_kwargs(self, context):
kwargs = super().get_sendfile_kwargs(context)
if kwargs["filename"] and kwargs["filename"].endswith(".gz"):
kwargs["encoding"] = "gzip"
filename = path.basename(kwargs["filename"][: -len(".gz")])
kwargs["attachment_filename"] = filename
return kwargs
class CaseRequiredMixin:
def get_queryset(self):
qs = super().get_queryset().exclude(record__case=None)
return qs.attachment_count()
[docs]
class LetterCommonMixin:
"""
Defines get_queryset and get_permission_object methods.
It should to be specified before permission related mixins.
"""
def get_queryset(self):
return (
super()
.get_queryset()
.exclude(
Q(record__case__isnull=True)
& ~Q(message_type=Letter.MESSAGE_TYPES.mass_draft)
)
.attachment_count()
.with_attachment()
)
def get_permission_object(self):
obj = super().get_object()
return (
obj.mass_draft.monitoring
if obj.is_mass_draft()
else obj.record.case.monitoring
)
[docs]
class LetterListView(
LetterCommonMixin,
UserKwargFilterSetMixin,
PrefetchRelatedMixin,
SelectRelatedMixin,
LetterSummaryTableMixin,
FilterView,
):
filterset_class = LetterFilter
model = Letter
select_related = ["record__case"]
prefetch_related = [
"author_user",
"author_institution",
"record__case__institution",
]
paginate_by = 25
ordering = "-pk"
[docs]
def get_queryset(self):
qs = super().get_queryset().exclude_spam()
return qs.attachment_count().for_user(self.request.user)
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["summary_table"] = self.render_summary_table()
return context
[docs]
class LetterDetailView(SelectRelatedMixin, LetterCommonMixin, DetailView):
model = Letter
select_related = ["author_institution", "author_user", "record__case__monitoring"]
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
return context
[docs]
def get_queryset(self):
qs = super().get_queryset().exclude_spam()
return qs.for_user(self.request.user)
[docs]
class LetterMessageXSendFileView(MixinGzipXSendFile, BaseXSendFileView):
model = Letter
file_field = "eml"
send_as_attachment = True
[docs]
def get_queryset(self):
qs = super().get_queryset()
return qs.for_user(self.request.user).exclude_spam()
[docs]
class LetterCreateView(
RaisePermissionRequiredMixin,
UserFormKwargsMixin,
CreateMessageMixin,
FormValidMessageMixin,
CreateView,
):
model = Letter
form_class = LetterForm
permission_required = "monitorings.add_letter"
@cached_property
def case(self):
qs = Case.objects.select_related("monitoring").for_user(self.request.user)
return get_object_or_404(qs, pk=self.kwargs["case_pk"])
def get_permission_object(self):
return self.case.monitoring
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["case"] = self.case
context["monitoring"] = self.case.monitoring
context["user"] = self.request.user
return context
[docs]
class LetterReplyView(
LetterCommonMixin,
RaisePermissionRequiredMixin,
UserFormKwargsMixin,
FormValidMessageMixin,
CreateWithInlinesView,
):
template_name = "letters/letter_reply.html"
model = Letter
form_class = ReplyForm
inlines = [AttachmentInline]
permission_required = "monitorings.add_draft"
@cached_property
def letter(self):
return get_object_or_404(
self.get_queryset()
.select_related("record__case__monitoring")
.for_user(self.request.user),
pk=self.kwargs["pk"],
)
def get_permission_object(self):
return self.letter.case.monitoring
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["object"] = self.letter
context["reply"] = True
return context
[docs]
class LetterSendView(
LetterCommonMixin, AttrPermissionRequiredMixin, MessageMixin, ActionView
):
model = Letter
permission_required = "monitorings.reply"
template_name_suffix = "_send"
def action(self):
if self.object.is_mass_draft():
cases_count = self.object.mass_draft.determine_cases().count()
send_mass_draft(self.object.pk)
self.messages.success(
_(
'Message "{letter}" has been scheduled for sending '
"to {count} recipients!"
).format(letter=self.object, count=cases_count),
fail_silently=True,
)
else:
self.object.send()
self.messages.success(
_('Reply "{letter}" has been sent to {institution}!').format(
letter=self.object, institution=self.object.case.institution
),
fail_silently=True,
)
update_sent_letter_status(schedule=(3 * 60))
def get_success_url(self):
if self.object.is_mass_draft():
obj = self.object.mass_draft.monitoring
else:
obj = self.object
return obj.get_absolute_url()
[docs]
def get_queryset(self):
qs = super().get_queryset()
return qs.for_user(self.request.user)
[docs]
class LetterUpdateView(
LetterCommonMixin,
AttrPermissionRequiredMixin,
UserFormKwargsMixin,
UpdateMessageMixin,
FormValidMessageMixin,
UpdateWithInlinesView,
):
model = Letter
form_class = LetterForm
inlines = [AttachmentInline]
permission_required = "monitorings.change_letter"
[docs]
def get_queryset(self):
return super().get_queryset().for_user(self.request.user).with_attachment()
[docs]
class LetterDeleteView(
LetterCommonMixin, AttrPermissionRequiredMixin, DeleteMessageMixin, DeleteView
):
model = Letter
permission_required = "monitorings.delete_letter"
[docs]
def get_queryset(self):
qs = super().get_queryset()
return qs.for_user(self.request.user)
[docs]
def get_success_url(self):
if self.object.is_mass_draft():
url = self.object.mass_draft.monitoring.get_absolute_url()
else:
url = self.object.case.get_absolute_url()
return url
class LetterRssFeed(Feed):
title = _("Latest letters on whole site")
link = reverse_lazy("letters:list")
description = _(
"Updates on new letters on site including "
+ "receving and sending in all monitorings."
)
feed_url = reverse_lazy("letters:rss")
description_template = "letters/_letter_feed_item.html"
def items(self):
return (
Letter.objects.with_feed_items()
.exclude(record__case=None)
.exclude_spam()
.recent()
.for_user(get_anonymous_user())
.order_by("-created")[:30]
)
def item_title(self, item):
return item.title
def item_author_name(self, item):
return force_str(item.author)
def item_author_link(self, item):
if item.author:
return item.author.get_absolute_url()
def item_pubdate(self, item):
return item.created
def item_updateddate(self, item):
return item.modified
def item_categories(self, item):
return [
item.case,
item.case.monitoring,
item.case.institution,
item.case.institution.jst,
]
def item_enclosure_url(self, item):
return item.eml.url if item.eml else None
class LetterAtomFeed(LetterRssFeed):
feed_type = Atom1Feed
subtitle = LetterRssFeed.description
feed_url = reverse_lazy("letters:atom")
[docs]
class LetterMonitoringAtomFeed(LetterMonitoringRssFeed):
feed_type = Atom1Feed
subtitle = LetterMonitoringRssFeed.description
feed_url = reverse_lazy("letters:atom")
[docs]
class LetterCaseAtomFeed(LetterCaseRssFeed):
feed_type = Atom1Feed
subtitle = LetterCaseRssFeed.description
feed_url = reverse_lazy("letters:atom")
[docs]
class LetterReportSpamView(ActionMessageMixin, CaseRequiredMixin, ActionView):
template_name_suffix = "_spam"
model = Letter
[docs]
def get_queryset(self):
return (
super()
.get_queryset()
.filter(is_spam__in=[Letter.SPAM.unknown, Letter.SPAM.probable_spam])
.for_user(self.request.user)
)
def action(self):
author = None if self.request.user.is_anonymous else self.request.user
Alert.objects.create(
monitoring=self.object.case.monitoring,
reason=_("SPAM"),
author=author,
link_object=self.object,
)
def get_success_message(self):
return _(
"Thanks for your help. The report was forwarded to responsible persons."
)
def get_success_url(self):
return self.object.case.get_absolute_url()
[docs]
class LetterResendView(
ActionMessageMixin, AttrPermissionRequiredMixin, CaseRequiredMixin, ActionView
):
template_name_suffix = "_resend"
model = Letter
permission_required = "monitorings.reply"
[docs]
def get_queryset(self):
return (
super()
.get_queryset()
.select_related("record__case__monitoring")
.is_outgoing()
.for_user(self.request.user)
)
def get_permission_object(self):
return self.get_object().case.monitoring
def action(self):
case = self.object.case
self.resend = Letter(
author_user=self.request.user,
record=Record.objects.create(case=case),
title=self.object.title,
body=self.object.body,
html_body=self.object.html_body,
)
self.resend.save()
self.resend.send(commit=True, only_email=False)
update_sent_letter_status(schedule=(3 * 60))
def get_success_message(self):
return _("The message was resend.")
def get_success_url(self):
return self.object.case.get_absolute_url()
[docs]
class LetterMarkSpamView(RaisePermissionRequiredMixin, ActionMessageMixin, ActionView):
template_name_suffix = "_mark_spam"
model = Letter
permission_required = "monitorings.spam_mark"
accept_global_perms = True
[docs]
def get_object(self, *args, **kwargs):
if not hasattr(self, "object"):
self.object = super().get_object(*args, **kwargs)
return self.object
def get_permission_object(self):
if self.get_object().case:
return self.get_object().case.monitoring
return None
[docs]
def get_queryset(self):
return (
super()
.get_queryset()
.filter(is_spam__in=[Letter.SPAM.unknown, Letter.SPAM.probable_spam])
.for_user(self.request.user)
)
def action(self):
if "valid" in self.request.POST:
self.object.is_spam = Letter.SPAM.non_spam
else:
self.object.is_spam = Letter.SPAM.spam
self.object.mark_spam_by = self.request.user
self.object.mark_spam_at = datetime.now()
self.object.save(update_fields=["is_spam", "mark_spam_by"])
Alert.objects.link_object(self.object).update(
solver=self.request.user, status=True
)
def get_success_message(self):
if "valid" in self.request.POST:
return _("The letter {object} has been marked as valid.").format(
object=self.object
)
return _("The message {object} has been marked as spam and hidden.").format(
object=self.object
)
def get_success_url(self):
if self.get_object().case:
return self.object.case.get_absolute_url()
return reverse_lazy("letters:unrecognized_list")
[docs]
class UnrecognizedLetterListView(
UserKwargFilterSetMixin,
RaisePermissionRequiredMixin,
PrefetchRelatedMixin,
LetterSummaryTableMixin,
FilterView,
):
filterset_class = LetterFilter
model = Letter
paginate_by = 10
permission_object = None
permission_required = "letters.recognize_letter"
template_name_suffix = "_unrecognized_list"
select_related = ["record"]
prefetch_related = ["attachment_set"]
ordering = "-pk"
[docs]
def get_queryset(self):
return (
super()
.get_queryset()
.filter(record__case=None)
.exclude(message_type=Letter.MESSAGE_TYPES.mass_draft)
.exclude_spam()
)
[docs]
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["object_list"] = self.update_object_list(context["object_list"])
context["summary_table"] = self.render_summary_table()
return context
def update_object_list(self, object_list):
result = []
for obj in object_list:
obj.assign_form = AssignLetterForm(letter=obj)
result.append(obj)
return result
[docs]
class AttachmentXSendFileView(MixinGzipXSendFile, BaseXSendFileView):
model = Attachment
file_field = "attachment"
send_as_attachment = True
[docs]
def get_queryset(self):
return super().get_queryset().for_user(self.request.user)
def get_sendfile_kwargs(self, context):
kwargs = super().get_sendfile_kwargs(context)
if kwargs["filename"].endswith(".gz"):
kwargs["encoding"] = "gzip"
return kwargs
def render_to_response(self, context):
if context["object"].is_infected():
raise PermissionDenied(
_(
"You do not have permission to view that file. "
+ "The file was considered dangerous."
)
)
if not self.request.user.is_authenticated:
raise PermissionDenied(_("You do not have permission to view that file."))
if self.request.user.is_authenticated and not (
self.request.user.is_superuser or self.request.user.can_download_attachment
):
raise PermissionDenied(_("You do not have permission to view that file."))
return super().render_to_response(context)
[docs]
class AttachmentRequestCreateView(ActionMessageMixin, ActionView):
template_name_suffix = "_request_scan"
model = Attachment
[docs]
def get_object(self, *args, **kwargs):
if not hasattr(self, "object"):
self.object = super().get_object(*args, **kwargs)
return self.object
[docs]
def get_queryset(self):
return super().get_queryset().for_user(self.request.user)
def action(self):
ScanRequest.objects.create(
content_object=self.object,
field_name="attachment",
)
def get_success_message(self):
return _("The file {} has been queued for scanning").format(self.object)
def get_success_url(self):
return self.object.letter.get_absolute_url()
[docs]
class ReceiveEmail(View):
required_content_type = "multipart/form-data"
required_version = "v2"
def post(self, request):
logger.info(f"Add letter POST request received: {request}")
if request.GET.get("secret") != LETTER_RECEIVE_SECRET:
logger.error("POST request permission denied")
raise PermissionDenied
if request.content_type != self.required_content_type:
logger.error("The request has an invalid Content-Type. ")
return HttpResponseBadRequest(
"The request has an invalid Content-Type. "
'The acceptable Content-Type is "{}".'.format(
self.required_content_type
)
)
manifest = json.load(request.FILES["manifest"])
if manifest.get("version") != self.required_version:
logger.error("The request has an invalid format version. ")
return HttpResponseBadRequest(
"The request has an invalid format version. "
'The acceptable format version is "{}".'.format(self.required_version)
)
eml_data = request.FILES["eml"]
logger.info(f'Letter to add: {manifest["headers"]}')
letter = self.get_letter(
headers=manifest["headers"],
eml_manifest=manifest["eml"],
text=manifest["text"],
eml_data=eml_data,
)
LetterEmailDomain.register_letter_email_domains(letter=letter)
letter_attachemnts = Attachment.objects.bulk_create(
self.get_attachment(attachment, letter)
for attachment in request.FILES.getlist("attachment")
)
letter.save()
logging.info(f"Letter attachments created: {letter_attachemnts}")
update_letter_attachments_text_content(letter.pk)
categorize_letter_in_background(letter.pk)
return JsonResponse({"status": "OK", "letter": letter.pk})
def get_letter(self, headers, eml_manifest, text, eml_data, **kwargs):
case = self.get_case(headers["to+"])
eml_file = self.get_eml_file(eml_manifest, eml_data)
from_email = headers["from"][0] if headers["from"][0] else "unknown@domain.gov"
auto_reply = headers.get("auto_reply_type")
if auto_reply is not None:
auto_reply = auto_reply.replace("-", "_")
message_type = getattr(
Letter.MESSAGE_TYPES, auto_reply, Letter.MESSAGE_TYPES.unknown
)
else:
message_type = Letter.MESSAGE_TYPES.regular
if Letter.objects.filter(
email_from=headers["from"][0] if headers.get("from") else None,
email_to=headers["to"][0] if headers.get("to") else None,
message_id_header=headers["message_id"],
title=headers["subject"],
).exists():
letter_to_add = Letter.objects.filter(
email_from=headers["from"][0] if headers.get("from") else None,
email_to=headers["to"][0] if headers.get("to") else None,
message_id_header=headers["message_id"],
title=headers["subject"],
).first()
letter_to_add.spam_check()
logger.info(f"Request skipped, letter exists: {letter_to_add.pk}")
return letter_to_add
letter_to_add = Letter.objects.create(
author_institution=case.institution if case else None,
email=from_email,
email_from=headers["from"][0] if headers.get("from") else None,
email_to=headers["to"][0] if headers.get("to") else None,
message_id_header=headers["message_id"],
record=Record.objects.create(case=case),
message_type=message_type,
title=headers["subject"],
body=text["content"],
html_body=text.get("html_content", ""),
quote=text["quote"],
html_quote=text.get("html_quote", ""),
eml=eml_file,
is_draft=False,
)
letter_to_add.spam_check()
logger.info(f"Request processed, letter added: {letter_to_add.pk}")
return letter_to_add
def get_case(self, to_plus):
return Case.objects.select_related("institution").by_addresses(to_plus).first()
def get_attachment(self, attachment, letter):
file_obj = ContentFile(content=attachment.read(), name=attachment.name)
return Attachment(letter=letter, attachment=file_obj)
def get_eml_file(self, eml_manifest, eml_data):
eml_extensions = "eml.gz" if eml_manifest["compressed"] else "eml"
eml_filename = f"{uuid.uuid4().hex}.{eml_extensions}"
eml_content = eml_data.read()
return ContentFile(eml_content, eml_filename)