From 34213344b1867b09f4611a842614bda61c479fa0 Mon Sep 17 00:00:00 2001 From: Giacomo Govi Date: Wed, 25 Nov 2015 17:21:04 +0100 Subject: [PATCH] Released script for uploading conditions with the new ConditionsUpload service. Include the automatic test for all the synchronization types. --- .../Utilities/scripts/conditionUploadTest.py | 269 ++++++ .../Utilities/scripts/uploadConditions.py | 781 ++++++++++++++++++ 2 files changed, 1050 insertions(+) create mode 100644 CondCore/Utilities/scripts/conditionUploadTest.py create mode 100755 CondCore/Utilities/scripts/uploadConditions.py diff --git a/CondCore/Utilities/scripts/conditionUploadTest.py b/CondCore/Utilities/scripts/conditionUploadTest.py new file mode 100644 index 0000000000000..36e51e44e247e --- /dev/null +++ b/CondCore/Utilities/scripts/conditionUploadTest.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python + +import cx_Oracle +import subprocess +import json +import os +import shutil +import datetime + +# Requirement 1: a conddb key for the authentication with valid permission on writing on prep CMS_CONDITIONS account +# this could be dropped introducing a specific entry in the .netrc +# Requirement 2: an entry "Dropbox" in the .netrc for the authentication + +class DB: + def __init__(self, serviceName, schemaName ): + self.serviceName = serviceName + self.schemaName = schemaName + self.connStr = None + + def connect( self ): + command = "cmscond_authentication_manager -s %s --list_conn | grep '%s@%s'" %(self.serviceName,self.schemaName,self.serviceName) + pipe = subprocess.Popen( command, shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + out = pipe.communicate()[0] + srvconn = '%s@%s' %(self.schemaName,self.serviceName) + rowpwd = out.split(srvconn)[1].split(self.schemaName)[1] + pwd = '' + for c in rowpwd: + if c != ' ' and c != '\n': + pwd += c + self.connStr = '%s/%s@%s' %(self.schemaName,pwd,self.serviceName) + + def setSynchronizationType( self, tag, synchType ): + db = cx_Oracle.connect(self.connStr) + cursor = db.cursor() + db.begin() + cursor.execute('UPDATE TAG SET SYNCHRONIZATION =:SYNCH WHERE NAME =:NAME',(synchType,tag,)) + db.commit() + + def getLastInsertedSince( self, tag, snapshot ): + db = cx_Oracle.connect(self.connStr) + cursor = db.cursor() + cursor.execute('SELECT SINCE, INSERTION_TIME FROM IOV WHERE TAG_NAME =:TAG_NAME AND INSERTION_TIME >:TIME ORDER BY INSERTION_TIME DESC',(tag,snapshot)) + row = cursor.fetchone() + return row + + def removeTag( self, tag ): + db = cx_Oracle.connect(self.connStr) + cursor = db.cursor() + db.begin() + cursor.execute('DELETE FROM IOV WHERE TAG_NAME =:TAG_NAME',(tag,)) + cursor.execute('DELETE FROM TAG WHERE NAME=:NAME',(tag,)) + db.commit() + +def makeBaseFile( inputTag, startingSince ): + cwd = os.getcwd() + baseFile = '%s_%s.db' %(inputTag,startingSince) + baseFilePath = os.path.join(cwd,baseFile) + if os.path.exists( baseFile ): + os.remove( baseFile ) + command = "conddb_import -c sqlite_file:%s -f oracle://cms_orcon_adg/CMS_CONDITIONS -i %s -t %s -b %s" %(baseFile,inputTag,inputTag,startingSince) + pipe = subprocess.Popen( command, shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + out = pipe.communicate()[0] + if not os.path.exists( baseFile ): + msg = 'ERROR: base file has not been created: %s' %out + raise Exception( msg ) + return baseFile + + +def makeMetadataFile( inputTag, destTag, since, description ): + cwd = os.getcwd() + metadataFile = os.path.join(cwd,'%s.txt') %destTag + if os.path.exists( metadataFile ): + os.remove( metadataFile ) + metadata = {} + metadata[ "destinationDatabase" ] = "oracle://cms_orcoff_prep/CMS_CONDITIONS" + tagList = {} + tagList[ destTag ] = { "dependencies": {}, "synchronizeTo": "any" } + metadata[ "destinationTags" ] = tagList + metadata[ "inputTag" ] = inputTag + metadata[ "since" ] = since + metadata[ "userText" ] = description + fileName = destTag+".txt" + with open( fileName, "w" ) as file: + file.write(json.dumps(metadata,file,indent=4,sort_keys=True)) + +def uploadFile( fileName, logFileName ): + command = "uploadConditions.py %s" %fileName + pipe = subprocess.Popen( command, shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + out = pipe.communicate()[0] + lines = out.split('\n') + ret = False + for line in lines: + if line.startswith('\t '): + if line.startswith('\t status : -2'): + print 'ERROR: upload of file %s failed.' %fileName + if line.startswith('\t %s' %fileName): + returnCode = line.split('\t %s :' %fileName)[1].strip() + if returnCode == 'True': + ret = True + with open(logFileName,'a') as logFile: + logFile.write(out) + return ret + +class UploadTest: + def __init__(self, db): + self.db = db + self.errors = 0 + self.logFileName = 'conditionUloadTest.log' + + def log( self, msg ): + print msg + with open(self.logFileName,'a') as logFile: + logFile.write(msg) + logFile.write('\n') + + def upload( self, inputTag, baseFile, destTag, synchro, destSince, success, expectedAction ): + insertedSince = None + destFile = '%s.db' %destTag + metaDestFile = '%s.txt' %destTag + shutil.copyfile( baseFile, destFile ) + self.log( '# ---------------------------------------------------------------------------') + self.log( '# Testing tag %s with synch=%s, destSince=%s - expecting ret=%s action=%s' %(destTag,synchro,destSince,success,expectedAction)) + + descr = 'Testing conditionsUpload with synch:%s - expected action: %s' %(synchro,expectedAction) + makeMetadataFile( inputTag, destTag, destSince, descr ) + beforeUpload = datetime.datetime.utcnow() + ret = uploadFile( destFile, self.logFileName ) + if ret != success: + self.log( 'ERROR: the return value for the upload of tag %s with sychro %s was %s, while the expected result is %s' %(destTag,synchro,ret,success)) + self.errors += 1 + else: + row = self.db.getLastInsertedSince( destTag, beforeUpload ) + if ret == True: + if expectedAction == 'CREATE' or expectedAction == 'INSERT' or expectedAction == 'APPEND': + if destSince != row[0]: + self.log( 'ERROR: the since inserted is %s, expected value is %s - expected action: %s' %(row[0],destSince,expectedAction)) + self.errors += 1 + else: + self.log( '# OK: Found expected value for last since inserted: %s timestamp: %s' %(row[0],row[1])) + insertedSince = row[0] + elif expectedAction == 'SYNCHRONIZE': + if destSince == row[0]: + self.log( 'ERROR: the since inserted %s has not been synchronized with the FCSR - expected action: %s' %(row[0],expectedAction)) + self.errors += 1 + else: + self.log( '# OK: Found synchronized value for the last since inserted: %s timestamp: %s' %(row[0],row[1])) + insertedSince = row[0] + else: + self.log( 'ERROR: found an appended since %s - expected action: %s' %(row[0],expectedAction)) + self.errors += 1 + else: + if not row is None: + self.log( 'ERROR: found new insered since: %s timestamp: %s' %(row[0],row[1])) + self.errors += 1 + if expectedAction != 'FAIL': + self.log( 'ERROR: Upload failed. Expected value: %s' %(destSince)) + self.errors += 1 + else: + self.log( '# OK: Upload failed as expected.') + os.remove( destFile ) + os.remove( metaDestFile ) + return insertedSince + + +def main(): + print 'Testing...' + serviceName = 'cms_orcoff_prep' + schemaName = 'CMS_CONDITIONS' + db = DB(serviceName,schemaName) + db.connect() + inputTag = 'runinfo_31X_mc' + bfile0 = makeBaseFile( inputTag,1) + bfile1 = makeBaseFile( inputTag,100) + test = UploadTest( db ) + # test with synch=any + tag = 'test_CondUpload_any' + test.upload( inputTag, bfile0, tag, 'any', 1, True, 'CREATE' ) + test.upload( inputTag, bfile1, tag, 'any', 1, False, 'FAIL' ) + test.upload( inputTag, bfile0, tag, 'any', 200, True, 'APPEND' ) + test.upload( inputTag, bfile0, tag, 'any', 100, True, 'INSERT') + test.upload( inputTag, bfile0, tag, 'any', 200, True, 'INSERT') + db.removeTag( tag ) + # test with synch=validation + tag = 'test_CondUpload_validation' + test.upload( inputTag, bfile0, tag, 'validation', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'validation' ) + test.upload( inputTag, bfile0, tag, 'validation', 1, True, 'INSERT') + test.upload( inputTag, bfile0, tag, 'validation', 200, True, 'APPEND') + test.upload( inputTag, bfile0, tag, 'validation', 100, True, 'INSERT') + db.removeTag( tag ) + # test with synch=mc + tag = 'test_CondUpload_mc' + test.upload( inputTag, bfile1, tag, 'mc', 1, False, 'FAIL') + test.upload( inputTag, bfile0, tag, 'mc', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'mc' ) + test.upload( inputTag, bfile0, tag, 'mc', 1, False, 'FAIL') + test.upload( inputTag, bfile0, tag, 'mc', 200, False, 'FAIL') + db.removeTag( tag ) + # test with synch=hlt + tag = 'test_CondUpload_hlt' + test.upload( inputTag, bfile0, tag, 'hlt', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'hlt' ) + test.upload( inputTag, bfile0, tag, 'hlt', 200, True, 'SYNCHRONIZE') + fcsr = test.upload( inputTag, bfile0, tag, 'hlt', 100, True, 'SYNCHRONIZE') + if not fcsr is None: + since = fcsr + 200 + test.upload( inputTag, bfile0, tag, 'hlt', since, True, 'APPEND') + since = fcsr + 100 + test.upload( inputTag, bfile0, tag, 'hlt', since, True, 'INSERT') + db.removeTag( tag ) + # test with synch=express + tag = 'test_CondUpload_express' + test.upload( inputTag, bfile0, tag, 'express', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'express' ) + test.upload( inputTag, bfile0, tag, 'express', 200, True, 'SYNCHRONIZE') + fcsr = test.upload( inputTag, bfile0, tag, 'express', 100, True, 'SYNCHRONIZE') + if not fcsr is None: + since = fcsr + 200 + test.upload( inputTag, bfile0, tag, 'express', since, True, 'APPEND') + since = fcsr + 100 + test.upload( inputTag, bfile0, tag, 'express', since, True, 'INSERT') + db.removeTag( tag ) + # test with synch=prompt + tag = 'test_CondUpload_prompt' + test.upload( inputTag, bfile0, tag, 'prompt', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'prompt' ) + test.upload( inputTag, bfile0, tag, 'prompt', 200, True, 'SYNCHRONIZE') + fcsr = test.upload( inputTag, bfile0, tag, 'prompt', 100, True, 'SYNCHRONIZE') + if not fcsr is None: + since = fcsr + 200 + test.upload( inputTag, bfile0, tag, 'prompt', since, True, 'APPEND') + since = fcsr + 100 + test.upload( inputTag, bfile0, tag, 'prompt', since, True, 'INSERT') + db.removeTag( tag ) + # test with synch=pcl + tag = 'test_CondUpload_pcl' + test.upload( inputTag, bfile0, tag, 'pcl', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'pcl' ) + test.upload( inputTag, bfile0, tag, 'pcl', 200, False, 'FAIL') + if not fcsr is None: + since = fcsr + 200 + test.upload( inputTag, bfile0, tag, 'pcl', since, True, 'APPEND') + since = fcsr + 100 + test.upload( inputTag, bfile0, tag, 'pcl', since, True, 'INSERT') + db.removeTag( tag ) + # test with synch=offline + tag = 'test_CondUpload_offline' + test.upload( inputTag, bfile0, tag, 'offline', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'offline' ) + test.upload( inputTag, bfile0, tag, 'offline', 1000, True, 'APPEND') + test.upload( inputTag, bfile0, tag, 'offline', 500, False, 'FAIL' ) + test.upload( inputTag, bfile0, tag, 'offline', 1000, False, 'FAIL' ) + test.upload( inputTag, bfile0, tag, 'offline', 2000, True, 'APPEND' ) + db.removeTag( tag ) + # test with synch=runmc + tag = 'test_CondUpload_runmc' + test.upload( inputTag, bfile0, tag, 'runmc', 1, True, 'CREATE') + db.setSynchronizationType( tag, 'runmc' ) + test.upload( inputTag, bfile0, tag, 'runmc', 1000, True, 'APPEND') + test.upload( inputTag, bfile0, tag, 'runmc', 500, False, 'FAIL' ) + test.upload( inputTag, bfile0, tag, 'runmc', 1000, False, 'FAIL' ) + test.upload( inputTag, bfile0, tag, 'runmc', 2000, True, 'APPEND' ) + db.removeTag( tag ) + os.remove( bfile0 ) + os.remove( bfile1 ) + print 'Done. Errors: %s' %test.errors + +if __name__ == '__main__': + main() diff --git a/CondCore/Utilities/scripts/uploadConditions.py b/CondCore/Utilities/scripts/uploadConditions.py new file mode 100755 index 0000000000000..f687aff17ceaf --- /dev/null +++ b/CondCore/Utilities/scripts/uploadConditions.py @@ -0,0 +1,781 @@ +#!/usr/bin/env python +'''Script that uploads to the new CMS conditions uploader. +Adapted to the new infrastructure from v6 of the upload.py script for the DropBox from Miguel Ojeda. +''' + +__author__ = 'Andreas Pfeiffer' +__copyright__ = 'Copyright 2015, CERN CMS' +__credits__ = ['Giacomo Govi', 'Salvatore Di Guida', 'Miguel Ojeda', 'Andreas Pfeiffer'] +__license__ = 'Unknown' +__maintainer__ = 'Andreas Pfeiffer' +__email__ = 'andreas.pfeiffer@cern.ch' +__version__ = 1 + + +import os +import sys +import optparse +import hashlib +import tarfile +import netrc +import getpass +import errno +import sqlite3 +import json +import tempfile + +defaultBackend = 'online' +defaultHostname = 'cms-conddb-dev.cern.ch' +defaultUrlTemplate = 'https://%s/cmsDbUpload/' +defaultTemporaryFile = 'upload.tar.bz2' +defaultNetrcHost = 'Dropbox' +defaultWorkflow = 'offline' + +# common/http.py start (plus the "# Try to extract..." section bit) +import time +import logging +import cStringIO + +import pycurl +import copy + +def getInput(default, prompt = ''): + '''Like raw_input() but with a default and automatic strip(). + ''' + + answer = raw_input(prompt) + if answer: + return answer.strip() + + return default.strip() + + +def getInputWorkflow(prompt = ''): + '''Like getInput() but tailored to get target workflows (synchronization options). + ''' + + while True: + workflow = getInput(defaultWorkflow, prompt) + + if workflow in frozenset(['offline', 'hlt', 'express', 'prompt', 'pcl']): + return workflow + + logging.error('Please specify one of the allowed workflows. See above for the explanation on each of them.') + + +def getInputChoose(optionsList, default, prompt = ''): + '''Makes the user choose from a list of options. + ''' + + while True: + index = getInput(default, prompt) + + try: + return optionsList[int(index)] + except ValueError: + logging.error('Please specify an index of the list (i.e. integer).') + except IndexError: + logging.error('The index you provided is not in the given list.') + + +def getInputRepeat(prompt = ''): + '''Like raw_input() but repeats if nothing is provided and automatic strip(). + ''' + + while True: + answer = raw_input(prompt) + if answer: + return answer.strip() + + logging.error('You need to provide a value.') + + +def runWizard(basename, dataFilename, metadataFilename): + while True: + print '''\nWizard for metadata for %s + +I will ask you some questions to fill the metadata file. For some of the questions there are defaults between square brackets (i.e. []), leave empty (i.e. hit Enter) to use them.''' % basename + + # Try to get the available inputTags + try: + dataConnection = sqlite3.connect(dataFilename) + dataCursor = dataConnection.cursor() + dataCursor.execute('select name from sqlite_master where type == "table"') + tables = set(zip(*dataCursor.fetchall())[0]) + + # only conddb V2 supported... + if 'TAG' in tables: + dataCursor.execute('select NAME from TAG') + # In any other case, do not try to get the inputTags + else: + raise Exception() + + inputTags = dataCursor.fetchall() + if len(inputTags) == 0: + raise Exception() + inputTags = zip(*inputTags)[0] + + except Exception: + inputTags = [] + + if len(inputTags) == 0: + print '\nI could not find any input tag in your data file, but you can still specify one manually.' + + inputTag = getInputRepeat( + '\nWhich is the input tag (i.e. the tag to be read from the SQLite data file)?\ne.g. BeamSpotObject_ByRun\ninputTag: ') + + else: + print '\nI found the following input tags in your SQLite data file:' + for (index, inputTag) in enumerate(inputTags): + print ' %s) %s' % (index, inputTag) + + inputTag = getInputChoose(inputTags, '0', + '\nWhich is the input tag (i.e. the tag to be read from the SQLite data file)?\ne.g. 0 (you select the first in the list)\ninputTag [0]: ') + + destinationDatabase = getInputRepeat( + '\nWhich is the destination database where the tags should be exported? \ne.g. prod: oracle://cms_orcon_prod/CMS_CONDITIONS - prep: oracle://cms_orcoff_prep/CMS_CONDITIONS\ndestinationDatabase: ') + + while True: + since = getInput('', + '\nWhich is the given since? (if not specified, the one from the SQLite data file will be taken -- note that even if specified, still this may not be the final since, depending on the synchronization options you select later: if the synchronization target is not offline, and the since you give is smaller than the next possible one (i.e. you give a run number earlier than the one which will be started/processed next in prompt/hlt/express), the DropBox will move the since ahead to go to the first safe run instead of the value you gave)\ne.g. 1234\nsince []: ') + if not since: + since = None + break + else: + try: + since = int(since) + break + except ValueError: + logging.error('The since value has to be an integer or empty (null).') + + userText = getInput('', + '\nWrite any comments/text you may want to describe your request\ne.g. Muon alignment scenario for...\nuserText []: ') + + print ''' +Finally, we are going to add the destination tags. There must be at least one. +The tags (and its dependencies) can be synchronized to several workflows. You can synchronize to the following workflows: + * "offline" means no checks/synchronization will be done. + * "hlt" and "express" means that the IOV will be synchronized to the last online run number plus one (as seen by RunInfo). + * "prompt" means that the IOV will be synchronized to the smallest run number waiting for Prompt Reconstruction not having larger run numbers already released (as seen by the Tier0 monitoring). + * "pcl" is like "prompt", but the exportation will occur if and only if the begin time of the first IOV (as stored in the SQLite file or established by the since field in the metadata file) is larger than the first condition safe run number obtained from Tier0.''' + + destinationTags = {} + while True: + destinationTag = getInput('', + '\nWhich is the next destination tag to be added (leave empty to stop)?\ne.g. BeamSpotObjects_PCL_byRun_v0_offline\ndestinationTag []: ') + if not destinationTag: + if len(destinationTags) == 0: + logging.error('There must be at least one destination tag.') + continue + break + + if destinationTag in destinationTags: + logging.warning( + 'You already added this destination tag. Overwriting the previous one with this new one.') + + print( "The synchronization will be set to 'any' - the value is ignored for existing tags.") + synchronizeTo = 'any' + + dependencies = {} + + destinationTags[destinationTag] = { + 'synchronizeTo': synchronizeTo, + 'dependencies': dependencies, + } + + metadata = { + 'destinationDatabase': destinationDatabase, + 'destinationTags': destinationTags, + 'inputTag': inputTag, + 'since': since, + 'userText': userText, + } + + metadata = json.dumps(metadata, sort_keys=True, indent=4) + print '\nThis is the generated metadata:\n%s' % metadata + + if getInput('n', + '\nIs it fine (i.e. save in %s and *upload* the conditions if this is the latest file)?\nAnswer [n]: ' % metadataFilename).lower() == 'y': + break + logging.info('Saving generated metadata in %s...', metadataFilename) + with open(metadataFilename, 'wb') as metadataFile: + metadataFile.write(metadata) + +class HTTPError(Exception): + '''A common HTTP exception. + + self.code is the response HTTP code as an integer. + self.response is the response body (i.e. page). + ''' + + def __init__(self, code, response): + self.code = code + self.response = response + + # Try to extract the error message if possible (i.e. known error page format) + try: + self.args = (response.split('

')[1].split('

')[0], ) + except Exception: + self.args = (self.response, ) + + +CERN_SSO_CURL_CAPATH = '/etc/pki/tls/certs' + +class HTTP(object): + '''Class used for querying URLs using the HTTP protocol. + ''' + + retryCodes = frozenset([502, 503]) + + def __init__(self): + self.setBaseUrl() + self.setRetries() + + self.curl = pycurl.Curl() + self.curl.setopt(self.curl.COOKIEFILE, '') # in memory + + #-toDo: make sure we have the right options set here to use ssl + #-review(2015-09-25): check and see - action: AP + # self.curl.setopt(self.curl.SSL_VERIFYPEER, 1) + self.curl.setopt(self.curl.SSL_VERIFYPEER, 0) + self.curl.setopt(self.curl.SSL_VERIFYHOST, 2) + + self.baseUrl = None + + self.token = None + + def getCookies(self): + '''Returns the list of cookies. + ''' + return self.curl.getinfo(self.curl.INFO_COOKIELIST) + + def discardCookies(self): + '''Discards cookies. + ''' + self.curl.setopt(self.curl.COOKIELIST, 'ALL') + + + def setBaseUrl(self, baseUrl = ''): + '''Allows to set a base URL which will be prefixed to all the URLs + that will be queried later. + ''' + self.baseUrl = baseUrl + + + def setProxy(self, proxy = ''): + '''Allows to set a proxy. + ''' + self.curl.setopt(self.curl.PROXY, proxy) + + + def setTimeout(self, timeout = 0): + '''Allows to set a timeout. + ''' + self.curl.setopt(self.curl.TIMEOUT, timeout) + + + def setRetries(self, retries = ()): + '''Allows to set retries. + + The retries are a sequence of the seconds to wait per retry. + + The retries are done on: + * PyCurl errors (includes network problems, e.g. not being able + to connect to the host). + * 502 Bad Gateway (for the moment, to avoid temporary + Apache-CherryPy issues). + * 503 Service Temporarily Unavailable (for when we update + the frontends). + ''' + self.retries = retries + + def getToken(self, username, password): + + url = self.baseUrl + 'token' + + self.curl.setopt(pycurl.URL, url) + self.curl.setopt(pycurl.VERBOSE, 0) + + #-toDo: check if/why these are needed ... + #-ap: hmm ... + # self.curl.setopt(pycurl.DNS_CACHE_TIMEOUT, 0) + # self.curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4) + #-end hmmm ... + #-review(2015-09-25): check and see - action: AP + + + self.curl.setopt(pycurl.HTTPHEADER, ['Accept: application/json']) + # self.curl.setopt( self.curl.POST, {}) + self.curl.setopt(self.curl.HTTPGET, 0) + + response = cStringIO.StringIO() + self.curl.setopt(pycurl.WRITEFUNCTION, response.write) + self.curl.setopt(pycurl.USERPWD, '%s:%s' % (username, password) ) + + logging.debug('going to connect to server at: %s' % url ) + + self.curl.perform() + code = self.curl.getinfo(pycurl.RESPONSE_CODE) + logging.debug('got: %s ', str(code)) + + try: + self.token = json.loads( response.getvalue() )['token'] + except Exception, e: + logging.error('http::getToken> got error from server: %s ', str(e) ) + if 'No JSON object could be decoded' in str(e): + return None + logging.error("error getting token: %s", str(e)) + return None + + logging.debug('token: %s', self.token) + logging.debug('returning: %s', response.getvalue()) + + return response.getvalue() + + def query(self, url, data = None, files = None, keepCookies = True): + '''Queries a URL, optionally with some data (dictionary). + + If no data is specified, a GET request will be used. + If some data is specified, a POST request will be used. + + If files is specified, it must be a dictionary like data but + the values are filenames. + + By default, cookies are kept in-between requests. + + A HTTPError exception is raised if the response's HTTP code is not 200. + ''' + + if not keepCookies: + self.discardCookies() + + url = self.baseUrl + url + + # make sure the logs are safe ... at least somewhat :) + data4log = copy.copy(data) + if data4log: + if 'password' in data4log.keys(): + data4log['password'] = '*' + + retries = [0] + list(self.retries) + + while True: + logging.debug('Querying %s with data %s and files %s (retries left: %s, current sleep: %s)...', url, data4log, files, len(retries), retries[0]) + + time.sleep(retries.pop(0)) + + try: + self.curl.setopt(self.curl.URL, url) + self.curl.setopt(self.curl.HTTPGET, 1) + + # from now on we use the token we got from the login + self.curl.setopt(pycurl.USERPWD, '%s:""' % ( str(self.token), ) ) + self.curl.setopt(pycurl.HTTPHEADER, ['Accept: application/json']) + + if data is not None or files is not None: + # If there is data or files to send, use a POST request + + finalData = {} + + if data is not None: + finalData.update(data) + + if files is not None: + for (key, fileName) in files.items(): + finalData[key] = (self.curl.FORM_FILE, fileName) + self.curl.setopt( self.curl.HTTPPOST, finalData.items() ) + + self.curl.setopt(pycurl.VERBOSE, 0) + + response = cStringIO.StringIO() + self.curl.setopt(self.curl.WRITEFUNCTION, response.write) + self.curl.perform() + + code = self.curl.getinfo(self.curl.RESPONSE_CODE) + + if code in self.retryCodes and len(retries) > 0: + logging.debug('Retrying since we got the %s error code...', code) + continue + + if code != 200: + raise HTTPError(code, response.getvalue()) + + return response.getvalue() + + except pycurl.error as e: + if len(retries) == 0: + raise e + logging.debug('Retrying since we got the %s pycurl exception...', str(e)) + +# common/http.py end + +def addToTarFile(tarFile, fileobj, arcname): + tarInfo = tarFile.gettarinfo(fileobj = fileobj, arcname = arcname) + tarInfo.mode = 0400 + tarInfo.uid = tarInfo.gid = tarInfo.mtime = 0 + tarInfo.uname = tarInfo.gname = 'root' + tarFile.addfile(tarInfo, fileobj) + +class ConditionsUploader(object): + '''Upload conditions to the CMS conditions uploader service. + ''' + + def __init__(self, hostname = defaultHostname, urlTemplate = defaultUrlTemplate): + self.hostname = hostname + self.userName = None + self.http = HTTP() + self.http.setBaseUrl(urlTemplate % hostname) + + + def signIn(self, username, password): + '''Signs in the server. + ''' + + logging.info('%s: Signing in user %s ...', self.hostname, username) + try: + self.token = self.http.getToken(username, password) + except Exception, e: + logging.error("Caught exception when trying to get token for user %s from %s: %s" % (username, self.hostname, str(e)) ) + return False + + if not self.token: + logging.error("could not get token for user %s from %s" % (username, self.hostname) ) + return False + + logging.debug( "got: '%s'", str(self.token) ) + self.userName = username + + return True + + def signOut(self): + '''Signs out the server. + ''' + + logging.info('%s: Signing out...', self.hostname) + # self.http.query('logout') + self.token = None + + + def _checkForUpdates(self): + '''Updates this script, if a new version is found. + ''' + + logging.debug('%s: Checking if a newer version of this script is available ...', self.hostname) + version = int(self.http.query('getUploadScriptVersion')) + + if version <= __version__: + logging.debug('%s: Script is up-to-date.', self.hostname) + return + + logging.info('%s: Updating to a newer version (%s) than the current one (%s): downloading ...', self.hostname, version, __version__) + + uploadScript = self.http.query('getUploadScript') + + self.signOut() + + logging.info('%s: ... saving the new version ...', self.hostname) + with open(sys.argv[0], 'wb') as f: + f.write(uploadScript) + + logging.info('%s: ... executing the new version...', self.hostname) + os.execl(sys.executable, *([sys.executable] + sys.argv)) + + + def uploadFile(self, filename, backend = defaultBackend, temporaryFile = defaultTemporaryFile): + '''Uploads a file to the dropBox. + + The filename can be without extension, with .db or with .txt extension. + It will be stripped and then both .db and .txt files are used. + ''' + + basepath = filename.rsplit('.db', 1)[0].rsplit('.txt', 1)[0] + basename = os.path.basename(basepath) + + logging.debug('%s: %s: Creating tar file for upload ...', self.hostname, basename) + + try: + tarFile = tarfile.open(temporaryFile, 'w:bz2') + + with open('%s.db' % basepath, 'rb') as data: + addToTarFile(tarFile, data, 'data.db') + except Exception, e: + msg = 'Error when creating tar file. \n' + msg += 'Please check that you have write access to the directory you are running,\n' + msg += 'and that you have enough space on this disk (df -h .)\n' + logging.error(msg) + raise Exception(msg) + + with tempfile.NamedTemporaryFile() as metadata: + with open('%s.txt' % basepath, 'rb') as originalMetadata: + json.dump(json.load(originalMetadata), metadata, sort_keys = True, indent = 4) + + metadata.seek(0) + addToTarFile(tarFile, metadata, 'metadata.txt') + + tarFile.close() + + logging.debug('%s: %s: Calculating hash...', self.hostname, basename) + + fileHash = hashlib.sha1() + with open(temporaryFile, 'rb') as f: + while True: + data = f.read(4 * 1024 * 1024) + if not data: + break + fileHash.update(data) + + fileHash = fileHash.hexdigest() + fileInfo = os.stat(temporaryFile) + fileSize = fileInfo.st_size + + logging.debug('%s: %s: Hash: %s', self.hostname, basename, fileHash) + + logging.info('%s: %s: Uploading file (%s, size %s) to the %s backend...', self.hostname, basename, fileHash, fileSize, backend) + os.rename(temporaryFile, fileHash) + try: + ret = self.http.query('uploadFile', + { + 'backend': backend, + 'fileName': basename, + 'userName': self.userName, + }, + files = { + 'uploadedFile': fileHash, + } + ) + except Exception, e: + logging.error('Error from uploading: %s' % str(e)) + ret = json.dumps( { "status": -1, "upload" : { 'itemStatus' : { basename : {'status':'failed', 'info':str(e)}}}, "error" : str(e)} ) + + os.unlink(fileHash) + + statusInfo = json.loads(ret)['upload'] + logging.debug( 'upload returned: %s', statusInfo ) + + okTags = [] + skippedTags = [] + failedTags = [] + for tag, info in statusInfo['itemStatus'].items(): + logging.debug('checking tag %s, info %s', tag, str(json.dumps(info, indent=4,sort_keys=True)) ) + if 'ok' in info['status'].lower() : + okTags.append( tag ) + logging.info('tag %s successfully uploaded', tag) + if 'skip' in info['status'].lower() : + skippedTags.append( tag ) + logging.warning('found tag %s to be skipped. reason: \n ... \t%s ', tag, info['info']) + if 'fail' in info['status'].lower() : + failedTags.append( tag ) + logging.error('found tag %s failed to upload. reason: \n ... \t%s ', tag, info['info']) + + if len(okTags) > 0: logging.info ("tags sucessfully uploaded: %s ", str(okTags) ) + if len(skippedTags) > 0: logging.warning("tags SKIPped to upload : %s ", str(skippedTags) ) + if len(failedTags) > 0: logging.error ("tags FAILed to upload : %s ", str(failedTags) ) + + fileLogURL = 'https://cms-conddb-dev.cern.ch/logs/dropBox/getFileLog?fileHash=%s' + logging.info('file log at: %s', fileLogURL % fileHash) + + return len(okTags)>0 + +def authenticateUser(dropBox, options): + + try: + # Try to find the netrc entry + (username, account, password) = netrc.netrc().authenticators(options.netrcHost) + except Exception: + # netrc entry not found, ask for the username and password + logging.info( + 'netrc entry "%s" not found: if you wish not to have to retype your password, you can add an entry in your .netrc file. However, beware of the risks of having your password stored as plaintext. Instead.', + options.netrcHost) + + # Try to get a default username + defaultUsername = getpass.getuser() + if defaultUsername is None: + defaultUsername = '(not found)' + + username = getInput(defaultUsername, '\nUsername [%s]: ' % defaultUsername) + password = getpass.getpass('Password: ') + + # Now we have a username and password, authenticate with them + return dropBox.signIn(username, password) + + +def uploadAllFiles(options, arguments): + + results = {} + + # Check that we can read the data and metadata files + # If the metadata file does not exist, start the wizard + for filename in arguments: + basepath = filename.rsplit('.db', 1)[0].rsplit('.txt', 1)[0] + basename = os.path.basename(basepath) + dataFilename = '%s.db' % basepath + metadataFilename = '%s.txt' % basepath + + logging.info('Checking %s...', basename) + + # Data file + try: + with open(dataFilename, 'rb') as dataFile: + pass + except IOError as e: + logging.error('Impossible to open SQLite data file %s', dataFilename) + return -3 + + # Metadata file + try: + with open(metadataFilename, 'rb') as metadataFile: + pass + except IOError as e: + if e.errno != errno.ENOENT: + logging.error('Impossible to open file %s (for other reason than not existing)', metadataFilename) + return -4 + + if getInput('y', '\nIt looks like the metadata file %s does not exist. Do you want me to create it and help you fill it?\nAnswer [y]: ' % metadataFilename).lower() != 'y': + logging.error('Metadata file %s does not exist', metadataFilename) + return -5 + + # Wizard + runWizard(basename, dataFilename, metadataFilename) + + # Upload files + try: + dropBox = ConditionsUploader(options.hostname, options.urlTemplate) + + # Authentication + if not authenticateUser(dropBox, options): + logging.error("Error authenticating user. Aborting.") + return { 'status' : -2, 'error' : "Error authenticating user. Aborting." } + + # At this point we must be authenticated + dropBox._checkForUpdates() + + for filename in arguments: + results[filename] = dropBox.uploadFile(filename, options.backend, options.temporaryFile) + logging.debug("all files uploaded, logging out now.") + + dropBox.signOut() + + except HTTPError as e: + logging.error('got HTTP error: %s', str(e)) + return { 'status' : -1, 'error' : str(e) } + + return results + +def uploadTier0Files(filenames, username, password, cookieFileName = None): + '''Uploads a bunch of files coming from Tier0. + This has the following requirements: + * Username/Password based authentication. + * Uses the online backend. + * Ignores errors related to the upload/content (e.g. duplicated file). + ''' + + dropBox = ConditionsUploader() + + dropBox.signIn(username, password) + + for filename in filenames: + try: + result = dropBox.uploadFile(filename, backend = 'test') + except HTTPError as e: + if e.code == 400: + # 400 Bad Request: This is an exception related to the upload + # being wrong for some reason (e.g. duplicated file). + # Since for Tier0 this is not an issue, continue + logging.error('HTTP Exception 400 Bad Request: Upload-related, skipping. Message: %s', e) + continue + + # In any other case, re-raise. + raise + + #-toDo: add a flag to say if we should retry or not. So far, all retries are done server-side (Tier-0), + # if we flag as failed any retry would not help and would result in the same error (e.g. + # when a file with an identical hash is uploaded again) + #-review(2015-09-25): get feedback from tests at Tier-0 (action: AP) + + if not result: # dropbox reported an error when uploading, do not retry. + logging.error('Error from dropbox, upload-related, skipping.') + continue + + dropBox.signOut() + + +def main(): + '''Entry point. + ''' + + parser = optparse.OptionParser(usage = + 'Usage: %prog [options] [ ...]\n' + ) + + parser.add_option('-d', '--debug', + dest = 'debug', + action="store_true", + default = False, + help = 'Switch on printing debug information. Default: %default', + ) + + parser.add_option('-b', '--backend', + dest = 'backend', + default = defaultBackend, + help = 'dropBox\'s backend to upload to. Default: %default', + ) + + parser.add_option('-H', '--hostname', + dest = 'hostname', + default = defaultHostname, + help = 'dropBox\'s hostname. Default: %default', + ) + + parser.add_option('-u', '--urlTemplate', + dest = 'urlTemplate', + default = defaultUrlTemplate, + help = 'dropBox\'s URL template. Default: %default', + ) + + parser.add_option('-f', '--temporaryFile', + dest = 'temporaryFile', + default = defaultTemporaryFile, + help = 'Temporary file that will be used to store the first tar file. Note that it then will be moved to a file with the hash of the file as its name, so there will be two temporary files created in fact. Default: %default', + ) + + parser.add_option('-n', '--netrcHost', + dest = 'netrcHost', + default = defaultNetrcHost, + help = 'The netrc host (machine) from where the username and password will be read. Default: %default', + ) + + (options, arguments) = parser.parse_args() + + if len(arguments) < 1: + parser.print_help() + return -2 + + logLevel = logging.INFO + if options.debug: + logLevel = logging.DEBUG + logging.basicConfig( + format = '[%(asctime)s] %(levelname)s: %(message)s', + level = logLevel, + ) + + results = uploadAllFiles(options, arguments) + + print "uploadAllFiles returned:" + for hash, res in results.items(): + print "\t %s : %s " % (hash, str(res)) + +def testTier0Upload(): + + global defaultNetrcHost + + (username, account, password) = netrc.netrc().authenticators(defaultNetrcHost) + + filenames = ['testFiles/localSqlite-top2'] + + uploadTier0Files(filenames, username, password, cookieFileName = None) + + +if __name__ == '__main__': + + sys.exit(main()) + # testTier0Upload()