Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Spooling Protocol #509

Merged
merged 4 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,32 @@ conn = connect(
)
```

## Spooling protocol

The client spooling protocol requires [a Trino server with spooling protocol support](https://trino.io/docs/current/client/client-protocol.html#spooling-protocol).

Supported encodings are `json`, `json+lz4` and `json+zstd`.

Enable the spooling protocol by specifying one of the supported encodings in the `encoding` parameter:

```python
from trino.dbapi import connect

conn = connect(
encoding="json+zstd"
)
```

or a list of supported encodings in order of preference:

```python
from trino.dbapi import connect

conn = connect(
encoding=["json+zstd", "json"]
)
```

## Transactions

The client runs by default in *autocommit* mode. To enable transactions, set
Expand Down
1 change: 1 addition & 0 deletions etc/catalog/jmx.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=jmx
1 change: 1 addition & 0 deletions etc/catalog/memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=memory
1 change: 1 addition & 0 deletions etc/catalog/tpcds.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpcds
2 changes: 2 additions & 0 deletions etc/catalog/tpch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
connector.name=tpch
tpch.splits-per-node=4
11 changes: 11 additions & 0 deletions etc/config-pre-466.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
node.id=coordinator
node.environment=test

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# Disable http request log
http-server.log.enabled=false
17 changes: 17 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
node.id=coordinator
node.environment=test

coordinator=true
experimental.concurrent-startup=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# spooling protocol settings
protocol.spooling.enabled=true
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
protocol.spooling.retrieval-mode=coordinator_proxy

# Disable http request log
http-server.log.enabled=false
16 changes: 16 additions & 0 deletions etc/jvm-pre-466.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
17 changes: 17 additions & 0 deletions etc/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
--enable-native-access=ALL-UNNAMED
8 changes: 8 additions & 0 deletions etc/spooling-manager.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
spooling-manager.name=filesystem
fs.s3.enabled=true
fs.location=s3://spooling/
s3.endpoint=http://localstack:4566/
s3.region=us-east-1
s3.aws-access-key=test
s3.aws-secret-key=test
s3.path-style-access=true
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
"pre-commit",
"black",
"isort",
"keyring"
"keyring",
"testcontainers",
"boto3"
]

setup(
Expand Down Expand Up @@ -81,11 +83,13 @@
],
python_requires=">=3.9",
install_requires=[
"lz4",
"python-dateutil",
"pytz",
# requests CVE https://github.com/advisories/GHSA-j8r2-6x86-q33q
"requests>=2.31.0",
"tzlocal",
"zstandard",
],
extras_require={
"all": all_require,
Expand Down
139 changes: 139 additions & 0 deletions tests/development_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
import time
from contextlib import contextmanager
from pathlib import Path

from testcontainers.core.container import DockerContainer
from testcontainers.core.network import Network
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.localstack import LocalStackContainer

from trino.constants import DEFAULT_PORT

# NOTE(review): the MINIO_* credentials are not referenced anywhere in the visible
# part of this file (the S3 backend started below is LocalStack) — confirm whether
# they are still needed.
MINIO_ROOT_USER = "minio-access-key"
MINIO_ROOT_PASSWORD = "minio-secret-key"

# Trino image tag to run: taken from the TRINO_VERSION environment variable,
# falling back to "latest" when the variable is unset or empty.
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
# Host name printed by main() when reporting where Trino was started.
TRINO_HOST = "localhost"


def create_bucket(s3_client):
    """Make sure the ``spooling`` bucket exists, creating it when it is missing.

    The function is best-effort: client errors are printed, never raised.

    :param s3_client: S3 client exposing ``list_buckets``, ``create_bucket`` and
        ``exceptions.ClientError`` (presumably a boto3 S3 client — confirm
        against the caller).
    """
    bucket_name = "spooling"

    try:
        print("Checking for bucket existence...")
        listing = s3_client.list_buckets()
        known_names = {entry["Name"] for entry in listing["Buckets"]}
        if bucket_name in known_names:
            print("Bucket exists!")
            return
    except s3_client.exceptions.ClientError as err:
        # A 404 falls through to the creation attempt below; any other
        # client error is reported and ends the function.
        error_code = err.response['Error']['Code']
        if error_code != '404':
            print("An error occurred:", err)
            return

    try:
        print("Creating bucket...")
        s3_client.create_bucket(Bucket=bucket_name)
        print("Bucket created!")
    except s3_client.exceptions.ClientError as err:
        print("An error occurred:", err)


@contextmanager
def start_development_server(port=None, trino_version=TRINO_VERSION):
    """Start a Trino container for development/testing and tear it down on exit.

    When the requested version supports the spooling protocol (``"latest"`` or
    a numeric version >= 466), a LocalStack container providing S3 is started
    first, the ``spooling`` bucket is created, and the spooling-enabled Trino
    configuration from ``etc/`` is mounted. Otherwise the ``*-pre-466``
    configuration files are mounted and no LocalStack container is started.

    :param port: host port bound to the container's ``DEFAULT_PORT``. ``None``
        is passed through to ``with_bind_ports`` unchanged (presumably letting
        Docker pick a free host port — confirm against testcontainers).
    :param trino_version: tag of the ``trinodb/trino`` image to run; either
        ``"latest"`` or a string parseable by ``int``.
    :raises ValueError: if ``trino_version`` is neither ``"latest"`` nor an
        integer string.
    :return: context manager yielding ``(localstack, trino, network)``;
        ``localstack`` is ``None`` when spooling is not supported.
    """
    network = None
    localstack = None
    trino = None

    try:
        network = Network().create()
        # Decide from the version actually requested by the caller. Using the
        # module-level TRINO_VERSION here would mount the wrong configuration
        # whenever ``trino_version`` differs from the environment default.
        supports_spooling_protocol = trino_version == "latest" or int(trino_version) >= 466
        if supports_spooling_protocol:
            localstack = (
                LocalStackContainer(image="localstack/localstack:latest", region_name="us-east-1")
                .with_name("localstack")
                .with_network(network)
                .with_bind_ports(4566, 4566)
                .with_bind_ports(4571, 4571)
                .with_env("SERVICES", "s3")
            )

            # Start the container
            print("Starting LocalStack container...")
            localstack.start()

            # Wait for logs indicating LocalStack has started
            wait_for_logs(localstack, "Ready.", timeout=30)

            # create spooling bucket
            create_bucket(localstack.get_client("s3"))

        trino = (
            DockerContainer(f"trinodb/trino:{trino_version}")
            .with_name("trino")
            .with_network(network)
            .with_env("TRINO_CONFIG_DIR", "/etc/trino")
            .with_bind_ports(DEFAULT_PORT, port)
        )

        # Repository root: this file lives one directory below it.
        root = Path(__file__).parent.parent

        trino = trino.with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog")

        if supports_spooling_protocol:
            # Enable spooling config
            trino = (
                trino
                .with_volume_mapping(
                    str(root / "etc/spooling-manager.properties"),
                    "/etc/trino/spooling-manager.properties", "rw")
                .with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config")
                .with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties")
            )
        else:
            trino = (
                trino
                .with_volume_mapping(str(root / "etc/jvm-pre-466.config"), "/etc/trino/jvm.config")
                .with_volume_mapping(str(root / "etc/config-pre-466.properties"), "/etc/trino/config.properties")
            )

        print("Starting Trino container...")
        trino.start()

        # Wait for logs indicating the service has started
        wait_for_logs(trino, "SERVER STARTED", timeout=60)

        # Otherwise some tests fail with No nodes available
        time.sleep(2)

        yield localstack, trino, network
    finally:
        # Stop containers when exiting the context. Each step is guarded
        # separately so one failing teardown does not prevent the others.
        if trino:
            try:
                print("Stopping Trino container...")
                trino.stop()
            except Exception as e:
                print(f"Error stopping Trino container: {e}")

        if localstack:
            try:
                print("Stopping LocalStack container...")
                localstack.stop()
            except Exception as e:
                print(f"Error stopping LocalStack container: {e}")

        if network:
            try:
                print("Removing network...")
                network.remove()
            except Exception as e:
                print(f"Error removing network: {e}")


def main():
    """Bring up the development containers outside of pytest until Enter is pressed."""
    with start_development_server(port=DEFAULT_PORT):
        address = f"{TRINO_HOST}:{DEFAULT_PORT}"
        print(f"Trino started at {address}")

        # Block inside the ``with`` body: leaving it stops the containers.
        input("Press Enter to stop containers...")


if __name__ == "__main__":
    main()
Loading
Loading