����JFIF���������
__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# Copyright 2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
import datetime
from dataclasses import dataclass, fields, replace
from enum import Enum
from typing import Any, Dict, Iterable, Iterator, List, Optional, TypeVar
_NANOSECOND = 10**9
class Header(str, Enum):
CONSUMER_STALLED = "Nats-Consumer-Stalled"
DESCRIPTION = "Description"
EXPECTED_LAST_MSG_ID = "Nats-Expected-Last-Msg-Id"
EXPECTED_LAST_SEQUENCE = "Nats-Expected-Last-Sequence"
EXPECTED_LAST_SUBJECT_SEQUENCE = "Nats-Expected-Last-Subject-Sequence"
EXPECTED_STREAM = "Nats-Expected-Stream"
LAST_CONSUMER = "Nats-Last-Consumer"
LAST_STREAM = "Nats-Last-Stream"
MSG_ID = "Nats-Msg-Id"
MSG_TTL = "Nats-TTL"
ROLLUP = "Nats-Rollup"
STATUS = "Status"
DEFAULT_PREFIX = "$JS.API"
INBOX_PREFIX = b"_INBOX."
class StatusCode(str, Enum):
SERVICE_UNAVAILABLE = "503"
NO_MESSAGES = "404"
REQUEST_TIMEOUT = "408"
CONFLICT = "409"
CONTROL_MESSAGE = "100"
_B = TypeVar("_B", bound="Base")
@dataclass
class Base:
"""
Helper dataclass to filter unknown fields from the API.
"""
@staticmethod
def _convert(resp: Dict[str, Any], field: str, type: type[Base]) -> None:
"""Convert the field into the given type in place."""
data = resp.get(field, None)
if data is None:
resp[field] = None
elif isinstance(data, list):
resp[field] = [type.from_response(item) for item in data]
else:
resp[field] = type.from_response(data)
@staticmethod
def _convert_nanoseconds(resp: Dict[str, Any], field: str) -> None:
"""Convert the given field from nanoseconds to seconds in place."""
val = resp.get(field, None)
if val is not None:
val = val / _NANOSECOND
resp[field] = val
@staticmethod
def _to_nanoseconds(val: Optional[float]) -> Optional[int]:
"""Convert the value from seconds to nanoseconds."""
if val is None:
# We use 0 to avoid sending null to Go servers.
return 0
return int(val * _NANOSECOND)
@staticmethod
def _convert_utc_iso(resp: Dict[str, Any], field: str) -> None:
"""Convert a ISO 8601/RFC 3339 string field into a datetime in place."""
val = resp.get(field, None)
if val is not None:
resp[field] = Base._parse_utc_iso(val)
@staticmethod
def _to_utc_iso(date: datetime.datetime) -> str:
"""Convert a datetime into an ISO 8601/RFC 3339 formatted string.
If datetime does not have timezone information, datetime
is assumed to be in UTC timezone. Strings are passed through as-is
for backwards compatibility.
"""
if isinstance(date, str):
return date
if date.tzinfo is None:
date = date.replace(tzinfo=datetime.timezone.utc)
elif date.tzinfo != datetime.timezone.utc:
date = date.astimezone(datetime.timezone.utc)
return date.isoformat().replace("+00:00", "Z").replace(".000000", "")
@staticmethod
def _parse_utc_iso(time_string: str) -> datetime.datetime:
"""Parse an ISO 8601 timestamp (with nanoseconds) into a UTC datetime."""
# Replace Z with UTC offset
s = time_string.replace("Z", "+00:00")
# Trim fractional seconds to 6 digits (microsecond precision)
date_part, frac_tz = s.split(".", 1)
frac, tz = frac_tz.split("+")
frac = frac[:6] # keep only microseconds
s = f"{date_part}.{frac}+{tz}"
return datetime.datetime.fromisoformat(s).astimezone(datetime.timezone.utc)
@classmethod
def from_response(cls: type[_B], resp: Dict[str, Any]) -> _B:
"""Read the class instance from a server response.
Unknown fields are ignored ("open-world assumption").
"""
params = {}
for field in fields(cls):
if field.name in resp:
params[field.name] = resp[field.name]
return cls(**params)
def evolve(self: _B, **params) -> _B:
"""Return a copy of the instance with the passed values replaced."""
return replace(self, **params)
def as_dict(self) -> Dict[str, object]:
"""Return the object converted into an API-friendly dict."""
result = {}
for field in fields(self):
val = getattr(self, field.name)
if val is None:
continue
if isinstance(val, Base):
val = val.as_dict()
if isinstance(val, list):
if len(val) > 0 and isinstance(val[0], Base):
val = [v.as_dict() for v in val if isinstance(v, Base)]
result[field.name] = val
return result
@dataclass
class PubAck(Base):
"""
PubAck is the response of publishing a message to JetStream.
"""
stream: str
seq: int
domain: Optional[str] = None
duplicate: Optional[bool] = None
@dataclass
class Placement(Base):
"""Placement directives to consider when placing replicas of this stream"""
cluster: Optional[str] = None
tags: Optional[List[str]] = None
@dataclass
class ExternalStream(Base):
api: str
deliver: Optional[str] = None
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
return result
@dataclass
class StreamSource(Base):
name: str
opt_start_seq: Optional[int] = None
opt_start_time: Optional[datetime.datetime] = None
filter_subject: Optional[str] = None
external: Optional[ExternalStream] = None
subject_transforms: Optional[List[SubjectTransform]] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "external", ExternalStream)
cls._convert(resp, "subject_transforms", SubjectTransform)
cls._convert_utc_iso(resp, "opt_start_time")
return super().from_response(resp)
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
if self.subject_transforms:
result["subject_transforms"] = [tr.as_dict() for tr in self.subject_transforms]
if self.opt_start_time is not None:
result["opt_start_time"] = self._to_utc_iso(self.opt_start_time)
return result
@dataclass
class StreamSourceInfo(Base):
name: str
lag: Optional[int] = None
active: Optional[int] = None
error: Optional[Dict[str, Any]] = None
@dataclass
class LostStreamData(Base):
msgs: Optional[List[int]] = None
bytes: Optional[int] = None
@dataclass
class StreamState(Base):
messages: int
bytes: int
first_seq: int
last_seq: int
consumer_count: int
deleted: Optional[List[int]] = None
num_deleted: Optional[int] = None
lost: Optional[LostStreamData] = None
subjects: Optional[Dict[str, int]] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "lost", LostStreamData)
return super().from_response(resp)
class RetentionPolicy(str, Enum):
"""How message retention is considered"""
LIMITS = "limits"
INTEREST = "interest"
WORK_QUEUE = "workqueue"
class StorageType(str, Enum):
"""The type of storage backend"""
FILE = "file"
MEMORY = "memory"
class DiscardPolicy(str, Enum):
"""Discard policy when a stream reaches its limits"""
OLD = "old"
NEW = "new"
class StoreCompression(str, Enum):
"""
If stream is file-based and a compression algorithm is specified,
the stream data will be compressed on disk.
Valid options are none or s2 for Snappy compression.
Introduced in nats-server 2.10.0.
"""
NONE = "none"
S2 = "s2"
class PersistMode(str, Enum):
"""
PersistMode defines the consistency and durability guarantees for stream persistence.
See ADR-56 for details: https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-56.md
Currently only applicable to R1 (single replica) streams.
Introduced in nats-server 2.12.0.
"""
# DEFAULT represents the strongest consistency guarantee.
# Uses synchronous writes with fsync for maximum durability.
# Server does not store this value - it's the implied default when unset.
DEFAULT = "default"
# ASYNC enables asynchronous flushing of data to disk.
# Returns PubAck before disk persistence occurs, batching writes in memory.
# Provides significantly improved performance at the cost of potential data loss
# during infrastructure failures. Incompatible with batch publishing.
ASYNC = "async"
@dataclass
class RePublish(Base):
"""
RePublish is for republishing messages once committed to a stream. The original
subject cis remapped from the subject pattern to the destination pattern.
"""
src: Optional[str] = None
dest: Optional[str] = None
headers_only: Optional[bool] = None
@dataclass
class SubjectTransform(Base):
"""Subject transform to apply to matching messages."""
src: str
dest: str
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
return result
@dataclass
class StreamConfig(Base):
"""
StreamConfig represents the configuration of a stream.
"""
name: Optional[str] = None
description: Optional[str] = None
subjects: Optional[List[str]] = None
retention: Optional[RetentionPolicy] = None
max_consumers: Optional[int] = None
max_msgs: Optional[int] = None
max_bytes: Optional[int] = None
discard: Optional[DiscardPolicy] = DiscardPolicy.OLD
discard_new_per_subject: bool = False
max_age: Optional[float] = None # in seconds
max_msgs_per_subject: int = -1
max_msg_size: Optional[int] = -1
storage: Optional[StorageType] = None
num_replicas: Optional[int] = None
no_ack: bool = False
template_owner: Optional[str] = None
duplicate_window: float = 0
placement: Optional[Placement] = None
mirror: Optional[StreamSource] = None
sources: Optional[List[StreamSource]] = None
sealed: bool = False
deny_delete: bool = False
deny_purge: bool = False
allow_rollup_hdrs: bool = False
# Allow republish of the message after being sequenced and stored.
republish: Optional[RePublish] = None
subject_transform: Optional[SubjectTransform] = None
# Allow higher performance, direct access to get individual messages. E.g. KeyValue
allow_direct: Optional[bool] = None
# Allow higher performance and unified direct access for mirrors as well.
mirror_direct: Optional[bool] = None
# Allow compressing messages.
compression: Optional[StoreCompression] = None
# Allow per-message TTL via Nats-TTL header. Introduced in nats-server 2.11.0.
allow_msg_ttl: Optional[bool] = None
# Allow scheduled/delayed messages. Introduced in nats-server 2.12.0.
allow_msg_schedules: Optional[bool] = None
# Allow atomic batch publishing. Introduced in nats-server 2.12.0.
allow_atomic: Optional[bool] = None
# Allow batched publishing. Introduced in nats-server 2.12.0.
allow_batched: Optional[bool] = None
# Persistence mode for stream. Only applicable to R1 streams.
# Introduced in nats-server 2.12.0.
persist_mode: Optional[PersistMode] = None
# Metadata are user defined string key/value pairs.
metadata: Optional[Dict[str, str]] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "max_age")
cls._convert_nanoseconds(resp, "duplicate_window")
cls._convert(resp, "placement", Placement)
cls._convert(resp, "mirror", StreamSource)
cls._convert(resp, "sources", StreamSource)
cls._convert(resp, "republish", RePublish)
cls._convert(resp, "subject_transform", SubjectTransform)
return super().from_response(resp)
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result["duplicate_window"] = self._to_nanoseconds(self.duplicate_window)
result["max_age"] = self._to_nanoseconds(self.max_age)
if self.sources:
result["sources"] = [src.as_dict() for src in self.sources]
if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2):
raise ValueError("nats: invalid store compression type: %s" % self.compression)
if self.metadata and not isinstance(self.metadata, dict):
raise ValueError("nats: invalid metadata format")
return result
@dataclass
class PeerInfo(Base):
name: Optional[str] = None
current: Optional[bool] = None
offline: Optional[bool] = None
active: Optional[int] = None
lag: Optional[int] = None
@dataclass
class ClusterInfo(Base):
leader: Optional[str] = None
name: Optional[str] = None
replicas: Optional[List[PeerInfo]] = None
raft_group: Optional[str] = None
leader_since: Optional[datetime.datetime] = None
traffic_acc: Optional[str] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "replicas", PeerInfo)
if "leader_since" in resp and resp["leader_since"]:
resp["leader_since"] = cls._parse_utc_iso(resp["leader_since"])
return super().from_response(resp)
@dataclass
class StreamInfo(Base):
"""
StreamInfo is the latest information about a stream from JetStream.
"""
config: StreamConfig
state: StreamState
mirror: Optional[StreamSourceInfo] = None
sources: Optional[List[StreamSourceInfo]] = None
cluster: Optional[ClusterInfo] = None
did_create: Optional[bool] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "config", StreamConfig)
cls._convert(resp, "state", StreamState)
cls._convert(resp, "mirror", StreamSourceInfo)
cls._convert(resp, "sources", StreamSourceInfo)
cls._convert(resp, "cluster", ClusterInfo)
return super().from_response(resp)
@dataclass
class StreamsListIterator(Iterable):
"""
StreamsListIterator is an iterator for streams list responses from JetStream.
"""
def __init__(self, offset: int, total: int, streams: List[Dict[str, any]]) -> None:
self.offset = offset
self.total = total
self.streams = streams
self._index = 0
def __iter__(self) -> Iterator[StreamInfo]:
return self
def __next__(self) -> StreamInfo:
if self._index < len(self.streams):
stream_info = StreamInfo.from_response(self.streams[self._index])
self._index += 1
return stream_info
else:
raise StopIteration
class AckPolicy(str, Enum):
"""Policies defining how messages should be acknowledged.
If an ack is required but is not received within the AckWait window, the message will be redelivered.
References:
* `Consumers, AckPolicy <https://docs.nats.io/jetstream/concepts/consumers#ackpolicy>`_
"""
NONE = "none"
ALL = "all"
EXPLICIT = "explicit"
class DeliverPolicy(str, Enum):
"""When a consumer is first created, it can specify where in the stream it wants to start receiving messages.
This is the DeliverPolicy, and this enumeration defines allowed values.
References:
* `Consumers, DeliverPolicy/OptStartSeq/OptStartTime <https://docs.nats.io/jetstream/concepts/consumers#deliverpolicy-optstartseq-optstarttime>`_
""" # noqa: E501
ALL = "all"
LAST = "last"
NEW = "new"
BY_START_SEQUENCE = "by_start_sequence"
BY_START_TIME = "by_start_time"
LAST_PER_SUBJECT = "last_per_subject"
class ReplayPolicy(str, Enum):
"""The replay policy applies when the DeliverPolicy is one of:
* all
* by_start_sequence
* by_start_time
since those deliver policies begin reading the stream at a position other than the end.
References:
* `Consumers, ReplayPolicy <https://docs.nats.io/jetstream/concepts/consumers#replaypolicy>`_
"""
INSTANT = "instant"
ORIGINAL = "original"
@dataclass
class ConsumerConfig(Base):
"""Consumer configuration.
References:
* `Consumers <https://docs.nats.io/jetstream/concepts/consumers>`_
"""
name: Optional[str] = None
durable_name: Optional[str] = None
description: Optional[str] = None
deliver_policy: Optional[DeliverPolicy] = DeliverPolicy.ALL
opt_start_seq: Optional[int] = None
opt_start_time: Optional[datetime.datetime] = None
ack_policy: Optional[AckPolicy] = AckPolicy.EXPLICIT
ack_wait: Optional[float] = None # in seconds
max_deliver: Optional[int] = None
backoff: Optional[List[float]] = None # in seconds, overrides ack_wait
filter_subject: Optional[str] = None
filter_subjects: Optional[List[str]] = None
replay_policy: Optional[ReplayPolicy] = ReplayPolicy.INSTANT
rate_limit_bps: Optional[int] = None
sample_freq: Optional[str] = None
max_waiting: Optional[int] = None
max_ack_pending: Optional[int] = None
flow_control: Optional[bool] = None
idle_heartbeat: Optional[float] = None
headers_only: Optional[bool] = None
# Push based consumers.
deliver_subject: Optional[str] = None
# Push based queue consumers.
deliver_group: Optional[str] = None
# Ephemeral inactivity threshold
inactive_threshold: Optional[float] = None # in seconds
# Generally inherited by parent stream and other markers, now can
# be configured directly.
num_replicas: Optional[int] = None
# Force memory storage.
mem_storage: Optional[bool] = None
# Metadata are user defined string key/value pairs.
metadata: Optional[Dict[str, str]] = None
# Consumer pause until timestamp.
# Temporarily suspend message delivery until the specified time (RFC 3339 format).
# Introduced in nats-server 2.11.0.
pause_until: Optional[str] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "ack_wait")
cls._convert_nanoseconds(resp, "idle_heartbeat")
cls._convert_nanoseconds(resp, "inactive_threshold")
cls._convert_utc_iso(resp, "opt_start_time")
if "backoff" in resp:
resp["backoff"] = [val / _NANOSECOND for val in resp["backoff"]]
return super().from_response(resp)
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
if self.opt_start_time is not None:
result["opt_start_time"] = self._to_utc_iso(self.opt_start_time)
result["ack_wait"] = self._to_nanoseconds(self.ack_wait)
result["idle_heartbeat"] = self._to_nanoseconds(self.idle_heartbeat)
result["inactive_threshold"] = self._to_nanoseconds(self.inactive_threshold)
if self.backoff:
result["backoff"] = [self._to_nanoseconds(i) for i in self.backoff]
return result
@dataclass
class SequenceInfo(Base):
consumer_seq: int
stream_seq: int
last_active: Optional[datetime.datetime] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_utc_iso(resp, "last_active")
return super().from_response(resp)
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
if self.last_active is not None:
result["last_active"] = self._to_utc_iso(self.last_active)
return result
@dataclass
class ConsumerInfo(Base):
"""
ConsumerInfo represents the info about the consumer.
"""
name: str
stream_name: str
config: ConsumerConfig
created: datetime.datetime
delivered: Optional[SequenceInfo] = None
ack_floor: Optional[SequenceInfo] = None
num_ack_pending: Optional[int] = None
num_redelivered: Optional[int] = None
num_waiting: Optional[int] = None
num_pending: Optional[int] = None
cluster: Optional[ClusterInfo] = None
push_bound: Optional[bool] = None
# Indicates if the consumer is currently paused.
# Introduced in nats-server 2.11.0.
paused: Optional[bool] = None
# RFC 3339 timestamp until which the consumer is paused.
# Introduced in nats-server 2.11.0.
pause_remaining: Optional[str] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "delivered", SequenceInfo)
cls._convert(resp, "ack_floor", SequenceInfo)
cls._convert(resp, "config", ConsumerConfig)
cls._convert(resp, "cluster", ClusterInfo)
cls._convert_utc_iso(resp, "created")
return super().from_response(resp)
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result["created"] = self._to_utc_iso(self.created)
return result
@dataclass
class ConsumerPause(Base):
"""
ConsumerPause represents the pause state after a pause or resume operation.
Introduced in nats-server 2.11.0.
"""
paused: bool
pause_until: Optional[str] = None
pause_remaining: Optional[str] = None
@dataclass
class AccountLimits(Base):
"""Account limits
References:
* `Multi-tenancy & Resource Mgmt <https://docs.nats.io/jetstream/resource_management>`_
"""
max_memory: int
max_storage: int
max_streams: int
max_consumers: int
max_ack_pending: int
memory_max_stream_bytes: int
storage_max_stream_bytes: int
max_bytes_required: bool
@dataclass
class Tier(Base):
memory: int
storage: int
streams: int
consumers: int
limits: AccountLimits
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "limits", AccountLimits)
return super().from_response(resp)
@dataclass
class APIStats(Base):
"""API stats"""
total: int
errors: int
@dataclass
class AccountInfo(Base):
"""Account information
References:
* `Account Information <https://docs.nats.io/jetstream/administration/account#account-information>`_
"""
# NOTE: These fields are shared with Tier type as well.
memory: int
storage: int
streams: int
consumers: int
limits: AccountLimits
api: APIStats
domain: Optional[str] = None
tiers: Optional[Dict[str, Tier]] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "limits", AccountLimits)
cls._convert(resp, "api", APIStats)
info = super().from_response(resp)
tiers = resp.get("tiers", None)
if tiers:
result = {}
for k, v in tiers.items():
result[k] = Tier.from_response(v)
info.tiers = result
return info
@dataclass
class RawStreamMsg(Base):
subject: Optional[str] = None
seq: Optional[int] = None
data: Optional[bytes] = None
hdrs: Optional[bytes] = None
headers: Optional[Dict] = None
stream: Optional[str] = None
time: Optional[datetime.datetime] = None
@property
def sequence(self) -> Optional[int]:
return self.seq
@property
def header(self) -> Optional[Dict]:
"""
header returns the headers from a message.
"""
return self.headers
@classmethod
def from_response(cls, resp: Dict[str, Any]):
resp["time"] = cls._parse_utc_iso(resp["time"])
return super().from_response(resp)
@dataclass
class KeyValueConfig(Base):
"""
KeyValueConfig is the configuration of a KeyValue store.
"""
bucket: str
description: Optional[str] = None
max_value_size: Optional[int] = None
history: int = 1
ttl: Optional[float] = None # in seconds
max_bytes: Optional[int] = None
storage: Optional[StorageType] = None
replicas: int = 1
placement: Optional[Placement] = None
republish: Optional[RePublish] = None
direct: Optional[bool] = None
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result["ttl"] = self._to_nanoseconds(self.ttl)
return result
@dataclass
class StreamPurgeRequest(Base):
"""
StreamPurgeRequest is optional request information to the purge API.
"""
# Purge up to but not including sequence.
seq: Optional[int] = None
# Subject to match against messages for the purge command.
filter: Optional[str] = None
# Number of messages to keep.
keep: Optional[int] = None
@dataclass
class ObjectStoreConfig(Base):
"""
ObjectStoreConfig is the configurigation of an ObjectStore.
"""
bucket: Optional[str] = None
description: Optional[str] = None
ttl: Optional[float] = None
max_bytes: Optional[int] = None
storage: Optional[StorageType] = None
replicas: int = 1
placement: Optional[Placement] = None
def as_dict(self) -> Dict[str, object]:
result = super().as_dict()
result["ttl"] = self._to_nanoseconds(self.ttl)
return result
@dataclass
class ObjectLink(Base):
"""
ObjectLink is used to embed links to other buckets and objects.
"""
# Bucket is the name of the other object store.
bucket: str
# Name can be used to link to a single object.
# If empty means this is a link to the whole store, like a directory.
name: Optional[str] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
return super().from_response(resp)
@dataclass
class ObjectMetaOptions(Base):
link: Optional[ObjectLink] = None
max_chunk_size: Optional[int] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "link", ObjectLink)
return super().from_response(resp)
@dataclass
class ObjectMeta(Base):
"""
ObjectMeta is high level information about an object.
"""
name: Optional[str] = None
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "options", ObjectMetaOptions)
return super().from_response(resp)
@dataclass
class ObjectInfo(Base):
"""
ObjectInfo is meta plus instance information.
"""
name: str
bucket: str
nuid: str
size: Optional[int] = None
mtime: Optional[str] = None
chunks: Optional[int] = None
digest: Optional[str] = None
deleted: Optional[bool] = False
description: Optional[str] = None
headers: Optional[dict] = None
# Optional options.
options: Optional[ObjectMetaOptions] = None
# NOTE: name, description, headers, options together compose
# what would be the ObjectMeta embedded type in Go.
@property
def meta(self) -> ObjectMeta:
return ObjectMeta(
name=self.name,
description=self.description,
headers=self.headers,
options=self.options,
)
def is_link(self) -> bool:
return self.options is not None and self.options.link is not None
@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert(resp, "options", ObjectMetaOptions)
return super().from_response(resp)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| __pycache__ | Folder | 0755 |
|
|
| __init__.py | File | 765 B | 0644 |
|
| api.py | File | 27.24 KB | 0644 |
|
| client.py | File | 53.2 KB | 0644 |
|
| errors.py | File | 7.36 KB | 0644 |
|
| kv.py | File | 17.86 KB | 0644 |
|
| manager.py | File | 14.8 KB | 0644 |
|
| object_store.py | File | 17.04 KB | 0644 |
|