|
1 | 1 | import logging
|
2 | 2 | import re
|
| 3 | +from functools import cache |
3 | 4 |
|
| 5 | +from django.core.exceptions import FieldDoesNotExist |
4 | 6 | from greedybear.consts import REGEX_DOMAIN, REGEX_IP
|
5 | 7 | from greedybear.models import IOC, GeneralHoneypot
|
6 | 8 | from rest_framework import serializers
|
@@ -47,37 +49,85 @@ def validate(self, data):
|
47 | 49 | return data
|
48 | 50 |
|
49 | 51 |
|
50 |
| -def feed_type_validation(feed_type): |
51 |
| - feed_choices = ["log4j", "cowrie", "all"] |
52 |
| - generalHoneypots = GeneralHoneypot.objects.all().filter(active=True) |
53 |
| - feed_choices.extend([hp.name.lower() for hp in generalHoneypots]) # FEEDS |
| 52 | +def feed_type_validation(feed_type: str, valid_feed_types: frozenset) -> str: |
| 53 | + """Validates that a given feed type exists in the set of valid feed types. |
54 | 54 |
|
55 |
| - if feed_type not in feed_choices: |
56 |
| - logger.info(f"Feed type {feed_type} not in feed_choices {feed_choices}") |
57 |
| - raise serializers.ValidationError("Invalid feed_type") |
| 55 | + Args: |
| 56 | + feed_type (str): The feed type to validate |
| 57 | + valid_feed_types (frozenset): Set of allowed feed type values |
| 58 | +
|
| 59 | + Returns: |
| 60 | + str: The validated feed type string, unchanged |
| 61 | +
|
| 62 | + Raises: |
| 63 | + serializers.ValidationError: If feed_type is not found in valid_feed_types |
| 64 | + """ |
| 65 | + if feed_type not in valid_feed_types: |
| 66 | + logger.info(f"Feed type {feed_type} not in feed_choices {valid_feed_types}") |
| 67 | + raise serializers.ValidationError(f"Invalid feed_type: {feed_type}") |
58 | 68 | return feed_type
|
59 | 69 |
|
60 | 70 |
|
61 |
| -class FeedsSerializer(serializers.Serializer): |
| 71 | +@cache |
| 72 | +def ordering_validation(ordering: str) -> str: |
| 73 | + """Validates that given ordering corresponds to a field in the IOC model. |
| 74 | +
|
| 75 | + Args: |
| 76 | + ordering (str): The ordering to validate |
| 77 | +
|
| 78 | + Returns: |
| 79 | + str: The validated ordering string, unchanged |
| 80 | +
|
| 81 | + Raises: |
| 82 | + serializers.ValidationError: If ordering does not correspond to a field in the IOC model |
| 83 | + """ |
| 84 | + if not ordering: |
| 85 | + raise serializers.ValidationError("Invalid ordering: <empty string>") |
| 86 | + # remove minus sign if present |
| 87 | + field_name = ordering[1:] if ordering.startswith("-") else ordering |
| 88 | + try: |
| 89 | + IOC._meta.get_field(field_name) |
| 90 | + except FieldDoesNotExist as exc: |
| 91 | + raise serializers.ValidationError(f"Invalid ordering: {ordering}") from exc |
| 92 | + return ordering |
| 93 | + |
| 94 | + |
| 95 | +class FeedsRequestSerializer(serializers.Serializer): |
62 | 96 | feed_type = serializers.CharField(max_length=120)
|
63 | 97 | attack_type = serializers.ChoiceField(choices=["scanner", "payload_request", "all"])
|
64 |
| - age = serializers.ChoiceField(choices=["persistent", "recent"]) |
65 |
| - format = serializers.ChoiceField(choices=["csv", "json", "txt"], default="json") |
| 98 | + max_age = serializers.IntegerField(min_value=1) |
| 99 | + min_days_seen = serializers.IntegerField(min_value=1) |
| 100 | + include_reputation = serializers.ListField(child=serializers.CharField(max_length=120)) |
| 101 | + exclude_reputation = serializers.ListField(child=serializers.CharField(max_length=120)) |
| 102 | + feed_size = serializers.IntegerField(min_value=1) |
| 103 | + ordering = serializers.CharField(max_length=120) |
| 104 | + verbose = serializers.ChoiceField(choices=["true", "false"]) |
| 105 | + paginate = serializers.ChoiceField(choices=["true", "false"]) |
| 106 | + format = serializers.ChoiceField(choices=["csv", "json", "txt"]) |
66 | 107 |
|
67 | 108 | def validate_feed_type(self, feed_type):
|
68 |
| - logger.debug(f"FeedsSerializer - Validation feed_type: '{feed_type}'") |
69 |
| - return feed_type_validation(feed_type) |
| 109 | + logger.debug(f"FeedsRequestSerializer - validation feed_type: '{feed_type}'") |
| 110 | + return feed_type_validation(feed_type, self.context["valid_feed_types"]) |
| 111 | + |
| 112 | + def validate_ordering(self, ordering): |
| 113 | + logger.debug(f"FeedsRequestSerializer - validation ordering: '{ordering}'") |
| 114 | + return ordering_validation(ordering) |
70 | 115 |
|
71 | 116 |
|
72 | 117 | class FeedsResponseSerializer(serializers.Serializer):
|
73 | 118 | feed_type = serializers.CharField(max_length=120)
|
74 |
| - value = serializers.CharField(max_length=120) |
| 119 | + value = serializers.CharField(max_length=256) |
75 | 120 | scanner = serializers.BooleanField()
|
76 | 121 | payload_request = serializers.BooleanField()
|
77 | 122 | first_seen = serializers.DateField(format="%Y-%m-%d")
|
78 | 123 | last_seen = serializers.DateField(format="%Y-%m-%d")
|
79 |
| - times_seen = serializers.IntegerField() |
| 124 | + attack_count = serializers.IntegerField(min_value=1) |
| 125 | + interaction_count = serializers.IntegerField(min_value=1) |
| 126 | + ip_reputation = serializers.CharField(allow_blank=True, max_length=32) |
| 127 | + asn = serializers.IntegerField(allow_null=True, min_value=1) |
| 128 | + destination_port_count = serializers.IntegerField(min_value=0) |
| 129 | + login_attempts = serializers.IntegerField(min_value=0) |
80 | 130 |
|
81 | 131 | def validate_feed_type(self, feed_type):
|
82 | 132 | logger.debug(f"FeedsResponseSerializer - validation feed_type: '{feed_type}'")
|
83 |
| - return feed_type_validation(feed_type) |
| 133 | + return feed_type_validation(feed_type, self.context["valid_feed_types"]) |
0 commit comments