Tuesday, April 19, 2016

Python script to find the view for a given Jenkins job



#!/usr/bin/env python
# coding: utf-8
import sys
import urllib2
import urllib
import base64
import traceback
import json
import httplib
import argparse
import logging
import getpass
import os
import re

# Base url of the Jenkins instance the script talks to, and the url of its
# root view named 'ALL' (job urls are built underneath the root view).
DEFAULT_JENKINS_SERVER = 'http://172.16.200.63:8080/'
JENKINS_ROOT_VIEW = ''.join((DEFAULT_JENKINS_SERVER, 'view/ALL/'))

# Relative JSON API paths that get appended to a Jenkins url.
INFO = 'api/json'
JOB_INFO = 'job/%(name)s/api/json?depth=0'

# Texts used when building the command-line help.
SCRIPT_NAME = os.path.basename(__file__)
SCRIPT_OBJECTIVE = '\nPurpose of the script: Find view associated with a job\n'

class CommandLine:
    """Handles parsing the commandline parameters.

    The parser takes one positional argument (the Jenkins job name) plus the
    optional ``-jenkinsurl``, ``-d/--debug`` and ``-h/--help`` switches.
    """

    def __init__(self):
        # The epilog shows how to call *this* script: the positional argument
        # is a job name (see 'jenkinsjobname' below), not a list of view urls.
        self.parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description=SCRIPT_OBJECTIVE,
            add_help=False,
            epilog='''
EXAMPLE USAGE:
 %(executable)s %(job_name)s
                        prints the url of the view that contains the job
            ''' % {'executable': SCRIPT_NAME, 'job_name': 'my-jenkins-job'})

        class HelpAction(argparse._HelpAction):
            """Help action that prints the section headings in upper case."""

            def __call__(self, parser, namespace, values, option_string=None):
                print(parser.format_help()
                      .replace('usage:', 'USAGE:')
                      .replace('positional arguments:', 'POSITIONAL ARGUMENTS:')
                      .replace('optional arguments:', 'OPTIONAL ARGUMENTS:'))
                parser.exit()

        self.parser.add_argument('jenkinsjobname', help = 'Jenkins jobname')
        # NOTE: this option is accepted here, but main() in this file never
        # reads it; the server url comes from the module-level constants.
        self.parser.add_argument('-jenkinsurl', help = 'Jenkins buil/view/server url for status check')
        self.parser.add_argument('-d', '--debug', action='store_const', const=True, help="print debugging information\n\n")
        # add_help=False above, so -h/--help is wired up by hand.
        self.parser.add_argument('-h', '--help', action=HelpAction, help='show this help message and exit\n\n')

    def parse_args(self, argv=None):
        """Parse the command line and return the argparse namespace.

        Args:
            argv: list of argument strings to parse; defaults to
                ``sys.argv[1:]`` when omitted.

        Returns:
            The namespace produced by ``ArgumentParser.parse_args``.

        Exits with status 0 after printing a hint when there is no argument
        at all. When ``--debug`` is given, logging is configured to write
        DEBUG messages to stderr.
        """
        if argv is None:
            argv = sys.argv[1:]

        # Message display when there is no argument on command line
        if not argv:
            print("Please run the script with argument -h for help")
            # sys.exit instead of the interactive-only exit() helper that the
            # site module injects (absent e.g. under `python -S`).
            sys.exit(0)

        args = self.parser.parse_args(argv)
        if args.debug:
            logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

        return args

    def help_blurb(self):
        """Return a one-line hint pointing the user at --help."""
        return 'Try \'%s --help\' for more information.' % os.path.basename(sys.argv[0])


# The command line is parsed as soon as the module is loaded; main() reads
# the job name from this module-level `args` namespace.
args = CommandLine().parse_args()

#sys.exit(0)



def auth_headers(username, password):
    """Build the value of the HTTP Basic 'Authorization' header.

    Args:
        username: account name.
        password: account password.

    Returns:
        The string 'Basic ' followed by the base64 encoding of
        'username:password', on a single line.
    """
    credentials = '%s:%s' % (username, password)
    if not isinstance(credentials, bytes):
        # Text (unicode) input has to be turned into bytes before encoding.
        credentials = credentials.encode('utf-8')
    # b64encode never inserts line breaks. encodestring wraps its output every
    # 76 characters, which put newlines inside the header for long credentials.
    token = base64.b64encode(credentials)
    if not isinstance(token, str):
        token = token.decode('ascii')
    return 'Basic ' + token

def login(username, password): # Prompt for password
    """Return a (username, password) pair, asking for the password if needed.

    The user is prompted (without echo, via getpass) only when a non-empty
    username comes with a password that is None or ''. In every other case
    both values are handed back untouched.
    """
    has_username = username not in (None, '')
    has_password = password not in (None, '')

    if has_password or not has_username:
        return (username, password)

    prompt = "\nEnter your ({0}) jenkins password: ".format(username)
    return (username, getpass.getpass(prompt))

class JenkinsException(Exception):
    """Error raised by the Jenkins client in this file when a request fails."""

class Jenkins(object):
    """Small read-only client for the Jenkins JSON API.

    Attributes:
        server: the first url passed to the constructor, with a trailing '/'.
        auth: value of the 'Authorization' header, or None when no
            username/password pair was supplied.
    """

    def __init__(self, url, username=None, password=None):
        """Store the normalised server url and, if given, the credentials.

        Args:
            url: server, view, job or build url (may be comma separated; only
                the first entry is kept).
            username: optional account name for HTTP Basic authentication.
            password: optional password for HTTP Basic authentication.
        """
        self.server = self.fix_server_url(url)
        if username is not None and password is not None:
            self.auth = auth_headers(username, password)
        else:
            self.auth = None

    def fix_server_url(self, urls): # urls may be comma separated server, view, job, or build urls
        """Return the first url of ``urls`` with a trailing '/'.

        An empty first entry becomes '/' instead of raising IndexError.
        """
        url = urls.split(',')[0]
        if not url.endswith('/'):
            url += '/'
        return url

    def fix_urls(self, urls): # urls may be comma separated server, view, job, or build urls
        """Return ``urls`` with every comma separated entry ending in '/'."""
        return ','.join(
            url if url.endswith('/') else url + '/'
            for url in urls.split(','))

    def jenkins_open(self, req): # req = HTTP request using urllib2
        """Send ``req`` and return the response body.

        The 'Authorization' header is added when credentials were supplied.

        Raises:
            JenkinsException: the server answered 401, 403 or 500.
            urllib2.HTTPError: any other HTTP error status. It is re-raised
                rather than swallowed, so callers never get None back.
        """
        try:
            if self.auth:
                req.add_header('Authorization', self.auth)
            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                # Always release the connection, even if read() fails.
                response.close()
        except urllib2.HTTPError as e:
            # Jenkins's funky authentication means its highly impossible to distinguish errors.
            if e.code in (401, 403, 500):
                raise JenkinsException('Error in request. Possibly authentication failed [%s]'%(e.code))
            raise

    def get_info(self, url = None):
        """Fetch ``url + 'api/json'`` and return the decoded JSON.

        Args:
            url: Jenkins url ending in '/'; defaults to ``self.server``.

        Raises:
            JenkinsException: on authentication failure, communication
                error, unparsable JSON or any other failure (reported as a
                possibly wrong url).
        """
        if url is None:
            url = self.server
        try:
            return json.loads(self.jenkins_open(urllib2.Request(url + INFO)))
        except JenkinsException:
            # Keep the specific message (e.g. authentication failure) raised
            # by jenkins_open instead of relabelling it as a wrong url.
            raise
        except (urllib2.HTTPError, httplib.BadStatusLine):
            raise JenkinsException("Error communicating with server[%s]"%url)
        except ValueError:
            raise JenkinsException("Could not parse JSON info for server[%s]"%url)
        except Exception:
            # Exception, not a bare except, so Ctrl-C and SystemExit pass through.
            raise JenkinsException("Possibly wrong url[%s]"%url)

    @staticmethod
    def valid_view(info_str): # checks if the given url is a view url and has associated jobs
        """Return True when ``info_str`` has 'description' and a non-empty 'jobs'."""
        return 'description' in info_str and 'jobs' in info_str and len(info_str['jobs']) > 0

    @staticmethod
    def valid_job(info_str): # checks if the given url is a job url
        """Return True when ``info_str`` has a non-empty 'builds' entry."""
        return 'builds' in info_str and len(info_str['builds']) > 0

    @staticmethod
    def _has_job(jobs_list, job_name):
        """Return True when an entry of ``jobs_list`` is named exactly ``job_name``.

        Entries are expected to be the dicts found under 'jobs' in a view's
        JSON, each carrying a 'name' key.
        """
        return any(job.get('name') == job_name for job in jobs_list)

    def get_views_list(self, jenkins_job_url):
        """Collect the urls of all leaf views below DEFAULT_JENKINS_SERVER.

        Views are walked level by level. A page whose JSON has a 'views' key
        is expanded into those views; any other page is recorded as a leaf
        view unless its url contains '/view/ALL/'.

        Args:
            jenkins_job_url: url of the job; the walk only happens when this
                url describes a valid job (see ``valid_job``).

        Returns:
            List of view urls in discovery order, without duplicates; empty
            when ``jenkins_job_url`` is not a valid job.
        """
        if not self.valid_job(self.get_info(jenkins_job_url)):
            return []

        job_views_list = []
        # Urls already fetched. Guards against a 'views' entry that points
        # back at a page seen before, which would otherwise loop forever.
        visited = set()
        # NOTE: the walk starts at the module-level DEFAULT_JENKINS_SERVER,
        # not at self.server.
        pending = [DEFAULT_JENKINS_SERVER]

        while pending:
            next_level = []
            for url in pending:
                if url in visited:
                    continue
                visited.add(url)

                jenkins_info = self.get_info(url)
                if 'views' in jenkins_info:
                    # check nested urls
                    next_level.extend(view['url'] for view in jenkins_info['views'])
                elif '/view/ALL/' not in url: # Do not consider the root view 'ALL' because it consists of all the jobs
                    job_views_list.append(url)
            pending = next_level
        return job_views_list

    def get_jobs_list_in_view(self, view_name):
        """Return the 'jobs' list of the view at url ``view_name`` ([] if absent)."""
        jenkins_jobs = self.get_info(view_name)
        if 'jobs' in jenkins_jobs:
            return jenkins_jobs['jobs']
        return []

    def find_view(self, job_name, job_url):
        """Return the url of the first leaf view that contains ``job_name``.

        The job name is compared for equality with each job's 'name'; a
        substring test on the textual dump of the job list would also match
        other jobs, urls or keys that merely contain the name.

        Args:
            job_name: exact Jenkins job name.
            job_url: url of that job.

        Returns:
            The view url, or '' when no view lists the job.
        """
        for view in self.get_views_list(job_url):
            if self._has_job(self.get_jobs_list_in_view(view), job_name):
                return view
        return ''



def main():
    """Print the url of the view that contains the job named on the command line.

    The job url is built under JENKINS_ROOT_VIEW and the lookup runs without
    credentials. An empty line is printed when no view is found.
    """
    job_name = args.jenkinsjobname
    job_url = JENKINS_ROOT_VIEW + 'job/' + job_name + '/'
    client = Jenkins(job_url, None, None)

    # Single-argument print(...) emits the same text as the print statement.
    banner = '\nSearching the view for the job: ' + job_name + '\nIt may take few minutes, please wait ... ... ...'
    print(banner)
    print(client.find_view(job_name, job_url))

    print('\nDone\n')

# Run the lookup only when the file is executed as a script.
if __name__ == '__main__':
    main()

No comments:

Post a Comment