Skip to content

Commit

Permalink
Enhance localize API with tolerance after/before and deactivated inte…
Browse files Browse the repository at this point in the history
…rpolation by default; fix missing elevation data when rest of gps data is present.
  • Loading branch information
Claudius Röhl committed Jan 23, 2025
1 parent a9fa19f commit 6f0ea0b
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 56 deletions.
52 changes: 50 additions & 2 deletions modules/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,37 @@
help="Time offset for media files. E.g. if the cameras time is 10 seconds in the future, you can correct it by writing -o=-10s. General format: -o=1h30m15s.",
type=str,
dest="localize_time_offset_mediafile",
metavar="DURATION",
)

localizeparser.add_argument(
"-t",
"--gps_time_tolerance",
help="Time tolerance for GPS data. General format: -t=1h30m15s. If a mediafiles timestamp is within this tolerance of a GPS data timestamp, the GPS data is taken as source of truth for the mediafile.",
help="Time tolerance for GPS data. Overwrites both after and before variants of this command. General format: -t=1h30m15s. If a mediafiles timestamp is within this tolerance of a GPS data timestamp, the GPS data is taken as source of truth for the mediafile.",
type=str,
dest="localize_gps_time_tolerance",
default=None,
metavar="DURATION",
)

localizeparser.add_argument(
"-a",
"--gps_time_tolerance_after",
help="Time tolerance for GPS data after the time of the mediafile. General format: -t=1h30m15s. If a mediafiles timestamp is within this tolerance of a GPS data timestamp, the GPS data is taken as source of truth for the mediafile.",
type=str,
dest="localize_gps_time_tolerance_after",
default="10m",
metavar="DURATION",
)

localizeparser.add_argument(
"-b",
"--gps_time_tolerance_before",
help="Time tolerance for GPS data before the time of the mediafile. General format: -t=1h30m15s. If a mediafiles timestamp is within this tolerance of a GPS data timestamp, the GPS data is taken as source of truth for the mediafile.",
type=str,
dest="localize_gps_time_tolerance_before",
default="10m",
metavar="DURATION",
)

localizeparser.add_argument(
Expand All @@ -211,13 +233,15 @@
type=str,
dest="localize_timezone",
default="Europe/Berlin",
metavar="TIMEZONE",
)

localizeparser.add_argument(
"--force_gps_data",
help="Force GPS data. If set, all files get assigned this gps data, independently of gpx information available. Format: --force-gps-data -12,34.45,4556, interpreted as latitude,longitude,height.",
type=str,
dest="localize_force_gps_data",
metavar="LAT,LON,ELEV",
)

localizeparser.add_argument(
Expand All @@ -229,6 +253,15 @@
default=False,
)

localizeparser.add_argument(
"-p",
"--interpolate_linearly",
help="Do linear interpolation for gps data if there are missing gps data points and there are two points before and after within the time tolerance specified.",
action="store_true",
dest="localize_interpolate_linerarly",
default=False,
)


aggregateparser.add_argument(
"-j",
Expand Down Expand Up @@ -364,13 +397,28 @@ def main():
inp.time_offset_mediafile = parse_timedelta(
args.localize_time_offset_mediafile
)
if args.localize_gps_time_tolerance_after is not None:
inp.gps_time_tolerance_after = parse_timedelta(
args.localize_gps_time_tolerance_after
)
if args.localize_gps_time_tolerance_before is not None:
inp.gps_time_tolerance_before = parse_timedelta(
args.localize_gps_time_tolerance_before
)
if args.localize_gps_time_tolerance is not None:
inp.gps_time_tolerance = parse_timedelta(args.localize_gps_time_tolerance)
inp.gps_time_tolerance_before = parse_timedelta(
args.localize_gps_time_tolerance
)
inp.gps_time_tolerance_after = parse_timedelta(
args.localize_gps_time_tolerance
)
if args.localize_force_gps_data is not None:
inp.force_gps_data = GpsData.fromString(args.localize_force_gps_data)
if args.localize_suppress_map_open:
inp.suppress_map_open = True

inp.interpolate_linearly = args.localize_interpolate_linerarly

mow.localize(localizerInput=inp)
elif should_execute_stage("aggregate", args):
mow.aggregate(jpgIsSingleSourceOfTruth=args.aggregate_jpgsinglesourceoftruth)
Expand Down
115 changes: 74 additions & 41 deletions modules/general/medialocalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def __str__(self):
@dataclass
class BaseLocalizerInput:
time_offset_mediafile: datetime.timedelta = datetime.timedelta(seconds=0)
gps_time_tolerance: datetime.timedelta = datetime.timedelta(seconds=60)
gps_time_tolerance_before: datetime.timedelta = datetime.timedelta(seconds=60)
gps_time_tolerance_after: datetime.timedelta = datetime.timedelta(seconds=60)
interpolate_linearly: bool = False
mediafile_timezone: str = "Europe/Berlin"
force_gps_data: GpsData = None
transition_even_if_no_gps_data: bool = False
Expand Down Expand Up @@ -86,7 +88,9 @@ class MediaLocalizer(MediaTransitioner):
def __init__(self, input: LocalizerInput):
input.mediaFileFactory = createAnyValidMediaFile
super().__init__(input)
self.gps_time_tolerance = input.gps_time_tolerance
self.gps_time_tolerance_before = input.gps_time_tolerance_before
self.gps_time_tolerance_after = input.gps_time_tolerance_after
self.interpolate_linearly = input.interpolate_linearly
self.time_offset_mediafile = input.time_offset_mediafile
self.mediafile_timezone = input.mediafile_timezone
self.force_gps_data = input.force_gps_data
Expand All @@ -99,7 +103,8 @@ def __init__(self, input: LocalizerInput):
)
sys.exit(1)

self.positions = self.getAllPositionsDataframe()
if self.force_gps_data is None:
self.positions = self.getAllPositionsDataframe()

def getTasks(self) -> list[TransitionTask]:
out = []
Expand All @@ -112,33 +117,34 @@ def getTasks(self) -> list[TransitionTask]:
metaTags=self.force_gps_data.getGPSMetaTagsForWriting(),
)
)
else:
mediafile_time = extractDatetimeFromFileName(mediafile.pathnoext)
gps_data = self.getGpsDataForTime(mediafile_time)

if gps_data is None:
if self.transition_even_if_no_gps_data:
out.append(TransitionTask(index))
else:
out.append(
TransitionTask(
index,
skip=True,
skipReason="Could not localize because of missing GPS data.",
)
)
self.print_debug(
f"Could not find GPS data for file {Path(mediafile.pathnoext).relative_to(self.src)}, with time {mediafile_time} which was corrected to {self.getNormalizedMediaFileTime(mediafile_time)}"
)
continue

mediafile_time = extractDatetimeFromFileName(mediafile.pathnoext)
gps_data = self.getGpsDataForTime(mediafile_time)

if gps_data is None:
if self.transition_even_if_no_gps_data:
out.append(TransitionTask(index))
else:
self.print_debug(
f"Found GPS data for {os.path.basename(mediafile.pathnoext)} : {gps_data}. File has time {mediafile_time}, which was corrected to {self.getNormalizedMediaFileTime(mediafile_time)}"
)
out.append(
TransitionTask(
index, metaTags=gps_data.getGPSMetaTagsForWriting()
index,
skip=True,
skipReason="Could not localize because of missing GPS data.",
)
)
self.print_debug(
f"Could not find GPS data for file {Path(mediafile.pathnoext).relative_to(self.src)}, with time {mediafile_time} which was corrected to {self.getNormalizedMediaFileTime(mediafile_time)}"
)
else:
self.print_debug(
f"Found GPS data for {os.path.basename(mediafile.pathnoext)} : {gps_data}. File has time {mediafile_time}, which was corrected to {self.getNormalizedMediaFileTime(mediafile_time)}"
)
out.append(
TransitionTask(
index, metaTags=gps_data.getGPSMetaTagsForWriting()
)
)
except Exception as e:
out.append(
TransitionTask(
Expand Down Expand Up @@ -260,34 +266,58 @@ def getGpsDataForTime(
mediafile_time_in_gps_time = self.getNormalizedMediaFileTime(mediafile_time)

before_position, after_position = self.getBeforeAfterGpsData(
self.gps_time_tolerance, mediafile_time_in_gps_time
self.gps_time_tolerance_before,
self.gps_time_tolerance_after,
mediafile_time_in_gps_time,
)

if before_position is not None and after_position is not None:
self.assureElevationExists(before_position, after_position)

if (
self.interpolate_linearly
and before_position is not None
and after_position is not None
):
return self.getInterpolatedGpsData(
mediafile_time_in_gps_time, before_position, after_position
)
elif before_position is not None:

time_diff_before = (
mediafile_time_in_gps_time - before_position.time
if before_position is not None
else datetime.timedelta.max
)
time_diff_after = (
after_position.time - mediafile_time_in_gps_time
if after_position is not None
else datetime.timedelta.max
)

if before_position is not None and time_diff_before <= time_diff_after:
return before_position
elif after_position is not None:
elif after_position is not None and time_diff_after <= time_diff_before:
return after_position
else:
return None

def assureElevationExists(self, before_position, after_position):
if after_position and before_position:
if after_position.elev is None and before_position.elev is not None:
after_position.elev = before_position.elev
elif before_position.elev is None and after_position.elev is not None:
before_position.elev = after_position.elev
else:
if after_position and after_position.elev is None:
after_position.elev = 0
if before_position and before_position.elev is None:
before_position.elev = 0

def getInterpolatedGpsData(
self, mediafile_time_in_gps_time, before: GpsData, after: GpsData
) -> GpsData:
if after.time == before.time:
return before

if before.elev is None and after.elev is not None:
before.elev = after.elev
elif after.elev is None and before.elev is not None:
after.elev = before.elev
elif after.elev is None and before.elev is None:
before.elev = 0
after.elev = 0

ratio = (mediafile_time_in_gps_time - before.time).total_seconds() / (
after.time - before.time
).total_seconds()
Expand All @@ -313,11 +343,14 @@ def getNormalizedMediaFileTime(self, mediafile_time) -> datetime.datetime:
return mediafile_time_in_gps_time

def getBeforeAfterGpsData(
self, gps_time_tolerance, mediafile_time_in_gps_time
) -> tuple[pl.DataFrame, pl.DataFrame]:
self,
gps_time_tolerance_before: datetime.datetime,
gps_time_tolerance_after: datetime.datetime,
mediafile_time_in_gps_time,
) -> tuple[GpsData, GpsData]:
before = self.positions.filter(
pl.col("time") < mediafile_time_in_gps_time,
pl.col("time") >= mediafile_time_in_gps_time - gps_time_tolerance,
pl.col("time") >= mediafile_time_in_gps_time - gps_time_tolerance_before,
).tail(1)

if len(before) > 0:
Expand All @@ -332,7 +365,7 @@ def getBeforeAfterGpsData(

after = self.positions.filter(
pl.col("time") > mediafile_time_in_gps_time,
pl.col("time") <= mediafile_time_in_gps_time + gps_time_tolerance,
pl.col("time") <= mediafile_time_in_gps_time + gps_time_tolerance_after,
).head(1)

if len(after) > 0:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "mow"
version = "1.0.0"
version = "1.1.0"
description = "Mow - a media organization workflow tool"
requires-python = ">=3.12"
dependencies = [
Expand Down
Loading

0 comments on commit 6f0ea0b

Please sign in to comment.