Parallel streams with buffers #475

@jlanglois-jam

In a simple use case such as downloading files and processing them on a single machine, how can one run the downloads and the processing in parallel, with a buffer between the two stages?

Example:

import time
from streamz import Stream
from tornado.ioloop import IOLoop


def download_file(file_id: int):
    time.sleep(1)
    print(f"Downloaded file: {file_id}")
    return file_id


def process_file(file_id: int):
    time.sleep(2)
    print(f"Processed file : {file_id}")
    return file_id


async def streamz_run():
    s = Stream(asynchronous=True)
    s.map(download_file).buffer(4).sink(process_file)
    for i in range(10):
        await s.emit(i)


if __name__ == '__main__':
    start = time.time()
    IOLoop().run_sync(streamz_run)
    print(f"Streamz run took: {time.time() - start}s")

The download_file calls are buffered as expected, but they do not run at the same time as process_file. The whole run takes ~30s (10 × (1s + 2s)), whereas overlapping downloads with processing should bring it down to about 21s (1s for the first download, then 10 × 2s of processing). Is using Dask the intended way to get that behaviour?
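To make the expected timing concrete, here is a minimal sketch of the overlap described above. It uses only the standard library (a bounded `queue.Queue` and one producer thread), not streamz, so it shows the target behaviour rather than how streamz should be used. `SCALE` and `run_pipeline` are hypothetical names introduced for illustration, and the sleeps are scaled down so the sketch runs quickly.

```python
import queue
import threading
import time

SCALE = 0.01  # assumption: shrink the 1 s / 2 s sleeps of the example above


def download_file(file_id):
    time.sleep(1 * SCALE)  # stands in for a blocking download
    return file_id


def process_file(file_id):
    time.sleep(2 * SCALE)  # stands in for blocking processing
    return file_id


def run_pipeline(n_files=10, buffer_size=4):
    """Download in a background thread while the main thread processes."""
    buf = queue.Queue(maxsize=buffer_size)  # bounded buffer between the stages
    done = object()  # sentinel marking the end of the stream
    processed = []

    def producer():
        for i in range(n_files):
            # put() blocks while the buffer is full, which gives backpressure
            buf.put(download_file(i))
        buf.put(done)

    start = time.perf_counter()
    worker = threading.Thread(target=producer)
    worker.start()
    while True:
        item = buf.get()
        if item is done:
            break
        processed.append(process_file(item))
    worker.join()
    return processed, time.perf_counter() - start


if __name__ == "__main__":
    processed, elapsed = run_pipeline()
    print(f"Processed {len(processed)} files in {elapsed / SCALE:.1f} scaled seconds")
```

Because processing is the slower stage, the total should come out close to 1 + 10 × 2 = 21 scaled seconds instead of 30. That is the behaviour I am hoping to get from the `map(...).buffer(4).sink(...)` chain.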
