Skip to content

Commit

Permalink
Use starlette instead of uvicorn (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
alippai authored Jan 8, 2024
1 parent c0db354 commit f744939
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 74 deletions.
8 changes: 2 additions & 6 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ on: [push, pull_request]

jobs:
build:

runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.11']

steps:
- name: Check out repository
uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
python-version: 3.11

- name: Set up a Vertica server docker container
timeout-minutes: 15
Expand Down
98 changes: 40 additions & 58 deletions fake_webhdfs.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,63 @@
from io import BytesIO
from typing import Any

from fastapi import FastAPI, Request, Response
from starlette.applications import Starlette
from starlette.responses import JSONResponse, Response
from starlette.routing import Route
from starlette.requests import Request

from global_state import files, results

GENERIC_NOT_FOUND = f"""{{
"RemoteException": {{
GENERIC_NOT_FOUND = {
"RemoteException": {
"exception": "FileNotFoundException",
"javaClassName": "java.io.FileNotFoundException",
"message": "File does not exist: /foo/a.patch"
}}
}}"""

def generic_file(size: int) -> str:
return f'''{{
"FileStatus": {{
"accessTime" : 0,
"blockSize" : 0,
"group" : "",
"length" : {size},
"modificationTime": 0,
"owner" : "",
"pathSuffix" : "",
"permission" : "666",
"replication" : 1,
"type" : "FILE"
}}
}}'''

app = FastAPI()

@app.get("/{full_path:path}")
async def get_handler(request: Request, full_path: str) -> Response:
print(f"full path {full_path}")
print(f"query param {request.query_params}")
if request.query_params['op'] == "GETFILESTATUS":
"message": "File does not exist: /foo/a.patch",
}
}

def generic_file(size: int) -> dict[str, dict[str, Any]]:
return {
"FileStatus": {
"accessTime" : 0,
"blockSize" : 0,
"group" : "",
"length" : size,
"modificationTime": 0,
"owner" : "",
"pathSuffix" : "",
"permission" : "666",
"replication" : 1,
"type" : "FILE",
}
}

async def handler(request: Request) -> Response:
full_path = request.url.path[1:]
operation = request.query_params['op']
if operation == "GETFILESTATUS":
if full_path in files:
return Response(generic_file(files[full_path]), 200, media_type='application/json')
return JSONResponse(generic_file(files[full_path]))
else:
return Response(GENERIC_NOT_FOUND, 404, media_type='application/json')

@app.put("/{full_path:path}")
async def put_handler(request: Request, full_path: str) -> Response:
print(f"full path {full_path}")
print(f"query param {request.query_params}")
if request.query_params['op'] == "MKDIRS":
return Response('{"boolean": true}', 200, media_type='application/json')
if request.query_params["op"] == "CREATE":
return JSONResponse(GENERIC_NOT_FOUND, 404)
if operation == "CREATE":
if "create_redirected" not in request.query_params:
return Response(status_code=307, headers={"location": f"http://{request.base_url.hostname}:{request.base_url.port}/{full_path}?{request.query_params}&create_redirected=true"})
else:
# consume the body and assert it's empty
assert len(await request.body()) == 0
files[full_path] = 0
return Response(status_code=201, headers={"location": f"hdfs://{request.base_url.hostname}:{request.base_url.port}/{full_path}"})
if request.query_params["op"] == "RENAME":
return Response('{"boolean": true}', 200, media_type='application/json')

@app.delete("/{full_path:path}")
async def delete_handler(request: Request, full_path: str) -> Response:
print(f"full path {full_path}")
print(f"query param {request.query_params}")
if request.query_params["op"] == "DELETE":
return Response('{"boolean": true}', 200, media_type='application/json')

@app.post("/{full_path:path}")
async def post_handler(request: Request, full_path: str) -> Response:
print(f"full path {full_path}")
print(f"query param {request.query_params}")
if request.query_params["op"] == "TRUNCATE":
return Response('{"boolean": true}', 200, media_type='application/json')
if request.query_params["op"] == "APPEND":
if operation in {"RENAME","DELETE", "TRUNCATE", "MKDIRS"}:
return JSONResponse({"boolean": True})
if operation == "APPEND":
if "append_redirected" not in request.query_params:
return Response(status_code=307, headers={"location": f"http://{request.base_url.hostname}:{request.base_url.port}/{full_path}?{request.query_params}&append_redirected=true"})
else:
ret = await request.body()
b = BytesIO(ret)
files[full_path] += len(ret)
results.append(b)
return Response(status_code=200)
return Response()
raise Exception(f"Unhandled operation {operation} at path {full_path} and method {request.method}")
app = Starlette(debug=True, routes=[Route("/{full_path:path}", handler, methods=["GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH"])])
6 changes: 4 additions & 2 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import uvicorn

import pyarrow as pa
import vertica_python as vp

from global_state import finish
Expand All @@ -17,13 +18,13 @@ def run_in_thread(self):
thread.start()
try:
while not self.started:
time.sleep(1e-3)
time.sleep(0.01)
yield
finally:
self.should_exit = True
thread.join()

config = uvicorn.Config("fake_webhdfs:app", host="127.0.0.1", port=8000, log_level="info")
config = uvicorn.Config("fake_webhdfs:app", host="127.0.0.1", port=8000, log_level="trace")
server = Server(config=config)

def test_read_main():
Expand All @@ -39,3 +40,4 @@ def test_read_main():
# a single pyarrow table
t = finish()
print(t)
assert t.equals(pa.Table.from_arrays([pa.array([1])], names=['account_id']))
12 changes: 4 additions & 8 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ passenv = *
commands =
pytest -s {posargs}
deps =
pyarrow
pytest
pytest-timeout
parameterized
python-dateutil
mock
fastapi
starlette
uvicorn
vertica-python
pyarrow
requests
vertica-python

0 comments on commit f744939

Please sign in to comment.