From 5d40c5240fdd2a137ca4813e6e1b3458257f34b0 Mon Sep 17 00:00:00 2001 From: Donny Peeters Date: Wed, 15 Jan 2025 15:36:18 +0100 Subject: [PATCH] Test the https-availability nibble-query Use or-join to make the tests pass --- octopoes/nibbles/https_availability/nibble.py | 2 +- .../test_https_availability_nibble.py | 60 ++++++++++++++++++- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/octopoes/nibbles/https_availability/nibble.py b/octopoes/nibbles/https_availability/nibble.py index 09f2f99413c..2c480880649 100644 --- a/octopoes/nibbles/https_availability/nibble.py +++ b/octopoes/nibbles/https_availability/nibble.py @@ -23,7 +23,7 @@ def pull(statements: list[str]) -> str: [?ipservice :IPService/ip_port ?ipport80] [?ipport80 :IPPort/port 80] [?ipport80 :IPPort/address ?ipaddress] - (or + (or-join [?ipport443] (and [?ipport443 :IPPort/address ?ipaddress][?ipport443 :IPPort/port 443]) [(identity nil) ?ipport443] ) diff --git a/octopoes/tests/integration/test_https_availability_nibble.py b/octopoes/tests/integration/test_https_availability_nibble.py index 314078d806a..8dc509e9a5f 100644 --- a/octopoes/tests/integration/test_https_availability_nibble.py +++ b/octopoes/tests/integration/test_https_availability_nibble.py @@ -1,5 +1,6 @@ import os from datetime import datetime +from itertools import permutations from unittest.mock import Mock import pytest @@ -11,14 +12,69 @@ from octopoes.models.ooi.network import IPAddressV4, IPPort, Network, Protocol from octopoes.models.ooi.service import IPService, Service from octopoes.models.ooi.web import HostnameHTTPURL, HTTPHeader, HTTPResource, WebScheme, Website +from octopoes.repositories.ooi_repository import XTDBOOIRepository if os.environ.get("CI") != "1": pytest.skip("Needs XTDB multinode container.", allow_module_level=True) -STATIC_IP = ".".join((4 * "1 ").split()) +STATIC_IP = ".".join(4 * ["1"]) -def test_https_availability_query(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime): +def create_port(xtdb_ooi_repository: XTDBOOIRepository, ip: str, port: int, valid_time: datetime) -> IPAddressV4: + network = Network(name="internet") + ip = IPAddressV4(address=ip, network=network.reference) + port = IPPort(port=port, address=ip.reference, protocol=Protocol.TCP) + hostname = Hostname(name="example.com", network=network.reference) + service = Service(name="http") + ip_service = IPService(ip_port=port.reference, service=service.reference) + website = Website(ip_service=ip_service.reference, hostname=hostname.reference) + + xtdb_ooi_repository.save(network, valid_time) + xtdb_ooi_repository.save(port, valid_time) + xtdb_ooi_repository.save(ip, valid_time) + xtdb_ooi_repository.save(service, valid_time) + xtdb_ooi_repository.save(ip_service, valid_time) + xtdb_ooi_repository.save(hostname, valid_time) + xtdb_ooi_repository.save(website, valid_time) + + xtdb_ooi_repository.commit() + + return ip + + +def ip_generator(): + for ip in permutations(range(1, 256), 4): + yield ".".join([str(x) for x in ip]) + + +def test_https_availability_query(xtdb_ooi_repository: XTDBOOIRepository, event_manager: Mock, valid_time: datetime): + query = ('{ :query { :find [(pull ?ipaddress [*]) (pull ?ipport80 [*]) (pull ?website [*]) (- (count ?ipport443) 1)] :where [' + '[?website :Website/ip_service ?ip_service]' + '[?ipservice :IPService/ip_port ?ipport80]' + '[?ipport80 :IPPort/port 80]' + '[?ipport80 :IPPort/address ?ipaddress] ' + '(or-join [?ipport443] ' + '(and [?ipport443 :IPPort/address ?ipaddress] [?ipport443 :IPPort/port 443] )' + '[(identity nil) ?ipport443]' + ')]}}') + ip = ip_generator() + + first_ip = create_port(xtdb_ooi_repository, next(ip), 80, valid_time) + assert xtdb_ooi_repository.session.client.query(query)[0][-1] == 0 + + for _ in range(20): + create_port(xtdb_ooi_repository, next(ip), 80, valid_time) + assert xtdb_ooi_repository.session.client.query(query)[0][-1] == 0 + + create_port(xtdb_ooi_repository, str(first_ip.address), 443, valid_time) + assert xtdb_ooi_repository.session.client.query(query)[0][-1] == 1 + + for _ in range(12): + create_port(xtdb_ooi_repository, next(ip), 443, valid_time) + assert xtdb_ooi_repository.session.client.query(query)[0][-1] == 13 + + +def test_https_availability(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime): xtdb_octopoes_service.nibbler.nibbles = {https_availability.id: https_availability} network = Network(name="internet")