diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index ed2b48e..c88d2e5 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,10 @@ +### 4.8.0 + +* Added `AsyncSeq.mapFoldAsync` — maps each element using an asynchronous folder that also threads an accumulator state, returning both the array of results and the final state; mirrors `Seq.mapFold`. +* Added `AsyncSeq.mapFold` — synchronous variant of `AsyncSeq.mapFoldAsync`, mirroring `Seq.mapFold`. +* Added `AsyncSeq.allPairs` — returns an async sequence of all pairs from two input sequences (cartesian product); the second source is fully buffered before iteration, mirroring `Seq.allPairs`. +* Added `AsyncSeq.rev` — returns a new async sequence with all elements in reverse order; the entire source sequence is buffered before yielding, mirroring `Seq.rev`. + ### 4.7.0 * Added `AsyncSeq.splitAt` — splits a sequence at the given index, returning the first `count` elements as an array and the remaining elements as a new `AsyncSeq`. Mirrors `Seq.splitAt`. The source is enumerated once. diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index efc25c4..bf536f4 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1295,6 +1295,23 @@ module AsyncSeq = let reduce (f: 'T -> 'T -> 'T) (source: AsyncSeq<'T>) : Async<'T> = reduceAsync (fun a b -> f a b |> async.Return) source + let mapFoldAsync (folder: 'State -> 'T -> Async<'Result * 'State>) (state: 'State) (source: AsyncSeq<'T>) : Async<'Result array * 'State> = async { + let results = ResizeArray<'Result>() + let mutable st = state + use ie = source.GetEnumerator() + let! move = ie.MoveNext() + let b = ref move + while b.Value.IsSome do + let! (r, st') = folder st b.Value.Value + results.Add(r) + st <- st' + let! next = ie.MoveNext() + b := next + return (results.ToArray(), st) } + + let mapFold (folder: 'State -> 'T -> 'Result * 'State) (state: 'State) (source: AsyncSeq<'T>) : Async<'Result array * 'State> = + mapFoldAsync (fun st x -> folder st x |> async.Return) state source + let length (source : AsyncSeq<'T>) = fold (fun st _ -> st + 1L) 0L source @@ -1702,6 +1719,25 @@ module AsyncSeq = let zipWith3 (f:'T1 -> 'T2 -> 'T3 -> 'U) (source1:AsyncSeq<'T1>) (source2:AsyncSeq<'T2>) (source3:AsyncSeq<'T3>) : AsyncSeq<'U> = zipWithAsync3 (fun a b c -> f a b c |> async.Return) source1 source2 source3 + let allPairs (source1: AsyncSeq<'T1>) (source2: AsyncSeq<'T2>) : AsyncSeq<'T1 * 'T2> = asyncSeq { + let buf = System.Collections.Generic.List<'T2>() + use ie2 = source2.GetEnumerator() + let! move2 = ie2.MoveNext() + let b2 = ref move2 + while b2.Value.IsSome do + buf.Add(b2.Value.Value) + let! next2 = ie2.MoveNext() + b2 := next2 + use ie1 = source1.GetEnumerator() + let! move1 = ie1.MoveNext() + let b1 = ref move1 + while b1.Value.IsSome do + let x = b1.Value.Value + for y in buf do + yield (x, y) + let! next1 = ie1.MoveNext() + b1 := next1 } + let zappAsync (fs:AsyncSeq<'T -> Async<'U>>) (s:AsyncSeq<'T>) : AsyncSeq<'U> = zipWithAsync (|>) s fs @@ -1994,6 +2030,11 @@ module AsyncSeq = let sortWith (comparer:'T -> 'T -> int) (source:AsyncSeq<'T>) : array<'T> = toSortedSeq (Array.sortWith comparer) source + + let rev (source: AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq { + let! arr = toArrayAsync source + for i in arr.Length - 1 .. -1 .. 0 do + yield arr.[i] } #endif #if !FABLE_COMPILER diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 17841a4..5113dd6 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -258,6 +258,16 @@ module AsyncSeq = /// specified 'reduction' function. Raises InvalidOperationException if the sequence is empty. val reduce : reduction:('T -> 'T -> 'T) -> source:AsyncSeq<'T> -> Async<'T> + /// Asynchronously maps each element of the async sequence with an asynchronous folder function that + /// also threads an accumulator state through the computation. Returns the array of results and the + /// final state, mirroring Seq.mapFold. + val mapFoldAsync : folder:('State -> 'T -> Async<'Result * 'State>) -> state:'State -> source:AsyncSeq<'T> -> Async<'Result array * 'State> + + /// Maps each element of the async sequence with a folder function that also threads an accumulator + /// state through the computation. Returns the array of results and the final state, + /// mirroring Seq.mapFold. + val mapFold : folder:('State -> 'T -> 'Result * 'State) -> state:'State -> source:AsyncSeq<'T> -> Async<'Result array * 'State> + /// Asynchronously sum the elements of the input asynchronous sequence using the specified function. val inline sum : source:AsyncSeq< ^T > -> Async< ^T> when ^T : (static member ( + ) : ^T * ^T -> ^T) @@ -496,6 +506,10 @@ module AsyncSeq = /// The resulting sequence stops when any of the argument sequences stop. val zipWith3 : mapping:('T1 -> 'T2 -> 'T3 -> 'U) -> source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> source3:AsyncSeq<'T3> -> AsyncSeq<'U> + /// Returns an async sequence of all pairs of elements from the two input sequences. + /// The second sequence is fully buffered before iteration begins, mirroring Seq.allPairs. + val allPairs : source1:AsyncSeq<'T1> -> source2:AsyncSeq<'T2> -> AsyncSeq<'T1 * 'T2> + /// Builds a new asynchronous sequence whose elements are generated by /// applying the specified function to all elements of the input sequence. /// @@ -654,6 +668,11 @@ module AsyncSeq = /// that sequence is iterated. As a result this function should not be used with /// large or infinite sequences. val sortWith : comparer:('T -> 'T -> int) -> source:AsyncSeq<'T> -> array<'T> + + /// Returns a new async sequence with the elements in reverse order. The entire source + /// sequence is buffered before yielding any elements, mirroring Seq.rev. + /// This function should not be used with large or infinite sequences. + val rev : source:AsyncSeq<'T> -> AsyncSeq<'T> #endif /// Interleaves two async sequences of the same type into a resulting sequence. The provided diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index f8c956d..17277e6 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -3409,6 +3409,84 @@ let ``AsyncSeq.sortWith returns empty array for empty sequence`` () = let result = AsyncSeq.sortWith compare AsyncSeq.empty Assert.AreEqual([||], result) +// ── AsyncSeq.mapFold ────────────────────────────────────────────────────────── + +[] +let ``AsyncSeq.mapFold maps elements and accumulates state`` () = + let source = asyncSeq { yield 1; yield 2; yield 3 } + let results, finalState = + AsyncSeq.mapFold (fun acc x -> (x * 2, acc + x)) 0 source |> Async.RunSynchronously + Assert.AreEqual([| 2; 4; 6 |], results) + Assert.AreEqual(6, finalState) + +[] +let ``AsyncSeq.mapFold returns empty array and initial state for empty sequence`` () = + let results, finalState = + AsyncSeq.mapFold (fun acc x -> (x, acc + x)) 99 AsyncSeq.empty |> Async.RunSynchronously + Assert.AreEqual([||], results) + Assert.AreEqual(99, finalState) + +[] +let ``AsyncSeq.mapFoldAsync maps elements asynchronously and accumulates state`` () = + let source = asyncSeq { yield 10; yield 20; yield 30 } + let results, finalState = + AsyncSeq.mapFoldAsync (fun acc x -> async { return (x + 1, acc + x) }) 0 source + |> Async.RunSynchronously + Assert.AreEqual([| 11; 21; 31 |], results) + Assert.AreEqual(60, finalState) + +[] +let ``AsyncSeq.mapFoldAsync returns empty array and initial state for empty sequence`` () = + let results, finalState = + AsyncSeq.mapFoldAsync (fun acc x -> async { return (x, acc + 1) }) 5 AsyncSeq.empty + |> Async.RunSynchronously + Assert.AreEqual([||], results) + Assert.AreEqual(5, finalState) + +// ── AsyncSeq.allPairs ──────────────────────────────────────────────────────── + +[] +let ``AsyncSeq.allPairs returns cartesian product`` () = + let s1 = asyncSeq { yield 1; yield 2 } + let s2 = asyncSeq { yield 'a'; yield 'b'; yield 'c' } + let result = + AsyncSeq.allPairs s1 s2 |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual( + [| (1,'a'); (1,'b'); (1,'c'); (2,'a'); (2,'b'); (2,'c') |], + result) + +[] +let ``AsyncSeq.allPairs returns empty when first source is empty`` () = + let result = + AsyncSeq.allPairs AsyncSeq.empty (asyncSeq { yield 1; yield 2 }) + |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual([||], result) + +[] +let ``AsyncSeq.allPairs returns empty when second source is empty`` () = + let result = + AsyncSeq.allPairs (asyncSeq { yield 1; yield 2 }) AsyncSeq.empty + |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual([||], result) + +// ── AsyncSeq.rev ───────────────────────────────────────────────────────────── + +[] +let ``AsyncSeq.rev reverses a sequence`` () = + let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 } + let result = AsyncSeq.rev source |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual([| 5; 4; 3; 2; 1 |], result) + +[] +let ``AsyncSeq.rev returns empty sequence for empty input`` () = + let result = AsyncSeq.rev AsyncSeq.empty |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual([||], result) + +[] +let ``AsyncSeq.rev returns singleton for single element`` () = + let result = AsyncSeq.rev (asyncSeq { yield 42 }) |> AsyncSeq.toArrayAsync |> Async.RunSynchronously + Assert.AreEqual([| 42 |], result) + [] let ``AsyncSeq.splitAt splits a sequence at the given index`` () = let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }