Skip to content

Commit 3ee03e9

Browse files
author
Staffan Gustafsson
committed
Manginging runspaces w/o using RunspacePool.
Improving progress handling.
1 parent 946d53e commit 3ee03e9

File tree

5 files changed

+231
-137
lines changed

5 files changed

+231
-137
lines changed

src/PSParallel/InvokeParallelCommand.cs

+52-18
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ private static IEnumerable<PSVariable> GetVariables(SessionState sessionState)
7777
try
7878
{
7979
string[] noTouchVariables = { "null", "true", "false", "Error" };
80-
var variables = sessionState.InvokeProvider.Item.Get("Variable:");
80+
var variables = sessionState.InvokeProvider.Item.Get("Variable:");
8181
var psVariables = (IEnumerable<PSVariable>)variables[0].BaseObject;
82+
8283
return psVariables.Where(p => !noTouchVariables.Contains(p.Name));
8384
}
8485
catch (DriveNotFoundException)
@@ -102,11 +103,22 @@ private static void CaptureVariables(SessionState sessionState, InitialSessionSt
102103
foreach (var variable in variables)
103104
{
104105
var existing = initialSessionState.Variables[variable.Name].FirstOrDefault();
105-
if (existing != null && (existing.Options & (ScopedItemOptions.Constant | ScopedItemOptions.ReadOnly)) != ScopedItemOptions.None)
106+
if (existing != null)
106107
{
107-
continue;
108+
if ((existing.Options & (ScopedItemOptions.Constant | ScopedItemOptions.ReadOnly)) != ScopedItemOptions.None)
109+
{
110+
continue;
111+
}
112+
else
113+
{
114+
initialSessionState.Variables.Remove(existing.Name, existing.GetType());
115+
initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes));
116+
}
117+
}
118+
else
119+
{
120+
initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes));
108121
}
109-
initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes));
110122
}
111123
}
112124

@@ -134,7 +146,7 @@ InitialSessionState GetSessionState()
134146
}
135147
return InitialSessionState;
136148
}
137-
return GetSessionState(SessionState);
149+
return GetSessionState(base.SessionState);
138150
}
139151

140152

@@ -143,8 +155,7 @@ protected override void BeginProcessing()
143155
{
144156
ValidateParameters();
145157
var iss = GetSessionState();
146-
PowershellPool = new PowershellPool(ThrottleLimit, iss, _cancelationTokenSource.Token);
147-
PowershellPool.Open();
158+
PowershellPool = new PowershellPool(ThrottleLimit, iss, _cancelationTokenSource.Token);
148159
_worker = NoProgress ? (WorkerBase) new NoProgressWorker(this) : new ProgressWorker(this);
149160
}
150161

@@ -268,6 +279,7 @@ class ProgressWorker : WorkerBase
268279
{
269280
readonly ProgressManager _progressManager;
270281
private readonly List<PSObject> _input;
282+
private int _lastEstimate = -1;
271283
public ProgressWorker(InvokeParallelCommand cmdlet) : base(cmdlet)
272284
{
273285
_progressManager = new ProgressManager(cmdlet.ProgressId, cmdlet.ProgressActivity, $"Processing with {cmdlet.ThrottleLimit} workers", cmdlet.ParentProgressId);
@@ -282,25 +294,30 @@ public override void ProcessRecord(PSObject inputObject)
282294
public override void EndProcessing()
283295
{
284296
try
285-
{
297+
{
286298
_progressManager.TotalCount = _input.Count;
299+
var lastPercentComplete = -1;
287300
foreach (var i in _input)
288301
{
289302
var processed = Pool.GetEstimatedProgressCount();
290-
_progressManager.UpdateCurrentProgressRecord($"Starting processing of {i}", processed);
291-
WriteProgress(_progressManager.ProgressRecord);
303+
_lastEstimate = processed;
304+
_progressManager.SetCurrentOperation($"Starting processing of {i}");
305+
_progressManager.UpdateCurrentProgressRecord(processed);
306+
var pr = _progressManager.ProgressRecord;
307+
if (lastPercentComplete != pr.PercentComplete)
308+
{
309+
WriteProgress(pr);
310+
}
311+
292312
while (!Pool.TryAddInput(ScriptBlock, i))
293313
{
294314
WriteOutputs();
295315
}
296316
}
297-
317+
_progressManager.SetCurrentOperation("All work queued. Waiting for remaining work to complete.");
298318
while (!Pool.WaitForAllPowershellCompleted(100))
299319
{
300-
301-
_progressManager.UpdateCurrentProgressRecord("All work queued. Waiting for remaining work to complete.", Pool.GetEstimatedProgressCount());
302-
WriteProgress(_progressManager.ProgressRecord);
303-
320+
WriteProgressIfUpdated();
304321
if (Stopping)
305322
{
306323
return;
@@ -311,6 +328,7 @@ public override void EndProcessing()
311328
}
312329
finally
313330
{
331+
_progressManager.UpdateCurrentProgressRecord(Pool.GetEstimatedProgressCount());
314332
WriteProgress(_progressManager.Completed());
315333
}
316334
}
@@ -319,11 +337,27 @@ public override void WriteProgress(Collection<ProgressRecord> progress)
319337
{
320338
foreach (var p in progress)
321339
{
322-
p.ParentActivityId = _progressManager.ActivityId;
340+
if (p.ActivityId != _progressManager.ActivityId)
341+
{
342+
p.ParentActivityId = _progressManager.ActivityId;
343+
}
323344
WriteProgress(p);
324345
}
325-
_progressManager.UpdateCurrentProgressRecord(Pool.GetEstimatedProgressCount());
326-
WriteProgress(_progressManager.ProgressRecord);
346+
if (progress.Count > 0)
347+
{
348+
WriteProgressIfUpdated();
349+
}
350+
}
351+
352+
private void WriteProgressIfUpdated()
353+
{
354+
var estimatedCompletedCount = Pool.GetEstimatedProgressCount();
355+
if (_lastEstimate != estimatedCompletedCount)
356+
{
357+
_lastEstimate = estimatedCompletedCount;
358+
_progressManager.UpdateCurrentProgressRecord(estimatedCompletedCount);
359+
WriteProgress(_progressManager.ProgressRecord);
360+
}
327361
}
328362
}
329363
}
+59-48
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,37 @@
11
using System;
22
using System.Management.Automation;
3+
using System.Management.Automation.Runspaces;
34

45
namespace PSParallel
56
{
67
class PowerShellPoolMember : IDisposable
78
{
8-
private readonly PowershellPool m_pool;
9-
private readonly int m_index;
10-
private readonly PowerShellPoolStreams m_poolStreams;
11-
private PowerShell m_powerShell;
12-
public PowerShell PowerShell => m_powerShell;
13-
public int Index => m_index ;
14-
15-
private readonly PSDataCollection<PSObject> m_input =new PSDataCollection<PSObject>();
16-
private PSDataCollection<PSObject> m_output;
17-
private int m_percentComplete;
9+
private readonly PowershellPool _pool;
10+
private readonly int _index;
11+
private readonly PowerShellPoolStreams _poolStreams;
12+
private readonly Runspace _runspace;
13+
private PowerShell _powerShell;
14+
public PowerShell PowerShell => _powerShell;
15+
public int Index => _index ;
16+
17+
private readonly PSDataCollection<PSObject> _input =new PSDataCollection<PSObject>();
18+
private PSDataCollection<PSObject> _output;
19+
private int _percentComplete;
1820
public int PercentComplete
1921
{
20-
get { return m_percentComplete; }
21-
set { m_percentComplete = value; }
22+
get { return _percentComplete; }
23+
set { _percentComplete = value; }
2224
}
2325

2426

25-
public PowerShellPoolMember(PowershellPool pool, int index)
27+
public PowerShellPoolMember(PowershellPool pool, int index, Runspace runspace)
2628
{
27-
m_pool = pool;
28-
m_index = index;
29-
m_poolStreams = m_pool.Streams;
30-
m_input.Complete();
29+
_pool = pool;
30+
_index = index;
31+
_runspace = runspace;
32+
_runspace.Open();
33+
_poolStreams = _pool.Streams;
34+
_input.Complete();
3135
CreatePowerShell();
3236
}
3337

@@ -37,34 +41,35 @@ private void PowerShellOnInvocationStateChanged(object sender, PSInvocationState
3741
{
3842
case PSInvocationState.Stopped:
3943
ReleasePowerShell();
40-
m_pool.ReportStopped(this);
44+
_pool.ReportStopped(this);
4145
break;
4246
case PSInvocationState.Completed:
4347
case PSInvocationState.Failed:
4448
ReleasePowerShell();
4549
CreatePowerShell();
46-
m_pool.ReportAvailable(this);
50+
_pool.ReportAvailable(this);
4751
break;
4852
}
4953
}
5054

5155
private void CreatePowerShell()
5256
{
5357
var powerShell = PowerShell.Create();
58+
powerShell.Runspace = _runspace;
5459
HookStreamEvents(powerShell.Streams);
5560
powerShell.InvocationStateChanged += PowerShellOnInvocationStateChanged;
56-
m_powerShell = powerShell;
57-
m_output = new PSDataCollection<PSObject>();
58-
m_output.DataAdded += OutputOnDataAdded;
61+
_powerShell = powerShell;
62+
_output = new PSDataCollection<PSObject>();
63+
_output.DataAdded += OutputOnDataAdded;
5964
}
6065

6166
private void ReleasePowerShell()
6267
{
63-
UnhookStreamEvents(m_powerShell.Streams);
64-
m_powerShell.InvocationStateChanged -= PowerShellOnInvocationStateChanged;
65-
m_output.DataAdded -= OutputOnDataAdded;
66-
m_powerShell.Dispose();
67-
m_powerShell = null;
68+
UnhookStreamEvents(_powerShell.Streams);
69+
_powerShell.InvocationStateChanged -= PowerShellOnInvocationStateChanged;
70+
_output.DataAdded -= OutputOnDataAdded;
71+
_powerShell.Dispose();
72+
_powerShell = null;
6873
}
6974

7075

@@ -92,90 +97,96 @@ private void UnhookStreamEvents(PSDataStreams streams)
9297

9398
public void BeginInvoke(ScriptBlock scriptblock, PSObject inputObject)
9499
{
95-
m_percentComplete = 0;
100+
_percentComplete = 0;
96101
string command = $"param($_,$PSItem, $PSPArallelIndex,$PSParallelProgressId){scriptblock}";
97-
m_powerShell.AddScript(command)
102+
_powerShell.AddScript(command)
98103
.AddParameter("_", inputObject)
99104
.AddParameter("PSItem", inputObject)
100-
.AddParameter("PSParallelIndex", m_index)
101-
.AddParameter("PSParallelProgressId", m_index+1000);
102-
m_powerShell.BeginInvoke(m_input, m_output);
105+
.AddParameter("PSParallelIndex", _index)
106+
.AddParameter("PSParallelProgressId", _index+1000);
107+
_powerShell.BeginInvoke(_input, _output);
108+
}
109+
110+
internal void Reset()
111+
{
112+
_runspace.ResetRunspaceState();
103113
}
104114

105115
public void Dispose()
106116
{
107-
var ps = m_powerShell;
117+
var ps = _powerShell;
108118
if (ps != null)
109119
{
110120
UnhookStreamEvents(ps.Streams);
111121
ps.Dispose();
112122
}
113-
m_output.Dispose();
114-
m_input.Dispose();
123+
_output.Dispose();
124+
_input.Dispose();
125+
_runspace.Dispose();
115126
}
116127

117128
private void OutputOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
118129
{
119130
var item = ((PSDataCollection<PSObject>)sender)[dataAddedEventArgs.Index];
120-
m_poolStreams.Output.Add(item);
131+
_poolStreams.Output.Add(item);
121132
}
122133

123134

124135
private void InformationOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
125136
{
126137
var ir = ((PSDataCollection<InformationRecord>)sender)[dataAddedEventArgs.Index];
127-
m_poolStreams.Information.Add(ir);
138+
_poolStreams.Information.Add(ir);
128139
}
129140

130141
private void ProgressOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
131142
{
132143
var record = ((PSDataCollection<ProgressRecord>)sender)[dataAddedEventArgs.Index];
133-
m_percentComplete = record.PercentComplete;
134-
m_poolStreams.AddProgress(record, m_index);
144+
_percentComplete = record.PercentComplete;
145+
_poolStreams.AddProgress(record, _index);
135146
}
136147

137148
private void ErrorOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
138149
{
139150
var record = ((PSDataCollection<ErrorRecord>)sender)[dataAddedEventArgs.Index];
140-
m_poolStreams.Error.Add(record);
151+
_poolStreams.Error.Add(record);
141152
}
142153

143154
private void DebugOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
144155
{
145156
var record = ((PSDataCollection<DebugRecord>)sender)[dataAddedEventArgs.Index];
146-
m_poolStreams.Debug.Add(record);
157+
_poolStreams.Debug.Add(record);
147158
}
148159

149160
private void WarningOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
150161
{
151162
var record = ((PSDataCollection<WarningRecord>)sender)[dataAddedEventArgs.Index];
152-
m_poolStreams.Warning.Add(record);
163+
_poolStreams.Warning.Add(record);
153164
}
154165

155166
private void VerboseOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
156167
{
157168
var record = ((PSDataCollection<VerboseRecord>)sender)[dataAddedEventArgs.Index];
158-
m_poolStreams.Verbose.Add(record);
169+
_poolStreams.Verbose.Add(record);
159170
}
160171

161172
public void Stop()
162173
{
163-
if(m_powerShell.InvocationStateInfo.State != PSInvocationState.Stopped)
174+
if(_powerShell.InvocationStateInfo.State != PSInvocationState.Stopped)
164175
{
165-
UnhookStreamEvents(m_powerShell.Streams);
166-
m_powerShell.BeginStop(OnStopped, null);
176+
UnhookStreamEvents(_powerShell.Streams);
177+
_powerShell.BeginStop(OnStopped, null);
167178
}
168179
}
169180

170181
private void OnStopped(IAsyncResult ar)
171182
{
172-
var ps = m_powerShell;
183+
var ps = _powerShell;
173184
if (ps == null)
174185
{
175186
return;
176187
}
177188
ps.EndStop(ar);
178-
m_powerShell = null;
189+
_powerShell = null;
179190
}
180191
}
181192
}

0 commit comments

Comments
 (0)