feat(cli): improve docker quickstart (datahub-project#7184)
hsheth2 authored and Eric Yomi committed Feb 8, 2023
1 parent c7f36cd commit 11e2f9c
Showing 14 changed files with 362 additions and 231 deletions.
8 changes: 4 additions & 4 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -55,7 +55,7 @@ def merge(self, another_result: "DeletionResult") -> None:
self.sample_records.extend(another_result.sample_records)


@telemetry.with_telemetry
@telemetry.with_telemetry()
def delete_for_registry(
registry_id: str,
soft: bool,
@@ -133,7 +133,7 @@ def delete_for_registry(
@click.option("-n", "--dry-run", required=False, is_flag=True)
@click.option("--only-soft-deleted", required=False, is_flag=True, default=False)
@upgrade.check_upgrade
@telemetry.with_telemetry
@telemetry.with_telemetry()
def delete(
urn: str,
aspect_name: Optional[str],
@@ -271,7 +271,7 @@ def _get_current_time() -> int:
return int(time.time() * 1000.0)


@telemetry.with_telemetry
@telemetry.with_telemetry()
def delete_with_filters(
dry_run: bool,
soft: bool,
@@ -441,7 +441,7 @@ def _delete_one_urn(
return deletion_result


@telemetry.with_telemetry
@telemetry.with_telemetry()
def delete_one_urn_cmd(
urn: str,
aspect_name: Optional[str] = None,
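The delete_cli.py changes are mechanical: every @telemetry.with_telemetry decoration gains call parentheses, which suggests with_telemetry is now a decorator factory (a callable that returns the actual decorator), presumably so it can accept options. A minimal sketch of the pattern, using a hypothetical stand-in rather than DataHub's real implementation:

from functools import wraps
from typing import Any, Callable, TypeVar

_F = TypeVar("_F", bound=Callable[..., Any])

def with_telemetry(**options: Any) -> Callable[[_F], _F]:
    # Hypothetical stand-in: the factory builds and returns the real decorator.
    def decorator(func: _F) -> _F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # A real implementation would emit a telemetry event around the call.
            return func(*args, **kwargs)
        return wrapper  # type: ignore[return-value]
    return decorator

@with_telemetry()  # the factory must now be called, even with no arguments
def delete_one_urn_cmd(urn: str) -> None:
    print(f"would delete {urn}")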
231 changes: 163 additions & 68 deletions metadata-ingestion/src/datahub/cli/docker_check.py
@@ -1,122 +1,207 @@
import enum
import os
from contextlib import contextmanager
from typing import Iterator, List, Optional, Tuple
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional

import docker
import docker.errors
import docker.models.containers

from datahub.configuration.common import ExceptionWithProps

REQUIRED_CONTAINERS = [
"elasticsearch-setup",
"elasticsearch",
"datahub-gms",
"datahub-frontend-react",
"datahub-upgrade",
"schema-registry",
"broker",
"zookeeper",
# These containers are not necessary - only helpful in debugging.
# "kafka-topics-ui",
# "schema-registry-ui",
# "kibana",
# "kafka-rest-proxy",
# "datahub-mce-consumer",
# "datahub-mae-consumer"
]

# We expect these containers to exit 0, while all other containers
# are expected to be running and healthy.
ENSURE_EXIT_SUCCESS = [
"kafka-setup",
"elasticsearch-setup",
"mysql-setup",
"datahub-upgrade",
]

# If present, we check that the container is ok. If it exists
# in ENSURE_EXIT_SUCCESS, we check that it exited 0. Otherwise,
# we check that it is running and healthy.
CONTAINERS_TO_CHECK_IF_PRESENT = [
# We only add this container in some cases, but if it's present, we
# definitely want to check that it exits properly.
"mysql",
"mysql-setup",
"cassandra",
"cassandra-setup",
"neo4j",
"elasticsearch-setup",
"schema-registry",
"zookeeper",
"datahub-upgrade",
"kafka-setup",
# "datahub-mce-consumer",
# "datahub-mae-consumer",
]
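For example, mysql-setup appears in both CONTAINERS_TO_CHECK_IF_PRESENT and ENSURE_EXIT_SUCCESS, so it is checked only when it exists and must then have exited with code 0; neo4j appears only in CONTAINERS_TO_CHECK_IF_PRESENT, so when present it must be running and healthy.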

# Docker seems to under-report memory allocated, so we also need a bit of buffer to account for it.
MIN_MEMORY_NEEDED = 3.8 # GB

DATAHUB_COMPOSE_PROJECT_FILTER = {"label": "com.docker.compose.project=datahub"}

@contextmanager
def get_client_with_error() -> (
Iterator[Tuple[docker.DockerClient, Optional[Exception]]]
):
# This method is structured somewhat strangely because we
# need to make sure that we only yield once.

docker_cli = None
class DockerNotRunningError(Exception):
SHOW_STACK_TRACE = False


class DockerLowMemoryError(Exception):
SHOW_STACK_TRACE = False


class DockerComposeVersionError(Exception):
SHOW_STACK_TRACE = False


class QuickstartError(Exception, ExceptionWithProps):
SHOW_STACK_TRACE = False

def __init__(self, message: str, container_statuses: Dict[str, Any]):
super().__init__(message)
self.container_statuses = container_statuses

def get_telemetry_props(self) -> Dict[str, Any]:
return self.container_statuses


@contextmanager
def get_docker_client() -> Iterator[docker.DockerClient]:
# Get a reference to the Docker client.
client = None
try:
docker_cli = docker.from_env()
client = docker.from_env()
except docker.errors.DockerException as error:
try:
# Docker Desktop 4.13.0 broke the docker.sock symlink.
# See https://github.com/docker/docker-py/issues/3059.
maybe_sock_path = os.path.expanduser("~/.docker/run/docker.sock")
if os.path.exists(maybe_sock_path):
docker_cli = docker.DockerClient(base_url=f"unix://{maybe_sock_path}")
client = docker.DockerClient(base_url=f"unix://{maybe_sock_path}")
else:
yield None, error
raise error
except docker.errors.DockerException as error:
yield None, error
raise DockerNotRunningError(
"Docker doesn't seem to be running. Did you start it?"
) from error
assert client

if docker_cli is not None:
try:
docker_cli.ping()
except docker.errors.DockerException as error:
yield None, error
else:
try:
yield docker_cli, None
finally:
docker_cli.close()
# Make sure that we can talk to Docker.
try:
client.ping()
except docker.errors.DockerException as error:
raise DockerNotRunningError(
"Unable to talk to Docker. Did you start it?"
) from error

# Yield the client and make sure to close it.
try:
yield client
finally:
client.close()
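A minimal usage sketch for the new context manager (not part of this commit; it assumes the module is importable as datahub.cli.docker_check, matching the file path above, and uses only the public docker-py API):

from datahub.cli.docker_check import DockerNotRunningError, get_docker_client

try:
    with get_docker_client() as client:
        # docker-py lists only running containers by default.
        names = [c.name for c in client.containers.list()]
        print(f"{len(names)} running containers: {names}")
except DockerNotRunningError as error:
    print(f"Docker is unavailable: {error}")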


def memory_in_gb(mem_bytes: int) -> float:
return mem_bytes / (1024 * 1024 * 1000)


def check_local_docker_containers(preflight_only: bool = False) -> List[str]:
issues: List[str] = []
with get_client_with_error() as (client, error):
if error:
issues.append("Docker doesn't seem to be running. Did you start it?")
return issues
def run_quickstart_preflight_checks(client: docker.DockerClient) -> None:
# Check total memory.
# TODO: add option to skip this check.
total_mem_configured = int(client.info()["MemTotal"])
if memory_in_gb(total_mem_configured) < MIN_MEMORY_NEEDED:
raise DockerLowMemoryError(
f"Total Docker memory configured {memory_in_gb(total_mem_configured):.2f}GB is below the minimum threshold {MIN_MEMORY_NEEDED}GB. "
"You can increase the memory allocated to Docker in the Docker settings."
)
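As a worked example of the threshold: a Docker VM allotted 4 GiB reports MemTotal = 4,294,967,296 bytes, which memory_in_gb() converts to 4,294,967,296 / (1024 * 1024 * 1000) ≈ 4.10 GB, comfortably above MIN_MEMORY_NEEDED = 3.8; a 2 GiB allocation works out to ≈ 2.05 GB by the same formula and would raise DockerLowMemoryError.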

# Check total memory.
total_mem_configured = int(client.info()["MemTotal"])
if memory_in_gb(total_mem_configured) < MIN_MEMORY_NEEDED:
issues.append(
f"Total Docker memory configured {memory_in_gb(total_mem_configured):.2f}GB is below the minimum threshold {MIN_MEMORY_NEEDED}GB"
)

if preflight_only:
return issues
class ContainerStatus(enum.Enum):
OK = "is ok"

# For containers that are expected to exit 0.
STILL_RUNNING = "is still running"
EXITED_WITH_FAILURE = "exited with an error"

# For containers that are expected to be running.
DIED = "is not running"
MISSING = "is not present"
STARTING = "is still starting"
UNHEALTHY = "is running but not yet healthy"


@dataclass
class DockerContainerStatus:
name: str
status: ContainerStatus


@dataclass
class QuickstartStatus:
containers: List[DockerContainerStatus]

def errors(self) -> List[str]:
if not self.containers:
return ["quickstart.sh or dev.sh is not running"]

return [
f"{container.name} {container.status.value}"
for container in self.containers
if container.status != ContainerStatus.OK
]

def is_ok(self) -> bool:
return not self.errors()

def to_exception(
self, header: str, footer: Optional[str] = None
) -> QuickstartError:
message = f"{header}\n"
for error in self.errors():
message += f"- {error}\n"
if footer:
message += f"\n{footer}"

return QuickstartError(
message,
{
"containers_all": [container.name for container in self.containers],
"containers_errors": [
container.name
for container in self.containers
if container.status != ContainerStatus.OK
],
**{
f"container_{container.name}": container.status.name
for container in self.containers
},
},
)
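For instance (an illustrative sketch of the resulting props, not output captured from a real run): if datahub-gms is unhealthy and every other container is fine, to_exception() attaches telemetry props along the lines of {"containers_all": ["datahub-gms", "datahub-frontend-react", ...], "containers_errors": ["datahub-gms"], "container_datahub-gms": "UNHEALTHY", "container_datahub-frontend-react": "OK", ...}.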


def check_docker_quickstart() -> QuickstartStatus:
container_statuses: List[DockerContainerStatus] = []

with get_docker_client() as client:
containers = client.containers.list(
all=True,
filters={
"label": "com.docker.compose.project=datahub",
},
filters=DATAHUB_COMPOSE_PROJECT_FILTER,
)

# Check number of containers.
if len(containers) == 0:
issues.append("quickstart.sh or dev.sh is not running")
else:
existing_containers = {container.name for container in containers}
missing_containers = set(REQUIRED_CONTAINERS) - existing_containers
issues.extend(
f"{missing} container is not present" for missing in missing_containers
)

# Check that the containers are running and healthy.
container: docker.models.containers.Container
for container in containers:
name = container.name
status = ContainerStatus.OK

if container.name not in (
REQUIRED_CONTAINERS + CONTAINERS_TO_CHECK_IF_PRESENT
):
@@ -127,16 +212,26 @@ def check_local_docker_containers(preflight_only: bool = False) -> List[str]:

if container.name in ENSURE_EXIT_SUCCESS:
if container.status != "exited":
issues.append(f"{container.name} is still running")
status = ContainerStatus.STILL_RUNNING
elif container.attrs["State"]["ExitCode"] != 0:
issues.append(f"{container.name} did not exit cleanly")
status = ContainerStatus.EXITED_WITH_FAILURE

elif container.status != "running":
issues.append(f"{container.name} is not running")
status = ContainerStatus.DIED
elif "Health" in container.attrs["State"]:
if container.attrs["State"]["Health"]["Status"] == "starting":
issues.append(f"{container.name} is still starting")
status = ContainerStatus.STARTING
elif container.attrs["State"]["Health"]["Status"] != "healthy":
issues.append(f"{container.name} is running but not healthy")
status = ContainerStatus.UNHEALTHY

container_statuses.append(DockerContainerStatus(name, status))

# Check for missing containers.
existing_containers = {container.name for container in containers}
missing_containers = set(REQUIRED_CONTAINERS) - existing_containers
for missing in missing_containers:
container_statuses.append(
DockerContainerStatus(missing, ContainerStatus.MISSING)
)

return issues
return QuickstartStatus(container_statuses)
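
To show how the new return type might be consumed, here is a hedged sketch (not code from this commit; the real call sites are in the other changed files, which are not shown on this page):

from datahub.cli.docker_check import check_docker_quickstart

status = check_docker_quickstart()
if status.is_ok():
    print("DataHub quickstart looks healthy.")
else:
    # to_exception() folds the per-container problems into one message and
    # attaches per-container telemetry props via ExceptionWithProps.
    raise status.to_exception(
        header="Docker quickstart is not healthy:",
        footer="Try running the quickstart again once the issues above are addressed.",
    )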