Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Docker.DotNet/Docker.DotNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Buffers" Version="4.5.1" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>
</Project>
20 changes: 10 additions & 10 deletions src/Docker.DotNet/DockerClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
Expand Down Expand Up @@ -346,22 +346,22 @@ private async Task<HttpResponseMessage> PrivateMakeRequestAsync(
IRequestContent data,
CancellationToken cancellationToken)
{
// If there is a timeout, we turn it into a cancellation token. At the same time, we need to link to the caller's
// cancellation token. To avoid leaking objects, we must then also dispose of the CancellationTokenSource. To keep
// code flow simple, we treat it as re-entering the same method with a different CancellationToken and no timeout.
var request = PrepareRequest(method, path, queryString, headers, data);

if (timeout != s_InfiniteTimeout)
{
using (var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
timeoutTokenSource.CancelAfter(timeout);
Comment on lines 351 to 355
Copy link
Contributor

@Emdot Emdot Feb 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to move the timeout complexity to a separate function? Then this whole method can be:

var request = PrepareRequest(method, path, queryString, headers, data);
return RunWithTimeoutAsync(
    ct => _client.SendAsync(request, completionOption, ct), 
    cancellationToken, timeout);

A RunWithTimeoutAsync implementation might look like this:

public static async Task<T> TimeoutAfter<T>(Func<CancellationToken, Task<T>> makeTask, CancellationToken inputToken, TimeSpan timeout)
{
    // Make a token that triggers either manually or when the input token triggers.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(inputToken);

    var timerTask = Task.Delay(timeout, cts.Token);
    var mainTask = makeTask(cts.Token);
    
    // Start both the timeout and the main task in parallel.
    Task firstTask == await Task.WhenAny(timerTask, mainTask).ConfigureAwait(false);
    
    // At this point, one of them has finished. Cancel the other one.
    cts.Cancel();
    
    // If the consumer canceled, put the output task in the canceled state and link it to the input token.
    if (inputToken.IsCancellationRequested)
        throw new OperationCanceledException(inputToken);

    // If the timeout occurred first, throw a timed-out exception.
    if (firstTask == timerTask)
        throw new TimeoutException();
    
    // Otherwise return the result of the main task. If it was canceled, this may throw 
    // a cancelation exception. If it faulted, it may throw some other exception.
    return await mainTask.ConfigureAwait(false);
}

For added performance you could use a state parameter to avoid the closure. As a bonus, timeouts emit a separate exception than cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Emdot great suggestion.
I'll wait for this to get approved and continue with improvements.


// We must await here because we need to dispose of the CTS only after the work has been completed.
return await PrivateMakeRequestAsync(s_InfiniteTimeout, completionOption, method, path, queryString, headers, data, timeoutTokenSource.Token).ConfigureAwait(false);
return await _client.SendAsync(request, completionOption, timeoutTokenSource.Token).ConfigureAwait(false);
}
}

var request = PrepareRequest(method, path, queryString, headers, data);
return await _client.SendAsync(request, completionOption, cancellationToken).ConfigureAwait(false);
var tcs = new TaskCompletionSource<HttpResponseMessage>();
using (cancellationToken.Register(() => tcs.SetCanceled()))
{
return await await Task.WhenAny(tcs.Task, _client.SendAsync(request, completionOption, cancellationToken)).ConfigureAwait(false);
}
}

private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response, IEnumerable<ApiResponseErrorHandlingDelegate> handlers)
Expand Down Expand Up @@ -452,4 +452,4 @@ public void Dispose()
}

internal delegate void ApiResponseErrorHandlingDelegate(HttpStatusCode statusCode, string responseBody);
}
}
152 changes: 55 additions & 97 deletions src/Docker.DotNet/Endpoints/StreamUtil.cs
Original file line number Diff line number Diff line change
@@ -1,97 +1,55 @@
using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Docker.DotNet.Models
{
internal static class StreamUtil
{
private static Newtonsoft.Json.JsonSerializer _serializer = new Newtonsoft.Json.JsonSerializer();

internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<string> progress)
{
using (var stream = await streamTask)
{
// ReadLineAsync must be cancelled by closing the whole stream.
using (cancel.Register(() => stream.Dispose()))
{
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
{
string line;
while ((line = await reader.ReadLineAsync()) != null)
{
progress.Report(line);
}
}
}
}
}

internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
{
while (await jsonReader.ReadAsync().WithCancellation(cancel))
{
var ev = _serializer.Deserialize<T>(jsonReader);
progress?.Report(ev);
}
}
}

internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var response = await responseTask)
{
await client.HandleIfErrorResponseAsync(response.StatusCode, response);

using (var stream = await response.Content.ReadAsStreamAsync())
{
// ReadLineAsync must be cancelled by closing the whole stream.
using (cancel.Register(() => stream.Dispose()))
{
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
{
string line;
try
{
while ((line = await reader.ReadLineAsync()) != null)
{
var prog = client.JsonSerializer.DeserializeObject<T>(line);
if (prog == null) continue;

progress.Report(prog);
}
}
catch (ObjectDisposedException)
{
// The subsequent call to reader.ReadLineAsync() after cancellation
// will fail because we disposed the stream. Just ignore here.
}
}
}
}
}
}

private static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
{
if (task != await Task.WhenAny(task, tcs.Task))
{
throw new OperationCanceledException(cancellationToken);
}
}

return await task;
}
}
}
using System;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Docker.DotNet.Models
{
internal static class StreamUtil
{
internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<string> progress)
{
var tcs = new TaskCompletionSource<string>();

using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
string line;
while ((line = await await Task.WhenAny(reader.ReadLineAsync(), tcs.Task)) != null)
{
progress.Report(line);
}
}
}

internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<T> progress)
{
var tcs = new TaskCompletionSource<bool>();

using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
while (await await Task.WhenAny(jsonReader.ReadAsync(cancellationToken), tcs.Task))
{
var ev = await client.JsonSerializer.Deserialize<T>(jsonReader, cancellationToken);
progress.Report(ev);
}
}
}

internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var response = await responseTask)
{
await MonitorStreamForMessagesAsync<T>(response.Content.ReadAsStreamAsync(), client, cancel, progress);
}
}
}
}
25 changes: 23 additions & 2 deletions src/Docker.DotNet/JsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Newtonsoft.Json;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;

namespace Docker.DotNet
Expand All @@ -8,6 +10,8 @@ namespace Docker.DotNet
/// </summary>
internal class JsonSerializer
{
private readonly Newtonsoft.Json.JsonSerializer _serializer;

private readonly JsonSerializerSettings _settings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Expand All @@ -24,6 +28,23 @@ internal class JsonSerializer

public JsonSerializer()
{
_serializer = Newtonsoft.Json.JsonSerializer.CreateDefault(this._settings);
}

public Task<T> Deserialize<T>(JsonReader jsonReader, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<T>();
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
Task.Factory.StartNew(
() => tcs.TrySetResult(_serializer.Deserialize<T>(jsonReader)),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
);

return tcs.Task;
}
}

public T DeserializeObject<T>(string json)
Expand All @@ -36,4 +57,4 @@ public string SerializeObject<T>(T value)
return JsonConvert.SerializeObject(value, this._settings);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public override int WriteTimeout

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None).Result;
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}

public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand Down
Loading