PowerShell 技能连载 - 并行处理实战

适用于 PowerShell 7.0 及以上版本(部分示例兼容 5.1)

随着服务器核心数增加和运维任务量增长,单线程处理已经无法满足效率需求。PowerShell 7 引入了 ForEach-Object -Parallel,让并行处理变得像管道一样简单。对于仍在使用 PowerShell 5.1 的环境,.NET 的 Runspace API 同样提供了强大的并行能力。合理使用并行处理,可以将原本需要数小时的任务缩短到几分钟。

本文将对比不同的并行方案,并给出实用的并行处理模式。

ForEach-Object -Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 基本并行用法(PowerShell 7+)
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05")

# 串行获取磁盘信息
$sw = [System.Diagnostics.Stopwatch]::StartNew()
$serial = foreach ($server in $servers) {
Invoke-Command -ComputerName $server -ScriptBlock {
Get-PSDrive -PSProvider FileSystem | Select-Object Name,
@{N='FreeGB'; E={[math]::Round($_.Free / 1GB, 2)}},
@{N='UsedGB'; E={[math]::Round($_.Used / 1GB, 2)}}
}
}
$sw.Stop()
Write-Host "串行执行:$($sw.ElapsedMilliseconds) ms"

# 并行获取磁盘信息
$sw.Restart()
$parallel = $servers | ForEach-Object -ThrottleLimit 5 -Parallel {
Invoke-Command -ComputerName $_ -ScriptBlock {
Get-PSDrive -PSProvider FileSystem | Select-Object Name,
@{N='FreeGB'; E={[math]::Round($_.Free / 1GB, 2)}},
@{N='UsedGB'; E={[math]::Round($_.Used / 1GB, 2)}}
}
}
$sw.Stop()
Write-Host "并行执行:$($sw.ElapsedMilliseconds) ms"

# 带返回值的并行处理
$results = 1..100 | ForEach-Object -ThrottleLimit 10 -Parallel {
# 模拟耗时计算
Start-Sleep -Milliseconds 50
[PSCustomObject]@{
Input = $_
Squared = $_ * $_
Thread = [System.Threading.Thread]::CurrentThread.ManagedThreadId
}
}

$uniqueThreads = $results.Thread | Sort-Object -Unique
Write-Host "使用了 $($uniqueThreads.Count) 个线程处理 100 个任务"
$results | Select-Object -First 5 | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
串行执行:12534 ms
并行执行:3102 ms
使用了 6 个线程处理 100 个任务
Input Squared Thread
----- ------- ------
1 1 7
2 4 8
3 9 4
4 16 7
5 25 8

变量传递与状态共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 向并行代码块传递变量
$threshold = 80
$warningEmail = "admin@contoso.com"

$drives = @(
@{ Server = "SRV01"; Drive = "C:" },
@{ Server = "SRV02"; Drive = "D:" },
@{ Server = "SRV03"; Drive = "C:" }
)

$alerts = $drives | ForEach-Object -ThrottleLimit 3 -Parallel {
# 使用 $using: 引用外部变量
$thresh = $using:threshold
$email = $using:warningEmail

# 模拟获取磁盘使用率
$usage = Get-Random -Minimum 50 -Maximum 100

if ($usage -gt $thresh) {
[PSCustomObject]@{
Server = $_.Server
Drive = $_.Drive
Usage = $usage
Alert = "$($_.Server) $($_.Drive) 使用率 ${usage}% 超过阈值 ${thresh}%"
NotifyTo = $email
}
}
}

if ($alerts) {
Write-Host "磁盘告警:" -ForegroundColor Red
$alerts | Format-Table Server, Drive, Usage, Alert -AutoSize
} else {
Write-Host "所有磁盘使用率正常" -ForegroundColor Green
}

# 使用 ConcurrentDictionary 共享状态(线程安全)
$counter = [System.Collections.Concurrent.ConcurrentDictionary[string, int]]::new()

1..50 | ForEach-Object -ThrottleLimit 10 -Parallel {
$dict = $using:counter
$category = @("Web", "API", "DB", "Cache")[(Get-Random -Minimum 0 -Maximum 4)]
$dict.AddOrUpdate($category, 1, { param($k, $v) $v + 1 })
}

Write-Host "`n分类统计:" -ForegroundColor Cyan
$counter.GetEnumerator() | Sort-Object Value -Descending |
ForEach-Object { Write-Host " $($_.Key): $($_.Value)" }

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
磁盘告警:
Server Drive Usage Alert
------ ----- ----- -----
SRV01 C: 92 SRV01 C: 使用率 92% 超过阈值 80%
SRV03 C: 87 SRV03 C: 使用率 87% 超过阈值 80%

分类统计:
Web: 16
API: 14
DB: 11
Cache: 9

Runspace 池(兼容 5.1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# PowerShell 5.1 兼容的并行方案
function Invoke-RunspaceJob {
param(
[Parameter(Mandatory)]
[scriptblock]$ScriptBlock,

[Parameter(Mandatory)]
[object[]]$InputData,

[int]$ThrottleLimit = 4
)

$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit)
$pool.Open()

$jobs = [System.Collections.Generic.List[hashtable]]::new()

foreach ($item in $InputData) {
$ps = [powershell]::Create()
$ps.RunspacePool = $pool

# 添加脚本和参数
$ps.AddScript($ScriptBlock).AddArgument($item) | Out-Null

$jobs.Add(@{
PowerShell = $ps
Handle = $ps.BeginInvoke()
Input = $item
})
}

# 等待所有任务完成
$results = @()
foreach ($job in $jobs) {
try {
$output = $job.PowerShell.EndInvoke($job.Handle)
if ($output) { $results += $output }
} catch {
Write-Host "任务失败:$($job.Input) - $($_.Exception.Message)" -ForegroundColor Red
} finally {
$job.PowerShell.Dispose()
}
}

$pool.Close()
$pool.Dispose()

return $results
}

# 使用 Runspace 并行 Ping 测试
$computers = @(
"SRV01", "SRV02", "SRV03", "SRV04", "SRV05",
"DB01", "DB02", "WEB01", "WEB02", "APP01"
)

$pingResults = Invoke-RunspaceJob -InputData $computers -ThrottleLimit 5 -ScriptBlock {
param([string]$Computer)

$sw = [System.Diagnostics.Stopwatch]::StartNew()
$ping = Test-Connection -ComputerName $Computer -Count 1 -Quiet -ErrorAction SilentlyContinue
$sw.Stop()

[PSCustomObject]@{
Computer = $Computer
Online = $ping
LatencyMs = $sw.ElapsedMilliseconds
}
}

$pingResults | Sort-Object Computer | Format-Table -AutoSize

$online = ($pingResults | Where-Object { $_.Online }).Count
Write-Host "在线:$online / $($computers.Count)" -ForegroundColor Green

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
Computer Online LatencyMs
-------- ------ ---------
APP01 True 2
DB01 True 3
DB02 True 2
SRV01 True 1
SRV02 True 2
SRV03 True 1
SRV04 True 2
SRV05 True 2
WEB01 True 3
WEB02 True 2
在线:10 / 10

并行文件处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 并行处理大量文件
function Invoke-ParallelFileProcess {
param(
[Parameter(Mandatory)]
[string]$Path,

[Parameter(Mandatory)]
[scriptblock]$ProcessBlock,

[string]$Filter = "*.*",
[int]$ThrottleLimit = 4
)

$files = Get-ChildItem $Path -Filter $Filter -File
Write-Host "找到 $($files.Count) 个文件,使用 $ThrottleLimit 并发处理" -ForegroundColor Cyan

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$results = $files | ForEach-Object -ThrottleLimit $ThrottleLimit -Parallel {
$file = $_
$block = $using:ProcessBlock

try {
& $block $file
} catch {
[PSCustomObject]@{
File = $file.Name
Error = $_.Exception.Message
Status = "Failed"
}
}
}

$sw.Stop()

$success = ($results | Where-Object { $_.Status -ne "Failed" }).Count
$failed = ($results | Where-Object { $_.Status -eq "Failed" }).Count

Write-Host "`n处理完成:成功 $success,失败 $failed" -ForegroundColor $(if ($failed) { "Yellow" } else { "Green" })
Write-Host "耗时:$($sw.ElapsedMilliseconds) ms"

return $results
}

# 示例:批量计算文件哈希
$hashResults = Invoke-ParallelFileProcess -Path "C:\Projects\MyApp\bin" -Filter "*.dll" -ThrottleLimit 8 -ProcessBlock {
param($file)
$hash = (Get-FileHash $file.FullName -Algorithm SHA256).Hash
[PSCustomObject]@{
File = $file.Name
SizeKB = [math]::Round($file.Length / 1KB, 2)
SHA256 = $hash.Substring(0, 16) + "..."
Status = "OK"
}
}

$hashResults | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
找到 24 个文件,使用 8 并发处理

处理完成:成功 24,失败 0
耗时:1234 ms
File SizeKB SHA256 Status
---- ------ ------ ------
Microsoft.AI.dll 456.78 a1b2c3d4e5f6... OK
Newtonsoft.Json 1234.56 f1e2d3c4b5a6... OK
System.Text.Json 234.56 1a2b3c4d5e6f... OK
MyApp.Core.dll 567.89 b2c3d4e5f6a1... OK

注意事项

  1. 并发数控制ThrottleLimit 不是越大越好,CPU 密集型任务设为 CPU 核心数,I/O 密集型可设为 2-4 倍
  2. 线程安全:并行代码中不要直接修改外部变量,使用 $using: 传递只读数据,或使用 ConcurrentDictionary 共享可变状态
  3. 错误处理:每个并行任务应有独立的 try-catch,避免一个任务失败导致整个并行操作中断
  4. 内存消耗:每个 runspace 有独立的内存空间,大量并发会消耗更多内存
  5. 调试困难:并行代码的调试比串行代码复杂得多,建议先用串行模式验证逻辑正确性
  6. PowerShell 7 优势ForEach-Object -Parallel 是 PowerShell 7 独有功能,5.1 只能使用 Runspace 方案

PowerShell 技能连载 - 异步编程模式

适用于 PowerShell 5.1 及以上版本,并行功能需要 PowerShell 7

运维脚本经常面临 IO 等待——网络请求、文件操作、数据库查询,这些操作的响应时间往往远超 CPU 处理时间。如果顺序执行 100 个 HTTP 健康检查,每个耗时 1 秒,总共需要 100 秒;但如果并发执行,可能只需要几秒。PowerShell 提供了多种异步编程机制:后台作业(Jobs)、运行空间(Runspaces)、ForEach-Object -Parallel、以及 .NET 的异步 API。

本文将讲解 PowerShell 中的异步编程模式及其适用场景。

后台作业(Jobs)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Start-Job:在后台运行脚本块
$job = Start-Job -ScriptBlock {
Start-Sleep -Seconds 2
Get-Process | Select-Object Name, Id, CPU | Sort-Object CPU -Descending | Select-Object -First 5
}

Write-Host "作业已启动,ID:$($job.Id)" -ForegroundColor Cyan

# 非阻塞检查状态
while ($job.State -eq 'Running') {
Write-Host "." -NoNewline
Start-Sleep -Milliseconds 500
}
Write-Host

# 获取结果
$result = Receive-Job -Job $job
Write-Host "作业完成,结果:"
$result | Format-Table -AutoSize

# 清理作业
Remove-Job -Job $job

# 批量后台作业
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05")
$jobs = foreach ($server in $servers) {
Start-Job -ScriptBlock {
param($computer)
$os = Get-CimInstance Win32_OperatingSystem -ComputerName $computer -ErrorAction SilentlyContinue
if ($os) {
[PSCustomObject]@{
Server = $computer
OS = $os.Caption
Version = $os.Version
Status = "Online"
}
} else {
[PSCustomObject]@{
Server = $computer
OS = "N/A"
Version = "N/A"
Status = "Offline"
}
}
} -ArgumentList $server
}

Write-Host "启动了 $($jobs.Count) 个后台作业" -ForegroundColor Cyan

# 等待所有作业完成
$jobs | Wait-Job | Out-Null

# 收集所有结果
$allResults = $jobs | Receive-Job
$allResults | Format-Table -AutoSize

# 清理
$jobs | Remove-Job

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
作业已启动,ID:15
...

作业完成,结果:
Name Id CPU
---- -- ---
chrome 12345 1250.3
vscode 12346 456.7
powershell 12347 234.5
node 12348 123.4
dotnet 12349 98.1

启动了 5 个后台作业

Server OS Version Status
------ -- ------- ------
SRV01 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV02 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV03 N/A N/A Offline
SRV04 Microsoft Windows Server 2019 Standard 10.0.17763 Online
SRV05 Microsoft Windows Server 2022 Standard 10.0.20348 Online

ForEach-Object -Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# PowerShell 7 并行执行(推荐方式)
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05", "SRV06", "SRV07", "SRV08")

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$results = $servers | ForEach-Object -Parallel {
$server = $_
try {
$response = Invoke-WebRequest -Uri "http://${server}:8080/health" `
-TimeoutSec 5 -UseBasicParsing -ErrorAction Stop
[PSCustomObject]@{
Server = $server
Status = "Healthy"
Code = $response.StatusCode
LatencyMs = $response.RawContentLength
}
} catch {
[PSCustomObject]@{
Server = $server
Status = "Unhealthy"
Code = 0
LatencyMs = 0
Error = $_.Exception.Message
}
}
} -ThrottleLimit 4

$sw.Stop()

$results | Format-Table -AutoSize
Write-Host "并行检查 8 台服务器耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

# 共享变量(需要 $using:)
$threshold = 80
$results = $servers | ForEach-Object -Parallel {
$server = $_
$cpu = Get-Random -Min 10 -Max 95
[PSCustomObject]@{
Server = $server
CPU = $cpu
IsAlert = $cpu -gt $using:threshold
}
} -ThrottleLimit 4

$results | Where-Object { $_.IsAlert } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Server Status    Code LatencyMs
------ ------ ---- ---------
SRV01 Healthy 200 256
SRV02 Healthy 200 256
SRV03 Unhealthy 0 0
SRV04 Healthy 200 256
SRV05 Healthy 200 256
SRV06 Healthy 200 256
SRV07 Unhealthy 0 0
SRV08 Healthy 200 256

并行检查 8 台服务器耗时:2150ms

Server CPU IsAlert
------ --- -------
SRV02 92 True
SRV05 87 True

Runspace 池

对于需要精确控制的并发场景,使用 Runspace 池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
function Invoke-Parallel {
<#
.SYNOPSIS
使用 Runspace 池并行执行脚本块
#>
param(
[Parameter(Mandatory)]
[scriptblock]$ScriptBlock,

[Parameter(Mandatory)]
[object[]]$InputData,

[int]$ThrottleLimit = 4
)

# 创建 Runspace 池
$pool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(
1, $ThrottleLimit
)
$pool.Open()

$runspaces = @()

foreach ($item in $InputData) {
$powershell = [System.Management.Automation.PowerShell]::Create()
$powershell.RunspacePool = $pool

$null = $powershell.AddScript($ScriptBlock).AddArgument($item)

$runspaces += [PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
Input = $item
}
}

# 等待完成并收集结果
$results = @()
foreach ($rs in $runspaces) {
try {
$output = $rs.Pipe.EndInvoke($rs.Handle)
if ($output) {
foreach ($o in $output) {
$results += $o
}
}
} catch {
$results += [PSCustomObject]@{
Error = $true
Input = $rs.Input
Message = $_.Exception.Message
}
} finally {
$rs.Pipe.Dispose()
}
}

$pool.Close()
$pool.Dispose()

return $results
}

# 使用 Runspace 池并行 ping
$servers = 1..20 | ForEach-Object { "192.168.1.$_" }

$sw = [System.Diagnostics.Stopwatch]::StartNew()
$results = Invoke-Parallel -ScriptBlock {
param($ip)
$ping = Test-Connection -ComputerName $ip -Count 1 -Quiet -ErrorAction SilentlyContinue
[PSCustomObject]@{ IP = $ip; Online = $ping }
} -InputData $servers -ThrottleLimit 10
$sw.Stop()

$online = ($results | Where-Object { $_.Online }).Count
Write-Host "扫描 20 个 IP,$online 个在线,耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green
$results | Where-Object { $_.Online } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
扫描 20 个 IP,8 个在线,耗时:3200ms

IP Online
-- ------
192.168.1.1 True
192.168.1.2 True
192.168.1.10 True
192.168.1.11 True
192.168.1.12 True
192.168.1.20 True
192.168.1.50 True
192.168.1.100 True

异步文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 并行处理文件
$files = Get-ChildItem "C:\Logs" -Filter "*.log" -Recurse | Select-Object -First 20

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$fileStats = $files | ForEach-Object -Parallel {
$file = $_
$lines = 0
$errors = 0

$reader = [System.IO.StreamReader]::new($file.FullName)
while ($null -ne $reader.ReadLine()) {
$lines++
}
$reader.Close()

[PSCustomObject]@{
Name = $file.Name
SizeKB = [math]::Round($file.Length / 1KB, 1)
Lines = $lines
Modified = $file.LastWriteTime.ToString('yyyy-MM-dd HH:mm')
}
} -ThrottleLimit 4

$sw.Stop()

$fileStats | Format-Table -AutoSize
Write-Host "并行处理 $($files.Count) 个文件耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

执行结果示例:

1
2
3
4
5
6
7
8
Name               SizeKB Lines Modified
---- ------- ----- --------
app-20250709.log 1024.5 12050 2025-07-09 08:30
app-20250708.log 856.3 9876 2025-07-08 23:59
app-20250707.log 743.1 8543 2025-07-07 23:59
security-20250709.log 456.7 5432 2025-07-09 08:15

并行处理 20 个文件耗时:850ms

注意事项

  1. Jobs 开销大:每个 Job 启动新的 PowerShell 进程,开销较大。少量任务用 Jobs,大量任务用 Runspace 或 -Parallel
  2. 线程安全:并行操作中不要直接修改共享变量,使用 ConcurrentDictionary 或收集结果后统一处理
  3. ThrottleLimit:设置合理的并发限制,过多并发会导致资源争用和 API 限流
  4. 错误处理:并行任务中的错误不会自动传播到主线程,需要显式捕获和收集
  5. $using: 作用域ForEach-Object -Parallel 中访问外部变量需要 $using: 前缀
  6. Runspace 清理:使用完 Runspace 和 RunspacePool 后必须调用 Dispose() 释放资源

PowerShell 技能连载 - 并行处理与 Runspace

适用于 PowerShell 5.1 及以上版本,ForEach-Object -Parallel 需要 PowerShell 7

PowerShell 默认是单线程顺序执行的——一个命令完成后再执行下一个。当需要处理数百台服务器、上千个文件或大量 API 请求时,串行执行的等待时间会线性增长。并行处理是解决这类性能瓶颈的关键手段,PowerShell 提供了多种并行方案,从简单到复杂依次为:Start-JobForEach-Object -Parallel、Runspace 池。

本文将对比这三种方案,并深入讲解 Runspace 池的高性能用法。

三种并行方案对比

在选择并行方案前,需要了解各方案的特点和适用场景:

方案 最低版本 启动开销 内存占用 适用场景
Start-Job 5.1 高(新进程) 简单后台任务
ForEach-Object -Parallel 7.0 中(新 runspace) 快速并行遍历
Runspace 池 5.1 低(线程复用) 高性能批量操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 方案一:Start-Job(最简单,但开销最大)
$jobs = @()
$servers = @('SRV01', 'SRV02', 'SRV03', 'SRV04', 'SRV05')

foreach ($server in $servers) {
$jobs += Start-Job -ScriptBlock {
param($srv)
Test-Connection -ComputerName $srv -Count 1 -Quiet
Get-Service -ComputerName $srv -Name WinRM |
Select-Object Status, Name
} -ArgumentList $server
}

# 等待所有作业完成
$results = $jobs | Wait-Job | Receive-Job
$jobs | Remove-Job

$results | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
Status Name
------ ----
True WinRM
True WinRM
False WinRM
True WinRM
True WinRM

ForEach-Object -Parallel(PowerShell 7+)

PowerShell 7 引入了 ForEach-Object -Parallel,这是最便捷的并行方案。它在底层使用新的 runspace 来并行执行脚本块,支持控制并发数和超时:

1
2
3
4
5
6
7
8
9
10
11
# 基本用法:并行 ping 多台服务器
$servers = 1..50 | ForEach-Object { "192.168.1.$_" }

$servers | ForEach-Object -Parallel {
$result = Test-Connection -ComputerName $_ -Count 1 -Quiet
[PSCustomObject]@{
Server = $_
Online = $result
Time = Get-Date -Format 'HH:mm:ss.fff'
}
} -ThrottleLimit 10 | Sort-Object Server | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
Server       Online Time
------ ------ ----
192.168.1.1 True 08:15:32.123
192.168.1.2 True 08:15:32.156
192.168.1.3 False 08:15:32.189
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 传递外部变量到并行脚本块
$credential = Get-Credential
$logPath = "C:\Logs"

1..20 | ForEach-Object -Parallel {
# 使用 $using: 引用外部变量
$server = "SRV$_"
$session = New-PSSession -ComputerName $server -Credential $using:credential

Invoke-Command -Session $session -ScriptBlock {
Get-EventLog -LogName System -Newest 10 |
Select-Object TimeGenerated, EntryType, Message
} | Export-Csv "$using:logPath\$server-events.csv" -NoTypeInformation

Remove-PSSession $session
Write-Host "完成:$server"
} -ThrottleLimit 5 -AsJob | Wait-Job

执行结果示例:

1
2
3
4
完成:SRV1
完成:SRV2
完成:SRV3
...

注意$using: 语法用于将外部变量传递到并行脚本块中。但 $using: 只能传递可序列化的对象,不能传递活动会话或运行时对象。

Runspace 池高性能并行

Runspace 是 PowerShell 执行环境的最小单元。通过手动创建和管理 runspace,可以实现最低的启动开销和最高的吞吐量。这是处理大规模并行任务的最优方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 创建 Runspace 池
$maxThreads = 8
$runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads)
$runspacePool.Open()

# 定义任务列表
$servers = @('SRV01', 'SRV02', 'SRV03', 'SRV04', 'SRV05',
'SRV06', 'SRV07', 'SRV08', 'SRV09', 'SRV10')

$scriptBlock = {
param($ServerName)
$startTime = Get-Date

# 模拟远程操作(实际中替换为真实命令)
Start-Sleep -Milliseconds (Get-Random -Min 100 -Max 500)

$cpu = Get-Random -Min 10 -Max 95
$mem = Get-Random -Min 30 -Max 85

[PSCustomObject]@{
Server = $ServerName
CPU = $cpu
Memory = $mem
Status = if ($cpu -gt 80) { 'Warning' } else { 'OK' }
Duration = ((Get-Date) - $startTime).TotalMilliseconds
}
}

# 创建并启动所有 Runspace
$runspaces = @()
foreach ($server in $servers) {
$powershell = [powershell]::Create().AddScript($scriptBlock).AddArgument($server)
$powershell.RunspacePool = $runspacePool

$runspaces += [PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
Server = $server
}
}

# 收集结果
$results = @()
foreach ($rs in $runspaces) {
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) {
$results += $result
}
$rs.Pipe.Dispose()
}

$runspacePool.Close()
$runspacePool.Dispose()

$results | Sort-Object Server | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
Server CPU Memory Status  Duration
------ --- ------ ------ --------
SRV01 45 62 OK 312.45
SRV02 72 58 OK 287.33
SRV03 89 81 Warning 456.12
SRV04 34 45 OK 198.67
SRV05 56 71 OK 234.89
...

带 进度反馈的 Runspace

长时间运行的并行任务需要进度反馈。通过将 runspace 状态存入字典,可以实时查询进度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
$maxThreads = 4
$runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads)
$runspacePool.Open()

$tasks = 1..20 | ForEach-Object { "Task-$_" }
$scriptBlock = {
param($TaskName)
$totalSteps = 5
for ($step = 1; $step -le $totalSteps; $step++) {
Start-Sleep -Milliseconds (Get-Random -Min 200 -Max 600)
}
[PSCustomObject]@{
Task = $TaskName
Status = 'Completed'
Steps = $totalSteps
}
}

$runspaces = [System.Collections.Concurrent.ConcurrentDictionary[string,object]]::new()

foreach ($task in $tasks) {
$ps = [powershell]::Create().AddScript($scriptBlock).AddArgument($task)
$ps.RunspacePool = $runspacePool
$handle = $ps.BeginInvoke()

$runspaces[$task] = [PSCustomObject]@{
Pipe = $ps
Handle = $handle
}
}

# 等待并显示进度
$completed = 0
$total = $tasks.Count
while ($completed -lt $total) {
Start-Sleep -Milliseconds 500

foreach ($key in @($runspaces.Keys)) {
$rs = $runspaces[$key]
if ($rs.Handle.IsCompleted -and -not $rs.Done) {
$rs.Done = $true
$completed++
$pct = [math]::Round($completed / $total * 100)
Write-Progress -Activity "并行任务执行" `
-Status "$completed / $total 已完成 ($pct%)" `
-PercentComplete $pct
}
}
}

# 收集结果
$results = @()
foreach ($key in @($runspaces.Keys)) {
$rs = $runspaces[$key]
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) { $results += $result }
$rs.Pipe.Dispose()
}

$runspacePool.Close()
$runspacePool.Dispose()
$results | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
Task    Status    Steps
---- ------ -----
Task-1 Completed 5
Task-2 Completed 5
Task-3 Completed 5
...
Task-20 Completed 5

并行文件处理实战

以下是一个使用 Runspace 池并行处理文件的实用示例——批量计算文件哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
function Get-FileHashParallel {
<#
.SYNOPSIS
并行计算文件哈希值
#>
param(
[string]$Path = "C:\Projects",
[int]$ThrottleLimit = 8,
[string]$Algorithm = 'SHA256'
)

$files = Get-ChildItem -Path $Path -File -Recurse |
Where-Object { $_.Length -gt 1MB }
Write-Host "共 $($files.Count) 个文件需要计算哈希" -ForegroundColor Cyan

$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit)
$pool.Open()

$script = {
param($FilePath, $Algo)
$hash = Get-FileHash -Path $FilePath -Algorithm $Algo
[PSCustomObject]@{
File = $FilePath
Hash = $hash.Hash
SizeMB = [math]::Round((Get-Item $FilePath).Length / 1MB, 2)
}
}

$runspaces = @()
$sw = [System.Diagnostics.Stopwatch]::StartNew()

foreach ($file in $files) {
$ps = [powershell]::Create().AddScript($script)
$ps.AddArgument($file.FullName).AddArgument($Algorithm) | Out-Null
$ps.RunspacePool = $pool
$runspaces += @{ Pipe = $ps; Handle = $ps.BeginInvoke() }
}

$results = @()
foreach ($rs in $runspaces) {
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) { $results += $result }
$rs.Pipe.Dispose()
}

$pool.Close()
$pool.Dispose()
$sw.Stop()

Write-Host "`n耗时:$($sw.Elapsed.TotalSeconds) 秒" -ForegroundColor Green
$results | Format-Table -AutoSize
}

Get-FileHashParallel -Path "C:\Projects" -ThrottleLimit 8

执行结果示例:

1
2
3
4
5
6
7
8
9
共 42 个文件需要计算哈希

耗时:3.82 秒

File Hash SizeMB
---- ---- ------
C:\Projects\app-v1.0.zip A1B2C3D4E5F6... 125.3
C:\Projects\database-backup.bak F6E5D4C3B2A1... 342.7
C:\Projects\config.json 1234567890AB... 1.2

注意事项

  1. 线程安全:Runspace 中的代码不应直接修改外部变量或共享状态,应通过返回值传递结果
  2. 并发数控制ThrottleLimit 或 Runspace 池大小不宜过大,通常设为 CPU 核心数的 2-4 倍
  3. 错误处理:Runspace 中的异常不会自动传播到主线程,需要在脚本块内捕获并通过返回值传递错误信息
  4. 资源释放:使用完毕后必须调用 Dispose() 释放 Runspace 和 PowerShell 对象,避免内存泄漏
  5. $using: 限制ForEach-Object -Parallel 中的 $using: 只能传递可序列化的值,不能传递 StreamWriter、数据库连接等运行时对象
  6. 模块导入:每个 Runspace 是独立的执行环境,需要单独导入模块。可以在脚本块开头添加 Import-Module 语句