from django.contrib import admin
from django.contrib.contenttypes.models import ContentType
from django.db.models import IntegerField, OuterRef, Q, Subquery
from django.urls import reverse
from django.utils import timezone
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
from feder.llm_evaluation.prompts import letter_categories_list
from feder.virus_scan.models import Request
from .models import Attachment, Letter, LetterEmailDomain, ReputableLetterEmailTLD
from .tasks import update_attachment_text_content
class LetterDirectionListFilter(admin.SimpleListFilter):
title = _("Letter Direction") # Displayed in the admin sidebar
parameter_name = "letter_direction_filter" # The URL parameter name
def lookups(self, request, model_admin):
# Return the filter options as a list of tuples
return (
("outgoing", _("Outgoing")),
("incoming", _("Incoming")),
)
def queryset(self, request, queryset):
# Apply the filter to the queryset based on the selected option
if self.value() == "outgoing":
return queryset.is_outgoing()
elif self.value() == "incoming":
return queryset.is_incoming()
class LetterLlmEvaluationListFilter(admin.SimpleListFilter):
title = _("Letter LLM Evaluation") # Displayed in the admin sidebar
parameter_name = "letter_llm_evaluation_filter" # The URL parameter name
def lookups(self, request, model_admin):
# Return the filter options as a list of tuples
return list(
(
" ".join(item.format(institution=" ... ").replace("\n", "").split())[
:20
],
" ".join(item.format(institution=" ... ").replace("\n", "").split()),
)
for item in letter_categories_list
)
def queryset(self, request, queryset):
# Apply the filter to the queryset based on the selected option
if self.value():
return queryset.filter(ai_evaluation__startswith=self.value())
return queryset
[docs]
class AttachmentInline(admin.StackedInline):
"""
Stacked Inline View for Attachment
"""
model = Attachment
[docs]
@admin.register(Letter)
class LetterAdmin(admin.ModelAdmin):
"""
Admin View for Letter
"""
date_hierarchy = "created"
list_display = (
"id",
"get_record_id",
"title",
"get_case",
"get_monitoring",
"author",
"created",
# "modified",
"is_draft",
# "is_incoming",
"get_outgoing",
"get_delivery_status",
"is_spam",
"ai_evaluation",
"email_from",
"email_to",
"eml",
"message_id_header",
)
list_filter = (
"is_spam",
LetterLlmEvaluationListFilter,
LetterDirectionListFilter,
# "created",
"record__case__monitoring",
# "modified",
# "is_outgoing",
)
inlines = [AttachmentInline]
search_fields = (
"id",
"title",
# "body",
"record__case__name",
"eml",
"message_id_header",
"email_from",
"email_to",
)
# raw_id_fields = ("author_user", "author_institution", "record")
# list_editable = ("is_spam",)
ordering = ("-id",)
actions = [
"delete_selected",
"mark_spam",
"mark_probable_spam",
"mark_spam_unknown",
"mark_non_spam",
]
readonly_fields = ("author_user", "author_institution", "record")
[docs]
def has_add_permission(self, request):
return False
[docs]
def has_change_permission(self, request, obj=None):
return False
[docs]
def has_delete_permission(self, request, obj=None):
return request.user.is_superuser
@admin.display(
description=_("Record id"),
ordering="record__id",
)
def get_record_id(self, obj):
if obj.record is None:
return None
return obj.record.id
@admin.display(
description=_("Is outgoing"),
boolean=True,
)
def get_outgoing(self, obj):
return obj.is_outgoing
@admin.display(
description=_("Delivery Status"),
)
def get_delivery_status(self, obj):
return obj.emaillog.status_verbose
@admin.display(
description=_("Case name"),
ordering="record__case",
)
def get_case(self, obj):
return obj.record.case
@admin.display(
description=_("Monitoring name"),
ordering="record__case__monitoring",
)
def get_monitoring(self, obj):
if obj.record.case is not None:
return obj.record.case.monitoring
return None
@admin.action(description=_("Mark selected letters as Spam"))
def mark_spam(modeladmin, request, queryset):
queryset.update(
is_spam=Letter.SPAM.spam,
mark_spam_by=request.user,
mark_spam_at=timezone.now(),
)
@admin.action(description=_("Mark selected letters as Non Spam"))
def mark_non_spam(modeladmin, request, queryset):
queryset.update(is_spam=Letter.SPAM.non_spam)
@admin.action(description=_("Mark selected letters as Spam Unknown"))
def mark_spam_unknown(modeladmin, request, queryset):
queryset.update(is_spam=Letter.SPAM.unknown)
@admin.action(description=_("Mark selected letters as Probable Spam"))
def mark_probable_spam(modeladmin, request, queryset):
queryset.update(is_spam=Letter.SPAM.probable_spam)
# def get_queryset(self, *args, **kwargs):
# qs = super().get_queryset(*args, **kwargs)
# return qs.with_author()
class AttachmentLetterSpamFilter(admin.SimpleListFilter):
title = _("Letter is spam") # Displayed in the admin sidebar
parameter_name = "letter_is_spam" # The URL parameter name
def lookups(self, request, model_admin):
# Return the filter options as a list of tuples
return Letter.SPAM
def queryset(self, request, queryset):
# Apply the filter to the queryset based on the selected option
if self.value():
return queryset.filter(letter__is_spam=self.value())
return queryset
class AttachmentVirusScanListFilter(admin.SimpleListFilter):
title = _("Virus Scan status") # Displayed in the admin sidebar
parameter_name = "virus_scan_status" # The URL parameter name
def lookups(self, request, model_admin):
# Return the filter options as a list of tuples
return Request.STATUS
def queryset(self, request, queryset):
# Define the content_type for Attachment
attachment_ct = ContentType.objects.get_for_model(Attachment)
if self.value() is not None:
# Use a subquery to filter Attachments based on their related Request status
queryset = queryset.filter(
pk__in=Request.objects.filter(
content_type=attachment_ct,
object_id=OuterRef("pk"),
status=self.value(),
).values("object_id")
)
return queryset
class AttachmentVirusScanExistsFilter(admin.SimpleListFilter):
title = _("Has Virus Scan Request") # Displayed in the admin sidebar
parameter_name = "virus_scan_exists" # The URL parameter name
def lookups(self, request, model_admin):
return (
("True", _("Has scan request")),
("False", _("No scan request")),
)
def queryset(self, request, queryset):
# Define the content_type for Attachment
attachment_ct = ContentType.objects.get_for_model(Attachment)
if self.value() == "True":
# Find Attachments that have at least one related ScanRequest
queryset = queryset.filter(
pk__in=Request.objects.filter(
content_type=attachment_ct, object_id=OuterRef("pk")
).values("object_id")
)
elif self.value() == "False":
# Find Attachments that have no related ScanRequest
queryset = queryset.exclude(
pk__in=Request.objects.filter(
content_type=attachment_ct, object_id=OuterRef("pk")
).values("object_id")
)
return queryset
[docs]
@admin.register(Attachment)
class AttachmentAdmin(admin.ModelAdmin):
"""
Admin View for Attachment
"""
date_hierarchy = "created"
list_display = (
"id",
"get_letter_id",
"get_letter_is_spam",
"attachment",
"get_scan_status",
"text_content_update_result",
"created",
)
search_fields = (
"id",
"attachment",
"letter__id",
"text_content_update_result",
)
list_filter = (
AttachmentLetterSpamFilter,
AttachmentVirusScanExistsFilter,
AttachmentVirusScanListFilter,
"text_content_update_result",
)
ordering = ("-id",)
actions = ["schedule_update_text_content"]
@admin.action(
description=_("Schedule text content update for selected attachments")
)
def schedule_update_text_content(self, request, queryset):
count = queryset.count()
for pk in queryset.values_list("pk", flat=True):
update_attachment_text_content(pk)
self.message_user(
request,
_("Scheduled text content update for %(count)d attachment(s).")
% {"count": count},
)
[docs]
def has_add_permission(self, request):
return False
[docs]
def has_change_permission(self, request, obj=None):
return False
[docs]
def has_delete_permission(self, request, obj=None):
return request.user.is_superuser
[docs]
def get_queryset(self, request):
"""
Override the default get_queryset method to annotate the queryset with
the latest scan_status from the related Request model.
"""
queryset = super().get_queryset(request)
# Define the content_type for Attachment
attachment_ct = ContentType.objects.get_for_model(Attachment)
# Add an annotation that gets the latest scan_status from the related Request
latest_status_subquery = (
Request.objects.filter(content_type=attachment_ct, object_id=OuterRef("pk"))
.order_by("-created")
.values("status")[:1]
) # Get the latest status
return queryset.annotate(
scan_status=Subquery(latest_status_subquery, output_field=IntegerField())
)
@admin.display(description=_("Virus Scan status"))
def get_scan_status(self, obj):
# Access the annotated scan_status field directly
scan_status = getattr(
obj, "scan_status", None
) # Safe access in case annotation is missing
if scan_status is not None and scan_status in Request.STATUS._display_map:
return Request.STATUS._display_map[scan_status]
return "-"
@admin.display(description=_("Letter is spam"))
def get_letter_is_spam(self, obj):
return Letter.SPAM._display_map[obj.letter.is_spam] if obj.letter else "-"
@admin.display(description=_("Letter id"))
def get_letter_id(self, obj):
if obj.letter:
# Generate a link to the Letter admin change page
url = reverse("admin:letters_letter_change", args=[obj.letter.id])
return format_html('<a href="{}">{}</a>', url, obj.letter.id)
return "-"
class ReputableTLDListFilter(admin.SimpleListFilter):
title = "TLD"
parameter_name = "tld"
def lookups(self, request, model_admin):
return [
("reputable", _("Reputable TLDs")),
("non_reputable", _("Non-reputable TLDs")),
]
def queryset(self, request, queryset):
tlds = ReputableLetterEmailTLD.objects.values_list("name", flat=True)
q_object = Q()
for tld in tlds:
q_object |= Q(domain_name__iendswith=tld)
if self.value() == "reputable":
return queryset.filter(q_object)
elif self.value() == "non_reputable":
return queryset.exclude(q_object)
[docs]
@admin.register(LetterEmailDomain)
class LetterEmailDomainAdmin(admin.ModelAdmin):
"""
Admin View for LetterEmailDomain
"""
list_display = (
"domain_name",
"is_trusted_domain",
"is_monitoring_email_to_domain",
"is_non_spammer_domain",
"is_spammer_domain",
"email_to_count",
"email_from_count",
)
list_filter = (
"is_trusted_domain",
"is_monitoring_email_to_domain",
"is_non_spammer_domain",
"is_spammer_domain",
ReputableTLDListFilter,
)
search_fields = ("domain_name",)
ordering = ("-email_from_count",)
list_editable = ("is_spammer_domain", "is_non_spammer_domain")
[docs]
@admin.register(ReputableLetterEmailTLD)
class ReputableLetterEmailTLDAdmin(admin.ModelAdmin):
"""
Admin View for ReputableLetterEmailTLD
"""
list_display = ("id", "name")
search_fields = ("name",)
ordering = ("name",)