Skip to content

Commit

Permalink
four version
Browse files Browse the repository at this point in the history
  • Loading branch information
aamalev committed Dec 28, 2019
1 parent 8bebe6b commit 91915e0
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 146 deletions.
26 changes: 5 additions & 21 deletions aioworkers_prometheus/metric.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import os
from multiprocessing import Queue
from typing import Dict

from aioworkers.core.base import LoggingEntity
from prometheus_client import metrics

# true
from . import MULTIPROC_DIR, metrics
from .metrics import QueueMetric
from .registry import REGISTRY, Receiver, get_registry
from .registry import REGISTRY, get_registry


class Metric(LoggingEntity):
Expand All @@ -20,28 +15,17 @@ class Metric(LoggingEntity):
summary=metrics.Summary,
)

def __init__(self, *args, **kwargs):
self._metrics: Dict = {}
self._queue = Queue()
self._pid = os.getpid()
Receiver(self._queue, self._metrics).start()
super().__init__(*args, **kwargs)

def set_config(self, config):
cfg = config.new_parent(logger='aioworkers_prometheus')
super().set_config(cfg)
registry = self.config.get('registry', REGISTRY)
namespace = self.config.get('namespace')
metrics = self.config.get('metrics', {})
for mid, (attr, params) in enumerate(metrics.items()):
kw = dict(params, queue=self._queue, mid=mid)
for attr, params in metrics.items():
kw = dict(params)
kw.setdefault('name', attr)
cls = self.METRICS[kw.pop('type', 'gauge')]
kw['registry'] = get_registry(kw.get('registry', registry))
kw.setdefault('namespace', namespace)
metric = cls(**kw)
self._metrics[mid] = metric
setattr(self, attr, metric)

async def init(self):
QueueMetric.fork = not MULTIPROC_DIR and self._pid != os.getpid()
await super().init()
92 changes: 0 additions & 92 deletions aioworkers_prometheus/metrics.py

This file was deleted.

25 changes: 0 additions & 25 deletions aioworkers_prometheus/registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging
from functools import lru_cache
from multiprocessing import Queue
from threading import Thread
from typing import Dict

from prometheus_client import registry

Expand All @@ -17,25 +14,3 @@ def get_registry(name: str = REGISTRY) -> registry.CollectorRegistry:
return registry.REGISTRY
else:
return registry.CollectorRegistry(auto_describe=True)


class Receiver:
def __init__(self, queue: Queue, metrics: Dict):
self._queue = queue
self._metrics = metrics

def process_msg(self, mid, *args):
metric = self._metrics[mid]
metric.from_msg(*args)

def process(self):
while True:
msg = self._queue.get()
try:
self.process_msg(*msg)
except Exception:
logger.exception('Metric message error')

def start(self):
tr = Thread(target=self.process, daemon=True)
tr.start()
8 changes: 4 additions & 4 deletions aioworkers_prometheus/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from aioworkers.core.base import AbstractEntity
from aioworkers.core.base import ExecutorEntity
from prometheus_client.bridge.graphite import GraphiteBridge
from prometheus_client.exposition import generate_latest, start_http_server
from prometheus_client.multiprocess import MultiProcessCollector
Expand All @@ -8,7 +8,7 @@
from .registry import REGISTRY, get_registry


class Service(AbstractEntity):
class Service(ExecutorEntity):
_registry = None

def set_config(self, config):
Expand All @@ -33,5 +33,5 @@ def set_config(self, config):
prefix = graphite.get('prefix', '')
gb.start(interval, prefix=prefix)

def generate_latest(self):
return generate_latest(self._registry)
async def generate_latest(self):
return await self.run_in_executor(generate_latest, self._registry)
5 changes: 1 addition & 4 deletions tests/test_metric.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import time

import pytest


Expand Down Expand Up @@ -41,8 +39,7 @@ async def test_counter(context):
assert 45 == context.metric.counter._value.get()

c = context.metric.counter
c.send_msg(0, 4)
time.sleep(2)
c.inc(4)
assert 49 == c._value.get()

g = context.metric.gauge
Expand Down

0 comments on commit 91915e0

Please sign in to comment.