diff --git a/.gitignore b/.gitignore
index 57a1574..6c86302 100644
--- a/.gitignore
+++ b/.gitignore
@@ -194,3 +194,4 @@ FakesAssemblies/
# Visual Studio 6 workspace options file
*.opt
+*.db
diff --git a/PSParallel.sln b/PSParallel.sln
index 334a019..6c6636f 100644
--- a/PSParallel.sln
+++ b/PSParallel.sln
@@ -21,6 +21,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "module", "module", "{9C879DF2-D1E2-4143-A95A-2374F8650F48}"
ProjectSection(SolutionItems) = preProject
module\en-US\about_PSParallel.Help.txt = module\en-US\about_PSParallel.Help.txt
+ module\en-US\PSParallel.dll-Help.xml = module\en-US\PSParallel.dll-Help.xml
module\PSParallel.psd1 = module\PSParallel.psd1
EndProjectSection
EndProject
diff --git a/README.md b/README.md
index bd66059..afe6ec8 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,9 @@
# PSParallel
+
Invoke scriptblocks in parallel runspaces
-##Installation
+## Installation
+
```PowerShell
Install-Module PSParallel
```
@@ -11,14 +13,15 @@ Install-Module PSParallel
(1..255).Foreach{"192.168.0.$_"} | Invoke-Parallel { [PSCustomObject] @{IP=$_;Result=ping.exe -4 -a -w 20 $_}}
```
-Variables are captured from the parent session but functions are not.
+Variables and functions are captured from the parent session.
+
+## Throttling
-##Throttling
To control the degree of parallelism, i.e. the number of concurrent runspaces, use the -ThrottleLimit parameter
```PowerShell
# process lots of crash dumps
-Get-ChildItem -recurce *.dmp | Invoke-Parallel -ThrottleLimit 64 -ProgressActivity "Processing dumps" {
+Get-ChildItem -Recurse *.dmp | Invoke-Parallel -ThrottleLimit 64 -ProgressActivity "Processing dumps" {
[PSCustomObject] @{ Dump=$_; Analysis = cdb.exe -z $_.fullname -c '"!analyze -v;q"'
}
```
@@ -27,9 +30,11 @@ The overhead of spinning up new PowerShell classes is non-zero. Invoke-Parallel

-##Contributions
+## Contributions
+
Pull requests and/or suggestions are more than welcome.
-###Acknowlegementes
+### Acknowledgements
+
The idea and the basis for the implementation comes from [RamblingCookieMonster](https://github.com/RamblingCookieMonster).
-Cudos for that implementation also goes to Boe Prox(@proxb) and Sergei Vorobev(@xvorsx).
\ No newline at end of file
+Kudos for that implementation also goes to Boe Prox(@proxb) and Sergei Vorobev(@xvorsx).
diff --git a/module/PSParallel.psd1 b/module/PSParallel.psd1
index 626174c..85a0627 100644
Binary files a/module/PSParallel.psd1 and b/module/PSParallel.psd1 differ
diff --git a/module/en-US/PSParallel.dll-Help.xml b/module/en-US/PSParallel.dll-Help.xml
index 2c825fa..931a439 100644
--- a/module/en-US/PSParallel.dll-Help.xml
+++ b/module/en-US/PSParallel.dll-Help.xml
@@ -1,14 +1,14 @@
-
-
+
+
-
+
-
+
Invoke-Parallel
@@ -22,21 +22,21 @@
- Th
+ The cmdlet uses a RunspacePool an and invokes the provied scriptblock once for each input.
+
+To control the environment of the scriptblock, the ImportModule, ImportVariable and ImportFunction parameters can be use
+d.
-
+
Invoke-Parallel
-
+
ScriptBlock
-
-
+ Specifies the operation that is performed on each input object. Enter a script block that describes the operation.
ScriptBlock
-
-
ParentProgressId
@@ -44,99 +44,108 @@
Identifies the parent activity of the current activity. Use the value -1 if the current activity has no parent activity.
Int32
-
-
ProgressId
- Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of being displayed in a series.
+ Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than
+ one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of
+being displayed in a series.
Int32
-
-
ProgressActivity
- Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose progress is being reported.
+ Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose pr
+ogress is being reported.
String
-
-
-
+
ThrottleLimit
- Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used.
+ Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this
+parameter or enter a value of 0, the default value, 16, is used.
Int32
-
-
+
+ InitialSessionState
+
+ The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc
+available to the ScriptBlock.
+By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then
+imported.
+
+ InitialSessionState
+
+
InputObject
- Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects.
+ Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th
+at contains the objects, or type a command or expression that gets the objects.
PSObject
-
-
Invoke-Parallel
-
+
ScriptBlock
-
-
+ Specifies the operation that is performed on each input object. Enter a script block that describes the operation.
- ScriptBlock
-
-
+ ScriptBlock
-
+
ThrottleLimit
- Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used.
+ Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this
+parameter or enter a value of 0, the default value, 16, is used.
- Int32
-
+ Int32
-
+
InputObject
- Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects.
+ Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th
+at contains the objects, or type a command or expression that gets the objects.
- PSObject
-
-
+ PSObject
-
+
NoProgress
- Will now show progress from Invoke-Progress. Progress from the scriptblock will still be displayed.
+ Will not show progress from Invoke-Progress. Progress from the scriptblock will still be displayed.
- SwitchParameter
-
+ SwitchParameter
+
+
+ InitialSessionState
+
+ The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc
+available to the ScriptBlock.
+By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then
+imported.
+
+ InitialSessionState
-
-
+
+
ScriptBlock
-
-
+ Specifies the operation that is performed on each input object. Enter a script block that describes the operation.
ScriptBlock
ScriptBlock
-
-
+
ParentProgressId
@@ -148,39 +157,40 @@
Int32
-
-
+
ProgressId
- Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of being displayed in a series.
+ Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than
+ one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of
+being displayed in a series.
Int32
Int32
-
-
+
ProgressActivity
- Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose progress is being reported.
+ Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose pr
+ogress is being reported.
String
String
-
-
+
-
+
ThrottleLimit
- Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used.
+ Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this
+parameter or enter a value of 0, the default value, 16, is used.
Int32
@@ -189,23 +199,23 @@
-
+
InputObject
- Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects.
+ Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th
+at contains the objects, or type a command or expression that gets the objects.
PSObject
PSObject
-
-
+
NoProgress
- Will now show progress from Invoke-Progress. Progress from the scriptblock will still be displayed.
+ Will not show progress from Invoke-Progress. Progress from the scriptblock will still be displayed.
SwitchParameter
@@ -214,46 +224,66 @@
-
-
-
-
+
+ InitialSessionState
+
+ The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc
+available to the ScriptBlock.
+By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then
+imported.
+
+ InitialSessionState
- System.Management.Automation.PSObject
-
+ InitialSessionState
+
+
+
+ Parameter8
-
-
-
-
-
+
+
+
+
+
+
+
- System.Object
+ System.Management.Automation.PSObject
+
+
+
+
+
+
+
+
+
-
-
+
+
-
+
-------------------------- EXAMPLE 1 --------------------------
PS C:\>
- 1..256 | Invoke-Parallel {$ip = 192.168.0.$_; $res = ping.exe -v4 -w20 $ip; [PSCustomObject] @{IP=$ip;Res=$res}}
+ (1..255).ForEach{"192.168.0.$_"} |
+ Invoke-Parallel {$ip = [IPAddress]$_; $res = ping.exe -n 1 -4 -w 20 -a $_; [PSCustomObject] @{IP=$ip;Res=$res}} -ThrottleLimit 64
-
-
+ This example pings all iP v4 addresses on a subnet, specifying Throttlelimit to 64, i.e. running up to 64 runspaces
+in parallel.
-
+
\ No newline at end of file
diff --git a/scripts/Install.ps1 b/scripts/Install.ps1
index ff1c97f..ac7fa11 100644
--- a/scripts/Install.ps1
+++ b/scripts/Install.ps1
@@ -1,10 +1,18 @@
-param([string]$InstallDirectory)
+$manPath = Get-ChildItem -recurse $PSScriptRoot/../module -include *.psd1 | Select-Object -first 1
+$man = Test-ModuleManifest $manPath
-$rootDir = Split-Path (Split-Path $MyInvocation.MyCommand.Path)
+$name = $man.Name
+[string]$version = $man.Version
+$moduleSourceDir = "$PSScriptRoot/$name"
+$moduleDir = "~/documents/WindowsPowerShell/Modules/$name/$version/"
+
+[string]$rootDir = Resolve-Path $PSSCriptRoot/..
+
+$InstallDirectory = $moduleDir
if ('' -eq $InstallDirectory)
{
- $personalModules = Join-Path -Path ([Environment]::GetFolderPath('MyDocuments')) -ChildPath WindowsPowerShell\Modules
+ $personalModules = Join-Path -Path ([Environment]::GetFolderPath('MyDocuments')) -ChildPath WindowsPowerShell\Modules\
if (($env:PSModulePath -split ';') -notcontains $personalModules)
{
Write-Warning "$personalModules is not in `$env:PSModulePath"
@@ -18,29 +26,36 @@ if ('' -eq $InstallDirectory)
$InstallDirectory = Join-Path -Path $personalModules -ChildPath PSParallel
}
-if (!(Test-Path $InstallDirectory))
+if(-not (Test-Path $InstallDirectory))
{
- $null = mkdir $InstallDirectory
+ $null = mkdir $InstallDirectory
}
-
-$moduleFileList = @(
- 'PSParallel.psd1'
- 'en-US\PSParallel.dll-Help.xml'
- 'en-US\about_PSParallel.Help.txt'
-
-)
-$binaryFileList = 'src\PsParallel\bin\Release\PSParallel.dll'
-
-
-
-$binaryFileList | foreach { Copy-Item "$rootDir\$_" -Destination $InstallDirectory }
-$moduleFileList | foreach {Copy-Item "$rootdir\module\$_" -Destination $InstallDirectory\$_ }
+@(
+ 'module\PSParallel.psd1'
+ 'src\PsParallel\bin\Release\PSParallel.dll'
+).Foreach{Copy-Item "$rootdir\$_" -Destination $InstallDirectory }
+
+$lang = @('en-us')
+
+$lang.Foreach{
+ $lang = $_
+ $langDir = "$InstallDirectory\$lang"
+ if(-not (Test-Path $langDir))
+ {
+ $null = MkDir $langDir
+ }
+
+ @(
+ 'PSParallel.dll-Help.xml'
+ 'about_PSParallel.Help.txt'
+ ).Foreach{Copy-Item "$rootDir\module\$lang\$_" -Destination $langDir}
+}
Get-ChildItem -Recurse -Path $InstallDirectory
-$cert = Get-Item Cert:\CurrentUser\My\98D6087848D1213F20149ADFE698473429A9B15D
-Get-ChildItem -File $InstallDirectory | Set-AuthenticodeSignature -Certificate $cert
-
-
-
\ No newline at end of file
+$cert =Get-ChildItem cert:\CurrentUser\My -CodeSigningCert
+if($cert)
+{
+ Get-ChildItem -File $InstallDirectory -Include *.dll,*.psd1 -Recurse | Set-AuthenticodeSignature -Certificate $cert -TimestampServer http://timestamp.verisign.com/scripts/timstamp.dll
+}
diff --git a/scripts/Publish-ToGallery.ps1 b/scripts/Publish-ToGallery.ps1
index 33a1b26..8aff037 100644
--- a/scripts/Publish-ToGallery.ps1
+++ b/scripts/Publish-ToGallery.ps1
@@ -1,11 +1,13 @@
+$manPath = Get-ChildItem -recurse $PSScriptRoot/../module -include *.psd1 | Select-Object -first 1
+$man = Test-ModuleManifest $manPath
+
+$name = $man.Name
+[string]$version = $man.Version
+
$p = @{
- Name = "PSParallel"
+ Name = $name
NuGetApiKey = $NuGetApiKey
- LicenseUri = "https://github.com/powercode/PSParallel/blob/master/LICENSE"
- IconUri = "https://github.com/powercode/PSParallel/blob/master/images/PSParallel_icon.png"
- Tag = "Parallel","Runspace","Invoke","Foreach"
- ReleaseNote = "Adding authenticode signature."
- ProjectUri = "https://github.com/powercode/PSParallel"
+ RequiredVersion = $version
}
Publish-Module @p
diff --git a/scripts/dbg.ps1 b/scripts/dbg.ps1
new file mode 100644
index 0000000..ed4c670
--- /dev/null
+++ b/scripts/dbg.ps1
@@ -0,0 +1,58 @@
+param(
+ [int] $ThrottleLimit = 3,
+ [int] $Milliseconds = 500
+)
+
+Import-Module PSParallel -RequiredVersion 2.2.1
+
+function new-philosopher {
+ param($name, [string[]] $treats)
+ [PSCustomObject] @{
+ Name = $name
+ Treats = $treats
+ }
+}
+
+
+$philosopherData = @(
+ new-philosopher 'Immanuel Kant' 'was a real pissant','who where very rarely stable'
+ new-philosopher 'Heidegger' 'was a boozy beggar', 'Who could think you under the table'
+ new-philosopher 'David Hume' 'could out-consume Schopenhauer and Hegel'
+ new-philosopher 'Wittgenstein' 'was a beery swine', 'Who was just as sloshed as Schlegel'
+ new-philosopher 'John Stuart Mill' 'of his own free will', 'On half a pint of shandy was particularly ill'
+ new-philosopher 'Nietzsche' 'There''s nothing Nietzsche couldn''t teach ya', 'Bout the raising of the wrist.'
+ new-philosopher 'Plato' 'they say, could stick it away', 'Half a crate of whiskey every day'
+ new-philosopher 'Aristotle' 'was a bugger for the bottle'
+ new-philosopher 'Hobbes' 'was fond of his dram'
+ new-philosopher 'Rene Descartes' 'was a drunken fart:', 'I drink, therefore I am'
+ new-philosopher 'Socrates' 'is particularly missed','A lovely little thinker but a bugger when he''s pissed!'
+ )
+
+
+1..100 | invoke-parallel -Throttle $ThrottleLimit {
+
+
+
+ $pd = $philosopherData[($_ -1)% $philosopherData.Count]
+
+ 1..100 | ForEach-Object {
+ $op = switch($_ % 8)
+ {
+ 0 { 'sleeping' }
+ 1 { 'drinking' }
+ 2 { 'drinking' }
+ 3 { 'thinking' }
+ 4 { 'drinking' }
+ 5 { 'drinking' }
+ 6 { 'eating' }
+ 7 { 'drinking' }
+ }
+
+ $status = $pd.Treats[$_ % $pd.Treats.Length]
+
+ $name = $pd.Name
+ $currentOperation = "$name is currently $op"
+ Write-Progress -id $PSParallelProgressId -percent $_ -activity $pd.Name -Status $status -CurrentOperation $currentOperation
+ Start-Sleep -milliseconds ($Milliseconds + 100 * (Get-Random -Minimum 3 -Maximum 7))
+ }
+}
\ No newline at end of file
diff --git a/src/PSParallel/InvokeParallelCommand.cs b/src/PSParallel/InvokeParallelCommand.cs
index f495fcf..42ab9b2 100644
--- a/src/PSParallel/InvokeParallelCommand.cs
+++ b/src/PSParallel/InvokeParallelCommand.cs
@@ -1,14 +1,15 @@
using System;
-using System.CodeDom;
using System.Collections.Generic;
using System.Collections.ObjectModel;
+using System.Diagnostics;
using System.Linq;
using System.Management.Automation;
-using System.Management.Automation.Language;
using System.Management.Automation.Runspaces;
using System.Threading;
-using Microsoft.PowerShell.Commands;
+// ReSharper disable UnusedAutoPropertyAccessor.Global
+// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global
+// ReSharper disable MemberCanBePrivate.Global
namespace PSParallel
{
[Alias("ipa")]
@@ -18,190 +19,172 @@ public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
[Parameter(Mandatory = true, Position = 0)]
public ScriptBlock ScriptBlock { get; set; }
- [Alias("ppi")]
[Parameter(ParameterSetName = "Progress")]
+ [Alias("ppi")]
public int ParentProgressId { get; set; } = -1;
- [Alias("pi")]
[Parameter(ParameterSetName = "Progress")]
+ [Alias("pi")]
public int ProgressId { get; set; } = 1000;
- [Alias("pa")]
[Parameter(ParameterSetName = "Progress")]
+ [Alias("pa")]
[ValidateNotNullOrEmpty]
public string ProgressActivity { get; set; } = "Invoke-Parallel";
[Parameter]
- [ValidateRange(1,128)]
+ [ValidateRange(1, 128)]
public int ThrottleLimit { get; set; } = 32;
+ [Parameter]
+ [AllowNull]
+ [Alias("iss")]
+ public InitialSessionState InitialSessionState { get; set; }
+
[Parameter(ValueFromPipeline = true, Mandatory = true)]
public PSObject InputObject { get; set; }
[Parameter(ParameterSetName = "NoProgress")]
public SwitchParameter NoProgress { get; set; }
- private readonly CancellationTokenSource m_cancelationTokenSource = new CancellationTokenSource();
- private PowershellPool m_powershellPool;
- private InitialSessionState m_initialSessionState;
- private ProgressManager m_progressManager;
-
- // this is only used when NoProgress is not specified
- // Input is then captured in ProcessRecored and processed in EndProcessing
- private List m_input;
+ private readonly CancellationTokenSource _cancelationTokenSource = new CancellationTokenSource();
+ internal PowershellPool PowershellPool;
- private static InitialSessionState GetSessionState(ScriptBlock scriptBlock, SessionState sessionState)
+ private static InitialSessionState GetSessionState(SessionState sessionState)
{
var initialSessionState = InitialSessionState.CreateDefault2();
-
- CaptureVariables(scriptBlock, sessionState, initialSessionState);
- // this will get invoked recursively
-
- var functions = GetFunctions(sessionState);
-
- CaptureFunctions(scriptBlock, initialSessionState, functions, new HashSet());
+ CaptureVariables(sessionState, initialSessionState);
+ CaptureFunctions(sessionState, initialSessionState);
return initialSessionState;
}
- private static IDictionary GetFunctions(SessionState sessionState)
+ private static IEnumerable GetFunctions(SessionState sessionState)
{
- var baseObject = (Dictionary.ValueCollection) sessionState.InvokeProvider.Item.Get("function:")[0].BaseObject;
- return baseObject.ToDictionary(f=>f.Name);
+ try
+ {
+ var functionDrive = sessionState.InvokeProvider.Item.Get("function:");
+ return (Dictionary.ValueCollection)functionDrive[0].BaseObject;
+
+ }
+ catch (DriveNotFoundException)
+ {
+ return new FunctionInfo[] { };
+ }
}
- private static void CaptureFunctions(ScriptBlock scriptBlock, InitialSessionState initialSessionState,
- IDictionary functions, ISet processedFunctions)
+ private static IEnumerable GetVariables(SessionState sessionState)
{
- var commands = scriptBlock.Ast.FindAll((ast) => ast is CommandAst, true);
-
- var nonProcessedCommandNames = commands.Cast()
- .Select(commandAst => commandAst.CommandElements[0].Extent.Text)
- .Where(commandName => !processedFunctions.Contains(commandName));
- foreach (var commandName in nonProcessedCommandNames)
+ try
{
-
- FunctionInfo functionInfo;
- if (!functions.TryGetValue(commandName, out functionInfo))
- {
- continue;
- }
- initialSessionState.Commands.Add(new SessionStateFunctionEntry(functionInfo.Name, functionInfo.Definition));
- processedFunctions.Add(commandName);
- CaptureFunctions(functionInfo.ScriptBlock, initialSessionState, functions, processedFunctions);
+ string[] noTouchVariables = { "null", "true", "false", "Error" };
+ var variables = sessionState.InvokeProvider.Item.Get("Variable:");
+ var psVariables = (IEnumerable)variables[0].BaseObject;
+
+ return psVariables.Where(p => !noTouchVariables.Contains(p.Name));
+ }
+ catch (DriveNotFoundException)
+ {
+ return new PSVariable[] { };
}
}
+ private static void CaptureFunctions(SessionState sessionState, InitialSessionState initialSessionState)
+ {
+ var functions = GetFunctions(sessionState);
+ foreach (var func in functions)
+ {
+ initialSessionState.Commands.Add(new SessionStateFunctionEntry(func.Name, func.Definition));
+ }
+ }
- private static void CaptureVariables(ScriptBlock scriptBlock, SessionState sessionState,
- InitialSessionState initialSessionState)
+ private static void CaptureVariables(SessionState sessionState, InitialSessionState initialSessionState)
{
- var variables = scriptBlock.Ast.FindAll(ast => ast is VariableExpressionAst, true);
- var varDict = new Dictionary();
- foreach (var ast in variables)
+ var variables = GetVariables(sessionState);
+ foreach (var variable in variables)
{
- var v = (VariableExpressionAst) ast;
- var variableName = v.VariablePath.UserPath;
- if (variableName == "_" || varDict.ContainsKey(variableName))
+ var existing = initialSessionState.Variables[variable.Name].FirstOrDefault();
+ if (existing != null)
{
- continue;
+ if ((existing.Options & (ScopedItemOptions.Constant | ScopedItemOptions.ReadOnly)) != ScopedItemOptions.None)
+ {
+ continue;
+ }
+ else
+ {
+ initialSessionState.Variables.Remove(existing.Name, existing.GetType());
+ initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes));
+ }
}
-
- var variable = sessionState.PSVariable.Get(variableName);
- if (variable != null)
+ else
{
- var ssve = new SessionStateVariableEntry(variable.Name, variable.Value,
- variable.Description, variable.Options, variable.Attributes);
- varDict.Add(variableName, ssve);
+ initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes));
}
}
+ }
- var prefs = new[]
- {
- "ErrorActionPreference", "DebugPreference", "VerbosePreference", "WarningPreference",
- "ProgressPreference", "InformationPreference", "ConfirmPreference", "WhatIfPreference"
- };
- foreach (var pref in prefs)
+ private void ValidateParameters()
+ {
+ if (NoProgress)
{
- var v = sessionState.PSVariable.Get(pref);
- if (v != null)
+ var boundParameters = MyInvocation.BoundParameters;
+ foreach (var p in new[] { nameof(ProgressActivity), nameof(ParentProgressId), nameof(ProgressId) })
{
- var ssve = new SessionStateVariableEntry(v.Name, v.Value,
- v.Description, v.Options, v.Attributes);
- varDict.Add(v.Name, ssve);
+ if (!boundParameters.ContainsKey(p)) continue;
+ var argumentException = new ArgumentException($"'{p}' must not be specified together with 'NoProgress'", p);
+ ThrowTerminatingError(new ErrorRecord(argumentException, "InvalidProgressParam", ErrorCategory.InvalidArgument, p));
}
}
-
- initialSessionState.Variables.Add(varDict.Values);
}
- protected override void BeginProcessing()
+ InitialSessionState GetSessionState()
{
- m_initialSessionState = GetSessionState(ScriptBlock, SessionState);
- m_powershellPool = new PowershellPool(ThrottleLimit,m_initialSessionState, m_cancelationTokenSource.Token);
- m_powershellPool.Open();
- if (!NoProgress)
+ if (MyInvocation.BoundParameters.ContainsKey(nameof(InitialSessionState)))
{
- m_progressManager = new ProgressManager(ProgressId, ProgressActivity, $"Processing with {ThrottleLimit} workers", ParentProgressId);
- m_input = new List(500);
+ if (InitialSessionState == null)
+ {
+ return InitialSessionState.Create();
+ }
+ return InitialSessionState;
}
+ return GetSessionState(base.SessionState);
+ }
+
+
+ private WorkerBase _worker;
+ protected override void BeginProcessing()
+ {
+ ValidateParameters();
+ var iss = GetSessionState();
+ PowershellPool = new PowershellPool(ThrottleLimit, iss, _cancelationTokenSource.Token);
+ _worker = NoProgress ? (WorkerBase) new NoProgressWorker(this) : new ProgressWorker(this);
}
+
protected override void ProcessRecord()
{
- if(NoProgress)
- {
- m_powershellPool.AddInput(ScriptBlock, InputObject);
- WriteOutputs();
- }
- else
- {
- m_input.Add(InputObject);
- }
+ _worker.ProcessRecord(InputObject);
}
protected override void EndProcessing()
{
- try {
- if (!NoProgress)
- {
- m_progressManager.TotalCount = m_input.Count;
- foreach (var i in m_input)
- {
- var pr = m_progressManager.GetCurrentProgressRecord($"Starting processing of {i}", m_powershellPool.ProcessedCount);
- WriteProgress(pr);
- m_powershellPool.AddInput(ScriptBlock, i);
- WriteOutputs();
- }
- }
- while(!m_powershellPool.WaitForAllPowershellCompleted(100))
- {
- var pr = m_progressManager.GetCurrentProgressRecord("All work queued. Waiting for remaining work to complete.", m_powershellPool.ProcessedCount);
- WriteProgress(pr);
- if (Stopping)
- {
- return;
- }
- WriteOutputs();
- }
- WriteOutputs();
- }
- finally
- {
- if(!NoProgress)
- {
- WriteProgress(m_progressManager.Completed());
- }
- }
+ _worker.EndProcessing();
+
}
protected override void StopProcessing()
{
- m_cancelationTokenSource.Cancel();
+ _cancelationTokenSource.Cancel();
+ PowershellPool?.Stop();
}
private void WriteOutputs()
{
- var streams = m_powershellPool.Streams;
+ Debug.WriteLine("Processing output");
+ if (_cancelationTokenSource.IsCancellationRequested)
+ {
+ return;
+ }
+ var streams = PowershellPool.Streams;
foreach (var o in streams.Output.ReadAll())
{
WriteObject(o, false);
@@ -227,21 +210,156 @@ private void WriteOutputs()
{
WriteVerbose(v.Message);
}
- foreach (var p in streams.Progress.ReadAll())
+ _worker.WriteProgress(streams.ReadAllProgress());
+ }
+
+ public void Dispose()
+ {
+ PowershellPool?.Dispose();
+ _cancelationTokenSource.Dispose();
+ }
+
+
+ private abstract class WorkerBase
+ {
+ protected readonly InvokeParallelCommand Cmdlet;
+ protected readonly PowershellPool Pool;
+ protected bool Stopping => Cmdlet.Stopping;
+ protected void WriteOutputs() => Cmdlet.WriteOutputs();
+ protected void WriteProgress(ProgressRecord record) => Cmdlet.WriteProgress(record);
+ public abstract void ProcessRecord(PSObject inputObject);
+ public abstract void EndProcessing();
+ public abstract void WriteProgress(Collection progress);
+ protected ScriptBlock ScriptBlock => Cmdlet.ScriptBlock;
+
+ protected WorkerBase(InvokeParallelCommand cmdlet)
+ {
+ Cmdlet = cmdlet;
+ Pool = cmdlet.PowershellPool;
+ }
+ }
+
+ class NoProgressWorker : WorkerBase
+ {
+ public NoProgressWorker(InvokeParallelCommand cmdlet) : base(cmdlet)
+ {
+ }
+
+ public override void ProcessRecord(PSObject inputObject)
+ {
+ while (!Pool.TryAddInput(Cmdlet.ScriptBlock, Cmdlet.InputObject))
+ {
+ Cmdlet.WriteOutputs();
+ }
+ }
+
+ public override void EndProcessing()
+ {
+ while (!Pool.WaitForAllPowershellCompleted(100))
+ {
+ if (Stopping)
+ {
+ return;
+ }
+ WriteOutputs();
+ }
+ WriteOutputs();
+ }
+
+ public override void WriteProgress(Collection progress)
{
- if(!NoProgress)
+ foreach (var p in progress)
{
- p.ParentActivityId = m_progressManager.ActivityId;
+ base.WriteProgress(p);
}
- WriteProgress(p);
}
}
- public void Dispose()
+ class ProgressWorker : WorkerBase
{
- m_powershellPool.Dispose();
- m_cancelationTokenSource.Dispose();
- }
+ readonly ProgressManager _progressManager;
+ private readonly List _input;
+ private int _lastEstimate = -1;
+ public ProgressWorker(InvokeParallelCommand cmdlet) : base(cmdlet)
+ {
+ _progressManager = new ProgressManager(cmdlet.ProgressId, cmdlet.ProgressActivity, $"Processing with {cmdlet.ThrottleLimit} workers", cmdlet.ParentProgressId);
+ _input = new List(500);
+ }
+
+ public override void ProcessRecord(PSObject inputObject)
+ {
+ _input.Add(inputObject);
+ }
+
+ public override void EndProcessing()
+ {
+ try
+ {
+ _progressManager.TotalCount = _input.Count;
+ var lastPercentComplete = -1;
+ foreach (var i in _input)
+ {
+ var processed = Pool.GetEstimatedProgressCount();
+ _lastEstimate = processed;
+ _progressManager.SetCurrentOperation($"Starting processing of {i}");
+ _progressManager.UpdateCurrentProgressRecord(processed);
+ var pr = _progressManager.ProgressRecord;
+ if (lastPercentComplete != pr.PercentComplete)
+ {
+ WriteProgress(pr);
+ lastPercentComplete = pr.PercentComplete;
+ }
+
+ while (!Pool.TryAddInput(ScriptBlock, i))
+ {
+ WriteOutputs();
+ }
+ }
+ _progressManager.SetCurrentOperation("All work queued. Waiting for remaining work to complete.");
+ while (!Pool.WaitForAllPowershellCompleted(100))
+ {
+ WriteProgressIfUpdated();
+ if (Stopping)
+ {
+ return;
+ }
+ WriteOutputs();
+ }
+ WriteOutputs();
+ }
+ finally
+ {
+ _progressManager.UpdateCurrentProgressRecord(Pool.GetEstimatedProgressCount());
+ WriteProgress(_progressManager.Completed());
+ }
+ }
+ public override void WriteProgress(Collection progress)
+ {
+ foreach (var p in progress)
+ {
+ if (p.ActivityId != _progressManager.ActivityId)
+ {
+ p.ParentActivityId = _progressManager.ActivityId;
+ }
+ WriteProgress(p);
+ }
+ if (progress.Count > 0)
+ {
+ WriteProgressIfUpdated();
+ }
+ }
+
+ private void WriteProgressIfUpdated()
+ {
+ var estimatedCompletedCount = Pool.GetEstimatedProgressCount();
+ if (_lastEstimate != estimatedCompletedCount)
+ {
+ _lastEstimate = estimatedCompletedCount;
+ _progressManager.UpdateCurrentProgressRecord(estimatedCompletedCount);
+ WriteProgress(_progressManager.ProgressRecord);
+ }
+ }
+ }
}
}
diff --git a/src/PSParallel/PSParallel.csproj b/src/PSParallel/PSParallel.csproj
index 75319f0..5a72cfb 100644
--- a/src/PSParallel/PSParallel.csproj
+++ b/src/PSParallel/PSParallel.csproj
@@ -33,6 +33,10 @@
false
+
+ False
+ ..\..\..\..\..\..\..\Windows\Microsoft.NET\assembly\GAC_MSIL\Microsoft.PowerShell.Commands.Utility\v4.0_3.0.0.0__31bf3856ad364e35\Microsoft.PowerShell.Commands.Utility.dll
+
diff --git a/src/PSParallel/PowerShellPoolMember.cs b/src/PSParallel/PowerShellPoolMember.cs
index ce7bf95..50265ae 100644
--- a/src/PSParallel/PowerShellPoolMember.cs
+++ b/src/PSParallel/PowerShellPoolMember.cs
@@ -1,62 +1,94 @@
using System;
using System.Management.Automation;
+using System.Management.Automation.Runspaces;
using System.Threading;
namespace PSParallel
{
class PowerShellPoolMember : IDisposable
{
- private readonly PowershellPool m_pool;
- private readonly PowerShellPoolStreams m_poolStreams;
- private PowerShell m_powerShell;
- public PowerShell PowerShell => m_powerShell;
- private readonly PSDataCollection m_input =new PSDataCollection();
- private PSDataCollection m_output;
+ private readonly PowershellPool _pool;
+ private readonly int _index;
+ private readonly InitialSessionState _initialSessionState;
+ private readonly PowerShellPoolStreams _poolStreams;
+ private PowerShell _powerShell;
+ public PowerShell PowerShell => _powerShell;
+ public int Index => _index ;
+
+ private readonly PSDataCollection _input =new PSDataCollection();
+ private PSDataCollection _output;
+ private int _percentComplete;
+ public int PercentComplete
+ {
+ get { return _percentComplete; }
+ set { _percentComplete = value; }
+ }
+
- public PowerShellPoolMember(PowershellPool pool)
+ public PowerShellPoolMember(PowershellPool pool, int index, InitialSessionState initialSessionState)
{
- m_pool = pool;
- m_poolStreams = m_pool.Streams;
- m_input.Complete();
- CreatePowerShell();
+ _pool = pool;
+ _index = index;
+ _initialSessionState = initialSessionState;
+ _poolStreams = _pool.Streams;
+ _input.Complete();
+ CreatePowerShell(initialSessionState);
}
private void PowerShellOnInvocationStateChanged(object sender, PSInvocationStateChangedEventArgs psInvocationStateChangedEventArgs)
{
switch (psInvocationStateChangedEventArgs.InvocationStateInfo.State)
{
-
case PSInvocationState.Stopped:
+ ReleasePowerShell();
+ _pool.ReportStopped(this);
+ break;
case PSInvocationState.Completed:
- case PSInvocationState.Failed:
- ReturnPowerShell(m_powerShell);
- CreatePowerShell();
- m_pool.ReportCompletion(this);
-
+ case PSInvocationState.Failed:
+ ResetPowerShell();
+ _pool.ReportAvailable(this);
break;
}
}
- private void CreatePowerShell()
+ private void CreatePowerShell(InitialSessionState initialSessionState)
{
- var powerShell = PowerShell.Create();
+ var powerShell = PowerShell.Create(RunspaceMode.NewRunspace);
+ var runspace = RunspaceFactory.CreateRunspace(initialSessionState);
+ runspace.ApartmentState = ApartmentState.MTA;
+ powerShell.Runspace = runspace;
+ runspace.Open();
HookStreamEvents(powerShell.Streams);
powerShell.InvocationStateChanged += PowerShellOnInvocationStateChanged;
- m_powerShell = powerShell;
- m_output = new PSDataCollection();
- m_output.DataAdded += OutputOnDataAdded;
+ _powerShell = powerShell;
+ _output = new PSDataCollection();
+ _output.DataAdded += OutputOnDataAdded;
+ }
+
+ public void ResetPowerShell()
+ {
+ UnhookStreamEvents(_powerShell.Streams);
+ _powerShell.Runspace.ResetRunspaceState();
+ var runspace = _powerShell.Runspace;
+ _powerShell = PowerShell.Create(RunspaceMode.NewRunspace);
+ _powerShell.Runspace = runspace;
+
+ HookStreamEvents(_powerShell.Streams);
+ _powerShell.InvocationStateChanged += PowerShellOnInvocationStateChanged;
+ _output = new PSDataCollection();
+ _output.DataAdded += OutputOnDataAdded;
}
- private void ReturnPowerShell(PowerShell powershell)
+ private void ReleasePowerShell()
{
- UnhookStreamEvents(powershell.Streams);
- powershell.InvocationStateChanged -= PowerShellOnInvocationStateChanged;
- m_output.DataAdded -= OutputOnDataAdded;
- powershell.Dispose();
+ UnhookStreamEvents(_powerShell.Streams);
+ _powerShell.InvocationStateChanged -= PowerShellOnInvocationStateChanged;
+ _output.DataAdded -= OutputOnDataAdded;
+ _powerShell.Dispose();
}
- private void HookStreamEvents(PSDataStreams streams)
+ private void HookStreamEvents(PSDataStreams streams)
{
streams.Debug.DataAdded += DebugOnDataAdded;
streams.Error.DataAdded += ErrorOnDataAdded;
@@ -80,65 +112,91 @@ private void UnhookStreamEvents(PSDataStreams streams)
public void BeginInvoke(ScriptBlock scriptblock, PSObject inputObject)
{
- string command = $"param($_,$PSItem){scriptblock}";
- m_powerShell.AddScript(command)
+ _percentComplete = 0;
+ string command = $"param($_,$PSItem, $PSPArallelIndex,$PSParallelProgressId){scriptblock}";
+ _powerShell.AddScript(command)
.AddParameter("_", inputObject)
- .AddParameter("PSItem", inputObject);
- m_powerShell.BeginInvoke(m_input, m_output);
+ .AddParameter("PSItem", inputObject)
+ .AddParameter("PSParallelIndex", _index)
+ .AddParameter("PSParallelProgressId", _index + 1000);
+ _powerShell.BeginInvoke(_input, _output);
}
public void Dispose()
{
- if (m_powerShell != null)
+ var ps = _powerShell;
+ if (ps != null)
{
- UnhookStreamEvents(m_powerShell.Streams);
+ UnhookStreamEvents(ps.Streams);
+ ps.Runspace?.Dispose();
+ ps.Dispose();
}
- m_output.Dispose();
- m_input.Dispose();
- m_powerShell?.Dispose();
+ _output.Dispose();
+ _input.Dispose();
}
private void OutputOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var item = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Output.Add(item);
+ _poolStreams.Output.Add(item);
}
private void InformationOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var ir = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Information.Add(ir);
+ _poolStreams.Information.Add(ir);
}
private void ProgressOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
- var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Progress.Add(record);
+ var psDataCollection = ((PSDataCollection) sender);
+ var record = psDataCollection[dataAddedEventArgs.Index];
+ _poolStreams.AddProgress(record, _index);
}
private void ErrorOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Error.Add(record);
+ _poolStreams.Error.Add(record);
}
private void DebugOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Debug.Add(record);
+ _poolStreams.Debug.Add(record);
}
private void WarningOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Warning.Add(record);
+ _poolStreams.Warning.Add(record);
}
private void VerboseOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs)
{
var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index];
- m_poolStreams.Verbose.Add(record);
+ _poolStreams.Verbose.Add(record);
+ }
+
+ public void Stop()
+ {
+ if(_powerShell.InvocationStateInfo.State != PSInvocationState.Stopped)
+ {
+ UnhookStreamEvents(_powerShell.Streams);
+ _powerShell.BeginStop(OnStopped, null);
+ }
+ }
+
+ private void OnStopped(IAsyncResult ar)
+ {
+ var ps = _powerShell;
+ if (ps == null)
+ {
+ return;
+ }
+ ps.EndStop(ar);
+ _powerShell = null;
}
}
}
\ No newline at end of file
diff --git a/src/PSParallel/PowerShellPoolStreams.cs b/src/PSParallel/PowerShellPoolStreams.cs
index 56e64e0..bcbe46b 100644
--- a/src/PSParallel/PowerShellPoolStreams.cs
+++ b/src/PSParallel/PowerShellPoolStreams.cs
@@ -1,4 +1,7 @@
using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Linq;
using System.Management.Automation;
namespace PSParallel
@@ -7,9 +10,9 @@ class PowerShellPoolStreams : IDisposable
{
public PSDataCollection Output { get; } = new PSDataCollection(100);
public PSDataCollection Debug { get; } = new PSDataCollection();
- public PSDataCollection Progress { get; } = new PSDataCollection();
+ private PSDataCollection Progress { get; } = new PSDataCollection();
public PSDataCollection Error { get; } = new PSDataCollection();
- public PSDataCollection Warning { get; } = new PSDataCollection();
+ public PSDataCollection Warning { get; } = new PSDataCollection();
public PSDataCollection Information { get; } = new PSDataCollection();
public PSDataCollection Verbose { get; } = new PSDataCollection();
@@ -22,5 +25,52 @@ public void Dispose()
Information.Dispose();
Verbose.Dispose();
}
+
+ public void AddProgress(ProgressRecord progress, int index)
+ {
+ DoAddProgress(progress);
+ OnProgressChanged(progress.PercentComplete, index);
+ }
+
+ public void ClearProgress(int index)
+ {
+ OnProgressChanged(0, index);
+ }
+
+ protected void DoAddProgress(ProgressRecord progress)
+ {
+ Progress.Add(progress);
+ }
+
+ protected virtual void OnProgressChanged(int progress, int index){}
+
+ public Collection ReadAllProgress()
+ {
+ return Progress.ReadAll();
+ }
}
+
+ class ProgressTrackingPowerShellPoolStreams : PowerShellPoolStreams
+ {
+ private readonly int _maxPoolSize;
+ private readonly int[] _poolProgress;
+ private int _currentProgress;
+ public ProgressTrackingPowerShellPoolStreams(int maxPoolSize)
+ {
+ _maxPoolSize = maxPoolSize;
+ _poolProgress = new int[maxPoolSize];
+ }
+
+ protected override void OnProgressChanged(int progress, int index)
+ {
+ lock(_poolProgress) {
+ _poolProgress[index] = progress;
+ _currentProgress = _poolProgress.Sum();
+ }
+ }
+
+ public int PoolPercentComplete => _currentProgress/_maxPoolSize;
+
+ }
+
}
\ No newline at end of file
diff --git a/src/PSParallel/PowershellPool.cs b/src/PSParallel/PowershellPool.cs
index 6031c3a..0412a26 100644
--- a/src/PSParallel/PowershellPool.cs
+++ b/src/PSParallel/PowershellPool.cs
@@ -1,99 +1,154 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Management.Automation;
+using System.Management.Automation.Host;
using System.Management.Automation.Runspaces;
using System.Threading;
+
namespace PSParallel
{
- class PowershellPool : IDisposable
+ sealed class PowershellPool : IDisposable
{
- private int m_busyCount;
- private int m_processedCount;
- private readonly CancellationToken m_cancellationToken;
- private readonly RunspacePool m_runspacePool;
- private readonly List m_poolMembers;
- private readonly BlockingCollection m_availablePoolMembers = new BlockingCollection(new ConcurrentStack ());
+ private readonly object _countLock = new object();
+ private int _busyCount;
+ private readonly CancellationToken _cancellationToken;
+ private readonly List _poolMembers;
+ private readonly InitialSessionState _initialSessionState;
+ private readonly BlockingCollection _availablePoolMembers = new BlockingCollection(new ConcurrentQueue());
public readonly PowerShellPoolStreams Streams = new PowerShellPoolStreams();
-
- public int ProcessedCount => m_processedCount;
+ private int _processedCount;
public PowershellPool(int poolSize, InitialSessionState initialSessionState, CancellationToken cancellationToken)
{
- m_poolMembers= new List(poolSize);
- m_processedCount = 0;
- m_cancellationToken = cancellationToken;
+ _poolMembers= new List(poolSize);
+ _initialSessionState = initialSessionState;
+ _cancellationToken = cancellationToken;
+
+ for (var i = 0; i < poolSize; i++)
+ {
+ var powerShellPoolMember = new PowerShellPoolMember(this, i+1, initialSessionState);
+ _poolMembers.Add(powerShellPoolMember);
+ _availablePoolMembers.Add(powerShellPoolMember);
+ }
+ }
- for (int i = 0; i < poolSize; i++)
+ private int GetPartiallyProcessedCount()
+ {
+ var totalPercentComplete = 0;
+ var count = _poolMembers.Count;
+ for (int i = 0; i < count; ++i)
{
- var powerShellPoolMember = new PowerShellPoolMember(this);
- m_poolMembers.Add(powerShellPoolMember);
- m_availablePoolMembers.Add(powerShellPoolMember);
+ var percentComplete = _poolMembers[i].PercentComplete;
+ if (percentComplete < 0)
+ {
+ percentComplete = 0;
+ }
+ else if(percentComplete > 100)
+ {
+ percentComplete = 100;
+ }
+ totalPercentComplete += percentComplete;
}
-
- m_runspacePool = RunspaceFactory.CreateRunspacePool(initialSessionState);
- m_runspacePool.SetMaxRunspaces(poolSize);
+ var partiallyProcessedCount = totalPercentComplete / 100;
+ return partiallyProcessedCount;
}
- public void AddInput(ScriptBlock scriptblock,PSObject inputObject)
+ public int GetEstimatedProgressCount()
{
- var powerShell = WaitForAvailablePowershell();
- powerShell.BeginInvoke(scriptblock, inputObject);
- Interlocked.Increment(ref m_busyCount);
+ lock(_countLock) {
+ return _processedCount + GetPartiallyProcessedCount();
+ }
}
- public void Open()
- {
- m_runspacePool.Open();
+ public bool TryAddInput(ScriptBlock scriptblock,PSObject inputObject)
+ {
+ PowerShellPoolMember poolMember;
+ if(!TryWaitForAvailablePowershell(100, out poolMember))
+ {
+ return false;
+ }
+
+ Interlocked.Increment(ref _busyCount);
+ poolMember.BeginInvoke(scriptblock, inputObject);
+ return true;
}
+
+
public bool WaitForAllPowershellCompleted(int timeoutMilliseconds)
{
- Contract.Requires(timeoutMilliseconds >=0);
+ Contract.Requires(timeoutMilliseconds >= 0);
var startTicks = Environment.TickCount;
var currendTicks = startTicks;
while (currendTicks - startTicks < timeoutMilliseconds)
{
currendTicks = Environment.TickCount;
- if (m_cancellationToken.IsCancellationRequested)
+ if (_cancellationToken.IsCancellationRequested)
{
return false;
}
- if (Interlocked.CompareExchange(ref m_busyCount, 0, 0) == 0)
+ if (Interlocked.CompareExchange(ref _busyCount, 0, 0) == 0)
{
return true;
}
Thread.Sleep(10);
-
}
return false;
}
- private PowerShellPoolMember WaitForAvailablePowershell()
+ private bool TryWaitForAvailablePowershell(int milliseconds, out PowerShellPoolMember poolMember)
{
- var poolmember = m_availablePoolMembers.Take(m_cancellationToken);
- poolmember.PowerShell.RunspacePool = m_runspacePool;
- return poolmember;
+ if (!_availablePoolMembers.TryTake(out poolMember, milliseconds, _cancellationToken))
+ {
+ _cancellationToken.ThrowIfCancellationRequested();
+ Debug.WriteLine("WaitForAvailablePowershell - TryTake failed");
+ poolMember = null;
+ return false;
+ }
+ return true;
}
public void Dispose()
{
- foreach (var pm in m_poolMembers)
+ Streams.Dispose();
+ _availablePoolMembers.Dispose();
+ }
+
+ public void ReportAvailable(PowerShellPoolMember poolmember)
+ {
+ Interlocked.Decrement(ref _busyCount);
+ lock (_countLock)
{
- pm.Dispose();
+ _processedCount++;
+ poolmember.PercentComplete = 0;
+ }
+
+ poolmember.PercentComplete = 0;
+ while (!_availablePoolMembers.TryAdd(poolmember, 1000, _cancellationToken))
+ {
+ _cancellationToken.ThrowIfCancellationRequested();
+ Debug.WriteLine("WaitForAvailablePowershell - TryAdd failed");
}
- Streams.Dispose();
- m_availablePoolMembers.Dispose();
}
- public void ReportCompletion(PowerShellPoolMember poolmember)
+ public void ReportStopped(PowerShellPoolMember powerShellPoolMember)
{
- Interlocked.Decrement(ref m_busyCount);
- Interlocked.Increment(ref m_processedCount);
- m_availablePoolMembers.Add(poolmember);
+ Interlocked.Decrement(ref _busyCount);
+ }
+
+ public void Stop()
+ {
+ _availablePoolMembers.CompleteAdding();
+ foreach (var poolMember in _poolMembers)
+ {
+ poolMember.Stop();
+ }
+ WaitForAllPowershellCompleted(5000);
}
}
}
\ No newline at end of file
diff --git a/src/PSParallel/ProgressManager.cs b/src/PSParallel/ProgressManager.cs
index c3b1d00..2291b91 100644
--- a/src/PSParallel/ProgressManager.cs
+++ b/src/PSParallel/ProgressManager.cs
@@ -1,56 +1,155 @@
-using System.Diagnostics;
+using System;
+using System.Diagnostics;
using System.Management.Automation;
-
namespace PSParallel
{
class ProgressManager
{
public int TotalCount { get; set; }
- private readonly ProgressRecord m_progressRecord;
- private readonly Stopwatch m_stopwatch;
+ private ProgressRecord _progressRecord;
+ private readonly Stopwatch _stopwatch;
+ private string _currentOperation;
public ProgressManager(int activityId, string activity, string statusDescription, int parentActivityId = -1, int totalCount = 0)
{
TotalCount = totalCount;
- m_stopwatch = new Stopwatch();
- m_progressRecord = new ProgressRecord(activityId, activity, statusDescription) {ParentActivityId = parentActivityId};
+ _stopwatch = new Stopwatch();
+ _progressRecord = new ProgressRecord(activityId, activity, statusDescription) {ParentActivityId = parentActivityId};
}
- public ProgressRecord GetCurrentProgressRecord(string currentOperation, int count)
+
+ private void UpdateCurrentProgressRecordInternal(int count)
{
- if(!m_stopwatch.IsRunning && TotalCount > 0)
+ if (!_stopwatch.IsRunning && TotalCount > 0)
{
- m_stopwatch.Start();
+ _stopwatch.Start();
}
- m_progressRecord.RecordType = ProgressRecordType.Processing;
- if(TotalCount > 0)
- {
- var percentComplete = GetPercentComplete(count);
- if (percentComplete != m_progressRecord.PercentComplete)
- {
- m_progressRecord.PercentComplete = percentComplete;
- m_progressRecord.SecondsRemaining = GetSecondsRemaining(count);
- }
- m_progressRecord.CurrentOperation = $"({count}/{TotalCount}) {currentOperation}";
+ var current = TotalCount > 0 ? $"({count}/{TotalCount}) {_currentOperation}" : _currentOperation;
+ var pr = _progressRecord.Clone();
+ pr.CurrentOperation = current;
+ pr.RecordType = ProgressRecordType.Processing;
+ if (TotalCount > 0)
+ {
+ pr.PercentComplete = GetPercentComplete(count);
+ pr.SecondsRemaining = GetSecondsRemaining(count);
}
- else
- {
- m_progressRecord.CurrentOperation = currentOperation;
- }
- return m_progressRecord;
+ _progressRecord = pr;
+ }
+
+ public void SetCurrentOperation(string currentOperation)
+ {
+ _currentOperation = currentOperation;
}
+ public void UpdateCurrentProgressRecord(int count)
+ {
+
+ UpdateCurrentProgressRecordInternal(count);
+ }
+
+ public ProgressRecord ProgressRecord => _progressRecord;
+
public ProgressRecord Completed()
{
- m_stopwatch.Reset();
+ _stopwatch.Reset();
+ _progressRecord = _progressRecord.WithRecordType(ProgressRecordType.Completed);
+ return _progressRecord;
+ }
+
+
+ private int GetSecondsRemaining(int count)
+ {
+ var secondsRemaining = count == 0 ? -1 : (int) ((TotalCount - count)*_stopwatch.ElapsedMilliseconds/1000/count);
+ return secondsRemaining;
+ }
+
+ private int GetPercentComplete(int count)
+ {
+ var percentComplete = count*100/TotalCount;
+ return percentComplete;
+ }
+
+ public int ActivityId => _progressRecord.ActivityId;
+ }
+
+
+ class ProgressProjector
+ {
+ private readonly Stopwatch _stopWatch;
+ private int _percentComplete;
+ public ProgressProjector()
+ {
+ _stopWatch = new Stopwatch();
+ _percentComplete = -1;
+ }
+
+ public void ReportProgress(int percentComplete)
+ {
+ if (percentComplete > 100)
+ {
+ percentComplete = 100;
+ }
+ _percentComplete = percentComplete;
+ }
- m_progressRecord.RecordType = ProgressRecordType.Completed;
- return m_progressRecord;
+ public bool IsValid => _percentComplete > 0 && _stopWatch.IsRunning;
+ public TimeSpan Elapsed => _stopWatch.Elapsed;
+
+ public TimeSpan ProjectedTotalTime => new TimeSpan(Elapsed.Ticks * 100 / _percentComplete);
+
+ public void Start()
+ {
+ _stopWatch.Start();
+ _percentComplete = 0;
+ }
+
+ public void Stop()
+ {
+ _stopWatch.Stop();
+ }
+ }
+
+ static class ProgressRecordExtension
+ {
+ static ProgressRecord CloneProgressRecord(ProgressRecord record)
+ {
+ return new ProgressRecord(record.ActivityId, record.Activity, record.StatusDescription)
+ {
+ CurrentOperation = record.CurrentOperation,
+ ParentActivityId = record.ParentActivityId,
+ SecondsRemaining = record.SecondsRemaining,
+ PercentComplete = record.PercentComplete,
+ RecordType = record.RecordType
+ };
+ }
+
+ public static ProgressRecord Clone(this ProgressRecord record)
+ {
+ return CloneProgressRecord(record);
+ }
+
+ public static ProgressRecord WithCurrentOperation(this ProgressRecord record, string currentOperation)
+ {
+ var r = CloneProgressRecord(record);
+ r.CurrentOperation = currentOperation;
+ return r;
+ }
+
+ public static ProgressRecord WithRecordType(this ProgressRecord record, ProgressRecordType recordType)
+ {
+ var r = CloneProgressRecord(record);
+ r.RecordType = recordType;
+ return r;
+ }
+
+ public static ProgressRecord WithPercentCompleteAndSecondsRemaining(this ProgressRecord record, int percentComplete, int secondsRemaining)
+ {
+ var r = CloneProgressRecord(record);
+ r.PercentComplete = percentComplete;
+ r.SecondsRemaining = secondsRemaining;
+ return r;
}
- private int GetSecondsRemaining(int count) => count == 0 ? -1 : (int) ((TotalCount - count)*m_stopwatch.ElapsedMilliseconds/1000/count);
- private int GetPercentComplete(int count) => count*100/TotalCount;
- public int ActivityId => m_progressRecord.ActivityId;
}
}
diff --git a/src/PSParallel/Properties/AssemblyInfo.cs b/src/PSParallel/Properties/AssemblyInfo.cs
index 3a8773d..fe5f249 100644
--- a/src/PSParallel/Properties/AssemblyInfo.cs
+++ b/src/PSParallel/Properties/AssemblyInfo.cs
@@ -1,5 +1,4 @@
using System.Reflection;
-using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
@@ -10,7 +9,7 @@
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("PSParallell")]
-[assembly: AssemblyCopyright("Copyright © 2015")]
+[assembly: AssemblyCopyright("Copyright © 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
@@ -33,4 +32,4 @@
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
-[assembly: AssemblyFileVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("2.2.3.0")]
diff --git a/src/PSParallelTests/InvokeParallelTests.cs b/src/PSParallelTests/InvokeParallelTests.cs
index 5f1e941..02b105f 100644
--- a/src/PSParallelTests/InvokeParallelTests.cs
+++ b/src/PSParallelTests/InvokeParallelTests.cs
@@ -1,5 +1,4 @@
using System;
-using System.IO;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
@@ -13,277 +12,394 @@ namespace PSParallelTests
public sealed class InvokeParallelTests : IDisposable
{
readonly RunspacePool m_runspacePool;
-
+ readonly InitialSessionState _iss;
public InvokeParallelTests()
- {
+ {
+ _iss = CreateInitialSessionState();
+ m_runspacePool = RunspaceFactory.CreateRunspacePool(_iss);
+ m_runspacePool.SetMaxRunspaces(10);
+ m_runspacePool.Open();
+ }
+
+ private static InitialSessionState CreateInitialSessionState()
+ {
var iss = InitialSessionState.Create();
- iss.LanguageMode = PSLanguageMode.FullLanguage;
- iss.Commands.Add(new []
+ iss.LanguageMode = PSLanguageMode.FullLanguage;
+ iss.Commands.Add(new[]
{
- new SessionStateCmdletEntry("Write-Error", typeof(WriteErrorCommand), null),
- new SessionStateCmdletEntry("Write-Verbose", typeof(WriteVerboseCommand), null),
- new SessionStateCmdletEntry("Write-Debug", typeof(WriteDebugCommand), null),
- new SessionStateCmdletEntry("Write-Progress", typeof(WriteProgressCommand), null),
- new SessionStateCmdletEntry("Write-Warning", typeof(WriteWarningCommand), null),
+ new SessionStateCmdletEntry("Write-Error", typeof(WriteErrorCommand), null),
+ new SessionStateCmdletEntry("Write-Verbose", typeof(WriteVerboseCommand), null),
+ new SessionStateCmdletEntry("Write-Debug", typeof(WriteDebugCommand), null),
+ new SessionStateCmdletEntry("Write-Progress", typeof(WriteProgressCommand), null),
+ new SessionStateCmdletEntry("Write-Warning", typeof(WriteWarningCommand), null),
new SessionStateCmdletEntry("Write-Information", typeof(WriteInformationCommand), null),
- new SessionStateCmdletEntry("Invoke-Parallel", typeof(InvokeParallelCommand), null),
- });
- m_runspacePool = RunspaceFactory.CreateRunspacePool(iss);
- m_runspacePool.SetMaxRunspaces(10);
- m_runspacePool.Open();
+ new SessionStateCmdletEntry("Invoke-Parallel", typeof(InvokeParallelCommand), null),
+ });
+ iss.Providers.Add(new SessionStateProviderEntry("Function", typeof(FunctionProvider), null));
+ iss.Providers.Add(new SessionStateProviderEntry("Variable", typeof(VariableProvider), null));
+ return iss;
}
+
[TestMethod]
public void TestOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
-
- ps.AddCommand("Invoke-Parallel")
- .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2"))
- .AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection {1,2,3,4,5};
- input.Complete();
- var output = ps.Invoke(input);
- var sum = output.Aggregate(0, (a, b) => a + b);
- Assert.AreEqual(30, sum);
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+
+ ps.AddCommand("Invoke-Parallel")
+ .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2"))
+ .AddParameter("ThrottleLimit", 1);
+ var input = new PSDataCollection {1,2,3,4,5};
+ input.Complete();
+ var output = ps.Invoke(input);
+ var sum = output.Aggregate(0, (a, b) => a + b);
+ Assert.AreEqual(30, sum);
+ }
}
[TestMethod]
public void TestParallelOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
-
- ps.AddCommand("Invoke-Parallel")
- .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2"))
- .AddParameter("ThrottleLimit", 10);
- var input = new PSDataCollection(Enumerable.Range(1,1000));
- input.Complete();
- var output = ps.Invoke(input);
- var sum = output.Aggregate(0, (a, b) => a + b);
- Assert.AreEqual(1001000, sum);
+ using (var ps = PowerShell.Create())
+ {
+ //ps.RunspacePool = m_runspacePool;
+
+ ps.AddCommand("Invoke-Parallel")
+ .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2"))
+ .AddParameter("ThrottleLimit", 10);
+ var input = new PSDataCollection(Enumerable.Range(1, 1000));
+ input.Complete();
+ var output = ps.Invoke(input);
+ var sum = output.Aggregate(0, (a, b) => a + b);
+ Assert.AreEqual(1001000, sum);
+ }
}
[TestMethod]
public void TestVerboseOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$VerbosePreference='Continue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel",false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$VerbosePreference=[System.Management.Automation.ActionPreference]::Continue", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Verbose $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
- var vrb = ps.Streams.Verbose.ReadAll();
- Assert.IsTrue(vrb.Any(v=> v.Message == "1"), "Some verbose message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
+ var vrb = ps.Streams.Verbose.ReadAll();
+ Assert.IsTrue(vrb.Any(v => v.Message == "1"), "Some verbose message should be '1'");
+ }
}
[TestMethod]
public void TestNoVerboseOutputWithoutPreference()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.Runspace = RunspaceFactory.CreateRunspace();
+ ps.Runspace.Open();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Verbose $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
- var vrb = ps.Streams.Verbose.ReadAll();
- Assert.IsFalse(vrb.Any(v => v.Message == "1"), "No verbose message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
+ var vrb = ps.Streams.Verbose.ReadAll();
+ Assert.IsFalse(vrb.Any(v => v.Message == "1"), "No verbose message should be '1'");
+ ps.Runspace.Dispose();
+ }
}
[TestMethod]
public void TestDebugOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$DebugPreference='Continue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
- .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_"))
- .AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
- var dbg = ps.Streams.Debug.ReadAll();
- Assert.IsTrue(dbg.Any(d => d.Message == "1"), "Some debug message should be '1'");
+ using (var ps = PowerShell.Create())
+ {
+ using (var rs = RunspaceFactory.CreateRunspace(_iss))
+ {
+ rs.Open();
+ ps.Runspace = rs;
+ ps.AddScript("$DebugPreference=[System.Management.Automation.ActionPreference]::Continue", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_"))
+ .AddParameter("ThrottleLimit", 1);
+ var input = new PSDataCollection { 1, 2, 3, 4, 5 };
+ input.Complete();
+ ps.Invoke(input);
+ Assert.IsFalse(ps.HadErrors, "We don't expect errors here");
+ var dbg = ps.Streams.Debug.ReadAll();
+ Assert.IsTrue(dbg.Any(d => d.Message == "1"), "Some debug message should be '1'");
+ }
+ }
}
[TestMethod]
public void TestNoDebugOutputWithoutPreference()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var dbg = ps.Streams.Debug.ReadAll();
- Assert.IsFalse(dbg.Any(d => d.Message == "1"), "No debug message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var dbg = ps.Streams.Debug.ReadAll();
+ Assert.IsFalse(dbg.Any(d => d.Message == "1"), "No debug message should be '1'");
+ }
}
[TestMethod]
public void TestWarningOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$WarningPreference='Continue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$WarningPreference='Continue'", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Warning $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var wrn = ps.Streams.Warning.ReadAll();
- Assert.IsTrue(wrn.Any(w => w.Message == "1"), "Some warning message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var wrn = ps.Streams.Warning.ReadAll();
+ Assert.IsTrue(wrn.Any(w => w.Message == "1"), "Some warning message should be '1'");
+ }
}
[TestMethod]
public void TestNoWarningOutputWithoutPreference()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$WarningPreference='SilentlyContinue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$WarningPreference='SilentlyContinue'", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Warning $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var wrn = ps.Streams.Warning.ReadAll();
- Assert.IsFalse(wrn.Any(w => w.Message == "1"), "No warning message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var wrn = ps.Streams.Warning.ReadAll();
+ Assert.IsFalse(wrn.Any(w => w.Message == "1"), "No warning message should be '1'");
+ }
}
[TestMethod]
public void TestErrorOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$ErrorActionPreference='Continue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$ErrorActionPreference='Continue'", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Error -Message $_ -TargetObject $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var err = ps.Streams.Error.ReadAll();
- Assert.IsTrue(err.Any(e => e.Exception.Message == "1"), "Some warning message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var err = ps.Streams.Error.ReadAll();
+ Assert.IsTrue(err.Any(e => e.Exception.Message == "1"), "Some warning message should be '1'");
+ }
}
[TestMethod]
public void TestNoErrorOutputWithoutPreference()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("$ErrorActionPreference='SilentlyContinue'", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$ErrorActionPreference='SilentlyContinue'", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
.AddParameter("ScriptBlock", ScriptBlock.Create("Write-Error -message $_ -TargetObject $_"))
.AddParameter("ThrottleLimit", 1);
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var err = ps.Streams.Error.ReadAll();
- Assert.IsFalse(err.Any(e => e.Exception.Message == "1"), "No Error message should be '1'");
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var err = ps.Streams.Error.ReadAll();
+ Assert.IsFalse(err.Any(e => e.Exception.Message == "1"), "No Error message should be '1'");
+ }
}
[TestMethod]
public void TestBinaryExpressionVariableCapture()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("[int]$x=10", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
- .AddParameter("ScriptBlock", ScriptBlock.Create("$x -eq 10"))
- .AddParameter("ThrottleLimit", 1)
- .AddParameter("InputObject", 1);
-
- var result = ps.Invoke().First();
- Assert.IsTrue(result);
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("[int]$x=10", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock", ScriptBlock.Create("$x -eq 10"))
+ .AddParameter("ThrottleLimit", 1)
+ .AddParameter("InputObject", 1);
+
+ var result = ps.Invoke().First();
+ Assert.IsTrue(result);
+ }
}
[TestMethod]
public void TestAssingmentExpressionVariableCapture()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
- ps.AddScript("[int]$x=10;", false).Invoke();
- ps.Commands.Clear();
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
- .AddParameter("ScriptBlock", ScriptBlock.Create("$y = $x * 5; $y"))
- .AddParameter("ThrottleLimit", 1)
- .AddParameter("InputObject", 1);
-
- var result = ps.Invoke().First();
- Assert.AreEqual(50, result);
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("[int]$x=10;", false).Invoke();
+ ps.Commands.Clear();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock", ScriptBlock.Create("$y = $x * 5; $y"))
+ .AddParameter("ThrottleLimit", 1)
+ .AddParameter("InputObject", 1);
+
+ var result = ps.Invoke().First();
+ Assert.AreEqual(50, result);
+ }
}
[TestMethod]
public void TestProgressOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
-
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
- .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_"))
- .AddParameter("ThrottleLimit", 1);
-
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var progress = ps.Streams.Progress.ReadAll();
- Assert.AreEqual(11, progress.Count(pr=>pr.Activity == "Invoke-Parallel" || pr.Activity == "Test"));
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$ProgressPreference='Continue'", false).Invoke();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock",
+ ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_"))
+ .AddParameter("ThrottleLimit", 1);
+
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var progress = ps.Streams.Progress.ReadAll();
+ Assert.IsTrue(10 < progress.Count(pr => pr.Activity == "Invoke-Parallel" || pr.Activity == "Test"));
+ }
+ }
+
+ [TestMethod]
+ public void TestProgressOutput2Workers()
+ {
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript("$ProgressPreference='Continue'", false).Invoke();
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock",
+ ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_"))
+ .AddParameter("ThrottleLimit", 2);
+
+ var input = new PSDataCollection { 1, 2, 3, 4, 5, 6, 7,8, 9,10 };
+ input.Complete();
+ ps.Invoke(input);
+ var progress = ps.Streams.Progress.ReadAll();
+ Assert.IsTrue(19 <= progress.Count(pr => pr.Activity == "Invoke-Parallel" || pr.Activity == "Test"));
+ }
}
[TestMethod]
public void TestNoProgressOutput()
{
- PowerShell ps = PowerShell.Create();
- ps.RunspacePool = m_runspacePool;
-
- ps.AddStatement()
- .AddCommand("Invoke-Parallel", false)
- .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_"))
- .AddParameter("ThrottleLimit", 1)
- .AddParameter("NoProgress");
-
- var input = new PSDataCollection { 1, 2, 3, 4, 5 };
- input.Complete();
- ps.Invoke(input);
- var progress = ps.Streams.Progress.ReadAll();
- Assert.IsFalse( progress.Any(pr=>pr.Activity == "Invoke-Parallel"));
- Assert.AreEqual(5, progress.Count(pr=>pr.Activity == "Test"));
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock",
+ ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_"))
+ .AddParameter("ThrottleLimit", 1)
+ .AddParameter("NoProgress");
+
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ ps.Invoke(input);
+ var progress = ps.Streams.Progress.ReadAll();
+ Assert.IsFalse(progress.Any(pr => pr.Activity == "Invoke-Parallel"));
+ Assert.AreEqual(5, progress.Count(pr => pr.Activity == "Test"));
+ }
+ }
+
+
+ [TestMethod]
+ public void TestFunctionCaptureOutput()
+ {
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript(@"
+function foo($x) {return $x * 2}
+", false);
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock", ScriptBlock.Create("foo $_"))
+ .AddParameter("ThrottleLimit", 1)
+ .AddParameter("NoProgress");
+
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ var output = ps.Invoke(input);
+ var sum = output.Aggregate(0, (a, b) => a + b);
+ Assert.AreEqual(30, sum);
+ }
}
+ [TestMethod]
+ public void TestRecursiveFunctionCaptureOutput()
+ {
+ using (var ps = PowerShell.Create())
+ {
+ ps.RunspacePool = m_runspacePool;
+ ps.AddScript(@"
+function foo($x) {return 2 * $x}
+function bar($x) {return 3 * (foo $x)}
+", false);
+
+ ps.AddStatement()
+ .AddCommand("Invoke-Parallel", false)
+ .AddParameter("ScriptBlock", ScriptBlock.Create("bar $_"))
+ .AddParameter("ThrottleLimit", 1)
+ .AddParameter("NoProgress");
+
+ var input = new PSDataCollection {1, 2, 3, 4, 5};
+ input.Complete();
+ var output = ps.Invoke(input);
+ var sum = output.Aggregate(0, (a, b) => a + b);
+ Assert.AreEqual(90, sum);
+ }
+ }
+
+
public void Dispose()
{
m_runspacePool.Dispose();