CI: Mark s3 tests parallel safe #35895

Merged · 3 commits · Aug 26, 2020
pandas/tests/io/conftest.py (13 additions, 11 deletions)

@@ -34,12 +34,13 @@ def feather_file(datapath):


 @pytest.fixture
-def s3so():
-    return dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
+def s3so(worker_id):
+    worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
+    return dict(client_kwargs={"endpoint_url": f"http://127.0.0.1:555{worker_id}/"})

Member: @TomAugspurger where is this defined? I'm getting test failures locally; guessing I need to update some dependency.

Contributor (author): worker_id is from pytest-xdist.

Member: test_s3_roundtrip_for_dir is failing in CI and I'm trying to reproduce it locally, but it is getting pytest.skipped and it isn't clear why. Could worker_id be involved?
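For context: worker_id is a fixture that pytest-xdist injects into every test. It is "master" in a plain pytest run and "gw0", "gw1", ... when running with -n. The fixture above folds that id into the moto port, so each xdist worker talks to its own server while a non-distributed run keeps the old port. A minimal standalone sketch of the mapping (the helper name is ours; the logic is the diff's):

    def moto_port(worker_id: str) -> str:
        # "master" (no xdist) keeps the historical port 5555;
        # worker "gwN" is mapped to port 555N, one server per worker.
        worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
        return f"555{worker_id}"

    assert moto_port("master") == "5555"
    assert moto_port("gw0") == "5550"
    assert moto_port("gw3") == "5553"

Note the scheme assumes fewer than ten workers: "gw12" would yield "55512", a different but still collision-free port.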


-@pytest.fixture(scope="module")
-def s3_base():
+@pytest.fixture(scope="session")
+def s3_base(worker_id):
     """
     Fixture for mocking S3 interaction.

@@ -61,11 +62,13 @@ def s3_base():
     # Launching moto in server mode, i.e., as a separate process
     # with an S3 endpoint on localhost

-    endpoint_uri = "http://127.0.0.1:5555/"
+    worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
+    endpoint_port = f"555{worker_id}"
+    endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

     # pipe to null to avoid logging in terminal
     proc = subprocess.Popen(
-        shlex.split("moto_server s3 -p 5555"), stdout=subprocess.DEVNULL
+        shlex.split(f"moto_server s3 -p {endpoint_port}"), stdout=subprocess.DEVNULL
     )

     timeout = 5
@@ -79,7 +82,7 @@
             pass
         timeout -= 0.1
         time.sleep(0.1)
-    yield
+    yield endpoint_uri

     proc.terminate()
     proc.wait()
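The body of the readiness loop is elided in the hunk above. For orientation, a hedged sketch of what such a poll looks like (the function name and exact exception handling are ours, not necessarily the PR's code):

    import time

    import requests

    def wait_for_moto(endpoint_uri: str, timeout: float = 5.0) -> None:
        # Poll until moto_server accepts connections; give up after ~timeout seconds.
        while timeout > 0:
            try:
                requests.get(endpoint_uri)
                return  # any HTTP response means the server is listening
            except requests.exceptions.ConnectionError:
                pass
            timeout -= 0.1
            time.sleep(0.1)
        raise RuntimeError(f"moto_server at {endpoint_uri} did not come up")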
@@ -119,9 +122,8 @@ def add_tips_files(bucket_name):
                 cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f)

     bucket = "pandas-test"
-    endpoint_uri = "http://127.0.0.1:5555/"
-    conn = boto3.resource("s3", endpoint_url=endpoint_uri)
-    cli = boto3.client("s3", endpoint_url=endpoint_uri)
+    conn = boto3.resource("s3", endpoint_url=s3_base)
+    cli = boto3.client("s3", endpoint_url=s3_base)

     try:
         cli.create_bucket(Bucket=bucket)
@@ -143,7 +145,7 @@ def add_tips_files(bucket_name):
     s3fs.S3FileSystem.clear_instance_cache()
     yield conn

-    s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
+    s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

     try:
         s3.rm(bucket, recursive=True)
pandas/tests/io/json/test_compression.py (2 additions, 4 deletions)

@@ -34,7 +34,7 @@ def test_read_zipped_json(datapath):


 @td.skip_if_not_us_locale
-def test_with_s3_url(compression, s3_resource):
+def test_with_s3_url(compression, s3_resource, s3so):
     # Bucket "pandas-test" created in tests/io/conftest.py

     df = pd.read_json('{"a": [1, 2, 3], "b": [4, 5, 6]}')
@@ -45,9 +45,7 @@ def test_with_s3_url(compression, s3_resource):
         s3_resource.Bucket("pandas-test").put_object(Key="test-1", Body=f)

     roundtripped_df = pd.read_json(
-        "s3://pandas-test/test-1",
-        compression=compression,
-        storage_options=dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}),
+        "s3://pandas-test/test-1", compression=compression, storage_options=s3so,
     )
     tm.assert_frame_equal(df, roundtripped_df)
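For orientation: pandas hands storage_options through fsspec to the filesystem constructor, so the s3so dict above ends up as s3fs.S3FileSystem(client_kwargs={"endpoint_url": ...}) under the hood. A hedged, self-contained round trip against an assumed local moto endpoint (the port and key are illustrative; a moto_server must already be listening and the pandas-test bucket must exist, as the conftest fixtures arrange):

    import pandas as pd

    # Assumed endpoint: worker gw0 -> port 5550 under this PR's scheme.
    # moto accepts any credentials, but dummy AWS env vars may be needed.
    s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5550/"})

    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    df.to_json("s3://pandas-test/example.json", storage_options=s3so)
    back = pd.read_json("s3://pandas-test/example.json", storage_options=s3so)
    assert back.equals(df)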
pandas/tests/io/json/test_pandas.py (2 additions, 5 deletions)

@@ -1702,17 +1702,14 @@ def test_json_multiindex(self, dataframe, expected):
         result = series.to_json(orient="index")
         assert result == expected

-    def test_to_s3(self, s3_resource):
+    def test_to_s3(self, s3_resource, s3so):
         import time

         # GH 28375
         mock_bucket_name, target_file = "pandas-test", "test.json"
         df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]})
         df.to_json(
-            f"s3://{mock_bucket_name}/{target_file}",
-            storage_options=dict(
-                client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}
-            ),
+            f"s3://{mock_bucket_name}/{target_file}", storage_options=s3so,
         )
         timeout = 5
         while True:
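The hunk ends mid-loop; judging from the visible timeout = 5 and while True:, the elided tail polls until the written key becomes visible via boto3. A hedged sketch of such a wait (the helper name and exact condition are ours):

    import time

    def wait_for_key(s3_resource, bucket: str, key: str, timeout: float = 5.0) -> None:
        # A write through s3fs may become visible to boto3 slightly later; poll.
        while timeout > 0:
            if any(obj.key == key for obj in s3_resource.Bucket(bucket).objects.all()):
                return
            timeout -= 0.1
            time.sleep(0.1)
        raise AssertionError(f"{key} never appeared in bucket {bucket}")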
pandas/tests/io/test_parquet.py (24 additions, 10 deletions)

@@ -158,10 +158,6 @@ def check_round_trip(
"""
write_kwargs = write_kwargs or {"compression": None}
read_kwargs = read_kwargs or {}
if isinstance(path, str) and "s3://" in path:
s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
read_kwargs["storage_options"] = s3so
write_kwargs["storage_options"] = s3so

if expected is None:
expected = df
@@ -555,15 +551,24 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so):
             write_kwargs=kw,
         )

-    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
+    def test_s3_roundtrip(self, df_compat, s3_resource, pa, s3so):
         if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"):
             pytest.skip()
         # GH #19134
-        check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
+        s3so = dict(storage_options=s3so)
+        check_round_trip(
+            df_compat,
+            pa,
+            path="s3://pandas-test/pyarrow.parquet",
+            read_kwargs=s3so,
+            write_kwargs=s3so,
+        )

     @td.skip_if_no("s3fs")
     @pytest.mark.parametrize("partition_col", [["A"], []])
-    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+    def test_s3_roundtrip_for_dir(
+        self, df_compat, s3_resource, pa, partition_col, s3so
+    ):
         # GH #26388
         expected_df = df_compat.copy()

@@ -587,7 +592,10 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
             pa,
             expected=expected_df,
             path="s3://pandas-test/parquet_dir",
-            write_kwargs={"partition_cols": partition_col, "compression": None},
+            read_kwargs=dict(storage_options=s3so),
+            write_kwargs=dict(
+                partition_cols=partition_col, compression=None, storage_options=s3so
+            ),
             check_like=True,
             repeat=1,
         )
@@ -761,9 +769,15 @@ def test_filter_row_groups(self, fp):
         result = read_parquet(path, fp, filters=[("a", "==", 0)])
         assert len(result) == 1

-    def test_s3_roundtrip(self, df_compat, s3_resource, fp):
+    def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so):
         # GH #19134
-        check_round_trip(df_compat, fp, path="s3://pandas-test/fastparquet.parquet")
+        check_round_trip(
+            df_compat,
+            fp,
+            path="s3://pandas-test/fastparquet.parquet",
+            read_kwargs=dict(storage_options=s3so),
+            write_kwargs=dict(compression=None, storage_options=s3so),
+        )

     def test_partition_cols_supported(self, fp, df_full):
         # GH #23283
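Taken together, every s3-touching fixture and test now derives its endpoint from worker_id instead of a hard-coded port, so the io suite can run distributed, e.g. pytest pandas/tests/io -n 4, with workers gw0 through gw3 on ports 5550 through 5553 and a plain, non-distributed run still on 5555.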