Skip to content

Commit

Permalink
fix/maxsize (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgarciae authored Oct 11, 2020
1 parent 5909376 commit a2451ff
Show file tree
Hide file tree
Showing 38 changed files with 171 additions and 63 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## [0.4.6] - 2020-10-11
* Introduces the `maxsize` as an argument to `to_stage` and `to_iterable`.
* `ordered` now takes an optinal `maxsize` parameter.

## [0.4.5] - 2020-10-04
* Fixed `pl.task.from_iterable` to solve #56
* `pl.*.ordered` implementations now based on `bisect.insort`.
Expand Down
15 changes: 15 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ data = (
)
```

## Run Tests
A sample script is provided to run the tests in a container (either Docker or Podman is supported), to run tests:

```bash
$ bash scripts/run-tests.sh
```

This script can also receive a python version to check test against, i.e

```bash
$ bash scripts/run-tests.sh 3.7
```


## Related Stuff
* [Making an Unlimited Number of Requests with Python aiohttp + pypeln](https://medium.com/@cgarciae/making-an-infinite-number-of-requests-with-python-aiohttp-pypeln-3a552b97dc95)
* [Process Pools](https://docs.python.org/3.4/library/multiprocessing.html?highlight=process#module-multiprocessing.pool)
Expand All @@ -177,6 +191,7 @@ data = (

## Contributors
* [cgarciae](https://github.com/cgarciae)
* [Davidnet](https://github.com/Davidnet)

## License
MIT
51 changes: 49 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pypeln/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
from .utils import BaseStage


__version__ = "0.4.5"
__version__ = "0.4.6"
2 changes: 1 addition & 1 deletion pypeln/process/api/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def concat(
A stage object.
"""

dependencies = [to_stage(stage) for stage in stages]
dependencies = [to_stage(stage, maxsize=maxsize) for stage in stages]

return Stage(
process_fn=Concat(),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/process/api/each.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def process_image(image_path):
)
)

stage = to_stage(stage)
stage = to_stage(stage, maxsize=maxsize)

stage = Stage(
process_fn=Each(f),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/process/api/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def slow_gt3(x):
)
)

stage = to_stage(stage)
stage = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=Filter(f),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/process/api/flat_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def slow_integer_pair(x):
)
)

stage = to_stage(stage)
stage = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=FlatMap(f),
Expand Down
16 changes: 11 additions & 5 deletions pypeln/process/api/from_iterable.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@

class FromIterable(tp.NamedTuple):
iterable: tp.Iterable
maxsize: int

def __call__(self, worker: Worker, **kwargs):

iterable = self.iterable

if isinstance(iterable, pypeln_utils.BaseStage):

for x in iterable.to_iterable(maxsize=0, return_index=True):
for x in iterable.to_iterable(maxsize=self.maxsize, return_index=True):
worker.stage_params.output_queues.put(x)
else:
for i, x in enumerate(iterable):
Expand All @@ -30,18 +31,23 @@ def __call__(self, worker: Worker, **kwargs):


@tp.overload
def from_iterable(iterable: tp.Iterable[T], use_thread: bool = True) -> Stage[T]:
def from_iterable(
iterable: tp.Iterable[T], use_thread: bool = True, maxsize: int = 0
) -> Stage[T]:
...


@tp.overload
def from_iterable(use_thread: bool = True) -> pypeln_utils.Partial[Stage[T]]:
def from_iterable(
use_thread: bool = True, maxsize: int = 0
) -> pypeln_utils.Partial[Stage[T]]:
...


def from_iterable(
iterable: tp.Union[tp.Iterable[T], pypeln_utils.Undefined] = pypeln_utils.UNDEFINED,
use_thread: bool = True,
maxsize: int = 0,
) -> tp.Union[Stage[T], pypeln_utils.Partial[Stage[T]]]:
"""
Creates a stage from an iterable.
Expand All @@ -60,9 +66,9 @@ def from_iterable(
)

return Stage(
process_fn=FromIterable(iterable),
process_fn=FromIterable(iterable, maxsize=maxsize),
workers=1,
maxsize=0,
maxsize=maxsize,
timeout=0,
total_sources=1,
dependencies=[],
Expand Down
2 changes: 1 addition & 1 deletion pypeln/process/api/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def slow_add1(x):
)
)

stage = to_stage(stage)
stage = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=Map(f),
Expand Down
10 changes: 6 additions & 4 deletions pypeln/process/api/ordered.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ def __call__(self, worker: Worker, **kwargs):


@tp.overload
def ordered(stage: Stage[A]) -> Stage[A]:
def ordered(stage: Stage[A], maxsize: int = 0,) -> Stage[A]:
...


@tp.overload
def ordered() -> pypeln_utils.Partial[Stage[A]]:
def ordered(maxsize: int = 0) -> pypeln_utils.Partial[Stage[A]]:
...


def ordered(
stage: tp.Union[
Stage[A], tp.Iterable[A], pypeln_utils.Undefined
] = pypeln_utils.UNDEFINED,
maxsize: int = 0,
) -> tp.Union[Stage[A], pypeln_utils.Partial[Stage[A]]]:
"""
Creates a stage that sorts its elements based on their order of creation on the source iterable(s) of the pipeline.
Expand Down Expand Up @@ -64,6 +65,7 @@ def slow_squared(x):
Arguments:
stage: A Stage or Iterable.
maxsize: The maximum number of objects the stage can hold simultaneously, if set to `0` (default) then the stage can grow unbounded.
Returns:
If the `stage` parameters is given then this function returns an iterable, else it returns a `Partial`.
Expand All @@ -72,12 +74,12 @@ def slow_squared(x):
if isinstance(stage, pypeln_utils.Undefined):
return pypeln_utils.Partial(lambda stage: ordered(stage))

stage = to_stage(stage)
stage = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=Ordered(),
workers=1,
maxsize=0,
maxsize=maxsize,
timeout=0,
total_sources=stage.workers,
dependencies=[stage],
Expand Down
4 changes: 2 additions & 2 deletions pypeln/process/api/to_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from .from_iterable import from_iterable


def to_stage(obj: tp.Union[Stage[A], tp.Iterable[A]]) -> Stage[A]:
def to_stage(obj: tp.Union[Stage[A], tp.Iterable[A]], maxsize: int) -> Stage[A]:

if isinstance(obj, Stage):
return obj
else:
return from_iterable(obj)
return from_iterable(obj, maxsize=maxsize)
2 changes: 1 addition & 1 deletion pypeln/sync/api/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def concat(
A stage object.
"""

dependencies = [to_stage(stage) for stage in stages]
dependencies = [to_stage(stage, maxsize=maxsize) for stage in stages]

return Stage(
process_fn=Concat(),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/sync/api/each.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def process_image(image_path):
)
)

stage_ = to_stage(stage)
stage_ = to_stage(stage, maxsize=maxsize)

stage_ = Stage(
process_fn=Each(f),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/sync/api/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def slow_gt3(x):
)
)

stage_ = to_stage(stage)
stage_ = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=Filter(f),
Expand Down
2 changes: 1 addition & 1 deletion pypeln/sync/api/flat_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def slow_integer_pair(x):
)
)

stage_ = to_stage(stage)
stage_ = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=FlatMap(f),
Expand Down
16 changes: 12 additions & 4 deletions pypeln/sync/api/from_iterable.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
@dataclass
class FromIterable(ProcessFn):
iterable: tp.Iterable
maxsize: int

def __call__(self, worker: Stage, **kwargs) -> tp.Iterable:
if isinstance(self.iterable, pypeln_utils.BaseStage):
yield from self.iterable.to_iterable(maxsize=0, return_index=True)
yield from self.iterable.to_iterable(
maxsize=self.maxsize, return_index=True
)
else:
for i, x in enumerate(self.iterable):
if isinstance(x, pypeln_utils.Element):
Expand All @@ -23,18 +26,23 @@ def __call__(self, worker: Stage, **kwargs) -> tp.Iterable:


@tp.overload
def from_iterable(iterable: tp.Iterable[T], use_thread: bool = True) -> Stage[T]:
def from_iterable(
iterable: tp.Iterable[T], use_thread: bool = True, maxsize: int = 0
) -> Stage[T]:
...


@tp.overload
def from_iterable(use_thread: bool = True) -> pypeln_utils.Partial[Stage[T]]:
def from_iterable(
use_thread: bool = True, maxsize: int = 0
) -> pypeln_utils.Partial[Stage[T]]:
...


def from_iterable(
iterable: tp.Union[tp.Iterable[T], pypeln_utils.Undefined] = pypeln_utils.UNDEFINED,
use_thread: bool = True,
maxsize: int = 0,
) -> tp.Union[Stage[T], pypeln_utils.Partial[Stage[T]]]:
"""
Creates a stage from an iterable.
Expand All @@ -51,7 +59,7 @@ def from_iterable(
return pypeln_utils.Partial(lambda iterable: from_iterable(iterable))

return Stage(
process_fn=FromIterable(iterable),
process_fn=FromIterable(iterable, maxsize=maxsize),
timeout=0,
dependencies=[],
on_start=None,
Expand Down
2 changes: 1 addition & 1 deletion pypeln/sync/api/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def slow_add1(x):
)
)

stage_ = to_stage(stage)
stage_ = to_stage(stage, maxsize=maxsize)

return Stage(
process_fn=Map(f),
Expand Down
Loading

0 comments on commit a2451ff

Please sign in to comment.