Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,34 @@ def validate_ordering(self, ordering):
return ordering_validation(ordering)


# NOTE: While the FeedsRequestSerializer enforces ordering on db
# Model fields, aggregation requires ordering by non-model, which is annotated fields
class ASNFeedsOrderingSerializer(FeedsRequestSerializer):
ALLOWED_ORDERING_FIELDS = frozenset(
{
"asn",
"ioc_count",
"total_attack_count",
"total_interaction_count",
"total_login_attempts",
"expected_ioc_count",
"expected_interactions",
"first_seen",
"last_seen",
}
)

def validate_ordering(self, ordering):
field_name = ordering.lstrip("-").strip()

if field_name not in self.ALLOWED_ORDERING_FIELDS:
raise serializers.ValidationError(
{f"Invalid ordering field for ASN aggregated feed: '{field_name}'. Allowed fields: {', '.join(sorted(self.ALLOWED_ORDERING_FIELDS))}"}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a set literal but should be a string, right?

)

return ordering


class FeedsResponseSerializer(serializers.Serializer):
"""
Serializer for feed response data structure.
Expand Down
2 changes: 2 additions & 0 deletions api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
enrichment_view,
feeds,
feeds_advanced,
feeds_asn,
feeds_pagination,
general_honeypot_list,
)
Expand All @@ -22,6 +23,7 @@
urlpatterns = [
path("feeds/", feeds_pagination),
path("feeds/advanced/", feeds_advanced),
path("feeds/asn/", feeds_asn),
path("feeds/<str:feed_type>/<str:attack_type>/<str:prioritize>.<str:format_>", feeds),
path("enrichment", enrichment_view),
path("cowrie_session", cowrie_session_view),
Expand Down
45 changes: 45 additions & 0 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
permission_classes,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from api.serializers import ASNFeedsOrderingSerializer
from api.views.utils import (
FeedRequestParams,
asn_aggregated_queryset,
feeds_response,
get_queryset,
get_valid_feed_types,
Expand Down Expand Up @@ -116,3 +119,45 @@ def feeds_advanced(request):
resp_data = feeds_response(iocs, feed_params, valid_feed_types, dict_only=True, verbose=verbose)
return paginator.get_paginated_response(resp_data)
return feeds_response(iocs_queryset, feed_params, valid_feed_types, verbose=verbose)


@api_view(["GET"])
@authentication_classes([CookieTokenAuthentication])
@permission_classes([IsAuthenticated])
def feeds_asn(request):
"""
Retrieve aggregated IOC feed data grouped by ASN (Autonomous System Number).

Args:
request: The HTTP request object.
feed_type (str): Filter by feed type (e.g., 'cowrie', 'log4j'). Default: 'all'.
attack_type (str): Filter by attack type (e.g., 'scanner', 'payload_request'). Default: 'all'.
max_age (int): Maximum age of IOCs in days. Default: 3.
min_days_seen (int): Minimum days an IOC must have been observed. Default: 1.
exclude_reputation (str): ';'-separated reputations to exclude (e.g., 'mass scanner'). Default: none.
ordering (str): Aggregation ordering field (e.g., 'total_attack_count', 'asn'). Default: '-ioc_count'.
asn (str, optional): Filter results to a single ASN.

Returns:
Response: HTTP response with a JSON list of ASN aggregation objects.
Each object contains:
asn (int): ASN number.
ioc_count (int): Number of IOCs for this ASN.
total_attack_count (int): Sum of attack_count for all IOCs.
total_interaction_count (int): Sum of interaction_count for all IOCs.
total_login_attempts (int): Sum of login_attempts for all IOCs.
honeypots (List[str]): Sorted list of unique honeypots that observed these IOCs.
expected_ioc_count (float): Sum of recurrence_probability for all IOCs, rounded to 4 decimals.
expected_interactions (float): Sum of expected_interactions for all IOCs, rounded to 4 decimals.
first_seen (DateTime): Earliest first_seen timestamp among IOCs.
last_seen (DateTime): Latest last_seen timestamp among IOCs.
"""
logger.info(f"request /api/feeds/asn/ with params: {request.query_params}")
feed_params = FeedRequestParams(request.query_params)
valid_feed_types = get_valid_feed_types()

iocs_qs = get_queryset(request, feed_params, valid_feed_types, is_aggregated=True, serializer_class=ASNFeedsOrderingSerializer)

asn_aggregates = asn_aggregated_queryset(iocs_qs, request, feed_params)
data = list(asn_aggregates)
return Response(data)
93 changes: 89 additions & 4 deletions api/views/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from django.conf import settings
from django.contrib.postgres.aggregates import ArrayAgg
from django.db.models import F
from django.db.models import Count, F, Max, Min, Q, Sum
from django.http import HttpResponse, HttpResponseBadRequest, StreamingHttpResponse
from rest_framework import status
from rest_framework.response import Response
Expand Down Expand Up @@ -121,14 +121,23 @@ def get_valid_feed_types() -> frozenset[str]:
return frozenset(feed_types)


def get_queryset(request, feed_params, valid_feed_types):
def get_queryset(request, feed_params, valid_feed_types, is_aggregated=False, serializer_class=FeedsRequestSerializer):
"""
Build a queryset to filter IOC data based on the request parameters.

Args:
request: The incoming request object.
feed_params: A FeedRequestParams instance.
valid_feed_types (frozenset): The set of all valid feed types.
is_aggregated (bool, optional):
- If True, disables slicing (`feed_size`) and model-level ordering.
- Ensures full dataset is available for aggregation or specialized computation.
- Default: False.
serializer_class (class, optional):
- Serializer class used to validate request parameters.
- Allows injecting a custom serializer to enforce rules for specific feed types
(e.g., to restrict ordering fields or validation for specialized feeds).
- Default: `FeedsRequestSerializer`.

Returns:
QuerySet: The filtered queryset of IOC data.
Expand All @@ -139,7 +148,7 @@ def get_queryset(request, feed_params, valid_feed_types):
f"Age: {feed_params.max_age}, format: {feed_params.format}"
)

serializer = FeedsRequestSerializer(
serializer = serializer_class(
data=vars(feed_params),
context={"valid_feed_types": valid_feed_types},
)
Expand Down Expand Up @@ -168,9 +177,14 @@ def get_queryset(request, feed_params, valid_feed_types):
.annotate(value=F("name"))
.annotate(honeypots=ArrayAgg("general_honeypot__name"))
.distinct()
.order_by(feed_params.ordering)[: int(feed_params.feed_size)]
)

# aggregated endpoints should operate on the full queryset
# to compute sums, counts, and other metrics correctly.
if not is_aggregated:
iocs = iocs.order_by(feed_params.ordering)
iocs = iocs[: int(feed_params.feed_size)]

Comment on lines +182 to +187
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

# save request source for statistics
source_ip = str(request.META["REMOTE_ADDR"])
request_source = Statistics(source=source_ip)
Expand Down Expand Up @@ -317,3 +331,74 @@ def is_sha256hash(string: str) -> bool:
bool: True if the string is a valid SHA-256 hash, False otherwise
"""
return bool(re.fullmatch(r"^[A-Fa-f0-9]{64}$", string))


def resolve_aggregation_ordering(ordering, *, default, fallback_fields=None):
"""
Resolve effective ordering for aggregated endpoints.

Args
ordering (str or None): The user-provided ordering string from query params.
default (str): The default ordering to use if `ordering` is None or in fallback_fields.
fallback_fields (set[str], optional): A set of orderings that are allowed in other
contexts but should be overridden here. Defaults to None.

Returns
str: A safe ordering string to use directly in the aggregation query.
"""
fallback_fields = fallback_fields or set()

if not ordering or ordering in fallback_fields:
return default

return ordering
Comment on lines +336 to +354
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little overengineered in my opinion. I think instead of this, we can just return -ioc_count in the ASNFeedsOrderingSerializer if the validation fails. Then, if a user requests a supported ordering, it just works and if not, the results are ordered by -ioc_count. Or am I missing something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default ordering (-ioc_count) cannot be enforced in the serializer because by the time the aggregator runs, feed_params.ordering is already populated (default -last_seen) from FeedRequestParams. So the serializer never sees it as missing.

I acknowledge that using resolve_aggregation_ordering was over-engineered, and I agree that we can simplify it by adding a small override directly in asn_aggregated_queryset, like this:

if not ordering or ordering.strip() in {"", "-last_seen"}:
    ordering = "-ioc_count"

This keeps things simple while ensuring the default ordering works correctly. If you’re okay with it, I can push these changes but if you have a better approach, I’d love to apply that instead.



def asn_aggregated_queryset(iocs_qs, request, feed_params):
"""
Perform DB-level aggregation grouped by ASN.

Args
iocs_qs (QuerySet): Filtered IOC queryset from get_queryset;
request (Request): The API request object;
feed_params (FeedRequestParams): Validated parameter object

Returns: A values-grouped queryset with annotated metrics and honeypot arrays.
"""
# optional asn params for single asn filter
asn_filter = request.query_params.get("asn")
if asn_filter:
iocs_qs = iocs_qs.filter(asn=asn_filter)

aggregated = (
iocs_qs.exclude(asn__isnull=True)
.values("asn")
.annotate(
ioc_count=Count("id", distinct=True),
total_attack_count=Sum("attack_count", distinct=True),
total_interaction_count=Sum("interaction_count", distinct=True),
total_login_attempts=Sum("login_attempts", distinct=True),
expected_ioc_count=Sum("recurrence_probability", distinct=True),
expected_interactions=Sum("expected_interactions", distinct=True),
Comment on lines +377 to +382
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why you are using distinct=True here? For the Count it does not really do anything, because id is unique and for Sum I can't find any documentation of what the distinct argument actually does.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing that out. To be frank, I had used distinct as a quick workaround, but the root cause of the error was row duplication in the SQL query. This happened because of the Many-to-Many join from .filter(general_honeypot__active=True) combined with ArrayAgg on the honeypot field in get_queryset.

This duplication caused Count("id") and other aggregated fields to be inflated (e.g., 4 instead of 2), which led to the test failures. At first, I thought it was related to ordering or feed_size in get_queryset, and even considered a possible test case issue, but the real problem was the join itself.

Screenshot from 2026-01-24 15-30-30

Using distinct=True on Count was only a workaround it masks the issue but doesn’t fix it properly and can break Sum when values repeat and sorry , I realized this issue later after you pointed out.

The clean solution i think to separate numeric aggregation from the M2M honeypot aggregation in asn_aggreated_queryset and little refactor in get_queryset. That way, counts and sums stay accurate without duplication tricks.

The changes in get_queryset would roughly reflect this approach.

iocs = (
        IOC.objects.filter(**query_dict)
        .exclude(ip_reputation__in=feed_params.exclude_reputation)
        .annotate(value=F("name"))
        
        .distinct()
    )

    # aggregated endpoints should operate on the full queryset
    # to compute sums, counts, and other metrics correctly.
    if not is_aggregated:
        iocs= iocs.filter(general_honeypot__active=True)
        iocs = iocs.annotate(honeypots=ArrayAgg("general_honeypot__name"))
        iocs = iocs.order_by(feed_params.ordering)
        iocs = iocs[:int(feed_params.feed_size)]

what do you think?

honeypots=ArrayAgg(
"general_honeypot__name",
filter=Q(general_honeypot__name__isnull=False),
distinct=True,
),
first_seen=Min("first_seen"),
last_seen=Max("last_seen"),
)
)

resolved_ordering = resolve_aggregation_ordering(
ordering=feed_params.ordering,
default="-ioc_count",
fallback_fields={"-last_seen"},
)

direction = "-" if resolved_ordering.startswith("-") else ""
field = resolved_ordering.lstrip("-").strip()

aggregated = aggregated.order_by(f"{direction}{field}")
Comment on lines +393 to +402
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also very complicated. Can we also rely on the Serializer here?


return aggregated
Loading
Loading