Skip to content

Commit b7c3446

Browse files
authored
Add Channel.CreateUnboundedPrioritized (#100550)
1 parent 85fbd98 commit b7c3446

11 files changed

+587
-31
lines changed

src/libraries/System.Threading.Channels/ref/System.Threading.Channels.csproj

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55

66
<ItemGroup>
77
<Compile Include="System.Threading.Channels.cs" />
8-
<Compile Include="System.Threading.Channels.netcoreapp.cs" Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))"/>
8+
<Compile Include="System.Threading.Channels.netstandard21.cs" Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))"/>
99
</ItemGroup>
1010

1111
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
1212
<ProjectReference Include="$(LibrariesProjectRoot)System.Runtime/ref/System.Runtime.csproj" />
1313
</ItemGroup>
1414

15+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
16+
<Compile Include="System.Threading.Channels.netcoreapp.cs" />
17+
</ItemGroup>
18+
1519
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
1620
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
1721
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\ref\Microsoft.Bcl.AsyncInterfaces.csproj" />

src/libraries/System.Threading.Channels/ref/System.Threading.Channels.netcoreapp.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
// Changes to this file must follow the https://aka.ms/api-review process.
55
// ------------------------------------------------------------------------------
66

7+
using System.Collections.Generic;
8+
79
namespace System.Threading.Channels
810
{
9-
public partial class ChannelClosedException : System.InvalidOperationException
11+
public partial class Channel
12+
{
13+
public static System.Threading.Channels.Channel<T> CreateUnboundedPrioritized<T>() { throw null; }
14+
public static System.Threading.Channels.Channel<T> CreateUnboundedPrioritized<T>(System.Threading.Channels.UnboundedPrioritizedChannelOptions<T> options) { throw null; }
15+
}
16+
public sealed partial class UnboundedPrioritizedChannelOptions<T> : System.Threading.Channels.ChannelOptions
1017
{
11-
#if NET8_0_OR_GREATER
12-
[System.ObsoleteAttribute("This API supports obsolete formatter-based serialization. It should not be called or extended by application code.", DiagnosticId = "SYSLIB0051", UrlFormat = "https://aka.ms/dotnet-warnings/{0}")]
13-
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
14-
#endif
15-
protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
18+
public System.Collections.Generic.IComparer<T>? Comparer { get; set; }
1619
}
1720
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// ------------------------------------------------------------------------------
4+
// Changes to this file must follow the https://aka.ms/api-review process.
5+
// ------------------------------------------------------------------------------
6+
7+
namespace System.Threading.Channels
8+
{
9+
public partial class ChannelClosedException : System.InvalidOperationException
10+
{
11+
#if NET8_0_OR_GREATER
12+
[System.ObsoleteAttribute("This API supports obsolete formatter-based serialization. It should not be called or extended by application code.", DiagnosticId = "SYSLIB0051", UrlFormat = "https://aka.ms/dotnet-warnings/{0}")]
13+
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
14+
#endif
15+
protected ChannelClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
16+
}
17+
}

src/libraries/System.Threading.Channels/src/System.Threading.Channels.csproj

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,10 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
1414
<ItemGroup>
1515
<Compile Include="System\VoidResult.cs" />
1616
<Compile Include="System\Threading\Channels\AsyncOperation.cs" />
17-
<Compile Include="System\Threading\Channels\AsyncOperation.netcoreapp.cs"
18-
Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'" />
19-
<Compile Include="System\Threading\Channels\AsyncOperation.netstandard.cs"
20-
Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
2117
<Compile Include="System\Threading\Channels\BoundedChannel.cs" />
2218
<Compile Include="System\Threading\Channels\BoundedChannelFullMode.cs" />
2319
<Compile Include="System\Threading\Channels\Channel.cs" />
2420
<Compile Include="System\Threading\Channels\ChannelClosedException.cs" />
25-
<Compile Include="System\Threading\Channels\ChannelClosedException.netcoreapp.cs"
26-
Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))" />
2721
<Compile Include="System\Threading\Channels\ChannelOptions.cs" />
2822
<Compile Include="System\Threading\Channels\ChannelReader.cs" />
2923
<Compile Include="System\Threading\Channels\ChannelUtilities.cs" />
@@ -32,21 +26,25 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
3226
<Compile Include="System\Threading\Channels\Channel_2.cs" />
3327
<Compile Include="System\Threading\Channels\IDebugEnumerator.cs" />
3428
<Compile Include="System\Threading\Channels\SingleConsumerUnboundedChannel.cs" />
35-
<Compile Include="System\Threading\Channels\TaskCompletionSource.cs"
36-
Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'"/>
3729
<Compile Include="System\Threading\Channels\UnboundedChannel.cs" />
38-
<Compile Include="$(CommonPath)Internal\Padding.cs"
39-
Link="Common\Internal\Padding.cs" />
40-
<Compile Include="$(CommonPath)System\Collections\Concurrent\IProducerConsumerQueue.cs"
41-
Link="Common\System\Collections\Concurrent\IProducerConsumerQueue.cs" />
42-
<Compile Include="$(CommonPath)System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs"
43-
Link="Common\System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs" />
44-
<Compile Include="$(CommonPath)System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs"
45-
Link="Common\System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs" />
46-
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs"
47-
Link="Common\System\Collections\Generic\Deque.cs" />
48-
<Compile Include="$(CommonPath)System\Obsoletions.cs"
49-
Link="Common\System\Obsoletions.cs" />
30+
<Compile Include="$(CommonPath)Internal\Padding.cs" Link="Common\Internal\Padding.cs" />
31+
<Compile Include="$(CommonPath)System\Collections\Concurrent\IProducerConsumerQueue.cs" Link="Common\System\Collections\Concurrent\IProducerConsumerQueue.cs" />
32+
<Compile Include="$(CommonPath)System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs" Link="Common\System\Collections\Concurrent\MultiProducerMultiConsumerQueue.cs" />
33+
<Compile Include="$(CommonPath)System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs" Link="Common\System\Collections\Concurrent\SingleProducerSingleConsumerQueue.cs" />
34+
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs" Link="Common\System\Collections\Generic\Deque.cs" />
35+
<Compile Include="$(CommonPath)System\Obsoletions.cs" Link="Common\System\Obsoletions.cs" />
36+
</ItemGroup>
37+
38+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
39+
<Compile Include="System\Threading\Channels\AsyncOperation.netstandard.cs" />
40+
<Compile Include="System\Threading\Channels\TaskCompletionSource.cs" />
41+
</ItemGroup>
42+
43+
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
44+
<Compile Include="System\Threading\Channels\AsyncOperation.netcoreapp.cs" />
45+
<Compile Include="System\Threading\Channels\Channel.netcoreapp.cs" />
46+
<Compile Include="System\Threading\Channels\ChannelOptions.netcoreapp.cs" />
47+
<Compile Include="System\Threading\Channels\UnboundedPriorityChannel.cs" />
5048
</ItemGroup>
5149

5250
<ItemGroup Condition="'$(TargetFramework)' == '$(NetCoreAppCurrent)'">
@@ -57,6 +55,10 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
5755
<Reference Include="System.Threading.ThreadPool" />
5856
</ItemGroup>
5957

58+
<ItemGroup Condition="$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))">
59+
<Compile Include="System\Threading\Channels\ChannelClosedException.netcoreapp.cs" />
60+
</ItemGroup>
61+
6062
<ItemGroup Condition="!$([MSBuild]::IsTargetFrameworkCompatible('$(TargetFramework)', 'netstandard2.1'))">
6163
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />
6264
<ProjectReference Include="$(LibrariesProjectRoot)Microsoft.Bcl.AsyncInterfaces\src\Microsoft.Bcl.AsyncInterfaces.csproj" />

src/libraries/System.Threading.Channels/src/System/Threading/Channels/Channel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace System.Threading.Channels
55
{
66
/// <summary>Provides static methods for creating channels.</summary>
7-
public static class Channel
7+
public static partial class Channel
88
{
99
/// <summary>Creates an unbounded channel usable by any number of readers and writers concurrently.</summary>
1010
/// <returns>The created channel.</returns>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Generic;
5+
6+
namespace System.Threading.Channels
7+
{
8+
/// <summary>Provides static methods for creating channels.</summary>
9+
public static partial class Channel
10+
{
11+
/// <summary>Creates an unbounded prioritized channel usable by any number of readers and writers concurrently.</summary>
12+
/// <returns>The created channel.</returns>
13+
/// <remarks>
14+
/// <see cref="Comparer{T}.Default"/> is used to determine priority of elements.
15+
/// The next item read from the channel will be the element available in the channel with the lowest priority value.
16+
/// </remarks>
17+
public static Channel<T> CreateUnboundedPrioritized<T>() =>
18+
new UnboundedPrioritizedChannel<T>(runContinuationsAsynchronously: true, comparer: null);
19+
20+
/// <summary>Creates an unbounded prioritized channel subject to the provided options.</summary>
21+
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
22+
/// <param name="options">Options that guide the behavior of the channel.</param>
23+
/// <returns>The created channel.</returns>
24+
/// <remarks>
25+
/// The supplied <paramref name="options"/>' <see cref="UnboundedPrioritizedChannelOptions{T}.Comparer"/> is used to determine priority of elements,
26+
/// or <see cref="Comparer{T}.Default"/> if the provided comparer is null.
27+
/// The next item read from the channel will be the element available in the channel with the lowest priority value.
28+
/// </remarks>
29+
public static Channel<T> CreateUnboundedPrioritized<T>(UnboundedPrioritizedChannelOptions<T> options)
30+
{
31+
ArgumentNullException.ThrowIfNull(options);
32+
33+
return new UnboundedPrioritizedChannel<T>(!options.AllowSynchronousContinuations, options.Comparer);
34+
}
35+
}
36+
}

src/libraries/System.Threading.Channels/src/System/Threading/Channels/ChannelOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public abstract class ChannelOptions
3939
public bool AllowSynchronousContinuations { get; set; }
4040
}
4141

42-
/// <summary>Provides options that control the behavior of <see cref="BoundedChannel{T}"/> instances.</summary>
42+
/// <summary>Provides options that control the behavior of instances created by <see cref="M:Channel.CreateBounded"/>.</summary>
4343
public sealed class BoundedChannelOptions : ChannelOptions
4444
{
4545
/// <summary>The maximum number of items the bounded channel may store.</summary>
@@ -94,7 +94,7 @@ public BoundedChannelFullMode FullMode
9494
}
9595
}
9696

97-
/// <summary>Provides options that control the behavior of <see cref="UnboundedChannel{T}"/> instances.</summary>
97+
/// <summary>Provides options that control the behavior of instances created by <see cref="M:Channel.CreateUnbounded"/>.</summary>
9898
public sealed class UnboundedChannelOptions : ChannelOptions
9999
{
100100
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Generic;
5+
6+
namespace System.Threading.Channels
7+
{
8+
/// <summary>Provides options that control the behavior of instances created by <see cref="M:Channel.CreateUnboundedPrioritized"/>.</summary>
9+
public sealed class UnboundedPrioritizedChannelOptions<T> : ChannelOptions
10+
{
11+
/// <summary>Gets or sets the comparer used by the channel to prioritize elements.</summary>
12+
public IComparer<T>? Comparer { get; set; }
13+
}
14+
}

0 commit comments

Comments
 (0)