# Copyright 2021-2022 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import asyncio
import datetime
import logging
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional

import nats.errors
import nats.js.errors
from nats.js import api

if TYPE_CHECKING:
    from nats.js import JetStreamContext

# Message header used to mark the KV operation type on a stored message.
KV_OP = "KV-Operation"
# KV_OP header values: a delete marker and a purge (rollup) marker.
KV_DEL = "DEL"
KV_PURGE = "PURGE"
# Nats-Rollup header value: roll up previous messages on the same subject.
MSG_ROLLUP_SUBJECT = "sub"

# Module-level logger, standard library convention.
logger = logging.getLogger(__name__)

VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")


def _is_key_valid(key: str) -> bool:
    if len(key) == 0 or key[0] == "." or key[-1] == ".":
        return False
    return bool(VALID_KEY_RE.match(key))


class StopIterSentinel:
    """Sentinel type placed on a watcher's queue to signal end of iteration."""


class KeyValue:
    """
    KeyValue uses the JetStream KeyValue functionality.

    ::

        import asyncio
        import nats

        async def main():
            nc = await nats.connect()
            js = nc.jetstream()

            # Create a KV
            kv = await js.create_key_value(bucket='MY_KV')

            # Set and retrieve a value
            await kv.put('hello', b'world')
            entry = await kv.get('hello')
            print(f'KeyValue.Entry: key={entry.key}, value={entry.value}')
            # KeyValue.Entry: key=hello, value=world

            await nc.close()

        if __name__ == '__main__':
            asyncio.run(main())

    """

    @dataclass
    class Entry:
        """
        An entry from a KeyValue store in JetStream.
        """

        bucket: str
        key: str
        value: Optional[bytes]
        revision: Optional[int]
        delta: Optional[int]
        created: Optional[int]
        operation: Optional[str]

    @dataclass(frozen=True)
    class BucketStatus:
        """
        BucketStatus is the status of a KeyValue bucket.
        """

        stream_info: api.StreamInfo
        bucket: str

        @property
        def values(self) -> int:
            """
            values returns the number of stored messages in the stream.
            """
            return self.stream_info.state.messages

        @property
        def history(self) -> int:
            """
            history returns the max msgs per subject.
            """
            return self.stream_info.config.max_msgs_per_subject

        @property
        def ttl(self) -> Optional[float]:
            """
            ttl returns the max age in seconds, or None when unlimited.
            """
            if self.stream_info.config.max_age is None:
                return None
            return self.stream_info.config.max_age

    def __init__(
        self,
        name: str,
        stream: str,
        pre: str,
        js: JetStreamContext,
        direct: bool,
    ) -> None:
        self._name = name  # bucket name
        self._stream = stream  # backing stream name
        self._pre = pre  # subject prefix prepended to every key
        self._js = js  # JetStream context used for all API calls
        self._direct = direct  # whether to use the direct get API

    async def get(self, key: str, revision: Optional[int] = None, validate_keys: bool = True) -> Entry:
        """
        get returns the latest value for the key.

        :param key: The key to look up.
        :param revision: Optional exact stream revision to fetch.
        :param validate_keys: Whether to validate the key format.
        :raises nats.js.errors.InvalidKeyError: If the key is malformed.
        :raises nats.js.errors.KeyNotFoundError: If the key is missing or deleted.
        """
        if validate_keys and not _is_key_valid(key):
            raise nats.js.errors.InvalidKeyError(key)

        try:
            return await self._get(key, revision)
        except nats.js.errors.KeyDeletedError as err:
            # A delete/purge marker surfaces to callers as "not found".
            raise nats.js.errors.KeyNotFoundError(err.entry, err.op)

    async def _get(self, key: str, revision: Optional[int] = None) -> Entry:
        """
        Fetch the message for a key (or an exact revision) and convert it
        into an Entry, raising KeyDeletedError for delete/purge markers.
        """
        subject = f"{self._pre}{key}"
        try:
            if revision:
                # Exact revision lookups are by stream sequence only; the
                # subject is cross-checked below.
                msg = await self._js.get_msg(
                    self._stream,
                    seq=revision,
                    direct=self._direct,
                )
            else:
                # Latest value lookups are by subject.
                msg = await self._js.get_msg(
                    self._stream,
                    subject=subject,
                    direct=self._direct,
                )
        except nats.js.errors.NotFoundError:
            raise nats.js.errors.KeyNotFoundError

        # Check whether the revision from the stream does not match the key.
        if subject != msg.subject:
            raise nats.js.errors.KeyNotFoundError(message=f"expected '{subject}', but got '{msg.subject}'")

        entry = KeyValue.Entry(
            bucket=self._name,
            key=key,
            value=msg.data,
            revision=msg.seq,
            delta=None,
            created=None,
            operation=None,
        )

        # Check headers to see if deleted or purged.
        if msg.headers:
            op = msg.headers.get(KV_OP, None)
            if op == KV_DEL or op == KV_PURGE:
                raise nats.js.errors.KeyDeletedError(entry, op)

        return entry

    async def put(self, key: str, value: bytes, validate_keys: bool = True) -> int:
        """
        put will place the new value for the key into the store
        and return the revision number.

        Note: This method does not support TTL. Use create() if you need TTL support.

        :param key: The key to put
        :param value: The value to store
        :param validate_keys: Whether to validate the key format
        :raises nats.js.errors.InvalidKeyError: If the key is malformed.
        """
        if validate_keys and not _is_key_valid(key):
            raise nats.js.errors.InvalidKeyError(key)

        pa = await self._js.publish(f"{self._pre}{key}", value)
        return pa.seq

    async def create(self, key: str, value: bytes, validate_keys: bool = True, msg_ttl: Optional[float] = None) -> int:
        """
        create will add the key/value pair iff it does not exist.

        :param key: The key to create
        :param value: The value to store
        :param validate_keys: Whether to validate the key format
        :param msg_ttl: Optional TTL (time-to-live) in seconds for this specific message
        :raises nats.js.errors.InvalidKeyError: If the key is malformed.
        :raises nats.js.errors.KeyWrongLastSequenceError: If the key already exists.
        """
        if validate_keys and not _is_key_valid(key):
            raise nats.js.errors.InvalidKeyError(key)

        try:
            # Expecting last sequence 0 means "only succeed if absent".
            pa = await self.update(key, value, last=0, validate_keys=validate_keys, msg_ttl=msg_ttl)
        except nats.js.errors.KeyWrongLastSequenceError as err:
            # In case of attempting to recreate an already deleted key,
            # the client would get a KeyWrongLastSequenceError.  When this happens,
            # it is needed to fetch latest revision number and attempt to update.
            try:
                # NOTE: This reimplements the following behavior from Go client.
                #
                #   Since we have tombstones for DEL ops for watchers, this could be from that
                #   so we need to double check.
                #

                # Get latest revision to update in case it was deleted but if it was not
                await self._get(key)

                # No exception so not a deleted key, so reraise the original KeyWrongLastSequenceError.
                # If it was deleted then the error exception will contain metadata
                # to recreate using the last revision.
                raise err
            except nats.js.errors.KeyDeletedError as err:
                # Key only exists as a tombstone: recreate on top of it.
                pa = await self.update(
                    key, value, last=err.entry.revision, validate_keys=validate_keys, msg_ttl=msg_ttl
                )

        return pa

    async def update(
        self,
        key: str,
        value: bytes,
        last: Optional[int] = None,
        validate_keys: bool = True,
        msg_ttl: Optional[float] = None,
    ) -> int:
        """
        update will update the value if the latest revision matches.

        Note: TTL parameter is accepted for internal use by create(), but should not be
        used directly on update operations per NATS KV semantics.

        :raises nats.js.errors.InvalidKeyError: If the key is malformed.
        :raises nats.js.errors.KeyWrongLastSequenceError: If the revision check fails.
        """
        if validate_keys and not _is_key_valid(key):
            raise nats.js.errors.InvalidKeyError(key)

        # A missing expected revision is published as 0 (create semantics).
        hdrs = {api.Header.EXPECTED_LAST_SUBJECT_SEQUENCE: str(last or 0)}

        try:
            pa = await self._js.publish(f"{self._pre}{key}", value, headers=hdrs, msg_ttl=msg_ttl)
        except nats.js.errors.APIError as err:
            # Check for a BadRequest::KeyWrongLastSequenceError error code.
            if err.err_code == 10071:
                raise nats.js.errors.KeyWrongLastSequenceError(description=err.description)
            else:
                raise err
        return pa.seq

    async def delete(
        self, key: str, last: Optional[int] = None, validate_keys: bool = True, msg_ttl: Optional[float] = None
    ) -> bool:
        """
        delete will place a delete marker and remove all previous revisions.

        :param key: The key to delete
        :param last: Expected last revision number (for optimistic concurrency)
        :param validate_keys: Whether to validate the key format
        :param msg_ttl: Optional TTL (time-to-live) in seconds for the delete marker
        :raises nats.js.errors.InvalidKeyError: If the key is malformed.
        """
        if validate_keys and not _is_key_valid(key):
            raise nats.js.errors.InvalidKeyError(key)

        hdrs = {KV_OP: KV_DEL}

        # Only enforce the expected revision when a positive one was given.
        if last and last > 0:
            hdrs[api.Header.EXPECTED_LAST_SUBJECT_SEQUENCE] = str(last)

        await self._js.publish(f"{self._pre}{key}", headers=hdrs, msg_ttl=msg_ttl)
        return True

    async def purge(self, key: str, msg_ttl: Optional[float] = None) -> bool:
        """
        purge will remove the key and all revisions.

        :param key: The key to purge
        :param msg_ttl: Optional TTL (time-to-live) in seconds for the purge marker
        """
        # The rollup header makes the server discard all prior revisions
        # of this subject, leaving only the purge marker.
        hdrs = {KV_OP: KV_PURGE, api.Header.ROLLUP: MSG_ROLLUP_SUBJECT}
        await self._js.publish(f"{self._pre}{key}", headers=hdrs, msg_ttl=msg_ttl)
        return True

    async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
        """
        purge will remove all current delete markers older.
        :param olderthan: time in seconds
        """
        watcher = await self.watchall()
        delete_markers = []
        async for update in watcher:
            # None entry is used to signal that there is no more info.
            if update is None:
                break

            if update.operation == KV_DEL or update.operation == KV_PURGE:
                delete_markers.append(update)

        # Stop the watcher so its subscription does not leak
        # (keys() and history() do the same after iterating).
        await watcher.stop()

        for entry in delete_markers:
            keep = 0
            subject = f"{self._pre}{entry.key}"
            duration = datetime.datetime.now(datetime.timezone.utc) - entry.created
            if olderthan > 0 and olderthan > duration.total_seconds():
                # Marker is newer than the cutoff: keep its last message.
                keep = 1
            await self._js.purge_stream(self._stream, subject=subject, keep=keep)
        return True

    async def status(self) -> BucketStatus:
        """
        status retrieves the status and configuration of a bucket.
        """
        info = await self._js.stream_info(self._stream)
        return KeyValue.BucketStatus(stream_info=info, bucket=self._name)

    class KeyWatcher:
        # Shared sentinel placed on the queue by stop() to end iteration.
        STOP_ITER = StopIterSentinel()

        def __init__(self, js):
            self._js = js
            # Bounded queue of entries delivered to iterators/updates().
            self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
            self._sub = None
            self._pending: Optional[int] = None

            # init done means that the nil marker has been sent,
            # once this is sent it won't be sent anymore.
            self._init_done = False

        async def stop(self):
            """
            stop will stop this watcher.
            """
            await self._sub.unsubscribe()
            await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)

        async def updates(self, timeout=5.0):
            """
            updates fetches the next update from a watcher.

            :raises nats.errors.TimeoutError: If no update arrives in time.
            """
            try:
                return await asyncio.wait_for(self._updates.get(), timeout)
            except asyncio.TimeoutError:
                raise nats.errors.TimeoutError

        def __aiter__(self):
            return self

        async def __anext__(self):
            entry = await self._updates.get()
            if isinstance(entry, StopIterSentinel):
                raise StopAsyncIteration
            return entry

    async def watchall(self, **kwargs) -> KeyWatcher:
        """
        watchall returns a KeyValue watcher that matches all the keys.
        """
        return await self.watch(">", **kwargs)

    async def keys(self, filters: Optional[List[str]] = None, **kwargs) -> List[str]:
        """
        Returns a list of the keys from a KeyValue store.
        Optionally filters the keys based on the provided filter list
        (a key is included when any filter is a substring of it).

        :raises nats.js.errors.NoKeysError: If no keys matched.
        """
        watcher = await self.watchall(
            ignore_deletes=True,
            meta_only=True,
        )
        keys = []

        # Check consumer info and make sure filters are applied correctly.
        consumer_info = await watcher._sub.consumer_info()
        if consumer_info and filters:
            # If NATS server < 2.10, filters might be ignored.
            if consumer_info.config.filter_subject != ">":
                logger.warning("Server may ignore filters if version is < 2.10.")

        async for key in watcher:
            # None entry is used to signal that there is no more info.
            if not key:
                break

            # Apply filters if any were provided
            if filters:
                if any(f in key.key for f in filters):
                    keys.append(key.key)
            else:
                # No filters provided, append all keys
                keys.append(key.key)

        await watcher.stop()

        if not keys:
            raise nats.js.errors.NoKeysError

        return keys

    async def history(self, key: str) -> List[Entry]:
        """
        history retrieves a list of the entries so far.

        :raises nats.js.errors.NoKeysError: If the key has no entries.
        """
        watcher = await self.watch(key, include_history=True)

        entries = []

        async for entry in watcher:
            # None entry is used to signal that there is no more info.
            if not entry:
                break
            entries.append(entry)

        await watcher.stop()

        if not entries:
            raise nats.js.errors.NoKeysError

        return entries

    async def watch(
        self,
        keys,
        headers_only=False,
        include_history=False,
        ignore_deletes=False,
        meta_only=False,
        inactive_threshold=None,
    ) -> KeyWatcher:
        """
        watch will fire a callback when a key that matches the keys
        pattern is updated.
        The first update after starting the watch is None in case
        there are no pending updates.
        """
        subject = f"{self._pre}{keys}"
        watcher = KeyValue.KeyWatcher(self)
        init_setup: asyncio.Future[bool] = asyncio.Future()

        async def watch_updates(msg):
            # Block message handling until the consumer info below has been
            # inspected, so the initial None marker is not raced.
            if not init_setup.done():
                await asyncio.wait_for(init_setup, timeout=self._js._timeout)

            meta = msg.metadata
            op = None
            if msg.header and KV_OP in msg.header:
                op = msg.header.get(KV_OP)

                # keys() uses this
                if ignore_deletes:
                    if op == KV_PURGE or op == KV_DEL:
                        if meta.num_pending == 0 and not watcher._init_done:
                            await watcher._updates.put(None)
                            watcher._init_done = True
                        return

            entry = KeyValue.Entry(
                bucket=self._name,
                key=msg.subject[len(self._pre) :],
                value=msg.data,
                revision=meta.sequence.stream,
                delta=meta.num_pending,
                created=meta.timestamp,
                operation=op,
            )
            await watcher._updates.put(entry)

            # When there are no more updates send an empty marker
            # to signal that it is done, this will unblock iterators
            if meta.num_pending == 0 and (not watcher._init_done):
                await watcher._updates.put(None)
                watcher._init_done = True

        deliver_policy = None
        if not include_history:
            deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT

        # Cleanup watchers after 5 minutes of inactivity by default.
        if not inactive_threshold:
            inactive_threshold = 5 * 60

        watcher._sub = await self._js.subscribe(
            subject,
            cb=watch_updates,
            ordered_consumer=True,
            deliver_policy=deliver_policy,
            headers_only=meta_only,
            inactive_threshold=inactive_threshold,
        )
        # Yield once so the subscription task can start.
        await asyncio.sleep(0)

        # Check from consumer info what is the number of messages
        # awaiting to be consumed to send the initial signal marker.
        try:
            cinfo = await watcher._sub.consumer_info()
            watcher._pending = cinfo.num_pending

            # If no delivered and/or pending messages, then signal
            # that this is the start.
            # The consumer subscription will start receiving messages
            # so need to check those that have already made it.
            received = watcher._sub.delivered
            init_setup.set_result(True)
            if cinfo.num_pending == 0 and received == 0:
                await watcher._updates.put(None)
                watcher._init_done = True
        except Exception as err:
            # Setup failed: unblock any queued callbacks and tear down.
            init_setup.cancel()
            await watcher._sub.unsubscribe()
            raise err

        return watcher
