Merge branch 'master' into gabe--addingAdvancedSearchFiltersComponent
jjoyce0510 authored Sep 27, 2022
2 parents fc95022 + b45d5eb commit d6d745d
Showing 20 changed files with 509 additions and 322 deletions.
@@ -22,7 +22,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
-import org.apache.parquet.SemanticVersion;
+import org.apache.maven.artifact.versioning.ComparableVersion;

import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*;

@@ -45,12 +45,12 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer());

String semanticVersionFilterString = versionCutoff == null ? latestSemanticVersionString : versionCutoff;
-Optional<SemanticVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
+Optional<ComparableVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
if (!semanticVersionFilterOptional.isPresent()) {
return result;
}

-SemanticVersion semanticVersionFilter = semanticVersionFilterOptional.get();
+ComparableVersion semanticVersionFilter = semanticVersionFilterOptional.get();

List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream()
.map(TimelineUtils::semanticVersionChangeTransactionPair)
@@ -7,24 +7,24 @@
import com.linkedin.util.Pair;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
-import org.apache.parquet.SemanticVersion;
+import org.apache.maven.artifact.versioning.ComparableVersion;


@Slf4j
public class TimelineUtils {

-public static Optional<Pair<SemanticVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
+public static Optional<Pair<ComparableVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
ChangeTransaction changeTransaction) {
-Optional<SemanticVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
+Optional<ComparableVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
return semanticVersion.map(version -> Pair.of(version, changeTransaction));
}

-public static Optional<SemanticVersion> createSemanticVersion(String semanticVersionString) {
+public static Optional<ComparableVersion> createSemanticVersion(String semanticVersionString) {
String truncatedSemanticVersion = truncateSemanticVersion(semanticVersionString);
try {
-SemanticVersion semanticVersion = SemanticVersion.parse(truncatedSemanticVersion);
+ComparableVersion semanticVersion = new ComparableVersion(truncatedSemanticVersion);
return Optional.of(semanticVersion);
-} catch (SemanticVersion.SemanticVersionParseException e) {
+} catch (Exception e) {
return Optional.empty();
}
}
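Both hunks above replace Apache Parquet's strict `SemanticVersion` parser with Maven's `ComparableVersion`, whose constructor parses versions leniently and does not declare a dedicated parse exception; that is presumably why the `catch` clause widens to a blanket `Exception`. A rough Python analogy of the same parse-or-return-empty guard, using the third-party `packaging` library purely as a stand-in (illustrative, not part of this commit):

```python
# Parse-or-empty guard, analogous to createSemanticVersion above.
# `packaging.version` is a stand-in for ComparableVersion, not something
# this commit uses.
from typing import Optional

from packaging.version import InvalidVersion, Version


def create_semantic_version(semver: str) -> Optional[Version]:
    try:
        return Version(semver)
    except InvalidVersion:
        # Unparseable input yields "no version" rather than raising.
        return None


print(create_semantic_version("1.2.3"))          # 1.2.3
print(create_semantic_version("not a version"))  # None
```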
13 changes: 13 additions & 0 deletions docker/elasticsearch-setup/create-indices.sh
@@ -97,9 +97,22 @@ function create_datahub_usage_event_aws_elasticsearch() {
}

if [[ $DATAHUB_ANALYTICS_ENABLED == true ]]; then
+  echo -e "\n datahub_analytics_enabled: $DATAHUB_ANALYTICS_ENABLED"
  if [[ $USE_AWS_ELASTICSEARCH == false ]]; then
    create_datahub_usage_event_datastream || exit 1
  else
    create_datahub_usage_event_aws_elasticsearch || exit 1
  fi
+else
+  echo -e "\ndatahub_analytics_enabled: $DATAHUB_ANALYTICS_ENABLED"
+  DATAHUB_USAGE_EVENT_INDEX_RESPONSE_CODE=$(curl -o /dev/null -s -w "%{http_code}" --header "$ELASTICSEARCH_AUTH_HEADER" "${ELASTICSEARCH_INSECURE}$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/_cat/indices/${PREFIX}datahub_usage_event")
+  if [ $DATAHUB_USAGE_EVENT_INDEX_RESPONSE_CODE -eq 404 ]
+  then
+    echo -e "\ncreating ${PREFIX}datahub_usage_event"
+    curl -XPUT --header "$ELASTICSEARCH_AUTH_HEADER" "${ELASTICSEARCH_INSECURE}$ELASTICSEARCH_PROTOCOL://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT/${PREFIX}datahub_usage_event"
+  elif [ $DATAHUB_USAGE_EVENT_INDEX_RESPONSE_CODE -eq 200 ]; then
+    echo -e "\n${PREFIX}datahub_usage_event exists"
+  elif [ $DATAHUB_USAGE_EVENT_INDEX_RESPONSE_CODE -eq 403 ]; then
+    echo -e "Forbidden so exiting"
+  fi
fi
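The new `else` branch probes the usage-event index and only creates it when the probe returns 404. A minimal Python sketch of the same check-then-create flow, assuming an unauthenticated Elasticsearch on `localhost:9200`; the endpoint and index name mirror the script, everything else is illustrative:

```python
# Check-then-create sketch mirroring the shell branch above; assumes a
# local, unauthenticated Elasticsearch and is not part of this commit.
import urllib.error
import urllib.request

BASE = "http://localhost:9200"
INDEX = "datahub_usage_event"


def http_status(url: str, method: str = "GET") -> int:
    req = urllib.request.Request(url, method=method)
    try:
        with urllib.request.urlopen(req) as resp:
            return resp.status
    except urllib.error.HTTPError as e:
        # 4xx/5xx raise HTTPError; the status code is what we want.
        return e.code


code = http_status(f"{BASE}/_cat/indices/{INDEX}")
if code == 404:
    # Missing: an empty PUT creates the index with default settings,
    # like the script's `curl -XPUT`.
    print("creating", INDEX, "->", http_status(f"{BASE}/{INDEX}", method="PUT"))
elif code == 200:
    print(f"{INDEX} exists")
elif code == 403:
    print("forbidden, skipping")
```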
99 changes: 98 additions & 1 deletion metadata-ingestion/docs/sources/s3/s3.md
@@ -1,5 +1,102 @@

-### Path Spec
+### Path Specs

**Example - Dataset per file**

Bucket structure:

```
test-s3-bucket
├── employees.csv
└── food_items.csv
```

Path specs config:
```
path_specs:
  - include: s3://test-s3-bucket/*.csv
```

**Example - Datasets with partitions**

Bucket structure:
```
test-s3-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── returns
    └── year=2021
        └── month=2
            └── 1.parquet
```

Path specs config:
```
path_specs:
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```

**Example - Datasets with partition and exclude**

Bucket structure:
```
test-s3-bucket
├── orders
│   └── year=2022
│       └── month=2
│           ├── 1.parquet
│           └── 2.parquet
└── tmp_orders
    └── year=2021
        └── month=2
            └── 1.parquet
```

Path specs config:
```
path_specs:
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
    exclude:
      - **/tmp_orders/**
```

**Example - Datasets of mixed nature**

Bucket structure:
```
test-s3-bucket
├── customers
│   ├── part1.json
│   ├── part2.json
│   ├── part3.json
│   └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
    └── year=2022
        └── month=2
            ├── 1.parquet
            ├── 2.parquet
            └── 3.parquet
```

Path specs config:
```
path_specs:
  - include: s3://test-s3-bucket/*.csv
    exclude:
      - **/tmp_10101000.csv
  - include: s3://test-s3-bucket/{table}/*.json
  - include: s3://test-s3-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```

**Valid path_specs.include**

2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -354,7 +354,7 @@ def get_long_description():
"flake8>=3.8.3",
"flake8-tidy-imports>=4.3.0",
"isort>=5.7.0",
"mypy>=0.950",
"mypy>=0.981",
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
# Restricting top version to <1.10 until we can fix our types.
23 changes: 17 additions & 6 deletions metadata-ingestion/src/datahub/configuration/config_loader.py
@@ -2,7 +2,8 @@
import pathlib
import re
import sys
-from typing import Any, Dict, Union
+import unittest.mock
+from typing import Any, Dict, Set, Union

from expandvars import UnboundVariable, expandvars

@@ -51,6 +52,19 @@ def resolve_env_variables(config: dict) -> dict:
return new_dict


+def list_referenced_env_variables(config: dict) -> Set[str]:
+    # This is a bit of a hack, but expandvars does a bunch of escaping
+    # and other logic that we don't want to duplicate here.
+
+    with unittest.mock.patch("expandvars.getenv") as mock_getenv:
+        mock_getenv.return_value = "mocked_value"
+
+        resolve_env_variables(config)
+
+    calls = mock_getenv.mock_calls
+    return set([call[1][0] for call in calls])


def load_config_file(
config_file: Union[pathlib.Path, str],
squirrel_original_config: bool = False,
@@ -63,8 +77,7 @@ def load_config_file(
config_mech = YamlConfigurationMechanism()
raw_config_file = sys.stdin.read()
else:
-if isinstance(config_file, str):
-    config_file = pathlib.Path(config_file)
+config_file = pathlib.Path(config_file)
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config_file}")

@@ -74,9 +87,7 @@ def load_config_file(
config_mech = TomlConfigurationMechanism()
else:
raise ConfigurationError(
"Only .toml and .yml are supported. Cannot process file type {}".format(
config_file.suffix
)
f"Only .toml and .yml are supported. Cannot process file type {config_file.suffix}"
)

raw_config_file = config_file.read_text()
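The new `list_referenced_env_variables` helper enumerates every environment variable a config references by patching `expandvars.getenv` and recording which names get looked up. A usage sketch; the import path follows this file's location in the tree, and the config shape is made up:

```python
from datahub.configuration.config_loader import list_referenced_env_variables

# Hypothetical recipe fragment; any ${VAR} placeholders should be reported.
config = {
    "source": {
        "type": "snowflake",
        "config": {
            "username": "${SNOWFLAKE_USER}",
            "password": "${SNOWFLAKE_PASS}",
        },
    }
}

# getenv is mocked inside the helper, so nothing needs to be set in
# os.environ for this call to succeed.
print(list_referenced_env_variables(config))
# Expected: {'SNOWFLAKE_USER', 'SNOWFLAKE_PASS'} (a set, so order varies)
```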
@@ -15,7 +15,7 @@
from looker_sdk.sdk.api31.models import Dashboard, LookWithQuery

import datahub.emitter.mce_builder as builder
-from datahub.emitter.mce_builder import Aspect
+from datahub.emitter.mce_builder import Aspect, AspectAbstract
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.source.looker import looker_common
from datahub.ingestion.source.looker.looker_common import (
@@ -192,11 +192,13 @@ def get_filter(self) -> Dict[ViewField, str]:
pass

@abstractmethod
-def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+def to_entity_absolute_stat_aspect(
+    self, looker_object: ModelForUsage
+) -> AspectAbstract:
pass

@abstractmethod
-def to_entity_timeseries_stat_aspect(self, row: Dict) -> Aspect:
+def to_entity_timeseries_stat_aspect(self, row: Dict) -> AspectAbstract:
pass

@abstractmethod
@@ -242,9 +244,9 @@ def _get_user_identifier(self, row: Dict) -> int:

def _process_entity_timeseries_rows(
self, rows: List[Dict]
-) -> Dict[Tuple[str, str], Aspect]:
+) -> Dict[Tuple[str, str], AspectAbstract]:
# Convert Looker entity stat i.e. rows to DataHub stat aspect
-entity_stat_aspect: Dict[Tuple[str, str], Aspect] = {}
+entity_stat_aspect: Dict[Tuple[str, str], AspectAbstract] = {}

for row in rows:
logger.debug(row)
Expand All @@ -254,8 +256,8 @@ def _process_entity_timeseries_rows(

return entity_stat_aspect

-def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, Aspect]]:
-    aspects: List[Tuple[ModelForUsage, Aspect]] = []
+def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, AspectAbstract]]:
+    aspects: List[Tuple[ModelForUsage, AspectAbstract]] = []
for looker_object in self.looker_models:
aspects.append(
(looker_object, self.to_entity_absolute_stat_aspect(looker_object))
@@ -463,20 +465,19 @@ def _get_mcp_attributes(self, model: ModelForUsage) -> Dict:
"aspectName": "dashboardUsageStatistics",
}

-def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+def to_entity_absolute_stat_aspect(
+    self, looker_object: ModelForUsage
+) -> DashboardUsageStatisticsClass:
looker_dashboard: LookerDashboardForUsage = cast(
LookerDashboardForUsage, looker_object
)
if looker_dashboard.view_count:
self.report.dashboards_with_activity.add(str(looker_dashboard.id))
-return cast(
-    Aspect,
-    DashboardUsageStatisticsClass(
-        timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-        favoritesCount=looker_dashboard.favorite_count,
-        viewsCount=looker_dashboard.view_count,
-        lastViewedAt=looker_dashboard.last_viewed_at,
-    ),
-)
+return DashboardUsageStatisticsClass(
+    timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+    favoritesCount=looker_dashboard.favorite_count,
+    viewsCount=looker_dashboard.view_count,
+    lastViewedAt=looker_dashboard.last_viewed_at,
+)

def get_entity_timeseries_query(self) -> LookerQuery:
Expand All @@ -485,20 +486,19 @@ def get_entity_timeseries_query(self) -> LookerQuery:
def get_entity_user_timeseries_query(self) -> LookerQuery:
return query_collection[QueryId.DASHBOARD_PER_USER_PER_DAY_USAGE_STAT]

-def to_entity_timeseries_stat_aspect(self, row: dict) -> Aspect:
+def to_entity_timeseries_stat_aspect(
+    self, row: dict
+) -> DashboardUsageStatisticsClass:
self.report.dashboards_with_activity.add(
row[HistoryViewField.HISTORY_DASHBOARD_ID]
)
-return cast(
-    Aspect,
-    DashboardUsageStatisticsClass(
-        timestampMillis=self._round_time(
-            row[HistoryViewField.HISTORY_CREATED_DATE]
-        ),
-        eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
-        uniqueUserCount=row[HistoryViewField.HISTORY_DASHBOARD_USER],
-        executionsCount=row[HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT],
-    ),
-)
+return DashboardUsageStatisticsClass(
+    timestampMillis=self._round_time(
+        row[HistoryViewField.HISTORY_CREATED_DATE]
+    ),
+    eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
+    uniqueUserCount=row[HistoryViewField.HISTORY_DASHBOARD_USER],
+    executionsCount=row[HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT],
+)

def append_user_stat(
@@ -584,18 +584,17 @@ def _get_mcp_attributes(self, model: ModelForUsage) -> Dict:
"aspectName": "chartUsageStatistics",
}

-def to_entity_absolute_stat_aspect(self, looker_object: ModelForUsage) -> Aspect:
+def to_entity_absolute_stat_aspect(
+    self, looker_object: ModelForUsage
+) -> ChartUsageStatisticsClass:
looker_look: LookerChartForUsage = cast(LookerChartForUsage, looker_object)
assert looker_look.id
if looker_look.view_count:
self.report.charts_with_activity.add(looker_look.id)

-return cast(
-    Aspect,
-    ChartUsageStatisticsClass(
-        timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
-        viewsCount=looker_look.view_count,
-    ),
-)
+return ChartUsageStatisticsClass(
+    timestampMillis=round(datetime.datetime.now().timestamp() * 1000),
+    viewsCount=looker_look.view_count,
+)

def get_entity_timeseries_query(self) -> LookerQuery:
Expand All @@ -604,18 +603,15 @@ def get_entity_timeseries_query(self) -> LookerQuery:
def get_entity_user_timeseries_query(self) -> LookerQuery:
return query_collection[QueryId.LOOK_PER_USER_PER_DAY_USAGE_STAT]

-def to_entity_timeseries_stat_aspect(self, row: dict) -> Aspect:
+def to_entity_timeseries_stat_aspect(self, row: dict) -> ChartUsageStatisticsClass:
self.report.charts_with_activity.add(str(row[LookViewField.LOOK_ID]))

-return cast(
-    Aspect,
-    ChartUsageStatisticsClass(
-        timestampMillis=self._round_time(
-            row[HistoryViewField.HISTORY_CREATED_DATE]
-        ),
-        eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
-        viewsCount=row[HistoryViewField.HISTORY_COUNT],
-    ),
-)
+return ChartUsageStatisticsClass(
+    timestampMillis=self._round_time(
+        row[HistoryViewField.HISTORY_CREATED_DATE]
+    ),
+    eventGranularity=TimeWindowSizeClass(unit=CalendarIntervalClass.DAY),
+    viewsCount=row[HistoryViewField.HISTORY_COUNT],
+)

def append_user_stat(
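The pattern running through these Looker changes: the abstract methods are annotated with the shared `AspectAbstract` base while concrete overrides return the specific aspect class, which is what lets the `cast(Aspect, ...)` wrappers disappear. Return types are covariant in overrides, so mypy accepts the narrowing. A self-contained sketch of the idea with hypothetical names (not DataHub's real classes):

```python
from abc import ABC, abstractmethod


class AspectBase(ABC):
    """Stand-in for AspectAbstract."""


class DashboardUsageStats(AspectBase):
    def __init__(self, views_count: int) -> None:
        self.views_count = views_count


class StatAspectHandler(ABC):
    @abstractmethod
    def to_absolute_stat_aspect(self) -> AspectBase:
        ...


class DashboardHandler(StatAspectHandler):
    # An override may narrow the return type (covariance), so callers of
    # the concrete class get precise types and no cast() is needed.
    def to_absolute_stat_aspect(self) -> DashboardUsageStats:
        return DashboardUsageStats(views_count=42)


handler = DashboardHandler()
print(handler.to_absolute_stat_aspect().views_count)  # 42
```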