Skip to content

Commit dab8971

Browse files
author
Staffan Gustafsson
committed
TryAdd/TryTake for better robustness and responsiveness.
Hierarchical progress
1 parent 4d54c85 commit dab8971

File tree

6 files changed

+97
-33
lines changed

6 files changed

+97
-33
lines changed

module/PSParallel.psd1

364 Bytes
Binary file not shown.

src/PSParallel/InvokeParallelCommand.cs

+10-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Linq;
45
using System.Management.Automation;
56
using System.Management.Automation.Runspaces;
@@ -185,8 +186,10 @@ protected override void ProcessRecord()
185186
{
186187
if(NoProgress)
187188
{
188-
m_powershellPool.AddInput(ScriptBlock, InputObject);
189-
WriteOutputs();
189+
while (!m_powershellPool.TryAddInput(ScriptBlock, InputObject))
190+
{
191+
WriteOutputs();
192+
}
190193
}
191194
else
192195
{
@@ -205,8 +208,10 @@ protected override void EndProcessing()
205208
{
206209
var pr = m_progressManager.GetCurrentProgressRecord($"Starting processing of {i}", m_powershellPool.ProcessedCount);
207210
WriteProgress(pr);
208-
m_powershellPool.AddInput(ScriptBlock, i);
209-
WriteOutputs();
211+
while (!m_powershellPool.TryAddInput(ScriptBlock, i))
212+
{
213+
WriteOutputs();
214+
}
210215
}
211216
}
212217
while(!m_powershellPool.WaitForAllPowershellCompleted(100))
@@ -241,6 +246,7 @@ protected override void StopProcessing()
241246

242247
private void WriteOutputs()
243248
{
249+
Debug.WriteLine("Processing output");
244250
if (m_cancelationTokenSource.IsCancellationRequested)
245251
{
246252
return;

src/PSParallel/PowerShellPoolMember.cs

+16-5
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,25 @@ namespace PSParallel
66
class PowerShellPoolMember : IDisposable
77
{
88
private readonly PowershellPool m_pool;
9+
private readonly int m_index;
910
private readonly PowerShellPoolStreams m_poolStreams;
1011
private PowerShell m_powerShell;
1112
public PowerShell PowerShell => m_powerShell;
13+
public int Index => m_index ;
14+
1215
private readonly PSDataCollection<PSObject> m_input =new PSDataCollection<PSObject>();
1316
private PSDataCollection<PSObject> m_output;
17+
private ProgressProjector m_progressProjector;
18+
public ProgressProjector ProgressProjector => m_progressProjector;
1419

15-
public PowerShellPoolMember(PowershellPool pool)
20+
public PowerShellPoolMember(PowershellPool pool, int index)
1621
{
1722
m_pool = pool;
23+
m_index = index;
1824
m_poolStreams = m_pool.Streams;
19-
m_input.Complete();
25+
m_input.Complete();
2026
CreatePowerShell();
27+
m_progressProjector = new ProgressProjector();
2128
}
2229

2330
private void PowerShellOnInvocationStateChanged(object sender, PSInvocationStateChangedEventArgs psInvocationStateChangedEventArgs)
@@ -81,10 +88,13 @@ private void UnhookStreamEvents(PSDataStreams streams)
8188

8289
public void BeginInvoke(ScriptBlock scriptblock, PSObject inputObject)
8390
{
84-
string command = $"param($_,$PSItem){scriptblock}";
91+
m_progressProjector.Start();
92+
string command = $"param($_,$PSItem, $PSPArallelIndex,$PSParallelProgressId){scriptblock}";
8593
m_powerShell.AddScript(command)
8694
.AddParameter("_", inputObject)
87-
.AddParameter("PSItem", inputObject);
95+
.AddParameter("PSItem", inputObject)
96+
.AddParameter("PSParallelIndex", m_index)
97+
.AddParameter("PSParallelProgressId", m_index+1000);
8898
m_powerShell.BeginInvoke(m_input, m_output);
8999
}
90100

@@ -115,7 +125,8 @@ private void InformationOnDataAdded(object sender, DataAddedEventArgs dataAddedE
115125

116126
private void ProgressOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
117127
{
118-
var record = ((PSDataCollection<ProgressRecord>)sender)[dataAddedEventArgs.Index];
128+
var record = ((PSDataCollection<ProgressRecord>)sender)[dataAddedEventArgs.Index];
129+
m_progressProjector.ReportProgress(record.PercentComplete);
119130
m_poolStreams.Progress.Add(record);
120131
}
121132

src/PSParallel/PowershellPool.cs

+34-21
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Diagnostics.Contracts;
56
using System.Management.Automation;
67
using System.Management.Automation.Runspaces;
78
using System.Threading;
89

910
namespace PSParallel
1011
{
11-
class PowershellPool : IDisposable
12+
sealed class PowershellPool : IDisposable
1213
{
1314
private int m_busyCount;
1415
private int m_processedCount;
1516
private readonly CancellationToken m_cancellationToken;
1617
private readonly RunspacePool m_runspacePool;
1718
private readonly List<PowerShellPoolMember> m_poolMembers;
18-
private readonly BlockingCollection<PowerShellPoolMember> m_availablePoolMembers = new BlockingCollection<PowerShellPoolMember>(new ConcurrentStack<PowerShellPoolMember> ());
19+
private readonly BlockingCollection<PowerShellPoolMember> m_availablePoolMembers = new BlockingCollection<PowerShellPoolMember>(new ConcurrentQueue<PowerShellPoolMember>());
1920
public readonly PowerShellPoolStreams Streams = new PowerShellPoolStreams();
2021

2122
public int ProcessedCount => m_processedCount;
@@ -26,9 +27,9 @@ public PowershellPool(int poolSize, InitialSessionState initialSessionState, Can
2627
m_processedCount = 0;
2728
m_cancellationToken = cancellationToken;
2829

29-
for (int i = 0; i < poolSize; i++)
30+
for (var i = 0; i < poolSize; i++)
3031
{
31-
var powerShellPoolMember = new PowerShellPoolMember(this);
32+
var powerShellPoolMember = new PowerShellPoolMember(this, i+1);
3233
m_poolMembers.Add(powerShellPoolMember);
3334
m_availablePoolMembers.Add(powerShellPoolMember);
3435
}
@@ -37,16 +38,17 @@ public PowershellPool(int poolSize, InitialSessionState initialSessionState, Can
3738
m_runspacePool.SetMaxRunspaces(poolSize);
3839
}
3940

40-
public void AddInput(ScriptBlock scriptblock,PSObject inputObject)
41-
{
42-
try {
43-
var powerShell = WaitForAvailablePowershell();
44-
Interlocked.Increment(ref m_busyCount);
45-
powerShell.BeginInvoke(scriptblock, inputObject);
46-
}
47-
catch(OperationCanceledException)
41+
public bool TryAddInput(ScriptBlock scriptblock,PSObject inputObject)
42+
{
43+
PowerShellPoolMember poolMember;
44+
if(!TryWaitForAvailablePowershell(100, out poolMember))
4845
{
46+
return false;
4947
}
48+
49+
Interlocked.Increment(ref m_busyCount);
50+
poolMember.BeginInvoke(scriptblock, inputObject);
51+
return true;
5052
}
5153

5254
public void Open()
@@ -76,11 +78,19 @@ public bool WaitForAllPowershellCompleted(int timeoutMilliseconds)
7678
return false;
7779
}
7880

79-
private PowerShellPoolMember WaitForAvailablePowershell()
80-
{
81-
var poolmember = m_availablePoolMembers.Take(m_cancellationToken);
82-
poolmember.PowerShell.RunspacePool = m_runspacePool;
83-
return poolmember;
81+
private bool TryWaitForAvailablePowershell(int milliseconds, out PowerShellPoolMember poolMember)
82+
{
83+
if(!m_availablePoolMembers.TryTake(out poolMember, milliseconds, m_cancellationToken))
84+
{
85+
m_cancellationToken.ThrowIfCancellationRequested();
86+
Debug.WriteLine($"WaitForAvailablePowershell - TryTake failed");
87+
poolMember = null;
88+
return false;
89+
}
90+
91+
poolMember.PowerShell.RunspacePool = m_runspacePool;
92+
Debug.WriteLine($"WaitForAvailablePowershell - Busy: {m_busyCount} _processed {m_processedCount}, member = {poolMember.Index}");
93+
return true;
8494
}
8595

8696

@@ -95,10 +105,13 @@ public void ReportAvailable(PowerShellPoolMember poolmember)
95105
{
96106
Interlocked.Decrement(ref m_busyCount);
97107
Interlocked.Increment(ref m_processedCount);
98-
if(!m_cancellationToken.IsCancellationRequested)
99-
{
100-
m_availablePoolMembers.Add(poolmember);
101-
}
108+
while (!m_availablePoolMembers.TryAdd(poolmember, 1000, m_cancellationToken))
109+
{
110+
m_cancellationToken.ThrowIfCancellationRequested();
111+
Debug.WriteLine($"WaitForAvailablePowershell - TryAdd failed");
112+
}
113+
Debug.WriteLine($"ReportAvailable - Busy: {m_busyCount} _processed {m_processedCount}, member = {poolmember.Index}");
114+
102115
}
103116

104117
public void ReportStopped(PowerShellPoolMember powerShellPoolMember)

src/PSParallel/ProgressManager.cs

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Diagnostics;
1+
using System;
2+
using System.Diagnostics;
23
using System.Management.Automation;
34

45
namespace PSParallel
@@ -53,4 +54,37 @@ public ProgressRecord Completed()
5354
private int GetPercentComplete(int count) => count*100/TotalCount;
5455
public int ActivityId => m_progressRecord.ActivityId;
5556
}
57+
58+
59+
class ProgressProjector
60+
{
61+
private readonly Stopwatch m_stopWatch;
62+
private int m_percentComplete;
63+
public ProgressProjector()
64+
{
65+
m_stopWatch = new Stopwatch();
66+
m_percentComplete = -1;
67+
}
68+
69+
public void ReportProgress(int percentComplete)
70+
{
71+
m_percentComplete = percentComplete;
72+
}
73+
74+
public bool IsValid => m_percentComplete > 0 && m_stopWatch.IsRunning;
75+
public TimeSpan Elapsed => m_stopWatch.Elapsed;
76+
77+
public TimeSpan ProjectedTotalTime => new TimeSpan(Elapsed.Ticks * 100 / m_percentComplete);
78+
79+
public void Start()
80+
{
81+
m_stopWatch.Start();
82+
m_percentComplete = 0;
83+
}
84+
85+
public void Stop()
86+
{
87+
m_stopWatch.Stop();
88+
}
89+
}
5690
}

src/PSParallel/Properties/AssemblyInfo.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
[assembly: AssemblyConfiguration("")]
1010
[assembly: AssemblyCompany("")]
1111
[assembly: AssemblyProduct("PSParallell")]
12-
[assembly: AssemblyCopyright("Copyright © 2015")]
12+
[assembly: AssemblyCopyright("Copyright © 2016")]
1313
[assembly: AssemblyTrademark("")]
1414
[assembly: AssemblyCulture("")]
1515

@@ -32,4 +32,4 @@
3232
// by using the '*' as shown below:
3333
// [assembly: AssemblyVersion("1.0.*")]
3434
[assembly: AssemblyVersion("1.0.0.0")]
35-
[assembly: AssemblyFileVersion("2.1.3.0")]
35+
[assembly: AssemblyFileVersion("2.2.0.0")]

0 commit comments

Comments
 (0)