http://mixedtechnical-gopalsingh.blogspot.com/2016/04/jenkins-email-configuration-with-yahoo.html
Thursday, April 28, 2016
Wednesday, April 27, 2016
Monday, April 25, 2016
multiple shell commands in same process
http://stackoverflow.com/questions/3172470/actual-meaning-of-shell-true-in-subprocess
# Working multiple command in one process
# Write multiple commands with p.write: all will be executed in the same process by: p.close()
def test_bash():
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'ls')
p.write('%s\n' % 'ls -al')
p.write('%s\n' % 'export hihi=\"no hi hi\"')
p.write('%s\n' % 'echo =====' )
p.write('%s\n' % 'echo $hihi')
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
# Working multiple command in one process
# Write multiple commands with p.write: all will be executed in the same process by: p.close()
def test_bash():
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'ls')
p.write('%s\n' % 'ls -al')
p.write('%s\n' % 'export hihi=\"no hi hi\"')
p.write('%s\n' % 'echo =====' )
p.write('%s\n' % 'echo $hihi')
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
Friday, April 22, 2016
Wednesday, April 20, 2016
create perforce user
https://www.perforce.com/blog/090224/using-p4-user-o-and-i-automate-creating-new-users
https://www.perforce.com/perforce/doc.current/manuals/p4script/03_python.html
http://stackoverflow.com/questions/14522682/execute-perforce-commands-through-python
http://stackoverflow.com/questions/184187/how-do-i-check-out-a-file-from-perforce-in-python
http://stackoverflow.com/questions/589093/python-persistent-popen
http://stackoverflow.com/questions/28318673/login-to-perforce-from-commandline
http://community.scmgalaxy.com/blog/view/15512/ways-to-pass-password-in-triggers-file-or-script-file-in-perforce
http://forums.perforce.com/index.php?/topic/1412-addremove-users-from-group/
https://www.perforce.com/perforce/doc.current/manuals/p4sag/chapter.superuser.html
https://www.perforce.com/perforce/doc.current/manuals/cmdref/p4_passwd.html
#!/usr/bin/env python
# coding: utf-8
import re
import subprocess
import shlex
import os
import argparse, sys
# usage: ./p4_create_user.py user_id manager_id group_id location_id : 4 required arguments
script_objective = '''
Purpose of the script:
Create Perforce user and send email to the user, user's manager, and release engineering group.
'''
# Message display when there are not enough arguments on command line
if len(sys.argv) < 2:
print "Please run the script with argument -h for help"
exit(0)
# argument parsing:
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
add_help=True, description = '', epilog = script_objective)
parser.add_argument('user_id', help = 'user_id for the new perforce user')
parser.add_argument('manager_id', help = 'email_id of user\'s manager, without domain name similar format as user_id')
parser.add_argument('group_id', help = 'perforce group id to which the user_id should be member of')
parser.add_argument('location_id', help = 'one of the location ids hq, bdc, sdc, tdc')
args = parser.parse_args()
user_id = args.user_id
manager_id = args.manager_id
group_id = args.group_id
location_id = args.location_id
email_domain = '@ruckuswireless.com'
email_from = 'perforce-noreply' + email_domain
email_to = user_id + email_domain
email_cc = manager_id + email_domain + ',releng-team' + email_domain
email_bcc = ''
subject = 'Perforce account for ' + user_id
server_proxy_map = {'hq':'perforce:1666', 'bdc':'bdc-p4.video54.local:1999', 'sdc':'new-sdc-p4.video54.local:1666', 'tdc':'tdc-p4.video54.local:1666'}
# perforce groups on 4/21/2016
perforce_groups = ['Qubercomm', 'Qubercomm', 'Tools', 'admin', 'automation', 'bdc', 'bdc-ap-contractors', \
'cdc', 'cdc_simtools', 'espp-contractor', 'espp-employee', 'firmware.txt', 'idc', 'legal', \
'odc', 'odc_automation', 'p4users', 'pkiteam', 'release-leads', 'sdc', 'sdc_fm', 'service_users', \
'small-cell', 'sz34_integ', 'sz35_integ', 'sz35_platform', 'tpmteam', 'xclaim-contractor']
p4_server_name = server_proxy_map[location_id]
account_msg = '''
Dear ''' + user_id + ',' + '''
Welcome to Ruckus Family! We at Ruckus use Perforce as Source Code Management system and here is your account credential details:
Server – ''' + p4_server_name + '''
User name – ''' + user_id + '''
Password - Welcome2Ruck
Please download and install client application of your need (P4V: Visual Client and/or P4: Command-Line Client) from http://www.perforce.com/downloads/latest-components#clients
Feel free to contact us in case you need further guidance regarding Perforce once you go through some of the study materials.
You can also refer in-house information from https://jira-wiki.ruckuswireless.com/display/SCM/Getting+Started+with+Perforce
'''
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
def set_password(user_id, password):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
cmd = 'p4 passwd -O \"\" -P ' + password + ' ' + user_id
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
# add to group
def add_to_group(user_id, group_id):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'p4 -u p4build login')
cmd = '(p4 group -o ' + group_id + '; echo \" ' + user_id + '\")|p4 group -i'
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
# add user
def add_user(user_id, email_domain):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'p4 -u p4build login')
p.write('%s\n' % 'p4 user -i -f')
p.write('%s\n' % ('User: ' + user_id))
p.write('%s\n' % ('Email: ' + user_id + email_domain))
p.write('%s\n' % ('FullName: ' + user_id))
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
def verify_email_address(receivers):
error_message = 'One or more email address is invalid'
valid_emails = []
invalid_emails = []
for email_address in (''.join(receivers.split())).split(','): # Taken care off white spaces in the specified emails list
match = re.match('^[_a-z0-9-\.]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', email_address)
if match == None:
invalid_emails += [email_address]
else:
valid_emails += [email_address]
if len(invalid_emails) > 0:
print(error_message)
raise ValueError(error_message)
return valid_emails
def send_email(email_from, email_to, email_subject, email_message, email_cc='', email_bcc=''):
sendmail_location = runcmd('which sendmail').strip() # strip() used to trim line feed
p = os.popen('%s -t' % sendmail_location, 'w')
p.write('From: %s\n' % email_from)
p.write('To: %s\n' % email_to)
if email_cc is not '':
p.write('Cc: %s\n' % email_cc)
if email_bcc is not '':
p.write('Bcc: %s\n' % email_bcc)
p.write('Subject: %s\n' % email_subject)
p.write('\n') # blank line separating headers from body
p.write(email_message)
status = p.close()
if status is None: status = 'OK'
print "Sendmail exit status: ", str(status)
return status
def send_verified_email():
global email_from, email_to, email_cc, email_bcc, account_msg, subject
status = None
# send email
valid_emails_to = ','.join(verify_email_address(email_to)) # sendmail does not need list
# Validate CC and BCC addresses
if email_cc is not '':
valid_emails_cc = ','.join(verify_email_address(email_cc))
else:
valid_emails_cc = ''
if email_bcc is not '':
valid_emails_bcc = ','.join(verify_email_address(email_bcc))
else:
valid_emails_bcc = ''
if len(valid_emails_to) > 0:
send_email(email_from, valid_emails_to, subject, account_msg, email_cc=valid_emails_cc, email_bcc=valid_emails_bcc)
else:
print 'No valid email specified'
status = 'Error'
def complete_p4_user_create():
global user_id, email_domain_name, group_id
status = add_user(user_id, email_domain)
if status is None:
status = set_password(user_id, 'Welcome2Ruck')
if status is None:
status = add_to_group(user_id, group_id)
if status is None:
status = send_verified_email()
return status
def main():
status = complete_p4_user_create()
if status is None:
print '\nPerforce user created and email sent successfully.'
else:
print '\nError in creation Perforce user and/or sending email.'
if __name__ == '__main__':
main()
https://www.perforce.com/perforce/doc.current/manuals/p4script/03_python.html
http://stackoverflow.com/questions/14522682/execute-perforce-commands-through-python
http://stackoverflow.com/questions/184187/how-do-i-check-out-a-file-from-perforce-in-python
http://stackoverflow.com/questions/589093/python-persistent-popen
http://stackoverflow.com/questions/28318673/login-to-perforce-from-commandline
http://community.scmgalaxy.com/blog/view/15512/ways-to-pass-password-in-triggers-file-or-script-file-in-perforce
http://forums.perforce.com/index.php?/topic/1412-addremove-users-from-group/
https://www.perforce.com/perforce/doc.current/manuals/p4sag/chapter.superuser.html
https://www.perforce.com/perforce/doc.current/manuals/cmdref/p4_passwd.html
#!/usr/bin/env python
# coding: utf-8
import re
import subprocess
import shlex
import os
import argparse, sys
# usage: ./p4_create_user.py user_id manager_id group_id location_id : 4 required arguments
script_objective = '''
Purpose of the script:
Create Perforce user and send email to the user, user's manager, and release engineering group.
'''
# Message display when there are not enough arguments on command line
if len(sys.argv) < 2:
print "Please run the script with argument -h for help"
exit(0)
# argument parsing:
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter,
add_help=True, description = '', epilog = script_objective)
parser.add_argument('user_id', help = 'user_id for the new perforce user')
parser.add_argument('manager_id', help = 'email_id of user\'s manager, without domain name similar format as user_id')
parser.add_argument('group_id', help = 'perforce group id to which the user_id should be member of')
parser.add_argument('location_id', help = 'one of the location ids hq, bdc, sdc, tdc')
args = parser.parse_args()
user_id = args.user_id
manager_id = args.manager_id
group_id = args.group_id
location_id = args.location_id
email_domain = '@ruckuswireless.com'
email_from = 'perforce-noreply' + email_domain
email_to = user_id + email_domain
email_cc = manager_id + email_domain + ',releng-team' + email_domain
email_bcc = ''
subject = 'Perforce account for ' + user_id
server_proxy_map = {'hq':'perforce:1666', 'bdc':'bdc-p4.video54.local:1999', 'sdc':'new-sdc-p4.video54.local:1666', 'tdc':'tdc-p4.video54.local:1666'}
# perforce groups on 4/21/2016
perforce_groups = ['Qubercomm', 'Qubercomm', 'Tools', 'admin', 'automation', 'bdc', 'bdc-ap-contractors', \
'cdc', 'cdc_simtools', 'espp-contractor', 'espp-employee', 'firmware.txt', 'idc', 'legal', \
'odc', 'odc_automation', 'p4users', 'pkiteam', 'release-leads', 'sdc', 'sdc_fm', 'service_users', \
'small-cell', 'sz34_integ', 'sz35_integ', 'sz35_platform', 'tpmteam', 'xclaim-contractor']
p4_server_name = server_proxy_map[location_id]
account_msg = '''
Dear ''' + user_id + ',' + '''
Welcome to Ruckus Family! We at Ruckus use Perforce as Source Code Management system and here is your account credential details:
Server – ''' + p4_server_name + '''
User name – ''' + user_id + '''
Password - Welcome2Ruck
Please download and install client application of your need (P4V: Visual Client and/or P4: Command-Line Client) from http://www.perforce.com/downloads/latest-components#clients
Feel free to contact us in case you need further guidance regarding Perforce once you go through some of the study materials.
You can also refer in-house information from https://jira-wiki.ruckuswireless.com/display/SCM/Getting+Started+with+Perforce
'''
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
def set_password(user_id, password):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
cmd = 'p4 passwd -O \"\" -P ' + password + ' ' + user_id
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
# add to group
def add_to_group(user_id, group_id):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'p4 -u p4build login')
cmd = '(p4 group -o ' + group_id + '; echo \" ' + user_id + '\")|p4 group -i'
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
# add user
def add_user(user_id, email_domain):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
p.write('%s\n' % 'p4 -u p4build login')
p.write('%s\n' % 'p4 user -i -f')
p.write('%s\n' % ('User: ' + user_id))
p.write('%s\n' % ('Email: ' + user_id + email_domain))
p.write('%s\n' % ('FullName: ' + user_id))
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
def verify_email_address(receivers):
error_message = 'One or more email address is invalid'
valid_emails = []
invalid_emails = []
for email_address in (''.join(receivers.split())).split(','): # Taken care off white spaces in the specified emails list
match = re.match('^[_a-z0-9-\.]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', email_address)
if match == None:
invalid_emails += [email_address]
else:
valid_emails += [email_address]
if len(invalid_emails) > 0:
print(error_message)
raise ValueError(error_message)
return valid_emails
def send_email(email_from, email_to, email_subject, email_message, email_cc='', email_bcc=''):
sendmail_location = runcmd('which sendmail').strip() # strip() used to trim line feed
p = os.popen('%s -t' % sendmail_location, 'w')
p.write('From: %s\n' % email_from)
p.write('To: %s\n' % email_to)
if email_cc is not '':
p.write('Cc: %s\n' % email_cc)
if email_bcc is not '':
p.write('Bcc: %s\n' % email_bcc)
p.write('Subject: %s\n' % email_subject)
p.write('\n') # blank line separating headers from body
p.write(email_message)
status = p.close()
if status is None: status = 'OK'
print "Sendmail exit status: ", str(status)
return status
def send_verified_email():
global email_from, email_to, email_cc, email_bcc, account_msg, subject
status = None
# send email
valid_emails_to = ','.join(verify_email_address(email_to)) # sendmail does not need list
# Validate CC and BCC addresses
if email_cc is not '':
valid_emails_cc = ','.join(verify_email_address(email_cc))
else:
valid_emails_cc = ''
if email_bcc is not '':
valid_emails_bcc = ','.join(verify_email_address(email_bcc))
else:
valid_emails_bcc = ''
if len(valid_emails_to) > 0:
send_email(email_from, valid_emails_to, subject, account_msg, email_cc=valid_emails_cc, email_bcc=valid_emails_bcc)
else:
print 'No valid email specified'
status = 'Error'
def complete_p4_user_create():
global user_id, email_domain_name, group_id
status = add_user(user_id, email_domain)
if status is None:
status = set_password(user_id, 'Welcome2Ruck')
if status is None:
status = add_to_group(user_id, group_id)
if status is None:
status = send_verified_email()
return status
def main():
status = complete_p4_user_create()
if status is None:
print '\nPerforce user created and email sent successfully.'
else:
print '\nError in creation Perforce user and/or sending email.'
if __name__ == '__main__':
main()
email sending
http://tecadmin.net/ways-to-send-email-from-linux-command-line/
http://www.binarytides.com/linux-mail-command-examples/
http://www.tutorialspoint.com/python/python_sending_email.htm
https://www.scottbrady91.com/Email-Verification/Python-Email-Verification-Script
http://stackoverflow.com/questions/3739909/how-to-strip-all-whitespace-from-string
http://stackoverflow.com/questions/7232088/python-subject-not-shown-when-sending-email-using-smtplib-module
http://stackoverflow.com/questions/1546367/python-how-to-send-mail-with-to-cc-and-bcc
http://stackoverflow.com/questions/73781/sending-mail-via-sendmail-from-python
echo "This is the body" | mail -s "Subject" -aFrom:Harry\<harry@gmail.com\> gopal.singh@ruckuswireless.com
# Python script:
=====
# using yahoo
#!/usr/bin/env python
# coding: utf-8
import re
import subprocess
import shlex
import os
import argparse, sys
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
import smtplib
from email.mime.text import MIMEText
SMTP_SERVER = "smtp.mail.yahoo.com"
SMTP_PORT = 587
SMTP_USERNAME = "hurrymanjee"
SMTP_PASSWORD = "######"
EMAIL_FROM = "hurrymanjee@yahoo.com"
EMAIL_TO = "dummyrain@sbcglobal.net"
EMAIL_SUBJECT = "REMINDER:"
co_msg = """
Hello, [username]! Just wanted to send a friendly appointment
reminder for your appointment:
[Company]
Where: [companyAddress]
Time: [appointmentTime]
Company URL: [companyUrl]
Change appointment?? Add Service??
change notification preference (text msg/email)
"""
def send_email():
msg = MIMEText(co_msg)
msg['Subject'] = EMAIL_SUBJECT + "Company - Service at appointmentTime"
msg['From'] = EMAIL_FROM
msg['To'] = EMAIL_TO
debuglevel = True
mail = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
mail.set_debuglevel(debuglevel)
mail.starttls()
mail.login(SMTP_USERNAME, SMTP_PASSWORD)
mail.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
mail.quit()
if __name__=='__main__':
send_email()
http://www.binarytides.com/linux-mail-command-examples/
http://www.tutorialspoint.com/python/python_sending_email.htm
https://www.scottbrady91.com/Email-Verification/Python-Email-Verification-Script
http://stackoverflow.com/questions/3739909/how-to-strip-all-whitespace-from-string
http://stackoverflow.com/questions/7232088/python-subject-not-shown-when-sending-email-using-smtplib-module
http://stackoverflow.com/questions/1546367/python-how-to-send-mail-with-to-cc-and-bcc
http://stackoverflow.com/questions/73781/sending-mail-via-sendmail-from-python
echo "This is the body" | mail -s "Subject" -aFrom:Harry\<harry@gmail.com\> gopal.singh@ruckuswireless.com
# Python script:
# email_receivers must be a list for multiple receivers. One receiver is OK to specify as string.
import smtplib
email_sender = 'gopal.singh@ruckuswireless.com'
email_receivers = email_sender
message = 'Test Message'
def send_email(sender, receivers, message):
try:
smtpObj = smtplib.SMTP('localhost')
smtpObj.sendmail(sender, receivers, message)
print "Successfully sent email"
except smtplib.SMTPException:
print "Error: unable to send email"
def main():
send_email(email_sender, email_receivers, message)
if __name__ == '__main__':
main()
# Email with CC and BCC
toaddr = 'buffy@sunnydale.k12.ca.us'
cc = ['alexander@sunydale.k12.ca.us','willow@sunnydale.k12.ca.us']
bcc = ['chairman@slayerscouncil.uk']
fromaddr = 'giles@sunnydale.k12.ca.us'
message_subject = "disturbance in sector 7"
message_text = "Three are dead in an attack in the sewers below sector 7."
message = "From: %s\r\n" % fromaddr
+ "To: %s\r\n" % toaddr
+ "CC: %s\r\n" % ",".join(cc)
+ "Subject: %s\r\n" % message_subject
+ "\r\n"
+ message_text
toaddrs = [toaddr] + cc + bcc
server = smtplib.SMTP('smtp.sunnydale.k12.ca.us')
server.set_debuglevel(1)
server.sendmail(fromaddr, toaddrs, message)
server.quit()
# Using sendmail
def sendMail():
sendmail_location = "/usr/sbin/sendmail" # sendmail location
p = os.popen("%s -t" % sendmail_location, "w")
p.write("From: %s\n" % "from@somewhere.com")
p.write("To: %s\n" % "to@somewhereelse.com")
p.write("Subject: thesubject\n")
p.write("\n") # blank line separating headers from body
p.write("body of the mail")
status = p.close()
if status != 0:
print "Sendmail exit status", status
#
from email.mime.text import MIMEText
from subprocess import Popen, PIPE
msg = MIMEText("Here is the body of my message")
msg["From"] = "me@example.com"
msg["To"] = "you@example.com"
msg["Subject"] = "This is the subject."
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE, universal_newlines=True)
p.communicate(msg.as_string())
import smtplib
email_sender = 'gopal.singh@ruckuswireless.com'
email_receivers = email_sender
message = 'Test Message'
def send_email(sender, receivers, message):
try:
smtpObj = smtplib.SMTP('localhost')
smtpObj.sendmail(sender, receivers, message)
print "Successfully sent email"
except smtplib.SMTPException:
print "Error: unable to send email"
def main():
send_email(email_sender, email_receivers, message)
if __name__ == '__main__':
main()
# Email with CC and BCC
toaddr = 'buffy@sunnydale.k12.ca.us'
cc = ['alexander@sunydale.k12.ca.us','willow@sunnydale.k12.ca.us']
bcc = ['chairman@slayerscouncil.uk']
fromaddr = 'giles@sunnydale.k12.ca.us'
message_subject = "disturbance in sector 7"
message_text = "Three are dead in an attack in the sewers below sector 7."
message = "From: %s\r\n" % fromaddr
+ "To: %s\r\n" % toaddr
+ "CC: %s\r\n" % ",".join(cc)
+ "Subject: %s\r\n" % message_subject
+ "\r\n"
+ message_text
toaddrs = [toaddr] + cc + bcc
server = smtplib.SMTP('smtp.sunnydale.k12.ca.us')
server.set_debuglevel(1)
server.sendmail(fromaddr, toaddrs, message)
server.quit()
# Using sendmail
def sendMail():
sendmail_location = "/usr/sbin/sendmail" # sendmail location
p = os.popen("%s -t" % sendmail_location, "w")
p.write("From: %s\n" % "from@somewhere.com")
p.write("To: %s\n" % "to@somewhereelse.com")
p.write("Subject: thesubject\n")
p.write("\n") # blank line separating headers from body
p.write("body of the mail")
status = p.close()
if status != 0:
print "Sendmail exit status", status
#
from email.mime.text import MIMEText
from subprocess import Popen, PIPE
msg = MIMEText("Here is the body of my message")
msg["From"] = "me@example.com"
msg["To"] = "you@example.com"
msg["Subject"] = "This is the subject."
p = Popen(["/usr/sbin/sendmail", "-t", "-oi"], stdin=PIPE, universal_newlines=True)
p.communicate(msg.as_string())
=====
# using yahoo
#!/usr/bin/env python
# coding: utf-8
import re
import subprocess
import shlex
import os
import argparse, sys
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
import smtplib
from email.mime.text import MIMEText
SMTP_SERVER = "smtp.mail.yahoo.com"
SMTP_PORT = 587
SMTP_USERNAME = "hurrymanjee"
SMTP_PASSWORD = "######"
EMAIL_FROM = "hurrymanjee@yahoo.com"
EMAIL_TO = "dummyrain@sbcglobal.net"
EMAIL_SUBJECT = "REMINDER:"
co_msg = """
Hello, [username]! Just wanted to send a friendly appointment
reminder for your appointment:
[Company]
Where: [companyAddress]
Time: [appointmentTime]
Company URL: [companyUrl]
Change appointment?? Add Service??
change notification preference (text msg/email)
"""
def send_email():
msg = MIMEText(co_msg)
msg['Subject'] = EMAIL_SUBJECT + "Company - Service at appointmentTime"
msg['From'] = EMAIL_FROM
msg['To'] = EMAIL_TO
debuglevel = True
mail = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
mail.set_debuglevel(debuglevel)
mail.starttls()
mail.login(SMTP_USERNAME, SMTP_PASSWORD)
mail.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
mail.quit()
if __name__=='__main__':
send_email()
python perforce
install: pip install p4python
https://www.perforce.com/perforce/r15.1/manuals/p4script/python.programming.html
https://www.perforce.com/company/newsletter/2009/07/technofiles-scripting-perforce-p4python
https://www.perforce.com/company/newsletter/2009/07/technofiles-scripting-perforce-p4python
Tuesday, April 19, 2016
python Flask script to find view for a given job -v1.0
#!/usr/bin/env python
from flask import Flask
import sys
import urllib2
import urllib
import base64
import traceback
import json
import httplib
import argparse
import logging
import getpass
import os
import re
app = Flask(__name__)
jenkinsjobname = 'AP-KUMO_3.4.0.1_T300'
DEFAULT_JENKINS_SERVER = 'http://172.16.200.63:8080/'
JENKINS_ROOT_VIEW = DEFAULT_JENKINS_SERVER + 'view/ALL/'
INFO = 'api/json'
JOB_INFO = 'job/%(name)s/api/json?depth=0'
SCRIPT_NAME = os.path.basename(__file__)
SCRIPT_OBJECTIVE = '''
Purpose of the script: Find view associated with a job
'''
def auth_headers(username, password):
'''
Simple implementation of HTTP Basic Authentication. Returns the 'Authentication' header value. Required for authenticated site access
'''
return 'Basic ' + base64.encodestring('%s:%s' % (username, password))[:-1]
def login(username, password): # Prompt for password
if (username is not None and username != '') and (password is None or password == ''):
return (username, getpass.getpass("\nEnter your ({0}) jenkins password: ".format(username)))
else:
return (username, password)
class JenkinsException(Exception): pass
class Jenkins(object):
def __init__(self, url, username=None, password=None):
self.server = self.fix_server_url(url)
if username is not None and password is not None:
self.auth = auth_headers(username, password)
else:
self.auth = None
def fix_server_url(self, urls): # urls may be comma separated server, view, job, or build urls
url = urls.split(',')[0]
if url[-1] != '/':
url = url + '/'
return url
def fix_urls(self, urls): # urls may be comma separated server, view, job, or build urls
url_str = ''
for url in urls.split(','):
if url[-1] != '/':
url_str += url + '/,'
else:
url_str += url + ','
return url_str.rstrip(',')
def jenkins_open(self, req): # req = HTTP request using urllib2
try:
if self.auth:
req.add_header('Authorization', self.auth)
return urllib2.urlopen(req).read()
except urllib2.HTTPError, e:
# Jenkins's funky authentication means its highly impossible to distinguish errors.
if e.code in [401, 403, 500]:
raise JenkinsException('Error in request. Possibly authentication failed [%s]'%(e.code))
def get_info(self, url = None):
if url is None:
url = self.server
try:
return json.loads(self.jenkins_open(urllib2.Request(url + INFO)))
except urllib2.HTTPError:
raise JenkinsException("Error communicating with server[%s]"%url)
except httplib.BadStatusLine:
raise JenkinsException("Error communicating with server[%s]"%url)
except ValueError:
raise JenkinsException("Could not parse JSON info for server[%s]"%url)
except:
raise JenkinsException("Possibly wrong url[%s]"%url)
@staticmethod
def valid_view(info_str): # checks if the given url is a view url and has associated jobs
if 'description' in info_str and 'jobs' in info_str and (len(info_str['jobs']) > 0):
return True
else:
return False
@staticmethod
def valid_job(info_str): # checks if the given url is a job url
if 'builds' in info_str and (len(info_str['builds']) > 0):
return True
else:
return False
def get_views_list(self, jenkins_job_url):
jenkins_info = self.get_info(jenkins_job_url)
urls_list = [DEFAULT_JENKINS_SERVER]
job_views_list = []
if self.valid_job(jenkins_info):
#check nested urls
while len(urls_list) > 0:
urls_new = []
for url in urls_list:
jenkins_info = self.get_info(url)
if ('views' in jenkins_info):
for view in jenkins_info['views']:
#views_list.append(view['url'])
urls_new.append(view['url'])
urls_list = urls_new
else:
if not '/view/ALL/' in url: # Do not consider the root view 'ALL' because it consists of all the jobs
job_views_list.append(url)
if len(urls_new) == 0:
urls_list = []
return job_views_list
def get_jobs_list_in_view(self, view_name):
jobs_list = []
jenkins_jobs = self.get_info(view_name)
if 'jobs' in jenkins_jobs:
jobs_list = jenkins_jobs['jobs']
return jobs_list
def find_view(self, job_name, job_url):
views_list = self.get_views_list(job_url)
job_view = ''
for view in views_list:
jobs_list = self.get_jobs_list_in_view(view)
if job_name in str(jobs_list):
job_view = view
break
return job_view
@app.route("/")
def any_function():
global JENKINS_ROOT_VIEW, jenkinsjobname
joburl = JENKINS_ROOT_VIEW + 'job/' + jenkinsjobname + '/'
jenkins = Jenkins(joburl, None, None)
#print '\nSearching the view for the job: ' + jenkinsjobname + '\nIt may take few minutes, please wait ... ... ...'
str = jenkins.find_view(jenkinsjobname, joburl)
return str
app.run(debug=True, port = 5001)
from flask import Flask
import sys
import urllib2
import urllib
import base64
import traceback
import json
import httplib
import argparse
import logging
import getpass
import os
import re
app = Flask(__name__)
jenkinsjobname = 'AP-KUMO_3.4.0.1_T300'
DEFAULT_JENKINS_SERVER = 'http://172.16.200.63:8080/'
JENKINS_ROOT_VIEW = DEFAULT_JENKINS_SERVER + 'view/ALL/'
INFO = 'api/json'
JOB_INFO = 'job/%(name)s/api/json?depth=0'
SCRIPT_NAME = os.path.basename(__file__)
SCRIPT_OBJECTIVE = '''
Purpose of the script: Find view associated with a job
'''
def auth_headers(username, password):
'''
Simple implementation of HTTP Basic Authentication. Returns the 'Authentication' header value. Required for authenticated site access
'''
return 'Basic ' + base64.encodestring('%s:%s' % (username, password))[:-1]
def login(username, password): # Prompt for password
if (username is not None and username != '') and (password is None or password == ''):
return (username, getpass.getpass("\nEnter your ({0}) jenkins password: ".format(username)))
else:
return (username, password)
class JenkinsException(Exception): pass
class Jenkins(object):
def __init__(self, url, username=None, password=None):
self.server = self.fix_server_url(url)
if username is not None and password is not None:
self.auth = auth_headers(username, password)
else:
self.auth = None
def fix_server_url(self, urls): # urls may be comma separated server, view, job, or build urls
url = urls.split(',')[0]
if url[-1] != '/':
url = url + '/'
return url
def fix_urls(self, urls): # urls may be comma separated server, view, job, or build urls
url_str = ''
for url in urls.split(','):
if url[-1] != '/':
url_str += url + '/,'
else:
url_str += url + ','
return url_str.rstrip(',')
def jenkins_open(self, req): # req = HTTP request using urllib2
try:
if self.auth:
req.add_header('Authorization', self.auth)
return urllib2.urlopen(req).read()
except urllib2.HTTPError, e:
# Jenkins's funky authentication means its highly impossible to distinguish errors.
if e.code in [401, 403, 500]:
raise JenkinsException('Error in request. Possibly authentication failed [%s]'%(e.code))
def get_info(self, url = None):
if url is None:
url = self.server
try:
return json.loads(self.jenkins_open(urllib2.Request(url + INFO)))
except urllib2.HTTPError:
raise JenkinsException("Error communicating with server[%s]"%url)
except httplib.BadStatusLine:
raise JenkinsException("Error communicating with server[%s]"%url)
except ValueError:
raise JenkinsException("Could not parse JSON info for server[%s]"%url)
except:
raise JenkinsException("Possibly wrong url[%s]"%url)
@staticmethod
def valid_view(info_str): # checks if the given url is a view url and has associated jobs
if 'description' in info_str and 'jobs' in info_str and (len(info_str['jobs']) > 0):
return True
else:
return False
@staticmethod
def valid_job(info_str): # checks if the given url is a job url
if 'builds' in info_str and (len(info_str['builds']) > 0):
return True
else:
return False
def get_views_list(self, jenkins_job_url):
jenkins_info = self.get_info(jenkins_job_url)
urls_list = [DEFAULT_JENKINS_SERVER]
job_views_list = []
if self.valid_job(jenkins_info):
#check nested urls
while len(urls_list) > 0:
urls_new = []
for url in urls_list:
jenkins_info = self.get_info(url)
if ('views' in jenkins_info):
for view in jenkins_info['views']:
#views_list.append(view['url'])
urls_new.append(view['url'])
urls_list = urls_new
else:
if not '/view/ALL/' in url: # Do not consider the root view 'ALL' because it consists of all the jobs
job_views_list.append(url)
if len(urls_new) == 0:
urls_list = []
return job_views_list
def get_jobs_list_in_view(self, view_name):
jobs_list = []
jenkins_jobs = self.get_info(view_name)
if 'jobs' in jenkins_jobs:
jobs_list = jenkins_jobs['jobs']
return jobs_list
def find_view(self, job_name, job_url):
views_list = self.get_views_list(job_url)
job_view = ''
for view in views_list:
jobs_list = self.get_jobs_list_in_view(view)
if job_name in str(jobs_list):
job_view = view
break
return job_view
@app.route("/")
def any_function():
global JENKINS_ROOT_VIEW, jenkinsjobname
joburl = JENKINS_ROOT_VIEW + 'job/' + jenkinsjobname + '/'
jenkins = Jenkins(joburl, None, None)
#print '\nSearching the view for the job: ' + jenkinsjobname + '\nIt may take few minutes, please wait ... ... ...'
str = jenkins.find_view(jenkinsjobname, joburl)
return str
app.run(debug=True, port = 5001)
python script to find view for a given job
#!/usr/bin/env python
# coding: utf-8
import sys
import urllib2
import urllib
import base64
import traceback
import json
import httplib
import argparse
import logging
import getpass
import os
import re
DEFAULT_JENKINS_SERVER = 'http://172.16.200.63:8080/'
JENKINS_ROOT_VIEW = DEFAULT_JENKINS_SERVER + 'view/ALL/'
INFO = 'api/json'
JOB_INFO = 'job/%(name)s/api/json?depth=0'
SCRIPT_NAME = os.path.basename(__file__)
SCRIPT_OBJECTIVE = '''
Purpose of the script: Find view associated with a job
'''
class CommandLine:
"""Handles parsing the commandline parameters"""
def __init__(self):
self.parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description=SCRIPT_OBJECTIVE,
add_help=False,
epilog='''
EXAMPLE USAGE:
%(executable)s
accepts jenkins view urls separated by comma\n
%(executable)s %(view_urls)s
''' % {'executable': SCRIPT_NAME, 'view_urls': 'http://localhost:8080/view/view1,http://localhost:8080/view/view2'})
class HelpAction(argparse._HelpAction):
def __call__(self, parser, namespace, values, option_string=None):
print(parser.format_help()
.replace('usage:', 'USAGE:')
.replace('positional arguments:', 'POSITIONAL ARGUMENTS:')
.replace('optional arguments:', 'OPTIONAL ARGUMENTS:'))
parser.exit()
self.parser.add_argument('jenkinsjobname', help = 'Jenkins jobname')
self.parser.add_argument('-jenkinsurl', help = 'Jenkins buil/view/server url for status check')
self.parser.add_argument('-d', '--debug', action='store_const', const=True, help="print debugging information\n\n")
self.parser.add_argument('-h', '--help', action=HelpAction, help='show this help message and exit\n\n')
def parse_args(self):
# Message display when there is no argument on command line
if len(sys.argv) < 2:
print "Please run the script with argument -h for help"
exit(0)
args = self.parser.parse_args()
if args.debug:
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
return args
def help_blurb(self):
return 'Try \'%s --help\' for more information.' % os.path.basename(sys.argv[0])
args = CommandLine().parse_args()
#sys.exit(0)
def auth_headers(username, password):
'''
Simple implementation of HTTP Basic Authentication. Returns the 'Authentication' header value. Required for authenticated site access
'''
return 'Basic ' + base64.encodestring('%s:%s' % (username, password))[:-1]
def login(username, password): # Prompt for password
if (username is not None and username != '') and (password is None or password == ''):
return (username, getpass.getpass("\nEnter your ({0}) jenkins password: ".format(username)))
else:
return (username, password)
class JenkinsException(Exception): pass
class Jenkins(object):
def __init__(self, url, username=None, password=None):
self.server = self.fix_server_url(url)
if username is not None and password is not None:
self.auth = auth_headers(username, password)
else:
self.auth = None
def fix_server_url(self, urls): # urls may be comma separated server, view, job, or build urls
url = urls.split(',')[0]
if url[-1] != '/':
url = url + '/'
return url
def fix_urls(self, urls): # urls may be comma separated server, view, job, or build urls
url_str = ''
for url in urls.split(','):
if url[-1] != '/':
url_str += url + '/,'
else:
url_str += url + ','
return url_str.rstrip(',')
def jenkins_open(self, req): # req = HTTP request using urllib2
try:
if self.auth:
req.add_header('Authorization', self.auth)
return urllib2.urlopen(req).read()
except urllib2.HTTPError, e:
# Jenkins's funky authentication means its highly impossible to distinguish errors.
if e.code in [401, 403, 500]:
raise JenkinsException('Error in request. Possibly authentication failed [%s]'%(e.code))
def get_info(self, url = None):
if url is None:
url = self.server
try:
return json.loads(self.jenkins_open(urllib2.Request(url + INFO)))
except urllib2.HTTPError:
raise JenkinsException("Error communicating with server[%s]"%url)
except httplib.BadStatusLine:
raise JenkinsException("Error communicating with server[%s]"%url)
except ValueError:
raise JenkinsException("Could not parse JSON info for server[%s]"%url)
except:
raise JenkinsException("Possibly wrong url[%s]"%url)
@staticmethod
def valid_view(info_str): # checks if the given url is a view url and has associated jobs
if 'description' in info_str and 'jobs' in info_str and (len(info_str['jobs']) > 0):
return True
else:
return False
@staticmethod
def valid_job(info_str): # checks if the given url is a job url
if 'builds' in info_str and (len(info_str['builds']) > 0):
return True
else:
return False
def get_views_list(self, jenkins_job_url):
jenkins_info = self.get_info(jenkins_job_url)
urls_list = [DEFAULT_JENKINS_SERVER]
job_views_list = []
if self.valid_job(jenkins_info):
#check nested urls
while len(urls_list) > 0:
urls_new = []
for url in urls_list:
jenkins_info = self.get_info(url)
if ('views' in jenkins_info):
for view in jenkins_info['views']:
#views_list.append(view['url'])
urls_new.append(view['url'])
urls_list = urls_new
else:
if not '/view/ALL/' in url: # Do not consider the root view 'ALL' because it consists of all the jobs
job_views_list.append(url)
if len(urls_new) == 0:
urls_list = []
return job_views_list
def get_jobs_list_in_view(self, view_name):
jobs_list = []
jenkins_jobs = self.get_info(view_name)
if 'jobs' in jenkins_jobs:
jobs_list = jenkins_jobs['jobs']
return jobs_list
def find_view(self, job_name, job_url):
views_list = self.get_views_list(job_url)
job_view = ''
for view in views_list:
jobs_list = self.get_jobs_list_in_view(view)
if job_name in str(jobs_list):
job_view = view
break
return job_view
def main():
#username, password = login(args.username, args.password)
joburl = JENKINS_ROOT_VIEW + 'job/' + args.jenkinsjobname + '/'
jenkins = Jenkins(joburl, None, None)
print '\nSearching the view for the job: ' + args.jenkinsjobname + '\nIt may take few minutes, please wait ... ... ...'
print jenkins.find_view(args.jenkinsjobname, joburl)
print '\nDone\n'
if __name__ == '__main__':
main()
using global variable in a function
http://stackoverflow.com/questions/19182963/global-variable-and-python-flask
str = 'Hello'
def f1():
global str
print str
=> 'Hello'
str = 'Hello'
def f1():
global str
print str
=> 'Hello'
Friday, April 15, 2016
thread in python
http://www.tutorialspoint.com/python/python_multithreading.htm
http://opensourceforu.com/2011/01/python-threading-and-its-caveats/
Simple thread example:
#!/usr/bin/env python
from threading import Thread
class mythread(Thread):
def __init__(self):
Thread.__init__(self) # Must initialize the thread
def run(self): # Must override the run method of Thread Class
test_function()
def test_function():
print 'NOT OK'
th1 = mythread()
th1.start()
th1.join()
print 'Done'
http://opensourceforu.com/2011/01/python-threading-and-its-caveats/
Simple thread example:
#!/usr/bin/env python
from threading import Thread
class mythread(Thread):
def __init__(self):
Thread.__init__(self) # Must initialize the thread
def run(self): # Must override the run method of Thread Class
test_function()
def test_function():
print 'NOT OK'
th1 = mythread()
th1.start()
th1.join()
print 'Done'
Sunday, April 10, 2016
bpm beats frequency detection in audio wav file: pyaudio, wave
https://linuxaudiostudio.wordpress.com/2012/07/06/some-python-code-for-calculating-bpm-and-delay-times/
http://stackoverflow.com/questions/12344951/detect-beat-and-play-wav-file-in-a-syncronised-manner
https://www.google.com/webhp?sourceid=chrome-instant&ion=1&espv=2&ie=UTF-8#q=python%20audio%20beats%20bpm%20detection
https://wiki.python.org/moin/PythonInMusic
# Beats detection
#!/usr/bin/env python
import wave
from string import capitalize, capwords
from optparse import OptionParser
def ms_to_bpm(length, measures = 1, beat = 1):
"""Calculates beat per minute of a soundfile given the lenght in
milliseconds, the number of measures and the beat, where 4/4 is 1.
Returns bpm
"""
bpm = round(60000/((float(length/beat)/measures)/4),2)
return bpm
def wavduration(wavfile):
"""Returns the duration of a wavfile in milliseconds"""
myfile = wave.open(wavfile, "r")
frames = (1.0 * myfile.getnframes ())
sr = myfile.getframerate ()
time = (1.0 *(frames/sr))
return int(round(time * 1000))
def delay_times(bpm = 120):
"""Returns delay times for the specified bpm, in milliseconds"""
result = []
durations = [
(1,"whole"),
((3.0/4),"dotted half"),
((1.0/2),"half"),
((3.0/8),"dotted quarter"),
((1.0/4),"quarter"),
((3.0/16),"dotted eight"),
(round(((1.0/2)/3),5),"quarter triplet"),
((1.0/8),"eight"),
((3.0/32),"dotted sixteenth"),
(round(((1.0/4)/3),5),"eight triplet"),
((1.0/16),"sixteenth"),
((3.0/64),"dotted thirty second"),
(round(((1.0/8)/3),5),"sixteenth triplet"),
((1.0/32),"thirty second")
]
for duration, description in durations:
title = capwords(description)
delay = (duration * 4000) / (bpm/60.0)
frequency = 1000 / delay
result.append({"title": title, "delay": delay,"frequency": frequency})
return result
def delay_times_format(bpm = 120):
a = delay_times(bpm)
a.sort()
a.reverse()
print "\n",bpm,"beats per minute (bpm):"
print
print "Note Delay time LFO freq"
print 75 * "-"
for line in a:
title = line["title"].ljust(30," ")
delay = round(line["delay"],3)
frequency = round(line["frequency"],2)
print title,delay.__str__().rjust(8), "ms ",20*" ", frequency.__str__().rjust(5), "Hz"
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-f", "--file", dest="filename", default="none",
help="wave FILE to load", metavar="FILE")
parser.add_option("-b", "--bpm",
dest="bpm", default="analize",
help="beats per minute")
parser.add_option("-B", "--bars", dest ="bars", help="number of bars in the wav file")
parser.add_option("-m", "--meter", dest ="meter", help="as in 3/4 or 12/8, default 4/4")
(options, args) = parser.parse_args()
# print options, args
if options.meter:
a = options.meter.split("/")
meter = (float(a[0])/float(a[1]))
print "\nMeter is", options.meter
else:
meter = 1
print "\nMeter is 4/4"
if options.bars:
bars = int(options.bars)
else:
bars = 1
wavfile = options.filename
if wavfile == "none":
print "No wavfiles to analize, defaulting to bpm provided, or 120"
if options.bpm == "analize":
bpm = 120
else:
bpm = float(options.bpm)
else:
if options.bpm != "analize":
bpm = float(options.bpm)
delay_times_format(bpm)
else:
bpm = 120
print 75*"-","\n"
wavduration = wavduration(wavfile)
print wavfile,"is",wavduration, "milliseconds"
bpm = ms_to_bpm(wavduration,bars,meter)
# bpm = ms_to_bpm(3850,2,1)
print "Bpm of",wavfile, "is", bpm
delay_times_format(bpm)
# Frequency detection:
http://stackoverflow.com/questions/12344951/detect-beat-and-play-wav-file-in-a-syncronised-manner
https://www.google.com/webhp?sourceid=chrome-instant&ion=1&espv=2&ie=UTF-8#q=python%20audio%20beats%20bpm%20detection
https://wiki.python.org/moin/PythonInMusic
# Beats detection
#!/usr/bin/env python
import wave
from string import capitalize, capwords
from optparse import OptionParser
def ms_to_bpm(length, measures = 1, beat = 1):
"""Calculates beat per minute of a soundfile given the lenght in
milliseconds, the number of measures and the beat, where 4/4 is 1.
Returns bpm
"""
bpm = round(60000/((float(length/beat)/measures)/4),2)
return bpm
def wavduration(wavfile):
"""Returns the duration of a wavfile in milliseconds"""
myfile = wave.open(wavfile, "r")
frames = (1.0 * myfile.getnframes ())
sr = myfile.getframerate ()
time = (1.0 *(frames/sr))
return int(round(time * 1000))
def delay_times(bpm = 120):
"""Returns delay times for the specified bpm, in milliseconds"""
result = []
durations = [
(1,"whole"),
((3.0/4),"dotted half"),
((1.0/2),"half"),
((3.0/8),"dotted quarter"),
((1.0/4),"quarter"),
((3.0/16),"dotted eight"),
(round(((1.0/2)/3),5),"quarter triplet"),
((1.0/8),"eight"),
((3.0/32),"dotted sixteenth"),
(round(((1.0/4)/3),5),"eight triplet"),
((1.0/16),"sixteenth"),
((3.0/64),"dotted thirty second"),
(round(((1.0/8)/3),5),"sixteenth triplet"),
((1.0/32),"thirty second")
]
for duration, description in durations:
title = capwords(description)
delay = (duration * 4000) / (bpm/60.0)
frequency = 1000 / delay
result.append({"title": title, "delay": delay,"frequency": frequency})
return result
def delay_times_format(bpm = 120):
a = delay_times(bpm)
a.sort()
a.reverse()
print "\n",bpm,"beats per minute (bpm):"
print "Note Delay time LFO freq"
print 75 * "-"
for line in a:
title = line["title"].ljust(30," ")
delay = round(line["delay"],3)
frequency = round(line["frequency"],2)
print title,delay.__str__().rjust(8), "ms ",20*" ", frequency.__str__().rjust(5), "Hz"
if __name__ == "__main__":
parser = OptionParser()
parser.add_option("-f", "--file", dest="filename", default="none",
help="wave FILE to load", metavar="FILE")
parser.add_option("-b", "--bpm",
dest="bpm", default="analize",
help="beats per minute")
parser.add_option("-B", "--bars", dest ="bars", help="number of bars in the wav file")
parser.add_option("-m", "--meter", dest ="meter", help="as in 3/4 or 12/8, default 4/4")
(options, args) = parser.parse_args()
# print options, args
if options.meter:
a = options.meter.split("/")
meter = (float(a[0])/float(a[1]))
print "\nMeter is", options.meter
else:
meter = 1
print "\nMeter is 4/4"
if options.bars:
bars = int(options.bars)
else:
bars = 1
wavfile = options.filename
if wavfile == "none":
print "No wavfiles to analize, defaulting to bpm provided, or 120"
if options.bpm == "analize":
bpm = 120
else:
bpm = float(options.bpm)
else:
if options.bpm != "analize":
bpm = float(options.bpm)
delay_times_format(bpm)
else:
bpm = 120
print 75*"-","\n"
wavduration = wavduration(wavfile)
print wavfile,"is",wavduration, "milliseconds"
bpm = ms_to_bpm(wavduration,bars,meter)
# bpm = ms_to_bpm(3850,2,1)
print "Bpm of",wavfile, "is", bpm
delay_times_format(bpm)
# Frequency detection:
# Read in a WAV and find the freq's
import pyaudio
import wave
import numpy as np
chunk = 2048
# open up a wave
wf = wave.open('test-tones/440hz.wav', 'rb')
swidth = wf.getsampwidth()
RATE = wf.getframerate()
# use a Blackman window
window = np.blackman(chunk)
# open stream
p = pyaudio.PyAudio()
stream = p.open(format =
p.get_format_from_width(wf.getsampwidth()),
channels = wf.getnchannels(),
rate = RATE,
output = True)
# read some data
data = wf.readframes(chunk)
# play stream and find the frequency of each chunk
while len(data) == chunk*swidth:
# write data out to the audio stream
stream.write(data)
# unpack the data and times by the hamming window
indata = np.array(wave.struct.unpack("%dh"%(len(data)/swidth),\
data))*window
# Take the fft and square each value
fftData=abs(np.fft.rfft(indata))**2
# find the maximum
which = fftData[1:].argmax() + 1
# use quadratic interpolation around the max
if which != len(fftData)-1:
y0,y1,y2 = np.log(fftData[which-1:which+2:])
x1 = (y2 - y0) * .5 / (2 * y1 - y2 - y0)
# find the frequency and output it
thefreq = (which+x1)*RATE/chunk
print "The freq is %f Hz." % (thefreq)
else:
thefreq = which*RATE/chunk
print "The freq is %f Hz." % (thefreq)
# read some more data
data = wf.readframes(chunk)
if data:
stream.write(data)
stream.close()
p.terminate()
Friday, April 8, 2016
jinja templating language with python flash
https://realpython.com/blog/python/primer-on-jinja-templating/
http://jinja.pocoo.org/docs/dev/templates/
# jinja2 delimiters: {% %} - for expression: {{ }} - for outputting result
from jinja2 import Template
temp_str = "hello: {% for n in my_list %}{{n}} " "{% endfor %}"
t = Template(temp_str)
http://jinja.pocoo.org/docs/dev/templates/
# jinja2 delimiters: {% %} - for expression: {{ }} - for outputting result
from jinja2 import Template
temp_str = "hello: {% for n in my_list %}{{n}} " "{% endfor %}"
t = Template(temp_str)
t.render(mylist = [3,6,9])
# Note:
1. In direct jinja2, the python variable is only passed through render method. Separate variable can be used in the jinja expression
2. In Flask, the flask method 'render_template' pass the variables to the jinja render
Thursday, April 7, 2016
login to remote machines : pxssh
http://www.pythonforbeginners.com/code-snippets-source-code/ssh-connection-with-python
http://stackoverflow.com/questions/21055943/pxssh-connecting-to-an-ssh-proxy-timeout-exceeded-in-read-nonblocking
# Note: may get error as: TIMEOUT:
The issue is an out of date / absent snippet of code in /usr/lib/python.2.x/dist-packages/pxssh.py. (it may also be in usr/lib/python.2.x/site-packages/pxssh.py).
"
Navigate to this file and add the following lines of code:
self.sendline() #Line 134
time.sleep(0.5) #Line 135
This should be inserted just above the line
self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
"
# Code
#!/usr/bin/env python
import pxssh
import getpass
try:
s = pxssh.pxssh()
hostname = raw_input('hostname: ')
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login (hostname, username, password)
s.sendline ('uptime') # run a command
s.prompt() # match the prompt
print s.before # print everything before the prompt.
s.sendline ('ls -l')
s.prompt()
print s.before
s.sendline ('df')
s.prompt()
print s.before
s.logout()
except pxssh.ExceptionPxssh, e:
print "pxssh failed on login."
print str(e)
http://stackoverflow.com/questions/21055943/pxssh-connecting-to-an-ssh-proxy-timeout-exceeded-in-read-nonblocking
# Note: may get error as: TIMEOUT:
The issue is an out of date / absent snippet of code in /usr/lib/python.2.x/dist-packages/pxssh.py. (it may also be in usr/lib/python.2.x/site-packages/pxssh.py).
"
Navigate to this file and add the following lines of code:
self.sendline() #Line 134
time.sleep(0.5) #Line 135
This should be inserted just above the line
self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
"
# Code
#!/usr/bin/env python
import pxssh
import getpass
try:
s = pxssh.pxssh()
hostname = raw_input('hostname: ')
username = raw_input('username: ')
password = getpass.getpass('password: ')
s.login (hostname, username, password)
s.sendline ('uptime') # run a command
s.prompt() # match the prompt
print s.before # print everything before the prompt.
s.sendline ('ls -l')
s.prompt()
print s.before
s.sendline ('df')
s.prompt()
print s.before
s.logout()
except pxssh.ExceptionPxssh, e:
print "pxssh failed on login."
print str(e)
set local time on ubuntu
http://manpages.ubuntu.com/manpages/saucy/man3/DateTime::TimeZone::Catalog.3pm.html
http://stackoverflow.com/questions/5971635/setting-reading-up-environment-variables-in-python
import os, time
time.strftime('%X %x %Z')
os.environ['TZ'] = 'America/Los_Angeles'
time.tzset()
time.strftime('%X %x %Z')
# Note: Install and check ntp:
apt-get install ntp
Type ntpdc -c peers into terminal. Or ntpdc -p should do it as well.
http://stackoverflow.com/questions/5971635/setting-reading-up-environment-variables-in-python
import os, time
time.strftime('%X %x %Z')
os.environ['TZ'] = 'America/Los_Angeles'
time.tzset()
time.strftime('%X %x %Z')
# Note: Install and check ntp:
apt-get install ntp
Type ntpdc -c peers into terminal. Or ntpdc -p should do it as well.
Friday, April 1, 2016
pymongo : python working with mongodb
https://api.mongodb.org/python/current/tutorial.html
https://gist.github.com/vgoklani/1556251
https://api.mongodb.org/python/current/api/pymongo/database.html
https://docs.mongodb.org/getting-started/python/query/
https://docs.mongodb.org/manual/reference/command/delete/
https://docs.mongodb.org/manual/tutorial/remove-documents/
https://docs.mongodb.org/manual/tutorial/create-an-auto-incrementing-field/
# Mongodb backup/restore: using mongoexport and mongoimport
http://www.mkyong.com/mongodb/mongodb-import-and-export-example/
http://www.tothenew.com/blog/export-very-large-data-from-a-mongodb-collection-using-script/
http://stackoverflow.com/questions/11255630/how-to-export-all-collection-in-mongodb
https://docs.mongodb.com/v3.0/reference/program/mongorestore/
http://www.tutorialspoint.com/mongodb/mongodb_create_database.htm
# Note: Jinja {% if x != "test" and x != "local" %} : jinja code in html comment is not allowed:
Do not put html comment for jinja code such as <!-- {{local_dat}} -->
# export: mongoexport --db test --collection collections --out db_bak.1
# Also: mongodump/mongorestore
mongodump -d <our database name> -o <directory_backup>
And to "restore/import" that, i used (from directory_backup/dump/):
mongorestore <our database name>
# backup/restore complete complete server instance
just use: mongodump to backup & mongorestore to restore the complete instance. No need to stop the mongo service: The backup command creates a dump folder in the directory where mongodump is executed. While restore, the mongorestore looks for dump folder in the directory where the mongorestore command is executed.
# To create a database in mongodb , at least one document should be created.
#!/bin/bash
backupDir=/data/db-backup/
dbName='commit_db'
collectionName='commits'
numDays=7
curDate=$(date +'%Y-%m-%d')
curLog="${backupDir}/${curDate}.bak"
echo "Removing backups older than ${numDays} days"
find ${backupDir} -type f -name "*.bak" -delete
echo "Exporting data into ${curLog}"
/usr/bin/mongoexport --verbose --db ${dbName} --collection ${collectionName} --out ${curLog}
# default used db is: test or last active database if removed or used
import pymongo
connection = pymongo.MongoClient() # Create connection to database
db = connect.<db_name> # same as mongo console : 'use'
collection = db.test_collection
document = collection.test_document
# Other usage of the pymongo methods
# default connection to localhost:27017 : MongoClient()
# MongoClient('localhost', 27017)
# MongoClient('mongodb://localhost:27017/')
# db = connection['hello_boy']
# db = connect.hello_boy
# collection = db.test_collection
# collection = db['test-collection']
# Note: Console commands
1. default database variable on mongo console: 'db'. This is initialized by: 'use'
2. drop database: db.dropDatabase()
3. db # displays current database
4. show dbs
5. show collections
# Write Data:
use test
collection = db.collection
db.collection.insert( { item: "card", qty: 15 } )
# Read Data
use test
db.collection.find()
# Python methods:
connection.drop_database('hellodb') : connection.drop_database(<databse_name>)
connection['mydatabase'].drop_collection('mycollection')
dababase_name = db.name : name from the db object
c.database_names() # list all databases :: "show dbs"
# drop collection :: "db.collection.drop()"
db.collection_names() # list all collections in the selected database
db.inventory.remove( { type : "food" } ) # To delete data
db.inventory.remove({}) # Removes all data from the collection
# Python equivalent of remove: delete_many(search data)
db.inventory.update(
{ item: "ABC1" },
{ $set: { "details.model": "14Q2" } }
)
# Python read/write data
#!/usr/bin/env python
import shlex
import subprocess
import os
from collections import OrderedDict
import pymongo
# command run
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
return -1
# multiple commands in a process
def runcmds(cmds_list):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
for cmd in cmds_list:
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
def db_init():
conn = pymongo.MongoClient('localhost', 27017)
dbh = conn.test
collection = dbh.collections
return collection
def db_insert(collection, dataItem):
#print json.dumps(dataItem, indent=4, sort_keys=True)
print 'OK...'
try:
commit_id = collection.insert_one(dataItem).inserted_id
print "Inserted: record with id:{0}".format(dataItem['_id']) # _id is added to the data item by insert_one
except pymongo.errors.DuplicateKeyError:
print "Error: record with id:{0} already exists".format(dataItem['_id'])
# insert all values as string and process the number at read time
def set_var(field_names, values, num=False):
fields = field_names.strip().replace(' ', '').split(',')
values = values.strip().replace(' ', '').split(',')
if num:
values = map(int, values)
data = zip(fields, values)
#print data
#print dict(data)
collection = db_init()
db_insert(collection, dict(data))
def get_var(conditions_list=None):
field_names = []
values = []
object_ids = []
collection = db_init()
for i in collection.find():
field_group = []
values_group = []
for key, value in i.iteritems():
if key != '_id':
field_group.append(str(key))
values_group.append(str(value))
else:
object_ids.append(str(value))
field_names.append(field_group)
values.append(values_group)
print field_names
print values
print object_ids
def list_vars():
pass
def clean_vars():
pass
if __name__ == '__main__':
#l = ['mongo', 'show dbs']
#print runcmds(l)
#set_var('name, age, size', 'As, 12, 13')
get_var()
https://gist.github.com/vgoklani/1556251
https://api.mongodb.org/python/current/api/pymongo/database.html
https://docs.mongodb.org/getting-started/python/query/
https://docs.mongodb.org/manual/reference/command/delete/
https://docs.mongodb.org/manual/tutorial/remove-documents/
https://docs.mongodb.org/manual/tutorial/create-an-auto-incrementing-field/
# Mongodb backup/restore: using mongoexport and mongoimport
http://www.mkyong.com/mongodb/mongodb-import-and-export-example/
http://www.tothenew.com/blog/export-very-large-data-from-a-mongodb-collection-using-script/
http://stackoverflow.com/questions/11255630/how-to-export-all-collection-in-mongodb
https://docs.mongodb.com/v3.0/reference/program/mongorestore/
http://www.tutorialspoint.com/mongodb/mongodb_create_database.htm
# Note: Jinja {% if x != "test" and x != "local" %} : jinja code in html comment is not allowed:
Do not put html comment for jinja code such as <!-- {{local_dat}} -->
# export: mongoexport --db test --collection collections --out db_bak.1
# import: mongoimport --db test --collection collections --file db_bak.1
# Also: mongodump/mongorestore
mongodump -d <our database name> -o <directory_backup>
And to "restore/import" that, i used (from directory_backup/dump/):
mongorestore <our database name>
# backup/restore complete complete server instance
just use: mongodump to backup & mongorestore to restore the complete instance. No need to stop the mongo service: The backup command creates a dump folder in the directory where mongodump is executed. While restore, the mongorestore looks for dump folder in the directory where the mongorestore command is executed.
# To create a database in mongodb , at least one document should be created.
#!/bin/bash
backupDir=/data/db-backup/
dbName='commit_db'
collectionName='commits'
numDays=7
curDate=$(date +'%Y-%m-%d')
curLog="${backupDir}/${curDate}.bak"
echo "Removing backups older than ${numDays} days"
find ${backupDir} -type f -name "*.bak" -delete
echo "Exporting data into ${curLog}"
/usr/bin/mongoexport --verbose --db ${dbName} --collection ${collectionName} --out ${curLog}
# default used db is: test or last active database if removed or used
import pymongo
connection = pymongo.MongoClient() # Create connection to database
db = connect.<db_name> # same as mongo console : 'use'
MongoClient
(host='localhost', port=27017, document_class=dict, tz_aware=False, connect=True, **kwargs)
collection = db.test_collection
document = collection.test_document
# Other usage of the pymongo methods
# default connection to localhost:27017 : MongoClient()
# MongoClient('localhost', 27017)
# MongoClient('mongodb://localhost:27017/')
# db = connection['hello_boy']
# db = connect.hello_boy
# collection = db.test_collection
# collection = db['test-collection']
# Note: Console commands
1. default database variable on mongo console: 'db'. This is initialized by: 'use'
2. drop database: db.dropDatabase()
3. db # displays current database
4. show dbs
5. show collections
# Write Data:
use test
collection = db.collection
db.collection.insert( { item: "card", qty: 15 } )
# Read Data
use test
db.collection.find()
# Python methods:
connection.drop_database('hellodb') : connection.drop_database(<databse_name>)
connection['mydatabase'].drop_collection('mycollection')
dababase_name = db.name : name from the db object
c.database_names() # list all databases :: "show dbs"
# drop collection :: "db.collection.drop()"
db.collection_names() # list all collections in the selected database
db.inventory.remove( { type : "food" } ) # To delete data
db.inventory.remove({}) # Removes all data from the collection
# Python equivalent of remove: delete_many(search data)
{ item: "ABC1" },
{ $set: { "details.model": "14Q2" } }
)
# Python read/write data
#!/usr/bin/env python
import shlex
import subprocess
import os
from collections import OrderedDict
import pymongo
# command run
def runcmd(cmd):
cmd = shlex.split(cmd)
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = proc.communicate()
return out + error
except:
print 'Invalid Command: ' + str(cmd)
return -1
# multiple commands in a process
def runcmds(cmds_list):
bash_path = runcmd('which bash').strip() # strip() used to trim line feed
p = os.popen('%s' % bash_path, 'w')
for cmd in cmds_list:
p.write('%s\n' % cmd)
status = p.close()
if status is None:
print 'Commands executed OK'
else:
print 'One or more command failed to run'
return status
def db_init():
conn = pymongo.MongoClient('localhost', 27017)
dbh = conn.test
collection = dbh.collections
return collection
def db_insert(collection, dataItem):
#print json.dumps(dataItem, indent=4, sort_keys=True)
print 'OK...'
try:
commit_id = collection.insert_one(dataItem).inserted_id
print "Inserted: record with id:{0}".format(dataItem['_id']) # _id is added to the data item by insert_one
except pymongo.errors.DuplicateKeyError:
print "Error: record with id:{0} already exists".format(dataItem['_id'])
# insert all values as string and process the number at read time
def set_var(field_names, values, num=False):
fields = field_names.strip().replace(' ', '').split(',')
values = values.strip().replace(' ', '').split(',')
if num:
values = map(int, values)
data = zip(fields, values)
#print data
#print dict(data)
collection = db_init()
db_insert(collection, dict(data))
def get_var(conditions_list=None):
field_names = []
values = []
object_ids = []
collection = db_init()
for i in collection.find():
field_group = []
values_group = []
for key, value in i.iteritems():
if key != '_id':
field_group.append(str(key))
values_group.append(str(value))
else:
object_ids.append(str(value))
field_names.append(field_group)
values.append(values_group)
print field_names
print values
print object_ids
def list_vars():
pass
def clean_vars():
pass
if __name__ == '__main__':
#l = ['mongo', 'show dbs']
#print runcmds(l)
#set_var('name, age, size', 'As, 12, 13')
get_var()
Subscribe to:
Posts (Atom)