@@ -15,8 +15,9 @@ internal abstract class AsyncPageBase<T> : IPage<T>
15
15
protected readonly int Offset ;
16
16
protected readonly int PageSize ;
17
17
private readonly IDisposable _onDisposalAfterFetchCompleted ;
18
+ private readonly IAsyncPageFetchScheduler _asyncPageFetchScheduler ;
18
19
protected readonly IObserver < ( int Offset , int PageSize , T [ ] PreviousPage , T [ ] Page ) > PageArrivalObservations ;
19
- private readonly CancellationTokenSource _cancellationTokenSource = new ( ) ;
20
+ protected readonly CancellationTokenSource CancellationTokenSource = new ( ) ;
20
21
protected T [ ] Page ;
21
22
protected bool IsDisposed ;
22
23
@@ -30,32 +31,29 @@ internal AsyncPageBase(
30
31
// dependencies
31
32
Func < int , int , T > placeholderFactory ,
32
33
IAsyncPageFetchScheduler asyncPageFetchScheduler ,
33
- IScheduler pageBackgroundScheduler ,
34
34
IObserver < ( int Offset , int PageSize , T [ ] PreviousPage , T [ ] Page ) > pageArrivalObservations )
35
35
{
36
36
Offset = offset ;
37
37
PageSize = pageSize ;
38
38
_onDisposalAfterFetchCompleted = onDisposalAfterFetchCompleted ;
39
+ _asyncPageFetchScheduler = asyncPageFetchScheduler ;
39
40
PageArrivalObservations = pageArrivalObservations ;
40
41
Page = Enumerable
41
42
. Range ( 0 , pageSize )
42
43
. Select ( pageIndex => placeholderFactory ( pageKey , pageIndex ) )
43
44
. ToArray ( ) ;
44
- PageFetchCompletion = Observable
45
- . StartAsync ( FetchPage , pageBackgroundScheduler )
46
- . ToTask ( _cancellationTokenSource . Token ) ;
45
+ }
47
46
48
- async Task FetchPage ( CancellationToken ct )
49
- {
50
- await asyncPageFetchScheduler . Schedule ( ) . ConfigureAwait ( false ) ;
51
- ct . ThrowIfCancellationRequested ( ) ;
52
- await FetchPageInner ( ct ) . ConfigureAwait ( false ) ;
53
- }
47
+ protected async Task FetchPage ( CancellationToken ct )
48
+ {
49
+ await _asyncPageFetchScheduler . Schedule ( ) . ConfigureAwait ( false ) ;
50
+ ct . ThrowIfCancellationRequested ( ) ;
51
+ await FetchPageInner ( ct ) . ConfigureAwait ( false ) ;
54
52
}
55
53
56
54
protected abstract Task FetchPageInner ( CancellationToken ct ) ;
57
55
58
- public Task PageFetchCompletion { get ; }
56
+ public abstract Task PageFetchCompletion { get ; }
59
57
60
58
public T this [ int index ] =>
61
59
index >= PageSize || index < 0
@@ -76,7 +74,7 @@ public async ValueTask DisposeAsync()
76
74
IsDisposed = true ;
77
75
try
78
76
{
79
- _cancellationTokenSource . Cancel ( ) ;
77
+ CancellationTokenSource . Cancel ( ) ;
80
78
await PageFetchCompletion . ConfigureAwait ( false ) ;
81
79
}
82
80
catch ( OperationCanceledException )
@@ -116,12 +114,17 @@ internal AsyncNonTaskBasedPage(
116
114
onDisposalAfterFetchCompleted ,
117
115
placeholderFactory ,
118
116
asyncPageFetchScheduler ,
119
- pageBackgroundScheduler ,
120
- pageArrivalObservations ) =>
117
+ pageArrivalObservations )
118
+ {
121
119
_pageFetcher = pageFetcher ;
120
+ PageFetchCompletion = Observable
121
+ . StartAsync ( FetchPage , pageBackgroundScheduler )
122
+ . ToTask ( CancellationTokenSource . Token ) ;
123
+ }
122
124
123
125
protected override async Task FetchPageInner ( CancellationToken ct )
124
126
{
127
+ await Task . Delay ( 1 , ct ) . ConfigureAwait ( false ) ;
125
128
var previousPage = Page ;
126
129
Page = _pageFetcher ( Offset , PageSize , ct ) ;
127
130
await DisposePageItems ( previousPage ) . ConfigureAwait ( false ) ;
@@ -130,6 +133,8 @@ protected override async Task FetchPageInner(CancellationToken ct)
130
133
else
131
134
await DisposePageItems ( Page ) . ConfigureAwait ( false ) ;
132
135
}
136
+
137
+ public override Task PageFetchCompletion { get ; }
133
138
}
134
139
135
140
internal sealed class AsyncTaskBasedPage < T > : AsyncPageBase < T >
@@ -156,9 +161,13 @@ internal AsyncTaskBasedPage(
156
161
onDisposalAfterFetchCompleted ,
157
162
placeholderFactory ,
158
163
asyncPageFetchScheduler ,
159
- pageBackgroundScheduler ,
160
- pageArrivalObservations ) =>
164
+ pageArrivalObservations )
165
+ {
161
166
_pageFetcher = pageFetcher ;
167
+ PageFetchCompletion = Observable
168
+ . StartAsync ( FetchPage , pageBackgroundScheduler )
169
+ . ToTask ( CancellationTokenSource . Token ) ;
170
+ }
162
171
163
172
protected override async Task FetchPageInner ( CancellationToken ct )
164
173
{
@@ -170,6 +179,8 @@ protected override async Task FetchPageInner(CancellationToken ct)
170
179
else
171
180
await DisposePageItems ( Page ) . ConfigureAwait ( false ) ;
172
181
}
182
+
183
+ public override Task PageFetchCompletion { get ; }
173
184
}
174
185
175
186
internal sealed class AsyncEnumerableBasedPage < T > : AsyncPageBase < T >
@@ -196,9 +207,13 @@ internal AsyncEnumerableBasedPage(
196
207
onDisposalAfterFetchCompleted ,
197
208
placeholderFactory ,
198
209
asyncPageFetchScheduler ,
199
- pageBackgroundScheduler ,
200
- pageArrivalObservations ) =>
210
+ pageArrivalObservations )
211
+ {
201
212
_pageFetcher = pageFetcher ;
213
+ PageFetchCompletion = Observable
214
+ . StartAsync ( FetchPage , pageBackgroundScheduler )
215
+ . ToTask ( CancellationTokenSource . Token ) ;
216
+ }
202
217
203
218
protected override async Task FetchPageInner ( CancellationToken ct )
204
219
{
@@ -216,5 +231,7 @@ protected override async Task FetchPageInner(CancellationToken ct)
216
231
i ++ ;
217
232
}
218
233
}
234
+
235
+ public override Task PageFetchCompletion { get ; }
219
236
}
220
237
}
0 commit comments