Skip to content

Commit

Permalink
Fix #1055 (#1056)
Browse files: browse the repository at this point in the history
  • Loading branch information
windoze authored Feb 10, 2023
1 parent a7913f1 commit 4cbe35e
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,23 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH

val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)

// Only file-based sources have a real "path"; other sources are just a single dataset
val (adjustedObsTimeRange, dataSourcePath) = if (factDataSource.location.isFileBasedLocation()) {
val (timeInterval, updatedFactDataSource) = if (factDataSource.location.isFileBasedLocation()) {
val pathChecker = PathChecker(ss, dataLoaderHandlers)
val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
val pathInfo = pathAnalyzer.analyze(factDataSource.path)
if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) {
(obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY), pathInfo.basePath)
} else (obsTimeRange, pathInfo.basePath)
val adjustedObsTimeRange = if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY) {
obsTimeRange.adjustWithDateTimeResolution(DateTimeResolution.DAILY)
} else {
obsTimeRange
}
(OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays),
DataSource(pathInfo.basePath, factDataSource.sourceType, factDataSource.timeWindowParams,
factDataSource.timePartitionPattern, factDataSource.postfixPath))
} else {
(obsTimeRange, factDataSource.path)
// Path and time range adjustments cannot be applied to non-file-based sources; keep them as-is
(obsTimeRange, factDataSource)
}
// Copy the pathInfo's path into the datasource path as it adds the daily/hourly keyword if it is missing from the path
val updatedFactDataSource = DataSource(dataSourcePath, factDataSource.sourceType, factDataSource.timeWindowParams,
factDataSource.timePartitionPattern, factDataSource.postfixPath)

val timeInterval = OfflineDateTimeUtils.getFactDataTimeRange(adjustedObsTimeRange, window, timeDelays)
val needCreateTimestampColumn = SlidingWindowFeatureUtils.needCreateTimestampColumnFromPartition(factDataSource)
val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean

Expand Down

0 comments on commit 4cbe35e

Please sign in to comment.