Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retarus Event Collector #38096

Merged
merged 33 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b041d6b
generate new pack
YaelShamai Dec 17, 2024
ddfc673
small fix
YaelShamai Dec 18, 2024
558946a
continue
YaelShamai Dec 29, 2024
9b381f1
continue- skeleton is ready
YaelShamai Jan 1, 2025
ff82656
more
YaelShamai Jan 5, 2025
863ad5a
more
YaelShamai Jan 5, 2025
090dab3
working version with insecure implementation, no proxy implementation
YaelShamai Jan 7, 2025
10b842d
remove proxy parameter and code improvements
YaelShamai Jan 9, 2025
b4aadd5
more code improvements
YaelShamai Jan 9, 2025
a1ed84a
BaseIntegration Help
YaelShamai Jan 9, 2025
d19f3c3
start unit tests
YaelShamai Jan 9, 2025
e4457eb
changed datetime calls to not use the deprecated function
YaelShamai Jan 9, 2025
bf749d3
more unit tests
YaelShamai Jan 12, 2025
e2dd4b9
another unit test
YaelShamai Jan 15, 2025
2b63642
some code improvements
YaelShamai Jan 20, 2025
1b77816
fix needed fields for events
YaelShamai Jan 20, 2025
a984cd9
docs improvements
YaelShamai Jan 20, 2025
8e26117
add get-last-run-results command
YaelShamai Jan 21, 2025
3fe8479
code review fixes and more tests
YaelShamai Jan 22, 2025
cd55bb7
add troubleshooting section to readme
YaelShamai Jan 22, 2025
c54a8ac
.
YaelShamai Jan 22, 2025
09ea746
readme improvements
YaelShamai Jan 22, 2025
6f361c1
run pre commit
YaelShamai Jan 22, 2025
f824e99
build improvements
YaelShamai Jan 22, 2025
cd7e7f4
build improvements
YaelShamai Jan 22, 2025
17e650b
Merge branch 'master' into ys-event-collector
YaelShamai Jan 22, 2025
9d53591
build
YaelShamai Jan 22, 2025
5b0cd71
docs improvements
YaelShamai Jan 23, 2025
6907a5e
code review improvements
YaelShamai Jan 28, 2025
e66df89
pre commit
YaelShamai Jan 28, 2025
915c162
Merge branch 'master' into ys-event-collector
YaelShamai Jan 28, 2025
9491346
add command example
YaelShamai Jan 29, 2025
66f82f9
change command examples file name
YaelShamai Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Integrate Retarus Secure Email Gateway to seamlessly fetch events and enhance email security.

## Configure RetarusSecureEmailGateway Event Collector on Cortex XSIAM

1. Navigate to **Settings** > **Integrations** > **Servers & Services**.
2. Search for Retarus.
3. Click **Add instance** to create and configure a new integration instance.

| **Parameter** | **Required** | **Description** |
| --- | --- | --- |
| Server URL | True | |
| Token ID | True | |
| Channel name | False | The channel to fetch events from. In Retarus, a channel name represents a specific configuration or processing pipeline used to manage email traffic based on criteria like sender, recipient, domain, or metadata, enabling tailored routing, filtering, compliance, and logging rules.|
| Fetch interval in seconds | True | |
| Trust any certificate (not secure) | False | |

### Troubleshooting

If you encounter any issues with this integration, follow these steps to troubleshoot:
Run the retarus-get-last-run-results command to obtain detailed information about the errors and problems you are facing. This command provides insights into the last execution of the integration and helps you understand the root cause of the issues.
If you receive an HTTP 400 or HTTP 401 status code when running the command, verify the token provided in the instance configuration.

When opening a support case, include the results you obtained from running the retarus-get-last-run-results command.

Note, due to the Retarus API limitation, only one instance can be configured for each token and channel. It is important to note that while two instances with the same token but different channels are allowed, configuring two instances with the same token and channel may result in errors and/or missing events.
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
import demistomock as demisto # noqa: F401
from CommonServerPython import * # noqa: F401
from CommonServerUserPython import * # noqa
import urllib3
from websockets import Data
from websockets.sync.client import connect
from websockets.sync.connection import Connection
import traceback
import threading
from contextlib import contextmanager

# Disable insecure warnings
urllib3.disable_warnings()


''' CONSTANTS '''
VENDOR = "Retarus"
PRODUCT = "Secure Email Gateway"
FETCH_INTERVAL_IN_SECONDS = 60
FETCH_SLEEP = 5
SERVER_IDLE_TIMEOUT = 60
DEFAULT_CHANNEL = "default"
LOG_PREFIX = "Retarus-logs"


''' CLIENT CLASS '''


class EventConnection:
def __init__(self, connection: Connection, fetch_interval: int = FETCH_INTERVAL_IN_SECONDS,
idle_timeout: int = SERVER_IDLE_TIMEOUT): # pragma: no cover
self.connection = connection
self.lock = threading.Lock()
YaelShamai marked this conversation as resolved.
Show resolved Hide resolved
self.idle_timeout = idle_timeout
self.fetch_interval = fetch_interval

def recv(self, timeout: float | None = None) -> Data: # pragma: no cover
"""
Receive the next message from the connection

Args:
timeout (float): Block until timeout seconds have elapsed or a message is received. If None, waits indefinitely.
If timeout passes, raises TimeoutError

Returns:
Data: Next event received from the connection
"""
with self.lock:
demisto.debug("Locked the thread to recv a message")
event = self.connection.recv(timeout=timeout)
return event

def heartbeat(self): # pragma: no cover
"""
Heartbeat thread function to periodically send keep-alives to the server.
For the sake of simplicity and error prevention, keep-alives are sent regardless of the actual connection activity.
"""
while True:
with self.lock:
demisto.debug("Locked the thread to pong the connection")
self.connection.pong()
time.sleep(self.idle_timeout)


''' HELPER FUNCTIONS '''


def push_events(events: list[dict]): # pragma: no cover
"""
Push events to XSIAM.
"""
send_events_to_xsiam(events=events, vendor=VENDOR, product=PRODUCT)
demisto.debug(f"{LOG_PREFIX} Pushed {len(events)} to XSIAM successfully")


@contextmanager
def websocket_connection(url: str, token_id: str, fetch_interval: int, channel: str, verify_ssl: bool): # pragma: no cover
"""
Create a connection to the api.

Args:
url (str): host URL for the websocket connection.
channel (str): Retarus channel to connect through.
token_id (str): Retarus token id.
fetch_interval (int): Time between fetch iterations.
verify_ssl (bool): Whether to verify ssl when connecting.

Yields:
EventConnection: eventConnection to receive events from.
"""
extra_headers = {"Authorization": f"Bearer {token_id}"}

ssl_context = ssl.create_default_context()
if not verify_ssl:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

try:
with connect("wss://" + url + f"/email/siem/v1/websocket?channel={channel}",
additional_headers=extra_headers,
ssl=ssl_context) as ws:
connection = EventConnection(
connection=ws,
fetch_interval=fetch_interval
)
set_the_integration_context(
"last_run_results", f"Opened a connection successfully at {datetime.now().astimezone(timezone.utc)}")
yield connection

except Exception as e:
set_the_integration_context("last_run_results",
f"{str(e)} \n This error happened at {datetime.now().astimezone(timezone.utc)}")
raise DemistoException(f"{str(e)}\n")


def set_the_integration_context(key: str, val): # pragma: no cover
"""Adds a key-value pair to the integration context dictionary.
If the key already exists in the integration context, the function will overwrite the existing value with the new one.
"""
cnx = demisto.getIntegrationContext()
cnx[key] = val
demisto.setIntegrationContext(cnx)


def is_interval_passed(fetch_start_time: datetime, fetch_interval: int) -> bool: # pragma: no cover
"""Checks if the specified interval has passed since the given start time.
This function is used within the fetch_events function to determine if the time to fetch events is over or not.

Args:
fetch_start_time (datetime): The start time of the interval
fetch_interval (int): The interval in seconds

Returns:
bool: True if the interval has passed, False otherwise
"""
is_interval_passed = fetch_start_time + timedelta(seconds=fetch_interval) < datetime.now().astimezone(timezone.utc)
demisto.debug(f"returning {is_interval_passed=}")
return is_interval_passed


def perform_long_running_loop(connection: EventConnection, fetch_interval: int):
"""
Long running loop iteration function. Fetches events from the connection and sends them to XSIAM.

Args:
connection (EventConnection): A connection object to fetch events from.
fetch_interval (int): Fetch time for this fetching events cycle.
"""
demisto.debug(f"{LOG_PREFIX} starting to fetch events")
events = fetch_events(connection, fetch_interval)
YaelShamai marked this conversation as resolved.
Show resolved Hide resolved
demisto.debug(f'{LOG_PREFIX} Adding {len(events)} Events to XSIAM')

# Send the events to the XSIAM.
try:
send_events_to_xsiam(events, vendor=VENDOR, product=PRODUCT)
YaelShamai marked this conversation as resolved.
Show resolved Hide resolved
demisto.debug("Sended events to XSIAM successfully")
except DemistoException:
demisto.error(f"Failed to send events to XSIAM. Error: {traceback.format_exc()}")


''' COMMAND FUNCTIONS '''


def long_running_execution_command(url, token_id, fetch_interval, channel, verify_ssl):
"""
Performs the long running execution loop.
Opens a connection to Retarus.
Heartbeat thread is opened for the connection to send keepalives if the connection is idle for too long.

Args:
url (str): URL for the websocket connection.
token_id (str): Retarus token_id to connect to.
channel (str): channel to connect with.
fetch_interval (int): Total time allocated per fetch cycle.
verify_ssl (bool): Whether to verify ssl when opening the connection.
"""
with websocket_connection(url, token_id, fetch_interval, channel, verify_ssl) as connection:
demisto.info(f"{LOG_PREFIX} Connected to websocket")

# Retarus will keep connections with no traffic open for at most 5 minutes.
# It is highly recommended that the client sends a PING control frame every 60 seconds to keep the connection open.
# (sentence taken from Retarus API docs)
# Setting up heartbeat daemon threads to send keep-alives if needed
threading.Thread(target=connection.heartbeat, daemon=True).start()
demisto.debug(f"{LOG_PREFIX} Created heartbeat")

while True:
perform_long_running_loop(connection, fetch_interval)
# sleep for a bit to not throttle the CPU
time.sleep(FETCH_SLEEP)
YaelShamai marked this conversation as resolved.
Show resolved Hide resolved


def test_module(): # pragma: no cover
raise DemistoException(
"No test option is available due to API limitations.\
To verify the configuration, run the retarus-get-last-run-results command and ensure it returns no errors.")


def get_last_run_results_command():
last_run_results = demisto.getIntegrationContext().get("last_run_results")
if last_run_results:
return CommandResults(readable_output=last_run_results)
else:
return CommandResults(readable_output="No results from the last run yet. Ensure that a Retarus instance \
is configured and enabled. If it is, please wait one minute and try running the command again.")


def fetch_events(connection: EventConnection, fetch_interval: int, recv_timeout: int = 10) -> list[dict]:
"""
This function fetches events from the given connection, for the given fetch interval

Args:
connection (EventConnection): the connection to the event type
fetch_interval (int): Total time to keep fetching before stopping
recv_timeout (int): The timeout for the receive function in the socket connection

Returns:
list[dict]: A list of events
"""
events: list[dict] = []
event_ids = set()
fetch_start_time = datetime.now().astimezone(timezone.utc)
demisto.debug(f'{LOG_PREFIX} Starting to fetch events at {fetch_start_time}')

while not is_interval_passed(fetch_start_time, fetch_interval):
try:
event = json.loads(connection.recv(timeout=recv_timeout))
except TimeoutError:
continue
except Exception as e:
set_the_integration_context("last_run_results",
f"{str(e)} \n This error happened at {datetime.now().astimezone(timezone.utc)}")
raise DemistoException(str(e))

event_id = event.get("rmxId")
event_ts = event.get("ts")
if not event_ts:
# if timestamp is not in the response, use the current time
demisto.debug(f"{LOG_PREFIX} Event {event_id} does not have a timestamp, using current time")
event_ts = datetime.now().isoformat()

date = dateparser.parse(event_ts)
if not date:
demisto.debug(f"{LOG_PREFIX} Event {event_id} has an invalid timestamp, using current time")
# if timestamp is not in correct format, use the current time
date = datetime.now()

event["_time"] = date.astimezone(timezone.utc).isoformat()
event["SOURCE_LOG_TYPE"] = event.get("type")

events.append(event)
event_ids.add(event_id)

num_events = len(events)
demisto.debug(f"{LOG_PREFIX} Fetched {num_events} events")
demisto.debug(f"{LOG_PREFIX} The fetched events ids are: " + ", ".join([str(event_id) for event_id in event_ids]))

set_the_integration_context("last_run_results",
f"Got from connection {num_events} events starting\
at {str(fetch_start_time)} untill {datetime.now().astimezone(timezone.utc)}")
return events


''' MAIN FUNCTION '''


def main(): # pragma: no cover
command = demisto.command()
params = demisto.params()
url = params["url"]
token_id = params.get("credentials", {}).get("password", "")
fetch_interval = arg_to_number(params.get("fetch_interval", FETCH_INTERVAL_IN_SECONDS))
verify_ssl = argToBoolean(not params.get("insecure", False))
channel = params.get("channel", DEFAULT_CHANNEL)

YaelShamai marked this conversation as resolved.
Show resolved Hide resolved
demisto.debug(f"{LOG_PREFIX} command being called is {command}")

try:
if command == "long-running-execution":
return_results(long_running_execution_command(url, token_id, fetch_interval, channel, verify_ssl))
elif command == "retarus-get-last-run-results":
return_results(get_last_run_results_command())
elif command == "test-module":
return_results(test_module())
else:
raise NotImplementedError(f"Command {command} is not implemented.")
except Exception:
return_error(f'Failed to execute {command} command.\nError:\n{traceback.format_exc()}')


if __name__ in ('__main__', '__builtin__', 'builtins'):
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
commonfields:
id: Retarus Secure Email Gateway
version: -1
sectionOrder:
- Connect
- Collect
name: Retarus Secure Email Gateway
display: Retarus Secure Email Gateway
category: Email
description: Integrate Retarus Secure Email Gateway to seamlessly fetch events from Secure Email Gateway by Retarus and enhance email security.
configuration:
- defaultvalue: events.retarus.com
display: Server URL
name: url
required: true
type: 0
section: Connect
- displaypassword: Token ID
additionalinfo: The Token ID to use for connection.
name: credentials
required: true
hiddenusername: true
type: 9
section: Connect
- defaultvalue: default
display: Channel name
section: Collect
name: channel
type: 0
- display: Fetch interval in seconds
name: fetch_interval
type: 0
defaultvalue: 60
required: true
section: Collect
advanced: true
- defaultvalue: 'true'
display: Long Running Instance
hidden: true
name: longRunning
type: 8
section: Connect
- display: Trust any certificate (not secure)
name: insecure
type: 8
required: false
section: Connect
script:
script: ""
type: python
commands:
- name: "retarus-get-last-run-results"
description: Retrieves the results of a connection attempt to Retarus, indicating whether it was successful or failed and why. If event fetching has been initiated, this command provides the results of the most recent fetch attempt.
subtype: python3
dockerimage: demisto/netutils:1.0.0.115452
longRunning: true
marketplaces:
- marketplacev2
fromversion: 6.9.0
tests:
- No tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## Retarus Integration help

Provide the API token Retarus created for you.

Provide the channel you created or use the default channel.
In Retarus, a channel name represents a specific configuration or processing pipeline used to manage email traffic based on criteria like sender, recipient, domain, or metadata, enabling tailored routing, filtering, compliance, and logging rules.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading