SILENT KILLERPanel

Current Path: > > opt > hc_python > lib64 > python3.12 > site-packages > > > dns


Operation   : Linux premium131.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
Software     : Apache
Server IP    : 162.0.232.56 | Your IP: 216.73.216.111
Domains      : 1034 Domain(s)
Permission   : [ 0755 ]

Files and Folders in: //opt/hc_python/lib64/python3.12/site-packages///dns

NameTypeSizeLast ModifiedActions
__pycache__ Directory - -
dnssecalgs Directory - -
quic Directory - -
rdtypes Directory - -
__init__.py File 1663 bytes April 04 2025 08:02:26.
_asyncbackend.py File 2396 bytes April 04 2025 08:02:26.
_asyncio_backend.py File 9051 bytes April 04 2025 08:02:26.
_ddr.py File 5247 bytes April 04 2025 08:02:26.
_features.py File 2492 bytes April 04 2025 08:02:26.
_immutable_ctx.py File 2459 bytes April 04 2025 08:02:26.
_trio_backend.py File 8473 bytes April 04 2025 08:02:26.
asyncbackend.py File 2796 bytes April 04 2025 08:02:26.
asyncquery.py File 30821 bytes April 04 2025 08:02:26.
asyncresolver.py File 17852 bytes April 04 2025 08:02:26.
dnssec.py File 41717 bytes April 04 2025 08:02:26.
dnssectypes.py File 1799 bytes April 04 2025 08:02:26.
e164.py File 3978 bytes April 04 2025 08:02:26.
edns.py File 17089 bytes April 04 2025 08:02:26.
entropy.py File 4242 bytes April 04 2025 08:02:26.
enum.py File 3691 bytes April 04 2025 08:02:26.
exception.py File 5953 bytes April 04 2025 08:02:26.
flags.py File 2750 bytes April 04 2025 08:02:26.
grange.py File 2144 bytes April 04 2025 08:02:26.
immutable.py File 2017 bytes April 04 2025 08:02:26.
inet.py File 5772 bytes April 04 2025 08:02:26.
ipv4.py File 2552 bytes April 04 2025 08:02:26.
ipv6.py File 6554 bytes April 04 2025 08:02:26.
message.py File 68185 bytes April 04 2025 08:02:26.
name.py File 42778 bytes April 04 2025 08:02:26.
namedict.py File 4000 bytes April 04 2025 08:02:26.
nameserver.py File 10115 bytes April 04 2025 08:02:26.
node.py File 12663 bytes April 04 2025 08:02:26.
opcode.py File 2730 bytes April 04 2025 08:02:26.
py.typed File 0 bytes April 04 2025 08:02:26.
query.py File 56298 bytes April 04 2025 08:02:26.
rcode.py File 4156 bytes April 04 2025 08:02:26.
rdata.py File 31022 bytes April 04 2025 08:02:26.
rdataclass.py File 2984 bytes April 04 2025 08:02:26.
rdataset.py File 16664 bytes April 04 2025 08:02:26.
rdatatype.py File 7448 bytes April 04 2025 08:02:26.
renderer.py File 11254 bytes April 04 2025 08:02:26.
resolver.py File 73730 bytes April 04 2025 08:02:26.
reversename.py File 3828 bytes April 04 2025 08:02:26.
rrset.py File 9170 bytes April 04 2025 08:02:26.
serial.py File 3606 bytes April 04 2025 08:02:26.
set.py File 9213 bytes April 04 2025 08:02:26.
tokenizer.py File 23583 bytes April 04 2025 08:02:26.
transaction.py File 22589 bytes April 04 2025 08:02:26.
tsig.py File 11413 bytes April 04 2025 08:02:26.
tsigkeyring.py File 2633 bytes April 04 2025 08:02:26.
ttl.py File 2977 bytes April 04 2025 08:02:26.
update.py File 12243 bytes April 04 2025 08:02:26.
version.py File 1926 bytes April 04 2025 08:02:26.
versioned.py File 11765 bytes April 04 2025 08:02:26.
win32util.py File 8874 bytes April 04 2025 08:02:26.
wire.py File 2830 bytes April 04 2025 08:02:26.
xfr.py File 13271 bytes April 04 2025 08:02:26.
zone.py File 52086 bytes April 04 2025 08:02:26.
zonefile.py File 27926 bytes April 04 2025 08:02:26.
zonetypes.py File 690 bytes April 04 2025 08:02:26.

Reading File: //opt/hc_python/lib64/python3.12/site-packages///dns/versioned.py

# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

"""DNS Versioned Zones."""

import collections
import threading
from typing import Callable, Deque, Optional, Set, Union

import dns.exception
import dns.immutable
import dns.name
import dns.node
import dns.rdataclass
import dns.rdataset
import dns.rdatatype
import dns.rdtypes.ANY.SOA
import dns.zone


class UseTransaction(dns.exception.DNSException):
    """To alter a versioned zone, use a transaction."""


# Backwards compatibility
Node = dns.zone.VersionedNode
ImmutableNode = dns.zone.ImmutableVersionedNode
Version = dns.zone.Version
WritableVersion = dns.zone.WritableVersion
ImmutableVersion = dns.zone.ImmutableVersion
Transaction = dns.zone.Transaction


class Zone(dns.zone.Zone):  # lgtm[py/missing-equals]
    __slots__ = [
        "_versions",
        "_versions_lock",
        "_write_txn",
        "_write_waiters",
        "_write_event",
        "_pruning_policy",
        "_readers",
    ]

    node_factory = Node

    def __init__(
        self,
        origin: Optional[Union[dns.name.Name, str]],
        rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
        relativize: bool = True,
        pruning_policy: Optional[Callable[["Zone", Version], Optional[bool]]] = None,
    ):
        """Initialize a versioned zone object.

        *origin* is the origin of the zone.  It may be a ``dns.name.Name``,
        a ``str``, or ``None``.  If ``None``, then the zone's origin will
        be set by the first ``$ORIGIN`` line in a zone file.

        *rdclass*, an ``int``, the zone's rdata class; the default is class IN.

        *relativize*, a ``bool``, determine's whether domain names are
        relativized to the zone's origin.  The default is ``True``.

        *pruning policy*, a function taking a ``Zone`` and a ``Version`` and returning
        a ``bool``, or ``None``.  Should the version be pruned?  If ``None``,
        the default policy, which retains one version is used.
        """
        super().__init__(origin, rdclass, relativize)
        self._versions: Deque[Version] = collections.deque()
        self._version_lock = threading.Lock()
        if pruning_policy is None:
            self._pruning_policy = self._default_pruning_policy
        else:
            self._pruning_policy = pruning_policy
        self._write_txn: Optional[Transaction] = None
        self._write_event: Optional[threading.Event] = None
        self._write_waiters: Deque[threading.Event] = collections.deque()
        self._readers: Set[Transaction] = set()
        self._commit_version_unlocked(
            None, WritableVersion(self, replacement=True), origin
        )

    def reader(
        self, id: Optional[int] = None, serial: Optional[int] = None
    ) -> Transaction:  # pylint: disable=arguments-differ
        if id is not None and serial is not None:
            raise ValueError("cannot specify both id and serial")
        with self._version_lock:
            if id is not None:
                version = None
                for v in reversed(self._versions):
                    if v.id == id:
                        version = v
                        break
                if version is None:
                    raise KeyError("version not found")
            elif serial is not None:
                if self.relativize:
                    oname = dns.name.empty
                else:
                    assert self.origin is not None
                    oname = self.origin
                version = None
                for v in reversed(self._versions):
                    n = v.nodes.get(oname)
                    if n:
                        rds = n.get_rdataset(self.rdclass, dns.rdatatype.SOA)
                        if rds and rds[0].serial == serial:
                            version = v
                            break
                if version is None:
                    raise KeyError("serial not found")
            else:
                version = self._versions[-1]
            txn = Transaction(self, False, version)
            self._readers.add(txn)
            return txn

    def writer(self, replacement: bool = False) -> Transaction:
        event = None
        while True:
            with self._version_lock:
                # Checking event == self._write_event ensures that either
                # no one was waiting before we got lucky and found no write
                # txn, or we were the one who was waiting and got woken up.
                # This prevents "taking cuts" when creating a write txn.
                if self._write_txn is None and event == self._write_event:
                    # Creating the transaction defers version setup
                    # (i.e.  copying the nodes dictionary) until we
                    # give up the lock, so that we hold the lock as
                    # short a time as possible.  This is why we call
                    # _setup_version() below.
                    self._write_txn = Transaction(
                        self, replacement, make_immutable=True
                    )
                    # give up our exclusive right to make a Transaction
                    self._write_event = None
                    break
                # Someone else is writing already, so we will have to
                # wait, but we want to do the actual wait outside the
                # lock.
                event = threading.Event()
                self._write_waiters.append(event)
            # wait (note we gave up the lock!)
            #
            # We only wake one sleeper at a time, so it's important
            # that no event waiter can exit this method (e.g. via
            # cancellation) without returning a transaction or waking
            # someone else up.
            #
            # This is not a problem with Threading module threads as
            # they cannot be canceled, but could be an issue with trio
            # tasks when we do the async version of writer().
            # I.e. we'd need to do something like:
            #
            # try:
            #     event.wait()
            # except trio.Cancelled:
            #     with self._version_lock:
            #         self._maybe_wakeup_one_waiter_unlocked()
            #     raise
            #
            event.wait()
        # Do the deferred version setup.
        self._write_txn._setup_version()
        return self._write_txn

    def _maybe_wakeup_one_waiter_unlocked(self):
        if len(self._write_waiters) > 0:
            self._write_event = self._write_waiters.popleft()
            self._write_event.set()

    # pylint: disable=unused-argument
    def _default_pruning_policy(self, zone, version):
        return True

    # pylint: enable=unused-argument

    def _prune_versions_unlocked(self):
        assert len(self._versions) > 0
        # Don't ever prune a version greater than or equal to one that
        # a reader has open.  This pins versions in memory while the
        # reader is open, and importantly lets the reader open a txn on
        # a successor version (e.g. if generating an IXFR).
        #
        # Note our definition of least_kept also ensures we do not try to
        # delete the greatest version.
        if len(self._readers) > 0:
            least_kept = min(txn.version.id for txn in self._readers)
        else:
            least_kept = self._versions[-1].id
        while self._versions[0].id < least_kept and self._pruning_policy(
            self, self._versions[0]
        ):
            self._versions.popleft()

    def set_max_versions(self, max_versions: Optional[int]) -> None:
        """Set a pruning policy that retains up to the specified number
        of versions
        """
        if max_versions is not None and max_versions < 1:
            raise ValueError("max versions must be at least 1")
        if max_versions is None:

            def policy(zone, _):  # pylint: disable=unused-argument
                return False

        else:

            def policy(zone, _):
                return len(zone._versions) > max_versions

        self.set_pruning_policy(policy)

    def set_pruning_policy(
        self, policy: Optional[Callable[["Zone", Version], Optional[bool]]]
    ) -> None:
        """Set the pruning policy for the zone.

        The *policy* function takes a `Version` and returns `True` if
        the version should be pruned, and `False` otherwise.  `None`
        may also be specified for policy, in which case the default policy
        is used.

        Pruning checking proceeds from the least version and the first
        time the function returns `False`, the checking stops.  I.e. the
        retained versions are always a consecutive sequence.
        """
        if policy is None:
            policy = self._default_pruning_policy
        with self._version_lock:
            self._pruning_policy = policy
            self._prune_versions_unlocked()

    def _end_read(self, txn):
        with self._version_lock:
            self._readers.remove(txn)
            self._prune_versions_unlocked()

    def _end_write_unlocked(self, txn):
        assert self._write_txn == txn
        self._write_txn = None
        self._maybe_wakeup_one_waiter_unlocked()

    def _end_write(self, txn):
        with self._version_lock:
            self._end_write_unlocked(txn)

    def _commit_version_unlocked(self, txn, version, origin):
        self._versions.append(version)
        self._prune_versions_unlocked()
        self.nodes = version.nodes
        if self.origin is None:
            self.origin = origin
        # txn can be None in __init__ when we make the empty version.
        if txn is not None:
            self._end_write_unlocked(txn)

    def _commit_version(self, txn, version, origin):
        with self._version_lock:
            self._commit_version_unlocked(txn, version, origin)

    def _get_next_version_id(self):
        if len(self._versions) > 0:
            id = self._versions[-1].id + 1
        else:
            id = 1
        return id

    def find_node(
        self, name: Union[dns.name.Name, str], create: bool = False
    ) -> dns.node.Node:
        if create:
            raise UseTransaction
        return super().find_node(name)

    def delete_node(self, name: Union[dns.name.Name, str]) -> None:
        raise UseTransaction

    def find_rdataset(
        self,
        name: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str],
        covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
        create: bool = False,
    ) -> dns.rdataset.Rdataset:
        if create:
            raise UseTransaction
        rdataset = super().find_rdataset(name, rdtype, covers)
        return dns.rdataset.ImmutableRdataset(rdataset)

    def get_rdataset(
        self,
        name: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str],
        covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
        create: bool = False,
    ) -> Optional[dns.rdataset.Rdataset]:
        if create:
            raise UseTransaction
        rdataset = super().get_rdataset(name, rdtype, covers)
        if rdataset is not None:
            return dns.rdataset.ImmutableRdataset(rdataset)
        else:
            return None

    def delete_rdataset(
        self,
        name: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str],
        covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
    ) -> None:
        raise UseTransaction

    def replace_rdataset(
        self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
    ) -> None:
        raise UseTransaction

SILENT KILLER Tool