
Commit

Update to spark 3.3 (#43)
george-zubrienko authored Sep 2, 2022
1 parent be58c37 commit 0ef7ceb
Showing 8 changed files with 554 additions and 31 deletions.
11 changes: 3 additions & 8 deletions .github/workflows/build.yaml
@@ -13,7 +13,7 @@ jobs:
if: ${{ github.ref != 'refs/heads/main' }}

container:
image: esdcrproduction.azurecr.io/spark:v1.3.2-bitnami-3.2.0-python-3.9.7-0
image: esdcrproduction.azurecr.io/spark:v3.0.2-bitnami-3.3.0-python-3.9.13
credentials:
username: ${{ secrets.AZCR_PROD_USER }}
password: ${{ secrets.AZCR_PROD_TOKEN }}
@@ -35,16 +35,11 @@ jobs:
run: |
set -e
pypath=$(pwd)
export PYTHONPATH="$pypath:$PYTHONPATH"
find ./hadoop_fs_wrapper -type f -name "*.py" | xargs /github/home/.local/bin/poetry run pylint
- name: Unit test
shell: bash
run: |
set -e
pypath=$(pwd)
export PYTHONPATH="$pypath:$PYTHONPATH"
set -euxo pipefail
/github/home/.local/bin/poetry run pytest ./test --doctest-modules --junitxml=junit/test-results.xml --cov=. --cov-report=term-missing:skip-covered | tee pytest-coverage.txt
- name: Publish Code Coverage
2 changes: 1 addition & 1 deletion .github/workflows/prepare_release.yaml
@@ -16,4 +16,4 @@ jobs:
uses: SneaksAndData/github-actions/[email protected]
with:
major_v: 0
minor_v: 4
minor_v: 5
4 changes: 2 additions & 2 deletions README.md
@@ -12,7 +12,7 @@ Select a version that matches hadoop version you are using:
| Hadoop Version | Compatible hadoop-fs-wrapper version |
|----------------|:------------------------------------:|
| 3.2.x | 0.4.x |
| 3.3.x | 0.4.x |
| 3.3.x | 0.4.x, 0.5.x |

## Usage
Common use case is accessing Hadoop FileSystem from Spark session object:
@@ -42,4 +42,4 @@ def is_valid_source_path(file_system: FileSystem, path: str) -> bool:
Currently basic filesystem operations (listing, deleting, search, iterative listing etc.) are supported. If an operation you require is not yet wrapped,
please open an issue or create a PR.

All changes are tested against Spark 3.2 running in local mode.
All changes are tested against Spark 3.2/3.3 running in local mode.
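
For context, the README usage referenced above (collapsed in this diff view) follows roughly the pattern sketched below. This is not part of the commit; the `FileSystem.from_spark_session` constructor name and the list-like return of `glob_status` are assumptions inferred from the wrapper's test suite, so verify them against the full README and source.

```python
# Sketch only - not part of this commit. Assumes FileSystem.from_spark_session
# exists and that glob_status returns a list of matched file statuses.
from pyspark.sql import SparkSession
from hadoop_fs_wrapper.wrappers.file_system import FileSystem


def is_valid_source_path(file_system: FileSystem, path: str) -> bool:
    """Return True if the glob pattern resolves to at least one existing path."""
    return len(file_system.glob_status(path)) > 0


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    fs = FileSystem.from_spark_session(spark)  # assumed constructor name
    print(is_valid_source_path(fs, "file:///tmp/data/part*.csv"))
```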
524 changes: 524 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
@@ -9,12 +9,12 @@ readme = "README.md"
repository = 'https://github.com/SneaksAndData/hadoop-fs-wrapper'

[tool.poetry.dependencies]
python = "^3.8"
pyspark = "~3.2"
python = "^3.9"
pyspark = "~3.3"

[tool.poetry.dev-dependencies]
pytest = "^7.0"
pytest-cov = "^2.12"
pytest-cov = "^3.0"
pylint = "^2.12"

[build-system]
9 changes: 9 additions & 0 deletions test/conftest.py
@@ -0,0 +1,9 @@
import pytest as pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
session = SparkSession.builder.getOrCreate()
yield session
session.stop()
25 changes: 10 additions & 15 deletions test/test_file_system.py
@@ -26,15 +26,10 @@
import pytest
from pyspark.sql import SparkSession

from hadoop_fs_wrapper.wrappers import FileSystem
from hadoop_fs_wrapper.wrappers.file_system import FileSystem


@pytest.fixture
def spark_session():
return SparkSession.builder.master('local[*]').getOrCreate()


def test_list_objects(spark_session):
def test_list_objects(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"

try:
@@ -45,7 +40,7 @@ def test_list_objects(spark_session):
pytest.fail("Failed to run list_objects")


def test_list_objects_stream(spark_session):
def test_list_objects_stream(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"

try:
@@ -59,7 +54,7 @@ def test_list_objects_stream(spark_session):
pytest.fail("Failed to run list_objects")


def test_glob_status(spark_session):
def test_glob_status(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"

try:
@@ -71,7 +66,7 @@ def test_glob_status(spark_session):
pytest.fail("Failed to run glob_status")


def test_read_file(spark_session):
def test_read_file(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"
expected_data = '"a";"b"\n1;2'

@@ -94,7 +89,7 @@ def test_read_file(spark_session):
),
],
)
def test_exists(spark_session, path, expected):
def test_exists(spark_session: SparkSession, path, expected):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"

try:
@@ -106,7 +101,7 @@ def test_exists(spark_session, path, expected):
pytest.fail("Failed to run test_exists_true")


def test_write_and_read(spark_session):
def test_write_and_read(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"
sample_data = "hello world!"
sample_data_encoding = 'utf-8'
@@ -121,7 +116,7 @@ def test_write_and_read(spark_session):
pytest.fail("Failed to run test_write_and_read")


def test_write_delete_exists(spark_session):
def test_write_delete_exists(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"
sample_data = "hello world!"
sample_data_encoding = 'utf-8'
@@ -140,7 +135,7 @@ def test_write_delete_exists(spark_session):
pytest.fail("Failed to run test_write_delete_exists")


def test_write_rename(spark_session):
def test_write_rename(spark_session: SparkSession):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"
sample_data = "hello world!"
sample_data_encoding = 'utf-8'
@@ -178,7 +173,7 @@ def test_write_rename(spark_session):
),
],
)
def test_length(spark_session, path, expected):
def test_length(spark_session: SparkSession, path, expected):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/data"

try:
4 changes: 2 additions & 2 deletions test/test_parse_hadoop_filestatus.py
@@ -22,8 +22,8 @@


from datetime import datetime
from hadoop_fs_wrapper.models import HadoopFileStatus
from hadoop_fs_wrapper.models import FileStatus
from hadoop_fs_wrapper.models.hadoop_file_status import HadoopFileStatus
from hadoop_fs_wrapper.models.file_status import FileStatus


class MockPath:
