diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 816186f8dd27..426806828416 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -36,6 +36,7 @@ Changed - update the local simulators for accepting ``Qobj`` as input. (#667) - Use ``get_status_job()`` for checking IBMQJob status. (#641) - Q network hub/group/project credentials replaced by new url format. (#740) +- ``Jobs`` API simplification and breaking changes. (#686) Removed ------- diff --git a/qiskit/backends/__init__.py b/qiskit/backends/__init__.py index 3bd501858a4a..a7d30fd6cc27 100644 --- a/qiskit/backends/__init__.py +++ b/qiskit/backends/__init__.py @@ -10,3 +10,5 @@ from .baseprovider import BaseProvider from .basejob import BaseJob from .jobstatus import JobStatus +from .joberror import JobError +from .jobtimeouterror import JobTimeoutError diff --git a/qiskit/backends/basejob.py b/qiskit/backends/basejob.py index 92a5ca665afa..2c156747b87b 100644 --- a/qiskit/backends/basejob.py +++ b/qiskit/backends/basejob.py @@ -32,28 +32,7 @@ def cancel(self): """Attempt to cancel job.""" pass - # Property attributes - ##################### - @property @abstractmethod def status(self): """Get backend status dictionary""" pass - - @property - @abstractmethod - def running(self): - """True if job is currently running.""" - pass - - @property - @abstractmethod - def done(self): - """True if call was successfully finished.""" - pass - - @property - @abstractmethod - def cancelled(self): - """True if call was successfully cancelled""" - pass diff --git a/qiskit/backends/ibmq/ibmqbackend.py b/qiskit/backends/ibmq/ibmqbackend.py index 51d6e3b58de8..1dddc44a72b7 100644 --- a/qiskit/backends/ibmq/ibmqbackend.py +++ b/qiskit/backends/ibmq/ibmqbackend.py @@ -53,7 +53,9 @@ def run(self, qobj): Returns: IBMQJob: an instance derived from BaseJob """ - return IBMQJob(qobj, self._api, not self.configuration['simulator']) + job = IBMQJob(self._api, not self.configuration['simulator'], qobj=qobj) + job.submit() + return job @property def calibration(self): @@ -216,7 +218,10 @@ def jobs(self, limit=50, skip=0, status=None, db_filter=None): job_list = [] for job_info in job_info_list: is_device = not bool(self._configuration.get('simulator')) - job = IBMQJob.from_api(job_info, self._api, is_device) + job = IBMQJob(self._api, is_device, + job_id=job_info.get('id'), + backend_name=job_info.get('backend').get('name'), + creation_date=job_info.get('creationDate')) job_list.append(job) return job_list @@ -236,7 +241,10 @@ def retrieve_job(self, job_id): if 'error' in job_info: raise IBMQBackendError('failed to get job id "{}"'.format(job_id)) is_device = not bool(self._configuration.get('simulator')) - job = IBMQJob.from_api(job_info, self._api, is_device) + job = IBMQJob(self._api, is_device, + job_id=job_info.get('id'), + backend_name=job_info.get('backend').get('name'), + creation_date=job_info.get('creationDate')) return job diff --git a/qiskit/backends/ibmq/ibmqjob.py b/qiskit/backends/ibmq/ibmqjob.py index 64e67314c4bc..36c670a86379 100644 --- a/qiskit/backends/ibmq/ibmqjob.py +++ b/qiskit/backends/ibmq/ibmqjob.py @@ -15,6 +15,7 @@ import time import logging import pprint +import contextlib import json import datetime import numpy @@ -23,11 +24,9 @@ from qiskit.qobj import qobj_to_dict from qiskit.transpiler import transpile -from qiskit.backends import BaseJob +from qiskit.backends import BaseJob, JobError, JobTimeoutError from qiskit.backends.jobstatus import JobStatus, JOB_FINAL_STATES -from qiskit._qiskiterror import QISKitError from qiskit._result import 
Result -from qiskit._resulterror import ResultError logger = logging.getLogger(__name__) @@ -42,79 +41,133 @@ class IBMQJob(BaseJob): - """IBM Q Job class + """Represent the jobs that will be executed on IBM-Q simulators and real + devices. Jobs are intended to be created by calling ``run()`` on a particular + backend. + + Creating a ``Job`` instance does not imply running it. You need to submit it in a + separate step: + + .. code-block:: python + + job = IBMQJob(...) + job.submit() # It won't block. + + An error while submitting a job will cause the next call to ``status()`` to + raise. If submitting the job succeeds, you can inspect the job's status by + using ``status()``. The status can be one of the ``JobStatus`` members: + + .. code-block:: python + + from qiskit.backends.jobstatus import JobStatus + + job = IBMQJob(...) + job.submit() + + try: + job_status = job.status() # It won't block. It will query the backend API. + if job_status is JobStatus.RUNNING: + print('The job is still running') + + except JobError as ex: + print("Something went wrong!: {}".format(ex)) + + A call to ``status()`` can raise if something happens at the API level that + prevents Qiskit from determining the status of the job. An example of this + is a temporary connection loss or a network failure. + + The ``submit()`` and ``status()`` methods are examples of non-blocking API calls. + ``Job`` instances also have ``id()`` and ``result()`` methods which will + block: + + .. code-block:: python + + job = IBMQJob(...) + job.submit() + + try: + job_id = job.id() # It will block until the submission completes. + print('The job {} was successfully submitted'.format(job_id)) + + job_result = job.result() # It will block until the job finishes. + print('The job finished with result {}'.format(job_result)) + + except JobError as ex: + print("Something went wrong!: {}".format(ex)) + + + Both methods can raise if something at the API level happens that prevents + Qiskit from determining the status of the job. + + .. NOTE:: + When querying the API for the job status, two kinds of errors are + possible. The most severe is the one preventing Qiskit from getting a + response from the backend. This can be caused by a network failure or a + temporary system outage. In these cases, calling ``status()`` will raise. + + If Qiskit successfully retrieves the status of a job, the job may still have + finished with errors. In that case, ``status()`` will simply return + ``JobStatus.ERROR`` and you can call ``error_message()`` to get more + info. Attributes: _executor (futures.Executor): executor to handle asynchronous jobs - _final_states (list(JobStatus)): terminal states of async jobs """ _executor = futures.ThreadPoolExecutor() - def __init__(self, qobj, api, is_device): + def __init__(self, api, is_device, qobj=None, job_id=None, backend_name=None, + creation_date=None): """IBMQJob init function. + We can instantiate jobs from two sources: a Qobj, or an already submitted job + retrieved from the API server. Args: - qobj (Qobj): job description api (IBMQuantumExperience): IBM Q API is_device (bool): whether backend is a real device # TODO: remove this after Qobj + qobj (Qobj): The Quantum Object. See the notes below. + job_id (String): The job ID of an already submitted job. + backend_name (String): The name of the backend that ran the job. + creation_date (String): When the job was created. + + Notes: + It is mandatory to pass either ``qobj`` or ``job_id``.
Passing a ``qobj`` + will ignore ``job_id`` and will create an instance to be submitted to the API server for job creation. + Passing only a ``job_id`` will create an instance representing an already-created job retrieved from the API server. """ super().__init__() - self._qobj = qobj + self._job_data = None + if qobj is not None: + # TODO: No need for this conversion, just use the new equivalent members above + old_qobj = qobj_to_dict(qobj, version='0.0.1') + self._job_data = { + 'circuits': old_qobj['circuits'], + 'hpc': old_qobj['config'].get('hpc'), + 'seed': old_qobj['circuits'][0]['config']['seed'], + 'shots': old_qobj['config']['shots'], + 'max_credits': old_qobj['config']['max_credits'] + } + self._future_captured_exception = None self._api = api - self._id = None # this must be before creating the future - self._backend_name = self._qobj.header.backend_name + self._id = job_id + self._backend_name = qobj.header.backend_name if qobj is not None else backend_name self._status = JobStatus.INITIALIZING - self._future_submit = self._executor.submit(self._submit) - self._status_msg = 'Job is initializing. Please, wait a moment.' + # If a qobj is not provided, assume that a job_id has been provided + # and query the API to update the status. + if qobj is None: + self.status() self._queue_position = None self._cancelled = False - self._exception = None self._is_device = is_device - self.creation_date = datetime.datetime.utcnow().replace( - tzinfo=datetime.timezone.utc).isoformat() - - @classmethod - def from_api(cls, job_info, api, is_device): - """Instantiates job using information returned from - IBMQuantumExperience about a particular job. - Args: - job_info (dict): This is the information about a job returned from - the API. It has the simplified structure:: - - {'backend': {'id', 'backend id string', - 'name', 'ibmqx4'}, - 'id': 'job id string', - 'qasms': [{'executionId': 'id string', - 'qasm': 'qasm string'}, - ] - 'status': 'status string', - 'seed': '1', - 'shots': 1024, - 'status': 'status string', - 'usedCredits': 3, - 'creationDate': '2018-06-13T04:31:13.175Z' - 'userId': 'user id'} + def current_utc_time(): + """Gets the current time in UTC format""" + return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat() - api (IBMQuantumExperience): IBM Q API - is_device (bool): whether backend is a real device # TODO: remove this after Qobj - - Returns: - IBMQJob: an instance of this class - """ - job_instance = cls.__new__(cls) - job_instance._status = JobStatus.QUEUED - job_instance._backend_name = job_info.get('backend').get('name') - job_instance._api = api - job_instance._id = job_info.get('id') - job_instance._exception = None # needs to be before status call below - job_instance._status_msg = None - job_instance._queue_position = None - job_instance._cancelled = False - job_instance._is_device = is_device - job_instance.creation_date = job_info.get('creationDate') - return job_instance + self._creation_date = creation_date or current_utc_time() + self._future = None + self._api_error_msg = None + # pylint: disable=arguments-differ def result(self, timeout=None, wait=5): """Return the result from the job.
@@ -126,311 +179,197 @@ def result(self, timeout=None, wait=5): Result: Result object Raises: - IBMQJobError: exception raised during job initialization + JobError: exception raised during job initialization """ - # pylint: disable=arguments-differ - while self._status == JobStatus.INITIALIZING: - if self._future_submit.exception(): - raise IBMQJobError('error submitting job: {}'.format( - repr(self._future_submit.exception()))) - time.sleep(0.1) + self._wait_for_submission() try: this_result = self._wait_for_job(timeout=timeout, wait=wait) except TimeoutError as err: # A timeout error retrieving the results does not imply the job - # is failing. The job can be still running. + # is failing. The job can be still running. This is why we are not + # throwing an exception here. return Result({'id': self._id, 'status': 'ERROR', 'result': str(err)}) + except ApiError as api_err: + raise JobError(str(api_err)) - if self._is_device and self.done: + if self._is_device and self.status() == JobStatus.DONE: _reorder_bits(this_result) - if self._status not in JOB_FINAL_STATES: - if this_result.get_status() == 'ERROR': - self._status = JobStatus.ERROR - else: - self._status = JobStatus.DONE return this_result def cancel(self): - """Attempt to cancel a job. If it is not possible, check the - ```exception``` property for more information. + """Attempt to cancel a job. Returns: bool: True if job can be cancelled, else False. Currently this is only possible on commercial systems. Raises: - IBMQJobError: if server returned error + JobError: if there was some unexpected failure in the server """ hub = self._api.config.get('hub', None) group = self._api.config.get('group', None) project = self._api.config.get('project', None) - cancelled = False try: response = self._api.cancel_job(self._id, hub, group, project) - errored = 'error' in response - cancelled = not errored + self._cancelled = 'error' not in response + return self._cancelled + except ApiError as error: + self._cancelled = False + raise JobError('Error cancelling job: %s' % error.usr_msg) - if errored: - err_msg = response.get('error', '') - error = IBMQJobError('Error cancelling job: %s' % err_msg) - self._exception = error + def status(self): + """Query the API to update the status. - except ApiError as error: - err_msg = error.usr_msg - error = IBMQJobError('Error cancelling job: %s' % err_msg) - self._exception = error + Returns: + JobStatus: The status of the job, once updated. - self._cancelled = cancelled - return self._cancelled + Raises: + JobError: if there was an exception in the future being executed + or the server sent an unknown answer. + """ - @property - def status(self): - self._update_status() - stats = { - 'job_id': self._id, - 'status': self._status, - 'status_msg': self._status_msg - } - if self._queue_position: - stats['queue_position'] = self._queue_position - # Reset once consumed to allow _update_status to regenerate the - # value if needed. - self._queue_position = None - return stats - - def _update_status(self): - """Query the API to update the status.""" - if (self._status in JOB_FINAL_STATES or - self._status == JobStatus.INITIALIZING): - return None + # Implies self._id is None + if self._future_captured_exception is not None: + raise JobError(str(self._future_captured_exception)) - try: - api_job = self._api.get_status_job(self.id) - if api_job['status'] in API_FINAL_STATES: - # Call the endpoint that returns full information. 
- api_job = self._api.get_job(self.id) + if self._id is None or self._status in JOB_FINAL_STATES: + return self._status + try: + # TODO: See result values + api_job = self._api.get_status_job(self._id) if 'status' not in api_job: - raise QISKitError('get_job didn\'t return status: %s' % - pprint.pformat(api_job)) + raise JobError('get_job didn\'t return status: %s' % + pprint.pformat(api_job)) # pylint: disable=broad-except except Exception as err: - self._status = JobStatus.ERROR - self._exception = err - self._status_msg = '{}'.format(err) - return None + raise JobError(str(err)) if api_job['status'] == 'VALIDATING': self._status = JobStatus.VALIDATING - self._status_msg = self._status.value elif api_job['status'] == 'RUNNING': self._status = JobStatus.RUNNING - self._status_msg = self._status.value - queued, queue_position = self._is_job_queued(api_job) + queued, self._queue_position = _is_job_queued(api_job) if queued: self._status = JobStatus.QUEUED - self._status_msg = self._status.value - if queue_position: - self._queue_position = queue_position elif api_job['status'] == 'COMPLETED': self._status = JobStatus.DONE - self._status_msg = self._status.value elif api_job['status'] == 'CANCELLED': self._status = JobStatus.CANCELLED - self._status_msg = self._status.value self._cancelled = True elif 'ERROR' in api_job['status']: - # Errored status are of the form "ERROR_*_JOB" - self._status = JobStatus.ERROR - self._status_msg = api_job['status'] - - elif self.exception or self._future_submit.exception(): + # Error status are of the form "ERROR_*_JOB" self._status = JobStatus.ERROR - if self._future_submit.exception(): - self._exception = self._future_submit.exception() - self._status_msg = str(self.exception) + # TODO: This seems to be an inconsistency in the API package. + self._api_error_msg = api_job.get('error') or api_job.get('Error') else: - self._status = JobStatus.ERROR - self._exception = IBMQJobError( - 'Unrecognized result: \n{}'.format(pprint.pformat(api_job))) - self._status_msg = '{}'.format(self._exception) - - return api_job - - def _is_job_queued(self, api_job): - is_queued, position = False, None - if 'infoQueue' in api_job: - if 'status' in api_job['infoQueue']: - queue_status = api_job['infoQueue']['status'] - is_queued = queue_status == 'PENDING_IN_QUEUE' - if 'position' in api_job['infoQueue']: - position = api_job['infoQueue']['position'] - return is_queued, position - - @property - def queued(self): - """ - Returns whether job is queued. + raise JobError('Unrecognized answer from server: \n{}' + .format(pprint.pformat(api_job))) - Returns: - bool: True if job is queued, else False. + return self._status - Raises: - QISKitError: couldn't get job status from server - """ - return self.status['status'] == JobStatus.QUEUED + def error_message(self): + """Return the error message returned from the API server response.""" + return self._api_error_msg - @property - def running(self): - """ - Returns whether job is actively running + def queue_position(self): + """Return the position in the server queue. Returns: - bool: True if job is running, else False. - - Raises: - QISKitError: couldn't get job status from server + Number: Position in the queue. """ - return self.status['status'] == JobStatus.RUNNING + return self._queue_position - @property - def validating(self): + def creation_date(self): """ - Returns whether job is being validated - - Returns: - bool: True if job is under validation, else False. 
- - Raises: - QISKitError: couldn't get job status from server + Return creation date. """ - return self.status['status'] == JobStatus.VALIDATING + return self._creation_date - @property - def done(self): - """ - Returns True if job successfully finished running. - - Note behavior is slightly different than Future objects which would - also return true if successfully cancelled. - """ - return self.status['status'] == JobStatus.DONE - - @property - def cancelled(self): - return self._cancelled - - @property - def exception(self): - """ - Return Exception object previously raised by job else None - - Returns: - Exception: exception raised by job - """ - if isinstance(self._exception, Exception): - self._status_msg = str(self._exception) - return self._exception - - @property + # pylint: disable=invalid-name def id(self): + """Return backend determined id. + + If the Id is not set because the job is already initializing, this call + will block until we have an Id. """ - Return backend determined id (also available in status method). - """ - # pylint: disable=invalid-name - while self._id is None and self._status not in JOB_FINAL_STATES: - if self._future_submit.exception(): - self._status = JobStatus.ERROR - self._exception = self._future_submit.exception() - # job is initializing and hasn't gotten a id yet. - time.sleep(0.1) + self._wait_for_submission() return self._id - @property def backend_name(self): - """ - Return backend name used for this job - """ + """Return backend name used for this job.""" return self._backend_name - def _submit(self): - """Submit job to IBM Q. - - Returns: - dict: submission info including job id from server + def submit(self): + """Submit job to IBM-Q. Raises: - QISKitError: The backend name in the job doesn't match this backend. - ResultError: If the API reported an error with the submitted job. - RegisterSizeError: If the requested register size exceeded device - capability. + JobError: If we have already submitted the job. """ - qobj = qobj_to_dict(self._qobj, version='0.0.1') + if self._future is not None or self._id is not None: + raise JobError("We have already submitted the job!") + api_jobs = [] - for circuit in qobj['circuits']: - job = {} - if not circuit.get('compiled_circuit_qasm', None): - compiled_circuit = transpile(circuit['circuit']) - circuit['compiled_circuit_qasm'] = compiled_circuit.qasm(qeflag=True) - if isinstance(circuit['compiled_circuit_qasm'], bytes): - job['qasm'] = circuit['compiled_circuit_qasm'].decode() - else: - job['qasm'] = circuit['compiled_circuit_qasm'] - if circuit.get('name', None): - job['name'] = circuit['name'] - # convert numpy types for json serialization - compiled_circuit = json.loads( - json.dumps(circuit['compiled_circuit'], - default=_numpy_type_converter)) - job['metadata'] = {'compiled_circuit': compiled_circuit} + circuits = self._job_data['circuits'] + for circuit in circuits: + job = _create_api_job_from_circuit(circuit) api_jobs.append(job) - seed0 = qobj['circuits'][0]['config']['seed'] - hpc = None - if qobj['config'].get('hpc', None): - try: - # Use CamelCase when passing the hpc parameters to the API. 
- hpc = { - 'multiShotOptimization': - qobj['config']['hpc']['multi_shot_optimization'], - 'ompNumThreads': - qobj['config']['hpc']['omp_num_threads'] - } - except (KeyError, TypeError): - hpc = None - backend_name = qobj['config']['backend_name'] - if backend_name != self._backend_name: - raise QISKitError("inconsistent qobj backend " - "name ({0} != {1})".format(backend_name, - self._backend_name)) + hpc = self._job_data['hpc'] + seed = self._job_data['seed'] + shots = self._job_data['shots'] + max_credits = self._job_data['max_credits'] + + hpc_camel_cased = _format_hpc_parameters(hpc) + + self._future = self._executor.submit(self._submit_callback, api_jobs, + self._backend_name, hpc_camel_cased, + seed, shots, max_credits) + + def _submit_callback(self, api_jobs, backend_name, hpc, seed, shots, max_credits): + """Submit job to IBM-Q. + + Args: + api_jobs (list): List of API job dictionaries to submit. One per circuit. + backend_name (string): The name of the backend + hpc (dict): HPC specific configuration + seed (integer): The seed for the circuits + shots (integer): Number of shots the circuits should run + max_credits (integer): Maximum number of credits + + Returns: + dict: A dictionary with the response of the submitted job + """ try: submit_info = self._api.run_job(api_jobs, backend=backend_name, - shots=qobj['config']['shots'], - max_credits=qobj['config']['max_credits'], - seed=seed0, - hpc=hpc) + shots=shots, max_credits=max_credits, + seed=seed, hpc=hpc) # pylint: disable=broad-except except Exception as err: - self._status = JobStatus.ERROR - self._status_msg = str(err) - self._exception = err + # Undefined error during submission: + # Capture and keep it for raising it when calling status(). + self._future_captured_exception = err return None + + # Error in the job after submission: + # Transition to the `ERROR` final state. if 'error' in submit_info: self._status = JobStatus.ERROR - self._status_msg = str(submit_info['error']) - self._exception = IBMQJobError(self._status_msg) + self._api_error_msg = str(submit_info['error']) return submit_info - self._id = submit_info.get('id') - self.creation_date = submit_info.get('creationDate') + + # Submisssion success. + self._creation_date = submit_info.get('creationDate') self._status = JobStatus.QUEUED + self._id = submit_info.get('id') return submit_info def _wait_for_job(self, timeout=60, wait=5): @@ -445,67 +384,66 @@ def _wait_for_job(self, timeout=60, wait=5): Result: A result object. Raises: - QISKitError: job didn't return status or reported error in status - TimeoutError: if the job does not return results before an - specified timeout. + JobTimeoutError: if the job does not return results before a specified timeout. 
+ JobError: if something wrong happened in some of the server API calls """ start_time = time.time() - api_result = self._update_status() - while self._status not in JOB_FINAL_STATES: + while self.status() not in JOB_FINAL_STATES: elapsed_time = time.time() - start_time if timeout is not None and elapsed_time >= timeout: - raise TimeoutError('QISKit timed out') - logger.info('status = %s (%d seconds)', api_result['status'], - elapsed_time) - - if 'status' not in api_result: - self._exception = QISKitError("get_job didn't return status: %s" % - (pprint.pformat(api_result))) - raise QISKitError("get_job didn't return status: %s" % - (pprint.pformat(api_result))) + raise JobTimeoutError( + 'Timeout while waiting for the job: {}'.format(self._id) + ) + logger.info('status = %s (%d seconds)', self._status, elapsed_time) time.sleep(wait) - api_result = self._update_status() - if self.cancelled: - job_result = {'id': self._id, 'status': 'CANCELLED', - 'result': 'job cancelled'} - return Result(job_result) - - elif self.exception: - job_result = {'id': self._id, 'status': 'ERROR', - 'result': str(self.exception)} - return Result(job_result) - - if api_result is None: - api_result = self._api.get_job(self._id) + if self._cancelled: + return Result({'id': self._id, 'status': 'CANCELLED', + 'result': 'job cancelled'}) + job_data = self._api.get_job(self._id) job_result_list = [] - for circuit_result in api_result['qasms']: + for circuit_result in job_data['qasms']: this_result = {'data': circuit_result['data'], 'name': circuit_result.get('name'), 'compiled_circuit_qasm': circuit_result.get('qasm'), 'status': circuit_result['status']} if 'metadata' in circuit_result: this_result['metadata'] = circuit_result['metadata'] + job_result_list.append(this_result) - job_result = {'id': self._id, - 'status': api_result['status'], - 'used_credits': api_result.get('usedCredits'), - 'result': job_result_list} - job_result['backend_name'] = self.backend_name - return Result(job_result) + return Result({'id': self._id, + 'status': job_data['status'], + 'used_credits': job_data.get('usedCredits'), + 'result': job_result_list, + 'backend_name': self.backend_name}) + + def _wait_for_submission(self, timeout=60): + """Waits for the request to return a job ID""" + if self._id is None: + if self._future is None: + raise JobError("You have to submit before asking for status or results!") + try: + submit_info = self._future.result(timeout=timeout) + except TimeoutError as ex: + raise JobTimeoutError( + "Timeout waiting for the job being submitted: {}".format(ex) + ) -class IBMQJobError(QISKitError): - """class for IBM Q Job errors""" - pass + if 'error' in submit_info: + self._status = JobStatus.ERROR + self._api_error_msg = str(submit_info['error']) + raise JobError(str(submit_info['error'])) def _reorder_bits(result): - """temporary fix for ibmq backends. - for every ran circuit, get reordering information from qobj - and apply reordering on result""" + """Temporary fix for ibmq backends. + + For every ran circuit, get reordering information from qobj + and apply reordering on result. 
+ """ for circuit_result in result._result['result']: if 'metadata' in circuit_result: circ = circuit_result['metadata'].get('compiled_circuit') @@ -539,7 +477,7 @@ def _reorder_bits(result): # insert spaces to signify different classical registers cregs = circ['header']['clbit_labels'] if sum([creg[1] for creg in cregs]) != num_clbits: - raise ResultError("creg sizes don't add up in result header.") + raise JobError("creg sizes don't add up in result header.") creg_begin_pos = [] creg_end_pos = [] acc = 0 @@ -561,10 +499,62 @@ def _reorder_bits(result): def _numpy_type_converter(obj): + ret = obj if isinstance(obj, numpy.integer): - return int(obj) + ret = int(obj) elif isinstance(obj, numpy.floating): # pylint: disable=no-member - return float(obj) + ret = float(obj) elif isinstance(obj, numpy.ndarray): - return obj.tolist() - return obj + ret = obj.tolist() + return ret + + +def _create_api_job_from_circuit(circuit): + """Helper function that creates a special job required by the API, from a circuit.""" + api_job = {} + if not circuit.get('compiled_circuit_qasm'): + compiled_circuit = transpile(circuit['circuit']) + circuit['compiled_circuit_qasm'] = compiled_circuit.qasm(qeflag=True) + + if isinstance(circuit['compiled_circuit_qasm'], bytes): + api_job['qasm'] = circuit['compiled_circuit_qasm'].decode() + else: + api_job['qasm'] = circuit['compiled_circuit_qasm'] + + if circuit.get('name'): + api_job['name'] = circuit['name'] + + # convert numpy types for json serialization + compiled_circuit = json.loads(json.dumps(circuit['compiled_circuit'], + default=_numpy_type_converter)) + + api_job['metadata'] = {'compiled_circuit': compiled_circuit} + return api_job + + +def _is_job_queued(api_job_response): + """Checks whether a job has been queued or not.""" + is_queued, position = False, 0 + if 'infoQueue' in api_job_response: + if 'status' in api_job_response['infoQueue']: + queue_status = api_job_response['infoQueue']['status'] + is_queued = queue_status == 'PENDING_IN_QUEUE' + if 'position' in api_job_response['infoQueue']: + position = api_job_response['infoQueue']['position'] + return is_queued, position + + +def _format_hpc_parameters(hpc): + """Helper function to get HPC parameters with the correct format""" + if hpc is None: + return None + + hpc_camel_cased = None + with contextlib.suppress(KeyError, TypeError): + # Use CamelCase when passing the hpc parameters to the API. + hpc_camel_cased = { + 'multiShotOptimization': hpc['multi_shot_optimization'], + 'ompNumThreads': hpc['omp_num_threads'] + } + + return hpc_camel_cased diff --git a/qiskit/backends/joberror.py b/qiskit/backends/joberror.py new file mode 100644 index 000000000000..59ac142cad1e --- /dev/null +++ b/qiskit/backends/joberror.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +# Copyright 2017, IBM. +# +# This source code is licensed under the Apache License, Version 2.0 found in +# the LICENSE.txt file in the root directory of this source tree. + +""" +Exception for errors raised by jobs. 
+""" + +from qiskit import QISKitError + + +class JobError(QISKitError): + """Base class for errors raised by jobs.""" + + def __init__(self, *message): + """Set the error message.""" + super().__init__(*message) + self.message = ' '.join(message) + + def __str__(self): + """Return the message.""" + return repr(self.message) diff --git a/qiskit/backends/jobtimeouterror.py b/qiskit/backends/jobtimeouterror.py new file mode 100644 index 000000000000..0203066936d9 --- /dev/null +++ b/qiskit/backends/jobtimeouterror.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- + +# Copyright 2017, IBM. +# +# This source code is licensed under the Apache License, Version 2.0 found in +# the LICENSE.txt file in the root directory of this source tree. + +""" +Exception for timeout errors raised by jobs. +""" + +from qiskit.backends import JobError + + +class JobTimeoutError(JobError): + """Base class for timeout errors raised by jobs.""" + + def __init__(self, *message): + """Set the error message.""" + super().__init__(*message) + self.message = ' '.join(message) + + def __str__(self): + """Return the message.""" + return repr(self.message) diff --git a/qiskit/backends/local/localjob.py b/qiskit/backends/local/localjob.py index 503f1b5afa49..d3c9414d7498 100644 --- a/qiskit/backends/local/localjob.py +++ b/qiskit/backends/local/localjob.py @@ -10,14 +10,32 @@ from concurrent import futures import logging import sys +import functools -from qiskit.backends import BaseJob -from qiskit.backends import JobStatus -from qiskit import QISKitError +from qiskit.backends import BaseJob, JobStatus, JobError logger = logging.getLogger(__name__) +def requires_submit(func): + """ + Decorator to ensure that a submit has been performed before + calling the method. + + Args: + func (callable): test function to be decorated. + + Returns: + callable: the decorated function. + """ + @functools.wraps(func) + def _wrapper(self, *args, **kwargs): + if self._future is None: + raise JobError("Job not submitted yet!. You have to .submit() first!") + return func(self, *args, **kwargs) + return _wrapper + + class LocalJob(BaseJob): """Local QISKit SDK Job class. @@ -32,14 +50,22 @@ class LocalJob(BaseJob): def __init__(self, fn, qobj): super().__init__() + self._fn = fn self._qobj = qobj self._backend_name = qobj.header.backend_name - self._future = self._executor.submit(fn, qobj) + self._future = None + + def submit(self): + """Submit the job to the backend for running """ + if self._future is not None: + raise JobError("We have already submitted the job!") + + self._future = self._executor.submit(self._fn, self._qobj) + @requires_submit def result(self, timeout=None): # pylint: disable=arguments-differ - """ - Get job result. The behavior is the same as the underlying + """Get job result. The behavior is the same as the underlying concurrent Future objects, https://docs.python.org/3/library/concurrent.futures.html#future-objects @@ -51,62 +77,37 @@ def result(self, timeout=None): Result: Result object Raises: - concurrent.futures.TimeoutError: if timeout occured. + concurrent.futures.TimeoutError: if timeout occurred. concurrent.futures.CancelledError: if job cancelled before completed. 
""" return self._future.result(timeout=timeout) + @requires_submit def cancel(self): return self._future.cancel() - @property + @requires_submit def status(self): - _status_msg = None - # order is important here - if self.running: + """Gets the status of the job by querying the Python's future + + Returns: + JobStatus: The current JobStatus + + Raises: + JobError: If the future is in unexpected state + concurrent.futures.TimeoutError: if timeout occurred. + """ + # The order is important here + if self._future.running(): _status = JobStatus.RUNNING - elif not self.done: - _status = JobStatus.QUEUED - elif self.cancelled: + elif self._future.cancelled(): _status = JobStatus.CANCELLED - elif self.done: - _status = JobStatus.DONE - elif self.exception: - _status = JobStatus.ERROR - _status_msg = str(self.exception) + elif self._future.done(): + _status = JobStatus.DONE if self._future.exception() is None else JobStatus.ERROR else: - raise LocalJobError('Unexpected behavior of {0}'.format( + raise JobError('Unexpected behavior of {0}'.format( self.__class__.__name__)) - return {'status': _status, - 'status_msg': _status_msg} - - @property - def running(self): - return self._future.running() - - @property - def done(self): - """ - Returns True if job successfully finished running. - - Note behavior is slightly different than Future objects which would - also return true if successfully cancelled. - """ - return self._future.done() and not self._future.cancelled() - - @property - def cancelled(self): - return self._future.cancelled() - - @property - def exception(self): - """ - Return Exception object if exception occured else None. - - Returns: - Exception: exception raised by attempting to run job. - """ - return self._future.exception(timeout=0) + return _status @property def backend_name(self): @@ -114,8 +115,3 @@ def backend_name(self): Return backend name used for this job """ return self._backend_name - - -class LocalJobError(QISKitError): - """class for Local Job errors""" - pass diff --git a/qiskit/backends/local/qasm_simulator_cpp.py b/qiskit/backends/local/qasm_simulator_cpp.py index 5ec8cb13ee90..868d6d0abd09 100644 --- a/qiskit/backends/local/qasm_simulator_cpp.py +++ b/qiskit/backends/local/qasm_simulator_cpp.py @@ -72,7 +72,9 @@ def __init__(self, configuration=None): def run(self, qobj): """Run a qobj on the backend.""" - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): self._validate(qobj) @@ -129,7 +131,9 @@ def run(self, qobj): Returns: LocalJob: derived from BaseJob """ - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): self._validate() diff --git a/qiskit/backends/local/qasm_simulator_py.py b/qiskit/backends/local/qasm_simulator_py.py index 91d5c682df63..a1086a3c0b8a 100644 --- a/qiskit/backends/local/qasm_simulator_py.py +++ b/qiskit/backends/local/qasm_simulator_py.py @@ -269,7 +269,9 @@ def run(self, qobj): Returns: LocalJob: derived from BaseJob """ - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): """Run circuits in qobj""" diff --git a/qiskit/backends/local/statevector_simulator_cpp.py b/qiskit/backends/local/statevector_simulator_cpp.py index 0010d064309a..e0b99f1910ee 100644 --- a/qiskit/backends/local/statevector_simulator_cpp.py +++ 
b/qiskit/backends/local/statevector_simulator_cpp.py @@ -39,7 +39,9 @@ def __init__(self, configuration=None): def run(self, qobj): """Run a qobj on the the backend.""" - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): """Run a Qobj on the backend.""" diff --git a/qiskit/backends/local/statevector_simulator_py.py b/qiskit/backends/local/statevector_simulator_py.py index 4cb93c173d2e..bfbbaf349d9e 100644 --- a/qiskit/backends/local/statevector_simulator_py.py +++ b/qiskit/backends/local/statevector_simulator_py.py @@ -53,7 +53,9 @@ def run(self, qobj): Returns: LocalJob: derived from BaseJob """ - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): """Run a Qobj on the backend.""" diff --git a/qiskit/backends/local/unitary_simulator_py.py b/qiskit/backends/local/unitary_simulator_py.py index b7011de5cdf1..f63b18d054ab 100644 --- a/qiskit/backends/local/unitary_simulator_py.py +++ b/qiskit/backends/local/unitary_simulator_py.py @@ -151,7 +151,9 @@ def run(self, qobj): Returns: LocalJob: derived from BaseJob """ - return LocalJob(self._run_job, qobj) + local_job = LocalJob(self._run_job, qobj) + local_job.submit() + return local_job def _run_job(self, qobj): """Run qobj. This is a blocking call. diff --git a/test/python/_mockutils.py b/test/python/_mockutils.py index 4068be684f95..0164c78b2ac6 100644 --- a/test/python/_mockutils.py +++ b/test/python/_mockutils.py @@ -103,15 +103,15 @@ def cancel(self): return self._future.cancel() def status(self): - if self.running: + if self._running: _status = JobStatus.RUNNING - elif not self.done: + elif not self._done: _status = JobStatus.QUEUED - elif self.cancelled: + elif self._cancelled: _status = JobStatus.CANCELLED - elif self.done: + elif self._done: _status = JobStatus.DONE - elif self.error: + elif self._error: _status = JobStatus.ERROR else: raise Exception('Unexpected state of {0}'.format( @@ -121,25 +121,19 @@ def status(self): 'status_msg': _status_msg} @property - def cancelled(self): + def _cancelled(self): return self._future.cancelled() @property - def done(self): + def _done(self): return self._future.done() @property - def running(self): + def _running(self): return self._future.running() @property - def error(self): - """ - Return Exception object if exception occured else None. - - Returns: - Exception: exception raised by attempting to run job. 
- """ + def _error(self): return self._future.exception(timeout=0) diff --git a/test/python/test_ibmqjob.py b/test/python/test_ibmqjob.py index 080248d42852..848cda2d4e46 100644 --- a/test/python/test_ibmqjob.py +++ b/test/python/test_ibmqjob.py @@ -19,10 +19,11 @@ from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister from qiskit import transpiler -from qiskit.backends import JobStatus +from qiskit.backends import JobStatus, JobError from qiskit.backends.ibmq import IBMQProvider from qiskit.backends.ibmq.ibmqbackend import IBMQBackendError from qiskit.backends.ibmq.ibmqjob import IBMQJob +from qiskit.backends.local import LocalProvider from .common import requires_qe_access, JobTestCase, slow_test @@ -57,6 +58,7 @@ def setUp(self, qe_token, qe_url): qc.measure(qr, cr) self._qc = qc self._provider = IBMQProvider(qe_token, qe_url) + self._local_provider = LocalProvider() def test_run_simulator(self): backend = self._provider.get_backend('ibmq_qasm_simulator') @@ -102,11 +104,9 @@ def test_run_device(self): qobj = transpiler.compile(self._qc, backend) shots = qobj.config.shots job = backend.run(qobj) - while not (job.done or job.exception): - self.log.info(job.status) + while not job.status() is JobStatus.DONE: + self.log.info(job.status()) time.sleep(4) - if job.exception: - raise job.exception self.log.info(job.status) result = job.result() counts_qx = result.get_counts(result.get_names()[0]) @@ -140,18 +140,18 @@ def test_run_async_simulator(self): timeout = 30 start_time = time.time() while not found_async_jobs: - check = sum([job.running for job in job_array]) + check = sum([job.status() is JobStatus.RUNNING for job in job_array]) if check >= 2: self.log.info('found %d simultaneous jobs', check) break - if all([job.done for job in job_array]): + if all([job.status() is JobStatus.DONE for job in job_array]): # done too soon? don't generate error self.log.warning('all jobs completed before simultaneous jobs ' 'could be detected') break for job in job_array: - self.log.info('%s %s %s %s', job.status['status'], job.running, - check, job.id) + self.log.info('%s %s %s %s', job.status(), job.status() is JobStatus.RUNNING, + check, job.id()) self.log.info('-'*20 + ' ' + str(time.time()-start_time)) if time.time() - start_time > timeout: raise TimeoutError('failed to see multiple running jobs after ' @@ -161,11 +161,11 @@ def test_run_async_simulator(self): result_array = [job.result() for job in job_array] self.log.info('got back all job results') # Ensure all jobs have finished. - self.assertTrue(all([job.done for job in job_array])) + self.assertTrue(all([job.status() is JobStatus.DONE for job in job_array])) self.assertTrue(all([result.get_status() == 'COMPLETED' for result in result_array])) # Ensure job ids are unique. - job_ids = [job.id for job in job_array] + job_ids = [job.id() for job in job_array] self.assertEqual(sorted(job_ids), sorted(list(set(job_ids)))) @slow_test @@ -185,12 +185,12 @@ def test_run_async_device(self): num_jobs = 3 job_array = [backend.run(qobj) for _ in range(num_jobs)] time.sleep(3) # give time for jobs to start (better way?) 
- job_status = [job.status['status'] for job in job_array] - num_init = sum([status == JobStatus.INITIALIZING for status in job_status]) - num_queued = sum([status == JobStatus.QUEUED for status in job_status]) - num_running = sum([status == JobStatus.RUNNING for status in job_status]) - num_done = sum([status == JobStatus.DONE for status in job_status]) - num_error = sum([status == JobStatus.ERROR for status in job_status]) + job_status = [job.status() for job in job_array] + num_init = sum([status is JobStatus.INITIALIZING for status in job_status]) + num_queued = sum([status is JobStatus.QUEUED for status in job_status]) + num_running = sum([status is JobStatus.RUNNING for status in job_status]) + num_done = sum([status is JobStatus.DONE for status in job_status]) + num_error = sum([status is JobStatus.ERROR for status in job_status]) self.log.info('number of currently initializing jobs: %d/%d', num_init, num_jobs) self.log.info('number of currently queued jobs: %d/%d', @@ -207,11 +207,11 @@ def test_run_async_device(self): result_array = [job.result() for job in job_array] # Ensure all jobs have finished. - self.assertTrue(all([job.done for job in job_array])) + self.assertTrue(all([job.status() is JobStatus.DONE for job in job_array])) self.assertTrue(all([result.get_status() == 'COMPLETED' for result in result_array])) # Ensure job ids are unique. - job_ids = [job.id for job in job_array] + job_ids = [job.id() for job in job_array] self.assertEqual(sorted(job_ids), sorted(list(set(job_ids)))) @slow_test @@ -230,15 +230,15 @@ def test_job_id(self): backend = self._provider.get_backend('ibmq_qasm_simulator') qobj = transpiler.compile(self._qc, backend) job = backend.run(qobj) - self.log.info('job_id: %s', job.id) - self.assertTrue(job.id is not None) + self.log.info('job_id: %s', job.id()) + self.assertTrue(job.id() is not None) def test_get_backend_name(self): backend_name = 'ibmq_qasm_simulator' backend = self._provider.get_backend(backend_name) qobj = transpiler.compile(self._qc, backend) job = backend.run(qobj) - self.assertTrue(job.backend_name == backend_name) + self.assertTrue(job.backend_name() == backend_name) def test_get_jobs_from_backend(self): backend = _least_busy(self._provider.available_backends()) @@ -247,16 +247,16 @@ def test_get_jobs_from_backend(self): self.log.info('time to get jobs: %0.3f s', time.time() - start_time) self.log.info('found %s jobs on backend %s', len(job_list), backend.name) for job in job_list: - self.log.info('status: %s', job.status) - self.assertTrue(isinstance(job.id, str)) + self.log.info('status: %s', job.status()) + self.assertTrue(isinstance(job.id(), str)) self.log.info('time to get job statuses: %0.3f s', time.time() - start_time) def test_retrieve_job(self): backend = self._provider.get_backend('ibmq_qasm_simulator') qobj = transpiler.compile(self._qc, backend) job = backend.run(qobj) - rjob = backend.retrieve_job(job.id) - self.assertTrue(job.id == rjob.id) + rjob = backend.retrieve_job(job.id()) + self.assertTrue(job.id() == rjob.id()) self.assertTrue(job.result().get_counts() == rjob.result().get_counts()) def test_retrieve_job_error(self): @@ -272,7 +272,7 @@ def test_get_jobs_filter_job_status(self): self.log.info('found %s matching jobs', len(job_list)) for i, job in enumerate(job_list): self.log.info('match #%d: %s', i, job.result()._result['status']) - self.assertTrue(job.status['status'] == JobStatus.DONE) + self.assertTrue(job.status() is JobStatus.DONE) def test_get_jobs_filter_counts(self): # TODO: consider generalizing 
backend name @@ -304,8 +304,16 @@ def test_get_jobs_filter_date(self): job_list = backend.jobs(limit=5, db_filter=my_filter) self.log.info('found %s matching jobs', len(job_list)) for i, job in enumerate(job_list): - self.log.info('match #%d: %s', i, job.creation_date) - self.assertTrue(job.creation_date < past_day_30.isoformat()) + self.log.info('match #%d: %s', i, job.creation_date()) + self.assertTrue(job.creation_date() < past_day_30.isoformat()) + + def test_double_submit_fails(self): + backend = self._provider.get_backend('ibmq_qasm_simulator') + qobj = transpiler.compile(self._qc, backend) + # backend.run() will automatically call job.submit() + job = backend.run(qobj) + with self.assertRaises(JobError): + job.submit() if __name__ == '__main__': diff --git a/test/python/test_ibmqjob_states.py b/test/python/test_ibmqjob_states.py index 9538e0337742..16634a37b5b0 100644 --- a/test/python/test_ibmqjob_states.py +++ b/test/python/test_ibmqjob_states.py @@ -12,10 +12,12 @@ import unittest import time +from contextlib import suppress from IBMQuantumExperience import ApiError from qiskit.backends.jobstatus import JobStatus -from qiskit.backends.ibmq.ibmqjob import IBMQJob, IBMQJobError +from qiskit.backends.ibmq.ibmqjob import IBMQJob from qiskit.backends.ibmq.ibmqjob import API_FINAL_STATES +from qiskit.backends import JobError, JobTimeoutError from .common import JobTestCase from ._mockutils import new_fake_qobj @@ -30,132 +32,135 @@ def setUp(self): def test_unrecognized_status(self): job = self.run_with_api(UnknownStatusAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) - self.wait_for_initialization(job) - self.assertIsInstance(job.exception, IBMQJobError) - self.assertStatus(job, JobStatus.ERROR) + with self.assertRaises(JobError): + self.wait_for_initialization(job) def test_validating_job(self): job = self.run_with_api(ValidatingAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.VALIDATING) - - self._current_api.progress() + self.assertEqual(job.status(), JobStatus.VALIDATING) def test_error_while_creating_job(self): job = self.run_with_api(ErrorWhileCreatingAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.ERROR) - - def test_error_while_running_job(self): - job = self.run_with_api(ErrorWhileRunningAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) - - self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.RUNNING) - - self._current_api.progress() - self.assertStatus(job, JobStatus.ERROR) + self.assertEqual(job.status(), JobStatus.ERROR) def test_error_while_validating_job(self): job = self.run_with_api(ErrorWhileValidatingAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.VALIDATING) + self.assertEqual(job.status(), JobStatus.VALIDATING) self._current_api.progress() - self.assertStatus(job, JobStatus.ERROR) + self.assertEqual(job.status(), JobStatus.ERROR) def test_status_flow_for_non_queued_job(self): job = self.run_with_api(NonQueuedAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.RUNNING) + self.assertEqual(job.status(), 
JobStatus.RUNNING) self._current_api.progress() - self.assertStatus(job, JobStatus.DONE) + self.assertEqual(job.status(), JobStatus.DONE) def test_status_flow_for_queued_job(self): job = self.run_with_api(QueuedAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.QUEUED) + self.assertEqual(job.status(), JobStatus.QUEUED) self._current_api.progress() - self.assertStatus(job, JobStatus.RUNNING) + self.assertEqual(job.status(), JobStatus.RUNNING) self._current_api.progress() - self.assertStatus(job, JobStatus.DONE) + self.assertEqual(job.status(), JobStatus.DONE) def test_status_flow_for_cancellable_job(self): job = self.run_with_api(CancellableAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.RUNNING) + self.assertEqual(job.status(), JobStatus.RUNNING) can_cancel = job.cancel() self.assertTrue(can_cancel) self._current_api.progress() - self.assertStatus(job, JobStatus.CANCELLED) + self.assertEqual(job.status(), JobStatus.CANCELLED) def test_status_flow_for_non_cancellable_job(self): job = self.run_with_api(NonCancellableAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.RUNNING) + self.assertEqual(job.status(), JobStatus.RUNNING) can_cancel = job.cancel() self.assertFalse(can_cancel) self._current_api.progress() - self.assertStatus(job, JobStatus.RUNNING) + self.assertEqual(job.status(), JobStatus.RUNNING) def test_status_flow_for_errored_cancellation(self): job = self.run_with_api(ErroredCancellationAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertStatus(job, JobStatus.RUNNING) - + self.assertEqual(job.status(), JobStatus.RUNNING) can_cancel = job.cancel() self.assertFalse(can_cancel) - self.assertIsInstance(job.exception, IBMQJobError) + self.assertEqual(job.status(), JobStatus.RUNNING) - self.assertStatus(job, JobStatus.RUNNING) + def test_status_flow_for_unable_to_run_valid_qobj(self): + """Contrary to other tests, this one is expected to fail even for a + non-job-related issue. If the API fails while sending a job, we don't + get an id so we can not query for the job status.""" + job = self.run_with_api(UnavailableRunAPI()) - def test_status_flow_for_invalid_job(self): - job = self.run_with_api(UnableToInitializeAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + with self.assertRaises(JobError): + self.wait_for_initialization(job) - self.wait_for_initialization(job) - self.assertIsInstance(job.exception, IBMQJobError) - self.assertStatus(job, JobStatus.ERROR) + with self.assertRaises(JobError): + job.status() + + def test_api_throws_temporarily_but_job_is_finished(self): + job = self.run_with_api(ThrowingNonJobRelatedErrorAPI(errors_before_success=2)) - def test_status_flow_for_throwing_job(self): - job = self.run_with_api(ThrowingInitializationAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + # First time we query the server... + with self.assertRaises(JobError): + # The error happens inside wait_for_initialization, the first time + # it calls to status() after INITIALIZING. + self.wait_for_initialization(job) + # Also an explicit second time... 
+ with self.assertRaises(JobError): + job.status() + + # Now the API gets fixed and doesn't throw anymore. + self.assertEqual(job.status(), JobStatus.DONE) + + def test_status_flow_for_unable_to_run_invalid_qobj(self): + job = self.run_with_api(RejectingJobAPI()) self.wait_for_initialization(job) - self.assertIsInstance(job.exception, ApiError) - self.assertStatus(job, JobStatus.ERROR) + self.assertEqual(job.status(), JobStatus.ERROR) - def test_status_flow_for_throwing_api(self): - job = self.run_with_api(ThrowingAPI()) - self.assertStatus(job, JobStatus.INITIALIZING) + def test_error_while_running_job(self): + job = self.run_with_api(ErrorWhileRunningAPI()) + self.assertEqual(job.status(), JobStatus.INITIALIZING) self.wait_for_initialization(job) - self.assertIsInstance(job.exception, ApiError) - self.assertStatus(job, JobStatus.ERROR) + self.assertEqual(job.status(), JobStatus.RUNNING) + + self._current_api.progress() + self.assertEqual(job.status(), JobStatus.ERROR) + self.assertEqual(job.error_message(), 'Error running job') def test_cancelled_result(self): job = self.run_with_api(CancellableAPI()) @@ -164,15 +169,13 @@ def test_cancelled_result(self): job.cancel() self._current_api.progress() self.assertEqual(job.result().get_status(), 'CANCELLED') - self.assertStatus(job, JobStatus.CANCELLED) + self.assertEqual(job.status(), JobStatus.CANCELLED) def test_errored_result(self): - job = self.run_with_api(ThrowingInitializationAPI()) - - # TODO: Seems inconsistent, should throw while initializating? + job = self.run_with_api(ThrowingGetJobAPI()) self.wait_for_initialization(job) - self.assertEqual(job.result().get_status(), 'ERROR') - self.assertStatus(job, JobStatus.ERROR) + with self.assertRaises(JobError): + job.result() def test_completed_result(self): job = self.run_with_api(NonQueuedAPI()) @@ -180,7 +183,7 @@ def test_completed_result(self): self.wait_for_initialization(job) self._current_api.progress() self.assertEqual(job.result().get_status(), 'COMPLETED') - self.assertStatus(job, JobStatus.DONE) + self.assertEqual(job.status(), JobStatus.DONE) def test_block_on_result_waiting_until_completed(self): from concurrent import futures @@ -191,7 +194,7 @@ def test_block_on_result_waiting_until_completed(self): result = job.result() self.assertEqual(result.get_status(), 'COMPLETED') - self.assertStatus(job, JobStatus.DONE) + self.assertEqual(job.status(), JobStatus.DONE) def test_block_on_result_waiting_until_cancelled(self): from concurrent.futures import ThreadPoolExecutor @@ -202,7 +205,7 @@ def test_block_on_result_waiting_until_cancelled(self): result = job.result() self.assertEqual(result.get_status(), 'CANCELLED') - self.assertStatus(job, JobStatus.CANCELLED) + self.assertEqual(job.status(), JobStatus.CANCELLED) def test_block_on_result_waiting_until_exception(self): from concurrent.futures import ThreadPoolExecutor @@ -211,25 +214,24 @@ def test_block_on_result_waiting_until_exception(self): with ThreadPoolExecutor() as executor: executor.submit(_auto_progress_api, self._current_api) - result = job.result() - self.assertEqual(result.get_status(), 'ERROR') - self.assertStatus(job, JobStatus.ERROR) + with self.assertRaises(JobError): + job.result() def test_never_complete_result_with_timeout(self): job = self.run_with_api(NonQueuedAPI()) self.wait_for_initialization(job) # We never make the API status to progress so it is stuck on RUNNING - self.assertEqual(job.result(timeout=1).get_status(), 'ERROR') - self.assertStatus(job, JobStatus.RUNNING) + with 
self.assertRaises(JobTimeoutError): + job.result(timeout=0.2) - def test_cancel_while_initializing_should_not_raise(self): - """Trying to cancel while initializing is not possible but should - not raise.""" + self.assertEqual(job.status(), JobStatus.RUNNING) + + def test_cancel_while_initializing_fails(self): job = self.run_with_api(CancellableAPI()) can_cancel = job.cancel() self.assertFalse(can_cancel) - self.assertStatus(job, JobStatus.INITIALIZING) + self.assertEqual(job.status(), JobStatus.INITIALIZING) def test_only_final_states_cause_datailed_request(self): from unittest import mock @@ -246,36 +248,50 @@ def test_only_final_states_cause_datailed_request(self): job = self.run_with_api(api()) self.wait_for_initialization(job) - try: + with suppress(BaseFakeAPI.NoMoreStatesError): self._current_api.progress() - except BaseFakeAPI.NoMoreStatesError: - pass with mock.patch.object(self._current_api, 'get_job', wraps=self._current_api.get_job): - _ = job.status + job.status() if status in API_FINAL_STATES: self.assertTrue(self._current_api.get_job.called) else: self.assertFalse(self._current_api.get_job.called) + def wait_for_initialization(self, job, timeout=1): + """Waits until the job progress from `INITIALIZING` to a different + status. + """ + waited = 0 + wait = 0.1 + while job.status() == JobStatus.INITIALIZING: + time.sleep(wait) + waited += wait + if waited > timeout: + self.fail( + msg="The JOB is still initializing after timeout ({}s)" + .format(timeout) + ) + def run_with_api(self, api): """Creates a new `IBMQJob` instance running with the provided API - object.""" + object. + """ self._current_api = api - self._current_qjob = IBMQJob(new_fake_qobj(), api, False) + self._current_qjob = IBMQJob(api, False, qobj=new_fake_qobj()) + self._current_qjob.submit() return self._current_qjob def _auto_progress_api(api, interval=0.2): """Progress a `BaseFakeAPI` instacn every `interval` seconds until reaching - the final state.""" - try: + the final state. + """ + with suppress(BaseFakeAPI.NoMoreStatesError): while True: time.sleep(interval) api.progress() - except BaseFakeAPI.NoMoreStatesError: - pass class BaseFakeAPI(): @@ -304,7 +320,7 @@ def get_job(self, job_id): return self._job_status[self._state] def get_status_job(self, job_id): - summary_fields = ['status', 'infoQueue'] + summary_fields = ['status', 'error', 'infoQueue'] complete_response = self.get_job(job_id) return {key: value for key, value in complete_response.items() if key in summary_fields} @@ -362,7 +378,8 @@ class NonQueuedAPI(BaseFakeAPI): class ErrorWhileCreatingAPI(BaseFakeAPI): """Class emulating an API processing a job that errors while creating - the job.""" + the job. 
+ """ _job_status = [ {'status': 'ERROR_CREATING_JOB'} @@ -374,7 +391,7 @@ class ErrorWhileRunningAPI(BaseFakeAPI): _job_status = [ {'status': 'RUNNING'}, - {'status': 'ERROR_RUNNING_JOB'} + {'status': 'ERROR_RUNNING_JOB', 'error': 'Error running job'} ] @@ -388,15 +405,14 @@ class QueuedAPI(BaseFakeAPI): ] -class UnableToInitializeAPI(BaseFakeAPI): +class RejectingJobAPI(BaseFakeAPI): """Class for emulating an API unable of initializing.""" def run_job(self, *_args, **_kwargs): - time.sleep(0.2) - return {'error': 'invalid test qobj'} + return {'error': 'invalid qobj'} -class ThrowingInitializationAPI(BaseFakeAPI): +class UnavailableRunAPI(BaseFakeAPI): """Class for emulating an API throwing before even initializing.""" def run_job(self, *_args, **_kwargs): @@ -415,6 +431,43 @@ def get_job(self, job_id): raise ApiError() +class ThrowingNonJobRelatedErrorAPI(BaseFakeAPI): + """Class for emulating an scenario where the job is done but the API + fails some times for non job-related errors. + """ + + _job_status = [ + {'status': 'COMPLETED'} + ] + + def __init__(self, errors_before_success=2): + super().__init__() + self._number_of_exceptions_to_throw = errors_before_success + + def get_job(self, job_id): + if self._number_of_exceptions_to_throw != 0: + self._number_of_exceptions_to_throw -= 1 + raise ApiError() + + return super().get_job(job_id) + + +class ThrowingGetJobAPI(BaseFakeAPI): + """Class for emulating an API throwing in the middle of execution. But not in + get_status_job() , just in get_job(). + """ + + _job_status = [ + {'status': 'COMPLETED'} + ] + + def get_status_job(self, job_id): + return self._job_status[self._state] + + def get_job(self, job_id): + raise ApiError("Unexpected error") + + class CancellableAPI(BaseFakeAPI): """Class for emulating an API with cancellation.""" @@ -438,7 +491,8 @@ class NonCancellableAPI(BaseFakeAPI): class ErroredCancellationAPI(BaseFakeAPI): """Class for emulating an API with cancellation but throwing while - trying.""" + trying. + """ _job_status = [ {'status': 'RUNNING'}, diff --git a/test/python/test_localjob.py b/test/python/test_localjob.py index 023a7b8a5a92..2becd45af0d8 100644 --- a/test/python/test_localjob.py +++ b/test/python/test_localjob.py @@ -14,7 +14,6 @@ import unittest from unittest.mock import patch -from qiskit.backends.local import LocalJob from qiskit.backends.local import QasmSimulatorCpp from qiskit.backends.local import QasmSimulatorPy from qiskit.backends.local import StatevectorSimulatorCpp @@ -35,18 +34,6 @@ class TestLocalJob(QiskitTestCase): UnitarySimulatorPy ] - def test_run(self): - with mocked_simulator_binaries(),\ - patch.object(LocalJob, '__init__', return_value=None, - autospec=True): - - for backend_constructor in self._backends: - with self.subTest(backend=backend_constructor): - self.log.info('Backend under test: %s', backend_constructor) - backend = backend_constructor() - job = backend.run(new_fake_qobj()) - self.assertIsInstance(job, LocalJob) - def test_multiple_execution(self): # Notice that it is Python responsibility to test the executors # can run several tasks at the same time. 
It is our responsibility to @@ -58,7 +45,8 @@ def test_multiple_execution(self): # pylint: disable=invalid-name,redefined-outer-name with mocked_executor() as (LocalJob, executor): for index in range(taskcount): - LocalJob(target_tasks[index], new_fake_qobj()) + local_job = LocalJob(target_tasks[index], new_fake_qobj()) + local_job.submit() self.assertEqual(executor.submit.call_count, taskcount) for index in range(taskcount): @@ -75,25 +63,13 @@ def test_cancel(self): # pylint: disable=invalid-name,redefined-outer-name with mocked_executor() as (LocalJob, executor): job = LocalJob(lambda: None, new_fake_qobj()) + job.submit() job.cancel() self.assertCalledOnce(executor.submit) mocked_future = executor.submit.return_value self.assertCalledOnce(mocked_future.cancel) - def test_done(self): - # Once more, testing that reading the `done` property delegates into - # the proper future API. - - # pylint: disable=invalid-name,redefined-outer-name - with mocked_executor() as (LocalJob, executor): - job = LocalJob(lambda: None, new_fake_qobj()) - _ = job.done - - self.assertCalledOnce(executor.submit) - mocked_future = executor.submit.return_value - self.assertCalledOnce(mocked_future.done) - def assertCalledOnce(self, mocked_callable): """Assert a mocked callable has been called once.""" call_count = mocked_callable.call_count
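
For reference, a minimal end-to-end sketch of the reworked Jobs API this patch introduces (``run()`` calling ``submit()`` internally, the method-based ``id()``/``status()``/``result()`` accessors, and the new ``JobError``/``JobTimeoutError`` exceptions). It is illustrative only and not part of the diff; the token, URL and the circuit construction are placeholders assumed to follow the test setup above::

    from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister
    from qiskit import transpiler
    from qiskit.backends import JobStatus, JobError, JobTimeoutError
    from qiskit.backends.ibmq import IBMQProvider

    provider = IBMQProvider('MY_TOKEN', 'MY_URL')   # placeholder credentials
    backend = provider.get_backend('ibmq_qasm_simulator')

    # Build a small Bell-state circuit, as in the tests above (assumed register API).
    qr = QuantumRegister(2)
    cr = ClassicalRegister(2)
    qc = QuantumCircuit(qr, cr)
    qc.h(qr[0])
    qc.cx(qr[0], qr[1])
    qc.measure(qr, cr)

    qobj = transpiler.compile(qc, backend)
    job = backend.run(qobj)               # run() now submits the job for you

    try:
        # Former properties are methods now: job.id -> job.id(), job.done -> job.status().
        print('job id:', job.id())        # blocks until the submission returns an id
        if job.status() is JobStatus.QUEUED:
            print('queue position:', job.queue_position())
        result = job.result(timeout=120)  # blocks; may raise JobTimeoutError
        print(result.get_counts())
    except JobTimeoutError:
        print('the job did not finish before the timeout')
    except JobError as err:
        # Covers submission failures and unexpected answers from the API.
        print('job failed:', err)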