Skip to content

Commit

Permalink
Gcr in process collector (#339)
Browse files Browse the repository at this point in the history
* initial gcr collector flow

* adding the specific google cloud run metrics

* small cleanup

* adding tests for gcr

* add headers to the request

* add debug logs for testing

* add debug logs for testing

* fixing parsing gcr response

* update version

* changes in regards to PR comments and cleanup

* use request mocking in tests

* add requests mock in python27 requirements

* fixing typo

* Update instana/agent/google_cloud_run.py

Co-authored-by: Andrey Slotin <[email protected]>

* Update instana/agent/google_cloud_run.py

Co-authored-by: Andrey Slotin <[email protected]>

* Update instana/collector/helpers/google_cloud_run/process.py

Co-authored-by: Andrey Slotin <[email protected]>

* Update instana/collector/helpers/google_cloud_run/process.py

Co-authored-by: Andrey Slotin <[email protected]>

* PR review fixes

* fix conflict

Co-authored-by: Andrey Slotin <[email protected]>
  • Loading branch information
pdimitra and Andrey Slotin authored Oct 20, 2021
1 parent b7b6882 commit f3d3ab8
Show file tree
Hide file tree
Showing 26 changed files with 665 additions and 57 deletions.
11 changes: 0 additions & 11 deletions instana/agent/aws_fargate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,12 @@
from ..version import VERSION


class AWSFargateFrom(object):
    """
    Source ("from") identifier for AWSFargateAgent.

    The class-level values act as defaults; any keyword argument given to the
    constructor is stored on the instance and therefore shadows the default
    of the same name.
    """
    e = "taskDefinition"
    cp = "aws"
    hl = True

    def __init__(self, **kwds):
        """
        @param kwds: arbitrary attribute names and values to set on this instance
        """
        vars(self).update(kwds)


class AWSFargateAgent(BaseAgent):
""" In-process agent for AWS Fargate """
def __init__(self):
super(AWSFargateAgent, self).__init__()

self.options = AWSFargateOptions()
self.from_ = AWSFargateFrom()
self.collector = None
self.report_headers = None
self._can_send = False
Expand Down
11 changes: 0 additions & 11 deletions instana/agent/aws_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,11 @@
from ..options import AWSLambdaOptions


class AWSLambdaFrom(object):
    """
    Source ("from") identifier for AWSLambdaAgent.

    The class-level values act as defaults; any keyword argument given to the
    constructor is stored on the instance and therefore shadows the default
    of the same name.
    """
    e = "qualifiedARN"
    cp = "aws"
    hl = True

    def __init__(self, **kwds):
        """
        @param kwds: arbitrary attribute names and values to set on this instance
        """
        vars(self).update(kwds)


class AWSLambdaAgent(BaseAgent):
""" In-process agent for AWS Lambda """
def __init__(self):
super(AWSLambdaAgent, self).__init__()

self.from_ = AWSLambdaFrom()
self.collector = None
self.options = AWSLambdaOptions()
self.report_headers = None
Expand Down
95 changes: 95 additions & 0 deletions instana/agent/google_cloud_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2021

"""
The Instana agent (for GCR) that manages
monitoring state and reporting that data.
"""
import time
from instana.options import GCROptions
from instana.collector.google_cloud_run import GCRCollector
from instana.log import logger
from instana.util import to_json
from instana.agent.base import BaseAgent
from instana.version import VERSION


class GCRAgent(BaseAgent):
    """
    In-process agent for Google Cloud Run (GCR).

    Owns the GCR options and the GCRCollector, and posts the payloads the
    collector prepares to the configured endpoint URL.
    """

    def __init__(self, service, configuration, revision):
        """
        Set up options and logging and, if the required options are present,
        create and start the GCR collector.

        @param service: passed through unchanged to GCRCollector
        @param configuration: passed through unchanged to GCRCollector
        @param revision: passed through unchanged to GCRCollector; the collector's
                         revision is later used in the X-Instana-Host header
        """
        super(GCRAgent, self).__init__()

        self.options = GCROptions()
        self.collector = None
        self.report_headers = None
        self._can_send = False

        # Update log level (if INSTANA_LOG_LEVEL was set)
        self.update_log_level()

        logger.info("Stan is on the Google Cloud Run scene. Starting Instana instrumentation version: %s", VERSION)

        if self._validate_options():
            self._can_send = True
            self.collector = GCRCollector(self, service, configuration, revision)
            self.collector.start()
        else:
            # Without endpoint URL and agent key nothing can be reported, so no
            # collector is created and self.collector stays None.
            logger.warning("Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. "
                           "We will not be able to monitor this GCR cluster.")

    def can_send(self):
        """
        Are we in a state where we can send data?
        @return: Boolean
        """
        return self._can_send

    def get_from_structure(self):
        """
        Retrieves the From data that is reported alongside monitoring data.

        The entity id ('e') is None when no collector was created (invalid
        options) or when the collector has no instance id to offer yet.
        @return: dict()
        """
        instance_id = None
        if self.collector is not None:
            instance_id = self.collector.get_instance_id()
        return {'hl': True, 'cp': 'gcp', 'e': instance_id}

    def report_data_payload(self, payload):
        """
        Used to report metrics and span data to the endpoint URL in self.options.endpoint_url

        Best effort: any exception raised while building the headers or posting
        is logged at debug level and not propagated.
        @param payload: the data to serialize with to_json and post
        @return: the response of the POST, or None if an exception occurred first
        """
        response = None
        try:
            if self.report_headers is None:
                # Prepare request headers once; they are reused for later reports
                self.report_headers = {
                    "Content-Type": "application/json",
                    "X-Instana-Host": "gcp:cloud-run:revision:{revision}".format(
                        revision=self.collector.revision),
                    "X-Instana-Key": self.options.agent_key
                }

            # Milliseconds since the epoch. int() keeps the value free of a
            # trailing ".0" on Python 2, where round() returns a float.
            self.report_headers["X-Instana-Time"] = str(int(round(time.time() * 1000)))

            response = self.client.post(self.__data_bundle_url(),
                                        data=to_json(payload),
                                        headers=self.report_headers,
                                        timeout=self.options.timeout,
                                        verify=self.options.ssl_verify,
                                        proxies=self.options.endpoint_proxy)

            if response.status_code >= 400:
                logger.info("report_data_payload: Instana responded with status code %s", response.status_code)
        except Exception as exc:
            logger.debug("report_data_payload: connection error (%s)", type(exc))
        return response

    def _validate_options(self):
        """
        Validate that the options used by this Agent are valid. e.g. can we report data?
        @return: Boolean - True when both endpoint_url and agent_key are set
        """
        return self.options.endpoint_url is not None and self.options.agent_key is not None

    def __data_bundle_url(self):
        """
        URL that report_data_payload posts metrics and span data to.
        @return: string - "<endpoint_url>/bundle"
        """
        return "{endpoint_url}/bundle".format(endpoint_url=self.options.endpoint_url)
16 changes: 8 additions & 8 deletions instana/collector/aws_fargate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from ..util import DictionaryOfStan, validate_url
from ..singletons import env_is_test

from .helpers.process import ProcessHelper
from .helpers.fargate.process import FargateProcessHelper
from .helpers.runtime import RuntimeHelper
from .helpers.fargate.task import TaskHelper
from .helpers.fargate.docker import DockerHelper
Expand All @@ -23,6 +23,7 @@

class AWSFargateCollector(BaseCollector):
""" Collector for AWS Fargate """

def __init__(self, agent):
super(AWSFargateCollector, self).__init__(agent)
logger.debug("Loading AWS Fargate Collector")
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(self, agent):
# Populate the collection helpers
self.helpers.append(TaskHelper(self))
self.helpers.append(DockerHelper(self))
self.helpers.append(ProcessHelper(self))
self.helpers.append(FargateProcessHelper(self))
self.helpers.append(RuntimeHelper(self))
self.helpers.append(ContainerHelper(self))

Expand All @@ -98,7 +99,8 @@ def get_ecs_metadata(self):
return

try:
delta = int(time()) - self.last_ecmu_full_fetch
self.fetching_start_time = int(time())
delta = self.fetching_start_time - self.last_ecmu_full_fetch
if delta > self.ecmu_full_fetch_interval:
# Refetch the ECMU snapshot data
self.last_ecmu_full_fetch = int(time())
Expand Down Expand Up @@ -126,10 +128,7 @@ def get_ecs_metadata(self):
logger.debug("AWSFargateCollector.get_ecs_metadata", exc_info=True)

def should_send_snapshot_data(self):
delta = int(time()) - self.snapshot_data_last_sent
if delta > self.snapshot_data_interval:
return True
return False
return int(time()) - self.snapshot_data_last_sent > self.snapshot_data_interval

def prepare_payload(self):
payload = DictionaryOfStan()
Expand All @@ -147,7 +146,7 @@ def prepare_payload(self):

plugins = []
for helper in self.helpers:
plugins.extend(helper.collect_metrics(with_snapshot))
plugins.extend(helper.collect_metrics(with_snapshot=with_snapshot))

payload["metrics"]["plugins"] = plugins

Expand All @@ -162,6 +161,7 @@ def get_fq_arn(self):
if self._fq_arn is not None:
return self._fq_arn

task_arn = ""
if self.root_metadata is not None:
labels = self.root_metadata.get("Labels", None)
if labels is not None:
Expand Down
3 changes: 3 additions & 0 deletions instana/collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def __init__(self, agent):
# Flag to indicate if start/shutdown state
self.started = False

# Start time of fetching metadata
self.fetching_start_time = 0

def is_reporting_thread_running(self):
"""
Indicates if there is a thread running with the name self.THREAD_NAME
Expand Down
140 changes: 140 additions & 0 deletions instana/collector/google_cloud_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# (c) Copyright IBM Corp. 2021
# (c) Copyright Instana Inc. 2021

"""
Google Cloud Run Collector: Manages the periodic collection of metrics & snapshot data
"""
import os
from time import time
import requests

from instana.log import logger
from instana.collector.base import BaseCollector
from instana.util import DictionaryOfStan, validate_url
from instana.collector.helpers.google_cloud_run.process import GCRProcessHelper
from instana.collector.helpers.google_cloud_run.instance_entity import InstanceEntityHelper


class GCRCollector(BaseCollector):
    """
    Collector for Google Cloud Run.

    Fetches project and instance metadata from the GCR metadata endpoint and
    hands it to the registered helpers to build the metrics part of a payload.
    """

    def __init__(self, agent, service, configuration, revision):
        """
        @param agent: passed to the BaseCollector constructor
        @param service: stored as self.service
        @param configuration: stored as self.configuration
        @param revision: stored as self.revision
        """
        super(GCRCollector, self).__init__(agent)
        logger.debug("Loading Google Cloud Run Collector")

        self.service = service
        self.configuration = configuration
        self.revision = revision

        # Cleared below when the metadata endpoint is unusable; start() checks it
        self.ready_to_start = True

        # Base URL of the metadata server; overridable through the environment
        md_uri = os.environ.get("GOOGLE_CLOUD_RUN_METADATA_ENDPOINT", "http://metadata.google.internal")
        self._gcr_md_uri = md_uri

        if md_uri == "" or validate_url(md_uri) is False:
            logger.warning("GCRCollector: GOOGLE_CLOUD_RUN_METADATA_ENDPOINT not in environment or invalid URL. "
                           "Instana will not be able to monitor this environment")
            self.ready_to_start = False

        # The two metadata documents that are fetched on every full fetch
        self._gcr_md_project_uri = md_uri + '/computeMetadata/v1/project/?recursive=true'
        self._gcr_md_instance_uri = md_uri + '/computeMetadata/v1/instance/?recursive=true'

        # Shared session so that both metadata requests go through one HTTP client
        self._http_client = requests.Session()

        # Seconds between two full metadata fetches
        self.__gcr_md_full_fetch_interval = 300

        # Timestamp (seconds) of the most recent full fetch attempt; 0 = never
        self.__last_gcr_md_full_fetch = 0

        # Decoded JSON of the last project / instance metadata responses
        self.project_metadata = None
        self.instance_metadata = None

        # Initialised to None and not touched anywhere else in this class
        self._gcp_arn = None

        # Register the collection helpers (process first, then instance entity)
        for helper_class in (GCRProcessHelper, InstanceEntityHelper):
            self.helpers.append(helper_class(self))

    def start(self):
        """
        Start the collector through BaseCollector.start(), unless the
        constructor flagged the environment as unusable.
        """
        if self.ready_to_start is False:
            logger.warning("Google Cloud Run Collector is missing requirements and cannot monitor this environment.")
        else:
            super(GCRCollector, self).start()

    def __get_project_instance_metadata(self):
        """
        Fetch the project and instance metadata documents and store the decoded
        JSON in self.project_metadata and self.instance_metadata.

        Nothing is returned. Any exception is logged at debug level; attributes
        that were not reached before the failure keep their previous value.
        """
        try:
            # Recorded before the requests are made, so a failed attempt also
            # counts as a fetch for the interval check in prepare_payload
            self.__last_gcr_md_full_fetch = int(time())
            request_args = {"timeout": 1, "headers": {"Metadata-Flavor": "Google"}}

            # ${GOOGLE_CLOUD_RUN_METADATA_ENDPOINT}/computeMetadata/v1/project/?recursive=true
            project_response = self._http_client.get(self._gcr_md_project_uri, **request_args)
            self.project_metadata = project_response.json()

            # ${GOOGLE_CLOUD_RUN_METADATA_ENDPOINT}/computeMetadata/v1/instance/?recursive=true
            instance_response = self._http_client.get(self._gcr_md_instance_uri, **request_args)
            self.instance_metadata = instance_response.json()
        except Exception:
            logger.debug("GoogleCloudRunCollector.get_project_instance_metadata", exc_info=True)

    def should_send_snapshot_data(self):
        """
        @return: Boolean - True when more than self.snapshot_data_interval seconds
                 have passed since self.snapshot_data_last_sent
        """
        elapsed = int(time()) - self.snapshot_data_last_sent
        return elapsed > self.snapshot_data_interval

    def prepare_payload(self):
        """
        Build the payload to report: queued spans plus, when a full metadata
        fetch is due, the plugin data gathered from every helper.

        Errors are logged at debug level; the payload built so far is returned.
        @return: DictionaryOfStan with "spans" and "metrics" -> "plugins"
        """
        payload = DictionaryOfStan()
        payload["spans"] = []
        payload["metrics"]["plugins"] = []

        try:
            if not self.span_queue.empty():
                payload["spans"] = self.queued_spans()

            now = int(time())
            self.fetching_start_time = now
            if now - self.__last_gcr_md_full_fetch < self.__gcr_md_full_fetch_interval:
                # Last full fetch is recent enough: report spans only
                return payload

            # Decided before fetching, as the fetch itself takes time
            with_snapshot = self.should_send_snapshot_data()

            # Fetch the latest metadata
            self.__get_project_instance_metadata()
            have_metadata = self.instance_metadata is not None or self.project_metadata is not None
            if not have_metadata:
                return payload

            # Collected into a separate list so that the payload is only
            # updated once every helper has run without raising
            collected = []
            for helper in self.helpers:
                helper_plugins = helper.collect_metrics(with_snapshot=with_snapshot,
                                                        instance_metadata=self.instance_metadata,
                                                        project_metadata=self.project_metadata)
                collected.extend(helper_plugins)

            payload["metrics"]["plugins"] = collected

            if with_snapshot:
                self.snapshot_data_last_sent = int(time())
        except Exception:
            logger.debug("collect_snapshot error", exc_info=True)

        return payload

    def get_instance_id(self):
        """
        @return: the "id" entry of the stored instance metadata, or None when no
                 metadata is stored or looking it up raised
        """
        instance_id = None
        try:
            metadata = self.instance_metadata
            if metadata:
                instance_id = metadata.get("id")
        except Exception:
            logger.debug("get_instance_id error", exc_info=True)
        return instance_id
2 changes: 1 addition & 1 deletion instana/collector/helpers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ def apply_delta(self, source, previous, new, metric, with_snapshot):
if previous_value != new_value or with_snapshot is True:
previous[dst_metric] = new[dst_metric] = new_value

def collect_metrics(self, with_snapshot=False):
def collect_metrics(self, **kwargs):
logger.debug("BaseHelper.collect_metrics must be overridden")
4 changes: 2 additions & 2 deletions instana/collector/helpers/fargate/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class ContainerHelper(BaseHelper):
""" This class acts as a helper to collect container snapshot and metric information """
def collect_metrics(self, with_snapshot=False):
def collect_metrics(self, **kwargs):
"""
Collect and return metrics (and optionally snapshot data) for every container in this task
@return: list - with one or more plugin entities
Expand All @@ -34,7 +34,7 @@ def collect_metrics(self, with_snapshot=False):
plugin_data["data"]["dockerId"] = container.get("DockerId", None)
plugin_data["data"]["taskArn"] = labels.get("com.amazonaws.ecs.task-arn", None)

if with_snapshot is True:
if kwargs.get("with_snapshot"):
plugin_data["data"]["runtime"] = "python"
plugin_data["data"]["dockerName"] = container.get("DockerName", None)
plugin_data["data"]["containerName"] = container.get("Name", None)
Expand Down
Loading

0 comments on commit f3d3ab8

Please sign in to comment.