Skip to content

Commit

Permalink
Added temp directory to handle temp files for hubble, xmm_newton and …
Browse files Browse the repository at this point in the history
…esa_sky tests
  • Loading branch information
javier-ballester committed Jul 19, 2022
1 parent 03a871c commit 61226bd
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 74 deletions.
33 changes: 16 additions & 17 deletions astroquery/esa/hubble/tests/test_esa_hubble.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
"""
import numpy as np
import pytest
import os
import shutil
import tempfile
from unittest.mock import MagicMock
from unittest.mock import patch

import numpy as np
import pytest
from astropy import coordinates
from astropy.table.table import Table
from requests.models import Response

from astroquery.esa.hubble import ESAHubbleClass
from astroquery.esa.hubble.tests.dummy_tap_handler import DummyHubbleTapHandler
from astroquery.utils.mocks import MockResponse
from astropy import coordinates
from unittest.mock import MagicMock
from astropy.table.table import Table
import shutil


def data_path(filename):
Expand Down Expand Up @@ -93,21 +94,22 @@ def test_download_product_errors(self):
assert "This product_type is not allowed" in err.value.args[0]

def test_download_product_by_calibration(self):
tempdir = tempfile.mkdtemp("temp")
parameters = {'observation_id': "J6FL25S4Q",
'cal_level': "RAW",
'filename': "J6FL25S4Q.vot.test",
'filename': tempdir + "/" + "J6FL25S4Q.vot.test",
'verbose': True}
ehst = ESAHubbleClass(self.get_dummy_tap_handler())
ehst.download_product(observation_id=parameters['observation_id'],
calibration_level=parameters['cal_level'],
filename=parameters['filename'],
verbose=parameters['verbose'])
os.remove("J6FL25S4Q.vot.test")

def test_download_product_by_product_type(self):
tempdir = tempfile.mkdtemp("temp")
parameters = {'observation_id': "J6FL25S4Q",
'product_type': "SCIENCE_PRODUCT",
'filename': "J6FL25S4Q.vot.test",
'filename': tempdir + "/" + "J6FL25S4Q.vot.test",
'verbose': True}
ehst = ESAHubbleClass(self.get_dummy_tap_handler())
ehst.download_product(observation_id=parameters['observation_id'],
Expand All @@ -126,16 +128,13 @@ def test_download_product_by_product_type(self):
product_type=parameters['product_type'],
filename=parameters['filename'],
verbose=parameters['verbose'])
os.remove("J6FL25S4Q.vot.test")
os.remove("J6FL25S4Q.vot.test.fits.gz")
os.remove("J6FL25S4Q.vot.test.jpg")

def test_get_postcard(self):
tempdir = tempfile.mkdtemp("temp")
ehst = ESAHubbleClass(self.get_dummy_tap_handler())
ehst.get_postcard(observation_id="X0MC5101T",
filename="X0MC5101T.vot",
filename=tempdir + "/" + "X0MC5101T.vot",
verbose=True)
os.remove("X0MC5101T.vot")

@patch.object(ESAHubbleClass, 'cone_search')
@patch.object(ESAHubbleClass, '_query_tap_target')
Expand Down Expand Up @@ -229,9 +228,9 @@ def test_get_tables(self):
ehst.get_tables(True, True)

def test_get_artifact(self):
tempdir = tempfile.mkdtemp("temp")
ehst = ESAHubbleClass(self.get_dummy_tap_handler())
ehst.get_artifact("w0ji0v01t_c2f.fits.gz")
os.remove("w0ji0v01t_c2f.fits.gz")
ehst.get_artifact(tempdir + "/" + "w0ji0v01t_c2f.fits.gz")

def test_get_columns(self):
parameters = {'query': "select top 10 * from hsc_v2.hubble_sc2",
Expand Down
76 changes: 39 additions & 37 deletions astroquery/esa/xmm_newton/tests/test_xmm_newton.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
European Space Agency (ESA)
Created on 4 Sept. 2019
"""
import errno
import os
import shutil
import tarfile
import tempfile
from unittest.mock import patch

import pytest
import tarfile
import os
import errno
import shutil

from astroquery.exceptions import LoginError
from ..core import XMMNewtonClass
from ..tests.dummy_tap_handler import DummyXMMNewtonTapHandler
from ..tests.dummy_handler import DummyHandler
from astroquery.exceptions import LoginError
from ..tests.dummy_tap_handler import DummyXMMNewtonTapHandler


def data_path(filename):
Expand Down Expand Up @@ -98,9 +99,10 @@ def test_dummy_handler(self):
dummyHandler.reset()

def test_parse_filename(self):
self._create_tar("filename.tar", self._files)
tempdir = tempfile.mkdtemp("temp")
self._create_tar(tempdir + "/" + "filename.tar", self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
with tarfile.open("filename.tar", "r") as tar:
with tarfile.open(tempdir + "/" + "filename.tar", "r") as tar:
for i in tar.getmembers():
paths = os.path.split(i.name)
fname = paths[1]
Expand All @@ -109,7 +111,6 @@ def test_parse_filename(self):
continue
fname_info = xsa._parse_filename(fname)
assert fname_info["X"] == "P"
os.remove("filename.tar")

_files = {
"0405320501": {
Expand Down Expand Up @@ -294,10 +295,10 @@ def _create_tar_lightcurves(self, tarname, files):
shutil.rmtree(ob_name)

def test_create_tar_lightcurves(self):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile_lightcurves.tar"
self._create_tar_lightcurves(_tarname, self._files_lightcurves)
assert os.path.isfile(_tarname)
os.remove("tarfile_lightcurves.tar")
self._create_tar_lightcurves(tempdir + "/" + _tarname, self._files_lightcurves)
assert os.path.isfile(tempdir + "/" + _tarname)

def test_get_epic_spectra_non_existing_file(self, capsys):
_tarname = "nonexistingfile.tar"
Expand All @@ -311,27 +312,28 @@ def test_get_epic_spectra_non_existing_file(self, capsys):
"[astroquery.esa.xmm_newton.core]\n" % _tarname)

def test_get_epic_spectra_invalid_instrumnet(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_instrument = "II"
_source_number = 83
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_spectra(_tarname, _source_number,
res = xsa.get_epic_spectra(tempdir + "/" + _tarname, _source_number,
instrument=[_invalid_instrument])
assert res == {}
out, err = capsys.readouterr()
assert err == ("WARNING: Invalid instrument %s "
"[astroquery.esa.xmm_newton.core]\n"
% _invalid_instrument)
os.remove(_tarname)

def test_get_epic_spectra_invalid_source_number(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_source_number = 833
_default_instrument = ['M1', 'M2', 'PN', 'EP']
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_spectra(_tarname, _invalid_source_number,
res = xsa.get_epic_spectra(tempdir + "/" + _tarname, _invalid_source_number,
instrument=[])
assert res == {}
out, err = capsys.readouterr()
Expand All @@ -340,9 +342,8 @@ def test_get_epic_spectra_invalid_source_number(self, capsys):
" Source Number: %u\n"
" Instrument: %s\n"
" [astroquery.esa.xmm_newton.core]\n"
% (_tarname, _invalid_source_number,
% (tempdir + "/" + _tarname, _invalid_source_number,
_default_instrument))
os.remove(_tarname)

def test_get_epic_images_non_existing_file(self, capsys):
_tarname = "nonexistingfile.tar"
Expand All @@ -355,43 +356,44 @@ def test_get_epic_images_non_existing_file(self, capsys):
"[astroquery.esa.xmm_newton.core]\n" % _tarname)

def test_get_epic_images_invalid_instrument(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_instrument = "II"
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_images(_tarname,
res = xsa.get_epic_images(tempdir + "/" + _tarname,
band=[], instrument=[_invalid_instrument],
get_detmask=True, get_exposure_map=True)
assert res == {}
out, err = capsys.readouterr()
assert err == ("WARNING: Invalid instrument %s "
"[astroquery.esa.xmm_newton.core]\n"
% _invalid_instrument)
os.remove(_tarname)

def test_get_epic_images_invalid_band(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_band = 10
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_images(_tarname,
res = xsa.get_epic_images(tempdir + "/" + _tarname,
band=[_invalid_band], instrument=[],
get_detmask=True, get_exposure_map=True)
assert res == {}
out, err = capsys.readouterr()
assert err == ("WARNING: Invalid band %u "
"[astroquery.esa.xmm_newton.core]\n" % _invalid_band)
os.remove(_tarname)

def test_get_epic_images(self):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_instruments = ["M1", "M1_expo", "M1_det",
"M2", "M2_expo", "M2_det",
"PN", "PN_expo", "PN_det",
"EP", "EP_expo", "EP_det"]
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_images(_tarname, band=[], instrument=[],
res = xsa.get_epic_images(tempdir + "/" + _tarname, band=[], instrument=[],
get_detmask=True, get_exposure_map=True)
assert len(res) == 6 # Number of different bands
assert len(res[1]) == 9 # Number of different inst within band 1
Expand Down Expand Up @@ -472,14 +474,14 @@ def test_get_epic_images(self):
# Removing files created in this test
for ob_name in self._files:
shutil.rmtree(ob_name)
os.remove(_tarname)

def test_get_epic_lightcurve(self):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
_source_number = 1
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_lightcurve(_tarname, _source_number,
res = xsa.get_epic_lightcurve(tempdir + "/" + _tarname, _source_number,
instrument=['M1', 'M2', 'PN'])
assert res == {}

Expand All @@ -495,26 +497,27 @@ def test_get_epic_lightcurve_non_existing_file(self, capsys):
"[astroquery.esa.xmm_newton.core]\n" % _tarname)

def test_get_epic_lightcurve_invalid_instrument(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_instrument = "II"
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_images(_tarname, [], [_invalid_instrument],
res = xsa.get_epic_images(tempdir + "/" + _tarname, [], [_invalid_instrument],
get_detmask=True, get_exposure_map=True)
assert res == {}
out, err = capsys.readouterr()
assert err == ("WARNING: Invalid instrument %s "
"[astroquery.esa.xmm_newton.core]\n"
% _invalid_instrument)
os.remove(_tarname)

def test_get_epic_lightcurve_invalid_source_number(self, capsys):
tempdir = tempfile.mkdtemp("temp")
_tarname = "tarfile.tar"
_invalid_source_number = 833
_default_instrument = ['M1', 'M2', 'PN', 'EP']
self._create_tar(_tarname, self._files)
self._create_tar(tempdir + "/" + _tarname, self._files)
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
res = xsa.get_epic_lightcurve(_tarname, _invalid_source_number,
res = xsa.get_epic_lightcurve(tempdir + "/" + _tarname, _invalid_source_number,
instrument=[])
assert res == {}
out, err = capsys.readouterr()
Expand All @@ -523,9 +526,8 @@ def test_get_epic_lightcurve_invalid_source_number(self, capsys):
" Source Number: %u\n"
" Instrument: %s\n"
" [astroquery.esa.xmm_newton.core]\n"
% (_tarname, _invalid_source_number,
% (tempdir + "/" + _tarname, _invalid_source_number,
_default_instrument))
os.remove(_tarname)

def test_create_link(self):
xsa = XMMNewtonClass(self.get_dummy_tap_handler())
Expand Down
36 changes: 16 additions & 20 deletions astroquery/esasky/tests/test_esasky_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import os
import shutil
import tempfile

import pytest
from astroquery import log
from astroquery.utils.tap.model.taptable import TapTableMeta
from astroquery.utils.tap.model.tapcolumn import TapColumn
from astroquery.utils.commons import TableList
from astropy.io.fits.hdu.hdulist import HDUList

from astroquery import log
from astroquery.utils.commons import TableList
from astroquery.utils.tap.model.tapcolumn import TapColumn
from astroquery.utils.tap.model.taptable import TapTableMeta
from ... import esasky

ESASkyClass = esasky.core.ESASkyClass()
Expand Down Expand Up @@ -87,11 +89,11 @@ def test_esasky_get_images_obs_id(self):
assert os.path.exists(file_path)
log.info("Checking {} data.".format(mission))
if mission.upper() == "HERSCHEL":
assert(isinstance(result[mission.upper()][0]["250"], HDUList))
assert(isinstance(result[mission.upper()][0]["350"], HDUList))
assert(isinstance(result[mission.upper()][0]["500"], HDUList))
assert (isinstance(result[mission.upper()][0]["250"], HDUList))
assert (isinstance(result[mission.upper()][0]["350"], HDUList))
assert (isinstance(result[mission.upper()][0]["500"], HDUList))
else:
assert(isinstance(result[mission.upper()][0], HDUList))
assert (isinstance(result[mission.upper()][0], HDUList))

result = None

Expand All @@ -113,10 +115,10 @@ def test_esasky_get_spectra_obs_id(self):
assert os.path.exists(file_path)
log.info("Checking {} data.".format(mission))
if mission.upper() == "HERSCHEL":
assert(isinstance(result[mission.upper()]["1342253595"]["WBS"]["WBS-V_USB_4b"], HDUList))
assert(isinstance(result[mission.upper()]["1342253595"]["HRS"]["HRS-H_LSB_4b"], HDUList))
assert (isinstance(result[mission.upper()]["1342253595"]["WBS"]["WBS-V_USB_4b"], HDUList))
assert (isinstance(result[mission.upper()]["1342253595"]["HRS"]["HRS-H_LSB_4b"], HDUList))
else:
assert(isinstance(result[mission.upper()][0], HDUList))
assert (isinstance(result[mission.upper()][0], HDUList))

result = None

Expand All @@ -132,23 +134,17 @@ def test_esasky_query_object_maps(self):

@pytest.mark.bigdata
def test_esasky_get_images(self):
download_directory = "ESASkyRemoteTest"
if not os.path.exists(download_directory):
os.makedirs(download_directory)
tempdir = tempfile.mkdtemp("ESASkyRemoteTest")

missions = ESASkyClass.list_maps()
# Remove very large map missions & missions with many results
# & missions without proper download url (INTEGRAL, SUZAKU, ALMA, AKARI)
missions = [mission for mission in missions if mission not in
("HST-OPTICAL", "HST-IR", "HST-UV", "XMM-OM-UV", "INTEGRAL", "SUZAKU", "ALMA", "AKARI")]

ESASkyClass.get_images(position="M51", missions=missions, download_dir=download_directory)

for mission in missions:
file_path = os.path.join(download_directory, mission)
assert os.path.exists(file_path)
ESASkyClass.get_images(position="M51", missions=missions, download_dir=tempdir)

shutil.rmtree(download_directory)
assert (os.path.getsize(tempdir) > 0)

def test_esasky_get_images_small(self):
download_directory = "ESASkyRemoteTest"
Expand Down

0 comments on commit 61226bd

Please sign in to comment.