Skip to content

Commit

Permalink
Add development server
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Nov 22, 2024
1 parent 4c57774 commit 0f4083d
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 136 deletions.
1 change: 1 addition & 0 deletions etc/catalog/jmx.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=jmx
1 change: 1 addition & 0 deletions etc/catalog/memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=memory
1 change: 1 addition & 0 deletions etc/catalog/tpcds.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpcds
2 changes: 2 additions & 0 deletions etc/catalog/tpch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
connector.name=tpch
tpch.splits-per-node=4
22 changes: 22 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
node.id=coordinator
node.environment=test

coordinator=true
experimental.concurrent-startup=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1GB
discovery.uri=http://localhost:8080

# Use task.min-writer-count > 1, as this helps expose writer-concurrency related bugs.
task.min-writer-count=2
task.concurrency=2
task.max-writer-count=2

# Experimental protocol spooling settings
experimental.protocol.spooling.enabled=true
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
protocol.spooling.retrieval-mode=coordinator_proxy

# Disable http request log
http-server.log.enabled=false
17 changes: 17 additions & 0 deletions etc/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-server
-Xmx2G
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:-OmitStackTraceInFastThrow
-XX:ReservedCodeCacheSize=150M
-XX:PerMethodRecompilationCutoff=10000
-XX:PerBytecodeRecompilationCutoff=10000
-Djdk.attach.allowAttachSelf=true
# jdk.nio.maxCachedBufferSize controls what buffers can be allocated in per-thread "temporary buffer cache" (sun.nio.ch.Util). Value of 0 disables the cache.
-Djdk.nio.maxCachedBufferSize=0
# Allow loading dynamic agent used by JOL
-XX:+EnableDynamicAgentLoading
-XX:+UnlockDiagnosticVMOptions
--enable-native-access=ALL-UNNAMED
13 changes: 13 additions & 0 deletions etc/spooling-manager.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
spooling-manager.name=filesystem
fs.s3.enabled=true
fs.location=s3://spooling/
s3.endpoint=http://172.17.0.1:9000/
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
s3.path-style-access=true
fs.segment.ttl=5m
fs.segment.pruning.interval=15s
fs.segment.pruning.batch-size=250
# Disable segment encryption, as we don't support SSE-C while writing to/reading from S3
fs.segment.encryption=false
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
"pre-commit",
"black",
"isort",
"keyring"
"keyring",
"testcontainers",
"minio"
]

setup(
Expand Down
111 changes: 111 additions & 0 deletions tests/development_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import os
import time
from contextlib import contextmanager
from pathlib import Path

from minio import Minio, S3Error
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from trino.constants import DEFAULT_PORT

# Root credentials for the local MinIO server; create_bucket() connects with them.
MINIO_ROOT_USER = "minio-access-key"
MINIO_ROOT_PASSWORD = "minio-secret-key"

# Trino image tag: the TRINO_VERSION environment variable, or "latest" when it
# is unset or empty.
TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
# Host name printed by main() for the started Trino server.
TRINO_HOST = "localhost"


def create_bucket():
    """Ensure the ``spooling`` bucket exists on the local MinIO server.

    Connects to ``localhost:9000`` without TLS using the module-level MinIO
    root credentials. An ``S3Error`` raised while checking for or creating
    the bucket is printed and not re-raised.
    """
    bucket_name = "spooling"
    client = Minio(
        "localhost:9000",
        access_key=MINIO_ROOT_USER,
        secret_key=MINIO_ROOT_PASSWORD,
        secure=False,
    )
    try:
        if client.bucket_exists(bucket_name):
            print(f"Bucket {bucket_name} already exists.")
            return
        # The bucket is missing, so create it now.
        client.make_bucket(bucket_name)
        print(f"Bucket {bucket_name} created successfully.")
    except S3Error as e:
        print(f"Error occurred: {e}")


def _supports_spooling(trino_version):
    """Return True if *trino_version* should run with the spooling setup.

    The leading digits of the tag are compared numerically against 465, so
    ``"1000"`` is newer than ``"465"`` (a plain string comparison gets this
    wrong). Tags without a leading number, such as ``"latest"``, are treated
    as supporting spooling, as the previous string comparison did.
    """
    import re

    match = re.match(r"\d+", str(trino_version))
    if match is None:
        return True
    return int(match.group()) >= 465


@contextmanager
def start_development_server(port=None, trino_version=TRINO_VERSION):
    """Start a Trino container (and MinIO when needed) for local development.

    Args:
        port: Host port bound to the container's ``DEFAULT_PORT``. It is
            passed unchanged to ``with_bind_ports``.
            NOTE(review): ``None`` presumably lets Docker choose the host
            port -- confirm against the testcontainers documentation.
        trino_version: Tag of the ``trinodb/trino`` image to run. It also
            decides whether the MinIO-backed spooling setup is started.

    Yields:
        A ``(minio, trino)`` tuple of started containers. ``minio`` is
        ``None`` when the requested version does not use spooling.

    Both containers are stopped when the context exits, including when
    start-up fails part-way through.
    """
    minio = None
    trino = None
    # Gate on the requested version, not on the module-level default.
    spooling = _supports_spooling(trino_version)

    try:
        if spooling:
            minio = (
                DockerContainer("minio/minio:latest")
                .with_name("minio")
                .with_command(
                    "server --address '0.0.0.0:9000' "
                    "--console-address '0.0.0.0:9001' -- /data"
                )
                .with_env("MINIO_ROOT_USER", MINIO_ROOT_USER)
                .with_env("MINIO_ROOT_PASSWORD", MINIO_ROOT_PASSWORD)
                .with_exposed_ports(9000, 9001)
                .with_bind_ports(9000, 9000)
                .with_bind_ports(9001, 9001)
            )

            # Start the container
            print("Starting MinIO container...")
            minio.start()

            # Wait for logs indicating MinIO has started
            wait_for_logs(minio, "API: http://", timeout=30)

            # Create the spooling bucket
            create_bucket()

        # Repository root: this file lives one directory below it.
        root = Path(__file__).parent.parent

        trino = (
            DockerContainer(f"trinodb/trino:{trino_version}")
            .with_name("trino")
            .with_env("TRINO_CONFIG_DIR", "/etc/trino")
            .with_bind_ports(DEFAULT_PORT, port)
            .with_volume_mapping(str(root / "etc/config.properties"), "/etc/trino/config.properties")
            .with_volume_mapping(str(root / "etc/jvm.config"), "/etc/trino/jvm.config")
            .with_volume_mapping(str(root / "etc/catalog"), "/etc/trino/catalog")
        )
        if spooling:
            trino = trino.with_volume_mapping(
                str(root / "etc/spooling-manager.properties"),
                "/etc/trino/spooling-manager.properties",
                "rw",
            )

        print("Starting Trino container...")
        trino.start()

        # Wait for logs indicating the service has started
        wait_for_logs(trino, "SERVER STARTED", timeout=30)

        # Otherwise some tests fail with No nodes available
        time.sleep(2)

        yield minio, trino
    finally:
        # Stop containers when exiting the context
        if trino:
            print("Stopping Trino container...")
            trino.stop()
        if minio:
            print("Stopping MinIO container...")
            minio.stop()


def main():
    """Start the development server outside pytest and wait for the user."""
    server = start_development_server(port=DEFAULT_PORT)
    with server:
        print(f"Trino started at {TRINO_HOST}:{DEFAULT_PORT}")

        # Block here so the containers stay up until the user presses Enter.
        input("Press Enter to stop containers...")


if __name__ == "__main__":
    main()
134 changes: 17 additions & 117 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,24 @@

import os
import socket
import subprocess
import time
from contextlib import closing
from uuid import uuid4

import pytest

import trino.logging
from trino.client import ClientSession, TrinoQuery, TrinoRequest
from tests.development_server import TRINO_HOST, TRINO_VERSION, start_development_server
from trino.constants import DEFAULT_PORT

logger = trino.logging.get_logger(__name__)


TRINO_VERSION = os.environ.get("TRINO_VERSION") or "latest"
TRINO_HOST = "127.0.0.1"
TRINO_PORT = 8080


def is_trino_available():
def is_trino_available(host, port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((TRINO_HOST, DEFAULT_PORT))
result = sock.connect_ex((host, port))
if result == 0:
return True
return False


def get_local_port():
Expand All @@ -45,115 +38,22 @@ def get_local_port():
return s.getsockname()[1]


def get_default_trino_image_tag():
    """Return the ``trinodb/trino`` image reference tagged with TRINO_VERSION."""
    repository_prefix = "trinodb/trino:"
    return repository_prefix + TRINO_VERSION


def start_trino(image_tag=None):
    """Launch a Trino Docker container via ``docker run``.

    Args:
        image_tag: Image to run; when falsy, the default image tag is used.

    Returns:
        A ``(container_id, process, host, port)`` tuple: the generated
        container name, the ``Popen`` handle of ``docker run`` (stderr
        piped, text mode), the string ``"localhost"`` and the local port
        mapped to ``TRINO_PORT``.
    """
    image_tag = image_tag or get_default_trino_image_tag()

    # A short random suffix gives each container its own name.
    container_id = "trino-python-client-tests-" + uuid4().hex[:7]
    local_port = get_local_port()
    logger.info("starting Docker container")

    port_mapping = "{host_port}:{cont_port}".format(host_port=local_port, cont_port=TRINO_PORT)
    docker_run = ["docker", "run", "--rm", "-p", port_mapping, "--name", container_id, image_tag]
    run = subprocess.Popen(docker_run, universal_newlines=True, stderr=subprocess.PIPE)
    return (container_id, run, "localhost", local_port)


def wait_for_trino_workers(host, port, timeout=180):
    """Poll Trino until at least one node reports the ``active`` state.

    Runs ``SELECT state FROM system.runtime.nodes`` once per second as user
    ``test_fixture``.

    Raises:
        TimeoutError: If no node is active after *timeout* seconds.
    """
    session = ClientSession(
        user="test_fixture"
    )
    request = TrinoRequest(host=host, port=port, client_session=session)
    sql = "SELECT state FROM system.runtime.nodes"
    started_at = time.time()
    while True:
        states = [row[0] for row in TrinoQuery(request, sql).execute()]
        if "active" in states:
            return
        if time.time() - started_at > timeout:
            raise TimeoutError
        time.sleep(1)


def wait_for_trino_coordinator(stream, timeout=180):
    """Read *stream* until the coordinator logs its start-up banner.

    Args:
        stream: File-like object whose ``readline()`` returns the server's
            log lines. Text streams are expected; bytes lines are decoded as
            UTF-8 so either kind works.
        timeout: Maximum number of seconds to keep reading. It is checked
            only after a line has been read, so a blocked ``readline()`` is
            not interrupted.

    Returns:
        True once the banner is seen, False if the stream ends first.

    Raises:
        TimeoutError: If the banner has not appeared within *timeout* seconds.
    """
    started_tag = "======== SERVER STARTED ========"
    t0 = time.time()
    while True:
        line = stream.readline()
        if not line:
            # End of stream ("" for text, b"" for bytes): the process exited
            # without announcing start-up. Comparing against b"" only, as
            # before, never matched a text stream and spun until the timeout.
            return False
        if isinstance(line, bytes):
            line = line.decode("utf-8", errors="replace")
        print(line)
        if started_tag in line:
            # Pause after the banner before reporting readiness (presumably
            # to let the server settle).
            time.sleep(5)
            return True
        if time.time() - t0 > timeout:
            logger.error("coordinator took longer than %s to start", timeout)
            raise TimeoutError


def start_local_trino_server(image_tag):
    """Start a Trino container and block until it is ready for queries.

    Returns:
        The ``(container_id, proc, host, port)`` tuple from ``start_trino``.

    Raises:
        Exception: If the coordinator never reports that it has started.
    """
    started = start_trino(image_tag)
    container_id, proc, host, port = started
    print("trino.server.state starting")
    if not wait_for_trino_coordinator(proc.stderr):
        raise Exception("Trino server did not start")
    wait_for_trino_workers(host, port)
    print("trino.server.state ready")
    return container_id, proc, host, port


def start_trino_and_wait(image_tag=None):
    """Return connection details for Trino, starting a container if needed.

    When ``TRINO_RUNNING_HOST`` is set, that host is used together with
    ``TRINO_RUNNING_PORT`` (defaulting to ``DEFAULT_PORT``) and nothing is
    started. Otherwise a local container is started and awaited.

    Returns:
        A ``(container_id, proc, host, port)`` tuple; ``container_id`` and
        ``proc`` are ``None`` when an already running server is used.
    """
    host = os.environ.get("TRINO_RUNNING_HOST", None)
    if not host:
        container_id, proc, host, port = start_local_trino_server(
            image_tag
        )
    else:
        container_id, proc = None, None
        port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)

    print("trino.server.hostname {}".format(host))
    print("trino.server.port {}".format(port))
    # Process and container details exist only for a server started here.
    if proc:
        print("trino.server.pid {}".format(proc.pid))
    if container_id:
        print("trino.server.contained_id {}".format(container_id))
    return container_id, proc, host, port


def stop_trino(container_id, proc):
    """Kill the Docker container named *container_id*.

    *proc* is accepted but not used. ``check_call`` raises
    ``CalledProcessError`` if ``docker kill`` exits with a non-zero status.
    """
    kill_command = ["docker", "kill", container_id]
    subprocess.check_call(kill_command)


@pytest.fixture(scope="module")
@pytest.fixture(scope="session")
def run_trino():
if is_trino_available():
yield None, TRINO_HOST, DEFAULT_PORT
return
host = os.environ.get("TRINO_RUNNING_HOST", TRINO_HOST)
port = os.environ.get("TRINO_RUNNING_PORT", DEFAULT_PORT)

image_tag = os.environ.get("TRINO_IMAGE")
if not image_tag:
image_tag = get_default_trino_image_tag()
# Is there any local Trino available
if is_trino_available(host, port):
yield host, port
return
else:
print(f"Could not connect to Trino at {host}:{port}")

container_id, proc, host, port = start_trino_and_wait(image_tag)
yield proc, host, port
if container_id or proc:
stop_trino(container_id, proc)
# Start Trino and MinIO server
local_port = get_local_port()
with start_development_server(port=local_port):
yield TRINO_HOST, local_port


def trino_version():
Expand Down
Loading

0 comments on commit 0f4083d

Please sign in to comment.