Current Path: > > opt > cloudlinux > venv > lib64 > python3.11 > > site-packages > lvestats > lib > bursting
Operation : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 Software : Apache Server IP : 162.0.232.56 | Your IP: 216.73.216.111 Domains : 1034 Domain(s) Permission : [ 0755 ]
Name | Type | Size | Last Modified | Actions |
---|---|---|---|---|
__pycache__ | Directory | - | - | |
__init__.py | File | 197 bytes | May 30 2025 10:30:46. | |
history.py | File | 3830 bytes | May 30 2025 10:30:46. | |
info.py | File | 6962 bytes | May 30 2025 10:30:46. |
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import datetime

import sqlalchemy as sa

from lvestats.orm import bursting_events_table


class TableDoesNotExistError(Exception):
    """Raised when a required table is missing from the database."""

    def __init__(self, table_name):
        # Keep the text on the instance so callers can read `.message` directly.
        self.message = f'Table "{table_name}" does not exist in the database'
        super().__init__(self.message)


class HistoryShowBursting:
    """Read bursting events for a time frame from the 'bursting_events' table."""

    def __init__(self,
                 dbengine: sa.engine.base.Engine,
                 period_from: datetime.datetime,
                 period_to: datetime.datetime,
                 uid: int | None = None,
                 server_id: str = 'localhost') -> None:
        """
        Store the query parameters.

        :param dbengine: engine used to inspect the database and run the query
        :param period_from: start of the time frame (inclusive)
        :param period_to: end of the time frame (inclusive)
        :param uid: if given, only events with this lve_id are returned
        :param server_id: only events recorded for this server are returned
        """
        self.dbengine = dbengine
        self.period_from = period_from
        self.period_to = period_to
        self.uid = uid
        self.server_id = server_id

    def get(self) -> list[sa.engine.RowProxy]:
        """
        Get history from the 'bursting_events' table.

        Retrieves the records within the required time frame, along with
        one record per lve_id preceding this time frame, so the bursting
        status at the start of the time frame can be detected.

        :return: rows of (lve_id, timestamp, event_type) ordered by
                 lve_id, then timestamp
        :raises TableDoesNotExistError: if the 'bursting_events' table
                 is not present in the database
        """
        # Since bursting limits functionality isn't enabled by default,
        # we need to check if the table exists first
        inspector = sa.inspect(self.dbengine)
        if bursting_events_table.name not in inspector.get_table_names():
            raise TableDoesNotExistError(bursting_events_table.name)

        events = bursting_events_table
        ts_from = self.period_from.timestamp()
        ts_to = self.period_to.timestamp()

        # Conditions shared by every query: restrict to this server and,
        # when a uid was specified, to that single lve_id.
        scope = [events.c.server_id == self.server_id]
        if self.uid is not None:
            scope.append(events.c.lve_id == self.uid)

        selected_columns = [
            events.c.lve_id,
            events.c.timestamp,
            events.c.event_type,
        ]

        # Get the rows with timestamp between ts_from and ts_to
        stmt1 = sa.select(selected_columns).where(
            sa.and_(
                *scope,
                events.c.timestamp >= ts_from,
                events.c.timestamp <= ts_to,
            )
        )

        # Subquery to get the maximum timestamp for each lve_id
        # where timestamp < ts_from
        subquery = sa.select([
            events.c.lve_id,
            sa.func.max(events.c.timestamp).label('max_timestamp'),
        ]).where(
            sa.and_(
                *scope,
                events.c.timestamp < ts_from,
            )
        ).group_by(
            events.c.lve_id,
        ).alias('subquery')

        # Get the row with the maximum timestamp less than ts_from.
        # The outer table must be limited to the same server as the
        # subquery: otherwise an event of another server that shares the
        # (lve_id, timestamp) pair would also match the join.
        stmt2 = sa.select(selected_columns).select_from(
            events.join(
                subquery,
                sa.and_(
                    events.c.lve_id == subquery.c.lve_id,
                    events.c.timestamp == subquery.c.max_timestamp,
                )
            )
        ).where(
            events.c.server_id == self.server_id,
        )

        stmt = sa.union(
            stmt1,
            stmt2,
        )
        stmt = stmt.order_by(
            stmt.c.lve_id,
            stmt.c.timestamp,
        )

        with self.dbengine.connect() as connection:
            return connection.execute(stmt).fetchall()
SILENT KILLER Tool