Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add --log-file option and show CLI logs in UI report #7118

Merged
merged 8 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def get_long_description():
"greenlet",
}

sqllineage_lib = "sqllineage==1.3.6"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactor!


aws_common = {
# AWS Python SDK
"boto3",
Expand All @@ -143,7 +145,7 @@ def get_long_description():
# See https://github.com/joshtemple/lkml/issues/73.
"lkml>=1.3.0b5",
"sql-metadata==2.2.2",
"sqllineage==1.3.6",
sqllineage_lib,
"GitPython>2",
}

Expand All @@ -165,7 +167,7 @@ def get_long_description():
"sqlalchemy-redshift",
"psycopg2-binary",
"GeoAlchemy2",
"sqllineage==1.3.6",
sqllineage_lib,
*path_spec_common,
}

Expand Down Expand Up @@ -255,18 +257,18 @@ def get_long_description():
"gql>=3.3.0",
"gql[requests]>=3.3.0",
},
"great-expectations": sql_common | {"sqllineage==1.3.6"},
"great-expectations": sql_common | {sqllineage_lib},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
"azure-ad": set(),
"bigquery": sql_common
| bigquery_common
| {"sqllineage==1.3.6", "sql_metadata", "sqlalchemy-bigquery>=1.4.1"},
| {sqllineage_lib, "sql_metadata", "sqlalchemy-bigquery>=1.4.1"},
"bigquery-beta": sql_common
| bigquery_common
| {
"sqllineage==1.3.6",
sqllineage_lib,
"sql_metadata",
"sqlalchemy-bigquery>=1.4.1",
}, # deprecated, but keeping the extra for backwards compatibility
Expand Down Expand Up @@ -310,8 +312,8 @@ def get_long_description():
"ldap": {"python-ldap>=2.4"},
"looker": looker_common,
"lookml": looker_common,
"metabase": {"requests", "sqllineage==1.3.6"},
"mode": {"requests", "sqllineage==1.3.6", "tenacity>=8.0.1"},
"metabase": {"requests", sqllineage_lib},
"mode": {"requests", sqllineage_lib, "tenacity>=8.0.1"},
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": sql_common | {"pyodbc"},
Expand All @@ -325,7 +327,7 @@ def get_long_description():
"presto-on-hive": sql_common
| {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.6"},
"redash": {"redash-toolbelt", "sql-metadata", sqllineage_lib},
"redshift": sql_common | redshift_common,
"redshift-usage": sql_common | usage_common | redshift_common,
"s3": {*s3_base, *data_lake_profiling},
Expand Down
7 changes: 3 additions & 4 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def write_gms_config(
# ok to fail on this
previous_config = {}
log.debug(
f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}. This isn't fatal.",
e,
f"Failed to retrieve config from file {DATAHUB_CONFIG_PATH}: {e}. This isn't fatal."
)
config_dict = {**previous_config, **config.dict()}
else:
Expand Down Expand Up @@ -687,8 +686,6 @@ def get_aspects_for_entity(
aspect_py_class: Optional[Type[Any]] = _get_pydantic_class_from_aspect_name(
aspect_name
)
if aspect_name == "unknown":
print(f"Failed to find aspect_name for class {aspect_name}")

aspect_dict = a["value"]
if not typed:
Expand All @@ -699,6 +696,8 @@ def get_aspects_for_entity(
aspect_map[aspect_name] = aspect_py_class.from_obj(post_json_obj)
except Exception as e:
log.error(f"Error on {json.dumps(aspect_dict)}", e)
else:
log.debug(f"Failed to find class for aspect {aspect_name}")

if aspects:
return {k: v for (k, v) in aspect_map.items() if k in aspects}
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def delete(
remove_references: bool = False

if (not force) and references_count > 0:
print(
click.echo(
f"This urn was referenced in {references_count} other aspects across your metadata graph:"
)
click.echo(
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,5 @@ def rollback(
writer.writerow([row.get("urn")])

except IOError as e:
print(e)
logger.exception(f"Unable to save rollback failure report: {e}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you

sys.exit(f"Unable to write reports to {report_dir}")
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def dataplatform2instance_func(
delete_cli._delete_one_urn(src_entity_urn, soft=not hard, run_id=run_id)
migration_report.on_entity_migrated(src_entity_urn, "status") # type: ignore

print(f"{migration_report}")
click.echo(f"{migration_report}")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg thank you

migrate_containers(
dry_run=dry_run,
env=env,
Expand Down Expand Up @@ -372,7 +372,7 @@ def migrate_containers(
delete_cli._delete_one_urn(src_urn, soft=not hard, run_id=run_id)
migration_report.on_entity_migrated(src_urn, "status") # type: ignore

print(f"{migration_report}")
click.echo(f"{migration_report}")


def get_containers_for_migration(env: str) -> List[Any]:
Expand Down
80 changes: 30 additions & 50 deletions metadata-ingestion/src/datahub/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import platform
import sys
from typing import Optional

import click
import stackprinter
Expand All @@ -25,21 +26,11 @@
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.telemetry import telemetry
from datahub.utilities.logging_manager import configure_logging
from datahub.utilities.server_config_util import get_gms_config

logger = logging.getLogger(__name__)

# Configure some loggers.
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger("snowflake").setLevel(level=logging.WARNING)
# logging.getLogger("botocore").setLevel(logging.INFO)
# logging.getLogger("google").setLevel(logging.INFO)

# Configure logger.
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
)
logging.basicConfig(format=BASE_LOGGING_FORMAT)
_logging_configured = None

MAX_CONTENT_WIDTH = 120

Expand All @@ -58,6 +49,12 @@
default=False,
help="Enable debug logging.",
)
@click.option(
"--log-file",
type=click.Path(dir_okay=False),
default=None,
help="Enable debug logging.",
)
@click.option(
"--debug-vars/--no-debug-vars",
type=bool,
Expand All @@ -79,35 +76,31 @@
)
@click.pass_context
def datahub(
ctx: click.Context, debug: bool, debug_vars: bool, detect_memory_leaks: bool
ctx: click.Context,
debug: bool,
log_file: Optional[str],
debug_vars: bool,
detect_memory_leaks: bool,
) -> None:
if debug_vars:
# debug_vars implies debug. This option isn't actually used here, but instead
# read directly from the command line arguments in the main entrypoint.
debug = True

# Insulate 'datahub' and all child loggers from inadvertent changes to the
# root logger by the external site packages that we import.
# (Eg: https://github.com/reata/sqllineage/commit/2df027c77ea0a8ea4909e471dcd1ecbf4b8aeb2f#diff-30685ea717322cd1e79c33ed8d37903eea388e1750aa00833c33c0c5b89448b3R11
# changes the root logger's handler level to WARNING, causing any message below
# WARNING level to be dropped after this module is imported, irrespective
# of the logger's logging level! The lookml source was affected by this).

# 1. Create 'datahub' parent logger.
datahub_logger = logging.getLogger("datahub")
# 2. Setup the stream handler with formatter.
stream_handler = logging.StreamHandler()
formatter = logging.Formatter(BASE_LOGGING_FORMAT)
stream_handler.setFormatter(formatter)
datahub_logger.addHandler(stream_handler)
# 3. Turn off propagation to the root handler.
datahub_logger.propagate = False
# 4. Adjust log-levels.
if debug or get_boolean_env_variable("DATAHUB_DEBUG", False):
logging.getLogger().setLevel(logging.INFO)
datahub_logger.setLevel(logging.DEBUG)
logging.getLogger("datahub_classify").setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
datahub_logger.setLevel(logging.INFO)
debug = debug or get_boolean_env_variable("DATAHUB_DEBUG", False)

# Note that we're purposely leaking the context manager here.
# Technically we should wrap this with ctx.with_resource(). However, we have
# some important error logging in the main() wrapper function that we don't
# want to miss. If we wrap this with ctx.with_resource(), then click would
# clean it up before those error handlers are processed.
# So why is this ok? Because we're leaking a context manager, this will
# still get cleaned up automatically when the memory is reclaimed, which is
# worse-case at program exit.
global _logging_configured
_logging_configured = None # see if we can force python to GC this
_logging_configured = configure_logging(debug=debug, log_file=log_file)
_logging_configured.__enter__()

# Setup the context for the memory_leak_detector decorator.
ctx.ensure_object(dict)
Expand Down Expand Up @@ -232,16 +225,3 @@ def main(**kwargs):
)
logger.debug(f"GMS config {get_gms_config()}")
sys.exit(1)


def _get_pretty_chained_message(exc: Exception) -> str:
pretty_msg = f"{exc.__class__.__name__} {exc}"
tmp_exc = exc.__cause__
indent = "\n\t\t"
while tmp_exc:
pretty_msg = (
f"{pretty_msg} due to {indent}{tmp_exc.__class__.__name__}{tmp_exc}"
)
tmp_exc = tmp_exc.__cause__
indent += "\t"
return pretty_msg
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
ExecutionRequestInputClass,
ExecutionRequestResultClass,
ExecutionRequestSourceClass,
StructuredExecutionReportClass,
_Aspect,
)
from datahub.utilities.logging_manager import get_log_buffer
from datahub.utilities.urns.urn import Urn

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -143,7 +145,6 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non
# Emit the dataHubIngestionSourceInfo aspect
self._emit_aspect(
entity_urn=self.ingestion_source_urn,
aspect_name="dataHubIngestionSourceInfo",
aspect_value=source_info_aspect,
)

Expand All @@ -154,17 +155,12 @@ def _get_recipe_to_report(self, ctx: PipelineContext) -> str:
else:
return json.dumps(redact_raw_config(ctx.pipeline_config._raw_dict))

def _emit_aspect(
self, entity_urn: Urn, aspect_name: str, aspect_value: _Aspect
) -> None:
def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None:
self.sink.write_record_async(
RecordEnvelope(
record=MetadataChangeProposalWrapper(
entityType=entity_urn.get_type(),
entityUrn=str(entity_urn),
aspectName=aspect_name,
aspect=aspect_value,
changeType="UPSERT",
),
metadata={},
),
Expand All @@ -190,7 +186,6 @@ def on_start(self, ctx: PipelineContext) -> None:
# Emit the dataHubExecutionRequestInput aspect
self._emit_aspect(
entity_urn=self.execution_request_input_urn,
aspect_name="dataHubExecutionRequestInput",
aspect_value=execution_input_aspect,
)

Expand All @@ -200,18 +195,29 @@ def on_completion(
report: Dict[str, Any],
ctx: PipelineContext,
) -> None:
# Prepare a nicely formatted summary
structured_report_str = json.dumps(report, indent=2)
summary = f"~~~~ Ingestion Report ~~~~\n{structured_report_str}\n\n"
summary += "~~~~ Ingestion Logs ~~~~\n"
summary += get_log_buffer().format_lines()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah so here we leverage the in memory buffer. nice


# Construct the dataHubExecutionRequestResult aspect
structured_report = StructuredExecutionReportClass(
type="CLI_INGEST",
serializedValue=structured_report_str,
contentType="application/json",
)
execution_result_aspect = ExecutionRequestResultClass(
status=status,
startTimeMs=self.start_time_ms,
durationMs=self.get_cur_time_in_ms() - self.start_time_ms,
report=json.dumps(report, indent=2),
report=summary,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! So in this case, we are now sending the structured report to the correct field. From here, we can continue to add to that report and get it into our result aspect

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Sets ourselves up for much richer reports)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup exactly - this way the structured report shows up in the correct field for both managed ingestion and cli ingestion

note that the structured report field is basically a string right now - we may need to rethink that in the future

structuredReport=structured_report,
)

# Emit the dataHubExecutionRequestResult aspect
self._emit_aspect(
entity_urn=self.execution_request_input_urn,
aspect_name="dataHubExecutionRequestResult",
aspect_value=execution_result_aspect,
)
self.sink.close()
Loading