Skip to content

Commit

Permalink
Use app state instead of globals
Browse files Browse the repository at this point in the history
  • Loading branch information
alippai authored Jan 10, 2024
1 parent 73c7efe commit 20aee1f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 34 deletions.
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ How it works:
4. Convert the collected parquet tables into pyarrow tables

Limitations:
1. It can be used on a single thread only right now
2. It's pure python, receiving IO performance may not be optimal
3. Using Parquet vs Arrow still has some overhead
4. Vertica has to connect to the python script, for this it needs IP or hostname
5. It's not encrypted
6. Resultset needs to fit into memory, no streaming yet
1. It's pure Python, so receiving IO performance may not be optimal
2. Using Parquet vs Arrow still has some overhead
3. Vertica has to connect to the Python script, so it needs an IP address or hostname
4. It's not encrypted
5. Resultset needs to fit into memory, no streaming yet
   (it actually needs to hold both the compressed and uncompressed versions in memory momentarily)

```
Expand Down
12 changes: 5 additions & 7 deletions fake_webhdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
from starlette.routing import Route
from starlette.requests import Request

from global_state import files, results

GENERIC_NOT_FOUND = {
"RemoteException": {
"exception": "FileNotFoundException",
Expand Down Expand Up @@ -36,8 +34,8 @@ async def handler(request: Request) -> Response:
full_path = request.url.path[1:]
operation = request.query_params['op']
if operation == "GETFILESTATUS":
if full_path in files:
return JSONResponse(generic_file(files[full_path]))
if full_path in app.state.files:
return JSONResponse(generic_file(app.state.files[full_path]))
else:
return JSONResponse(GENERIC_NOT_FOUND, 404)
if operation == "CREATE":
Expand All @@ -46,7 +44,7 @@ async def handler(request: Request) -> Response:
else:
# consume the body and assert it's empty
assert len(await request.body()) == 0
files[full_path] = 0
app.state.files[full_path] = 0
return Response(status_code=201, headers={"location": f"hdfs://{request.base_url.hostname}:{request.base_url.port}/{full_path}"})
if operation in {"RENAME","DELETE", "TRUNCATE", "MKDIRS"}:
return JSONResponse({"boolean": True})
Expand All @@ -56,8 +54,8 @@ async def handler(request: Request) -> Response:
else:
ret = await request.body()
b = BytesIO(ret)
files[full_path] += len(ret)
results.append(b)
app.state.files[full_path] += len(ret)
app.state.results.append(b)
return Response()
raise Exception(f"Unhandled operation {operation} at path {full_path} and method {request.method}")
app = Starlette(debug=True, routes=[Route("/{full_path:path}", handler, methods=["GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH"])])
17 changes: 0 additions & 17 deletions global_state.py

This file was deleted.

9 changes: 5 additions & 4 deletions tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import uvicorn

import pyarrow as pa
import pyarrow.parquet as pq
import vertica_python as vp

from global_state import finish
from fake_webhdfs import app

class Server(uvicorn.Server):
def install_signal_handlers(self):
Expand All @@ -24,7 +25,7 @@ def run_in_thread(self):
self.should_exit = True
thread.join()

config = uvicorn.Config("fake_webhdfs:app", host="127.0.0.1", port=8000, log_level="trace")
config = uvicorn.Config(app, host="127.0.0.1", log_level="trace")
server = Server(config=config)

def test_read_main():
Expand All @@ -34,10 +35,10 @@ def test_read_main():

with server.run_in_thread():
connection.cursor().execute(
"EXPORT TO PARQUET(directory = 'hdfs://127.0.0.1:8000/virtualdata') AS SELECT 1 AS account_id"
f"EXPORT TO PARQUET(directory = 'hdfs://127.0.0.1:{server.servers[0].sockets[0].getsockname()[1]}/virtualdata') AS SELECT 1 AS account_id"
).fetchall()

# a single pyarrow table
t = finish()
t = pa.concat_tables([pq.read_table(r) for r in app.state.results])
print(t)
assert t.equals(pa.Table.from_arrays([pa.array([1])], names=['account_id']))

0 comments on commit 20aee1f

Please sign in to comment.