PowerShell 技能连载 - 并行处理实战

适用于 PowerShell 7.0 及以上版本(部分示例兼容 5.1)

随着服务器核心数增加和运维任务量增长,单线程处理已经无法满足效率需求。PowerShell 7 引入了 ForEach-Object -Parallel,让并行处理变得像管道一样简单。对于仍在使用 PowerShell 5.1 的环境,.NET 的 Runspace API 同样提供了强大的并行能力。合理使用并行处理,可以将原本需要数小时的任务缩短到几分钟。

本文将对比不同的并行方案,并给出实用的并行处理模式。

ForEach-Object -Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 基本并行用法(PowerShell 7+)
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05")

# 串行获取磁盘信息
$sw = [System.Diagnostics.Stopwatch]::StartNew()
$serial = foreach ($server in $servers) {
Invoke-Command -ComputerName $server -ScriptBlock {
Get-PSDrive -PSProvider FileSystem | Select-Object Name,
@{N='FreeGB'; E={[math]::Round($_.Free / 1GB, 2)}},
@{N='UsedGB'; E={[math]::Round($_.Used / 1GB, 2)}}
}
}
$sw.Stop()
Write-Host "串行执行:$($sw.ElapsedMilliseconds) ms"

# 并行获取磁盘信息
$sw.Restart()
$parallel = $servers | ForEach-Object -ThrottleLimit 5 -Parallel {
Invoke-Command -ComputerName $_ -ScriptBlock {
Get-PSDrive -PSProvider FileSystem | Select-Object Name,
@{N='FreeGB'; E={[math]::Round($_.Free / 1GB, 2)}},
@{N='UsedGB'; E={[math]::Round($_.Used / 1GB, 2)}}
}
}
$sw.Stop()
Write-Host "并行执行:$($sw.ElapsedMilliseconds) ms"

# 带返回值的并行处理
$results = 1..100 | ForEach-Object -ThrottleLimit 10 -Parallel {
# 模拟耗时计算
Start-Sleep -Milliseconds 50
[PSCustomObject]@{
Input = $_
Squared = $_ * $_
Thread = [System.Threading.Thread]::CurrentThread.ManagedThreadId
}
}

$uniqueThreads = $results.Thread | Sort-Object -Unique
Write-Host "使用了 $($uniqueThreads.Count) 个线程处理 100 个任务"
$results | Select-Object -First 5 | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
串行执行:12534 ms
并行执行:3102 ms
使用了 6 个线程处理 100 个任务
Input Squared Thread
----- ------- ------
1 1 7
2 4 8
3 9 4
4 16 7
5 25 8

变量传递与状态共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 向并行代码块传递变量
$threshold = 80
$warningEmail = "admin@contoso.com"

$drives = @(
@{ Server = "SRV01"; Drive = "C:" },
@{ Server = "SRV02"; Drive = "D:" },
@{ Server = "SRV03"; Drive = "C:" }
)

$alerts = $drives | ForEach-Object -ThrottleLimit 3 -Parallel {
# 使用 $using: 引用外部变量
$thresh = $using:threshold
$email = $using:warningEmail

# 模拟获取磁盘使用率
$usage = Get-Random -Minimum 50 -Maximum 100

if ($usage -gt $thresh) {
[PSCustomObject]@{
Server = $_.Server
Drive = $_.Drive
Usage = $usage
Alert = "$($_.Server) $($_.Drive) 使用率 ${usage}% 超过阈值 ${thresh}%"
NotifyTo = $email
}
}
}

if ($alerts) {
Write-Host "磁盘告警:" -ForegroundColor Red
$alerts | Format-Table Server, Drive, Usage, Alert -AutoSize
} else {
Write-Host "所有磁盘使用率正常" -ForegroundColor Green
}

# 使用 ConcurrentDictionary 共享状态(线程安全)
$counter = [System.Collections.Concurrent.ConcurrentDictionary[string, int]]::new()

1..50 | ForEach-Object -ThrottleLimit 10 -Parallel {
$dict = $using:counter
$category = @("Web", "API", "DB", "Cache")[(Get-Random -Minimum 0 -Maximum 4)]
$dict.AddOrUpdate($category, 1, { param($k, $v) $v + 1 })
}

Write-Host "`n分类统计:" -ForegroundColor Cyan
$counter.GetEnumerator() | Sort-Object Value -Descending |
ForEach-Object { Write-Host " $($_.Key): $($_.Value)" }

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
磁盘告警:
Server Drive Usage Alert
------ ----- ----- -----
SRV01 C: 92 SRV01 C: 使用率 92% 超过阈值 80%
SRV03 C: 87 SRV03 C: 使用率 87% 超过阈值 80%

分类统计:
Web: 16
API: 14
DB: 11
Cache: 9

Runspace 池(兼容 5.1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# PowerShell 5.1 兼容的并行方案
function Invoke-RunspaceJob {
param(
[Parameter(Mandatory)]
[scriptblock]$ScriptBlock,

[Parameter(Mandatory)]
[object[]]$InputData,

[int]$ThrottleLimit = 4
)

$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit)
$pool.Open()

$jobs = [System.Collections.Generic.List[hashtable]]::new()

foreach ($item in $InputData) {
$ps = [powershell]::Create()
$ps.RunspacePool = $pool

# 添加脚本和参数
$ps.AddScript($ScriptBlock).AddArgument($item) | Out-Null

$jobs.Add(@{
PowerShell = $ps
Handle = $ps.BeginInvoke()
Input = $item
})
}

# 等待所有任务完成
$results = @()
foreach ($job in $jobs) {
try {
$output = $job.PowerShell.EndInvoke($job.Handle)
if ($output) { $results += $output }
} catch {
Write-Host "任务失败:$($job.Input) - $($_.Exception.Message)" -ForegroundColor Red
} finally {
$job.PowerShell.Dispose()
}
}

$pool.Close()
$pool.Dispose()

return $results
}

# 使用 Runspace 并行 Ping 测试
$computers = @(
"SRV01", "SRV02", "SRV03", "SRV04", "SRV05",
"DB01", "DB02", "WEB01", "WEB02", "APP01"
)

$pingResults = Invoke-RunspaceJob -InputData $computers -ThrottleLimit 5 -ScriptBlock {
param([string]$Computer)

$sw = [System.Diagnostics.Stopwatch]::StartNew()
$ping = Test-Connection -ComputerName $Computer -Count 1 -Quiet -ErrorAction SilentlyContinue
$sw.Stop()

[PSCustomObject]@{
Computer = $Computer
Online = $ping
LatencyMs = $sw.ElapsedMilliseconds
}
}

$pingResults | Sort-Object Computer | Format-Table -AutoSize

$online = ($pingResults | Where-Object { $_.Online }).Count
Write-Host "在线:$online / $($computers.Count)" -ForegroundColor Green

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
Computer Online LatencyMs
-------- ------ ---------
APP01 True 2
DB01 True 3
DB02 True 2
SRV01 True 1
SRV02 True 2
SRV03 True 1
SRV04 True 2
SRV05 True 2
WEB01 True 3
WEB02 True 2
在线:10 / 10

并行文件处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 并行处理大量文件
function Invoke-ParallelFileProcess {
param(
[Parameter(Mandatory)]
[string]$Path,

[Parameter(Mandatory)]
[scriptblock]$ProcessBlock,

[string]$Filter = "*.*",
[int]$ThrottleLimit = 4
)

$files = Get-ChildItem $Path -Filter $Filter -File
Write-Host "找到 $($files.Count) 个文件,使用 $ThrottleLimit 并发处理" -ForegroundColor Cyan

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$results = $files | ForEach-Object -ThrottleLimit $ThrottleLimit -Parallel {
$file = $_
$block = $using:ProcessBlock

try {
& $block $file
} catch {
[PSCustomObject]@{
File = $file.Name
Error = $_.Exception.Message
Status = "Failed"
}
}
}

$sw.Stop()

$success = ($results | Where-Object { $_.Status -ne "Failed" }).Count
$failed = ($results | Where-Object { $_.Status -eq "Failed" }).Count

Write-Host "`n处理完成:成功 $success,失败 $failed" -ForegroundColor $(if ($failed) { "Yellow" } else { "Green" })
Write-Host "耗时:$($sw.ElapsedMilliseconds) ms"

return $results
}

# 示例:批量计算文件哈希
$hashResults = Invoke-ParallelFileProcess -Path "C:\Projects\MyApp\bin" -Filter "*.dll" -ThrottleLimit 8 -ProcessBlock {
param($file)
$hash = (Get-FileHash $file.FullName -Algorithm SHA256).Hash
[PSCustomObject]@{
File = $file.Name
SizeKB = [math]::Round($file.Length / 1KB, 2)
SHA256 = $hash.Substring(0, 16) + "..."
Status = "OK"
}
}

$hashResults | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
找到 24 个文件,使用 8 并发处理

处理完成:成功 24,失败 0
耗时:1234 ms
File SizeKB SHA256 Status
---- ------ ------ ------
Microsoft.AI.dll 456.78 a1b2c3d4e5f6... OK
Newtonsoft.Json 1234.56 f1e2d3c4b5a6... OK
System.Text.Json 234.56 1a2b3c4d5e6f... OK
MyApp.Core.dll 567.89 b2c3d4e5f6a1... OK

注意事项

  1. 并发数控制ThrottleLimit 不是越大越好,CPU 密集型任务设为 CPU 核心数,I/O 密集型可设为 2-4 倍
  2. 线程安全:并行代码中不要直接修改外部变量,使用 $using: 传递只读数据,或使用 ConcurrentDictionary 共享可变状态
  3. 错误处理:每个并行任务应有独立的 try-catch,避免一个任务失败导致整个并行操作中断
  4. 内存消耗:每个 runspace 有独立的内存空间,大量并发会消耗更多内存
  5. 调试困难:并行代码的调试比串行代码复杂得多,建议先用串行模式验证逻辑正确性
  6. PowerShell 7 优势ForEach-Object -Parallel 是 PowerShell 7 独有功能,5.1 只能使用 Runspace 方案

PowerShell 技能连载 - 异步编程模式

适用于 PowerShell 5.1 及以上版本,并行功能需要 PowerShell 7

运维脚本经常面临 IO 等待——网络请求、文件操作、数据库查询,这些操作的响应时间往往远超 CPU 处理时间。如果顺序执行 100 个 HTTP 健康检查,每个耗时 1 秒,总共需要 100 秒;但如果并发执行,可能只需要几秒。PowerShell 提供了多种异步编程机制:后台作业(Jobs)、运行空间(Runspaces)、ForEach-Object -Parallel、以及 .NET 的异步 API。

本文将讲解 PowerShell 中的异步编程模式及其适用场景。

后台作业(Jobs)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Start-Job:在后台运行脚本块
$job = Start-Job -ScriptBlock {
Start-Sleep -Seconds 2
Get-Process | Select-Object Name, Id, CPU | Sort-Object CPU -Descending | Select-Object -First 5
}

Write-Host "作业已启动,ID:$($job.Id)" -ForegroundColor Cyan

# 非阻塞检查状态
while ($job.State -eq 'Running') {
Write-Host "." -NoNewline
Start-Sleep -Milliseconds 500
}
Write-Host

# 获取结果
$result = Receive-Job -Job $job
Write-Host "作业完成,结果:"
$result | Format-Table -AutoSize

# 清理作业
Remove-Job -Job $job

# 批量后台作业
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05")
$jobs = foreach ($server in $servers) {
Start-Job -ScriptBlock {
param($computer)
$os = Get-CimInstance Win32_OperatingSystem -ComputerName $computer -ErrorAction SilentlyContinue
if ($os) {
[PSCustomObject]@{
Server = $computer
OS = $os.Caption
Version = $os.Version
Status = "Online"
}
} else {
[PSCustomObject]@{
Server = $computer
OS = "N/A"
Version = "N/A"
Status = "Offline"
}
}
} -ArgumentList $server
}

Write-Host "启动了 $($jobs.Count) 个后台作业" -ForegroundColor Cyan

# 等待所有作业完成
$jobs | Wait-Job | Out-Null

# 收集所有结果
$allResults = $jobs | Receive-Job
$allResults | Format-Table -AutoSize

# 清理
$jobs | Remove-Job

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
作业已启动,ID:15
...

作业完成,结果:
Name Id CPU
---- -- ---
chrome 12345 1250.3
vscode 12346 456.7
powershell 12347 234.5
node 12348 123.4
dotnet 12349 98.1

启动了 5 个后台作业

Server OS Version Status
------ -- ------- ------
SRV01 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV02 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV03 N/A N/A Offline
SRV04 Microsoft Windows Server 2019 Standard 10.0.17763 Online
SRV05 Microsoft Windows Server 2022 Standard 10.0.20348 Online

ForEach-Object -Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# PowerShell 7 并行执行(推荐方式)
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05", "SRV06", "SRV07", "SRV08")

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$results = $servers | ForEach-Object -Parallel {
$server = $_
try {
$response = Invoke-WebRequest -Uri "http://${server}:8080/health" `
-TimeoutSec 5 -UseBasicParsing -ErrorAction Stop
[PSCustomObject]@{
Server = $server
Status = "Healthy"
Code = $response.StatusCode
LatencyMs = $response.RawContentLength
}
} catch {
[PSCustomObject]@{
Server = $server
Status = "Unhealthy"
Code = 0
LatencyMs = 0
Error = $_.Exception.Message
}
}
} -ThrottleLimit 4

$sw.Stop()

$results | Format-Table -AutoSize
Write-Host "并行检查 8 台服务器耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

# 共享变量(需要 $using:)
$threshold = 80
$results = $servers | ForEach-Object -Parallel {
$server = $_
$cpu = Get-Random -Min 10 -Max 95
[PSCustomObject]@{
Server = $server
CPU = $cpu
IsAlert = $cpu -gt $using:threshold
}
} -ThrottleLimit 4

$results | Where-Object { $_.IsAlert } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Server Status    Code LatencyMs
------ ------ ---- ---------
SRV01 Healthy 200 256
SRV02 Healthy 200 256
SRV03 Unhealthy 0 0
SRV04 Healthy 200 256
SRV05 Healthy 200 256
SRV06 Healthy 200 256
SRV07 Unhealthy 0 0
SRV08 Healthy 200 256

并行检查 8 台服务器耗时:2150ms

Server CPU IsAlert
------ --- -------
SRV02 92 True
SRV05 87 True

Runspace 池

对于需要精确控制的并发场景,使用 Runspace 池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
function Invoke-Parallel {
<#
.SYNOPSIS
使用 Runspace 池并行执行脚本块
#>
param(
[Parameter(Mandatory)]
[scriptblock]$ScriptBlock,

[Parameter(Mandatory)]
[object[]]$InputData,

[int]$ThrottleLimit = 4
)

# 创建 Runspace 池
$pool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(
1, $ThrottleLimit
)
$pool.Open()

$runspaces = @()

foreach ($item in $InputData) {
$powershell = [System.Management.Automation.PowerShell]::Create()
$powershell.RunspacePool = $pool

$null = $powershell.AddScript($ScriptBlock).AddArgument($item)

$runspaces += [PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
Input = $item
}
}

# 等待完成并收集结果
$results = @()
foreach ($rs in $runspaces) {
try {
$output = $rs.Pipe.EndInvoke($rs.Handle)
if ($output) {
foreach ($o in $output) {
$results += $o
}
}
} catch {
$results += [PSCustomObject]@{
Error = $true
Input = $rs.Input
Message = $_.Exception.Message
}
} finally {
$rs.Pipe.Dispose()
}
}

$pool.Close()
$pool.Dispose()

return $results
}

# 使用 Runspace 池并行 ping
$servers = 1..20 | ForEach-Object { "192.168.1.$_" }

$sw = [System.Diagnostics.Stopwatch]::StartNew()
$results = Invoke-Parallel -ScriptBlock {
param($ip)
$ping = Test-Connection -ComputerName $ip -Count 1 -Quiet -ErrorAction SilentlyContinue
[PSCustomObject]@{ IP = $ip; Online = $ping }
} -InputData $servers -ThrottleLimit 10
$sw.Stop()

$online = ($results | Where-Object { $_.Online }).Count
Write-Host "扫描 20 个 IP,$online 个在线,耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green
$results | Where-Object { $_.Online } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
扫描 20 个 IP,8 个在线,耗时:3200ms

IP Online
-- ------
192.168.1.1 True
192.168.1.2 True
192.168.1.10 True
192.168.1.11 True
192.168.1.12 True
192.168.1.20 True
192.168.1.50 True
192.168.1.100 True

异步文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 并行处理文件
$files = Get-ChildItem "C:\Logs" -Filter "*.log" -Recurse | Select-Object -First 20

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$fileStats = $files | ForEach-Object -Parallel {
$file = $_
$lines = 0
$errors = 0

$reader = [System.IO.StreamReader]::new($file.FullName)
while ($null -ne $reader.ReadLine()) {
$lines++
}
$reader.Close()

[PSCustomObject]@{
Name = $file.Name
SizeKB = [math]::Round($file.Length / 1KB, 1)
Lines = $lines
Modified = $file.LastWriteTime.ToString('yyyy-MM-dd HH:mm')
}
} -ThrottleLimit 4

$sw.Stop()

$fileStats | Format-Table -AutoSize
Write-Host "并行处理 $($files.Count) 个文件耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

执行结果示例:

1
2
3
4
5
6
7
8
Name               SizeKB Lines Modified
---- ------- ----- --------
app-20250709.log 1024.5 12050 2025-07-09 08:30
app-20250708.log 856.3 9876 2025-07-08 23:59
app-20250707.log 743.1 8543 2025-07-07 23:59
security-20250709.log 456.7 5432 2025-07-09 08:15

并行处理 20 个文件耗时:850ms

注意事项

  1. Jobs 开销大:每个 Job 启动新的 PowerShell 进程,开销较大。少量任务用 Jobs,大量任务用 Runspace 或 -Parallel
  2. 线程安全:并行操作中不要直接修改共享变量,使用 ConcurrentDictionary 或收集结果后统一处理
  3. ThrottleLimit:设置合理的并发限制,过多并发会导致资源争用和 API 限流
  4. 错误处理:并行任务中的错误不会自动传播到主线程,需要显式捕获和收集
  5. $using: 作用域ForEach-Object -Parallel 中访问外部变量需要 $using: 前缀
  6. Runspace 清理:使用完 Runspace 和 RunspacePool 后必须调用 Dispose() 释放资源

PowerShell 技能连载 - 并行处理与 Runspace

适用于 PowerShell 5.1 及以上版本,ForEach-Object -Parallel 需要 PowerShell 7

PowerShell 默认是单线程顺序执行的——一个命令完成后再执行下一个。当需要处理数百台服务器、上千个文件或大量 API 请求时,串行执行的等待时间会线性增长。并行处理是解决这类性能瓶颈的关键手段,PowerShell 提供了多种并行方案,从简单到复杂依次为:Start-JobForEach-Object -Parallel、Runspace 池。

本文将对比这三种方案,并深入讲解 Runspace 池的高性能用法。

三种并行方案对比

在选择并行方案前,需要了解各方案的特点和适用场景:

方案 最低版本 启动开销 内存占用 适用场景
Start-Job 5.1 高(新进程) 简单后台任务
ForEach-Object -Parallel 7.0 中(新 runspace) 快速并行遍历
Runspace 池 5.1 低(线程复用) 高性能批量操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 方案一:Start-Job(最简单,但开销最大)
$jobs = @()
$servers = @('SRV01', 'SRV02', 'SRV03', 'SRV04', 'SRV05')

foreach ($server in $servers) {
$jobs += Start-Job -ScriptBlock {
param($srv)
Test-Connection -ComputerName $srv -Count 1 -Quiet
Get-Service -ComputerName $srv -Name WinRM |
Select-Object Status, Name
} -ArgumentList $server
}

# 等待所有作业完成
$results = $jobs | Wait-Job | Receive-Job
$jobs | Remove-Job

$results | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
Status Name
------ ----
True WinRM
True WinRM
False WinRM
True WinRM
True WinRM

ForEach-Object -Parallel(PowerShell 7+)

PowerShell 7 引入了 ForEach-Object -Parallel,这是最便捷的并行方案。它在底层使用新的 runspace 来并行执行脚本块,支持控制并发数和超时:

1
2
3
4
5
6
7
8
9
10
11
# 基本用法:并行 ping 多台服务器
$servers = 1..50 | ForEach-Object { "192.168.1.$_" }

$servers | ForEach-Object -Parallel {
$result = Test-Connection -ComputerName $_ -Count 1 -Quiet
[PSCustomObject]@{
Server = $_
Online = $result
Time = Get-Date -Format 'HH:mm:ss.fff'
}
} -ThrottleLimit 10 | Sort-Object Server | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
Server       Online Time
------ ------ ----
192.168.1.1 True 08:15:32.123
192.168.1.2 True 08:15:32.156
192.168.1.3 False 08:15:32.189
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 传递外部变量到并行脚本块
$credential = Get-Credential
$logPath = "C:\Logs"

1..20 | ForEach-Object -Parallel {
# 使用 $using: 引用外部变量
$server = "SRV$_"
$session = New-PSSession -ComputerName $server -Credential $using:credential

Invoke-Command -Session $session -ScriptBlock {
Get-EventLog -LogName System -Newest 10 |
Select-Object TimeGenerated, EntryType, Message
} | Export-Csv "$using:logPath\$server-events.csv" -NoTypeInformation

Remove-PSSession $session
Write-Host "完成:$server"
} -ThrottleLimit 5 -AsJob | Wait-Job

执行结果示例:

1
2
3
4
完成:SRV1
完成:SRV2
完成:SRV3
...

注意$using: 语法用于将外部变量传递到并行脚本块中。但 $using: 只能传递可序列化的对象,不能传递活动会话或运行时对象。

Runspace 池高性能并行

Runspace 是 PowerShell 执行环境的最小单元。通过手动创建和管理 runspace,可以实现最低的启动开销和最高的吞吐量。这是处理大规模并行任务的最优方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 创建 Runspace 池
$maxThreads = 8
$runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads)
$runspacePool.Open()

# 定义任务列表
$servers = @('SRV01', 'SRV02', 'SRV03', 'SRV04', 'SRV05',
'SRV06', 'SRV07', 'SRV08', 'SRV09', 'SRV10')

$scriptBlock = {
param($ServerName)
$startTime = Get-Date

# 模拟远程操作(实际中替换为真实命令)
Start-Sleep -Milliseconds (Get-Random -Min 100 -Max 500)

$cpu = Get-Random -Min 10 -Max 95
$mem = Get-Random -Min 30 -Max 85

[PSCustomObject]@{
Server = $ServerName
CPU = $cpu
Memory = $mem
Status = if ($cpu -gt 80) { 'Warning' } else { 'OK' }
Duration = ((Get-Date) - $startTime).TotalMilliseconds
}
}

# 创建并启动所有 Runspace
$runspaces = @()
foreach ($server in $servers) {
$powershell = [powershell]::Create().AddScript($scriptBlock).AddArgument($server)
$powershell.RunspacePool = $runspacePool

$runspaces += [PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
Server = $server
}
}

# 收集结果
$results = @()
foreach ($rs in $runspaces) {
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) {
$results += $result
}
$rs.Pipe.Dispose()
}

$runspacePool.Close()
$runspacePool.Dispose()

$results | Sort-Object Server | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
Server CPU Memory Status  Duration
------ --- ------ ------ --------
SRV01 45 62 OK 312.45
SRV02 72 58 OK 287.33
SRV03 89 81 Warning 456.12
SRV04 34 45 OK 198.67
SRV05 56 71 OK 234.89
...

带 进度反馈的 Runspace

长时间运行的并行任务需要进度反馈。通过将 runspace 状态存入字典,可以实时查询进度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
$maxThreads = 4
$runspacePool = [runspacefactory]::CreateRunspacePool(1, $maxThreads)
$runspacePool.Open()

$tasks = 1..20 | ForEach-Object { "Task-$_" }
$scriptBlock = {
param($TaskName)
$totalSteps = 5
for ($step = 1; $step -le $totalSteps; $step++) {
Start-Sleep -Milliseconds (Get-Random -Min 200 -Max 600)
}
[PSCustomObject]@{
Task = $TaskName
Status = 'Completed'
Steps = $totalSteps
}
}

$runspaces = [System.Collections.Concurrent.ConcurrentDictionary[string,object]]::new()

foreach ($task in $tasks) {
$ps = [powershell]::Create().AddScript($scriptBlock).AddArgument($task)
$ps.RunspacePool = $runspacePool
$handle = $ps.BeginInvoke()

$runspaces[$task] = [PSCustomObject]@{
Pipe = $ps
Handle = $handle
}
}

# 等待并显示进度
$completed = 0
$total = $tasks.Count
while ($completed -lt $total) {
Start-Sleep -Milliseconds 500

foreach ($key in @($runspaces.Keys)) {
$rs = $runspaces[$key]
if ($rs.Handle.IsCompleted -and -not $rs.Done) {
$rs.Done = $true
$completed++
$pct = [math]::Round($completed / $total * 100)
Write-Progress -Activity "并行任务执行" `
-Status "$completed / $total 已完成 ($pct%)" `
-PercentComplete $pct
}
}
}

# 收集结果
$results = @()
foreach ($key in @($runspaces.Keys)) {
$rs = $runspaces[$key]
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) { $results += $result }
$rs.Pipe.Dispose()
}

$runspacePool.Close()
$runspacePool.Dispose()
$results | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
Task    Status    Steps
---- ------ -----
Task-1 Completed 5
Task-2 Completed 5
Task-3 Completed 5
...
Task-20 Completed 5

并行文件处理实战

以下是一个使用 Runspace 池并行处理文件的实用示例——批量计算文件哈希:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
function Get-FileHashParallel {
<#
.SYNOPSIS
并行计算文件哈希值
#>
param(
[string]$Path = "C:\Projects",
[int]$ThrottleLimit = 8,
[string]$Algorithm = 'SHA256'
)

$files = Get-ChildItem -Path $Path -File -Recurse |
Where-Object { $_.Length -gt 1MB }
Write-Host "共 $($files.Count) 个文件需要计算哈希" -ForegroundColor Cyan

$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit)
$pool.Open()

$script = {
param($FilePath, $Algo)
$hash = Get-FileHash -Path $FilePath -Algorithm $Algo
[PSCustomObject]@{
File = $FilePath
Hash = $hash.Hash
SizeMB = [math]::Round((Get-Item $FilePath).Length / 1MB, 2)
}
}

$runspaces = @()
$sw = [System.Diagnostics.Stopwatch]::StartNew()

foreach ($file in $files) {
$ps = [powershell]::Create().AddScript($script)
$ps.AddArgument($file.FullName).AddArgument($Algorithm) | Out-Null
$ps.RunspacePool = $pool
$runspaces += @{ Pipe = $ps; Handle = $ps.BeginInvoke() }
}

$results = @()
foreach ($rs in $runspaces) {
$result = $rs.Pipe.EndInvoke($rs.Handle)
if ($result) { $results += $result }
$rs.Pipe.Dispose()
}

$pool.Close()
$pool.Dispose()
$sw.Stop()

Write-Host "`n耗时:$($sw.Elapsed.TotalSeconds) 秒" -ForegroundColor Green
$results | Format-Table -AutoSize
}

Get-FileHashParallel -Path "C:\Projects" -ThrottleLimit 8

执行结果示例:

1
2
3
4
5
6
7
8
9
共 42 个文件需要计算哈希

耗时:3.82 秒

File Hash SizeMB
---- ---- ------
C:\Projects\app-v1.0.zip A1B2C3D4E5F6... 125.3
C:\Projects\database-backup.bak F6E5D4C3B2A1... 342.7
C:\Projects\config.json 1234567890AB... 1.2

注意事项

  1. 线程安全:Runspace 中的代码不应直接修改外部变量或共享状态,应通过返回值传递结果
  2. 并发数控制ThrottleLimit 或 Runspace 池大小不宜过大,通常设为 CPU 核心数的 2-4 倍
  3. 错误处理:Runspace 中的异常不会自动传播到主线程,需要在脚本块内捕获并通过返回值传递错误信息
  4. 资源释放:使用完毕后必须调用 Dispose() 释放 Runspace 和 PowerShell 对象,避免内存泄漏
  5. $using: 限制ForEach-Object -Parallel 中的 $using: 只能传递可序列化的值,不能传递 StreamWriter、数据库连接等运行时对象
  6. 模块导入:每个 Runspace 是独立的执行环境,需要单独导入模块。可以在脚本块开头添加 Import-Module 语句

PowerShell 技能连载 - 并行处理实战

适用于 PowerShell 7.0 及以上版本

当我们需要对大量对象执行相同操作时,传统的顺序处理方式往往耗时过长。例如检查 100 台服务器的连通性,逐台 ping 可能需要几分钟,但如果同时发起多个 ping 操作,时间可以缩短到几秒钟。

PowerShell 7 引入了 ForEach-Object -Parallel 参数,让并行处理变得前所未有的简单。对于更高级的场景,还可以通过 Runspace 直接操控底层并发机制。本文将系统介绍 PowerShell 中的并行处理方法,帮助你在合适的场景下大幅提升脚本执行效率。

ForEach-Object -Parallel 基础

ForEach-Object -Parallel 是 PowerShell 7 新增的功能,它为管道中的每个元素创建一个独立的运行空间来并行执行脚本块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 顺序 vs 并行:测试多台服务器的连通性
$servers = @(
"server-01.vichamp.com",
"server-02.vichamp.com",
"server-03.vichamp.com",
"server-04.vichamp.com",
"server-05.vichamp.com"
)

# 并行测试(同时发起所有 ping)
Measure-Command {
$servers | ForEach-Object -Parallel {
$result = Test-Connection -ComputerName $_ -Count 2 -Quiet
[PSCustomObject]@{
Server = $_
Online = $result
}
} | Format-Table -AutoSize
} | Select-Object TotalSeconds
1
2
3
4
5
6
7
8
9
10
11
Server                   Online
------ ------
server-01.vichamp.com True
server-02.vichamp.com True
server-03.vichamp.com False
server-04.vichamp.com True
server-05.vichamp.com True

TotalSeconds
------------
2.34

使用 $using: 传递变量

-Parallel 脚本块中,不能直接访问外部作用域的变量。需要使用 $using: 前缀来引用外部变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 从外部传入配置信息
$config = @{
Port = 443
Timeout = 5000
AlertTo = "admin@vichamp.com"
}

$servers | ForEach-Object -Parallel {
$serverConfig = $using:config
# 测试 HTTPS 端口连通性
$tcpClient = [System.Net.Sockets.TcpClient]::new()
try {
$asyncResult = $tcpClient.BeginConnect($_, $serverConfig.Port, $null, $null)
$waited = $asyncResult.AsyncWaitHandle.WaitOne($serverConfig.Timeout, $false)
[PSCustomObject]@{
Server = $_
Port = $serverConfig.Port
Open = $tcpClient.Connected
}
} catch {
[PSCustomObject]@{
Server = $_
Port = $serverConfig.Port
Open = $false
}
} finally {
$tcpClient.Close()
}
} | Format-Table -AutoSize
1
2
3
4
5
6
7
Server                    Port  Open
------ ---- ----
server-01.vichamp.com 443 True
server-02.vichamp.com 443 True
server-03.vichamp.com 443 False
server-04.vichamp.com 443 True
server-05.vichamp.com 443 True

传递外部函数

如果需要在并行脚本块中调用自定义函数,也需要通过 $using: 传递,或者在脚本块内重新定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 方式一:使用 $using: 传递函数定义
function Get-ServerHealth {
param([string]$ServerName)
# 模拟健康检查
$cpu = Get-Random -Minimum 10 -Maximum 95
$mem = Get-Random -Minimum 30 -Maximum 90
[PSCustomObject]@{
Server = $ServerName
CPU = "$cpu%"
Memory = "$mem%"
Status = if ($cpu -gt 85 -or $mem -gt 85) { "警告" } else { "正常" }
}
}

$healthFunc = ${function:Get-ServerHealth}

$servers | ForEach-Object -Parallel {
${function:Get-ServerHealth} = $using:healthFunc
Get-ServerHealth -ServerName $_
} | Format-Table -AutoSize

并发限流

当目标数量很大时,不加限制的并行可能会耗尽系统资源。使用 -ThrottleLimit 参数可以控制最大并发数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 限制最大并发数为 5,避免过载
$targets = 1..50 | ForEach-Object { "host-$($_.ToString('00'))" }

$results = $targets | ForEach-Object -ThrottleLimit 5 -Parallel {
# 模拟网络请求延迟
Start-Sleep -Milliseconds (Get-Random -Minimum 100 -Maximum 500)
[PSCustomObject]@{
Host = $_
Latency = "$(Get-Random -Minimum 10 -Maximum 200)ms"
Checked = (Get-Date).ToString('HH:mm:ss.fff')
}
}

$results | Format-Table -AutoSize
1
2
3
4
5
6
Host     Latency Checked
---- ------- --------
host-01 87ms 09:15:23.112
host-02 142ms 09:15:23.234
host-03 35ms 09:15:23.178
...

动态调整并发数

对于不同类型的任务,最佳并发数不同。可以编写一个带参数的函数来灵活控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 可配置的并行批量处理函数
function Invoke-BatchParallel {
param(
[Parameter(Mandatory)]
[array]$Items,
[scriptblock]$ScriptBlock,
[int]$ThrottleLimit = 10,
[int]$TimeoutSeconds = 300
)

$results = $Items | ForEach-Object -Parallel $ScriptBlock -ThrottleLimit $ThrottleLimit -AsJob
$completed = $results | Wait-Job -Timeout $TimeoutSeconds

$output = $completed | Receive-Job
$results | Remove-Job -Force

return $output
}

# 使用示例:批量获取远程服务器的磁盘信息(限流 3 个并发)
$serverList = @("SRV-01", "SRV-02", "SRV-03", "SRV-04", "SRV-05")

Invoke-BatchParallel -Items $serverList -ThrottleLimit 3 -ScriptBlock {
Get-CimInstance -ClassName Win32_LogicalDisk -Filter "DriveType=3" -ComputerName $_ |
Select-Object PSComputerName, DeviceID,
@{N='总空间(GB)';E={[math]::Round($_.Size/1GB,2)}},
@{N='剩余(GB)';E={[math]::Round($_.FreeSpace/1GB,2)}},
@{N='使用率';E={"$([math]::Round(($_.Size - $_.FreeSpace)/$_.Size * 100, 1))%"}}
} | Format-Table -AutoSize

使用 Runspace 实现高级并行

对于需要更精细控制的场景,可以直接使用 .NET 的 Runspace 来创建线程池。这种方式比 ForEach-Object -Parallel 更底层,但灵活性更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 使用 Runspace 池进行并行处理
$runspacePool = [runspacefactory]::CreateRunspacePool(1, 8)
$runspacePool.Open()

$jobs = foreach ($server in $servers) {
$powershell = [powershell]::Create().AddScript({
param($ComputerName)
$os = Get-CimInstance -ClassName Win32_OperatingSystem -ComputerName $ComputerName
[PSCustomObject]@{
Server = $ComputerName
OS = $os.Caption
LastBoot = $os.LastBootUpTime
UptimeDays = ((Get-Date) - $os.LastBootUpTime).Days
}
}).AddArgument($server)

$powershell.RunspacePool = $runspacePool

[PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
}
}

# 收集结果
foreach ($job in $jobs) {
$result = $job.Pipe.EndInvoke($job.Handle)
$result
$job.Pipe.Dispose()
}

$runspacePool.Close()
$runspacePool.Dispose()
1
2
3
4
5
6
Server            OS                                    LastBoot            UptimeDays
------ -- -------- ----------
server-01 Microsoft Windows Server 2022 2025/3/15 8:00:00 25
server-02 Microsoft Windows Server 2022 2025/4/1 10:30:00 8
server-04 Microsoft Windows Server 2019 2025/2/20 14:00:00 48
server-05 Microsoft Windows Server 2022 2025/3/28 6:15:00 12

注意事项

  • ForEach-Object -Parallel 是 PowerShell 7 的功能,在 Windows PowerShell 5.1 中不可用
  • 并行脚本块中运行在独立的运行空间中,不能直接共享变量状态,需要使用 $using: 或其他机制传递数据
  • 合理设置 -ThrottleLimit,一般不超过 CPU 核心数的 2-4 倍,避免上下文切换开销过大
  • 并行处理会产生多个运行空间,内存消耗会比顺序处理更高
  • 对于 I/O 密集型任务(网络请求、文件读写),并行效果最明显;对于 CPU 密集型任务,收益有限
  • 并行执行时输出顺序不确定,如需保持顺序,请在结果中附加原始索引后排序
  • 在并行脚本块中避免写入同一个文件,否则可能出现文件锁冲突