Current Path: > > opt > hc_python > > > lib > python3.12 > site-packages > prometheus_client > bridge
Operation : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 Software : Apache Server IP : 162.0.232.56 | Your IP: 216.73.216.111 Domains : 1034 Domain(s) Permission : [ 0755 ]
Name | Type | Size | Last Modified | Actions |
---|---|---|---|---|
__pycache__ | Directory | - | - | |
__init__.py | File | 0 bytes | April 04 2025 08:02:24. | |
graphite.py | File | 2897 bytes | April 04 2025 08:02:24. |
#!/usr/bin/env python import logging import re import socket import threading import time from timeit import default_timer from typing import Callable, Tuple from ..registry import CollectorRegistry, REGISTRY # Roughly, have to keep to what works as a file name. # We also remove periods, so labels can be distinguished. _INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]") def _sanitize(s): return _INVALID_GRAPHITE_CHARS.sub('_', s) class _RegularPush(threading.Thread): def __init__(self, pusher, interval, prefix): super().__init__() self._pusher = pusher self._interval = interval self._prefix = prefix def run(self): wait_until = default_timer() while True: while True: now = default_timer() if now >= wait_until: # May need to skip some pushes. while wait_until < now: wait_until += self._interval break # time.sleep can return early. time.sleep(wait_until - now) try: self._pusher.push(prefix=self._prefix) except OSError: logging.exception("Push failed") class GraphiteBridge: def __init__(self, address: Tuple[str, int], registry: CollectorRegistry = REGISTRY, timeout_seconds: float = 30, _timer: Callable[[], float] = time.time, tags: bool = False, ): self._address = address self._registry = registry self._tags = tags self._timeout = timeout_seconds self._timer = _timer def push(self, prefix: str = '') -> None: now = int(self._timer()) output = [] prefixstr = '' if prefix: prefixstr = prefix + '.' for metric in self._registry.collect(): for s in metric.samples: if s.labels: if self._tags: sep = ';' fmt = '{0}={1}' else: sep = '.' fmt = '{0}.{1}' labelstr = sep + sep.join( [fmt.format( _sanitize(k), _sanitize(v)) for k, v in sorted(s.labels.items())]) else: labelstr = '' output.append(f'{prefixstr}{_sanitize(s.name)}{labelstr} {float(s.value)} {now}\n') conn = socket.create_connection(self._address, self._timeout) conn.sendall(''.join(output).encode('ascii')) conn.close() def start(self, interval: float = 60.0, prefix: str = '') -> None: t = _RegularPush(self, interval, prefix) t.daemon = True t.start()
SILENT KILLER Tool