Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
### 4.7.0

* Added `AsyncSeq.splitAt` — splits a sequence at the given index, returning the first `count` elements as an array and the remaining elements as a new `AsyncSeq`. Mirrors `Seq.splitAt`. The source is enumerated once.
* Added `AsyncSeq.removeAt` — returns a new sequence with the element at the specified index removed, mirroring `Seq.removeAt`.
* Added `AsyncSeq.updateAt` — returns a new sequence with the element at the specified index replaced by a given value, mirroring `Seq.updateAt`.
* Added `AsyncSeq.insertAt` — returns a new sequence with a value inserted before the element at the specified index (or appended if the index equals the sequence length), mirroring `Seq.insertAt`.
Expand Down
29 changes: 29 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,35 @@

let tail (source : AsyncSeq<'T>) : AsyncSeq<'T> = skip 1 source

/// Splits an async sequence at the given index, returning the first `count` elements as an array
/// and the remaining elements as a new AsyncSeq. The source is enumerated once.
let splitAt (count: int) (source: AsyncSeq<'T>) : Async<'T array * AsyncSeq<'T>> = async {
if count < 0 then invalidArg "count" "must be non-negative"
let ie = source.GetEnumerator()
let ra = ResizeArray<'T>()
let! m = ie.MoveNext()
let b = ref m
while b.Value.IsSome && ra.Count < count do
ra.Add b.Value.Value
let! next = ie.MoveNext()
b := next
let first = ra.ToArray()
let rest =
if b.Value.IsNone then
ie.Dispose()
empty<'T>
else
let cur = ref b.Value
asyncSeq {
try
while cur.Value.IsSome do
yield cur.Value.Value
let! next = ie.MoveNext()
cur := next
finally
ie.Dispose() }
return first, rest }

let toArrayAsync (source : AsyncSeq<'T>) : Async<'T[]> = async {
let ra = (new ResizeArray<_>())
use ie = source.GetEnumerator()
Expand Down Expand Up @@ -2327,7 +2356,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2359 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2359 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
5 changes: 5 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ module AsyncSeq =
/// Returns an empty sequence if the source is empty.
val tail : source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Splits an async sequence at the given index. Returns an async computation that yields
/// the first `count` elements as an array and the remaining elements as a new AsyncSeq.
/// The source is enumerated once; the returned AsyncSeq lazily produces the remainder.
val splitAt : count:int -> source:AsyncSeq<'T> -> Async<'T array * AsyncSeq<'T>>

/// Creates an async computation which iterates the AsyncSeq and collects the output into an array.
val toArrayAsync : source:AsyncSeq<'T> -> Async<'T []>

Expand Down
44 changes: 44 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3409,6 +3409,50 @@
let result = AsyncSeq.sortWith compare AsyncSeq.empty<int>
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.splitAt splits a sequence at the given index`` () =
let source = asyncSeq { yield 1; yield 2; yield 3; yield 4; yield 5 }
let first, rest = AsyncSeq.splitAt 3 source |> Async.RunSynchronously
let restArr = rest |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], first)
Assert.AreEqual([| 4; 5 |], restArr)

[<Test>]
let ``AsyncSeq.splitAt with count=0 returns empty array and full rest`` () =
let source = asyncSeq { yield 10; yield 20 }
let first, rest = AsyncSeq.splitAt 0 source |> Async.RunSynchronously
let restArr = rest |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], first)
Assert.AreEqual([| 10; 20 |], restArr)

[<Test>]
let ``AsyncSeq.splitAt with count >= length returns all elements in first and empty rest`` () =
let source = asyncSeq { yield 1; yield 2; yield 3 }
let first, rest = AsyncSeq.splitAt 10 source |> Async.RunSynchronously
let restArr = rest |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], first)
Assert.AreEqual([||], restArr)

[<Test>]
let ``AsyncSeq.splitAt on empty sequence returns empty first and empty rest`` () =
let first, rest = AsyncSeq.splitAt 3 AsyncSeq.empty<int> |> Async.RunSynchronously
let restArr = rest |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([||], first)
Assert.AreEqual([||], restArr)

[<Test>]
let ``AsyncSeq.splitAt with count equal to length returns all in first and empty rest`` () =
let source = asyncSeq { yield 7; yield 8; yield 9 }
let first, rest = AsyncSeq.splitAt 3 source |> Async.RunSynchronously
let restArr = rest |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual([| 7; 8; 9 |], first)
Assert.AreEqual([||], restArr)

[<Test>]
let ``AsyncSeq.splitAt with negative count throws ArgumentException`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.splitAt -1 AsyncSeq.empty<int> |> Async.RunSynchronously |> ignore) |> ignore

// ===== removeAt =====

[<Test>]
Expand Down