blob: 35e59f823567e556c8cd3518190d79219e1b38b5 [file] [log] [blame]
"""
Copyright 2013 Appurify, Inc
All rights reserved
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
"""
import argparse
import json
import os
import sys
import time
import pprint
import inspect
import requests
from . import constants
from .utils import log, wget
from .api import *
class AppurifyClientError(Exception):
def __init__(self, message, exit_code=constants.EXIT_CODE_CLIENT_EXCEPTION):
super(AppurifyClientError, self).__init__(message)
self.exit_code = exit_code
class AppurifyClient(object):
def __init__(self, *args, **kwargs):
self.args = kwargs
self.access_token = self.args.get('access_token', None)
self.timeout = self.args.get('timeout_sec', None) or os.environ.get('APPURIFY_API_TIMEOUT', 10800)
if self.timeout is not None:
self.timeout = float(self.timeout)
self.poll_every = self.args.get('poll_every', None) or os.environ.get('APPURIFY_API_POLL_DELAY', constants.API_POLL_SEC)
self.test_type = self.args.get('test_type' or None)
self.device_type_id = self.args.get('device_type_id', None)
self.device_id = self.args.get('device_id', None)
disable_ssl_check = self.args.get('disable_ssl_check', False)
if disable_ssl_check:
self.verify_ssl = False
else:
self.verify_ssl = True
def refreshAccessToken(self):
if self.access_token is None:
api_key = self.args.get('api_key', None)
api_secret = self.args.get('api_secret',None)
if api_key is None or api_secret is None:
raise AppurifyClientError("Either access_token or api_key and api_secret are required parameters", exit_code=constants.EXIT_CODE_BAD_TEST)
log('generating access token...')
r = access_token_generate(api_key, api_secret)
if r.status_code == 200:
access_token = r.json()['response']['access_token']
log('access_token_generate success, access_token:%s' % access_token)
self.access_token = access_token
else:
raise AppurifyClientError('access_token_generate failed with response %s' % r.text, exit_code=constants.EXIT_CODE_AUTH_FAILURE)
return self.access_token
def checkDevice(self):
response_device_list = devices_list(self.access_token)
data_device_list = json.loads(response_device_list.text.replace("'","\""))
device_id_list =[]
for device in data_device_list["response"]:
device_id_list.append( device["device_type_id"])
if response_device_list.status_code == 200:
listOfDevices = self.device_type_id.split(',')
for d in listOfDevices:
if int(d) not in device_id_list :
raise AppurifyClientError("Current device list does not include device type: %s" % d, exit_code=constants.EXIT_CODE_DEVICE_NOT_FOUND)
def checkAppCompatibility(self, app_src):
response_device_list = devices_list(self.access_token)
data_device_list = json.loads(response_device_list.text.replace("'","\""))
reservingDevice = -1
for device in data_device_list["response"]:
paramListDevices = self.device_type_id.split(',')
for d in paramListDevices :
if int(d) == device["device_type_id"]:
reservingDevice = device
#verify app type works with OS type of device
if reservingDevice != -1 and int(response_device_list.status_code) == 200:
devicePlatform = reservingDevice["os_name"].lower()
appType = app_src[-3:]
if (appType == "ipa" and devicePlatform == "android") or (appType == "apk" and devicePlatform == "ios"):
raise AppurifyClientError("Must install .ipa on iOS device or .apk on android device. Mismatch: %s installing onto an %s device: %s" % (appType, devicePlatform, d), exit_code=constants.EXIT_CODE_APP_INCOMPATIBLE)
def uploadApp(self):
log('uploading app file...')
app_src_type = self.args.get('app_src_type', None)
app_src = self.args.get('app_src', None)
app_name = self.args.get('name', None)
webapp_url = self.args.get('url', None)
if app_src is None and self.test_type in constants.NO_APP_SOURCE:
if webapp_url is None and app_name is None:
log('WARNING: url for this webapp was not passed (using --url parameter). The results for this app will appear under unclassified_app_group in the UI.')
log(' To avoid this issue, please pass the url of the website under test using the --url parameter')
r = apps_upload(self.access_token, None, 'url', self.test_type, name=app_name, webapp_url=webapp_url)
else:
if app_src is None:
raise AppurifyClientError("app src is required for test type %s" % self.test_type, exit_code=constants.EXIT_CODE_BAD_TEST)
if app_src_type != 'url':
self.checkAppCompatibility(app_src)
with open(app_src, 'rb') as app_file_source:
r = apps_upload(self.access_token, app_file_source, app_src_type, app_src_type, app_name)
else:
r = apps_upload(self.access_token, app_src, app_src_type, app_src_type, app_name)
if r.status_code == 200:
app_id = r.json()['response']['app_id']
log('apps_upload success, app_id:%s' % app_id)
return app_id
else:
raise AppurifyClientError('apps_upload failed with response %s' % r.text, exit_code=constants.EXIT_CODE_BAD_TEST)
def uploadTest(self, app_id):
log('uploading test file...')
test_src_type = self.args.get('test_src_type', None)
test_src = self.args.get('test_src', None)
if not test_src and self.test_type not in constants.NO_TEST_SOURCE:
raise AppurifyClientError('test_type %s requires a test source' % self.test_type, exit_code=constants.EXIT_CODE_BAD_TEST)
if test_src:
if test_src_type != 'url':
with open(test_src, 'rb') as test_file_source:
r = tests_upload(self.access_token, test_file_source, test_src_type, self.test_type, app_id=app_id)
else:
r = tests_upload(self.access_token, test_src, test_src_type, self.test_type, app_id=app_id)
elif self.test_type in constants.NO_TEST_SOURCE:
r = tests_upload(self.access_token, None, 'url', self.test_type)
if r.status_code == 200:
test_id = r.json()['response']['test_id']
log('tests_upload success, test_id:%s' % test_id)
return test_id
else:
raise AppurifyClientError('tests_upload failed with response %s' % r.text, exit_code=constants.EXIT_CODE_OTHER_EXCEPTION)
def uploadConfig(self, test_id, config_src):
log('uploading config file...')
with open(config_src, 'rb') as config_src_file:
r = config_upload(self.access_token, config_src_file, test_id)
if r.status_code == 200:
log('config file upload success, test_id:%s' % test_id)
config_id = r.json()['response']['config_id']
return config_id
else:
raise AppurifyClientError('config file upload failed with response %s' % r.text, exit_code=constants.EXIT_CODE_BAD_TEST)
def runTest(self, app_id, test_id):
r = tests_run(self.access_token, self.device_type_id, app_id, test_id, self.device_id)
if r.status_code == 200:
test_response = r.json()['response']
test_run_id = test_response['test_run_id']
log('tests_run success scheduling test test_run_id:%s' % test_run_id)
try:
configs = [test_response['config']]
except:
try:
configs = map(lambda x: x['config'], test_response['test_runs'])
except:
configs = []
return (test_run_id, test_response['queue_timeout_limit'] if 'queue_timeout_limit' in test_response else self.timeout, configs)
else:
raise AppurifyClientError('runTest failed scheduling test with response %s' % r.text, exit_code=constants.EXIT_CODE_OTHER_EXCEPTION)
def abortTest(self, test_run_id, reason):
r = tests_abort(self.access_token, test_run_id, reason)
if r.status_code == 200:
response = r.json()['response']
if response['status'] == 'aborting':
log("aborting test run id %s" % test_run_id)
elif response['status'] == 'complete':
log("test run id %s is complete" % test_run_id)
return True
else:
False
def printConfigs(self, configs):
if configs:
found_config = False
print "== Test will run with the following device configurations =="
for config in configs:
if config:
found_config = True
print json.dumps(config, sort_keys=True,indent=4, separators=(',', ': '))
if not found_config:
print "Default"
print "== End device configurations =="
def pollTestResult(self, test_run_id, timeout_limit):
test_status = None
runtime = 0
while test_status != 'complete' and runtime < timeout_limit:
time.sleep(self.poll_every)
r = tests_check_result(self.access_token, test_run_id)
test_status_response = r.json()['response']
test_status = test_status_response['status']
if test_status == 'complete':
test_response = test_status_response['results']
log("**** COMPLETE - JSON SUMMARY FOLLOWS ****")
log(json.dumps(test_response))
log("**** COMPLETE - JSON SUMMARY ENDS ****")
return test_status_response
else:
log("%s sec elapsed (timeout in %s)" % (runtime, (timeout_limit - runtime)))
if 'message' in test_status_response:
log(test_status_response['message'])
log("Test progress: {}".format(test_status_response.get('detailed_status', 'status-unavailable')))
runtime = runtime + self.poll_every
raise AppurifyClientError("Test result poll timed out after %s seconds" % timeout_limit, exit_code=constants.EXIT_CODE_TEST_TIMEOUT)
@staticmethod
def download_test_response(results_url, result_dir, verify=True):
if not os.path.exists(result_dir):
log("Attempting to create directory %s" % result_dir)
os.makedirs(result_dir)
if result_dir:
result_path = result_dir + '/' + 'results.zip'
log("Saving results to %s" % result_path)
try_count = 1
status_code = 0
while try_count <= constants.MAX_DOWNLOAD_RETRIES and status_code != 200:
time.sleep(try_count)
status_code = wget(results_url, result_path, verify)
try_count = try_count + 1
if try_count > constants.MAX_DOWNLOAD_RETRIES:
log("Error downloading url %s, failed after 5 retries" % results_url)
def reportTestResult(self, test_status_response):
log("== reportTestResult ==")
log(json.dumps(test_status_response))
exit_code = constants.EXIT_CODE_ALL_PASS
test_response = test_status_response['results']
result_dir = self.args.get('result_dir', None)
if 'complete_count' in test_status_response:
response_pass = AppurifyClient.print_multi_test_responses(test_response)
if result_dir:
AppurifyClient.download_multi_test_response(test_response, result_dir, self.verify_ssl)
else:
response_pass = AppurifyClient.print_single_test_response(test_response)
test_response = [test_response] # make sure test response has the same format in both cases
if result_dir:
result_url = test_response[0]['url']
AppurifyClient.download_test_response(result_url, result_dir, self.verify_ssl)
detailed_status = test_status_response.get('detailed_status')
if detailed_status == "exception":
exit_code = self.getExceptionExitCode(test_response)
elif detailed_status == "timeout":
exit_code = constants.EXIT_CODE_TEST_TIMEOUT
else:
if not response_pass:
exit_code = constants.EXIT_CODE_TEST_FAILURE
return exit_code
def getExceptionExitCode(self, test_response):
exit_code = constants.EXIT_CODE_CLIENT_EXCEPTION
try:
for response in test_response:
exception = response.get("exception", False)
if exception:
exception_code = exception.split(":")[0]
for key in constants.EXIT_CODE_EXCEPTION_MAP:
try:
#if exception code cannot pasre into int, means server didn't send correctly.
if int(exception_code) in constants.EXIT_CODE_EXCEPTION_MAP[key]:
return key
except Exception:
exit_code = constants.EXIT_CODE_OTHER_EXCEPTION
finally:
return exit_code
@staticmethod
def print_single_test_response(test_response):
try:
for response_type in ['output', 'errors', 'exception', 'number_passes', 'number_fails']:
response_text = test_response[response_type] or None
response_text = None if type(response_text) in ('unicode', 'str') and response_text.strip() == '' else response_text
log("Test %s: %s" % (response_type, response_text))
response_pass = test_response['pass']
if response_pass:
log("All tests passed!")
else:
log("There were test failures")
results_url = test_response['url']
log("Detailed results url: %s" % results_url)
return response_pass
except Exception as e:
log("Error printing test results: %r" % e)
@staticmethod
def print_multi_test_responses(test_response):
response_pass = True
for result in test_response:
log("Device Type %s result:" % result['device_type'])
AppurifyClient.print_single_test_response(result["results"])
log("\n")
return response_pass
@staticmethod
def download_multi_test_response(self, test_response, result_dir, verify=True):
for result in test_response:
try:
result_url = result['results']['url']
device_type_id = result['device_type_id']
device_result_path = result_dir + "/device_type_%s" % device_type_id
AppurifyClient.download_test_response(result_url, device_result_path, verify)
except Exception as e:
log("Error downloading test response: %s" % e)
def main(self):
"""
See constants for return codes
"""
exit_code = 0
try:
self.refreshAccessToken()
if self.test_type is None:
raise AppurifyClientError("test_type is required")
self.checkDevice()
# upload app/test of use passed id's
app_id = self.args.get('app_id', None) or self.uploadApp()
test_id = self.args.get('test_id', None) or self.uploadTest(app_id)
config_src = self.args.get('config_src', False)
if config_src:
self.uploadConfig(test_id, config_src)
# start test run
test_run_id, queue_timeout_limit, configs = self.runTest(app_id, test_id)
self.printConfigs(configs)
self.timeout = self.timeout or queue_timeout_limit
# poll for results and print report
test_status_response = self.pollTestResult(test_run_id, self.timeout)
exit_code = self.reportTestResult(test_status_response)
except AppurifyClientError, e:
log(str(e))
exit_code = e.exit_code
except KeyboardInterrupt, e:
self.abortTest(test_run_id, repr(e))
log(str(e))
exit_code = constants.EXIT_CODE_TEST_ABORT
except requests.exceptions.RequestException, e:
log(str(e))
exit_code = constants.EXIT_CODE_CONNECTION_ERROR
except Exception, e:
log("%s : %s" % (sys.exc_traceback.tb_lineno , str(e)))
exit_code = constants.EXIT_CODE_CLIENT_EXCEPTION
log('done with exit code %s' % exit_code)
return exit_code
@staticmethod
def execute(action, kwargs, required):
"""Execute a particular action and prints received response."""
os.environ['APPURIFY_API_RETRY_ON_FAILURE'] = '0' #disable retries
pp = pprint.PrettyPrinter(indent=4)
r = globals()[action](**{k : v for k,v in kwargs.iteritems() if k in required})
pp.pprint(r.json())
return 0 if r.status_code == 200 else 1
@staticmethod
def cli():
parser = argparse.ArgumentParser(
description='Appurify developer REST API client v%s' % constants.__version__,
epilog='Having difficulty using %s? Report at: %s/issues/new or email us at %s' % (constants.__description__, constants.__repourl__, constants.__contact__)
)
parser.add_argument('--api-key', help='Appurify developer key')
parser.add_argument('--api-secret', help='Appurify developer secret')
parser.add_argument('--access-token-tag', action='append', help='colon separated key:value tag for access_token to be generated')
parser.add_argument('--access-token', help='Use an existing access token instead of generating a new one')
parser.add_argument('--app-src', help='Path or Url of app file to upload')
parser.add_argument('--app-id', help='Specify to use previously uploaded app file')
parser.add_argument('--test-src', help='Path or Url of test file to upload')
parser.add_argument('--test-type', help='Type of test being uploaded')
parser.add_argument('--test-id', help='Specify to use previously uploaded test file')
parser.add_argument('--test-run-id', help='Specify test run id (required only while running tests_check_result action)')
parser.add_argument('--device-type-id', help='Device type to reserve and run tests upon (you may run tests on multiple devices by using a comma separated list of device IDs)')
parser.add_argument('--device-id', help='Specify to use a particular device')
parser.add_argument('--config-src', help='Path of additional configuration to add to test')
parser.add_argument('--result-dir', help='Path to save downloaded results to')
parser.add_argument('--action', help='Specific API to call (default: main)')
parser.add_argument('--name', help='Optional, the name of the app to display')
parser.add_argument('--url', help='If the app being tested is a web application, url of the web application')
parser.add_argument('--disable-ssl-check', help="Optional, if set, don't verify SSL certificates (e.g. if you're using self-signed certificates)", action="store_true")
parser.add_argument('--timeout', help='Optional, timeout in seconds before the client assumes the test has failed. Defaults to server side timeout value (~ 6 hours)')
parser.add_argument('--version', help='Print client version and exit', action='store_true')
kwargs = {}
args = parser.parse_args()
if args.version:
print(constants.__version__)
sys.exit(0)
# (optional) when 'main' is the requested action
# (required) when 'devices_config' is the requested action
kwargs['device_id'] = args.device_id
# (required) access_token || api_key && api_secret
# (optional) access_token_tag
if args.access_token == None and (args.api_key == None or args.api_secret == None):
parser.error('--access-token OR --api-key and --api-secret is required')
kwargs['test_run_id'] = args.test_run_id
kwargs['api_key'] = args.api_key
kwargs['api_secret'] = args.api_secret
kwargs['access_token'] = args.access_token
kwargs['access_token_tag'] = args.access_token_tag
kwargs['disable_ssl_check'] = args.disable_ssl_check
# (optional)
if args.action:
if args.action in constants.ENABLED_ACTIONS:
argspec = inspect.getargspec(globals()[args.action])
required = argspec[0] if not argspec[3] else argspec[0][: -1 * len(argspec[3])]
for k in required:
if not k in kwargs or not kwargs[k]:
parser.error('"%s" action requires following parameters: %s. "%s" not found.' % (args.action, ", ".join(required), k))
sys.exit(AppurifyClient.execute(args.action, kwargs, required))
else:
parser.error('"%s" action is not supported. Available options are: %s' % (args.action, ", ".join(constants.ENABLED_ACTIONS)))
# (required) app_id || app_src
# (optional) app_test_type
# (calculated) app_src_type
if args.app_id is None and args.app_src is None and args.test_type not in constants.NO_APP_SOURCE:
parser.error('--app-id OR --app-src is required')
kwargs['app_id'] = args.app_id
kwargs['app_src'] = args.app_src
if args.app_src:
if args.app_src[0:4] == 'http':
kwargs['app_src_type'] = 'url'
else:
try:
with open(args.app_src) as _: pass
kwargs['app_src_type'] = 'raw'
except:
parser.error('--app-src %s could not be found' % args.app_src)
# (required) test_id || test_src && test_type
if args.test_id == None and (args.test_src == None or args.test_type == None) and args.test_type not in constants.NO_TEST_SOURCE:
parser.error('--test-id OR --test-src and --test-type is required')
kwargs['test_id'] = args.test_id
kwargs['test_type'] = args.test_type
kwargs['test_src'] = args.test_src
if args.test_type not in constants.SUPPORTED_TEST_TYPES:
parser.error('--test-type must be one of the following: %s' % ', '.join(constants.SUPPORTED_TEST_TYPES))
# (calculated) test_src_type
if args.test_src:
if args.test_src[0:4] == 'http':
kwargs['test_src_type'] = 'url'
else:
try:
with open(args.test_src) as _: pass
kwargs['test_src_type'] = 'raw'
except:
parser.error('--test-src %s could not be found' % args.test_src)
# (optional) config_src
if args.config_src != None:
kwargs['config_src'] = args.config_src
# (required) device_type_id
kwargs['device_type_id'] = args.device_type_id
# (optional) result_dir
kwargs['result_dir'] = args.result_dir
# (optional) app name
kwargs['name'] = args.name
kwargs['url'] = args.url
# (optional) timeout
try:
kwargs['timeout_sec'] = int(args.timeout)
except:
pass
client = AppurifyClient(**kwargs)
sys.exit(client.main())
def init():
AppurifyClient.cli()
if __name__ == '__main__':
init()