Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Introduce ability to ignore requests on initial "ADDED" events #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ If the filename ends with `.url` suffix, the content will be processed as a URL
| `REQ_RETRY_CONNECT` | How many connection-related errors to retry on for any http request (`*.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `10` | integer |
| `REQ_RETRY_READ` | How many times to retry on read errors for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `5` | integer |
| `REQ_RETRY_BACKOFF_FACTOR` | A backoff factor to apply between attempts after the second try for any http request (`.url` triggered requests, requests to `REQ_URI` and k8s api requests) | false | `1.1` | float |
| `REQ_IGNORE_INITIAL_EVENT` | Set to `true` to ignore requests for first events or initial list for configmaps and secrets events. Applicable only when `IGNORE_ALREADY_PROCESSED` is enabled. | false | `false` | boolean |
| `REQ_TIMEOUT` | How many seconds to wait for the server to send data before giving up for `.url` triggered requests or requests to `REQ_URI` (does not apply to k8s api requests) | false | `10` | float |
| `REQ_USERNAME` | Username to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
| `REQ_PASSWORD` | Password to use for basic authentication for requests to `REQ_URL` and for `*.url` triggered requests | false | - | string |
Expand Down
28 changes: 22 additions & 6 deletions src/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _get_destination_folder(metadata, default_folder, folder_annotation):

def list_resources(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
Expand All @@ -87,6 +87,7 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
ret = getattr(v1, _list_namespace[namespace][resource])(**additional_args)

files_changed = False
ignore_request = False

# For all the found resources
for item in ret.items:
Expand All @@ -99,6 +100,8 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
logger.debug(f"Ignoring {resource} {metadata.namespace}/{metadata.name}")
continue

logger.debug(f"Initial list for {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

logger.debug(f"Working on {resource}: {metadata.namespace}/{metadata.name}")
Expand All @@ -114,6 +117,10 @@ def list_resources(label, label_value, target_folder, request_url, request_metho
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial list {resource} {metadata.namespace}/{metadata.name}")
return

if request_url and files_changed:
request(request_url, request_method, enable_5xx, request_payload)

Expand Down Expand Up @@ -206,7 +213,7 @@ def _update_file(data_key, data_content, dest_folder, metadata, resource,

def _watch_resource_iterator(label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
v1 = client.CoreV1Api()
# Filter resources based on label and value or just label
label_selector = f"{label}={label_value}" if label_value else label
Expand All @@ -219,6 +226,8 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if namespace != "ALL":
additional_args['namespace'] = namespace

ignore_request = False

stream = watch.Watch().stream(getattr(v1, _list_namespace[namespace][resource]), **additional_args)

# Process events
Expand All @@ -238,6 +247,9 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
_resources_version_map.pop(metadata.namespace + metadata.name)

if event_type == "ADDED" or event_type == "MODIFIED":
if request_ignore_initial_event and _resources_version_map.get(metadata.namespace + metadata.name) is None:
logger.debug(f"Initial event for {event_type} {resource} {metadata.namespace}/{metadata.name}")
ignore_request = True
_resources_version_map[metadata.namespace + metadata.name] = metadata.resource_version

logger.debug(f"Working on {event_type} {resource} {metadata.namespace}/{metadata.name}")
Expand All @@ -257,6 +269,10 @@ def _watch_resource_iterator(label, label_value, target_folder, request_url, req
if script and files_changed:
execute(script)

if request_ignore_initial_event and ignore_request:
logger.debug(f"Ignoring sending request for initial {event_type} {resource} {metadata.namespace}/{metadata.name}")
return

if request_url and files_changed:
request(request_url, request_method, enable_5xx, request_payload)

Expand Down Expand Up @@ -287,11 +303,11 @@ def _watch_resource_loop(mode, *args):

def watch_for_changes(mode, label, label_value, target_folder, request_url, request_method, request_payload,
current_namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed):
ignore_already_processed, request_ignore_initial_event):
processes = _start_watcher_processes(current_namespace, folder_annotation, label,
label_value, request_method, mode, request_payload, resources,
target_folder, unique_filenames, script, request_url, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)

while True:
died = False
Expand All @@ -311,14 +327,14 @@ def watch_for_changes(mode, label, label_value, target_folder, request_url, requ

def _start_watcher_processes(namespace, folder_annotation, label, label_value, request_method,
mode, request_payload, resources, target_folder, unique_filenames, script, request_url,
enable_5xx, ignore_already_processed):
enable_5xx, ignore_already_processed, request_ignore_initial_event):
processes = []
for resource in resources:
for ns in namespace.split(','):
proc = Process(target=_watch_resource_loop,
args=(mode, label, label_value, target_folder, request_url, request_method, request_payload,
ns, folder_annotation, resource, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)
)
proc.daemon = True
proc.start()
Expand Down
8 changes: 7 additions & 1 deletion src/sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
REQ_PAYLOAD = "REQ_PAYLOAD"
REQ_URL = "REQ_URL"
REQ_METHOD = "REQ_METHOD"
REQ_IGNORE_INITIAL_EVENT = "REQ_IGNORE_INITIAL_EVENT"
SCRIPT = "SCRIPT"
ENABLE_5XX = "ENABLE_5XX"
IGNORE_ALREADY_PROCESSED = "IGNORE_ALREADY_PROCESSED"
Expand Down Expand Up @@ -103,6 +104,11 @@ def main():
if not ignore_already_processed:
logger.debug("Ignore already processed resource version will not be enabled.")

request_ignore_initial_event = os.getenv(REQ_IGNORE_INITIAL_EVENT) and ignore_already_processed

if request_ignore_initial_event:
logger.debug("Initial list or first event for a given resource will skip requests to a given URL.")

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
namespace = os.getenv("NAMESPACE", f.read())

Expand All @@ -116,7 +122,7 @@ def main():
else:
watch_for_changes(method, label, label_value, target_folder, request_url, request_method, request_payload,
namespace, folder_annotation, resources, unique_filenames, script, enable_5xx,
ignore_already_processed)
ignore_already_processed, request_ignore_initial_event)


def _initialize_kubeclient_configuration():
Expand Down