Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 4.7.0

* Added `AsyncSeq.removeAt` β€” returns a new sequence with the element at the specified index removed, mirroring `Seq.removeAt`.
* Added `AsyncSeq.updateAt` β€” returns a new sequence with the element at the specified index replaced by a given value, mirroring `Seq.updateAt`.
* Added `AsyncSeq.insertAt` β€” returns a new sequence with a value inserted before the element at the specified index (or appended if the index equals the sequence length), mirroring `Seq.insertAt`.

### 4.6.0

* Added `AsyncSeq.isEmpty` β€” returns `true` if the sequence contains no elements; short-circuits after the first element, mirroring `Seq.isEmpty`.
Expand Down
26 changes: 26 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,32 @@
let s = System.Collections.Generic.HashSet(excluded)
source |> filter (fun x -> not (s.Contains(x)))

let removeAt (index : int) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let i = ref 0
for x in source do
if i.Value <> index then yield x
i := i.Value + 1 }

let updateAt (index : int) (value : 'T) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let i = ref 0
for x in source do
if i.Value = index then yield value
else yield x
i := i.Value + 1 }

let insertAt (index : int) (value : 'T) (source : AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
if index < 0 then invalidArg "index" "must be non-negative"
let i = ref 0
for x in source do
if i.Value = index then yield value
yield x
i := i.Value + 1
if i.Value = index then yield value
elif i.Value < index then
invalidArg "index" "The index is outside the range of elements in the collection." }

#if !FABLE_COMPILER
let iterAsyncParallel (f:'a -> Async<unit>) (s:AsyncSeq<'a>) : Async<unit> = async {
use mb = MailboxProcessor.Start (ignore >> async.Return)
Expand Down Expand Up @@ -2301,7 +2327,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2330 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2330 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
13 changes: 13 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,19 @@ module AsyncSeq =
/// in the given excluded collection. Uses a HashSet for O(1) lookup. Mirrors Seq.except.
val except : excluded:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality

/// Returns a new asynchronous sequence with the element at the specified index removed.
/// Raises ArgumentException if index is negative. Mirrors Seq.removeAt.
val removeAt : index:int -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the element at the specified index replaced by the given value.
/// Raises ArgumentException if index is negative. Mirrors Seq.updateAt.
val updateAt : index:int -> value:'T -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Returns a new asynchronous sequence with the given value inserted before the element at the specified index.
/// An index equal to the length of the sequence appends the value at the end.
/// Raises ArgumentException if index is negative or greater than the sequence length. Mirrors Seq.insertAt.
val insertAt : index:int -> value:'T -> source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Creates an asynchronous sequence that lazily takes element from an
/// input synchronous sequence and returns them one-by-one.
val ofSeq : source:seq<'T> -> AsyncSeq<'T>
Expand Down
132 changes: 132 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3408,3 +3408,135 @@
let ``AsyncSeq.sortWith returns empty array for empty sequence`` () =
let result = AsyncSeq.sortWith compare AsyncSeq.empty<int>
Assert.AreEqual([||], result)

// ===== removeAt =====

[<Test>]
let ``AsyncSeq.removeAt removes the element at the specified index`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4; 5 ]
|> AsyncSeq.removeAt 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 4; 5 |], result)

[<Test>]
let ``AsyncSeq.removeAt removes the first element (index 0)`` () =
let result =
AsyncSeq.ofSeq [ 10; 20; 30 ]
|> AsyncSeq.removeAt 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 20; 30 |], result)

[<Test>]
let ``AsyncSeq.removeAt removes the last element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeAt 2
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2 |], result)

[<Test>]
let ``AsyncSeq.removeAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.removeAt -1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== updateAt =====

[<Test>]
let ``AsyncSeq.updateAt replaces element at specified index`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3; 4 ]
|> AsyncSeq.updateAt 1 99
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 99; 3; 4 |], result)

[<Test>]
let ``AsyncSeq.updateAt replaces first element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.updateAt 0 99
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 99; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.updateAt replaces last element`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.updateAt 2 99
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 99 |], result)

[<Test>]
let ``AsyncSeq.updateAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.updateAt -1 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== insertAt =====

[<Test>]
let ``AsyncSeq.insertAt inserts element at specified index`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 4; 5 ]
|> AsyncSeq.insertAt 2 3
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 4; 5 |], result)

[<Test>]
let ``AsyncSeq.insertAt inserts at index 0 (prepend)`` () =
let result =
AsyncSeq.ofSeq [ 2; 3 ]
|> AsyncSeq.insertAt 0 1
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)

[<Test>]
let ``AsyncSeq.insertAt appends when index equals sequence length`` () =
let result =
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertAt 3 4
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3; 4 |], result)

[<Test>]
let ``AsyncSeq.insertAt inserts into empty sequence at index 0`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.insertAt 0 42
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([| 42 |], result)

[<Test>]
let ``AsyncSeq.insertAt raises ArgumentException for negative index`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertAt -1 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

[<Test>]
let ``AsyncSeq.insertAt raises ArgumentException when index exceeds length`` () =
Assert.Throws<System.ArgumentException>(fun () ->
AsyncSeq.ofSeq [ 1; 2; 3 ]
|> AsyncSeq.insertAt 5 0
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore
2 changes: 1 addition & 1 deletion version.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<Version>4.6.0</Version>
<Version>4.7.0</Version>
</PropertyGroup>
</Project>