Source code for feder.virus_scan.models

from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.utils.translation import gettext_lazy as _
from jsonfield import JSONField
from model_utils import Choices
from model_utils.models import TimeStampedModel


[docs] class RequestQuerySet(models.QuerySet): def with_content_object(self): return self.prefetch_related("content_object").select_related("content_type") def for_object(self, obj): return self.filter( content_type=ContentType.objects.get_for_model(obj._meta.model), object_id=obj.pk, )
[docs] class Request(TimeStampedModel): STATUS = Choices( (0, "created", _("Created")), (1, "queued", _("Queued")), (2, "infected", _("Infected")), (3, "not_detected", _("Not detected")), (4, "failed", _("Failed")), ) content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE) object_id = models.PositiveIntegerField() content_object = GenericForeignKey("content_type", "object_id") field_name = models.CharField(max_length=50) engine_name = models.CharField( verbose_name=_("Engine name"), max_length=20, blank=True ) engine_id = models.CharField( max_length=100, verbose_name=_("External ID"), blank=True ) engine_report = JSONField(verbose_name=_("Engine result"), blank=True) engine_link = models.CharField( max_length=150, verbose_name=_("Engine result URL"), blank=True ) status = models.IntegerField(choices=STATUS, default=STATUS.created) objects = RequestQuerySet.as_manager() def get_file(self): return getattr(self.content_object, self.field_name) def receive_result(self): from feder.virus_scan.engine import get_engine current_engine = get_engine() result = current_engine.receive_result(self.engine_id) for key in result: setattr(self, key, result[key]) def send_scan(self): f = self.get_file() if bool(f.name) and f.storage.exists(f.name) and f.size > 0: from feder.virus_scan.engine import get_engine current_engine = get_engine() result = current_engine.send_scan(f.file, f.name) self.engine_name = current_engine.name else: result = { "status": self.STATUS.failed, "engine_report": { "error": "Attachement file to scan is missing or 0 length" }, } for key in result: setattr(self, key, result[key]) class Meta: verbose_name = _("Request of virus scan") verbose_name_plural = _("Requests of virus scan") ordering = ["created"] indexes = [ models.Index(fields=["content_type", "object_id"]), ]