Current Path: > > opt > > alt > python-internal > lib64 > python3.11 > asyncio >
Operation : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 Software : Apache Server IP : 162.0.232.56 | Your IP: 216.73.216.111 Domains : 1034 Domain(s) Permission : [ 0755 ]
Name | Type | Size | Last Modified | Actions |
---|---|---|---|---|
__pycache__ | Directory | - | - | |
__init__.py | File | 1188 bytes | June 03 2025 18:38:25. | |
__main__.py | File | 3379 bytes | June 03 2025 18:38:25. | |
base_events.py | File | 75384 bytes | June 03 2025 18:38:25. | |
base_futures.py | File | 2004 bytes | June 03 2025 18:38:25. | |
base_subprocess.py | File | 8869 bytes | June 03 2025 18:38:25. | |
base_tasks.py | File | 2644 bytes | June 03 2025 18:38:25. | |
constants.py | File | 1326 bytes | June 03 2025 18:38:25. | |
coroutines.py | File | 3400 bytes | June 03 2025 18:38:25. | |
events.py | File | 28641 bytes | June 03 2025 18:38:25. | |
exceptions.py | File | 1752 bytes | June 03 2025 18:38:25. | |
format_helpers.py | File | 2404 bytes | June 03 2025 18:38:25. | |
futures.py | File | 14212 bytes | June 03 2025 18:38:25. | |
locks.py | File | 19014 bytes | June 03 2025 18:38:25. | |
log.py | File | 124 bytes | June 03 2025 18:38:25. | |
mixins.py | File | 481 bytes | June 03 2025 18:38:25. | |
proactor_events.py | File | 33264 bytes | June 03 2025 18:38:25. | |
protocols.py | File | 6957 bytes | June 03 2025 18:38:25. | |
queues.py | File | 7974 bytes | June 03 2025 18:38:25. | |
runners.py | File | 6842 bytes | June 03 2025 18:38:25. | |
selector_events.py | File | 45400 bytes | June 03 2025 18:38:25. | |
sslproto.py | File | 31739 bytes | June 03 2025 18:38:25. | |
staggered.py | File | 5992 bytes | June 03 2025 18:38:25. | |
streams.py | File | 27503 bytes | June 03 2025 18:38:25. | |
subprocess.py | File | 7682 bytes | June 03 2025 18:38:25. | |
taskgroups.py | File | 8471 bytes | June 03 2025 18:38:25. | |
tasks.py | File | 34433 bytes | June 03 2025 18:38:25. | |
threads.py | File | 790 bytes | June 03 2025 18:38:25. | |
timeouts.py | File | 5321 bytes | June 03 2025 18:38:25. | |
transports.py | File | 10722 bytes | June 03 2025 18:38:25. | |
trsock.py | File | 2475 bytes | June 03 2025 18:38:25. | |
unix_events.py | File | 51915 bytes | June 03 2025 18:38:25. | |
windows_events.py | File | 34691 bytes | June 03 2025 18:38:25. | |
windows_utils.py | File | 5060 bytes | June 03 2025 18:38:25. |
import enum from types import TracebackType from typing import final, Optional, Type from . import events from . import exceptions from . import tasks __all__ = ( "Timeout", "timeout", "timeout_at", ) class _State(enum.Enum): CREATED = "created" ENTERED = "active" EXPIRING = "expiring" EXPIRED = "expired" EXITED = "finished" @final class Timeout: """Asynchronous context manager for cancelling overdue coroutines. Use `timeout()` or `timeout_at()` rather than instantiating this class directly. """ def __init__(self, when: Optional[float]) -> None: """Schedule a timeout that will trigger at a given loop time. - If `when` is `None`, the timeout will never trigger. - If `when < loop.time()`, the timeout will trigger on the next iteration of the event loop. """ self._state = _State.CREATED self._timeout_handler: Optional[events.TimerHandle] = None self._task: Optional[tasks.Task] = None self._when = when def when(self) -> Optional[float]: """Return the current deadline.""" return self._when def reschedule(self, when: Optional[float]) -> None: """Reschedule the timeout.""" if self._state is not _State.ENTERED: if self._state is _State.CREATED: raise RuntimeError("Timeout has not been entered") raise RuntimeError( f"Cannot change state of {self._state.value} Timeout", ) self._when = when if self._timeout_handler is not None: self._timeout_handler.cancel() if when is None: self._timeout_handler = None else: loop = events.get_running_loop() if when <= loop.time(): self._timeout_handler = loop.call_soon(self._on_timeout) else: self._timeout_handler = loop.call_at(when, self._on_timeout) def expired(self) -> bool: """Is timeout expired during execution?""" return self._state in (_State.EXPIRING, _State.EXPIRED) def __repr__(self) -> str: info = [''] if self._state is _State.ENTERED: when = round(self._when, 3) if self._when is not None else None info.append(f"when={when}") info_str = ' '.join(info) return f"<Timeout [{self._state.value}]{info_str}>" async def __aenter__(self) -> "Timeout": if self._state is not _State.CREATED: raise RuntimeError("Timeout has already been entered") task = tasks.current_task() if task is None: raise RuntimeError("Timeout should be used inside a task") self._state = _State.ENTERED self._task = task self._cancelling = self._task.cancelling() self.reschedule(self._when) return self async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> Optional[bool]: assert self._state in (_State.ENTERED, _State.EXPIRING) if self._timeout_handler is not None: self._timeout_handler.cancel() self._timeout_handler = None if self._state is _State.EXPIRING: self._state = _State.EXPIRED if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError: # Since there are no new cancel requests, we're # handling this. raise TimeoutError from exc_val elif self._state is _State.ENTERED: self._state = _State.EXITED return None def _on_timeout(self) -> None: assert self._state is _State.ENTERED self._task.cancel() self._state = _State.EXPIRING # drop the reference early self._timeout_handler = None def timeout(delay: Optional[float]) -> Timeout: """Timeout async context manager. Useful in cases when you want to apply timeout logic around block of code or in cases when asyncio.wait_for is not suitable. For example: >>> async with asyncio.timeout(10): # 10 seconds timeout ... await long_running_task() delay - value in seconds or None to disable timeout logic long_running_task() is interrupted by raising asyncio.CancelledError, the top-most affected timeout() context manager converts CancelledError into TimeoutError. """ loop = events.get_running_loop() return Timeout(loop.time() + delay if delay is not None else None) def timeout_at(when: Optional[float]) -> Timeout: """Schedule the timeout at absolute time. Like timeout() but argument gives absolute time in the same clock system as loop.time(). Please note: it is not POSIX time but a time with undefined starting base, e.g. the time of the system power on. >>> async with asyncio.timeout_at(loop.time() + 10): ... await long_running_task() when - a deadline when timeout occurs or None to disable timeout logic long_running_task() is interrupted by raising asyncio.CancelledError, the top-most affected timeout() context manager converts CancelledError into TimeoutError. """ return Timeout(when)
SILENT KILLER Tool