diff --git a/.pylintrc b/.pylintrc
index 237ae36..2eeeb6f 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -65,8 +65,8 @@ confidence=
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
-# Entries added for qradar start at relative-import
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,relative-import,global-variable-not-assigned,global-statement,import-error,import-self,protected-access
+# Entries added for qradar start at global-statement
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,global-statement,missing-docstring,duplicate-code
[REPORTS]
@@ -117,10 +117,10 @@ include-naming-hint=no
property-classes=abc.abstractproperty
# Regular expression matching correct function names
-function-rgx=[a-z_][a-z0-9_]{2,30}$
+function-rgx=[a-z_][a-z0-9_]{2,40}$
# Naming hint for function names
-function-name-hint=[a-z_][a-z0-9_]{2,30}$
+function-name-hint=[a-z_][a-z0-9_]{2,40}$
# Regular expression matching correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
@@ -194,7 +194,7 @@ max-nested-blocks=5
[FORMAT]
# Maximum number of characters on a single line.
-max-line-length=100
+max-line-length=120
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )??$
diff --git a/.travis.yml b/.travis.yml
index c910780..50db099 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,7 +13,10 @@ addons:
script:
- ./lint.sh && ./test.sh
- - if [ "$TRAVIS_PULL_REQUEST" = "false" ] && [ -z "$TRAVIS_TAG" ] && [ "$TRAVIS_REPO_SLUG" == "ibm/qpylib"]; then sonar-scanner; fi # sonar only on non-PRs
+ # sonar only on non-PRs
+ - if [ "$TRAVIS_PULL_REQUEST" = "false" ] && [ -z "$TRAVIS_TAG" ] && [ "$TRAVIS_REPO_SLUG" == "ibm/qpylib"]; then
+ sonar-scanner;
+ fi
after_success:
- if [[ $TRAVIS_TAG ]]; then
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 0e781d1..fa7f7b2 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,49 +1,49 @@
-## Contributing In General
+# Contributing
To contribute code or documentation, please submit a [pull request](https://github.com/ibm/qpylib/pulls).
-### Proposing new features
+## Proposing new features
If you would like to implement a new feature, please [raise an issue](https://github.com/ibm/qpylib/issues)
before sending a pull request so the feature can be discussed.
-### Fixing bugs
+## Fixing bugs
To fix a bug, please [raise an issue](https://github.ibm.com/ibm/qpylib/issues) before sending a
pull request so it can be tracked.
-### Merge approval
+## Merge approval
-Any change requires approval from two maintainers before it can be merged.
-
-For a list of the maintainers, see the [MAINTAINERS](MAINTAINERS.md) page.
+Any change requires approval before it can be merged.
+A list of maintainers can be found on the [MAINTAINERS](MAINTAINERS.md) page.
## Legal
-Each source file must include a license header for the Apache
-Software License 2.0. Using the SPDX format is the simplest approach.
-See existing source files for an example.
+Each source file must include a license header for the Apache Software License 2.0.
+Using the SPDX format is the simplest approach. See existing source files for an example.
## Setup
-This project has been set up so that on your local machine you can lint, test and build in mostly the same way that Travis CI does. There are three scripts you can use:
+On your local machine you can lint, test and build in mostly
+the same way that Travis CI does. There are three scripts you can use:
-* lint.sh
-* test.sh
-* build.sh
+* `lint.sh`
+* `test.sh`
+* `build.sh`
-The requirements.txt file in the project contains only those Python packages needed to supplement what is already provided by the Travis CI environment. If you want to run the lint/test/build scripts locally, you may need to install other packages, e.g. pytest and mock, into your local Python environment.
+The `requirements.txt` file contains the Python packages needed to run these scripts.
## Code style
Pull requests will only be accepted if they pass the linting.
-Linting can be run either on your fork through Travis CI, or locally using **lint.sh**.
+Linting can be run either on your fork through Travis CI, or locally using `lint.sh`.
## Test
Pull requests will only be accepted if they pass all tests.
-Tests can be run either on your fork through Travis CI, or locally using **test.sh**.
+Tests can be run either on your fork through Travis CI, or locally using `test.sh`.
## Build
-The output of **build.sh** is a tar.gz file containing the qpylib package. You can use `pip install` to install the package into your Python environment.
+The output of `build.sh` is a tar.gz file containing the qpylib Python package.
+You can use `pip install` to install the package into your Python 3 environment.
diff --git a/README.md b/README.md
index cb0544e..fc7abce 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,22 @@
# QRadar App Python Library (qpylib)
-QPyLib provides a library of Python utility functions to assist in developing QRadar applications.
+A library of Python utility functions to assist in developing QRadar applications
+by providing a simplified interface to the QRadar App Framework and REST API.
-The utility functions ensure compatibility with QRadar and provide a simplified interface to the QRadar REST API and App Framework.
+## Compatibility
+
+The qpylib library hosted here is for use only with apps that have been written in
+Python 3 to run on Red Hat Universal Base Image. It is not compatible with apps
+written to run on a CentOS base image.
+
+**NOTE**: until the first official qpylib release, planned for early Q3 2020,
+any code contained herein is unsupported and subject to change.
+
+## Project details
* [LICENSE](LICENSE)
* [CONTRIBUTING](CONTRIBUTING.md)
* [MAINTAINERS](MAINTAINERS.md)
* [CHANGELOG](CHANGELOG.md)
-## Getting started
-
-See Setup section in [CONTRIBUTING](CONTRIBUTING.md#Setup).
diff --git a/clean.sh b/clean.sh
index b273006..fecc7d1 100755
--- a/clean.sh
+++ b/clean.sh
@@ -7,4 +7,4 @@ rm -rf .pytest_cache
rm -f qpylib/version.py
rm -rf dist qpylib.egg-info
rm -rf qpylib/__pycache__
-rm -rf "test/__pycache__"
+rm -rf test/__pycache__
diff --git a/lint.sh b/lint.sh
index b69533a..2ad2ddb 100755
--- a/lint.sh
+++ b/lint.sh
@@ -3,4 +3,5 @@
#
# SPDX-License-Identifier: Apache-2.0
-python -m pylint -d C,R -r n --rcfile=.pylintrc qpylib test/*.py
+python -m pylint --version
+python -m pylint -r n --rcfile=.pylintrc qpylib test/*.py
diff --git a/qpylib/abstract_qpylib.py b/qpylib/abstract_qpylib.py
deleted file mode 100644
index 7a84618..0000000
--- a/qpylib/abstract_qpylib.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Copyright 2019 IBM Corporation All Rights Reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from abc import ABCMeta, abstractmethod
-from flask import url_for
-import json
-import logging
-import requests
-import os
-from . import asset_qpylib
-from . import json_qpylib
-from . import offense_qpylib
-
-class AbstractQpylib(object, metaclass=ABCMeta):
-
- def __init__(self):
- self.LOGGER_NAME = 'com.ibm.applicationLogger'
- self.qlogger = 0
- self.cached_manifest = None
-
- # ==== Logging ====
-
- def log(self, message, level='info'):
- log_fn = self._choose_log_level(level)
- log_fn("127.0.0.1 [APP_ID/{0}][NOT:{1}] {2}".format(
- self.get_app_id(), self._map_notification_code(level), message))
-
- def create_log(self):
- self.qlogger = logging.getLogger(self.LOGGER_NAME)
- self._add_log_handler(self.qlogger)
- self.log("Created log {0}".format(self.LOGGER_NAME), 'info')
-
- def set_log_level(self, log_level='INFO'):
- self.qlogger.setLevel(self._map_log_level(log_level))
-
- @abstractmethod
- def _add_log_handler(self, loc_logger):
- pass
-
- def _choose_log_level(self, level='INFO'):
- if self.qlogger == 0:
- raise SystemError('You cannot use log before logging has been initialised')
- return {
- 'INFO': self.qlogger.info,
- 'DEBUG': self.qlogger.debug,
- 'ERROR': self.qlogger.error,
- 'WARNING': self.qlogger.warning,
- 'CRITICAL': self.qlogger.critical,
- 'EXCEPTION': self.qlogger.exception,
- }.get(level.upper(), self.qlogger.info)
-
- def _map_notification_code(self, log_level='INFO'):
- log_level = log_level.upper()
- return {
- 'INFO': "0000006000",
- 'DEBUG': "0000006000",
- 'ERROR': "0000003000",
- 'WARNING': "0000004000",
- 'CRITICAL': "0000003000",
- }.get(log_level, "0000006000")
-
- def _map_log_level(self, log_level='INFO'):
- log_level = log_level.upper()
- return {
- 'INFO': logging.INFO,
- 'DEBUG': logging.DEBUG,
- 'ERROR': logging.ERROR,
- 'WARNING': logging.WARNING,
- 'CRITICAL': logging.CRITICAL,
- }.get(log_level, logging.INFO)
-
- # ==== App details ====
-
- @abstractmethod
- def get_app_id(self):
- pass
-
- @abstractmethod
- def get_app_name(self):
- pass
-
- def get_manifest_json(self):
- if self.cached_manifest is None:
- full_manifest_location = self.get_root_path(self._get_manifest_location())
- with open(full_manifest_location) as manifest_file:
- self.cached_manifest = json.load(manifest_file)
- return self.cached_manifest
-
- @abstractmethod
- def _get_manifest_location(self):
- pass
-
- def _get_manifest_field_value(self, key, default_value=None):
- manifest = self.get_manifest_json()
- if key in manifest.keys():
- return manifest[key]
- if default_value is not None:
- return default_value
- raise KeyError('{0} is a required manifest field'.format(key))
-
- def get_store_path(self, relative_path):
- if relative_path == '':
- return os.path.join(self._root_path(), 'store')
- return os.path.join(self._root_path(), 'store', relative_path)
-
- def get_root_path(self, relative_path):
- return os.path.join(self._root_path(), relative_path)
-
- @abstractmethod
- def _root_path(self):
- pass
-
- @abstractmethod
- def get_app_base_url(self):
- pass
-
- def q_url_for(self, endpoint, **values):
- """
- Wraps the standard Flask url_for() method to include the proxied url
- through Qradar as a prefix to the Flask route name
- """
- url = self.get_app_base_url() + self._get_endpoint_url(endpoint, **values)
- self.log("q_url_for={0}".format(url), 'debug')
- return url
-
- def _get_endpoint_url(self, endpoint, **values):
- return url_for(endpoint, **values)
-
- @abstractmethod
- def get_console_address(self):
- pass
-
- # ==== REST ====
-
- @abstractmethod
- def REST(self, rest_type, request_url, headers=None, data=None,
- params=None, json_body=None, version=None, verify=None,
- timeout=60):
- pass
-
- def _chooseREST(self, rest_type):
- return {
- 'GET': requests.get,
- 'PUT': requests.put,
- 'POST': requests.post,
- 'DELETE': requests.delete,
- }.get(rest_type.upper(), self._unsupported_REST)
-
- def _unsupported_REST(self, *args, **kw_args):
- raise ValueError('Unsupported REST action was requested')
-
- # ==== JSON ====
-
- def to_json_dict(self, python_obj, classkey=None):
- """
- Helper function to convert a Python object into a dict
- usable with the JSON REST.
- Recursively converts fields which are also Python objects.
- @param python_obj: Python object to be converted into a dict
- @return dict object containing key:value pairs for the python
- objects fields. Useable with JSON REST.
- """
- if isinstance(python_obj, str):
- return python_obj
- if isinstance(python_obj, dict):
- data = {}
- for (k, v) in list(python_obj.items()):
- data[k] = self.to_json_dict(v, classkey)
- return data
- elif hasattr(python_obj, "_ast"):
- return self.to_json_dict(python_obj._ast())
- elif hasattr(python_obj, "__iter__"):
- return [self.to_json_dict(v, classkey) for v in python_obj]
- elif hasattr(python_obj, "__dict__"):
- data = dict([(key, self.to_json_dict(value, classkey))
- for key, value in python_obj.__dict__.items()
- if not callable(value) and not key.startswith('_')])
- if classkey is not None and hasattr(python_obj, "__class__"):
- data[classkey] = python_obj.__class__.__name__
- return data
- else:
- return python_obj
-
- def register_jsonld_type(self, context):
- if context is not None:
- jsonld_type = self._extract_type(context)
- self.log("register_jsonld_type {0}".format(str(jsonld_type)), "info")
- json_qpylib.register_jsonld_type(jsonld_type, context)
-
- def _extract_type(self, argument):
- type_id=None
- if '@context' in argument.keys():
- context=argument['@context']
- if '@type' in context.keys():
- type_id=context['@type']
- if type_id == '@id' and '@id' in context.keys():
- type_id=context['@id']
- return type_id
-
- def get_offense_rendering(self, offense_id, render_type):
- rendering_fn = self._choose_offense_rendering(render_type)
- return rendering_fn(offense_id)
-
- def _choose_offense_rendering(self, render_type):
- self.log('_choose_offense_rendering {0}'.format(str(render_type)), 'debug')
- return {
- 'HTML': offense_qpylib.get_offense_json_html,
- 'JSONLD': offense_qpylib.get_offense_json_ld,
- }.get(render_type.upper(), offense_qpylib.get_offense_json_html)
-
- def get_asset_rendering(self, asset_id, render_type):
- rendering_fn = self._choose_asset_rendering(render_type)
- return rendering_fn(asset_id)
-
- def _choose_asset_rendering(self, render_type):
- self.log('_choose_asset_rendering {0}'.format(str(render_type)), 'debug')
- return {
- 'HTML': asset_qpylib.get_asset_json_html,
- 'JSONLD': asset_qpylib.get_asset_json_ld,
- }.get(render_type.upper(), asset_qpylib.get_asset_json_html)
-
- def render_json_ld_type(self, jld_type, data, jld_id = None):
- return json_qpylib.render_json_ld_type(jld_type, data, jld_id)
-
- def register_jsonld_endpoints(self):
- manifest = self.get_manifest_json()
- services=None
- endpoints=None
- if 'services' in manifest.keys():
- services=manifest['services']
-
- if services is not None:
- for service in services:
- if 'endpoints' in service.keys():
- endpoints=service['endpoints']
-
- if endpoints is not None:
- for endpoint in endpoints:
- jsonld_context = None
- if 'request_mime_type' in endpoint.keys():
- argument=endpoint
- jsonld_context = self._extract_jsonld_context(argument, 'request_mime_type', 'request_body_type')
- self.register_jsonld_type(jsonld_context)
- if 'response' in endpoint.keys():
- argument = endpoint['response']
- jsonld_context = self._extract_jsonld_context(argument, 'mime_type', 'body_type')
- self.register_jsonld_type(jsonld_context)
-
- def _extract_jsonld_context(self, argument, mime_id, context_id):
- if mime_id in argument.keys() and context_id in argument.keys():
- if argument[mime_id] == 'application/json+ld':
- return argument[context_id]
diff --git a/qpylib/app_qpylib.py b/qpylib/app_qpylib.py
new file mode 100644
index 0000000..f1b7138
--- /dev/null
+++ b/qpylib/app_qpylib.py
@@ -0,0 +1,91 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import os
+from flask import request, url_for
+
+Q_CACHED_MANIFEST = None
+
+def get_app_id():
+ return get_manifest_field_value('app_id', 0)
+
+def get_app_name():
+ return get_manifest_field_value('name')
+
+def get_manifest_json():
+ global Q_CACHED_MANIFEST
+ if Q_CACHED_MANIFEST is None:
+ full_manifest_location = get_root_path('manifest.json')
+ with open(full_manifest_location) as manifest_file:
+ Q_CACHED_MANIFEST = json.load(manifest_file)
+ return Q_CACHED_MANIFEST
+
+def get_manifest_field_value(key, default_value=None):
+ manifest = get_manifest_json()
+ if key in manifest.keys():
+ return manifest[key]
+ if default_value is not None:
+ return default_value
+ raise KeyError('{0} is a required manifest field'.format(key))
+
+def get_root_path(*path_entries):
+ return _build_path(*path_entries)
+
+def get_store_path(*path_entries):
+ return _build_path('store', *path_entries)
+
+def get_log_path(*path_entries):
+ return _build_path('store', 'log', *path_entries)
+
+def _build_path(*path_entries):
+ return os.path.join(get_env_var('APP_ROOT'), *path_entries)
+
+def get_endpoint_url(endpoint, **values):
+ return url_for(endpoint, **values)
+
+def get_console_ip():
+ return get_env_var('QRADAR_CONSOLE_IP')
+
+def get_console_fqdn():
+ return get_env_var('QRADAR_CONSOLE_FQDN')
+
+def get_env_var(key):
+ value = os.getenv(key)
+ if value is None:
+ raise KeyError('Environment variable {0} is not set'.format(key))
+ return value
+
+def get_app_base_url():
+ """
+ Gets the full url that will proxy an app request to its plugin servlet.
+ If any of the information required for building the proxy is missing
+ then an empty string is returned.
+ """
+ app_id = get_app_id()
+ if app_id == '':
+ return ''
+
+ host = _get_host()
+ if host is None:
+ return ''
+
+ return "https://{0}/console/plugins/{1}/app_proxy".format(host, app_id)
+
+def _get_host():
+ try:
+ host = _get_host_header()
+ except: # pylint: disable=W0702
+ host = None
+
+ if host is None:
+ try:
+ host = get_console_ip()
+ except KeyError:
+ return None
+
+ return host
+
+def _get_host_header():
+ return request.headers.get('X-Console-Host')
diff --git a/qpylib/asset_qpylib.py b/qpylib/asset_qpylib.py
index 8bb9ebc..ad1e752 100644
--- a/qpylib/asset_qpylib.py
+++ b/qpylib/asset_qpylib.py
@@ -2,8 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0
+from . import app_qpylib
from . import json_qpylib
-from . import qpylib
# Context location yet to be finalised.
JSON_LD_CONTEXT = 'http://qradar/context/location'
@@ -13,7 +13,7 @@ def get_asset_url(asset_id):
return 'api/asset_model/assets/{0}'.format(asset_id)
def get_asset_url_full(asset_id):
- return 'https://{0}/{1}'.format(qpylib.get_console_address(), get_asset_url(asset_id))
+ return 'https://{0}/{1}'.format(app_qpylib.get_console_fqdn(), get_asset_url(asset_id))
def get_asset_json(asset_id):
# Actual implementation commented out for now - see get_asset_url above
@@ -25,6 +25,16 @@ def get_asset_json(asset_id):
asset_json['id'] = asset_id
return asset_json
+def get_asset_rendering(asset_id, render_type):
+ rendering_fn = _choose_asset_rendering(render_type)
+ return rendering_fn(asset_id)
+
+def _choose_asset_rendering(render_type):
+ return {
+ 'HTML': get_asset_json_html,
+ 'JSONLD': get_asset_json_ld,
+ }.get(render_type.upper(), get_asset_json_html)
+
def get_asset_json_ld(asset_id):
asset_json = get_asset_json(asset_id)
return json_qpylib.json_ld(JSON_LD_CONTEXT,
@@ -34,7 +44,7 @@ def get_asset_json_ld(asset_id):
'Asset details for id ' + asset_id,
asset_json)
-def get_asset_json_html(asset_id, generate_html = None):
+def get_asset_json_html(asset_id, generate_html=None):
asset_json = get_asset_json(asset_id)
if generate_html is None:
asset_html = get_asset_html_example(asset_json)
diff --git a/qpylib/encdec.py b/qpylib/encdec.py
index d62f47c..d7eb7ac 100644
--- a/qpylib/encdec.py
+++ b/qpylib/encdec.py
@@ -7,30 +7,27 @@
import os
import string
import uuid
-
-from Crypto.Util.Padding import pad, unpad
-
-from . import qpylib
-
from Crypto.Random import random
from Crypto.Cipher import AES
from Crypto.Protocol import KDF
+from Crypto.Util.Padding import pad, unpad
+from . import qpylib
# The encryption class is now version aware.
# EACH BREAKING CHANGE IN THE CRYPTOGRAPHIC METHODS SHOULD INCREMENT THE VERSION
# This enables better error handling for users. Users can also query the
# engine version to make decisions on how to handle secrets
-class Encryption(object):
+class Encryption():
engine_version = 3
def __init__(self, data):
if 'name' not in data or 'user' not in data or data['name'] == '' or data['user'] == '':
raise ValueError("Encryption : name and user are mandatory fields!")
- self.APP_UUID_ENV_VARIABLE = 'QRADAR_APP_UUID'
- if self.APP_UUID_ENV_VARIABLE not in os.environ:
- raise KeyError("Encryption : {0} not available in environment".format(self.APP_UUID_ENV_VARIABLE))
+ self.app_uuid_env_var = 'QRADAR_APP_UUID'
+ if self.app_uuid_env_var not in os.environ:
+ raise KeyError("Encryption : {0} not available in environment".format(self.app_uuid_env_var))
self.name = data['name']
self.user_id = data['user']
- self.app_uuid = os.environ.get(self.APP_UUID_ENV_VARIABLE)
+ self.app_uuid = os.environ.get(self.app_uuid_env_var)
self.config_path = qpylib.get_store_path(str(self.user_id) + '_e.db')
self.config = {}
self.__load_config()
@@ -78,7 +75,8 @@ def __generate_token(self):
token = self.__generate_token()
return token
- def __generate_random(self):
+ @staticmethod
+ def __generate_random():
""" Returns a string containing a random hash that uses letters, digits and special characters """
random_hash = ''.join(
(
@@ -138,7 +136,9 @@ def decrypt(self):
raise ValueError("Encryption : no secret to decrypt")
if self.config[self.name].get('version') != Encryption.engine_version:
- raise ValueError("Encryption : secret engine mismatch. Secret was stored with version {}, attempted to decrypt with version {}.".format(self.config[self.name].get('version') , Encryption.engine_version))
+ raise ValueError(("Encryption : secret engine mismatch. "
+ "Secret was stored with version {}, attempted to decrypt with version {}.")
+ .format(self.config[self.name].get('version'), Encryption.engine_version))
try:
return self.__decrypt_string(self.config[self.name]['secret'])
diff --git a/qpylib/json_qpylib.py b/qpylib/json_qpylib.py
index 7716dcd..7eefd15 100644
--- a/qpylib/json_qpylib.py
+++ b/qpylib/json_qpylib.py
@@ -3,37 +3,105 @@
# SPDX-License-Identifier: Apache-2.0
import json
+from . import app_qpylib
# A dictionary of jsonld context types mapped to the type name
-jsonld_types = {}
+JSONLD_TYPES = {}
+
+def register_jsonld_endpoints():
+ manifest = app_qpylib.get_manifest_json()
+
+ services = None
+ if 'services' in manifest.keys():
+ services = manifest['services']
+ if services is None:
+ return
+
+ for service in services:
+ endpoints = None
+ if 'endpoints' in service.keys():
+ endpoints = service['endpoints']
+ if endpoints is None:
+ continue
+ for endpoint in endpoints:
+ jsonld_context = None
+ if 'request_mime_type' in endpoint.keys():
+ argument = endpoint
+ jsonld_context = _extract_jsonld_context(argument, 'request_mime_type', 'request_body_type')
+ register_jsonld_type_from_context(jsonld_context)
+ if 'response' in endpoint.keys():
+ argument = endpoint['response']
+ jsonld_context = _extract_jsonld_context(argument, 'mime_type', 'body_type')
+ register_jsonld_type_from_context(jsonld_context)
+
+def _extract_jsonld_context(argument, mime_id, context_id):
+ if mime_id in argument.keys() and context_id in argument.keys():
+ if argument[mime_id] == 'application/json+ld':
+ return argument[context_id]
+ return None
+
+def register_jsonld_type_from_context(context):
+ if context is not None:
+ jsonld_type = _extract_type(context)
+ register_jsonld_type(jsonld_type, context)
def register_jsonld_type(jsonld_type, context):
- global jsonld_types
- jsonld_types[str(jsonld_type)] = context
+ global JSONLD_TYPES
+ JSONLD_TYPES[str(jsonld_type)] = context
def get_jsonld_type(jsonld_type):
- global jsonld_types
- if jsonld_type in jsonld_types.keys():
- return jsonld_types[str(jsonld_type)]
- else:
- raise ValueError('json ld key has not been registered')
+ global JSONLD_TYPES
+ if jsonld_type in JSONLD_TYPES.keys():
+ return JSONLD_TYPES[str(jsonld_type)]
+ raise ValueError('json ld key has not been registered')
+
+def _extract_type(argument):
+ type_id = None
+ if '@context' in argument.keys():
+ context = argument['@context']
+ if '@type' in context.keys():
+ type_id = context['@type']
+ if type_id == '@id' and '@id' in context.keys():
+ type_id = context['@id']
+ return type_id
-def render_json_ld_type(jld_type, data, jld_id = None):
+def render_json_ld_type(jld_type, data, jld_id=None):
jld_context = get_jsonld_type(jld_type)
json_dict = {}
for json_key in data:
json_dict[json_key] = data[json_key]
- json_dict['@context']=jld_context['@context']
- json_dict['@type']=jld_type
- if None != jld_id:
- json_dict['@type']=jld_type
+ json_dict['@context'] = jld_context['@context']
+ json_dict['@type'] = jld_type
+ if jld_id is not None:
+ json_dict['@type'] = jld_type
return json.dumps(json_dict, sort_keys=True)
+# pylint: disable=too-many-arguments
def json_ld(jld_context, jld_id, jld_type, name, description, data):
return json.dumps({'@context': jld_context, '@id': jld_id, '@type': jld_type, 'name': name,
'description': description, 'data': data}, sort_keys=True)
def json_html(html):
return json.dumps({'html': html})
+
+def to_json_dict(python_obj, classkey=None):
+ if isinstance(python_obj, str):
+ return python_obj
+ if isinstance(python_obj, dict):
+ data = {}
+ for (k, val) in list(python_obj.items()):
+ data[k] = to_json_dict(val, classkey)
+ return data
+ if hasattr(python_obj, "__iter__"):
+ return [to_json_dict(v, classkey) for v in python_obj]
+ # pylint: disable=consider-using-dict-comprehension
+ if hasattr(python_obj, "__dict__"):
+ data = dict([(key, to_json_dict(value, classkey))
+ for key, value in python_obj.__dict__.items()
+ if not callable(value) and not key.startswith('_')])
+ if classkey is not None and hasattr(python_obj, "__class__"):
+ data[classkey] = python_obj.__class__.__name__
+ return data
+ return python_obj
diff --git a/qpylib/live_qpylib.py b/qpylib/live_qpylib.py
deleted file mode 100644
index 2c0cb80..0000000
--- a/qpylib/live_qpylib.py
+++ /dev/null
@@ -1,160 +0,0 @@
-# Copyright 2019 IBM Corporation All Rights Reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from .abstract_qpylib import AbstractQpylib
-from flask import request, has_request_context
-from logging import Formatter
-from logging.handlers import RotatingFileHandler, SysLogHandler
-import os
-from socket import gethostbyname, gethostname
-
-class LiveQpylib(AbstractQpylib):
- LOGFILE_LOCATION = '/store/log/app.log'
- APP_CERT_LOCATION = '/etc/pki/tls/certs/ca-bundle.crt'
- APP_MANIFEST_LOCATION = 'app/manifest.json'
-
- QRADAR_CONSOLE_FQDN = 'QRADAR_CONSOLE_FQDN'
- QRADAR_CSRF = 'QRadarCSRF'
- SEC_HEADER = 'SEC'
- SEC_ADMIN_TOKEN = 'SEC_ADMIN_TOKEN'
-
- APP_FILE_LOG_FORMAT = '%(asctime)s [%(module)s.%(funcName)s] [%(threadName)s] [%(levelname)s] - %(message)s'
- APP_CONSOLE_LOG_FORMAT = '%(asctime)s %(module)s.%(funcName)s: %(message)s'
-
- # ==== Logging ====
-
- def _add_log_handler(self, loc_logger):
- loc_logger.setLevel(self._map_log_level(self._get_manifest_field_value('log_level', 'info')))
-
- handler = RotatingFileHandler(self.LOGFILE_LOCATION, maxBytes=2*1024*1024, backupCount=5)
- handler.setFormatter(Formatter(self.APP_FILE_LOG_FORMAT))
- loc_logger.addHandler(handler)
-
- # Ipv6 address - Strip [] for syslog
- console_ip = self.get_console_address()
- if console_ip.startswith('[') and console_ip.endswith(']'):
- console_ip = console_ip[1:-1]
-
- syslogHandler = SysLogHandler(address=(console_ip, 514))
- syslogHandler.setFormatter(Formatter(self.APP_CONSOLE_LOG_FORMAT))
- loc_logger.addHandler(syslogHandler)
- return
-
- # ==== App details ====
-
- def get_app_id(self):
- return self._get_manifest_field_value('app_id', 0)
-
- def get_app_name(self):
- return self._get_manifest_field_value('name')
-
- def _get_manifest_location(self):
- return self.APP_MANIFEST_LOCATION
-
- def _root_path(self):
- return "/"
-
- def get_app_base_url(self):
- """
- Gets the full url that will proxy an app request to its plugin servlet.
- If any of the information required for building the proxy is missing
- then an empty string is returned.
- """
- app_id = self._get_manifest_field_value('app_id', '')
-
- if app_id == '':
- self.log("get_app_base_url: app_id not found in manifest", 'error')
- return ''
-
- url_suffix = "/console/plugins/{0}/app_proxy".format(app_id)
- proxy_path = ''
-
- try:
- x_console_host = self._get_host_header()
- proxy_path = "https://{0}{1}".format(x_console_host, url_suffix)
-
- except: # pylint: disable=W0702
- console_ip = self._get_manifest_field_value('console_ip', '')
- if console_ip == '':
- self.log("get_app_base_url: console_ip not found in manifest", 'error')
- else:
- proxy_path = "https://{0}{1}".format(console_ip, url_suffix)
-
- self.log("get_app_base_url: proxy_path={0}".format(proxy_path), 'debug')
- return proxy_path
-
- def _get_host_header(self):
- return request.headers.get('X-Console-Host')
-
- def get_console_address(self):
- return self._get_manifest_field_value('console_ip', '127.0.0.1')
-
- # ==== REST ====
-
- def REST(self, rest_type, request_url, headers=None, data=None,
- params=None, json_body=None, version=None, verify=None,
- timeout=60):
-
- rest_headers = self._add_headers(headers, version)
-
- if os.getenv(self.QRADAR_CONSOLE_FQDN):
- console_address = os.getenv(self.QRADAR_CONSOLE_FQDN)
- else:
- console_address = self.get_console_address()
-
- full_url = "https://{0}/{1}".format(console_address, request_url)
-
- if os.path.isfile('/store/consolecert.pem'):
- # if /store/consolecert.pem exists then we need to pass it
- # to be able to communicate with console
- verify = '/store/consolecert.pem'
- else:
- # If the verify value isn't a string - i.e. True, False or None
- # retrieve the cert file location to try and ensure all REST requests use SSL.
- if not isinstance(verify, str):
- verify = self._get_cert_filepath()
-
- self.log("REST type=" + rest_type +
- " url=" + full_url +
- " headers=" + str(rest_headers) +
- " data=" + str(data) +
- " params=" + str(params) +
- " json_body=" + str(json_body) +
- " verify=" + str(verify) +
- " version=" + str(version), 'debug')
-
- return self._chooseREST(rest_type)(full_url, headers=rest_headers, data=data,
- params=params, json=json_body,
- timeout=timeout, verify=verify)
-
- def _get_cert_filepath(self):
- if '/etc/qradar_pki' in open('/proc/mounts').read():
- self.log('Using ca bundle cert from file: {0}'.format(str(self.APP_CERT_LOCATION)), level='debug')
- cert_filepath = self.APP_CERT_LOCATION
- else:
- self.log('/etc/qradar_pki is not mounted in the container. verify will be turned off', level='debug')
- cert_filepath = False
- return cert_filepath
-
- def _add_headers(self, headers, version=None):
- if headers is None:
- headers = {}
-
- if version is not None:
- headers['Version'] = version
-
- if headers.get('Host') is None:
- headers['Host'] = gethostbyname(gethostname())
-
- if has_request_context():
- if self.QRADAR_CSRF in request.cookies.keys():
- headers[self.QRADAR_CSRF] = request.cookies.get(self.QRADAR_CSRF)
- if self.SEC_HEADER in request.cookies.keys() \
- and self.SEC_HEADER not in headers.keys():
- headers[self.SEC_HEADER] = request.cookies.get(self.SEC_HEADER)
-
- if os.getenv(self.SEC_ADMIN_TOKEN):
- headers[self.SEC_HEADER] = os.getenv(self.SEC_ADMIN_TOKEN)
-
- return headers
diff --git a/qpylib/log_qpylib.py b/qpylib/log_qpylib.py
new file mode 100644
index 0000000..177baff
--- /dev/null
+++ b/qpylib/log_qpylib.py
@@ -0,0 +1,75 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from logging.handlers import RotatingFileHandler, SysLogHandler
+from . import app_qpylib
+from . import util_qpylib
+
+APP_FILE_LOG_FORMAT = '%(asctime)s [%(module)s.%(funcName)s] [%(threadName)s] [%(levelname)s] - %(message)s'
+APP_CONSOLE_LOG_FORMAT = '%(asctime)s %(module)s.%(funcName)s: %(message)s'
+
+QLOGGER = 0
+
+def log(message, level):
+ log_fn = _choose_log_fn(level)
+ log_fn("[APP_ID/{0}][NOT:{1}] {2}"
+ .format(app_qpylib.get_app_id(), _map_notification_code(level), message))
+
+def create_log():
+ global QLOGGER
+ QLOGGER = logging.getLogger('com.ibm.applicationLogger')
+ QLOGGER.setLevel(default_log_level())
+
+ handler = RotatingFileHandler(_log_file_location(), maxBytes=2*1024*1024, backupCount=5)
+ handler.setFormatter(logging.Formatter(APP_FILE_LOG_FORMAT))
+ QLOGGER.addHandler(handler)
+
+ if not util_qpylib.is_sdk():
+ console_ip = app_qpylib.get_console_ip()
+ if util_qpylib.is_ipv6_address(console_ip):
+ console_ip = console_ip[1:-1]
+ syslog_handler = SysLogHandler(address=(console_ip, 514))
+ syslog_handler.setFormatter(logging.Formatter(APP_CONSOLE_LOG_FORMAT))
+ QLOGGER.addHandler(syslog_handler)
+
+def set_log_level(level='INFO'):
+ global QLOGGER
+ QLOGGER.setLevel(_map_log_level(level))
+
+def default_log_level():
+ return _map_log_level(app_qpylib.get_manifest_field_value('log_level', 'INFO'))
+
+def _log_file_location():
+ return app_qpylib.get_log_path('app.log')
+
+def _choose_log_fn(level):
+ global QLOGGER
+ if QLOGGER == 0:
+ raise RuntimeError('You cannot use log before logging has been initialised')
+ return {
+ 'INFO': QLOGGER.info,
+ 'DEBUG': QLOGGER.debug,
+ 'WARNING': QLOGGER.warning,
+ 'ERROR': QLOGGER.error,
+ 'CRITICAL': QLOGGER.critical
+ }.get(level.upper(), QLOGGER.info)
+
+def _map_notification_code(level):
+ return {
+ 'INFO': "0000006000",
+ 'DEBUG': "0000006000",
+ 'WARNING': "0000004000",
+ 'ERROR': "0000003000",
+ 'CRITICAL': "0000003000"
+ }.get(level.upper(), "0000006000")
+
+def _map_log_level(level):
+ return {
+ 'INFO': logging.INFO,
+ 'DEBUG': logging.DEBUG,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL
+ }.get(level.upper(), logging.INFO)
diff --git a/qpylib/offense_qpylib.py b/qpylib/offense_qpylib.py
index 6e92aeb..cb3f031 100644
--- a/qpylib/offense_qpylib.py
+++ b/qpylib/offense_qpylib.py
@@ -2,17 +2,22 @@
#
# SPDX-License-Identifier: Apache-2.0
-from . import qpylib
+from . import app_qpylib
from . import json_qpylib
+# pylint: disable=cyclic-import
+from . import qpylib
# Context location yet to be finalised
JSON_LD_CONTEXT = 'http://qradar/context/location'
+OFFENSE_HEADER_TEMPLATE = ('
')
def get_offense_url(offense_id):
return 'api/siem/offenses/{0}'.format(offense_id)
def get_offense_url_full(offense_id):
- return 'https://{0}/{1}'.format(qpylib.get_console_address(), get_offense_url(offense_id))
+ return 'https://{0}/{1}'.format(app_qpylib.get_console_fqdn(), get_offense_url(offense_id))
def get_offense_json(offense_id):
response = qpylib.REST('get', get_offense_url(offense_id))
@@ -28,10 +33,17 @@ def get_offense_html_example(offense_json):
'')
def get_offense_html_header(offense_id):
- html_header = ''
- return html_header
+ return OFFENSE_HEADER_TEMPLATE.format(offense_id, app_qpylib.get_app_name())
+
+def get_offense_rendering(offense_id, render_type):
+ rendering_fn = _choose_offense_rendering(render_type)
+ return rendering_fn(offense_id)
+
+def _choose_offense_rendering(render_type):
+ return {
+ 'HTML': get_offense_json_html,
+ 'JSONLD': get_offense_json_ld,
+ }.get(render_type.upper(), get_offense_json_html)
def get_offense_json_ld(offense_id):
offense_json = get_offense_json(offense_id)
@@ -42,7 +54,7 @@ def get_offense_json_ld(offense_id):
'Offense details for id ' + str(offense_id),
offense_json)
-def get_offense_json_html(offense_id, generate_html = None, generate_heading = True):
+def get_offense_json_html(offense_id, generate_html=None, generate_heading=True):
offense_html = ''
if generate_heading:
offense_html = get_offense_html_header(offense_id)
diff --git a/qpylib/qpylib.py b/qpylib/qpylib.py
index b978a6f..4eac62a 100644
--- a/qpylib/qpylib.py
+++ b/qpylib/qpylib.py
@@ -2,81 +2,141 @@
#
# SPDX-License-Identifier: Apache-2.0
-import os
-from .live_qpylib import LiveQpylib
-from .sdk_qpylib import SdkQpylib
-
-def is_sdk():
- sdk_env = os.getenv('QRADAR_APPFW_SDK', 'no').lower() == 'true'
- return sdk_env
-
-def strategy():
- if is_sdk():
- return SdkQpylib()
- return LiveQpylib()
+from . import app_qpylib
+from . import asset_qpylib
+from . import json_qpylib
+from . import log_qpylib
+from . import offense_qpylib
+from . import rest_qpylib
+from . import util_qpylib
# ==== Logging ====
-def log(message, level='info'):
- strategy().log(message, level)
+def log(message, level='INFO'):
+ ''' Logs a message at the given level, which defaults to INFO.
+ Level values: DEBUG, INFO, WARNING, ERROR, CRITICAL.
+ Raises RuntimeError if logging was not previously initialised
+ by a call to qpylib.create_log().
+ '''
+ log_qpylib.log(message, level)
def create_log():
- strategy().create_log()
+ ''' Initialises logging with INFO as the threshold log level.
+ Must be called before any call to qpylib.log().
+ '''
+ log_qpylib.create_log()
-def set_log_level(log_level='info'):
- strategy().set_log_level(log_level)
+def set_log_level(level):
+ ''' Sets the threshold log level.
+ Level values: DEBUG, INFO, WARNING, ERROR, CRITICAL.
+ '''
+ log_qpylib.set_log_level(level)
# ==== App details ====
def get_app_id():
- return strategy().get_app_id()
+ ''' Returns the "app_id" value from the app manifest,
+ or 0 if app_id is not in the manifest.
+ '''
+ return app_qpylib.get_app_id()
def get_app_name():
- return strategy().get_app_name()
+ ''' Returns the "name" value from the app manifest.
+ Raises KeyError if "name" is not in the manifest.
+ '''
+ return app_qpylib.get_app_name()
def get_manifest_json():
- return strategy().get_manifest_json()
-
-def get_store_path(relative_path=''):
- return strategy().get_store_path(relative_path)
-
-def get_root_path(relative_path=''):
- return strategy().get_root_path(relative_path)
+ ''' Returns the content of the app manifest as a Python object. '''
+ return app_qpylib.get_manifest_json()
+
+def get_manifest_field_value(key, default_value=None):
+ ''' Returns the value of "key" from the app manifest.
+ If "key" is not in the manifest and default_value
+ was supplied, default_value is returned.
+ Raises KeyError if "key" is not in the manifest and
+ no default_value was supplied.
+ '''
+ return app_qpylib.get_manifest_field_value(key, default_value)
+
+def get_root_path(*path_entries):
+ ''' Returns the app's root path, joined with path_entries if supplied.
+ The app's root path is the location of the app directory and
+ manifest.json file.
+ Raises KeyError if environment variable APP_ROOT is not set.
+ '''
+ return app_qpylib.get_root_path(*path_entries)
+
+def get_store_path(*path_entries):
+ ''' Returns the app's store path, joined with path_entries if supplied.
+ Raises KeyError if environment variable APP_ROOT is not set.
+ '''
+ return app_qpylib.get_store_path(*path_entries)
def get_app_base_url():
- return strategy().get_app_base_url()
+ """ Returns the QRadar app proxy prefix. """
+ return app_qpylib.get_app_base_url()
def q_url_for(endpoint, **values):
- return strategy().q_url_for(endpoint, **values)
+ """ Returns the QRadar app proxy prefix joined to the Flask endpoint url. """
+ return get_app_base_url() + app_qpylib.get_endpoint_url(endpoint, **values)
def get_console_address():
- return strategy().get_console_address()
+ ''' Returns the QRadar console IP address.
+ Raises KeyError if environment variable QRADAR_CONSOLE_IP is not set.
+ '''
+ return app_qpylib.get_console_ip()
+
+def get_console_fqdn():
+ ''' Returns the QRadar console fully-qualified domain name.
+ Raises KeyError if environment variable QRADAR_CONSOLE_FQDN is not set.
+ '''
+ return app_qpylib.get_console_fqdn()
# ==== REST ====
-def REST(rest_type, request_url, headers=None, data=None, params=None,
- json_body=None, version=None, verify=None, timeout=60):
- return strategy().REST(rest_type, request_url, headers=headers,
- data=data, params=params, json_body=json_body,
- version=version, verify=verify,
- timeout=timeout)
+# pylint: disable=invalid-name, too-many-arguments
+def REST(rest_action, request_url, version=None, headers=None, data=None,
+ params=None, json_body=None, verify=None, timeout=60):
+ ''' Invokes a rest_action request to request_url using the Python requests module.
+ Returns a requests.Response object.
+ Raises ValueError if rest_action is not one of GET, PUT, POST, DELETE.
+ '''
+ if util_qpylib.is_sdk():
+ rest_func = rest_qpylib.sdk_rest
+ else:
+ rest_func = rest_qpylib.live_rest
+ return rest_func(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout)
# ==== JSON ====
-def to_json_dict(python_obj):
- return strategy().to_json_dict(python_obj)
+def to_json_dict(python_obj, classkey=None):
+ """ Converts a Python object into a dict usable with the REST function.
+ Recursively converts fields which are also Python objects.
+ """
+ return json_qpylib.to_json_dict(python_obj, classkey)
+
+def register_jsonld_endpoints():
+ ''' Registers JSON-LD endpoints from the app manifest. '''
+ json_qpylib.register_jsonld_endpoints()
def register_jsonld_type(context):
- return strategy().register_jsonld_type(context)
+ ''' Registers a JSON-LD endpoint from the given context. '''
+ json_qpylib.register_jsonld_type_from_context(context)
def get_offense_rendering(offense_id, render_type):
- return strategy().get_offense_rendering(offense_id, render_type)
+ ''' Returns an offense, rendered according to render_type.
+ render_type is HTML or JSONLD.
+ '''
+ return offense_qpylib.get_offense_rendering(offense_id, render_type)
def get_asset_rendering(asset_id, render_type):
- return strategy().get_asset_rendering(asset_id, render_type)
-
-def render_json_ld_type(jld_type, data, jld_id = None):
- return strategy().render_json_ld_type(jld_type, data, jld_id)
-
-def register_jsonld_endpoints():
- return strategy().register_jsonld_endpoints()
+ ''' Returns an asset, rendered according to render_type.
+ render_type is HTML or JSONLD.
+ '''
+ return asset_qpylib.get_asset_rendering(asset_id, render_type)
+
+def render_json_ld_type(jld_type, data, jld_id=None):
+ ''' Returns a JSON-LD type value rendered as a JSON-formatted string. '''
+ return json_qpylib.render_json_ld_type(jld_type, data, jld_id)
diff --git a/qpylib/rest_qpylib.py b/qpylib/rest_qpylib.py
new file mode 100644
index 0000000..79dfc2c
--- /dev/null
+++ b/qpylib/rest_qpylib.py
@@ -0,0 +1,88 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+from socket import gethostbyname, gethostname
+from flask import request, has_request_context
+import requests
+from . import app_qpylib
+
+# pylint: disable=too-many-arguments
+
+QRADAR_CSRF = 'QRadarCSRF'
+SEC_HEADER = 'SEC'
+SEC_ADMIN_TOKEN = 'SEC_ADMIN_TOKEN'
+
+def live_rest(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout):
+ if not isinstance(verify, str):
+ verify = _get_cert_filepath()
+ return _rest(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout)
+
+def sdk_rest(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout):
+ if not isinstance(verify, str):
+ # To be completed. Default to no verification for now.
+ verify = False
+ return _rest(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout)
+
+def _rest(rest_action, request_url, version, headers, data,
+ params, json_body, verify, timeout):
+ rest_func = _choose_rest_function(rest_action)
+ full_url = _generate_full_url(request_url)
+ rest_headers = _add_headers(headers, version)
+ proxies = _add_proxies()
+ return rest_func(full_url, headers=rest_headers, data=data, params=params,
+ json=json_body, verify=verify, timeout=timeout, proxies=proxies)
+
+def _get_cert_filepath():
+ with open('/proc/mounts') as mounts:
+ if '/etc/qradar_pki' in mounts.read():
+ return '/etc/pki/tls/certs/ca-bundle.crt'
+ return False
+
+def _add_headers(headers, version=None):
+ if headers is None:
+ headers = {}
+
+ if version is not None:
+ headers['Version'] = version
+
+ if headers.get('Host') is None:
+ headers['Host'] = gethostbyname(gethostname())
+
+ if has_request_context():
+ if QRADAR_CSRF in request.cookies.keys():
+ headers[QRADAR_CSRF] = request.cookies.get(QRADAR_CSRF)
+ if SEC_HEADER in request.cookies.keys() \
+ and SEC_HEADER not in headers.keys():
+ headers[SEC_HEADER] = request.cookies.get(SEC_HEADER)
+
+ sec_admin_token = os.getenv(SEC_ADMIN_TOKEN)
+ if sec_admin_token is not None:
+ headers[SEC_HEADER] = sec_admin_token
+
+ return headers
+
+def _add_proxies():
+ qradar_rest_proxy = os.getenv('QRADAR_REST_PROXY')
+ if qradar_rest_proxy is None:
+ return {}
+ return {'https': qradar_rest_proxy}
+
+def _generate_full_url(request_url):
+ return "https://{0}/{1}".format(app_qpylib.get_console_fqdn(), request_url)
+
+def _choose_rest_function(rest_action):
+ return {
+ 'GET': requests.get,
+ 'PUT': requests.put,
+ 'POST': requests.post,
+ 'DELETE': requests.delete,
+ }.get(rest_action.upper(), _unsupported_rest_action)
+
+def _unsupported_rest_action(*args, **kw_args):
+ raise ValueError('Unsupported REST action was requested')
diff --git a/qpylib/sdk_qpylib.py b/qpylib/sdk_qpylib.py
deleted file mode 100644
index 51681a6..0000000
--- a/qpylib/sdk_qpylib.py
+++ /dev/null
@@ -1,292 +0,0 @@
-# Copyright 2019 IBM Corporation All Rights Reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from .abstract_qpylib import AbstractQpylib
-from cryptography import x509
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import hashes
-import getpass
-import json
-import logging
-import os
-import requests
-import socket
-import ssl
-import sys
-import unicodedata
-from OpenSSL.crypto import dump_certificate, FILETYPE_PEM
-from OpenSSL.SSL import (Context, Connection)
-from OpenSSL.SSL import SSLv23_METHOD
-
-DEV_AUTH_FILE = ".qradar_appfw.auth"
-DEV_CONSOLE_FILE = ".qradar_appfw.console"
-CONSOLE_CERT_FILE = ".qradar_appfw.console_cert.{0}.pem"
-YES_OPTIONS = ("y", "yes")
-
-VAULT_PORT = 9381
-ROOT_PEM_URL = 'http://{0}:{1}/vault-qrd_ca.pem'
-INTERMEDIATE_PEM_URL = 'http://{0}:{1}/vault-qrd_ca_int.pem'
-
-api_auth_user = 0
-api_auth_password = 0
-consoleIP = 0
-handler_added = 0
-
-class SdkQpylib(AbstractQpylib):
-
- # ==== Logging ====
-
- def _add_log_handler(self, loc_logger):
- global handler_added
- if 0 == handler_added:
- loc_logger.setLevel(self._map_log_level('debug'))
- handler = logging.StreamHandler()
- loc_logger.addHandler(handler)
- handler_added=1
-
- # ==== App details ====
-
- def get_app_id(self):
- return "DEV_APP"
-
- def get_app_name(self):
- return "SDK_APP"
-
- def _get_manifest_location(self):
- return 'manifest.json'
-
- def _root_path(self):
- return os.getenv('QRADAR_APPFW_WORKSPACE', '~')
-
- def get_app_base_url(self):
- return "http://localhost:5000"
-
- def get_console_address(self):
- global consoleIP
- global DEV_CONSOLE_FILE
- home = os.path.expanduser("~")
- console_file_path = os.path.join(home, DEV_CONSOLE_FILE)
- if os.path.isfile(console_file_path):
- print("Loading console details from file: {0}".format(str(console_file_path)))
- sys.stdout.flush()
- with open(console_file_path) as consolefile:
- console_json = json.load(consolefile)
- consoleIP = console_json["console"]
- else:
- if consoleIP == 0:
- console_data = {}
- print("What is the IP of QRadar console required to make this API call:")
- sys.stdout.flush()
- consoleIP = input()
- console_data['console'] = consoleIP
- print("Do you want to store the console IP at {0}?".format(console_file_path))
- print("[y/n]:")
- sys.stdout.flush()
- do_store = input()
- if do_store in YES_OPTIONS:
- with open(console_file_path, 'w+') as console_file:
- json.dump(console_data, console_file)
- return consoleIP
-
- # ==== REST ====
-
- def REST(self, rest_type, request_url, headers=None, data=None, params=None,
- json_body=None, version=None, verify=None, timeout=60):
- if headers is None:
- headers={}
- if version is not None:
- headers['Version'] = version
- auth = self._get_api_auth()
- consoleAddress = self.get_console_address()
- fullURL = "https://" + str(consoleAddress) + "/" + str(request_url)
- rest_func = self._chooseREST(rest_type)
- if not isinstance(verify, str):
- verify = self._get_cert_filepath(consoleAddress)
- return rest_func(fullURL, headers=headers, data=data, auth=auth, params=params,
- json=json_body, timeout=timeout, verify=verify)
-
- def _get_api_auth(self):
- auth = None
- global DEV_AUTH_FILE
- global api_auth_user
- global api_auth_password
- home = os.path.expanduser("~")
- auth_file_path = os.path.join(home, DEV_AUTH_FILE)
- if os.path.isfile(auth_file_path):
- print("Loading user details from file: {0}".format(auth_file_path))
- sys.stdout.flush()
- with open(auth_file_path) as authfile:
- auth_json = json.load(authfile)
- auth = (auth_json["user"], auth_json["password"])
- else:
- auth_data = {}
- consoleAddress = self.get_console_address()
- print("QRadar credentials for {0} are required to make this API call".format(consoleAddress))
- if api_auth_user == 0:
- print("User:")
- sys.stdout.flush()
- api_auth_user = input()
- if api_auth_password == 0:
- api_auth_password = getpass.getpass("Password:")
- auth_data['user'] = api_auth_user
- auth_data['password'] = api_auth_password
- print("Store credentials credentials at: {0}".format(auth_file_path))
- print("WARNING: credentials will be stored in clear.")
- print("[y/n]:")
- sys.stdout.flush()
- do_store = input()
- if do_store in YES_OPTIONS:
- with open(auth_file_path, 'w+') as auth_file:
- json.dump(auth_data, auth_file)
- auth = (api_auth_user, api_auth_password)
- print("Using Auth: " + str(auth))
- return auth
-
- def _get_cert_filepath(self, host):
- global CONSOLE_CERT_FILE
- console_cert_file_path = os.path.join(os.path.expanduser("~"), CONSOLE_CERT_FILE.format(host))
-
- if os.path.isfile(console_cert_file_path + ".ignore"):
- return False
-
- if os.path.isfile(console_cert_file_path):
- try:
- with open(console_cert_file_path) as pem_file:
- pem_text = pem_file.read()
- x509.load_pem_x509_certificate(pem_text.encode(), default_backend())
- print("Using console cert from file: {0}".format(str(console_cert_file_path)))
- sys.stdout.flush()
- return console_cert_file_path
-
- except ValueError:
- print("Removing invalid console cert file {0}".format(str(console_cert_file_path)))
- sys.stdout.flush()
- os.remove(console_cert_file_path)
-
- self._store_cert_from_server(host, console_cert_file_path)
- return console_cert_file_path
-
- # If unable to connect to server, this function raises a socket error.
- def _store_cert_from_server(self, host, console_cert_file_path):
- global ROOT_PEM_URL
- global INTERMEDIATE_PEM_URL
- global VAULT_PORT
-
- # Try to get the root and intermediate certs introduced in 7.3.2.
- # If that fails fall back to the pre-7.3.2 cert.
- use_pre_732_cert = False
- try:
- pem_data = requests.get(url = ROOT_PEM_URL.format(host, VAULT_PORT)).text
- pem_text = self._normalize_pem_data(pem_data)
- intermediate_pem_data = requests.get(url = INTERMEDIATE_PEM_URL.format(host, VAULT_PORT)).text
- intermediate_pem_text = self._normalize_pem_data(intermediate_pem_data)
- except requests.ConnectionError:
- use_pre_732_cert = True
- pem_data = ssl.get_server_certificate((host, 443))
- pem_text = self._normalize_pem_data(pem_data)
-
- print('')
- print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
- print('Server {0} is unknown, do you want to trust it?'.format(host))
- print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
- print('')
- self._display_pem_cert_details(pem_text)
- print("Do you trust this certificate [y/n]: ")
- sys.stdout.flush()
-
- do_store = input().strip().lower()
- if do_store not in YES_OPTIONS:
- print("Not storing cert file for {0}. Aborting request.".format(host))
- sys.stdout.flush()
- raise ValueError("Certificate was rejected")
-
- print("Storing cert file to {0}".format(console_cert_file_path))
- sys.stdout.flush()
-
- if use_pre_732_cert:
- with open(console_cert_file_path, 'w') as cert_file:
- self._dump_all_certs(cert_file, host)
- else:
- with open(console_cert_file_path, "a") as cert_file:
- cert_file.write(pem_text.decode())
- cert_file.write(intermediate_pem_text.decode())
-
- def _dump_all_certs(self, cert_file, address):
- # This will also include intermediate certs
- context = Context(SSLv23_METHOD)
- context.set_default_verify_paths()
- client = socket.socket()
- client.connect((address, 443))
- clientSSL = Connection(context, client)
- clientSSL.set_connect_state()
- clientSSL.do_handshake()
- chains = clientSSL.get_peer_cert_chain()
- for chain in chains:
- cert_file.write(dump_certificate(FILETYPE_PEM, chain).decode())
-
- _name_fields = [
- 'OID_COMMON_NAME',
- 'OID_COUNTRY_NAME',
- 'OID_DOMAIN_COMPONENT',
- 'OID_DN_QUALIFIER',
- 'OID_EMAIL_ADDRESS',
- 'OID_GENERATION_QUALIFIER',
- 'OID_GIVEN_NAME',
- 'OID_LOCALITY_NAME',
- 'OID_ORGANIZATIONAL_UNIT_NAME',
- 'OID_ORGANIZATION_NAME',
- 'OID_PSEUDONYM',
- 'OID_SERIAL_NUMBER',
- 'OID_STATE_OR_PROVINCE_NAME',
- 'OID_SURNAME',
- 'OID_TITLE'
- ]
-
- def _display_pem_cert_details(self, pem_text):
- pem_cert = x509.load_pem_x509_certificate(pem_text, default_backend())
- print('********************')
- print('Certificate details:')
- print('********************')
- print('Version: {}'.format(pem_cert.version))
- print('SHA-256 Fingerprint:', end=' ')
- print(':'.join(format(b, '02x') for b in pem_cert.fingerprint(hashes.SHA256())))
- print('SHA-1 Fingerprint:', end=' ')
- print(':'.join(format(b, '02x') for b in pem_cert.fingerprint(hashes.SHA1())))
- print('Serial Number: {}'.format(pem_cert.serial_number))
- print('Not valid before: {}'.format(pem_cert.not_valid_before))
- print('Not valid after: {}'.format(pem_cert.not_valid_after))
- print('Signature Hash Algorithm: {}'.format(pem_cert.signature_algorithm_oid._name))
-
- print('Issuer:')
- for attr in self._name_fields:
- oid = getattr(x509, attr.upper())
- issuer = pem_cert.issuer
- info = issuer.get_attributes_for_oid(oid)
- if (info):
- print(" {}: {}".format(attr, info[0].value))
-
- print('Subject:')
- for attr in self._name_fields:
- oid = getattr(x509, attr.upper())
- subject = pem_cert.subject
- info = subject.get_attributes_for_oid(oid)
- if (info):
- print(" {}: {}".format(attr, info[0].value))
-
- for ext in pem_cert.extensions:
- try:
- print('Extension: Name :', ext.oid._name)
- print(' Critical :', ext.critical)
- print(' Value :', ext.value)
- except UnicodeEncodeError:
- pass
-
- print('Signature:\n ', end=' ')
- print(":".join(format(b, '02x') for b in pem_cert.signature))
- print('TBS Cert Signature:\n ', end=' ')
- print(":".join(format(b, '02x') for b in pem_cert.tbs_certificate_bytes))
- print('')
-
- def _normalize_pem_data(self, pem_data):
- return unicodedata.normalize('NFKD', pem_data).encode('ascii', 'ignore')
diff --git a/qpylib/util_qpylib.py b/qpylib/util_qpylib.py
new file mode 100644
index 0000000..93a8b68
--- /dev/null
+++ b/qpylib/util_qpylib.py
@@ -0,0 +1,11 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+
+def is_sdk():
+ return os.getenv('QRADAR_APPFW_SDK', 'no').lower() == 'true'
+
+def is_ipv6_address(ip_address):
+ return ip_address.startswith('[') and ip_address.endswith(']')
diff --git a/requirements-dev.txt b/requirements-dev.txt
deleted file mode 100644
index f2578c6..0000000
--- a/requirements-dev.txt
+++ /dev/null
@@ -1,2 +0,0 @@
--r requirements.txt
-pytest
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 8a1bb7e..b219080 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,5 @@ cryptography
pyOpenSSL
pycryptodome
pylint
+pytest
responses
diff --git a/test/manifests/loglevel.json b/test/manifests/loglevel.json
new file mode 100644
index 0000000..8375490
--- /dev/null
+++ b/test/manifests/loglevel.json
@@ -0,0 +1,9 @@
+{
+ "name": "Debug Manifest",
+ "description": "A sample debug manifest",
+ "version": "1.0",
+ "uuid": "ddff161f-871a-435c-866b-c65b4ceca959",
+ "app_id": 1008,
+ "console_ip": "9.123.234.101",
+ "log_level": "DEBUG"
+}
diff --git a/test/test_app_details.py b/test/test_app_details.py
index d8b7e31..8b4e3d9 100644
--- a/test/test_app_details.py
+++ b/test/test_app_details.py
@@ -2,54 +2,60 @@
#
# SPDX-License-Identifier: Apache-2.0
#
-# pylint: disable=unused-argument
+# pylint: disable=unused-argument, redefined-outer-name, invalid-name
from unittest.mock import patch
import os
import pytest
from qpylib import qpylib
-GET_MANIFEST_LOCATION = 'qpylib.live_qpylib.LiveQpylib._get_manifest_location'
-APP_ROOT_PATH = 'qpylib.live_qpylib.LiveQpylib._root_path'
-GET_HOST_HEADER = 'qpylib.live_qpylib.LiveQpylib._get_host_header'
-GET_ENDPOINT_URL = 'qpylib.abstract_qpylib.AbstractQpylib._get_endpoint_url'
-QTEST_DIR = os.path.dirname(__file__)
+GET_MANIFEST_JSON = 'qpylib.app_qpylib.get_root_path'
-@pytest.fixture(scope='module', autouse=True)
-def pre_testing_setup():
- with patch('qpylib.abstract_qpylib.AbstractQpylib.log'):
+def manifest_path(manifest_file):
+ return os.path.join(os.path.dirname(__file__), 'manifests', manifest_file)
+
+@pytest.fixture(scope='function', autouse=True)
+def clear_manifest_cache():
+ with patch('qpylib.app_qpylib.Q_CACHED_MANIFEST', None):
yield
-
+
+@pytest.fixture(scope='function')
+def env_qradar_console_ip():
+ os.environ['QRADAR_CONSOLE_IP'] = '9.123.234.101'
+ yield
+ del os.environ['QRADAR_CONSOLE_IP']
+
+@pytest.fixture(scope='function')
+def env_app_root():
+ os.environ['APP_ROOT'] = '/opt/app-root'
+ yield
+ del os.environ['APP_ROOT']
+
# ==== get_app_id ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_id_returns_value_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed.json'))
+def test_get_app_id_returns_value_from_manifest(mock_manifest):
assert qpylib.get_app_id() == 1005
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/pre_install.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_id_returns_zero_when_field_missing_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('pre_install.json'))
+def test_get_app_id_returns_zero_when_field_missing_from_manifest(mock_manifest):
assert qpylib.get_app_id() == 0
# ==== get_app_name ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_name_returns_value_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed.json'))
+def test_get_app_name_returns_value_from_manifest(mock_manifest):
assert qpylib.get_app_name() == 'Live Manifest'
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/missing_name.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_name_raises_error_when_field_missing_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('missing_name.json'))
+def test_get_app_name_raises_error_when_field_missing_from_manifest(mock_manifest):
with pytest.raises(KeyError, match='name is a required manifest field'):
qpylib.get_app_name()
# ==== get_manifest_json ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_manifest_json_no_cache(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed.json'))
+def test_get_manifest_json_no_cache(mock_manifest):
manifest_json = qpylib.get_manifest_json()
assert manifest_json['name'] == 'Live Manifest'
assert manifest_json['description'] == 'A sample live manifest'
@@ -58,62 +64,74 @@ def test_get_manifest_json_no_cache(mock_root_path, mock_get_manifest_location):
assert manifest_json['app_id'] == 1005
assert manifest_json['console_ip'] == '9.123.234.101'
-# ==== get_store_path ====
+# ==== get_root_path ====
-def test_get_store_path_with_no_relative_path_returns_slash_store():
- assert qpylib.get_store_path() == '/store'
+def test_get_root_path_with_env_var_missing():
+ with pytest.raises(KeyError, match='Environment variable APP_ROOT is not set'):
+ qpylib.get_root_path()
-def test_get_store_path_with_relative_path_appends_relative_path():
- assert qpylib.get_store_path('my/other/directory') == '/store/my/other/directory'
+def test_get_root_path_with_no_relative_path(env_app_root):
+ assert qpylib.get_root_path() == '/opt/app-root'
-# ==== get_root_path ====
+def test_get_root_path_with_relative_path(env_app_root):
+ assert qpylib.get_root_path('my', 'other', 'directory') == '/opt/app-root/my/other/directory'
-def test_get_root_path_with_no_relative_path_returns_slash():
- assert qpylib.get_root_path() == '/'
+# ==== get_store_path ====
+
+def test_get_store_path_with_no_relative_path(env_app_root):
+ assert qpylib.get_store_path() == '/opt/app-root/store'
-def test_get_root_path_with_relative_path_appends_relative_path():
- assert qpylib.get_root_path('my/other/directory') == '/my/other/directory'
+def test_get_store_path_with_relative_path(env_app_root):
+ assert qpylib.get_store_path('my', 'other', 'directory') == '/opt/app-root/store/my/other/directory'
# ==== get_app_base_url ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/pre_install.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_base_url_returns_empty_string_when_app_id_missing_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('pre_install.json'))
+def test_get_app_base_url_returns_empty_string_when_app_id_missing_from_manifest(mock_manifest):
assert qpylib.get_app_base_url() == ''
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed_no_console_ip.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_base_url_returns_empty_string_when_console_ip_missing_from_manifest(mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed_no_console_ip.json'))
+def test_get_app_base_url_returns_empty_string_when_host_cannot_be_determined(mock_manifest):
assert qpylib.get_app_base_url() == ''
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_app_base_url_uses_console_ip_when_x_console_host_header_missing(mock_root_path, mock_get_manifest_location):
- assert qpylib.get_app_base_url() == 'https://9.123.234.101/console/plugins/1005/app_proxy'
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed_no_console_ip.json'))
+def test_get_app_base_url_uses_console_ip_when_x_console_host_header_missing(mock_manifest,
+ env_qradar_console_ip):
+ assert qpylib.get_app_base_url() == 'https://9.123.234.101/console/plugins/1007/app_proxy'
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-@patch(GET_HOST_HEADER, return_value = '9.10.11.12')
-def test_get_app_base_url_uses_x_console_host_header_if_present(mock_get_host_header, mock_root_path, mock_get_manifest_location):
- assert qpylib.get_app_base_url() == 'https://9.10.11.12/console/plugins/1005/app_proxy'
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed_no_console_ip.json'))
+@patch('qpylib.app_qpylib._get_host_header', return_value='9.10.11.12')
+def test_get_app_base_url_uses_x_console_host_header_if_present(mock_get_host_header, mock_manifest,
+ env_qradar_console_ip):
+ assert qpylib.get_app_base_url() == 'https://9.10.11.12/console/plugins/1007/app_proxy'
# ==== q_url_for ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-@patch(GET_ENDPOINT_URL, return_value = '/index')
-def test_q_url_for(mock_flask_url_for, mock_root_path, mock_get_manifest_location):
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed.json'))
+@patch('qpylib.app_qpylib.get_endpoint_url', return_value='/index')
+def test_q_url_for(mock_flask_url_for, mock_manifest, env_qradar_console_ip):
assert qpylib.q_url_for('index') == 'https://9.123.234.101/console/plugins/1005/app_proxy/index'
# ==== get_console_address ====
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_console_address_returns_value_from_manifest(mock_root_path, mock_get_manifest_location):
- assert qpylib.get_console_address() == "9.123.234.101"
+def test_get_console_address_with_env_var_set(env_qradar_console_ip):
+ assert qpylib.get_console_address() == '9.123.234.101'
+
+def test_get_console_address_with_env_var_missing():
+ with pytest.raises(KeyError, match='Environment variable QRADAR_CONSOLE_IP is not set'):
+ qpylib.get_console_address()
+
+# ==== get_console_fqdn ====
+
+@pytest.fixture(scope='function')
+def env_qradar_console_fqdn():
+ os.environ['QRADAR_CONSOLE_FQDN'] = 'myhost.ibm.com'
+ yield
+ del os.environ['QRADAR_CONSOLE_FQDN']
+
+def test_get_console_fqdn_with_env_var_set(env_qradar_console_fqdn):
+ assert qpylib.get_console_fqdn() == 'myhost.ibm.com'
-@patch(GET_MANIFEST_LOCATION, return_value = 'manifests/installed_no_console_ip.json')
-@patch(APP_ROOT_PATH, return_value = QTEST_DIR)
-def test_get_console_address_returns_default_when_field_missing_from_manifest(mock_root_path, mock_get_manifest_location):
- assert qpylib.get_console_address() == '127.0.0.1'
-
+def test_get_console_fqdn_with_env_var_missing():
+ with pytest.raises(KeyError, match='Environment variable QRADAR_CONSOLE_FQDN is not set'):
+ qpylib.get_console_fqdn()
diff --git a/test/test_endec.py b/test/test_endec.py
index 5e390ef..c6a849b 100644
--- a/test/test_endec.py
+++ b/test/test_endec.py
@@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0
#
-# pylint: disable=redefined-outer-name, unused-argument
+# pylint: disable=redefined-outer-name, unused-argument, invalid-name
import json
from unittest.mock import patch
@@ -15,7 +15,7 @@
@pytest.fixture(scope='module', autouse=True)
def pre_testing_setup():
- with patch('qpylib.abstract_qpylib.AbstractQpylib.log'):
+ with patch('qpylib.qpylib.log'):
yield
# Mock out get_store_path to return encryption db in test dir, then delete after each test
@@ -46,20 +46,20 @@ def set_unset_qradar_app_uuid_env_var():
def test_encryption_raises_value_error_on_missing_name_and_user_fields():
with pytest.raises(ValueError) as ex:
Encryption({})
- assert "Encryption : name and user are mandatory fields!" == str(ex.value)
+ assert str(ex.value) == "Encryption : name and user are mandatory fields!"
with pytest.raises(ValueError) as ex:
Encryption({"name": "test_name"})
- assert "Encryption : name and user are mandatory fields!" == str(ex.value)
+ assert str(ex.value) == "Encryption : name and user are mandatory fields!"
with pytest.raises(ValueError) as ex:
Encryption({"user": "test_user"})
- assert "Encryption : name and user are mandatory fields!" == str(ex.value)
+ assert str(ex.value) == "Encryption : name and user are mandatory fields!"
def test_encryption_raises_value_error_on_missing_env_var():
with pytest.raises(KeyError) as ex:
Encryption({"name": "test_name", "user": "test_user"})
- assert "'Encryption : QRADAR_APP_UUID not available in environment'" == str(ex.value)
+ assert str(ex.value) == "'Encryption : QRADAR_APP_UUID not available in environment'"
def test_encrypt_creates_valid_config_on_start(set_unset_qradar_app_uuid_env_var, patch_get_store_path):
Encryption({"name": "test_name", "user": "test_user"})
@@ -91,7 +91,7 @@ def test_decrypt_raises_error_when_config_missing(set_unset_qradar_app_uuid_env_
enc = Encryption({"name": "test_name", "user": "test_user"})
with pytest.raises(ValueError) as ex:
enc.decrypt()
- assert "Encryption : no secret to decrypt" == str(ex.value)
+ assert str(ex.value) == "Encryption : no secret to decrypt"
def test_decrypt_returns_incorrect_plaintext_with_altered_salt(set_unset_qradar_app_uuid_env_var,
patch_get_store_path):
@@ -108,7 +108,7 @@ def test_decrypt_returns_incorrect_plaintext_with_altered_salt(set_unset_qradar_
assert enc.decrypt() != 'testing123'
def test_decrypt_raise_value_error_on_engine_version_mismatch(set_unset_qradar_app_uuid_env_var,
- patch_get_store_path):
+ patch_get_store_path):
enc = Encryption({"name": "test_name", "user": "test_user"})
enc_string = enc.encrypt('testing123')
assert enc_string != 'testing123'
@@ -138,7 +138,7 @@ def test_encrypt_decrypt_empty_string(set_unset_qradar_app_uuid_env_var,
assert dec_string == ''
def test_encrypt_decrypt_whitespace(set_unset_qradar_app_uuid_env_var,
- patch_get_store_path, repeatable_encrypt):
+ patch_get_store_path, repeatable_encrypt):
enc_string = repeatable_encrypt.encrypt(' \n \t ')
assert enc_string == 'ed292f3dd7a30a26774860370c5083b1'
dec_string = repeatable_encrypt.decrypt()
diff --git a/test/test_logging.py b/test/test_logging.py
new file mode 100644
index 0000000..0e51b0e
--- /dev/null
+++ b/test/test_logging.py
@@ -0,0 +1,187 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# pylint: disable=redefined-outer-name, unused-argument, invalid-name
+
+from unittest.mock import patch
+import logging
+import os
+import pytest
+from qpylib import qpylib, log_qpylib
+
+APP_FILE_LOG_FORMAT = '[{0}] - [APP_ID/1001][NOT:{1}] {2}'
+
+GET_MANIFEST_JSON = 'qpylib.app_qpylib.get_root_path'
+
+def manifest_path(manifest_file):
+ return os.path.join(os.path.dirname(__file__), 'manifests', manifest_file)
+
+# This fixture avoids reading app id from the manifest.
+# Setting default log level threshold is handled by separate fixtures.
+@pytest.fixture(scope='module', autouse=True)
+def bypass_manifest_lookup():
+ with patch('qpylib.app_qpylib.get_app_id') as mock_get_app_id:
+ mock_get_app_id.return_value = 1001
+ yield
+
+@pytest.fixture(scope='function')
+def info_threshold():
+ with patch('qpylib.log_qpylib.default_log_level') as mock_default_log_level:
+ mock_default_log_level.return_value = logging.INFO
+ yield
+
+@pytest.fixture(scope='function')
+def debug_threshold():
+ with patch('qpylib.log_qpylib.default_log_level') as mock_default_log_level:
+ mock_default_log_level.return_value = logging.DEBUG
+ yield
+
+@pytest.fixture(scope='function')
+def set_console_ip():
+ os.environ['QRADAR_CONSOLE_IP'] = '9.123.234.101'
+ yield
+ del os.environ['QRADAR_CONSOLE_IP']
+
+@pytest.fixture(scope='function', autouse=True)
+def reset_globals():
+ with patch('qpylib.log_qpylib.QLOGGER', 0):
+ with patch('qpylib.app_qpylib.Q_CACHED_MANIFEST', None):
+ yield
+
+# pylint: disable=protected-access
+def verify_log_file_content(log_path, expected_lines, not_expected_lines=[]): # pylint: disable=dangerous-default-value
+ with open(log_path) as log_file:
+ content = log_file.read()
+ for line in expected_lines:
+ assert APP_FILE_LOG_FORMAT.format(
+ line['level'], log_qpylib._map_notification_code(line['level']), line['text']) in content
+ for line in not_expected_lines:
+ assert APP_FILE_LOG_FORMAT.format(
+ line['level'], log_qpylib._map_notification_code(line['level']), line['text']) not in content
+
+def test_log_without_create_raises_error():
+ with pytest.raises(RuntimeError, match='You cannot use log before logging has been initialised'):
+ qpylib.log('hello')
+
+def test_create_without_console_ip_env_var_raises_error(info_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ with pytest.raises(KeyError, match='Environment variable QRADAR_CONSOLE_IP is not set'):
+ qpylib.create_log()
+
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('installed.json'))
+def test_default_log_level_no_level_in_manifest(mock_manifest, set_console_ip):
+ assert log_qpylib.default_log_level() == logging.INFO
+
+@patch(GET_MANIFEST_JSON, return_value=manifest_path('loglevel.json'))
+def test_default_log_level_read_from_manifest(mock_manifest, set_console_ip):
+ assert log_qpylib.default_log_level() == logging.DEBUG
+
+def test_all_log_levels_with_manifest_info_threshold(set_console_ip, info_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.log('hello debug', 'DEBUG')
+ qpylib.log('hello default info')
+ qpylib.log('hello info', 'INFO')
+ qpylib.log('hello warning', 'WARNING')
+ qpylib.log('hello error', 'ERROR')
+ qpylib.log('hello critical', 'CRITICAL')
+ verify_log_file_content(log_path, [
+ {'level': 'INFO', 'text': 'hello default info'},
+ {'level': 'INFO', 'text': 'hello info'},
+ {'level': 'WARNING', 'text': 'hello warning'},
+ {'level': 'ERROR', 'text': 'hello error'},
+ {'level': 'CRITICAL', 'text': 'hello critical'}],
+ not_expected_lines=[{'level': 'DEBUG', 'text': 'hello debug'}])
+
+def test_all_log_levels_with_manifest_debug_threshold(set_console_ip, debug_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.log('hello debug', 'DEBUG')
+ qpylib.log('hello default info')
+ qpylib.log('hello info', 'INFO')
+ qpylib.log('hello warning', 'WARNING')
+ qpylib.log('hello error', 'ERROR')
+ qpylib.log('hello critical', 'CRITICAL')
+ verify_log_file_content(log_path, [
+ {'level': 'DEBUG', 'text': 'hello debug'},
+ {'level': 'INFO', 'text': 'hello default info'},
+ {'level': 'INFO', 'text': 'hello info'},
+ {'level': 'WARNING', 'text': 'hello warning'},
+ {'level': 'ERROR', 'text': 'hello error'},
+ {'level': 'CRITICAL', 'text': 'hello critical'}])
+
+def test_all_log_levels_with_set_debug_threshold(set_console_ip, info_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.set_log_level('DEBUG')
+ qpylib.log('hello debug', 'DEBUG')
+ qpylib.log('hello default info')
+ qpylib.log('hello info', 'INFO')
+ qpylib.log('hello warning', 'WARNING')
+ qpylib.log('hello error', 'ERROR')
+ qpylib.log('hello critical', 'CRITICAL')
+ verify_log_file_content(log_path, [
+ {'level': 'DEBUG', 'text': 'hello debug'},
+ {'level': 'INFO', 'text': 'hello default info'},
+ {'level': 'INFO', 'text': 'hello info'},
+ {'level': 'WARNING', 'text': 'hello warning'},
+ {'level': 'ERROR', 'text': 'hello error'},
+ {'level': 'CRITICAL', 'text': 'hello critical'}])
+
+def test_all_log_levels_with_set_warning_threshold(set_console_ip, info_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.set_log_level('WARNING')
+ qpylib.log('hello debug', 'DEBUG')
+ qpylib.log('hello default info')
+ qpylib.log('hello info', 'INFO')
+ qpylib.log('hello warning', 'WARNING')
+ qpylib.log('hello error', 'ERROR')
+ qpylib.log('hello critical', 'CRITICAL')
+ verify_log_file_content(log_path, [
+ {'level': 'WARNING', 'text': 'hello warning'},
+ {'level': 'ERROR', 'text': 'hello error'},
+ {'level': 'CRITICAL', 'text': 'hello critical'}],
+ not_expected_lines=[
+ {'level': 'DEBUG', 'text': 'hello debug'},
+ {'level': 'INFO', 'text': 'hello default info'},
+ {'level': 'INFO', 'text': 'hello info'}])
+
+def test_log_with_bad_level_uses_info(set_console_ip, info_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.log('hello', 'BAD')
+ verify_log_file_content(log_path, [{'level': 'INFO', 'text': 'hello'}])
+
+def test_set_log_level_with_bad_level_uses_info(set_console_ip, debug_threshold, tmpdir):
+ log_path = os.path.join(tmpdir.strpath, 'app.log')
+ with patch('qpylib.log_qpylib._log_file_location') as mock_log_location:
+ mock_log_location.return_value = log_path
+ qpylib.create_log()
+ qpylib.set_log_level('BAD')
+ qpylib.log('hello debug', 'DEBUG')
+ qpylib.log('hello default info')
+ qpylib.log('hello info', 'INFO')
+ qpylib.log('hello warning', 'WARNING')
+ qpylib.log('hello error', 'ERROR')
+ qpylib.log('hello critical', 'CRITICAL')
+ verify_log_file_content(log_path, [
+ {'level': 'INFO', 'text': 'hello default info'},
+ {'level': 'INFO', 'text': 'hello info'},
+ {'level': 'WARNING', 'text': 'hello warning'},
+ {'level': 'ERROR', 'text': 'hello error'},
+ {'level': 'CRITICAL', 'text': 'hello critical'}],
+ not_expected_lines=[{'level': 'DEBUG', 'text': 'hello debug'}])
diff --git a/test/test_rest.py b/test/test_rest_live.py
similarity index 57%
rename from test/test_rest.py
rename to test/test_rest_live.py
index 0fed46a..375708e 100644
--- a/test/test_rest.py
+++ b/test/test_rest_live.py
@@ -2,24 +2,21 @@
#
# SPDX-License-Identifier: Apache-2.0
#
-# pylint: disable=redefined-outer-name, unused-argument
+# pylint: disable=redefined-outer-name, unused-argument, invalid-name
-from flask import Flask
-from unittest.mock import patch
import os
+from unittest.mock import patch
+from flask import Flask
import pytest
-from qpylib import qpylib
import responses
from werkzeug import http
+from qpylib import qpylib
@pytest.fixture(scope='module', autouse=True)
def pre_testing_setup():
- with patch('qpylib.abstract_qpylib.AbstractQpylib.log'):
- with patch('qpylib.live_qpylib.LiveQpylib._root_path') as mock_root_path:
- mock_root_path.return_value = os.path.dirname(__file__)
- with patch('qpylib.live_qpylib.LiveQpylib._get_manifest_location') as mock_get_manifest_location:
- mock_get_manifest_location.return_value = 'manifests/installed.json'
- yield
+ with patch('qpylib.app_qpylib.get_root_path') as mock_manifest:
+ mock_manifest.return_value = os.path.join(os.path.dirname(__file__), 'manifests', 'installed.json')
+ yield
@pytest.fixture()
def env_sec_admin_token():
@@ -29,17 +26,17 @@ def env_sec_admin_token():
@pytest.fixture()
def env_qradar_console_fqdn():
- os.environ['QRADAR_CONSOLE_FQDN'] = '9.101.234.169'
+ os.environ['QRADAR_CONSOLE_FQDN'] = 'myhost.ibm.com'
yield
del os.environ['QRADAR_CONSOLE_FQDN']
@responses.activate
def test_rest_uses_env_vars_when_set(env_sec_admin_token, env_qradar_console_fqdn):
- responses.add('GET', 'https://9.101.234.169/testing_endpoint', status=200)
- response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert')
+ responses.add('GET', 'https://myhost.ibm.com/testing_endpoint', status=200)
+ response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
assert response.status_code == 200
assert responses.calls[0].request.method == 'GET'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
assert responses.calls[0].request.headers['SEC'] == '12345-testing-12345-testing'
@responses.activate
@@ -48,54 +45,51 @@ def test_rest_uses_sec_cookie_when_env_var_not_set(env_qradar_console_fqdn):
cookie = http.dump_cookie("SEC", test_sec_cookie_value)
app = Flask(__name__)
with app.test_request_context(headers={"COOKIE": cookie}):
- responses.add('GET', 'https://9.101.234.169/testing_endpoint', status=200)
- response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert')
+ responses.add('GET', 'https://myhost.ibm.com/testing_endpoint', status=200)
+ response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
assert response.status_code == 200
assert responses.calls[0].request.method == 'GET'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
assert responses.calls[0].request.headers['SEC'] == test_sec_cookie_value
@responses.activate
-def test_rest_uses_manifest_console_ip_when_env_var_not_set():
- responses.add('GET', 'https://9.123.234.101/testing_endpoint', status=200)
- response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert')
- assert response.status_code == 200
- assert responses.calls[0].request.method == 'GET'
- assert responses.calls[0].request.url == 'https://9.123.234.101/testing_endpoint'
+def test_rest_fails_when_fqdn_env_var_not_set():
+ with pytest.raises(KeyError, match='Environment variable QRADAR_CONSOLE_FQDN is not set'):
+ qpylib.REST('GET', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
@responses.activate
def test_rest_sets_version_header(env_qradar_console_fqdn):
- responses.add('GET', 'https://9.101.234.169/testing_endpoint', status=200)
- response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert', version='12')
+ responses.add('GET', 'https://myhost.ibm.com/testing_endpoint', status=200)
+ response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert', version='12', headers={'Host': '127.0.0.1'})
assert response.status_code == 200
assert responses.calls[0].request.method == 'GET'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
assert responses.calls[0].request.headers['Version'] == '12'
@responses.activate
def test_rest_allows_post(env_qradar_console_fqdn):
- responses.add('POST', 'https://9.101.234.169/testing_endpoint', status=201)
- response = qpylib.REST('POST', 'testing_endpoint', verify='dummycert')
+ responses.add('POST', 'https://myhost.ibm.com/testing_endpoint', status=201)
+ response = qpylib.REST('POST', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
assert response.status_code == 201
assert responses.calls[0].request.method == 'POST'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
@responses.activate
def test_rest_allows_put(env_qradar_console_fqdn):
- responses.add('PUT', 'https://9.101.234.169/testing_endpoint', status=201)
- response = qpylib.REST('PUT', 'testing_endpoint', verify='dummycert')
+ responses.add('PUT', 'https://myhost.ibm.com/testing_endpoint', status=201)
+ response = qpylib.REST('PUT', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
assert response.status_code == 201
assert responses.calls[0].request.method == 'PUT'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
@responses.activate
def test_rest_allows_delete(env_qradar_console_fqdn):
- responses.add('DELETE', 'https://9.101.234.169/testing_endpoint', status=204)
- response = qpylib.REST('DELETE', 'testing_endpoint', verify='dummycert')
+ responses.add('DELETE', 'https://myhost.ibm.com/testing_endpoint', status=204)
+ response = qpylib.REST('DELETE', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
assert response.status_code == 204
assert responses.calls[0].request.method == 'DELETE'
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
def test_rest_rejects_unsupported_method(env_qradar_console_fqdn):
with pytest.raises(ValueError, match='Unsupported REST action was requested'):
- qpylib.REST('PATCH', 'testing_endpoint', verify='dummycert')
+ qpylib.REST('PATCH', 'testing_endpoint', verify='dummycert', headers={'Host': '127.0.0.1'})
diff --git a/test/test_rest_sdk.py b/test/test_rest_sdk.py
new file mode 100644
index 0000000..c17e468
--- /dev/null
+++ b/test/test_rest_sdk.py
@@ -0,0 +1,70 @@
+# Copyright 2019 IBM Corporation All Rights Reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+# pylint: disable=redefined-outer-name, unused-argument, invalid-name
+
+import os
+import pytest
+import responses
+from qpylib import qpylib
+
+@pytest.fixture(scope='module', autouse=True)
+def pre_testing_setup():
+ os.environ['QRADAR_APPFW_SDK'] = 'true'
+ os.environ['SEC_ADMIN_TOKEN'] = '12345-testing-12345-testing'
+ yield pre_testing_setup
+ del os.environ['QRADAR_APPFW_SDK']
+ del os.environ['SEC_ADMIN_TOKEN']
+
+@pytest.fixture()
+def env_qradar_console_fqdn():
+ os.environ['QRADAR_CONSOLE_FQDN'] = 'myhost.ibm.com'
+ yield
+ del os.environ['QRADAR_CONSOLE_FQDN']
+
+@responses.activate
+def test_rest_uses_input_and_env_values(env_qradar_console_fqdn):
+ responses.add('GET', 'https://myhost.ibm.com/testing_endpoint',
+ json={'success': True}, status=200)
+
+ response = qpylib.REST('GET', 'testing_endpoint', version='12', headers={'Host': '127.0.0.1'})
+ assert response.status_code == 200
+ assert responses.calls[0].request.method == 'GET'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
+ assert responses.calls[0].request.headers['Version'] == '12'
+ assert responses.calls[0].request.headers['Host'] == '127.0.0.1'
+ assert responses.calls[0].request.headers['SEC'] == '12345-testing-12345-testing'
+
+@responses.activate
+def test_rest_fails_when_fqdn_env_var_not_set():
+ with pytest.raises(KeyError, match='Environment variable QRADAR_CONSOLE_FQDN is not set'):
+ qpylib.REST('GET', 'testing_endpoint', headers={'Host': '127.0.0.1'})
+
+@responses.activate
+def test_rest_allows_post(env_qradar_console_fqdn):
+ responses.add('POST', 'https://myhost.ibm.com/testing_endpoint', status=201)
+ response = qpylib.REST('POST', 'testing_endpoint', headers={'Host': '127.0.0.1'})
+ assert response.status_code == 201
+ assert responses.calls[0].request.method == 'POST'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
+
+@responses.activate
+def test_rest_allows_put(env_qradar_console_fqdn):
+ responses.add('PUT', 'https://myhost.ibm.com/testing_endpoint', status=201)
+ response = qpylib.REST('PUT', 'testing_endpoint', headers={'Host': '127.0.0.1'})
+ assert response.status_code == 201
+ assert responses.calls[0].request.method == 'PUT'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
+
+@responses.activate
+def test_rest_allows_delete(env_qradar_console_fqdn):
+ responses.add('DELETE', 'https://myhost.ibm.com/testing_endpoint', status=204)
+ response = qpylib.REST('DELETE', 'testing_endpoint', headers={'Host': '127.0.0.1'})
+ assert response.status_code == 204
+ assert responses.calls[0].request.method == 'DELETE'
+ assert responses.calls[0].request.url == 'https://myhost.ibm.com/testing_endpoint'
+
+def test_rest_rejects_unsupported_method(env_qradar_console_fqdn):
+ with pytest.raises(ValueError, match='Unsupported REST action was requested'):
+ qpylib.REST('PATCH', 'testing_endpoint', headers={'Host': '127.0.0.1'})
diff --git a/test/test_sdk_qpylib.py b/test/test_sdk_qpylib.py
deleted file mode 100644
index b31f11a..0000000
--- a/test/test_sdk_qpylib.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2019 IBM Corporation All Rights Reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-#
-# pylint: disable=unused-argument
-
-from unittest.mock import patch
-import os
-import pytest
-from qpylib import qpylib
-import responses
-
-@pytest.fixture(scope='module', autouse=True)
-def pre_testing_setup():
- os.environ['QRADAR_APPFW_SDK'] = 'true'
- with patch('qpylib.abstract_qpylib.AbstractQpylib.log'):
- yield pre_testing_setup
- del os.environ['QRADAR_APPFW_SDK']
-
-@responses.activate
-@patch('qpylib.sdk_qpylib.SdkQpylib.get_console_address', return_value = '9.101.234.169')
-@patch('qpylib.sdk_qpylib.SdkQpylib._get_api_auth', return_value = ('testuser', 'testing123'))
-def test_rest_uses_input_values(mock_api_auth, mock_console_address):
- responses.add('GET', 'https://9.101.234.169/testing_endpoint',
- json={'success': True}, status=200)
-
- response = qpylib.REST('GET', 'testing_endpoint', verify='dummycert')
- assert responses.calls[0].request.url == 'https://9.101.234.169/testing_endpoint'
- assert 'Basic' in responses.calls[0].request.headers['Authorization']
- assert response.status_code == 200