#!/usr/bin/env python
from flask import Flask
import sys
import urllib2
import urllib
import base64
import traceback
import json
import httplib
import argparse
import logging
import getpass
import os
import re
# Flask application object; the route handler in this file registers on it.
app = Flask(__name__)

# Name of the Jenkins job whose containing view is looked up.
jenkinsjobname = 'AP-KUMO_3.4.0.1_T300'

# Base URL of the Jenkins server (kept with its trailing slash).
DEFAULT_JENKINS_SERVER = 'http://172.16.200.63:8080/'

# URL of the view named 'ALL' on that server.
JENKINS_ROOT_VIEW = ''.join((DEFAULT_JENKINS_SERVER, 'view/ALL/'))

# Suffix appended to a Jenkins URL to request its JSON description.
INFO = 'api/json'

# Path template for a job's JSON description; formatted with a 'name' key.
JOB_INFO = 'job/%(name)s/api/json?depth=0'

# File name of this script, without its directory.
SCRIPT_NAME = os.path.basename(__file__)

# Human-readable statement of what the script is for.
SCRIPT_OBJECTIVE = '''
Purpose of the script: Find view associated with a job
'''
def auth_headers(username, password):
    """Build the value of an HTTP Basic 'Authorization' header.

    The credentials are joined as ``username:password`` and Base64-encoded
    with ``base64.b64encode``, which never inserts line breaks, so the
    result is always a single-line header value regardless of how long the
    credentials are.

    :param username: account name.
    :param password: password or API token for that account.
    :returns: the string ``'Basic <base64 credentials>'``.
    """
    credentials = '%s:%s' % (username, password)
    # b64encode needs bytes on Python 3; on Python 2 a plain str already
    # is bytes, and only unicode text has to be encoded.
    if not isinstance(credentials, bytes):
        credentials = credentials.encode('utf-8')
    token = base64.b64encode(credentials)
    # Python 3 hands back bytes; convert so the header value is a native str.
    if not isinstance(token, str):
        token = token.decode('ascii')
    return 'Basic ' + token
def login(username, password):
    """Return ``(username, password)``, prompting for a missing password.

    When a non-empty username is supplied but the password is ``None`` or
    an empty string, the password is read interactively with
    ``getpass.getpass``. In every other case both arguments are returned
    exactly as given.

    :param username: account name, or ``None``/``''`` when not provided.
    :param password: password, or ``None``/``''`` when not provided.
    :returns: a ``(username, password)`` tuple.
    """
    has_username = username is not None and username != ''
    has_password = password is not None and password != ''
    if has_username and not has_password:
        prompt = "\nEnter your ({0}) jenkins password: ".format(username)
        password = getpass.getpass(prompt)
    return (username, password)
class JenkinsException(Exception):
    """Error raised when a request to the Jenkins server fails."""
class Jenkins(object):
    """Small read-only client for the Jenkins JSON API.

    It fetches ``<url>api/json`` documents and uses them to walk the
    server's views in order to find which view contains a given job.
    """

    def __init__(self, url, username=None, password=None):
        """Remember the server URL and, optionally, Basic-auth credentials.

        :param url: one URL, or several comma-separated URLs of which only
            the first is kept (normalised to end with '/').
        :param username: account name; authentication is only used when
            both ``username`` and ``password`` are not ``None``.
        :param password: password for ``username``.
        """
        self.server = self.fix_server_url(url)
        if username is not None and password is not None:
            self.auth = auth_headers(username, password)
        else:
            self.auth = None

    def fix_server_url(self, urls):
        """Return the first of the comma-separated ``urls`` with a trailing '/'.

        ``urls`` may hold server, view, job, or build URLs.
        """
        url = urls.split(',')[0]
        # endswith() (rather than url[-1]) also copes with an empty string.
        if not url.endswith('/'):
            url += '/'
        return url

    def fix_urls(self, urls):
        """Return comma-separated ``urls`` with every entry ending in '/'.

        ``urls`` may hold server, view, job, or build URLs.
        """
        return ','.join(
            url if url.endswith('/') else url + '/'
            for url in urls.split(',')
        )

    def jenkins_open(self, req):
        """Send ``req`` (a ``urllib2.Request``) and return the response body.

        The stored 'Authorization' header is added when credentials were
        supplied to the constructor.

        :raises JenkinsException: for HTTP status 401, 403 or 500.
        :raises urllib2.HTTPError: for any other HTTP error status; it is
            propagated instead of being silently dropped.
        """
        if self.auth:
            req.add_header('Authorization', self.auth)
        try:
            response = urllib2.urlopen(req)
            try:
                return response.read()
            finally:
                # Release the connection even when read() fails.
                response.close()
        except urllib2.HTTPError as e:
            # These status codes are all treated as a probable
            # authentication failure; they are not told apart here.
            if e.code in (401, 403, 500):
                raise JenkinsException('Error in request. Possibly authentication failed [%s]'%(e.code))
            raise

    def get_info(self, url = None):
        """Fetch and decode the JSON description found at ``url``.

        :param url: URL ending in '/'; defaults to ``self.server``.
        :returns: the decoded JSON document.
        :raises JenkinsException: on any communication or parsing failure.
        """
        if url is None:
            url = self.server
        try:
            return json.loads(self.jenkins_open(urllib2.Request(url + INFO)))
        except JenkinsException:
            # Keep the more specific message produced by jenkins_open().
            raise
        except urllib2.HTTPError:
            raise JenkinsException("Error communicating with server[%s]"%url)
        except httplib.BadStatusLine:
            raise JenkinsException("Error communicating with server[%s]"%url)
        except ValueError:
            raise JenkinsException("Could not parse JSON info for server[%s]"%url)
        except Exception:
            raise JenkinsException("Possibly wrong url[%s]"%url)

    @staticmethod
    def valid_view(info_str):
        """Return True when ``info_str`` describes a view with at least one job."""
        return (
            'description' in info_str
            and 'jobs' in info_str
            and len(info_str['jobs']) > 0
        )

    @staticmethod
    def valid_job(info_str):
        """Return True when ``info_str`` describes a job with at least one build."""
        return 'builds' in info_str and len(info_str['builds']) > 0

    def get_views_list(self, jenkins_job_url):
        """Collect the URLs of all leaf views reachable from the server root.

        The walk starts at ``DEFAULT_JENKINS_SERVER`` and proceeds level by
        level: a URL whose description has a 'views' entry is expanded into
        those views, anything else is recorded as a leaf view. URLs
        containing '/view/ALL/' are not recorded because that view holds
        every job.

        :param jenkins_job_url: URL of the job; the walk only happens when
            its description passes ``valid_job``.
        :returns: list of view URLs (empty when the job is not valid).
        """
        job_views_list = []
        if not self.valid_job(self.get_info(jenkins_job_url)):
            return job_views_list

        pending = [DEFAULT_JENKINS_SERVER]
        visited = set()  # guards against views that link back to a seen URL
        while pending:
            next_level = []
            for url in pending:
                if url in visited:
                    continue
                visited.add(url)
                info = self.get_info(url)
                if 'views' in info:
                    next_level.extend(view['url'] for view in info['views'])
                elif '/view/ALL/' not in url:
                    job_views_list.append(url)
            pending = next_level
        return job_views_list

    def get_jobs_list_in_view(self, view_name):
        """Return the 'jobs' entry of the view at ``view_name`` ([] if absent)."""
        view_info = self.get_info(view_name)
        if 'jobs' in view_info:
            return view_info['jobs']
        return []

    def find_view(self, job_name, job_url):
        """Return the URL of the first view that lists ``job_name``.

        Jobs are compared on their exact 'name' field, so a job whose name
        merely contains ``job_name`` does not count as a match.

        :param job_name: name of the job to look for.
        :param job_url: URL of that job, passed to ``get_views_list``.
        :returns: the matching view URL, or '' when no view lists the job.
        """
        for view in self.get_views_list(job_url):
            for job in self.get_jobs_list_in_view(view):
                if isinstance(job, dict) and job.get('name') == job_name:
                    return view
        return ''
@app.route("/")
def any_function():
    """Handle '/': return the URL of the view containing the configured job.

    The job URL is built from ``JENKINS_ROOT_VIEW`` and ``jenkinsjobname``,
    an unauthenticated ``Jenkins`` client is created for it, and the result
    of ``find_view`` is returned as the response body (an empty string when
    no view lists the job).
    """
    job_url = ''.join([JENKINS_ROOT_VIEW, 'job/', jenkinsjobname, '/'])
    client = Jenkins(job_url, None, None)
    return client.find_view(jenkinsjobname, job_url)
# Start the development server only when this file is executed as a script,
# so that importing the module does not block on a running server.
if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode; keep it off on
    # any host reachable by untrusted clients.
    app.run(debug=True, port=5001)
# No comments:
# Post a Comment