Event

ib_interface.eventkit.event

Event

Enable event passing between loosely coupled components. The event emits values to connected listeners and has a selection of operators to create general data flow pipelines.

Args: name: Name to use for this event.

name

def name() -> str

This event's name.

done

def done() -> bool

True if event has ended with no more emits coming, False otherwise.

set_done

def set_done()

Set this event to be ended. The event should not emit anything after that.

value

def value()

This event's last emitted value.

connect

def connect(listener, error = None, done = None, keep_ref: bool = False) -> Event

Connect a listener to this event. If the listener is added multiple times then it is invoked just as many times on emit.

The += operator can be used as a synonym for this method::

import eventkit as ev

def f(a, b): print(a * b)

def g(a, b): print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)

Args:
listener (callable): The callback to invoke on emit of this event. It gets the *args from an emit as arguments. If the listener is a coroutine function, or a function that returns an awaitable, the awaitable is run in the asyncio event loop.
error (callable): The callback to invoke on error of this event. It gets (this event, exception) as two arguments.
done (callable): The callback to invoke on ending of this event. It gets this event as single argument.
keep_ref:

  • True: A strong reference to the callable is kept
  • False: If the callable allows weak refs and it is garbage collected, then it is automatically disconnected from this event.

disconnect

def disconnect(listener, error = None, done = None)

Disconnect a listener from this event.

The -= operator can be used as a synonym for this method.

Args:
listener (callable): The callback to disconnect. The callback is removed at most once. It is not an error if the callback is not connected.
error (callable): The error callback to disconnect.
done (callable): The done callback to disconnect.
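The connect and disconnect rules above (a listener connected twice is invoked twice, and a disconnect removes at most one connection) can be sketched with a small stand-in class. `MiniEvent` is a hypothetical name used only for illustration; it is not the library's implementation and leaves out error and done callbacks, weak references and coroutine listeners.

```python
class MiniEvent:
    """Hypothetical stand-in for Event: only connect, disconnect and emit."""

    def __init__(self, name=""):
        self.name = name
        self._listeners = []

    def connect(self, listener):
        self._listeners.append(listener)  # duplicates are allowed
        return self

    __iadd__ = connect  # lets `event += f` act as connect(f)

    def disconnect(self, listener):
        # Remove at most one connection; silently ignore unknown listeners.
        if listener in self._listeners:
            self._listeners.remove(listener)
        return self

    __isub__ = disconnect  # lets `event -= f` act as disconnect(f)

    def emit(self, *args):
        for listener in list(self._listeners):
            listener(*args)


results = []


def record_product(a, b):
    results.append(a * b)


def record_quotient(a, b):
    results.append(a / b)


event = MiniEvent("demo")
event += record_product
event += record_quotient
event += record_quotient   # connected twice, so invoked twice per emit
event.emit(10, 5)          # appends 50, 2.0, 2.0
event -= record_quotient   # removes only one of the two connections
event.emit(10, 5)          # appends 50, 2.0
```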

disconnect_obj

def disconnect_obj(obj)

Disconnect all listeners on the given object (including the error and done listeners).

Args: obj (object): The target object that is to be completely removed from this event.

emit

def emit(args = ())

Emit a new value to all connected listeners.

Args: args (tuple): Argument values to emit to listeners.

emit_threadsafe

def emit_threadsafe(args = ())

Thread-safe version of emit that doesn't invoke the listeners directly but via the event loop of the main thread.
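A common asyncio way to hand a call from a worker thread over to the event loop is `loop.call_soon_threadsafe`. The sketch below shows only that general pattern with the standard library; whether the library uses exactly this call internally is an assumption, and `listener` and `worker` are illustrative names.

```python
import asyncio
import threading

received = []


async def main():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    finished = asyncio.Event()

    def listener(value):
        # Record the value and whether we are running in the loop's thread.
        received.append((value, threading.get_ident() == loop_thread))
        finished.set()

    def worker():
        # Do not call the listener here; schedule it on the event loop instead.
        loop.call_soon_threadsafe(listener, 42)

    thread = threading.Thread(target=worker)
    thread.start()
    await finished.wait()
    thread.join()


asyncio.run(main())
```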

clear

def clear()

Disconnect all listeners.

run

def run() -> List

Start the asyncio event loop, run this event to completion and return all values as a list::

import eventkit as ev

ev.Timer(0.25, count=10).run() -> [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]

Note: When running inside a Jupyter notebook this will give an error that the asyncio event loop is already running. This can be remedied by applying nest_asyncio (https://github.com/erdewit/nest_asyncio) or by using the top-level await statement of Jupyter::

await event.list()

pipe

def pipe(targets: Event = ())

Form several events into a pipe::

import eventkit as ev

e1 = ev.Sequence('abcde')
e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c)))
e3 = ev.Star().pluck(1).map(chr)

e1.pipe(e2, e3)  # or: ev.Event.Pipe(e1, e2, e3)
-> ['a', 'c', 'e', 'g', 'i']

Args: targets: One or more Events that have no source yet, or Event constructors that need no arguments.
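To see why the pipe example yields those letters, the same value transformations can be traced in plain Python. This is only a walk-through of the data flow, not of the event machinery, and it does not use the library.

```python
values = []
for i, c in enumerate("abcde"):   # Enumerate: emit(c) -> emit(i, c)
    pair = (i, i + ord(c))        # map: emit(i, c) -> emit((i, i + ord(c)))
    _, code = pair                # Star unpacks the tuple, pluck(1) keeps position 1
    values.append(chr(code))      # map(chr)
```

Each character is shifted by its own index, so 'b' (index 1) becomes 'c', 'c' (index 2) becomes 'e', and so on.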

fork

def fork(targets: Event = ()) -> Fork

Fork this event into one or more target events. Square brackets can be used as a synonym::

import eventkit as ev

ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip() -> [(2, 2, 2), (2, 3, 5), (2, 4, 9)]

The events in the fork can be combined by one of the join methods of Fork.

Args: targets: One or more events that have no source yet, or Event constructors that need no arguments.
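The fork example combines a running minimum, maximum and sum of the source values. A plain-Python sketch of the same arithmetic, using `itertools.accumulate` as a stand-in for the three operators (it illustrates the values only, not how the library forks events):

```python
from itertools import accumulate

source = range(2, 5)  # 2, 3, 4

result = list(zip(
    accumulate(source, min),  # running minimum
    accumulate(source, max),  # running maximum
    accumulate(source),       # running sum
))
```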

set_source

def set_source(source)

aiter

def aiter(skip_to_last: bool = False, tuples: bool = False)

Create an asynchronous iterator that yields the emitted values from this event::

async def coro():
    async for args in event.aiter():
        ...

__aiter__ is a synonym for aiter with default arguments.

Args:
skip_to_last:

  • True: Backlogged source values are skipped over to yield only the latest value. Can be used as a slipper clutch between a source that produces too fast and the handling that can't keep up.
  • False: All events are yielded.

tuples:

  • True: Always yield arguments as a tuple.
  • False: Unpack single argument tuples.
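The effect of skip_to_last can be sketched with an `asyncio.Queue` that already holds a backlog when the consumer gets to it. The function `consume` is hypothetical and only models the skipping behaviour described above; it is not how the library implements aiter.

```python
import asyncio


async def consume(backlog, skip_to_last):
    queue = asyncio.Queue()
    for value in backlog:          # the source has already emitted all of these
        queue.put_nowait(value)
    seen = []
    while not queue.empty():
        value = await queue.get()
        if skip_to_last:
            # Drop the backlog and keep only the most recent value.
            while not queue.empty():
                value = queue.get_nowait()
        seen.append(value)
    return seen


all_values = asyncio.run(consume([1, 2, 3], skip_to_last=False))
latest_only = asyncio.run(consume([1, 2, 3], skip_to_last=True))
```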

init

def init(obj, event_names: Iterable)

Convenience function for initializing multiple events as members of the given object.

Args: event_names: Names to use for the created events.

create

def create(obj)

Create an event from an async iterator, awaitable, or event constructor without arguments.

Args: obj (object): The source object. If it's already an event then it is passed as-is.

wait

def wait(future: Awaitable) -> Wait

Create a new event that emits the value of the awaitable when it becomes available; the event is then set done.

wait and __await__ are each other's inverse.

Args: future: Future to wait on.

aiterate

def aiterate(ait: AsyncIterable) -> Aiterate

Create a new event that emits the yielded values from the asynchronous iterator.

The asynchronous iterator serves as a source for both the time and value of emits.

aiterate and __aiter__ are each other's inverse.

Args: ait: The asynchronous source iterator. It must await at least once; if necessary use::

await asyncio.sleep(0)

sequence

def sequence(values: Iterable, interval: float = 0, times: Union[Iterable[float], None] = None) -> Sequence

Create a new event that emits the given values. Supply at most one of interval or times.

Args:
values: The source values.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since start of event. The sequence should match values.

repeat

def repeat(value = NO_VALUE, count = 1, interval: float = 0, times: Union[Iterable[float], None] = None) -> Repeat

Create a new event that repeats value count times.

Args:
value (object): The value to emit.
count (int): Number of times to emit.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since start of event. The sequence should match values.

range

def range(args = (), interval: float = 0, times: Union[Iterable[float], None] = None) -> Range

Create a new event that emits the values from a range.

Args:
args (tuple): Same as for built-in range.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since start of event. The sequence should match the range.

timerange

def timerange(start = 0, end = None, step = 1) -> Timerange

Create a new event that emits the datetime value, at that datetime, from a range of datetimes.

Args:
start (float): Start time, can be specified as:

  • datetime.datetime.
  • datetime.time: Today is used as date.
  • int or float: Number of seconds relative to now. Values will be quantized to the given step.

end (float): End time, can be specified as:

  • datetime.datetime.
  • datetime.time: Today is used as date.
  • None: No end limit.

step (float): Number of seconds, or datetime.timedelta, to space between values.

timer

def timer(interval: float, count: Union[int, None] = None) -> Timer

Create a new timer event that emits at regularly paced intervals the number of seconds since starting it.

Args: interval: Time interval in seconds between emits. count: Number of times to emit, or None for no limit.

marble

def marble(s: str, interval: float = 0, times: Union[Iterable[float], None] = None) -> Marble

Create a new event that emits the values from a Rx-type marble string.

Args:
s: The string with characters that are emitted.
interval: Time interval in seconds between values.
times: Relative times for individual values, in seconds since start of event. The sequence should match the marble string.

filter

def filter(predicate = bool) -> Filter

For every source value, apply predicate and re-emit when True.

Args: predicate (callable): The function to test every source value with. The default is to test the general truthiness with bool().

skip

def skip(count: int = 1) -> Skip

Drop the first count values from source and follow the source after that.

Args: count: Number of source values to drop.

take

def take(count: int = 1) -> Take

Re-emit first count values from the source and then end.

Args: count: Number of source values to re-emit.

takewhile

def takewhile(predicate = bool) -> TakeWhile

Re-emit values from the source until the predicate becomes False and then end.

Args: predicate (callable): The function to test every source value with. The default is to test the general truthiness with bool().

dropwhile

def dropwhile(predicate = lambda x: not x) -> DropWhile

Drop source values until the predicate becomes False and after that re-emit everything from the source.

Args: predicate (callable): The function to test every source value with. The default is to test the inverted general truthiness.
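The skip, take, takewhile and dropwhile operators behave like their `itertools` counterparts applied to the stream of source values. A sketch with plain lists, using `itertools` as an analogy rather than the library itself:

```python
from itertools import dropwhile, islice, takewhile

source = [0, 0, 3, 0, 5]

skipped = list(islice(source, 2, None))                   # like skip(2)
taken = list(islice(source, 2))                           # like take(2)
taken_while = list(takewhile(bool, [1, 2, 0, 4]))         # like takewhile()
dropped_while = list(dropwhile(lambda x: not x, source))  # like dropwhile()
```

With the default predicates, takewhile stops at the first falsy value and dropwhile starts passing values at the first truthy one.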

takeuntil

def takeuntil(notifier: Event) -> TakeUntil

Re-emit values from the source until the notifier emits and then end. If the notifier ends without any emit then keep passing source values.

Args: notifier: Event that signals to end this event.

constant

def constant(constant) -> Constant

On emit of the source emit a constant value::

emit(value) -> emit(constant)

Args: constant (object): The constant value to emit.

iterate

def iterate(it) -> Iterate

On emit of the source, emit the next value from an iterator::

emit(a, b, ...) -> emit(next(it))

The time of events follows the source and the values follow the iterator.

Args: it (iterable): The source iterator to use for generating values. When the iterator is exhausted the event is set to be done.

count

def count(start = 0, step = 1) -> Count

Count and emit the number of source emits::

emit(a, b, ...) -> emit(count)

Args: start (int): Start count. step (int): Increase count by this amount for every new source value.

enumerate

def enumerate(start = 0, step = 1) -> Enumerate

Add a count to every source value::

emit(a, b, ...) -> emit(count, a, b, ...)

Args: start (float): Start count. step (float): Increase by this amount for every new source value.

timestamp

def timestamp() -> Timestamp

Add a timestamp (from time.time()) to every source value::

emit(a, b, ...) -> emit(timestamp, a, b, ...)

The timestamp is a float: the number of seconds since the epoch (midnight, Jan 1, 1970).

partial

def partial(left_args = ()) -> Partial

Pad source values with extra arguments on the left::

emit(a, b, ...) -> emit(*left_args, a, b, ...)

Args: left_args (tuple): Arguments to inject.

partial_right

def partial_right(right_args = ()) -> PartialRight

Pad source values with extra arguments on the right::

emit(a, b, ...) -> emit(a, b, ..., *right_args)

Args: right_args (tuple): Arguments to inject.

star

def star() -> Star

Unpack a source tuple into positional arguments, similar to the star operator::

emit((a, b, ...)) -> emit(a, b, ...)

star and pack are each other's inverse.

pack

def pack() -> Pack

Pack positional arguments into a tuple::

emit(a, b, ...) -> emit((a, b, ...))

star and pack are each other's inverse.
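The argument-reshaping operators (partial, partial_right, star, pack) can be read as simple tuple manipulations on the emitted arguments. The helper functions below are hypothetical and model only the `emit(...) -> emit(...)` mappings shown above, with the emitted arguments represented as a tuple.

```python
def partial(left_args, args):
    # emit(a, b, ...) -> emit(*left_args, a, b, ...)
    return (*left_args, *args)


def partial_right(right_args, args):
    # emit(a, b, ...) -> emit(a, b, ..., *right_args)
    return (*args, *right_args)


def star(args):
    # emit((a, b, ...)) -> emit(a, b, ...)
    (only,) = args
    return tuple(only)


def pack(args):
    # emit(a, b, ...) -> emit((a, b, ...))
    return (args,)


emitted = ("a", "b")
padded_left = partial(("x",), emitted)
padded_right = partial_right(("y", "z"), emitted)
round_trip = star(pack(emitted))
```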

pluck

def pluck(selections: Union[int, str] = ()) -> Pluck

Extract arguments or nested properties from the source values.

Select which argument positions to keep::

emit(a, b, c, d).pluck(1, 2) -> emit(b, c)

Re-order arguments::

emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)

To do an empty emit leave selections empty::

emit(a, b).pluck() -> emit()

Select nested properties from positional arguments::

emit(person, account).pluck('1.number', '0.address.street') -> emit(account.number, person.address.street)

If no value can be extracted then NO_VALUE is emitted in its place.

Args: selections: The values to extract.
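The selection rules above can be sketched as a function over the tuple of emitted arguments. Both `pluck` and the `NO_VALUE` object here are hypothetical stand-ins written for illustration; the library's own parsing of selections may differ in detail.

```python
from types import SimpleNamespace

NO_VALUE = object()  # stand-in for the library's NO_VALUE sentinel


def pluck(args, *selections):
    out = []
    for sel in selections:
        if isinstance(sel, int):
            # Positional selection.
            out.append(args[sel] if sel < len(args) else NO_VALUE)
        else:
            # 'position.attr.attr' selects a nested property of one argument.
            index, *attrs = sel.split(".")
            value = args[int(index)]
            for attr in attrs:
                value = getattr(value, attr, NO_VALUE)
            out.append(value)
    return tuple(out)


person = SimpleNamespace(address=SimpleNamespace(street="Main St"))
account = SimpleNamespace(number=7)

nested = pluck((person, account), "1.number", "0.address.street")
reordered = pluck(("a", "b", "c"), 2, 1, 0)
missing = pluck((person, account), "1.iban")
```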

map

def map(func, timeout = None, ordered = True, task_limit = None) -> Map

Apply a sync or async function to source values using positional arguments::

emit(a, b, ...) -> emit(func(a, b, ...))

or if func returns an awaitable then it will be awaited::

emit(a, b, ...) -> emit(await func(a, b, ...))

In case of timeout or other failure, NO_VALUE is emitted.

Args:
func (callable): The function or coroutine constructor to apply.
timeout (float): Timeout in seconds since coroutine is started.
ordered (bool):

  • True: The order of emitted results preserves the order of the source values.
  • False: Results are in order of completion.

task_limit (int): Max number of concurrent tasks, or None for no limit.

timeout, ordered and task_limit apply to async functions only.
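The difference between ordered and unordered results for async functions can be sketched with plain asyncio: `asyncio.gather` keeps source order, while `asyncio.as_completed` yields results in order of completion. These are standard-library stand-ins chosen for illustration, not the mechanism the library necessarily uses, and `slow_double` is a made-up coroutine.

```python
import asyncio


async def slow_double(x):
    # Larger inputs finish sooner, so completion order is reversed.
    await asyncio.sleep(0.05 * (3 - x))
    return 2 * x


async def main():
    inputs = [0, 1, 2]

    # Like ordered=True: results follow the order of the source values.
    ordered = await asyncio.gather(*(slow_double(x) for x in inputs))

    # Like ordered=False: results follow the order of completion.
    tasks = [asyncio.create_task(slow_double(x)) for x in inputs]
    unordered = [await fut for fut in asyncio.as_completed(tasks)]
    return list(ordered), unordered


ordered, unordered = asyncio.run(main())
```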

emap

def emap(constr, joiner: AddableJoinOp) -> Emap

Higher-order event map that creates a new Event instance for every source value::

emit(a, b, ...) -> new Event constr(a, b, ...)

Args:
constr (type): Constructor function for creating a new event. Apart from returning an Event, the constructor may also return an awaitable or an asynchronous iterator, in which case an Event will be created.
joiner: Join operator to combine the emits of nested events.

mergemap

def mergemap(constr) -> Mergemap

emap that uses merge to combine the nested events::

marbles = [ 'A B C D', '_1 2 3 4', '__K L M N']

ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v])) -> ['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']

concatmap

def concatmap(constr) -> Concatmap

emap that uses concat to combine the nested events::

marbles = [ 'A B C D', '_ 1 2 3 4', '__ K L M N']

ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v])) -> ['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']

chainmap

def chainmap(constr) -> Chainmap

emap that uses chain to combine the nested events::

marbles = [ 'A B C D ', '_ 1 2 3 4', '__ K L M N']

ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v])) -> ['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']

switchmap

def switchmap(constr) -> Switchmap

emap that uses switch to combine the nested events::

marbles = [ 'A B C D ', '_ K L M N', '__ 1 2 3 4' ]

ev.Range(3).switchmap(lambda v: Event.marble(marbles[v])) -> ['A', 'B', '1', '2', 'K', 'L', 'M', 'N']

reduce

def reduce(func, initializer = NO_VALUE) -> Reduce

Apply a two-argument reduction function to the previous reduction result and the current value and emit the new reduction result.

Args: func (callable): Reduction function::

emit(args) -> emit(func(prev_args, args))

initializer (object): First argument of first reduction::

first_result = func(initializer, first_value)

If no initializer is given, then the first result is emitted on the second source emit.
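The initializer rule can be made concrete with a small model of the emits. `reduce_emits` and its `NO_VALUE` object are hypothetical helpers for illustration; they list what would be emitted for a given sequence of source values.

```python
from operator import add

NO_VALUE = object()  # stand-in for the library's NO_VALUE sentinel


def reduce_emits(func, values, initializer=NO_VALUE):
    prev = initializer
    emitted = []
    for value in values:
        if prev is NO_VALUE:
            # No initializer: the first value only seeds the reduction.
            prev = value
        else:
            prev = func(prev, value)
            emitted.append(prev)
    return emitted


without_init = reduce_emits(add, [1, 2, 3])
with_init = reduce_emits(add, [1, 2, 3], initializer=0)
```

Without an initializer there is one emit fewer than there are source values, because the first result needs two source values.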

min

def min() -> Min

Minimum value.

max

def max() -> Max

Maximum value.

sum

def sum(start = 0) -> Sum

Total sum.

Args: start (float): Value added to total sum.

product

def product(start = 1) -> Product

Total product.

Args: start (float): Initial start value.

mean

def mean() -> Mean

Total average.

any

def any() -> Any

Test if predicate holds for at least one source value.

all

def all() -> All

Test if predicate holds for all source values.

ema

def ema(n: Union[int, None] = None, weight: Union[float, None] = None) -> Ema

Exponential moving average.

Args: n: Number of periods. weight: Weight of new value.

Give either n or weight. The relation is weight = 2 / (n + 1).
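A sketch of the exponential moving average with the stated relation weight = 2 / (n + 1). The recurrence `weight * value + (1 - weight) * previous` is the standard EMA form; seeding the average with the first source value is an assumption made here, and the function name `ema` is used only for illustration.

```python
def ema(values, n=None, weight=None):
    if (n is None) == (weight is None):
        raise ValueError("give either n or weight")
    w = weight if weight is not None else 2 / (n + 1)
    out = []
    prev = None
    for value in values:
        # Assumption: the first value seeds the average unchanged.
        prev = value if prev is None else w * value + (1 - w) * prev
        out.append(prev)
    return out


by_periods = ema([1, 2, 3], n=3)        # weight = 2 / (3 + 1) = 0.5
by_weight = ema([1, 2, 3], weight=0.5)
```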

previous

def previous(count: int = 1) -> Previous

For every source value, emit the count-th previous value::

source: -ab---c--d-e-
output: --a---b--c-d-

Starts emitting on the count + 1-th source emit.

Args: count: Number of periods to go back.

pairwise

def pairwise() -> Pairwise

Emit (previous_source_value, current_source_value) tuples. Starts emitting on the second source emit::

source: -a----b------c--------d-----
output: ------(a,b)--(b,c)----(c,d)-
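The previous and pairwise diagrams can be reproduced on a plain sequence. The two functions below are hypothetical models of which values get emitted; timing is ignored.

```python
from collections import deque


def previous(values, count=1):
    window = deque(maxlen=count + 1)
    out = []
    for value in values:
        window.append(value)
        if len(window) == count + 1:   # starts on the (count + 1)-th value
            out.append(window[0])
    return out


def pairwise(values):
    return list(zip(values, values[1:]))


one_back = previous("abcde")
two_back = previous("abcde", count=2)
pairs = pairwise("abcd")
```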

changes

def changes() -> Changes

Emit only source values that have changed from the previous value.

unique

def unique(key = None) -> Unique

Emit only unique values, dropping values that have already been emitted.

Args: key (callable): The callable key(value) is used to group values. The default of None groups values by equality. The resulting group must be hashable.
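The difference between changes and unique shows up when a value returns after something else was emitted: changes lets it through again, unique does not. A sketch on plain lists, with hypothetical helper functions that model the emitted values only:

```python
def changes(values):
    out = []
    prev = object()  # sentinel that compares unequal to every value
    for value in values:
        if value != prev:
            out.append(value)
        prev = value
    return out


def unique(values, key=None):
    seen = set()
    out = []
    for value in values:
        group = value if key is None else key(value)
        if group not in seen:   # the group must be hashable
            seen.add(group)
            out.append(value)
    return out


changed = changes([1, 1, 2, 2, 1])
distinct = unique([1, 1, 2, 2, 1])
case_insensitive = unique(["a", "A", "b"], key=str.lower)
```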

last

def last() -> Last

Wait until source has ended and re-emit its last value.

list

def list() -> ListOp

Collect all source values and emit as list when the source ends.

deque

def deque(count = 0) -> Deque

Emit a deque with the last count values from the source (or less in the lead-in phase).

Args: count (int): Number of last periods to use, or 0 to use all.

array

def array(count = 0) -> Array

Emit a numpy array with the last count values from the source (or less in the lead-in phase).

Args: count (int): Number of last periods to use, or 0 to use all.

chunk

def chunk(size: int) -> Chunk

Chunk values up in lists of equal size. The last chunk can be shorter.

Args: size: Chunk size.
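Chunking on a finished list of values, as a sketch of what the chunks look like. The function is a hypothetical model; the real operator emits each chunk as soon as it is full.

```python
def chunk(values, size):
    # Slice into consecutive lists of `size`; the last one may be shorter.
    return [values[i:i + size] for i in range(0, len(values), size)]


chunks = chunk([1, 2, 3, 4, 5, 6, 7], 3)
```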

chunkwith

def chunkwith(timer: Event, emit_empty: bool = True) -> ChunkWith

Emit a chunked list of values when the timer emits.

Args: timer: Event to use for timing the chunks. emit_empty: Emit empty list if no values present since last emit.

chain

def chain(sources: Event = ()) -> Chain

Re-emit from a source until it ends, then move to the next source. Repeat until all sources have ended, ending the chain. Emits from pending sources are queued up::

source 1: -a----b---c|
source 2: --2-----3--4|
source 3: ------------x---------y--|
output:   -a----b---c2--3--4x---y--|

Args: sources: Source events.

merge

def merge(sources = ()) -> Merge

Re-emit everything from the source events::

source 1: -a----b-------------c------d-|
source 2: ------1-----2------3--4-|
source 3: --------x----y--|
output:   -a----b--1--x--2-y--c-3--4-d-|

Args: sources (list): Source events.

concat

def concat(sources = ()) -> Concat

Re-emit everything from one source until it ends and then move to the next source::

source 1: -a----b-----|
source 2: --1-----2-----3----4--|
source 3: -----------x--y--|
output:   -a----b---------3----4----x--y--|

Args: sources (list): Source events.

switch

def switch(sources = ()) -> Switch

Re-emit everything from one source and move to another source as soon as that other source starts to emit::

source 1: -a----b---c-----d---|
source 2: -----------x---y-|
source 3: ---------1----2----3-----|
output:   -a----b--1----2--x---y---|

Args: sources (list): Source events.

zip

def zip(sources = ()) -> Zip

Zip sources together: The i-th emit has the i-th value from each source as positional arguments. Only emits when each source has emitted its i-th value and ends when any source ends::

source 1:    -a----b------------------c------d---e--f---|
source 2:    --------1-------2-------3---------4-----|
output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|

Args: sources (list): Source events.

ziplatest

def ziplatest(sources = (), partial: bool = True) -> Ziplatest

Emit zipped values with the latest value from each of the source events. Emits every time when a source emits::

source 1:    -a-------------------b-------c---|
source 2:    ---------------1--------------------2------|
output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|

Args:
sources (list): Source events.
partial:

  • True: Use NoValue for sources that have not emitted yet.
  • False: Wait until all sources have emitted.
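The effect of the partial flag can be sketched by replaying the diagram as a time-ordered list of (source index, value) pairs. `ziplatest` and `NO_VALUE` below are hypothetical stand-ins that model the emitted tuples only.

```python
NO_VALUE = object()  # stand-in for the library's NoValue


def ziplatest(timeline, n_sources, partial=True):
    latest = [NO_VALUE] * n_sources
    out = []
    for index, value in timeline:
        latest[index] = value
        # With partial=False, hold back until every source has emitted.
        if partial or all(v is not NO_VALUE for v in latest):
            out.append(tuple(latest))
    return out


# Source 0 emits a, b, c; source 1 emits 1, 2 (order as in the diagram).
timeline = [(0, "a"), (1, 1), (0, "b"), (0, "c"), (1, 2)]

with_partial = ziplatest(timeline, 2, partial=True)
without_partial = ziplatest(timeline, 2, partial=False)
```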

delay

def delay(delay) -> Delay

Time-shift all source events by a delay::

source: -abc-d-e---f---|
output: ---abc-d-e---f---|

This applies to the source errors and the source done event as well.

Args: delay (float): Time delay of all events (in seconds).

timeout

def timeout(timeout) -> Timeout

When the source doesn't emit for longer than the timeout period, do an empty emit and set this event as done.

Args: timeout (float): Timeout value.

throttle

def throttle(maximum, interval, cost_func = None) -> Throttle

Limit number of emits per time without dropping values. Values that come in too fast are queued and re-emitted as soon as allowed by the limits.

A nested status_event emits True when throttling starts and False when throttling ends.

The limit can be dynamically changed with set_limit.

Args:
maximum (float): Maximum payload per interval.
interval (float): Time interval (in seconds).
cost_func (callable): The sum of cost_func(value) for every source value inside the interval that is to remain under the maximum. The default is to count every source value as 1.
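A sketch of the queueing behaviour: given the arrival times of source values, compute when each one could be re-emitted so that no more than `maximum` emits fall inside any window of length `interval`. This simplified model assumes every value costs 1, an integer maximum, and that a slot frees up exactly `interval` seconds after an earlier emit; `throttle_times` is a hypothetical helper, not the library's algorithm.

```python
def throttle_times(arrivals, maximum, interval):
    emits = []
    for arrival in arrivals:
        # Never emit before arrival, and never overtake an earlier value.
        emit_at = max(arrival, emits[-1]) if emits else arrival
        if len(emits) >= maximum:
            # Wait until the emit `maximum` places back has left the window.
            emit_at = max(emit_at, emits[-maximum] + interval)
        emits.append(emit_at)
    return emits


burst = throttle_times([0, 0, 0, 0, 0], maximum=2, interval=1.0)
sparse = throttle_times([0, 5], maximum=2, interval=1.0)
```

No value is dropped: a burst of five values at a limit of two per second is spread over three seconds, while sparse values pass through undelayed.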

debounce

def debounce(delay, on_first: bool = False) -> Debounce

Filter out values from the source that happen in rapid succession.

Args:
delay (float): Maximal time difference (in seconds) between successive values before debouncing kicks in.
on_first:

  • True: First value is sent immediately and following values in the rapid succession are dropped::

    source: -abcd----efg-
    output: -a-------e---

  • False: Last value of a rapid succession is sent after the delay and the values before that are dropped::

    source: -abcd----efg--
    output: ----d------g-
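Both debounce modes can be modelled on a list of timestamped values. The function below is a hypothetical sketch that treats a gap of at least `delay` seconds as the boundary between bursts; it reproduces the two diagrams above but is not the library's implementation.

```python
def debounce(events, delay, on_first=False):
    """events: list of (time, value) pairs sorted by time."""
    out = []
    for i, (t, value) in enumerate(events):
        if on_first:
            # Emit the first value of each burst immediately.
            if i == 0 or t - events[i - 1][0] >= delay:
                out.append((t, value))
        else:
            # Emit the last value of each burst, `delay` seconds later.
            is_last = i == len(events) - 1 or events[i + 1][0] - t >= delay
            if is_last:
                out.append((t + delay, value))
    return out


# Two bursts: a-d at t = 1..4 and e-g at t = 9..11.
events = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (9, "e"), (10, "f"), (11, "g")]

leading = debounce(events, delay=3, on_first=True)
trailing = debounce(events, delay=3, on_first=False)
```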

copy

def copy() -> Copy

Create a shallow copy of the source values.

deepcopy

def deepcopy() -> Deepcopy

Create a deep copy of the source values.

sample

def sample(timer: Event) -> Sample

At the times that the timer emits, sample the value from this event and emit the sample.

Args: timer: Event used to time the samples.

errors

def errors() -> Errors

Emit errors from the source.

end_on_error

def end_on_error() -> EndOnError

End on any error from the source.