Skip to content

Commit

Permalink
amb typing to match amb_ (add Future)
Browse files Browse the repository at this point in the history
Curry flip amb
Typing on fromfuture tests
Doc: Remove 3rd observable in amb since it takes only 2
Doc: show that completion follows winner
  • Loading branch information
mat committed Feb 7, 2023
1 parent fe265ef commit 4a487e3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 88 deletions.
3 changes: 1 addition & 2 deletions reactivex/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_
.. marble::
:alt: amb
---8--6--9-----------|
---8--6--9---------|
--1--2--3---5--------|
----------10-20-30---|
[ amb() ]
--1--2--3---5--------|
Expand Down
147 changes: 73 additions & 74 deletions reactivex/operators/_amb.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,91 @@
from asyncio import Future
from typing import Callable, List, Optional, TypeVar, Union
from typing import List, Optional, TypeVar, Union

from reactivex import Observable, abc, from_future
from reactivex.curry import curry_flip
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable

_T = TypeVar("_T")


@curry_flip(1)
def amb_(
right_source: Union[Observable[_T], "Future[_T]"]
) -> Callable[[Observable[_T]], Observable[_T]]:
left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"]
) -> Observable[_T]:

if isinstance(right_source, Future):
obs: Observable[_T] = from_future(right_source)
else:
obs = right_source

def amb(left_source: Observable[_T]) -> Observable[_T]:
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)

return amb
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)


__all__ = ["amb_"]
25 changes: 13 additions & 12 deletions tests/test_observable/test_fromfuture.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import unittest
from asyncio import Future
from typing import Any

import reactivex

Expand All @@ -11,15 +12,15 @@ def test_future_success(self):
success = [False, True, False]

async def go():
future = Future()
future: Future[int] = Future()
future.set_result(42)

source = reactivex.from_future(future)

def on_next(x):
def on_next(x: int):
success[0] = x == 42

def on_error(err):
def on_error(_err: Exception):
success[1] = False

def on_completed():
Expand All @@ -37,15 +38,15 @@ def test_future_failure(self):
async def go():
error = Exception("woops")

future = Future()
future: Future[Any] = Future()
future.set_exception(error)

source = reactivex.from_future(future)

def on_next(x):
def on_next(x: Any):
success[0] = False

def on_error(err):
def on_error(err: Exception):
success[1] = str(err) == str(error)

def on_completed():
Expand All @@ -61,13 +62,13 @@ def test_future_cancel(self):
success = [True, False, True]

async def go():
future = Future()
future: Future[Any] = Future()
source = reactivex.from_future(future)

def on_next(x):
def on_next(x: Any):
success[0] = False

def on_error(err):
def on_error(err: Any):
success[1] = type(err) == asyncio.CancelledError

def on_completed():
Expand All @@ -84,15 +85,15 @@ def test_future_dispose(self):
success = [True, True, True]

async def go():
future = Future()
future: Future[int] = Future()
future.set_result(42)

source = reactivex.from_future(future)

def on_next(x):
def on_next(x: int):
success[0] = False

def on_error(err):
def on_error(err: Exception):
success[1] = False

def on_completed():
Expand Down

0 comments on commit 4a487e3

Please sign in to comment.