-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAbstractDataSink.cs
More file actions
107 lines (92 loc) · 3.2 KB
/
AbstractDataSink.cs
File metadata and controls
107 lines (92 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Sample
{
public abstract class AbstractDataSink<Item>
{
// Режимы завершения цикла обработки элементов.
private enum StopMode
{
None, // не завершать цикл
Drain, // обработать имеющиеся элементы
Break // завершить без обработки имеющихся элементов
}
private Queue<Item> _queue = new Queue<Item>();
private StopMode _stopMode = StopMode.None;
private TaskCompletionSource<object> _tcs = new TaskCompletionSource<object>();
private Task _task;
public AbstractDataSink()
{
_task = ProcessQueueAsync();
}
// Обработка элемента данных.
protected abstract Task ProcessItem(Item item);
// Добавление элемента в очередь.
public bool Enqueue(Item item)
{
lock (_queue)
{
if (_stopMode != StopMode.None) return false;
_queue.Enqueue(item);
_tcs.TrySetResult(null);
return true;
}
}
// Запрет добавления элементов
// и ожидание завершения обработки содержимого очереди.
public async Task DrainAsync()
{
lock (this)
{
if (_stopMode == StopMode.None) _stopMode = StopMode.Drain;
_tcs.TrySetResult(null);
}
await _task;
}
// Запрет добавления элементов,
// и прерывание цикла обработки очереди
// после завершения текущих вызовов ProcessItem и OnSuspend.
public void Break()
{
lock (this)
{
_stopMode = StopMode.Break;
_tcs.TrySetResult(null);
}
try
{
Task.WhenAll(_task).Wait();
}
catch (AggregateException e)
{
e.Handle(x => throw x);
}
}
// Цикл обработки элементов.
private async Task ProcessQueueAsync()
{
while (true)
{
await _tcs.Task.ConfigureAwait(false);
await Task.Yield();
while (true)
{
Item item;
lock (_queue)
{
if (_stopMode == StopMode.Break) return;
if (_queue.Count == 0)
{
if (_stopMode == StopMode.Drain) return;
_tcs = new TaskCompletionSource<object>();
break;
}
item = _queue.Dequeue();
}
await ProcessItem(item);
}
}
}
}
}