Event
ib_interface.eventkit.event
Event
Enable event passing between loosely coupled components. The event emits values to connected listeners and has a selection of operators to create general data flow pipelines.
Args: name: Name to use for this event.
name
def name() -> str
This event's name.
done
def done() -> bool
True if event has ended with no more emits coming,
False otherwise.
set_done
def set_done()
Set this event to be ended. The event should not emit anything after that.
value
def value()
This event's last emitted value.
connect
def connect(listener, error = None, done = None, keep_ref: bool = False) -> Event
Connect a listener to this event. If the listener is added multiple times then it is invoked just as many times on emit.
The += operator can be used as a synonym for this method::
import eventkit as ev
def f(a, b): print(a * b)
def g(a, b): print(a / b)
event = ev.Event()
event += f
event += g
event.emit(10, 5)
Args:
listener (callable): The callback to invoke on emit of this event.
It gets the *args from an emit as arguments.
If the listener is a coroutine function, or a function that
returns an awaitable, the awaitable is run in the
asyncio event loop.
error (callable): The callback to invoke on error of this event.
It gets (this event, exception) as two arguments.
done (callable): The callback to invoke on ending of this event.
It gets this event as single argument.
keep_ref:
- True: A strong reference to the callable is kept.
- False: If the callable allows weak refs and it is garbage collected, then it is automatically disconnected from this event.
disconnect
def disconnect(listener, error = None, done = None)
Disconnect a listener from this event.
The -= operator can be used as a synonym for this method.
Args:
listener (callable): The callback to disconnect. The callback is removed at most once. It is not an error if the callback is not connected.
error (callable): The error callback to disconnect.
done (callable): The done callback to disconnect.
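The connect and disconnect behaviour described above can be illustrated with a small stand-in class. This is only a sketch under stated assumptions: `MiniEvent` is a hypothetical name, it is not the real Event implementation, and it leaves out the error and done callbacks, weak references and asyncio support.

```python
class MiniEvent:
    """Hypothetical stand-in for illustration; not the real Event class."""

    def __init__(self, name=""):
        self.name = name
        self._listeners = []

    def connect(self, listener):
        # Connecting the same listener twice registers it twice.
        self._listeners.append(listener)
        return self

    # `event += listener` delegates to connect().
    __iadd__ = connect

    def disconnect(self, listener):
        # Remove at most one registration; a listener that is not
        # connected is silently ignored.
        if listener in self._listeners:
            self._listeners.remove(listener)
        return self

    # `event -= listener` delegates to disconnect().
    __isub__ = disconnect

    def emit(self, *args):
        # Iterate over a copy so listeners may disconnect during an emit.
        for listener in list(self._listeners):
            listener(*args)


results = []
event = MiniEvent("demo")
event += lambda a, b: results.append(a * b)
event += lambda a, b: results.append(a / b)
event.emit(10, 5)
print(results)  # [50, 2.0]
```

Returning `self` from `connect` is what makes the augmented assignment form work, since Python rebinds the name to the return value of `__iadd__`.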
disconnect_obj
def disconnect_obj(obj)
Disconnect all listeners on the given object (including the error and done listeners).
Args: obj (object): The target object that is to be completely removed from this event.
emit
def emit(args = ())
Emit a new value to all connected listeners.
Args: args (tuple): Argument values to emit to listeners.
emit_threadsafe
def emit_threadsafe(args = ())
Threadsafe version of :meth:emit that doesn't invoke the
listeners directly but via the event loop of the main thread.
clear
def clear()
Disconnect all listeners.
run
def run() -> List
Start the asyncio event loop, run this event to completion and return all values as a list::
import eventkit as ev
ev.Timer(0.25, count=10).run()
-> [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]
.. note::
When running inside a Jupyter notebook this will give an error
that the asyncio event loop is already running. This can be
remedied by applying
nest_asyncio <https://github.com/erdewit/nest_asyncio>_
or by using the top-level await statement of Jupyter::
await event.list()
pipe
def pipe(targets: Event = ())
Form several events into a pipe::
import eventkit as ev
e1 = ev.Sequence('abcde')
e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c)))
e3 = ev.Star().pluck(1).map(chr)
e1.pipe(e2, e3)  # or: ev.Event.Pipe(e1, e2, e3)
-> ['a', 'c', 'e', 'g', 'i']
Args:
targets: One or more Events that have no source yet,
or Event constructors that need no arguments.
fork
def fork(targets: Event = ()) -> Fork
Fork this event into one or more target events. Square brackets can be used as a synonym::
import eventkit as ev
ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip()
-> [(2, 2, 2), (2, 3, 5), (2, 4, 9)]
The events in the fork can be combined by one of the join
methods of Fork.
Args:
targets: One or more events that have no source yet,
or Event constructors that need no arguments.
set_source
def set_source(source)
aiter
def aiter(skip_to_last: bool = False, tuples: bool = False)
Create an asynchronous iterator that yields the emitted values from this event::
async def coro():
    async for args in event.aiter():
        ...
:meth:__aiter__ is a synonym for :meth:aiter with
default arguments.
Args:
skip_to_last:
- True: Backlogged source values are skipped over to yield only the latest value. Can be used as a slipper clutch between a source that produces too fast and the handling that can't keep up.
- False: All events are yielded.
tuples:
- True: Always yield arguments as a tuple.
- False: Unpack single argument tuples.
init
def init(obj, event_names: Iterable)
Convenience function for initializing multiple events as members of the given object.
Args: event_names: Names to use for the created events.
create
def create(obj)
Create an event from an async iterator, awaitable, or event constructor without arguments.
Args: obj (object): The source object. If it's already an event then it is passed as-is.
wait
def wait(future: Awaitable) -> Wait
Create a new event that emits the value of the awaitable when it becomes available and then set this event done.
:meth:wait and :meth:__await__ are each other's inverse.
Args: future: Future to wait on.
aiterate
def aiterate(ait: AsyncIterable) -> Aiterate
Create a new event that emits the yielded values from the asynchronous iterator.
The asynchronous iterator serves as a source for both the time and value of emits.
:meth:aiterate and :meth:__aiter__ are each other's inverse.
Args:
ait: The asynchronous source iterator. It must await
at least once; if necessary use::
await asyncio.sleep(0)
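To illustrate the requirement that the source iterator awaits at least once, here is a minimal sketch of an asynchronous generator that yields control to the event loop before every value. It uses only the standard asyncio module; `countdown` and `collect` are hypothetical helper names, and the generator is consumed directly rather than through aiterate.

```python
import asyncio


async def countdown(n):
    """Hypothetical source iterator that awaits before every value."""
    for i in range(n, 0, -1):
        # Yield control to the event loop so other tasks can run.
        await asyncio.sleep(0)
        yield i


async def collect(ait):
    # Drain the asynchronous iterator into a list.
    return [value async for value in ait]


values = asyncio.run(collect(countdown(3)))
print(values)  # [3, 2, 1]
```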
sequence
def sequence(values: Iterable, interval: float = 0, times: Union[Iterable[float], None] = None) -> Sequence
Create a new event that emits the given values.
Supply at most one of interval or times.
Args:
values: The source values.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since
start of event. The sequence should match values.
repeat
def repeat(value = NO_VALUE, count = 1, interval: float = 0, times: Union[Iterable[float], None] = None) -> Repeat
Create a new event that emits value count times.
Args:
value (object): The value to emit.
count (int): Number of times to emit.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since
start of event. The sequence should match values.
range
def range(args = (), interval: float = 0, times: Union[Iterable[float], None] = None) -> Range
Create a new event that emits the values from a range.
Args:
args (tuple): Same as for built-in range.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since
start of event. The sequence should match the range.
timerange
def timerange(start = 0, end = None, step = 1) -> Timerange
Create a new event that emits the datetime value, at that datetime, from a range of datetimes.
Args:
start (float): Start time, can be specified as:
- datetime.datetime.
- datetime.time: Today is used as date.
- int or float: Number of seconds relative to now. Values will be quantized to the given step.
end (float): End time, can be specified as:
- datetime.datetime.
- datetime.time: Today is used as date.
- None: No end limit.
step (float): Number of seconds, or datetime.timedelta, to space between values.
timer
def timer(interval: float, count: Union[int, None] = None) -> Timer
Create a new timer event that emits at regularly paced intervals the number of seconds since starting it.
Args:
interval: Time interval in seconds between emits.
count: Number of times to emit, or None for no limit.
marble
def marble(s: str, interval: float = 0, times: Union[Iterable[float], None] = None) -> Marble
Create a new event that emits the values from a Rx-type marble string.
Args:
s: The string with characters that are emitted.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since start of event. The sequence should match the marble string.
filter
def filter(predicate = bool) -> Filter
For every source value, apply predicate and re-emit when True.
Args:
predicate (callable): The function to test every source value with.
The default is to test the general truthiness with bool().
skip
def skip(count: int = 1) -> Skip
Drop the first count values from source and follow the source
after that.
Args: count: Number of source values to drop.
take
def take(count: int = 1) -> Take
Re-emit first count values from the source and then end.
Args: count: Number of source values to re-emit.
takewhile
def takewhile(predicate = bool) -> TakeWhile
Re-emit values from the source until the predicate becomes False and then end.
Args:
predicate (callable): The function to test every source value with.
The default is to test the general truthiness with bool().
dropwhile
def dropwhile(predicate = lambda x: not x) -> DropWhile
Drop source values until the predicate becomes False and after that re-emit everything from the source.
Args: predicate (callable): The function to test every source value with. The default is to test the inverted general truthiness.
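The takewhile and dropwhile operators read like their namesakes in the standard itertools module, applied to a stream of emits instead of a list. As a rough analogy only (this uses plain iterables, not events), the default predicates behave as follows:

```python
from itertools import dropwhile, takewhile

# takewhile with the default truthiness test: stop at the first falsy value.
taken = list(takewhile(bool, [3, 1, 0, 2, 0]))

# dropwhile with the inverted truthiness test: drop leading falsy values,
# then pass everything through, including later falsy values.
kept = list(dropwhile(lambda x: not x, [0, 0, 4, 0, 5]))

print(taken)  # [3, 1]
print(kept)   # [4, 0, 5]
```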
takeuntil
def takeuntil(notifier: Event) -> TakeUntil
Re-emit values from the source until the notifier emits
and then end. If the notifier ends without any emit then
keep passing source values.
Args: notifier: Event that signals to end this event.
constant
def constant(constant) -> Constant
On emit of the source emit a constant value::
emit(value) -> emit(constant)
Args: constant (object): The constant value to emit.
iterate
def iterate(it) -> Iterate
On emit of the source, emit the next value from an iterator::
emit(a, b, ...) -> emit(next(it))
The time of events follows the source and the values follow the iterator.
Args: it (iterable): The source iterator to use for generating values. When the iterator is exhausted the event is set to be done.
count
def count(start = 0, step = 1) -> Count
Count and emit the number of source emits::
emit(a, b, ...) -> emit(count)
Args:
start (int): Start count.
step (int): Increase count by this amount for every new source value.
enumerate
def enumerate(start = 0, step = 1) -> Enumerate
Add a count to every source value::
emit(a, b, ...) -> emit(count, a, b, ...)
Args:
start (float): Start count.
step (float): Increase by this amount for every new source value.
timestamp
def timestamp() -> Timestamp
Add a timestamp (from time.time()) to every source value::
emit(a, b, ...) -> emit(timestamp, a, b, ...)
The timestamp is a float giving the number of seconds since the epoch (midnight, January 1, 1970).
partial
def partial(left_args = ()) -> Partial
Pad source values with extra arguments on the left::
emit(a, b, ...) -> emit(*left_args, a, b, ...)
Args: left_args (tuple): Arguments to inject.
partial_right
def partial_right(right_args = ()) -> PartialRight
Pad source values with extra arguments on the right::
emit(a, b, ...) -> emit(a, b, ..., *right_args)
Args: right_args (tuple): Arguments to inject.
star
def star() -> Star
Unpack a source tuple into positional arguments, similar to the star operator::
emit((a, b, ...)) -> emit(a, b, ...)
:meth:star and :meth:pack are each other's inverse.
pack
def pack() -> Pack
Pack positional arguments into a tuple::
emit(a, b, ...) -> emit((a, b, ...))
:meth:star and :meth:pack are each other's inverse.
pluck
def pluck(selections: Union[int, str] = ()) -> Pluck
Extract arguments or nested properties from the source values.
Select which argument positions to keep::
emit(a, b, c, d).pluck(1, 2) -> emit(b, c)
Re-order arguments::
emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)
To do an empty emit leave selections empty::
emit(a, b).pluck() -> emit()
Select nested properties from positional arguments::
emit(person, account).pluck('1.number', '0.address.street') ->
emit(account.number, person.address.street)
If no value can be extracted then NO_VALUE is emitted in its place.
Args: selections: The values to extract.
map
def map(func, timeout = None, ordered = True, task_limit = None) -> Map
Apply a sync or async function to source values using positional arguments::
emit(a, b, ...) -> emit(func(a, b, ...))
or if func returns an awaitable then it will be awaited::
emit(a, b, ...) -> emit(await func(a, b, ...))
In case of timeout or other failure, NO_VALUE is emitted.
Args:
func (callable): The function or coroutine constructor to apply.
timeout (float): Timeout in seconds since coroutine is started.
ordered (bool):
- True: The order of emitted results preserves the order of the source values.
- False: Results are in order of completion.
task_limit (int): Max number of concurrent tasks, or None for no limit.
timeout, ordered and task_limit apply to
async functions only.
emap
def emap(constr, joiner: AddableJoinOp) -> Emap
Higher-order event map that creates a new Event instance
for every source value::
emit(a, b, ...) -> new Event constr(a, b, ...)
Args:
constr (type): Constructor function for creating a new event.
Apart from returning an Event, the constructor may also
return an awaitable or an asynchronous iterator, in which
case an Event will be created.
joiner: Join operator to combine the emits of nested events.
mergemap
def mergemap(constr) -> Mergemap
:meth:emap that uses :meth:merge to combine the nested events::
marbles = [ 'A B C D', '_1 2 3 4', '__K L M N']
ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v]))
-> ['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']
concatmap
def concatmap(constr) -> Concatmap
:meth:emap that uses :meth:concat to combine the nested events::
marbles = [ 'A B C D', '_ 1 2 3 4', '__ K L M N']
ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v]))
-> ['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']
chainmap
def chainmap(constr) -> Chainmap
:meth:emap that uses :meth:chain to combine the nested events::
marbles = [ 'A B C D ', '_ 1 2 3 4', '__ K L M N']
ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v]))
-> ['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']
switchmap
def switchmap(constr) -> Switchmap
:meth:emap that uses :meth:switch to combine the nested events::
marbles = [ 'A B C D ', '_ K L M N', '__ 1 2 3 4' ]
ev.Range(3).switchmap(lambda v: Event.marble(marbles[v]))
-> ['A', 'B', '1', '2', 'K', 'L', 'M', 'N']
reduce
def reduce(func, initializer = NO_VALUE) -> Reduce
Apply a two-argument reduction function to the previous reduction result and the current value and emit the new reduction result.
Args:
func (callable): Reduction function::
emit(args) -> emit(func(prev_args, args))
initializer (object): First argument of first reduction::
first_result = func(initializer, first_value)
If no initializer is given, then the first result is emitted on the second source emit.
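The running reduction described above can be sketched as a generator over a plain iterable. This follows the wording of the description (without an initializer, the first result appears on the second value) and has not been checked against the implementation; `running_reduce` and `_NO_VALUE` are hypothetical names.

```python
from operator import add

_NO_VALUE = object()  # hypothetical sentinel standing in for NO_VALUE


def running_reduce(values, func, initializer=_NO_VALUE):
    """Yield the running reduction result for every reducible value."""
    prev = initializer
    for value in values:
        if prev is _NO_VALUE:
            # No initializer: the first value only seeds the reduction.
            prev = value
            continue
        prev = func(prev, value)
        yield prev


print(list(running_reduce([1, 2, 3, 4], add)))                  # [3, 6, 10]
print(list(running_reduce([1, 2, 3, 4], add, initializer=0)))   # [1, 3, 6, 10]
```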
min
def min() -> Min
Minimum value.
max
def max() -> Max
Maximum value.
sum
def sum(start = 0) -> Sum
Total sum.
Args: start (float): Value added to total sum.
product
def product(start = 1) -> Product
Total product.
Args: start (float): Initial start value.
mean
def mean() -> Mean
Total average.
any
def any() -> Any
Test if predicate holds for at least one source value.
all
def all() -> All
Test if predicate holds for all source values.
ema
def ema(n: Union[int, None] = None, weight: Union[float, None] = None) -> Ema
Exponential moving average.
Args:
n: Number of periods.
weight: Weight of new value.
Give either n or weight.
The relation is weight = 2 / (n + 1).
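As a worked example of the relation between n and weight, the following sketch computes an exponential moving average over a plain list. Seeding the average with the first value is an assumption made here for illustration, and the `ema` function below is a hypothetical stand-alone helper rather than the operator itself.

```python
def ema(values, n=None, weight=None):
    """Exponential moving average; give either n or weight, not both."""
    if (n is None) == (weight is None):
        raise ValueError("give either n or weight")
    if weight is None:
        # Relation from the text: weight = 2 / (n + 1).
        weight = 2 / (n + 1)
    out = []
    prev = None
    for value in values:
        # Assumption: the first value seeds the average unchanged.
        if prev is None:
            prev = value
        else:
            prev = weight * value + (1 - weight) * prev
        out.append(prev)
    return out


print(ema([1, 2, 3], n=3))         # [1, 1.5, 2.25]
print(ema([1, 2, 3], weight=0.5))  # [1, 1.5, 2.25]
```

With n = 3 the weight is 2 / 4 = 0.5, so both calls give the same result.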
previous
def previous(count: int = 1) -> Previous
For every source value, emit the count-th previous value::
source: -ab---c--d-e-
output: --a---b--c-d-
Starts emitting on the count + 1-th source emit.
Args: count: Number of periods to go back.
pairwise
def pairwise() -> Pairwise
Emit (previous_source_value, current_source_value) tuples.
Starts emitting on the second source emit::
source: -a----b------c--------d-----
output: ------(a,b)--(b,c)----(c,d)-
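The previous and pairwise operators can be mimicked on plain iterables to make the lead-in behaviour concrete. This is only a sketch with hypothetical helper names; timing is ignored and only the order of values is modelled.

```python
from collections import deque


def previous(values, count=1):
    """Yield, for every value, the value seen `count` steps earlier."""
    window = deque(maxlen=count + 1)
    for value in values:
        window.append(value)
        # Nothing is yielded until count + 1 values have been seen.
        if len(window) == count + 1:
            yield window[0]


def pairwise(values):
    """Yield (previous, current) tuples, starting at the second value."""
    missing = object()
    prev = missing
    for value in values:
        if prev is not missing:
            yield (prev, value)
        prev = value


print(list(previous("abcde")))           # ['a', 'b', 'c', 'd']
print(list(previous("abcde", count=2)))  # ['a', 'b', 'c']
print(list(pairwise("abcd")))            # [('a', 'b'), ('b', 'c'), ('c', 'd')]
```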
changes
def changes() -> Changes
Emit only source values that have changed from the previous value.
unique
def unique(key = None) -> Unique
Emit only unique values, dropping values that have already been emitted.
Args:
key (callable): The callable key(value) is used to group values. The default of None groups values by equality.
The resulting group must be hashable.
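The difference between changes and unique is easiest to see side by side. The sketch below models both on plain iterables with hypothetical helper functions; it assumes that unique tracks groups in a set, which is why the group must be hashable.

```python
def changes(values):
    """Yield values that differ from the value directly before them."""
    missing = object()
    prev = missing
    for value in values:
        if prev is missing or value != prev:
            yield value
        prev = value


def unique(values, key=None):
    """Yield values whose group has not been seen before."""
    seen = set()
    for value in values:
        # Without a key, the value itself is the group.
        group = value if key is None else key(value)
        if group not in seen:
            seen.add(group)
            yield value


data = [1, 1, 2, 2, 1, 3]
print(list(changes(data)))                                  # [1, 2, 1, 3]
print(list(unique(data)))                                   # [1, 2, 3]
print(list(unique(["a", "A", "b", "B"], key=str.lower)))    # ['a', 'b']
```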
last
def last() -> Last
Wait until source has ended and re-emit its last value.
list
def list() -> ListOp
Collect all source values and emit as list when the source ends.
deque
def deque(count = 0) -> Deque
Emit a deque with the last count values from the source
(or less in the lead-in phase).
Args: count (int): Number of last periods to use, or 0 to use all.
array
def array(count = 0) -> Array
Emit a numpy array with the last count values from the source
(or less in the lead-in phase).
Args: count (int): Number of last periods to use, or 0 to use all.
chunk
def chunk(size: int) -> Chunk
Chunk values up in lists of equal size. The last chunk can be shorter.
Args: size: Chunk size.
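A minimal sketch of the chunking rule on a plain iterable, using a hypothetical `chunk` helper. The final, shorter chunk is flushed when the input ends, which corresponds to the source ending.

```python
def chunk(values, size):
    """Group values into lists of `size`; the last list may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    current = []
    for value in values:
        current.append(value)
        if len(current) == size:
            yield current
            current = []
    # Flush the remainder once the input is exhausted.
    if current:
        yield current


print(list(chunk(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```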
chunkwith
def chunkwith(timer: Event, emit_empty: bool = True) -> ChunkWith
Emit a chunked list of values when the timer emits.
Args:
timer: Event to use for timing the chunks.
emit_empty: Emit empty list if no values present since last emit.
chain
def chain(sources: Event = ()) -> Chain
Re-emit from a source until it ends, then move to the next source. Repeat until all sources have ended, ending the chain. Emits from pending sources are queued up::
source 1: -a----b---c|
source 2:       --2-----3--4|
source 3: ------------x---------y--|
output:   -a----b---c2--3--4x---y--|
Args: sources: Source events.
merge
def merge(sources = ()) -> Merge
Re-emit everything from the source events::
source 1: -a----b-------------c------d-|
source 2:    ------1-----2------3--4-|
source 3:     --------x----y--|
output:   -a----b--1--x--2-y--c-3--4-d-|
Args: sources (list): Source events.
concat
def concat(sources = ()) -> Concat
Re-emit everything from one source until it ends and then move to the next source::
source 1: -a----b-----|
source 2:   --1-----2-----3----4--|
source 3:                -----------x--y--|
output:   -a----b---------3----4----x--y--|
Args: sources (list): Source events.
switch
def switch(sources = ()) -> Switch
Re-emit everything from one source and move to another source as soon as that other source starts to emit::
source 1: -a----b---c-----d---|
source 2:       -----------x---y-|
source 3: ---------1----2----3-----|
output:   -a----b--1----2--x---y---|
Args: sources (list): Source events.
zip
def zip(sources = ()) -> Zip
Zip sources together: the i-th emit has the i-th value from each source as positional arguments. Only emits when each source has emitted its i-th value, and ends when any source ends::
source 1:    -a----b------------------c------d---e--f---|
source 2:    --------1-------2-------3---------4-----|
output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|
Args: sources (list): Source events.
ziplatest
def ziplatest(sources = (), partial: bool = True) -> Ziplatest
Emit zipped values with the latest value from each of the source events. Emits every time a source emits::
source 1:    -a-------------------b-------c---|
source 2:    ---------------1--------------------2------|
output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|
Args:
sources (list): Source events.
partial:
- True: Use NoValue for sources that have not emitted yet.
- False: Wait until all sources have emitted.
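The effect of the partial flag can be sketched by replaying an interleaved timeline of emits. This is an illustration only: the `ziplatest` function below is a hypothetical helper that takes (source index, value) pairs in arrival order, and `NO_VALUE` is a local sentinel standing in for NoValue.

```python
NO_VALUE = object()  # local sentinel standing in for NoValue


def ziplatest(emits, n_sources, partial=True):
    """Replay (source_index, value) pairs and yield tuples of latest values."""
    latest = [NO_VALUE] * n_sources
    seen = set()
    for index, value in emits:
        latest[index] = value
        seen.add(index)
        # With partial=False, wait until every source has emitted once.
        if partial or len(seen) == n_sources:
            yield tuple(latest)


# Arrival order taken from the diagram: a, 1, b, c, 2.
timeline = [(0, "a"), (1, 1), (0, "b"), (0, "c"), (1, 2)]

print(list(ziplatest(timeline, 2, partial=False)))
# [('a', 1), ('b', 1), ('c', 1), ('c', 2)]
```

With `partial=True` the same timeline yields one extra leading tuple, `('a', NO_VALUE)`, for the emit that arrives before the second source has produced anything.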
delay
def delay(delay) -> Delay
Time-shift all source events by a delay::
source: -abc-d-e---f---|
output: ---abc-d-e---f---|
This applies to the source errors and the source done event as well.
Args: delay (float): Time delay of all events (in seconds).
timeout
def timeout(timeout) -> Timeout
When the source doesn't emit for longer than the timeout period, do an empty emit and set this event as done.
Args: timeout (float): Timeout value.
throttle
def throttle(maximum, interval, cost_func = None) -> Throttle
Limit number of emits per time without dropping values. Values that come in too fast are queued and re-emitted as soon as allowed by the limits.
A nested status_event emits True when throttling starts
and False when throttling ends.
The limit can be dynamically changed with set_limit.
Args:
maximum (float): Maximum payload per interval.
interval (float): Time interval (in seconds).
cost_func (callable): The sum of cost_func(value) for every
source value inside the interval that is to remain
under the maximum. The default is to count every
source value as 1.
debounce
def debounce(delay, on_first: bool = False) -> Debounce
Filter out values from the source that happen in rapid succession.
Args:
delay (float): Maximal time difference (in seconds) between successive values before debouncing kicks in.
on_first:
- True: First value is sent immediately and following values in the rapid succession are dropped::
source: -abcd----efg-
output: -a-------e---
- False: Last value of a rapid succession is sent after the delay and the values before that are dropped::
source: -abcd----efg--
output: ----d------g-
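The two debounce modes can be sketched offline on a list of timestamped values. This is a simplified model under assumptions, not the operator's implementation: the `debounce` function is a hypothetical helper, a gap of at least `delay` is assumed to separate two bursts, and the real operator works on live emits with timers rather than on a finished list.

```python
def debounce(timed_values, delay, on_first=False):
    """Debounce a time-sorted list of (time, value) pairs.

    Returns a list of (emit_time, value) pairs.
    """
    out = []
    if on_first:
        last_time = None
        for t, value in timed_values:
            # Emit only when the gap since the previous source value
            # is at least `delay` (assumed boundary condition).
            if last_time is None or t - last_time >= delay:
                out.append((t, value))
            last_time = t
    else:
        for i, (t, value) in enumerate(timed_values):
            is_last = i + 1 == len(timed_values)
            # A value survives if nothing follows it within `delay`;
            # it is then emitted `delay` after it arrived.
            if is_last or timed_values[i + 1][0] - t >= delay:
                out.append((t + delay, value))
    return out


# Times follow the diagram: a burst a-d, a pause, then a burst e-g.
source = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (9, "e"), (10, "f"), (11, "g")]

print(debounce(source, 3, on_first=True))  # [(1, 'a'), (9, 'e')]
print(debounce(source, 3))                 # [(7, 'd'), (14, 'g')]
```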
copy
def copy() -> Copy
Create a shallow copy of the source values.
deepcopy
def deepcopy() -> Deepcopy
Create a deep copy of the source values.
sample
def sample(timer: Event) -> Sample
At the times that the timer emits, sample the value from this event and emit the sample.
Args: timer: Event used to time the samples.
errors
def errors() -> Errors
Emit errors from the source.
end_on_error
def end_on_error() -> EndOnError
End on any error from the source.