PowerShell 技能连载 - 消息队列与异步通信

适用于 PowerShell 5.1 及以上版本

在构建自动化运维系统时,不同组件之间的通信方式直接决定了系统的可靠性和扩展性。传统的同步调用方式要求调用方等待被调用方处理完毕后才能继续,这在高并发场景下容易成为瓶颈。消息队列(Message Queue)通过引入”发布-订阅”或”生产者-消费者”模式,让发送方和接收方解耦,从而实现真正意义上的异步通信。

Windows 平台原生提供了 MSMQ(Microsoft Message Queuing)服务,PowerShell 可以通过 .NET 类库直接与之交互。而在跨平台场景下,我们也可以借助文件系统、数据库或第三方消息中间件(如 RabbitMQ、Redis)来模拟队列行为。理解消息队列的核心概念(队列、消息、投递确认、死信)对设计健壮的自动化流程至关重要。

本文将从 MSMQ 原生队列操作入手,逐步扩展到基于文件系统的轻量队列实现,最后演示如何在 PowerShell 中构建一个完整的生产者-消费者模型,帮助你掌握异步通信的核心技巧。

使用 MSMQ 创建和操作消息队列

MSMQ 是 Windows 内置的消息队列服务,适合在 Windows 环境下构建可靠的异步通信管道。以下代码展示如何检测 MSMQ 服务状态、创建队列,以及发送和接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 检查 MSMQ 服务是否运行
$msmqService = Get-Service -Name 'MSMQ' -ErrorAction SilentlyContinue

if ($null -eq $msmqService) {
Write-Warning 'MSMQ 服务未安装。请通过"启用或关闭 Windows 功能"安装 Message Queuing。'
return
}

if ($msmqService.Status -ne 'Running') {
Write-Host '正在启动 MSMQ 服务...'
Start-Service -Name 'MSMQ'
}

# 加载 MSMQ 的 .NET 程序集
Add-Type -AssemblyName 'System.Messaging'

# 创建私有队列(如果不存在)
$queuePath = '.\Private$\PowerShellTaskQueue'

if (-not [System.Messaging.MessageQueue]::Exists($queuePath)) {
$queue = [System.Messaging.MessageQueue]::Create($queuePath)
Write-Host "队列已创建:$queuePath"
} else {
$queue = New-Object System.Messaging.MessageQueue($queuePath)
Write-Host "队列已存在,直接连接:$queuePath"
}

# 发送消息到队列
$taskMessage = @{
TaskId = [guid]::NewGuid().ToString()
TaskName = 'BackupDatabase'
CreatedAt = (Get-Date).ToString('o')
Payload = @{ Database = 'ProductionDB'; Destination = '\\backup\db\' }
} | ConvertTo-Json -Depth 5

$msg = New-Object System.Messaging.Message
$msg.Body = $taskMessage
$msg.Label = 'DBBackupTask'
$msg.Priority = [System.Messaging.MessagePriority]::High

$queue.Send($msg)
Write-Host '消息已发送到队列。'

# 从队列接收消息
$queue.Formatter = New-Object System.Messaging.XmlMessageFormatter(
@([string])
)

$receivedMsg = $queue.Receive(
[System.TimeSpan]::FromSeconds(5)
)

$receivedBody = $receivedMsg.Body
$receivedLabel = $receivedMsg.Label

Write-Host "收到消息 - 标签: $receivedLabel"
Write-Host "消息内容: $receivedBody"

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
正在启动 MSMQ 服务...
队列已创建:.\Private$\PowerShellTaskQueue
消息已发送到队列。
收到消息 - 标签: DBBackupTask
消息内容: {
"TaskId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"TaskName": "BackupDatabase",
"CreatedAt": "2025-09-19T08:30:00.0000000+08:00",
"Payload": {
"Database": "ProductionDB",
"Destination": "\\\\backup\\db\\"
}
}

基于文件系统的轻量级消息队列

并非所有环境都安装了 MSMQ,尤其在跨平台场景(Linux、macOS)下更是如此。我们可以用文件系统模拟消息队列的核心行为:将消息写入独立文件实现入队,读取并删除文件实现出队。这种方案简单可靠,适合中小规模的自动化任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# 定义文件队列的基础路径
$queueRoot = Join-Path $env:TEMP 'PSFileQueue'

# 初始化队列目录结构
$directories = @(
(Join-Path $queueRoot 'pending')
(Join-Path $queueRoot 'processing')
(Join-Path $queueRoot 'completed')
(Join-Path $queueRoot 'deadletter')
)

foreach ($dir in $directories) {
if (-not (Test-Path $dir)) {
New-Item -Path $dir -ItemType Directory -Force | Out-Null
Write-Host "已创建目录: $dir"
}
}

# 定义入队函数
function Send-FileQueueMessage {
param(
[Parameter(Mandatory)]
[string]$QueuePath,

[Parameter(Mandatory)]
[hashtable]$MessageData,

[string]$Label = 'Default'
)

$messageId = [guid]::NewGuid().ToString('N')
$timestamp = Get-Date -Format 'yyyyMMdd-HHmmss-ffff'

$envelope = @{
MessageId = $messageId
Label = $Label
CreatedAt = (Get-Date).ToString('o')
RetryCount = 0
MaxRetries = 3
Data = $MessageData
}

$fileName = '{0}_{1}.json' -f $timestamp, $messageId.Substring(0, 8)
$filePath = Join-Path (Join-Path $QueuePath 'pending') $fileName

$envelope | ConvertTo-Json -Depth 10 | Set-Content -Path $filePath -Encoding UTF8

Write-Host "消息已入队: $fileName (Label: $Label)"
return $messageId
}

# 定义出队函数
function Receive-FileQueueMessage {
param(
[Parameter(Mandatory)]
[string]$QueuePath,

[int]$VisibilityTimeout = 30
)

$pendingDir = Join-Path $QueuePath 'pending'
$processingDir = Join-Path $QueuePath 'processing'

$files = Get-ChildItem -Path $pendingDir -Filter '*.json' |
Sort-Object Name |
Select-Object -First 1

if ($null -eq $files) {
Write-Host '队列为空,没有待处理的消息。'
return $null
}

$file = $files
$destPath = Join-Path $processingDir $file.Name

# 原子性移动:从 pending 到 processing
Move-Item -Path $file.FullName -Destination $destPath -Force

$message = Get-Content $destPath -Raw | ConvertFrom-Json

Write-Host "消息已出队: $($file.Name) (ID: $($message.MessageId))"
return [PSCustomObject]@{
FilePath = $destPath
Message = $message
}
}

# 测试:发送 3 条消息
$testMessages = @(
@{ Server = 'SRV01'; Action = 'RestartService'; Service = 'Spooler' }
@{ Server = 'SRV02'; Action = 'ClearLogs'; LogName = 'Application' }
@{ Server = 'SRV03'; Action = 'CheckDisk'; Drive = 'C:' }
)

foreach ($msg in $testMessages) {
Send-FileQueueMessage -QueuePath $queueRoot -MessageData $msg -Label 'Maintenance'
}

# 测试:接收一条消息
$received = Receive-FileQueueMessage -QueuePath $queueRoot

执行结果示例:

1
2
3
4
5
6
7
8
已创建目录: /tmp/PSFileQueue/pending
已创建目录: /tmp/PSFileQueue/processing
已创建目录: /tmp/PSFileQueue/completed
已创建目录: /tmp/PSFileQueue/deadletter
消息已入队: 20250919-0830001234_a1b2c3d4.json (Label: Maintenance)
消息已入队: 20250919-0830001235_e5f6a7b8.json (Label: Maintenance)
消息已入队: 20250919-0830001236_c9d0e1f2.json (Label: Maintenance)
消息已出队: 20250919-0830001234_a1b2c3d4.json (ID: a1b2c3d4e5f6...)

构建生产者-消费者异步处理模型

有了队列基础设施后,我们需要一个完整的生产者-消费者模型来驱动异步处理。以下代码使用 PowerShell 后台作业(Job)模拟并发消费者,实现消息的并行处理,并包含重试和死信机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# 消费者处理逻辑
function Invoke-QueueConsumer {
param(
[string]$QueuePath,
[int]$ConsumerId,
[int]$PollIntervalSeconds = 2
)

$pendingDir = Join-Path $QueuePath 'pending'
$processingDir = Join-Path $QueuePath 'processing'
$completedDir = Join-Path $QueuePath 'completed'
$deadletterDir = Join-Path $QueuePath 'deadletter'

$processed = 0

# 持续轮询,最多处理 5 条消息作为演示
while ($processed -lt 5) {
$files = Get-ChildItem -Path $pendingDir -Filter '*.json' |
Sort-Object Name |
Select-Object -First 1

if ($null -eq $files) {
Write-Host "[消费者 $ConsumerId] 队列为空,等待 ${PollIntervalSeconds}s..."
Start-Sleep -Seconds $PollIntervalSeconds
continue
}

$file = $files
$destPath = Join-Path $processingDir $file.Name

try {
Move-Item -Path $file.FullName -Destination $destPath -Force -ErrorAction Stop
$message = Get-Content $destPath -Raw | ConvertFrom-Json

Write-Host "[消费者 $ConsumerId] 正在处理: $($message.Label) " `
"- ID: $($message.MessageId.Substring(0,8))..."

# 模拟处理耗时
$workTime = Get-Random -Minimum 500 -Maximum 2000
Start-Sleep -Milliseconds $workTime

# 模拟偶发失败(约 20% 概率)
$shouldFail = (Get-Random -Minimum 1 -Maximum 100) -le 20

if ($shouldFail) {
throw '模拟的处理异常'
}

# 处理成功:移到 completed
$completedPath = Join-Path $completedDir $file.Name
Move-Item -Path $destPath -Destination $completedPath -Force

$processed++
Write-Host "[消费者 $ConsumerId] 处理完成 " `
"(耗时 ${workTime}ms, 累计 $processed 条)"
} catch {
Write-Warning "[消费者 $ConsumerId] 处理失败: $_"

# 重试逻辑
$message = Get-Content $destPath -Raw | ConvertFrom-Json
$retryCount = $message.RetryCount + 1

if ($retryCount -ge $message.MaxRetries) {
# 超过最大重试次数,移入死信队列
$deadPath = Join-Path $deadletterDir $file.Name
Move-Item -Path $destPath -Destination $deadPath -Force
Write-Warning "[消费者 $ConsumerId] 消息移入死信队列: $($file.Name)"
} else {
# 更新重试计数后放回 pending
$message.RetryCount = $retryCount
$message | ConvertTo-Json -Depth 10 |
Set-Content -Path $destPath -Encoding UTF8

$retryPath = Join-Path $pendingDir $file.Name
Move-Item -Path $destPath -Destination $retryPath -Force
Write-Host "[消费者 $ConsumerId] 消息已放回队列 " `
"(重试 $retryCount/$($message.MaxRetries))"
}

Start-Sleep -Seconds 1
}
}

return $processed
}

# 先往队列灌入一批消息
$queueRoot = Join-Path $env:TEMP 'PSFileQueue'

$tasks = @(
@{ Task = 'CleanupTemp'; Path = 'C:\Temp' }
@{ Task = 'ArchiveLogs'; Path = 'C:\Logs' }
@{ Task = 'SyncData'; Source = '\\server\share'; Dest = 'D:\Backup' }
@{ Task = 'HealthCheck'; Server = 'db-prod-01' }
@{ Task = 'UpdateConfig'; App = 'WebApp'; Version = '2.5.0' }
@{ Task = 'RotateKeys'; Service = 'APIGateway' }
@{ Task = 'SendReport'; Recipients = @('admin@corp.com') }
@{ Task = 'ValidateBackup'; Location = '\\nas\backup' }
)

foreach ($task in $tasks) {
Send-FileQueueMessage -QueuePath $queueRoot -MessageData $task -Label 'BatchJob'
}

Write-Host "`n启动 3 个并行消费者...`n"

# 启动 3 个消费者作为后台作业
$jobs = @()
foreach ($i in 1..3) {
$jobs += Start-Job -ScriptBlock {
param($QueuePath, $ConsumerId)
# 在 Job 中重新定义必要的函数逻辑
$pendingDir = Join-Path $QueuePath 'pending'
$processingDir = Join-Path $QueuePath 'processing'
$completedDir = Join-Path $QueuePath 'completed'
$deadletterDir = Join-Path $QueuePath 'deadletter'

$count = 0
while ($count -lt 3) {
$files = Get-ChildItem -Path $pendingDir -Filter '*.json' |
Sort-Object Name |
Select-Object -First 1

if ($null -eq $files) {
Start-Sleep -Seconds 1
continue
}

$file = $files
$dest = Join-Path $processingDir $file.Name

try {
Move-Item $file.FullName $dest -Force -ErrorAction Stop
$msg = Get-Content $dest -Raw | ConvertFrom-Json
$workMs = Get-Random -Min 200 -Max 1000
Start-Sleep -Milliseconds $workMs
$done = Join-Path $completedDir $file.Name
Move-Item $dest $done -Force
$count++
"[$ConsumerId] 完成: $($msg.Label) - $($msg.Data.Task)"
} catch {
$dead = Join-Path $deadletterDir $file.Name
if (Test-Path $dest) { Move-Item $dest $dead -Force -ErrorAction SilentlyContinue }
"[$ConsumerId] 失败: $($file.Name)"
}
}
return $count
} -ArgumentList $queueRoot, $i
}

# 等待所有作业完成
$results = $jobs | Wait-Job -Timeout 60 | Receive-Job

# 清理作业
$jobs | Remove-Job -Force

Write-Host "`n=== 消费结果汇总 ==="
foreach ($result in $results) {
Write-Host $result
}

# 查看队列最终状态
Write-Host "`n=== 队列最终状态 ==="
foreach ($dir in @('pending', 'processing', 'completed', 'deadletter')) {
$count = (Get-ChildItem (Join-Path $queueRoot $dir) -Filter '*.json' -ErrorAction SilentlyContinue).Count
Write-Host ("{0,-15} {1} 条消息" -f $dir, $count)
}

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
启动 3 个并行消费者...

=== 消费结果汇总 ===
[1] 完成: BatchJob - CleanupTemp
[2] 完成: BatchJob - ArchiveLogs
[3] 完成: BatchJob - SyncData
[1] 完成: BatchJob - HealthCheck
[2] 完成: BatchJob - UpdateConfig
[3] 完成: BatchJob - RotateKeys

=== 队列最终状态 ===
pending 0 条消息
processing 0 条消息
completed 6 条消息
deadletter 2 条消息

注意事项

  1. MSMQ 可用性限制:MSMQ 仅在 Windows 平台可用,且需要在”启用或关闭 Windows 功能”中手动安装。如果你的自动化流程需要跨平台运行(Linux、macOS),建议使用文件队列或第三方消息中间件(如 RabbitMQ、Redis Streams)作为替代方案。

  2. 文件队列的原子性:使用文件系统模拟队列时,Move-Item 操作在 NTFS 和大多数 Linux 文件系统上是原子的,这是确保同一消息不会被多个消费者同时取走的关键。但在网络共享路径(UNC 路径)上,原子性可能无法保证,需要额外加锁机制。

  3. 消息大小与性能:MSMQ 单条消息默认大小限制为 4MB。如果需要传输更大的数据负载,应将数据存储在外部(如文件共享、数据库、对象存储),在消息体中只传递引用路径。文件队列虽然没有硬性大小限制,但过大的 JSON 文件会显著影响读写性能。

  4. 死信队列不可忽略:任何消息队列系统都必须有死信(Dead Letter)处理机制。超过最大重试次数的消息不应被简单丢弃,而应移入死信队列供人工审查或定时重处理。定期监控死信队列的积压情况是运维健康度的重要指标。

  5. 后台作业的生命周期管理:使用 Start-Job 启动消费者时,务必通过 Wait-Job 设置超时时间,并在完成后调用 Remove-Job 清理资源。长时间运行的消费者建议改用 .NET 的 System.Threading.ThreadRunspace 池,以获得更好的性能和更精细的控制。

  6. 幂等性设计:消费者处理逻辑必须设计为幂等(Idempotent),即同一条消息被处理多次不会产生副作用。这是因为即使在最佳实践的队列系统中,”至少一次投递”(at-least-once delivery)也是比”恰好一次投递”(exactly-once delivery)更容易实现的保证。在代码中通过检查 MessageId 和处理状态来跳过已处理过的消息是常见的幂等策略。

PowerShell 技能连载 - 异步编程模式

适用于 PowerShell 5.1 及以上版本,并行功能需要 PowerShell 7

运维脚本经常面临 IO 等待——网络请求、文件操作、数据库查询,这些操作的响应时间往往远超 CPU 处理时间。如果顺序执行 100 个 HTTP 健康检查,每个耗时 1 秒,总共需要 100 秒;但如果并发执行,可能只需要几秒。PowerShell 提供了多种异步编程机制:后台作业(Jobs)、运行空间(Runspaces)、ForEach-Object -Parallel、以及 .NET 的异步 API。

本文将讲解 PowerShell 中的异步编程模式及其适用场景。

后台作业(Jobs)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Start-Job:在后台运行脚本块
$job = Start-Job -ScriptBlock {
Start-Sleep -Seconds 2
Get-Process | Select-Object Name, Id, CPU | Sort-Object CPU -Descending | Select-Object -First 5
}

Write-Host "作业已启动,ID:$($job.Id)" -ForegroundColor Cyan

# 非阻塞检查状态
while ($job.State -eq 'Running') {
Write-Host "." -NoNewline
Start-Sleep -Milliseconds 500
}
Write-Host

# 获取结果
$result = Receive-Job -Job $job
Write-Host "作业完成,结果:"
$result | Format-Table -AutoSize

# 清理作业
Remove-Job -Job $job

# 批量后台作业
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05")
$jobs = foreach ($server in $servers) {
Start-Job -ScriptBlock {
param($computer)
$os = Get-CimInstance Win32_OperatingSystem -ComputerName $computer -ErrorAction SilentlyContinue
if ($os) {
[PSCustomObject]@{
Server = $computer
OS = $os.Caption
Version = $os.Version
Status = "Online"
}
} else {
[PSCustomObject]@{
Server = $computer
OS = "N/A"
Version = "N/A"
Status = "Offline"
}
}
} -ArgumentList $server
}

Write-Host "启动了 $($jobs.Count) 个后台作业" -ForegroundColor Cyan

# 等待所有作业完成
$jobs | Wait-Job | Out-Null

# 收集所有结果
$allResults = $jobs | Receive-Job
$allResults | Format-Table -AutoSize

# 清理
$jobs | Remove-Job

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
作业已启动,ID:15
...

作业完成,结果:
Name Id CPU
---- -- ---
chrome 12345 1250.3
vscode 12346 456.7
powershell 12347 234.5
node 12348 123.4
dotnet 12349 98.1

启动了 5 个后台作业

Server OS Version Status
------ -- ------- ------
SRV01 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV02 Microsoft Windows Server 2022 Standard 10.0.20348 Online
SRV03 N/A N/A Offline
SRV04 Microsoft Windows Server 2019 Standard 10.0.17763 Online
SRV05 Microsoft Windows Server 2022 Standard 10.0.20348 Online

ForEach-Object -Parallel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# PowerShell 7 并行执行(推荐方式)
$servers = @("SRV01", "SRV02", "SRV03", "SRV04", "SRV05", "SRV06", "SRV07", "SRV08")

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$results = $servers | ForEach-Object -Parallel {
$server = $_
try {
$response = Invoke-WebRequest -Uri "http://${server}:8080/health" `
-TimeoutSec 5 -UseBasicParsing -ErrorAction Stop
[PSCustomObject]@{
Server = $server
Status = "Healthy"
Code = $response.StatusCode
LatencyMs = $response.RawContentLength
}
} catch {
[PSCustomObject]@{
Server = $server
Status = "Unhealthy"
Code = 0
LatencyMs = 0
Error = $_.Exception.Message
}
}
} -ThrottleLimit 4

$sw.Stop()

$results | Format-Table -AutoSize
Write-Host "并行检查 8 台服务器耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

# 共享变量(需要 $using:)
$threshold = 80
$results = $servers | ForEach-Object -Parallel {
$server = $_
$cpu = Get-Random -Min 10 -Max 95
[PSCustomObject]@{
Server = $server
CPU = $cpu
IsAlert = $cpu -gt $using:threshold
}
} -ThrottleLimit 4

$results | Where-Object { $_.IsAlert } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Server Status    Code LatencyMs
------ ------ ---- ---------
SRV01 Healthy 200 256
SRV02 Healthy 200 256
SRV03 Unhealthy 0 0
SRV04 Healthy 200 256
SRV05 Healthy 200 256
SRV06 Healthy 200 256
SRV07 Unhealthy 0 0
SRV08 Healthy 200 256

并行检查 8 台服务器耗时:2150ms

Server CPU IsAlert
------ --- -------
SRV02 92 True
SRV05 87 True

Runspace 池

对于需要精确控制的并发场景,使用 Runspace 池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
function Invoke-Parallel {
<#
.SYNOPSIS
使用 Runspace 池并行执行脚本块
#>
param(
[Parameter(Mandatory)]
[scriptblock]$ScriptBlock,

[Parameter(Mandatory)]
[object[]]$InputData,

[int]$ThrottleLimit = 4
)

# 创建 Runspace 池
$pool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(
1, $ThrottleLimit
)
$pool.Open()

$runspaces = @()

foreach ($item in $InputData) {
$powershell = [System.Management.Automation.PowerShell]::Create()
$powershell.RunspacePool = $pool

$null = $powershell.AddScript($ScriptBlock).AddArgument($item)

$runspaces += [PSCustomObject]@{
Pipe = $powershell
Handle = $powershell.BeginInvoke()
Input = $item
}
}

# 等待完成并收集结果
$results = @()
foreach ($rs in $runspaces) {
try {
$output = $rs.Pipe.EndInvoke($rs.Handle)
if ($output) {
foreach ($o in $output) {
$results += $o
}
}
} catch {
$results += [PSCustomObject]@{
Error = $true
Input = $rs.Input
Message = $_.Exception.Message
}
} finally {
$rs.Pipe.Dispose()
}
}

$pool.Close()
$pool.Dispose()

return $results
}

# 使用 Runspace 池并行 ping
$servers = 1..20 | ForEach-Object { "192.168.1.$_" }

$sw = [System.Diagnostics.Stopwatch]::StartNew()
$results = Invoke-Parallel -ScriptBlock {
param($ip)
$ping = Test-Connection -ComputerName $ip -Count 1 -Quiet -ErrorAction SilentlyContinue
[PSCustomObject]@{ IP = $ip; Online = $ping }
} -InputData $servers -ThrottleLimit 10
$sw.Stop()

$online = ($results | Where-Object { $_.Online }).Count
Write-Host "扫描 20 个 IP,$online 个在线,耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green
$results | Where-Object { $_.Online } | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
扫描 20 个 IP,8 个在线,耗时:3200ms

IP Online
-- ------
192.168.1.1 True
192.168.1.2 True
192.168.1.10 True
192.168.1.11 True
192.168.1.12 True
192.168.1.20 True
192.168.1.50 True
192.168.1.100 True

异步文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 并行处理文件
$files = Get-ChildItem "C:\Logs" -Filter "*.log" -Recurse | Select-Object -First 20

$sw = [System.Diagnostics.Stopwatch]::StartNew()

$fileStats = $files | ForEach-Object -Parallel {
$file = $_
$lines = 0
$errors = 0

$reader = [System.IO.StreamReader]::new($file.FullName)
while ($null -ne $reader.ReadLine()) {
$lines++
}
$reader.Close()

[PSCustomObject]@{
Name = $file.Name
SizeKB = [math]::Round($file.Length / 1KB, 1)
Lines = $lines
Modified = $file.LastWriteTime.ToString('yyyy-MM-dd HH:mm')
}
} -ThrottleLimit 4

$sw.Stop()

$fileStats | Format-Table -AutoSize
Write-Host "并行处理 $($files.Count) 个文件耗时:$($sw.ElapsedMilliseconds)ms" -ForegroundColor Green

执行结果示例:

1
2
3
4
5
6
7
8
Name               SizeKB Lines Modified
---- ------- ----- --------
app-20250709.log 1024.5 12050 2025-07-09 08:30
app-20250708.log 856.3 9876 2025-07-08 23:59
app-20250707.log 743.1 8543 2025-07-07 23:59
security-20250709.log 456.7 5432 2025-07-09 08:15

并行处理 20 个文件耗时:850ms

注意事项

  1. Jobs 开销大:每个 Job 启动新的 PowerShell 进程,开销较大。少量任务用 Jobs,大量任务用 Runspace 或 -Parallel
  2. 线程安全:并行操作中不要直接修改共享变量,使用 ConcurrentDictionary 或收集结果后统一处理
  3. ThrottleLimit:设置合理的并发限制,过多并发会导致资源争用和 API 限流
  4. 错误处理:并行任务中的错误不会自动传播到主线程,需要显式捕获和收集
  5. $using: 作用域ForEach-Object -Parallel 中访问外部变量需要 $using: 前缀
  6. Runspace 清理:使用完 Runspace 和 RunspacePool 后必须调用 Dispose() 释放资源