"""
timeline & runner implementation
"""
import copy
import datetime
import inspect
import sys
import threading
import time
from functools import wraps
from unittest import mock
from .errors import SegmentNotComplete, TimeOutofBounds
from .patches import Date, Datetime
from .utils import chained, time_in_seconds, timedelta_to_seconds
IGNORED_MODULES = set()
_NO_EXCEPTION = (None, None, None)
class Decorator:
def __call__(self, fn):
@wraps(fn)
def inner(*args, **kw):
self.__enter__()
exc = _NO_EXCEPTION
try:
if "timeline" in inspect.signature(fn).parameters:
result = fn(*args, timeline=self, **kw)
else:
result = fn(*args, **kw)
except Exception:
exc = sys.exc_info()
catch = self.__exit__(*exc)
if not catch and exc is not _NO_EXCEPTION:
raise exc[0]
return result
return inner
class Segment:
"""
utility class to manage execution result and timings
for :class:`SyncRunner
"""
def __init__(self):
self.__error = False
self.__response = None
self.__start = time.time()
self.__end = None
def complete(self, response):
"""
called upon successful completion of the segment
"""
self.__response = response
def complete_with_error(self, exception):
"""
called if the segment errored during execution
"""
self.__error = exception
@property
def complete_time(self):
"""
returns the completion time
"""
return self.__end
@complete_time.setter
def complete_time(self, completion_time):
"""
sets the completion time
"""
self.__end = completion_time
@property
def start_time(self):
"""
returns the start time
"""
return self.__start
@start_time.setter
def start_time(self, start_time):
"""
sets the start time
"""
self.__start = start_time
@property
def runtime(self):
"""
returns the total execution time of the segment
"""
if self.__end:
return self.complete_time - self.start_time
else:
raise SegmentNotComplete
@property
def response(self):
"""
returns the return value captured in the segment or raises
the :exception:`exceptions.Exception` that was caught.
"""
if not self.__end:
raise SegmentNotComplete
else:
if self.__error:
raise self.__error[0]
return self.__response
[docs]class Timeline(Decorator):
"""
Timeline context manager. Within this context
the following builtins respect the alterations made
to the timeline:
- :func:`time.time`
- :func:`time.time_ns`
- :func:`time.monotonic`
- :func:`time.monotonic_ns`
- :func:`time.sleep`
- :func:`time.localtime`
- :func:`time.gmtime`
- :meth:`datetime.datetime.now`
- :meth:`datetime.date.today`
- :meth:`datetime.datetime.utcnow`
The class can be used either as a context manager or a decorator.
The following are all valid ways to use it.
.. code-block:: python
with Timeline(scale=10, start=datetime.datetime(2012,12,12)):
....
fast_timeline = Timeline(scale=10).forward(120)
with fast_timeline as timeline:
....
delta = datetime.date(2015,1,1) - datetime.date.today()
future_frozen_timeline = Timeline(scale=10000).freeze().forward(delta)
with future_frozen_timeline as timeline:
...
@Timeline(scale=100)
def slow():
time.sleep(120)
:param float scale: > 1 time will go faster and < 1 it will be slowed down.
:param start: if specified starts the timeline at the given value (either a
floating point representing seconds since epoch or a
:class:`datetime.datetime` object)
"""
class_mappings = {
"date": (datetime.date, Date),
"datetime": (datetime.datetime, Datetime),
}
def __init__(self, scale=1, start=None):
self.reference = time.time()
self.offset = (
time_in_seconds(start) - self.reference if start is not None else 0.0
)
self.freeze_point = self.freeze_at = None
self.patchers = []
self.mock_mappings = {
"datetime.date": (datetime.date, Date),
"datetime.datetime": (datetime.datetime, Datetime),
"time.monotonic": (time.monotonic, self.__time_monotonic),
"time.monotonic_ns": (time.monotonic_ns, self.__time_monotonic_ns),
"time.time": (time.time, self.__time_time),
"time.time_ns": (time.time_ns, self.__time_time_ns),
"time.sleep": (time.sleep, self.__time_sleep),
"time.gmtime": (time.gmtime, self.__time_gmtime),
"time.localtime": (time.localtime, self.__time_localtime),
}
self.func_mappings = {
"time": (time.time, self.__time_time),
"time_ns": (time.time_ns, self.__time_time_ns),
"monotonic": (time.monotonic, self.__time_monotonic),
"monotonic_ns": (time.monotonic_ns, self.__time_monotonic_ns),
"sleep": (time.sleep, self.__time_sleep),
"gmtime": (time.gmtime, self.__time_gmtime),
"localtime": (time.localtime, self.__time_localtime),
}
self.factor = scale
def _get_original(self, fn_or_mod):
"""
returns the original moduel or function
"""
if fn_or_mod in self.mock_mappings:
return self.mock_mappings[fn_or_mod][0]
elif fn_or_mod in self.func_mappings:
return self.func_mappings[fn_or_mod][0]
else:
return self.class_mappings[fn_or_mod][0]
def _get_fake(self, fn_or_mod):
"""
returns the mocked/patched module or function
"""
if fn_or_mod in self.mock_mappings:
return self.mock_mappings[fn_or_mod][1]
elif fn_or_mod in self.func_mappings:
return self.func_mappings[fn_or_mod][1]
else:
return self.class_mappings[fn_or_mod][1]
def __compute_time(self, freeze_point, offset, original, unit=1, cast_func=float):
"""
computes the current_time after accounting for
any adjustments due to :attr:`factor` or invocations
of :meth:`freeze`, :meth:`rewind` or :meth:`forward`
"""
if freeze_point is not None:
return unit * (offset + freeze_point)
else:
delta = self._get_original(original)() - (unit * self.reference)
return cast_func(
unit * self.reference + (delta * self.factor) + unit * offset
)
def __check_out_of_bounds(self, offset=None, freeze_point=None):
"""
ensures that the time that would be calculated based on any
offset or freeze point would not result in jumping beyond the epoch
"""
next_time = self.__compute_time(
freeze_point or self.freeze_point, offset or self.offset, "time.time"
)
if next_time < 0:
raise TimeOutofBounds(next_time)
def __time_monotonic(self):
"""
patched version of :func:`time.monotonic`
"""
return self.__compute_time(self.freeze_point, self.offset, "time.monotonic")
def __time_monotonic_ns(self):
"""
patched version of :func:`time.monotonic_ns`
"""
return self.__compute_time(
self.freeze_point, self.offset, "time.monotonic_ns", 1e9, int
)
def __time_time(self):
"""
patched version of :func:`time.time`
"""
return self.__compute_time(self.freeze_point, self.offset, "time.time")
def __time_time_ns(self):
"""
patched version of :func:`time.time_ns`
"""
return self.__compute_time(
self.freeze_point, self.offset, "time.time_ns", 1e9, int
)
def __time_gmtime(self, seconds=None):
"""
patched version of :func:`time.gmtime`
"""
return self._get_original("time.gmtime")(
seconds if seconds is not None else self.__time_time()
)
def __time_localtime(self, seconds=None):
"""
patched version of :func:`time.localtime`
"""
return self._get_original("time.localtime")(
seconds if seconds is not None else self.__time_time()
)
def __time_sleep(self, amount):
"""
patched version of :func:`time.sleep`
"""
self._get_original("time.sleep")(1.0 * amount / self.factor)
[docs] @chained
def forward(self, amount):
"""
forwards the timeline by the specified :attr:`amount`
:param amount: either an integer representing seconds or
a :class:`datetime.timedelta` object
"""
offset = self.offset
if isinstance(amount, datetime.timedelta):
offset += timedelta_to_seconds(amount)
else:
offset += amount
self.__check_out_of_bounds(offset=offset)
self.offset = offset
[docs] @chained
def rewind(self, amount):
"""
rewinds the timeline by the specified :attr:`amount`
:param amount: either an integer representing seconds or
a :class:`datetime.timedelta` object
"""
offset = self.offset
if isinstance(amount, datetime.timedelta):
offset -= timedelta_to_seconds(amount)
else:
offset -= amount
self.__check_out_of_bounds(offset=offset)
self.offset = offset
[docs] @chained
def freeze(self, target_time=None):
"""
freezes the timeline
:param target_time: the time to freeze at as either a float
representing seconds since the epoch or a :class:`datetime.datetime`
object. If not provided time will be frozen at the current time of
the enclosing :class:`Timeline`
"""
if target_time is None:
freeze_point = self._get_fake("time.time")()
else:
freeze_point = time_in_seconds(target_time)
self.__check_out_of_bounds(freeze_point=freeze_point)
self.freeze_point = freeze_point
self.offset = 0
[docs] @chained
def unfreeze(self):
"""
if a call to :meth:`freeze` was previously made, the timeline will be
unfrozen to the point which :meth:`freeze` was invoked.
.. warning::
Since unfreezing will reset the timeline back to the point in
when the :meth:`freeze` was invoked - the effect of previous
invocations of :meth:`forward` and :meth:`rewind` will
be lost. This is by design so that freeze/unfreeze can be used as
a checkpoint mechanism.
"""
if self.freeze_point is not None:
self.reference = self._get_original("time.time")()
self.offset = time_in_seconds(self.freeze_point) - self.reference
self.freeze_point = None
[docs] @chained
def scale(self, factor):
"""
changes the speed at which time elapses and how long sleeps last for.
:param float factor: > 1 time will go faster and < 1 it will be slowed
down.
"""
self.factor = factor
self.reference = self._get_original("time.time")()
[docs] @chained
def reset(self):
"""
resets the current timeline to the actual time now
with a scale factor 1
"""
self.factor = 1
self.freeze_point = None
self.reference = self._get_original("time.time")()
self.offset = 0
def __enter__(self):
for name in list(sys.modules.keys()):
module = sys.modules[name]
if module in IGNORED_MODULES:
continue
mappings = copy.copy(self.class_mappings)
mappings.update(self.func_mappings)
try:
for obj in mappings:
if obj in dir(module) and getattr(
module, obj
) == self._get_original(obj):
path = "{}.{}".format(name, obj)
if path not in self.mock_mappings:
patcher = mock.patch(path, self._get_fake(obj))
patcher.start()
self.patchers.append(patcher)
# this is done for cases where invalid modules are on
# sys modules.
except: # noqa: E722
IGNORED_MODULES.add(module)
for time_obj in self.mock_mappings:
patcher = mock.patch(time_obj, self._get_fake(time_obj))
patcher.start()
self.patchers.append(patcher)
return self
def __exit__(self, exc_type, exc_value, traceback):
for patcher in self.patchers:
patcher.stop()
self.patchers = []
[docs]class ScaledRunner:
"""
manages the execution of a callable within a :class:`hiro.Timeline`
context.
"""
def __init__(self, factor, func, *args, **kwargs):
self.func = func
self.func_args = args
self.func_kwargs = kwargs
self.segment = Segment()
self.factor = factor
self.__call__()
def _run(self):
"""
managed execution of :attr:`func`
"""
self.segment.start_time = time.time()
with Timeline(scale=self.factor):
try:
self.segment.complete(self.func(*self.func_args, **self.func_kwargs))
# will be rethrown
except: # noqa: E722
self.segment.complete_with_error(sys.exc_info())
self.segment.complete_time = time.time()
def __call__(self):
self._run()
return self
[docs] def get_response(self):
"""
:returns: the return value from :attr:`func`
:raises: Exception if the :attr:`func` raised one during execution
"""
return self.segment.response
[docs] def get_execution_time(self):
"""
:returns: the real execution time of :attr:`func` in seconds
"""
return self.segment.runtime
[docs]class ScaledThreadedRunner(ScaledRunner):
"""
manages the threaded execution of a callable within a
:class:`hiro.Timeline` context.
"""
def __init__(self, *args, **kwargs):
self.thread_runner = threading.Thread(target=self._run)
super().__init__(*args, **kwargs)
def __call__(self):
self.thread_runner.start()
return self
[docs] def is_running(self):
"""
:rtype bool: whether the :attr:`func` is still running or not.
"""
return self.thread_runner.is_alive()
[docs] def join(self):
"""
waits for the :attr:`func` to complete execution.
"""
return self.thread_runner.join()
[docs]def run_sync(factor, func, *args, **kwargs):
"""
Executes a callable within a :class:`hiro.Timeline`
:param int factor: scale factor to use for the timeline during execution
:param function func: the function to invoke
:param args: the arguments to pass to the function
:param kwargs: the keyword arguments to pass to the function
:returns: an instance of :class:`hiro.core.ScaledRunner`
"""
return ScaledRunner(factor, func, *args, **kwargs)
[docs]def run_threaded(factor, func, *args, **kwargs):
"""
Executes a callable in a separate thread within a :class:`hiro.Timeline`
:param int factor: scale factor to use for the timeline during execution
:param function func: the function to invoke
:param args: the arguments to pass to the function
:param kwargs: the keyword arguments to pass to the function
:returns: an instance of :class:`hiro.core.ScaledThreadedRunner`
"""
return ScaledThreadedRunner(factor, func, *args, **kwargs)
# For backward compatibility
[docs]def run_async(factor, func, *args, **kwargs):
"""
Executes a callable in a separate thread within a :class:`hiro.Timeline`
:param int factor: scale factor to use for the timeline during execution
:param function func: the function to invoke
:param args: the arguments to pass to the function
:param kwargs: the keyword arguments to pass to the function
:returns: an instance of :class:`hiro.core.ScaledAsyncRunner`
.. deprecated:: 1.0.0
Use :meth:`run_threaded`
"""
return run_threaded(factor, func, *args, **kwargs)
ScaledAsyncRunner = ScaledThreadedRunner