Current Path: > > opt > cloudlinux > venv > lib64 > python3.11 > site-packages > alembic > util
Operation : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 Software : Apache Server IP : 162.0.232.56 | Your IP: 216.73.216.111 Domains : 1034 Domain(s) Permission : [ 0755 ]
Name | Type | Size | Last Modified | Actions |
---|---|---|---|---|
__pycache__ | Directory | - | - | |
__init__.py | File | 1202 bytes | April 17 2025 13:10:59. | |
compat.py | File | 1911 bytes | April 17 2025 13:10:59. | |
editor.py | File | 2546 bytes | April 17 2025 13:10:59. | |
exc.py | File | 98 bytes | April 17 2025 13:10:59. | |
langhelpers.py | File | 8592 bytes | April 17 2025 13:10:59. | |
messaging.py | File | 3042 bytes | April 17 2025 13:10:59. | |
pyfiles.py | File | 3374 bytes | April 17 2025 13:10:59. | |
sqla_compat.py | File | 17712 bytes | April 17 2025 13:10:59. |
from __future__ import annotations from collections.abc import Iterable from contextlib import contextmanager import logging import sys import textwrap from typing import Optional from typing import TextIO from typing import Union import warnings from sqlalchemy.engine import url from . import sqla_compat log = logging.getLogger(__name__) # disable "no handler found" errors logging.getLogger("alembic").addHandler(logging.NullHandler()) try: import fcntl import termios import struct ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)) _h, TERMWIDTH, _hp, _wp = struct.unpack("HHHH", ioctl) if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty TERMWIDTH = None except (ImportError, OSError): TERMWIDTH = None def write_outstream( stream: TextIO, *text: Union[str, bytes], quiet: bool = False ) -> None: if quiet: return encoding = getattr(stream, "encoding", "ascii") or "ascii" for t in text: if not isinstance(t, bytes): t = t.encode(encoding, "replace") t = t.decode(encoding) try: stream.write(t) except OSError: # suppress "broken pipe" errors. # no known way to handle this on Python 3 however # as the exception is "ignored" (noisily) in TextIOWrapper. break @contextmanager def status(status_msg: str, newline: bool = False, quiet: bool = False): msg(status_msg + " ...", newline, flush=True, quiet=quiet) try: yield except: if not quiet: write_outstream(sys.stdout, " FAILED\n") raise else: if not quiet: write_outstream(sys.stdout, " done\n") def err(message: str, quiet: bool = False): log.error(message) msg(f"FAILED: {message}", quiet=quiet) sys.exit(-1) def obfuscate_url_pw(input_url: str) -> str: u = url.make_url(input_url) return sqla_compat.url_render_as_string(u, hide_password=True) def warn(msg: str, stacklevel: int = 2) -> None: warnings.warn(msg, UserWarning, stacklevel=stacklevel) def msg( msg: str, newline: bool = True, flush: bool = False, quiet: bool = False ) -> None: if quiet: return if TERMWIDTH is None: write_outstream(sys.stdout, msg) if newline: write_outstream(sys.stdout, "\n") else: # left indent output lines lines = textwrap.wrap(msg, TERMWIDTH) if len(lines) > 1: for line in lines[0:-1]: write_outstream(sys.stdout, " ", line, "\n") write_outstream(sys.stdout, " ", lines[-1], ("\n" if newline else "")) if flush: sys.stdout.flush() def format_as_comma(value: Optional[Union[str, Iterable[str]]]) -> str: if value is None: return "" elif isinstance(value, str): return value elif isinstance(value, Iterable): return ", ".join(value) else: raise ValueError("Don't know how to comma-format %r" % value)
SILENT KILLER Tool