diff --git a/packages/utilities/src/streams_utilities.ts b/packages/utilities/src/streams_utilities.ts index 41140a08..a4fdcb69 100644 --- a/packages/utilities/src/streams_utilities.ts +++ b/packages/utilities/src/streams_utilities.ts @@ -26,3 +26,35 @@ export async function readStreamToString(stream: Readable | PassThrough, encodin const buffer = await concatStreamToBuffer(stream); return buffer.toString(encoding); } + +function isAsyncIterable(value: unknown): value is AsyncIterable { + return typeof value === 'object' && !!value && Symbol.asyncIterator in value; +} + +async function asyncIterableToArray(iterable: AsyncIterable): Promise { + const out: T[] = []; + for await (const item of iterable) { + out.push(item); + } + + return out; +} + +export function iterableToArray(iterable: AsyncIterable): Promise; +export function iterableToArray(iterable: Iterable): T[]; + +/** + * Collect items from an iterable object or an async iterable into an array. + */ +export function iterableToArray(iterable: AsyncIterable | Iterable) { + if (isAsyncIterable(iterable)) { + return asyncIterableToArray(iterable); + } + + const out: T[] = []; + for (const item of iterable) { + out.push(item); + } + + return out; +} diff --git a/test/streams_utilities.test.ts b/test/streams_utilities.test.ts new file mode 100644 index 00000000..0942aac1 --- /dev/null +++ b/test/streams_utilities.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from 'vitest'; + +import { iterableToArray } from '@apify/utilities'; + +describe('iterableToArray', () => { + it('collects a synchronous iterable into an array', () => { + const result = iterableToArray(new Set([1, 2, 3])); + + expect(result).toEqual([1, 2, 3]); + }); + + it('collects an async iterable into an array', async () => { + async function* gen() { + yield 'a'; + yield 'b'; + yield 'c'; + } + + const result = await iterableToArray(gen()); + + expect(result).toEqual(['a', 'b', 'c']); + }); + + it('propagates errors thrown by an async iterable', async () => { + async function* gen() { + yield 1; + throw new Error('boom'); + } + + await expect(iterableToArray(gen())).rejects.toThrow('boom'); + }); +});