Skip to content

Commit

Permalink
Runtime change to timestamp/duration
Browse files Browse the repository at this point in the history
  • Loading branch information
chadrik committed Oct 27, 2019
1 parent 01d55e4 commit 0791ebb
Showing 3 changed files with 31 additions and 36 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/combiners.py
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@
T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')
TimestampType = Union[int, long, float, Timestamp, Duration]
TimestampType = Union[int, float, Timestamp, Duration]


class Mean(object):
57 changes: 27 additions & 30 deletions sdks/python/apache_beam/utils/timestamp.py
Original file line number Diff line number Diff line change
@@ -28,7 +28,6 @@
from builtins import object
from typing import Any
from typing import Union
from typing import overload

import dateutil.parser
import pytz
@@ -50,7 +49,7 @@ class Timestamp(object):
"""

def __init__(self, seconds=0, micros=0):
# type: (Union[int, long, float], Union[int, long, float]) -> None
# type: (Union[int, float], Union[int, float]) -> None
if not isinstance(seconds, (int, long, float)):
raise TypeError('Cannot interpret %s %s as seconds.' % (
seconds, type(seconds)))
@@ -61,7 +60,7 @@ def __init__(self, seconds=0, micros=0):

@staticmethod
def of(seconds):
# type: (Union[int, long, float, Timestamp]) -> Timestamp
# type: (Union[int, float, Timestamp]) -> Timestamp
"""Return the Timestamp for the given number of seconds.
If the input is already a Timestamp, the input itself will be returned.
@@ -151,11 +150,14 @@ def __int__(self):
return self.micros // 1000000

def __eq__(self, other):
# type: (Union[int, long, float, Timestamp, Duration]) -> bool
# type: (object) -> bool
# Support equality with other types
if not isinstance(other, (int, long, float, Timestamp, Duration)):
return NotImplemented
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
try:
other = Timestamp.of(other)
return self.micros == Timestamp.of(other).micros
except TypeError:
return NotImplemented
return self.micros == other.micros
@@ -166,7 +168,7 @@ def __ne__(self, other):
return not self == other

def __lt__(self, other):
# type: (Union[int, long, float, Timestamp, Duration]) -> bool
# type: (Union[int, float, Timestamp, Duration]) -> bool
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Duration):
other = Timestamp.of(other)
@@ -176,21 +178,21 @@ def __hash__(self):
return hash(self.micros)

def __add__(self, other):
# type: (Union[int, long, float, Duration]) -> Timestamp
# type: (Union[int, float, Duration]) -> Timestamp
other = Duration.of(other)
return Timestamp(micros=self.micros + other.micros)

def __radd__(self, other):
# type: (Union[int, long, float, Duration]) -> Timestamp
# type: (Union[int, float, Duration]) -> Timestamp
return self + other

def __sub__(self, other):
# type: (Union[int, long, float, Duration]) -> Timestamp
# type: (Union[int, float, Duration]) -> Timestamp
other = Duration.of(other)
return Timestamp(micros=self.micros - other.micros)

def __mod__(self, other):
# type: (Union[int, long, float, Duration]) -> Duration
# type: (Union[int, float, Duration]) -> Duration
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)

@@ -214,12 +216,12 @@ class Duration(object):
"""

def __init__(self, seconds=0, micros=0):
# type: (Union[int, long, float], Union[int, long, float]) -> None
# type: (Union[int, float], Union[int, float]) -> None
self.micros = int(seconds * 1000000) + int(micros)

@staticmethod
def of(seconds):
# type: (Union[int, long, float, Duration]) -> Duration
# type: (Union[int, float, Duration]) -> Duration
"""Return the Duration for the given number of seconds since Unix epoch.
If the input is already a Duration, the input itself will be returned.
@@ -255,19 +257,23 @@ def __float__(self):
return self.micros / 1000000

def __eq__(self, other):
# type: (Union[int, long, float, Duration, Timestamp]) -> bool
# type: (object) -> bool
# Support equality with other types
if not isinstance(other, (int, long, float, Timestamp, Duration)):
return NotImplemented
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
other = Duration.of(other)
return self.micros == other.micros
return self.micros == Duration.of(other).micros
else:
return self.micros == other.micros

def __ne__(self, other):
# type: (Any) -> bool
# TODO(BEAM-5949): Needed for Python 2 compatibility.
return not self == other

def __lt__(self, other):
# type: (Union[int, long, float, Duration, Timestamp]) -> bool
# type: (Union[int, float, Duration, Timestamp]) -> bool
# Allow comparisons between Duration and Timestamp values.
if not isinstance(other, Timestamp):
other = Duration.of(other)
@@ -280,43 +286,34 @@ def __neg__(self):
# type: () -> Duration
return Duration(micros=-self.micros)

@overload
def __add__(self, other):
# type: (Timestamp) -> Timestamp
pass

@overload
def __add__(self, other):
# type: (Union[int, long, float, Duration]) -> Duration
pass

def __add__(self, other):
# type: (Union[int, float, Duration]) -> Duration
if isinstance(other, Timestamp):
return other + self
return NotImplemented
other = Duration.of(other)
return Duration(micros=self.micros + other.micros)

def __radd__(self, other):
return self + other

def __sub__(self, other):
# type: (Union[int, long, float, Duration]) -> Duration
# type: (Union[int, float, Duration]) -> Duration
other = Duration.of(other)
return Duration(micros=self.micros - other.micros)

def __rsub__(self, other):
return -(self - other)

def __mul__(self, other):
# type: (Union[int, long, float, Duration]) -> Duration
# type: (Union[int, float, Duration]) -> Duration
other = Duration.of(other)
return Duration(micros=self.micros * other.micros // 1000000)

def __rmul__(self, other):
return self * other

def __mod__(self, other):
# type: (Union[int, long, float, Duration]) -> Duration
# type: (Union[int, float, Duration]) -> Duration
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)

8 changes: 3 additions & 5 deletions sdks/python/apache_beam/utils/windowed_value.py
Original file line number Diff line number Diff line change
@@ -36,8 +36,6 @@
from typing import Tuple
from typing import Union

from past.builtins import long

from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp
@@ -180,7 +178,7 @@ class WindowedValue(object):

def __init__(self,
value,
timestamp, # type: Union[int, long, float, Timestamp]
timestamp, # type: Union[int, float, Timestamp]
windows, # type: Tuple[BoundedWindow, ...]
pane_info=PANE_INFO_UNKNOWN
):
@@ -271,8 +269,8 @@ class _IntervalWindowBase(object):
"""

def __init__(self,
start, # type: Optional[Union[int, long, float, Timestamp]]
end # type: Optional[Union[int, long, float, Timestamp]]
start, # type: Optional[Union[int, float, Timestamp]]
end # type: Optional[Union[int, float, Timestamp]]
):
if start is not None and end is not None:
self._start_object = Timestamp.of(start) # type: Optional[Timestamp]

0 comments on commit 0791ebb

Please sign in to comment.