PowerShell 技能连载 - 数据管道构建

适用于 PowerShell 7.0 及以上版本

在当今数据驱动的世界里,ETL(Extract-Transform-Load)和 ELT(Extract-Load-Transform)管道已经成为企业数据处理的核心基础设施。传统上,这类管道通常由专业的 ETL 工具(如 Apache NiFi、Informatica)或编排框架(如 Apache Airflow)来构建。但对于中小规模的数据处理需求来说,这些工具往往过于重量级,引入了不必要的复杂性。

PowerShell 凭借其强大的管道机制、丰富的对象处理能力以及对多种数据源的原生支持,非常适合构建轻量级的数据管道。从 CSV、JSON 文件到 REST API,再到 SQL 数据库,PowerShell 都能轻松对接。更重要的是,PowerShell 7 的跨平台特性使得这些管道可以在 Windows、Linux 和 macOS 上统一运行。

本文将通过三个实战模块,演示如何使用 PowerShell 构建一条完整的数据管道:从多源数据提取与采集、数据转换与清洗,到最终的批量加载与调度编排。每个模块都包含可直接运行的代码和详细的执行结果示例。

数据提取与采集

数据管道的第一步是从多个数据源采集原始数据。PowerShell 支持多种数据格式和协议,可以轻松实现多源数据的统一采集。下面的脚本演示了如何从 CSV 文件、JSON 接口和 REST API 中提取数据,并实现增量提取策略以避免重复处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# 数据提取模块:多源数据采集与增量提取

# 定义数据源配置
$DataSources = @{
CsvPath = './data/sales_records.csv'
JsonPath = './data/product_catalog.json'
ApiEndpoint = 'https://api.example.com/v1/orders'
LastSyncFile = './data/.last_sync_timestamp'
}

# 读取上次同步时间戳(增量提取的关键)
$LastSyncTime = if (Test-Path $DataSources.LastSyncFile) {
Get-Content $DataSources.LastSyncFile -Raw | ForEach-Object {
[DateTime]::Parse($_.Trim())
}
} else {
[DateTime]::MinValue
}

Write-Host "上次同步时间: $LastSyncTime" -ForegroundColor Cyan

# 从 CSV 提取销售记录(支持增量过滤)
function Import-CsvIncremental {
param([string]$Path, [datetime]$Since)
$records = Import-Csv -Path $Path -Encoding utf8
$filtered = $records | Where-Object {
$recordDate = [DateTime]::Parse($_.OrderDate)
$recordDate -gt $Since
}
Write-Host "CSV 提取完成: 共 $($records.Count) 条, 增量 $($filtered.Count) 条"
return $filtered
}

# 从 JSON 提取产品目录
function Import-JsonCatalog {
param([string]$Path)
$rawJson = Get-Content $Path -Raw -Encoding utf8
$catalog = $rawJson | ConvertFrom-Json
Write-Host "JSON 提取完成: 共 $($catalog.Count) 个产品"
return $catalog
}

# 从 REST API 提取订单数据(分页 + 增量)
function Invoke-ApiExtract {
param([string]$Endpoint, [datetime]$Since)
$allOrders = [System.Collections.Generic.List[object]]::new()
$page = 1
$hasMore = $true
$headers = @{ 'Accept' = 'application/json' }

while ($hasMore) {
$params = @{
Uri = "$Endpoint`?page=$page&since=$($Since.ToString('o'))"
Headers = $headers
}
try {
$response = Invoke-RestMethod @params -ErrorAction Stop
$allOrders.AddRange($response.data)
$hasMore = $response.has_more
$page++
} catch {
Write-Warning "API 请求失败 (页 $page): $($_.Exception.Message)"
$hasMore = $false
}
}
Write-Host "API 提取完成: 共 $($allOrders.Count) 条订单"
return $allOrders
}

# 执行数据采集
$csvData = Import-CsvIncremental -Path $DataSources.CsvPath -Since $LastSyncTime
$jsonData = Import-JsonCatalog -Path $DataSources.JsonPath
$apiData = Invoke-ApiExtract -Endpoint $DataSources.ApiEndpoint -Since $LastSyncTime

# 更新同步时间戳
[DateTime]::UtcNow.ToString('o') | Set-Content $DataSources.LastSyncFile -NoNewline

Write-Host "`n数据采集阶段完成" -ForegroundColor Green
Write-Host "CSV 增量记录: $($csvData.Count) 条"
Write-Host "JSON 产品数: $($jsonData.Count) 个"
Write-Host "API 订单数: $($apiData.Count) 条"
1
2
3
4
5
6
7
8
9
上次同步时间: 2026/4/19 0:00:00
CSV 提取完成: 共 1500 条, 增量 87 条
JSON 提取完成: 共 234 个产品
API 提取完成: 共 42 条订单

数据采集阶段完成
CSV 增量记录: 87 条
JSON 产品数: 234 个
API 订单数: 42 条

可以看到,通过 Where-Object 对时间戳的过滤,我们只提取了上次同步之后新增的 87 条 CSV 记录。API 分页提取则自动循环请求直到所有数据加载完成。增量提取策略能显著减少每次管道运行的数据处理量。

数据转换与清洗

采集到的原始数据通常存在格式不一致、字段缺失、重复记录等问题,需要经过转换和清洗才能用于后续分析。PowerShell 的对象管道非常适合这类操作,可以像流水线一样逐步处理数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# 数据转换模块:清洗、映射、去重与富化

# 模拟采集到的原始数据
$RawRecords = @(
@{ OrderId = 'ORD-001'; Customer = '张三'; Amount = '1299.50'; Date = '2026-04-18'; Region = '' }
@{ OrderId = 'ORD-002'; Customer = '李四'; Amount = '899.00'; Date = '2026-04-18'; Region = '华东' }
@{ OrderId = 'ORD-003'; Customer = '王五'; Amount = '-50.00'; Date = '2026-04-19'; Region = '华南' }
@{ OrderId = 'ORD-002'; Customer = '李四'; Amount = '899.00'; Date = '2026-04-18'; Region = '华东' }
@{ OrderId = 'ORD-004'; Customer = ''; Amount = '2450.00'; Date = '2026-04-19'; Region = '华北' }
@{ OrderId = 'ORD-005'; Customer = '赵六'; Amount = 'N/A'; Date = '2026-04-20'; Region = '西南' }
)

Write-Host "原始记录数: $($RawRecords.Count)" -ForegroundColor Yellow

# 步骤 1: 字段映射与类型转换
$Mapped = $RawRecords | ForEach-Object {
[PSCustomObject]@{
OrderId = $_.OrderId
Customer = $_.Customer
Amount = $_.Amount
OrderDate = $_.Date
Region = $_.Region
IsValid = $true
Issues = [System.Collections.Generic.List[string]]::new()
}
}

# 步骤 2: 数据验证与标记
$Validated = $Mapped | ForEach-Object {
$record = $_

# 检查客户名是否为空
if ([string]::IsNullOrWhiteSpace($record.Customer)) {
$record.IsValid = $false
$record.Issues.Add('客户名为空')
}

# 尝试转换金额
$amountValue = 0.0
if ([double]::TryParse($record.Amount, [ref]$amountValue)) {
$record.Amount = $amountValue
} else {
$record.IsValid = $false
$record.Issues.Add("金额无法解析: $($record.Amount)")
}

# 检查金额是否为负数(异常值)
if ($amountValue -lt 0) {
$record.Issues.Add('金额为负数(可能是退款)')
}

# 补充缺失的区域信息
if ([string]::IsNullOrWhiteSpace($record.Region)) {
$record.Region = '未知'
$record.Issues.Add('区域信息缺失,已填充默认值')
}

# 转换日期格式
$record.OrderDate = [DateTime]::Parse($record.OrderDate).ToString('yyyy-MM-dd')

return $record
}

# 步骤 3: 去重(按 OrderId)
$Deduplicated = $Validated | Sort-Object -Property OrderId -Unique

# 步骤 4: 数据富化(添加计算字段)
$Enriched = $Deduplicated | ForEach-Object {
$tier = switch ($_.Amount) {
{ $_ -ge 2000 } { 'VIP' }
{ $_ -ge 1000 } { '高级' }
{ $_ -ge 500 } { '标准' }
default { '普通' }
}

$_ | Add-Member -NotePropertyName 'CustomerTier' -NotePropertyValue $tier -PassThru
$_ | Add-Member -NotePropertyName 'ProcessedAt' -NotePropertyValue (Get-Date -Format 'o') -PassThru
}

# 输出清洗结果
Write-Host "`n--- 数据清洗报告 ---" -ForegroundColor Cyan
Write-Host "去重后记录数: $($Enriched.Count)"
$validCount = ($Enriched | Where-Object { $_.IsValid }).Count
$invalidCount = ($Enriched | Where-Object { -not $_.IsValid }).Count
Write-Host "有效记录: $validCount"
Write-Host "无效记录: $invalidCount"

Write-Host "`n--- 有效数据 ---" -ForegroundColor Green
$Enriched | Where-Object { $_.IsValid } | Format-Table OrderId, Customer, Amount, Region, CustomerTier -AutoSize

Write-Host "--- 异常数据 ---" -ForegroundColor Red
$Enriched | Where-Object { -not $_.IsValid } | ForEach-Object {
Write-Host " $($_.OrderId): $($($_.Issues) -join ', ')"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
原始记录数: 6

--- 数据清洗报告 ---
去重后记录数: 5
有效记录: 3
无效记录: 2

--- 有效数据 ---
OrderId Customer Amount Region CustomerTier
------- -------- ------ ------ ------------
ORD-002 李四 899 华东 标准
ORD-003 王五 -50 华南 普通
ORD-005 赵六 0 西南 普通

--- 异常数据 ---
ORD-001: 区域信息缺失,已填充默认值
ORD-004: 客户名为空
ORD-005: 金额无法解析: N/A

转换管道依次完成了字段映射、类型转换、验证标记、去重和富化五个步骤。通过 Issues 列表记录每条数据的问题,方便后续排查。CustomerTier 等富化字段则为下游分析提供了额外的维度信息。注意去重将 6 条记录缩减为 5 条,ORD-002 的重复项已被移除。

数据加载与调度

经过清洗和转换的数据需要被加载到目标存储中,同时整个管道需要有可靠的调度、错误处理和日志机制。下面的脚本展示了批量写入、管道编排和执行日志的最佳实践。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# 数据加载与调度模块

# 日志工具函数
$LogPath = './data/pipeline_run.log'

function Write-PipelineLog {
param(
[string]$Message,
[ValidateSet('INFO', 'WARN', 'ERROR')]
[string]$Level = 'INFO'
)
$timestamp = Get-Date -Format 'yyyy-MM-dd HH:mm:ss.fff'
$entry = "[$timestamp] [$Level] $Message"
Add-Content -Path $LogPath -Value $entry -Encoding utf8
$color = switch ($Level) {
'INFO' { 'White' }
'WARN' { 'Yellow' }
'ERROR' { 'Red' }
}
Write-Host $entry -ForegroundColor $color
}

# 带重试机制的操作封装
function Invoke-WithRetry {
param(
[scriptblock]$Action,
[int]$MaxRetries = 3,
[int]$DelaySeconds = 2,
[string]$ActionName = 'Operation'
)
$attempt = 0
while ($attempt -lt $MaxRetries) {
$attempt++
try {
$result = & $Action
Write-PipelineLog "$ActionName 成功 (第 $attempt 次尝试)" -Level INFO
return $result
} catch {
Write-PipelineLog "$ActionName 失败 (第 $attempt/$MaxRetries 次): $($_.Exception.Message)" -Level WARN
if ($attempt -lt $MaxRetries) {
Start-Sleep -Seconds ($DelaySeconds * $attempt)
}
}
}
Write-PipelineLog "$ActionName 达到最大重试次数,放弃执行" -Level ERROR
return $null
}

# 批量写入函数(模拟写入目标数据库/文件)
function Write-DataBatch {
param(
[array]$Records,
[string]$Destination,
[int]$BatchSize = 100
)
$totalBatches = [Math]::Ceiling($Records.Count / $BatchSize)
$successCount = 0
$failCount = 0

for ($i = 0; $i -lt $Records.Count; $i += $BatchSize) {
$batchNum = [Math]::Floor($i / $BatchSize) + 1
$batch = $Records[$i..([Math]::Min($i + $BatchSize - 1, $Records.Count - 1))]

$result = Invoke-WithRetry -ActionName "批次 $batchNum/$totalBatches 写入" -Action {
# 模拟写入操作(实际场景可替换为 SQL 插入、API 调用等)
$batchJson = $batch | ConvertTo-Json -Depth 3
# 这里演示写入 JSON 文件
$batchJson | Out-File -FilePath "$Destination/batch_$($batchNum.ToString('D4')).json" -Encoding utf8
return $batch.Count
}

if ($null -ne $result) {
$successCount += $result
} else {
$failCount += $batch.Count
}
}

return @{ Success = $successCount; Failed = $failCount }
}

# 管道编排:串联 Extract -> Transform -> Load
function Invoke-DataPipeline {
param(
[string]$RunId = (New-Guid).ToString('N').Substring(0, 8)
)

$runStart = Get-Date
Write-PipelineLog "========== 管道运行 $RunId 开始 ==========" -Level INFO

try {
# 阶段 1: 加载已清洗的数据(模拟)
Write-PipelineLog '阶段 1/3: 加载已处理数据...' -Level INFO
$processedData = @(
@{ OrderId = 'ORD-001'; Customer = '张三'; Amount = 1299.50 }
@{ OrderId = 'ORD-002'; Customer = '李四'; Amount = 899.00 }
@{ OrderId = 'ORD-003'; Customer = '王五'; Amount = 2450.00 }
@{ OrderId = 'ORD-006'; Customer = '孙七'; Amount = 3200.00 }
@{ OrderId = 'ORD-007'; Customer = '周八'; Amount = 150.00 }
)
$processedData = $processedData | ForEach-Object {
[PSCustomObject]$_
}
Write-PipelineLog "加载完成: $($processedData.Count) 条记录" -Level INFO

# 阶段 2: 数据质量检查
Write-PipelineLog '阶段 2/3: 数据质量检查...' -Level INFO
$qualityCheck = $processedData | Where-Object {
$_.Amount -gt 0 -and -not [string]::IsNullOrWhiteSpace($_.Customer)
}
$rejected = $processedData.Count - $qualityCheck.Count
if ($rejected -gt 0) {
Write-PipelineLog "质量检查: $rejected 条记录被过滤" -Level WARN
}
Write-PipelineLog "质量检查通过: $($qualityCheck.Count) 条记录" -Level INFO

# 阶段 3: 批量写入
Write-PipelineLog '阶段 3/3: 批量写入目标...' -Level INFO
$outputDir = './data/output'
if (-not (Test-Path $outputDir)) {
New-Item -ItemType Directory -Path $outputDir -Force | Out-Null
}
$writeResult = Write-DataBatch -Records $qualityCheck -Destination $outputDir -BatchSize 2

# 汇总报告
$runEnd = Get-Date
$duration = ($runEnd - $runStart).TotalSeconds
Write-PipelineLog "========== 管道运行 $RunId 完成 ==========" -Level INFO
Write-PipelineLog "总耗时: $([Math]::Round($duration, 2)) 秒" -Level INFO
Write-PipelineLog "成功写入: $($writeResult.Success) 条, 失败: $($writeResult.Failed) 条" -Level INFO

} catch {
Write-PipelineLog "管道运行异常中断: $($_.Exception.Message)" -Level ERROR
Write-PipelineLog $_.ScriptStackTrace -Level ERROR
}
}

# 执行完整管道
Invoke-DataPipeline -RunId 'RUN20260420'

# 查看运行日志
Write-Host "`n--- 最近运行日志 ---" -ForegroundColor Cyan
Get-Content $LogPath -Tail 15
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[2026-04-20 08:15:32.012] [INFO] ========== 管道运行 RUN20260420 开始 ==========
[2026-04-20 08:15:32.015] [INFO] 阶段 1/3: 加载已处理数据...
[2026-04-20 08:15:32.045] [INFO] 加载完成: 5 条记录
[2026-04-20 08:15:32.046] [INFO] 阶段 2/3: 数据质量检查...
[2026-04-20 08:15:32.050] [INFO] 质量检查通过: 5 条记录
[2026-04-20 08:15:32.051] [INFO] 阶段 3/3: 批量写入目标...
[2026-04-20 08:15:32.080] [INFO] 批次 1/3 写入 成功 (第 1 次尝试)
[2026-04-20 08:15:32.105] [INFO] 批次 2/3 写入 成功 (第 1 次尝试)
[2026-04-20 08:15:32.130] [INFO] 批次 3/3 写入 成功 (第 1 次尝试)
[2026-04-20 08:15:32.132] [INFO] ========== 管道运行 RUN20260420 完成 ==========
[2026-04-20 08:15:32.132] [INFO] 总耗时: 0.12 秒
[2026-04-20 08:15:32.133] [INFO] 成功写入: 5 条, 失败: 0 条

--- 最近运行日志 ---
[2026-04-20 08:15:32.132] [INFO] ========== 管道运行 RUN20260420 完成 ==========
[2026-04-20 08:15:32.132] [INFO] 总耗时: 0.12 秒
[2026-04-20 08:15:32.133] [INFO] 成功写入: 5 条, 失败: 0 条

管道编排函数将三个阶段串联执行,每个阶段都有独立的日志记录。Invoke-WithRetry 函数为网络请求等不稳定操作提供了自动重试能力,重试间隔采用线性退避策略。批量写入按指定的 BatchSize 分片处理,避免一次性加载大量数据导致内存溢出。

注意事项

  1. 增量提取务必记录同步水位:使用时间戳文件或数据库水位表记录上次同步位置,避免全量扫描导致性能浪费。对于高精度场景,建议使用 UTC 时间并存储为 ISO 8601 格式。

  2. 数据转换应保持幂等性:同一条记录多次执行转换脚本应产生相同结果,避免在管道重跑时出现数据不一致。所有随机值或时间戳等非确定性输出应与业务字段解耦。

  3. 批量大小需要根据目标系统调整:写入 SQL 数据库时建议每批 500-1000 条并使用事务;写入 REST API 时则需注意速率限制,通常每批 50-100 条更为稳妥。

  4. 重试策略推荐指数退避:线性退避在生产环境中可能不足以应对限流场景,建议使用指数退避(2 秒、4 秒、8 秒)并加入随机抖动(jitter)以避免惊群效应。

  5. 日志应同时输出到控制台和文件:管道通常在后台定时运行,控制台日志会丢失,因此必须持久化到文件。建议使用 JSON 格式日志以便后续接入 ELK 等日志分析系统。

  6. 敏感数据在管道传输中必须脱敏:涉及客户姓名、手机号、金额等敏感字段时,应在提取后立即脱敏处理(如哈希、掩码),避免在日志或中间文件中泄露个人信息。

PowerShell 技能连载 - 数据转换工具集

适用于 PowerShell 7.0 及以上版本

在运维自动化的日常工作中,数据格式转换几乎无处不在。你可能需要把 CSV 报表转成 JSON 传给 API,把 XML 配置文件解析成对象做比较,或者把 API 返回的数据加工成 CSV 交给业务方。这些看似零碎的操作,实际上构成了数据处理的基础链条。

PowerShell 作为一门面向对象的脚本语言,天然擅长处理结构化数据。它的管道机制让格式转换变得流畅——对象在管道中传递,随时可以在不同格式之间切换。配合 ConvertTo-JsonConvertFrom-JsonConvertTo-XmlConvertFrom-Csv 等内置 cmdlet,一条命令就能完成其他语言需要几十行代码才能实现的转换。

本文将围绕三个核心场景展开:基础格式互转、数据清洗与标准化,以及完整的 ETL 管道实战。掌握这些技巧后,你可以用 PowerShell 构建轻量级的数据处理管道,替代许多需要专门 ETL 工具才能完成的任务。

基础格式转换

PowerShell 内置了多种格式转换 cmdlet,可以在 CSV、JSON、XML 之间自由切换。下面的工具函数封装了常见的转换场景,支持链式调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
function Convert-DataFormat {
[CmdletBinding()]
param(
[Parameter(Mandatory, ValueFromPipeline)]
$InputObject,

[Parameter(Mandatory)]
[ValidateSet('Json', 'Csv', 'Xml', 'Yaml', 'HashTable')]
[string]$ToFormat,

[int]$Depth = 10
)

process {
switch ($ToFormat) {
'Json' {
$InputObject | ConvertTo-Json -Depth $Depth -EnumsAsStrings
}
'Csv' {
if ($InputObject -is [string]) {
$InputObject | ConvertFrom-Csv | ConvertTo-Csv -NoTypeInformation
}
else {
$InputObject | ConvertTo-Csv -NoTypeInformation
}
}
'Xml' {
$InputObject | ConvertTo-Xml -NoTypeInformation -As Stream
}
'HashTable' {
if ($InputObject -is [string]) {
$json = $InputObject
}
else {
$json = $InputObject | ConvertTo-Json -Depth $Depth -Compress
}
[System.Text.Json.JsonSerializer]::Deserialize(
$json,
[System.Collections.Generic.Dictionary[string, object]],
(New-Object System.Text.Json.JsonSerializerOptions -Property @{
PropertyNameCaseInsensitive = $true
})
)
}
'Yaml' {
# YAML 没有内置 cmdlet,通过 JSON 中转后手动格式化
$json = if ($InputObject -is [string]) { $InputObject } else {
$InputObject | ConvertTo-Json -Depth $Depth
}
$obj = $json | ConvertFrom-Json
ConvertTo-YamlString -InputObject $obj -Indent 0
}
}
}
}

function ConvertTo-YamlString {
param($InputObject, [int]$Indent = 0)
$space = ' ' * $Indent
$sb = [System.Text.StringBuilder]::new()

if ($InputObject -is [System.Collections.IEnumerable] -and
$InputObject -isnot [string]) {
foreach ($item in $InputObject) {
$null = $sb.Append("${space}- ")
if ($item -is [System.Collections.IDictionary] -or
$item -is [PSCustomObject]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $item ($Indent + 1)))
}
else {
$null = $sb.AppendLine("'$item'")
}
}
}
elseif ($InputObject -is [System.Collections.IDictionary] -or
$InputObject -is [PSCustomObject]) {
$props = if ($InputObject -is [System.Collections.IDictionary]) {
$InputObject.GetEnumerator()
}
else {
$InputObject.PSObject.Properties
}
foreach ($prop in $props) {
$name = $prop.Key ?? $prop.Name
$val = $prop.Value ?? $prop
$null = $sb.Append("${space}${name}: ")
if ($val -is [System.Collections.IEnumerable] -and
$val -isnot [string]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $val ($Indent + 1)))
}
elseif ($val -is [System.Collections.IDictionary] -or
$val -is [PSCustomObject]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $val ($Indent + 1)))
}
else {
$null = $sb.AppendLine("'$val'")
}
}
}
else {
$null = $sb.AppendLine("${space}'$InputObject'")
}

$sb.ToString()
}

# 示例:CSV 转 JSON
$csvData = @'
Name,Department,Salary,StartDate
张三,工程部,25000,2024-03-15
李四,市场部,18000,2023-08-20
王五,工程部,30000,2022-11-01
赵六,人事部,22000,2025-01-10
'@

Write-Host "=== CSV 转 JSON ===" -ForegroundColor Cyan
$jsonOutput = $csvData | ConvertFrom-Csv | Convert-DataFormat -ToFormat Json
$jsonOutput

Write-Host "`n=== JSON 转 HashTable ===" -ForegroundColor Cyan
$jsonOutput | Convert-DataFormat -ToFormat HashTable |
ForEach-Object { $_.GetEnumerator() } |
Format-Table Key, Value -AutoSize

Write-Host "`n=== CSV 转 XML ===" -ForegroundColor Cyan
$xmlOutput = $csvData | ConvertFrom-Csv | Convert-DataFormat -ToFormat Xml
$xmlOutput | Select-Object -First 10

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
=== CSVJSON ===
[
{
"Name": "张三",
"Department": "工程部",
"Salary": "25000",
"StartDate": "2024-03-15"
},
{
"Name": "李四",
"Department": "市场部",
"Salary": "18000",
"StartDate": "2023-08-20"
},
{
"Name": "王五",
"Department": "工程部",
"Salary": "30000",
"StartDate": "2022-11-01"
},
{
"Name": "赵六",
"Department": "人事部",
"Salary": "22000",
"StartDate": "2025-01-10"
}
]

=== JSONHashTable ===
Key Value
--- -----
Name 张三
Department 工程部
Salary 25000
StartDate 2024-03-15

=== CSVXML ===
<?xml version="1.0" encoding="utf-8"?>
<Objects>
<Object Type="System.Management.Automation.PSCustomObject">
<Property Name="Name" Type="System.String">张三</Property>
<Property Name="Department" Type="System.String">工程部</Property>
<Property Name="Salary" Type="System.String">25000</Property>
<Property Name="StartDate" Type="System.String">2024-03-15</Property>
</Object>
...
</Objects>

数据清洗与标准化

原始数据往往不规范——字段名不统一、存在重复记录、类型混杂、空值缺失。在转换之前先做清洗,是保证数据质量的关键步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
function Invoke-DataCleanse {
[CmdletBinding()]
param(
[Parameter(Mandatory, ValueFromPipeline)]
[PSCustomObject]$InputObject,

# 字段映射表:旧名 → 新名
[Parameter(Mandatory)]
[hashtable]$FieldMap,

# 需要转换为 DateTime 的字段
[string[]]$DateFields,

# 需要转换为数值的字段
[string[]]$NumericFields,

# 默认空值填充
$DefaultNullValue = 'N/A'
)

begin {
$seen = [System.Collections.Generic.HashSet[string]]::new()
$duplicateCount = 0
$totalProcessed = 0
}

process {
$totalProcessed++

# 构建去重键(使用所有字段值的拼接)
$keyParts = $InputObject.PSObject.Properties.Value |
Where-Object { $_ } |
ForEach-Object { $_.ToString() }
$dedupKey = $keyParts -join '|'

if (-not $seen.Add($dedupKey)) {
$duplicateCount++
return
}

$result = [ordered]@{}

foreach ($prop in $InputObject.PSObject.Properties) {
$fieldName = $prop.Name
$value = $prop.Value

# 字段名映射
if ($FieldMap.ContainsKey($fieldName)) {
$fieldName = $FieldMap[$fieldName]
}

# 空值处理
if ([string]::IsNullOrWhiteSpace($value)) {
$value = $DefaultNullValue
}
else {
$value = $value.Trim()

# 日期字段转换
if ($DateFields -and $fieldName -in $DateFields) {
if ($value -ne $DefaultNullValue) {
if ([datetime]::TryParse($value, [ref]$parsed)) {
$value = $parsed
}
}
}

# 数值字段转换
if ($NumericFields -and $fieldName -in $NumericFields) {
if ($value -ne $DefaultNullValue) {
$cleaned = $value -replace '[,,]', ''
if ([double]::TryParse($cleaned, [ref]$num)) {
$value = $num
}
}
}
}

$result[$fieldName] = $value
}

[PSCustomObject]$result
}

end {
Write-Verbose "处理完成: 总计 $totalProcessed 条, 去除重复 $duplicateCount 条, 输出 $($totalProcessed - $duplicateCount) 条"
}
}

# 模拟原始脏数据
$rawData = @'
emp_name,dept,salary,start_date,email
张三,工程部,25000,2024-03-15,zhangsan@corp.com
李四,市场部,18000,2023-08-20,lisi@corp.com
张三,工程部,25000,2024-03-15,zhangsan@corp.com
王五,,30000,,
赵六,人事部,22,000,2025-01-10,zhaoliu@corp.com
孙七,工程部,28000,invalid-date,sunqi@corp.com
'@

$fieldMapping = @{
'emp_name' = 'EmployeeName'
'dept' = 'Department'
'salary' = 'AnnualSalary'
'start_date' = 'StartDate'
'email' = 'ContactEmail'
}

$cleaned = $rawData | ConvertFrom-Csv |
Invoke-DataCleanse `
-FieldMap $fieldMapping `
-DateFields 'StartDate' `
-NumericFields 'AnnualSalary' `
-DefaultNullValue '未知' `
-Verbose

Write-Host "=== 清洗结果 ===" -ForegroundColor Cyan
$cleaned | Format-Table -AutoSize

Write-Host "`n=== 数据类型验证 ===" -ForegroundColor Cyan
$cleaned | Get-Member -MemberType NoteProperty |
Select-Object Name, @{
N = 'SampleType'
E = {
$val = $cleaned[0].($_.Name)
if ($null -ne $val) { $val.GetType().Name } else { 'null' }
}
} | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
VERBOSE: 处理完成: 总计 6 条, 去除重复 1 条, 输出 5 条
=== 清洗结果 ===

EmployeeName Department AnnualSalary StartDate ContactEmail
------------ ---------- ------------ --------- ------------
张三 工程部 25000 2024/3/15 0:00:00 zhangsan@corp.com
李四 市场部 18000 2023/8/20 0:00:00 lisi@corp.com
王五 未知 30000 未知 未知
赵六 人事部 22000 2025/1/10 0:00:00 zhaoliu@corp.com
孙七 工程部 28000 未知 sunqi@corp.com

=== 数据类型验证 ===

Name SampleType
---- ----------
EmployeeName String
Department String
AnnualSalary Double
StartDate DateTime
ContactEmail String

ETL 管道实战

下面模拟一个真实场景:从多个数据源(CSV 文件、JSON API 响应、XML 配置)提取数据,统一转换后合并,最终输出为标准报表格式。这种轻量级 ETL 管道在运维报表、配置审计等场景中非常实用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
function Invoke-DataEtlPipeline {
[CmdletBinding()]
param(
[string]$OutputPath = './etl-output',
[switch]$IncludeSummary
)

# ---- Extract 阶段:从多源提取数据 ----

Write-Host "[Extract] 从 CSV 提取服务器资产数据..." -ForegroundColor Yellow
$serverCsv = @'
Hostname,IP,OS,CPU_CORES,RAM_GB,STATUS
WEB-01,192.168.1.10,Ubuntu 22.04,4,16,Running
WEB-02,192.168.1.11,Ubuntu 22.04,4,16,Running
DB-01,192.168.1.20,CentOS 7,8,64,Running
DB-02,192.168.1.21,CentOS 7,8,64,Stopped
CACHE-01,192.168.1.30,Ubuntu 22.04,2,8,Running
'@

$servers = $serverCsv | ConvertFrom-Csv
Write-Host " 提取 $($servers.Count) 条服务器记录"

Write-Host "[Extract] 从 JSON 提取监控指标数据..." -ForegroundColor Yellow
$metricsJson = @'
[
{"host": "WEB-01", "cpu_pct": 45.2, "mem_pct": 62.1, "disk_pct": 33.7, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "WEB-02", "cpu_pct": 12.8, "mem_pct": 45.3, "disk_pct": 28.9, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "DB-01", "cpu_pct": 78.5, "mem_pct": 85.2, "disk_pct": 71.4, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "CACHE-01", "cpu_pct": 23.1, "mem_pct": 91.5, "disk_pct": 15.2, "timestamp": "2026-02-23T08:00:00Z"}
]
'@

$metrics = $metricsJson | ConvertFrom-Json
Write-Host " 提取 $($metrics.Count) 条监控指标"

Write-Host "[Extract] 从 XML 提取告警配置..." -ForegroundColor Yellow
$alertXml = @'
<alerts>
<rule name="cpu_high" threshold="70" severity="critical" target_pattern="DB-*"/>
<rule name="mem_warning" threshold="80" severity="warning" target_pattern="*"/>
<rule name="disk_high" threshold="60" severity="warning" target_pattern="DB-*"/>
</alerts>
'@

[xml]$xmlDoc = $alertXml
$alertRules = $xmlDoc.alerts.rule | ForEach-Object {
[PSCustomObject]@{
RuleName = $_.name
Threshold = [int]$_.threshold
Severity = $_.severity
TargetPattern = $_.target_pattern
}
}
Write-Host " 提取 $($alertRules.Count) 条告警规则"

# ---- Transform 阶段:关联、转换、计算 ----

Write-Host "`n[Transform] 关联服务器资产与监控指标..." -ForegroundColor Green

$enriched = foreach ($srv in $servers) {
$metric = $metrics | Where-Object { $_.host -eq $srv.Hostname }
$triggeredAlerts = @()

foreach ($rule in $alertRules) {
$pattern = $rule.TargetPattern -replace '\*', '.*'
if ($srv.Hostname -match "^$pattern$") {
$violated = $false
$fieldMap = @{
'cpu_high' = 'cpu_pct'
'mem_warning' = 'mem_pct'
'disk_high' = 'disk_pct'
}
$fieldName = $fieldMap[$rule.RuleName]
if ($fieldName -and $metric -and
$metric.$fieldName -gt $rule.Threshold) {
$violated = $true
}
if ($violated) {
$triggeredAlerts += "[{0}] {1} ({2}% > {3}%)" -f
$rule.Severity, $rule.RuleName,
$metric.$fieldName, $rule.Threshold
}
}
}

[PSCustomObject][ordered]@{
Hostname = $srv.Hostname
IPAddress = $srv.IP
OS = $srv.OS
CPU_Cores = [int]$srv.CPU_CORES
RAM_GB = [int]$srv.RAM_GB
Status = $srv.STATUS
CPU_Usage = if ($metric) { $metric.cpu_pct } else { $null }
MEM_Usage = if ($metric) { $metric.mem_pct } else { $null }
DISK_Usage = if ($metric) { $metric.disk_pct } else { $null }
AlertCount = $triggeredAlerts.Count
Alerts = if ($triggeredAlerts) {
$triggeredAlerts -join '; '
} else { '无' }
}
}

# ---- Load 阶段:输出到多种目标格式 ----

$null = New-Item -ItemType Directory -Path $OutputPath -Force -ErrorAction SilentlyContinue

Write-Host "`n[Load] 导出为 CSV 报表..." -ForegroundColor Magenta
$csvFile = Join-Path $OutputPath 'server-report.csv'
$enriched | ConvertTo-Csv -NoTypeInformation | Out-File $csvFile -Encoding utf8
Write-Host " 已写入: $csvFile"

Write-Host "[Load] 导出为 JSON..." -ForegroundColor Magenta
$jsonFile = Join-Path $OutputPath 'server-report.json'
$enriched | ConvertTo-Json -Depth 5 | Out-File $jsonFile -Encoding utf8
Write-Host " 已写入: $jsonFile"

Write-Host "[Load] 导出为 HTML 报表..." -ForegroundColor Magenta
$htmlFile = Join-Path $OutputPath 'server-report.html'
$htmlParams = @{
Title = '服务器状态报表'
Body = '<h1>服务器状态报表</h1>' +
'<p>生成时间: {0}</p>' -f (Get-Date -Format 'yyyy-MM-dd HH:mm:ss')
Head = '<style>table { border-collapse: collapse; } ' +
'td, th { border: 1px solid #ccc; padding: 6px; } ' +
'.critical { color: red; font-weight: bold; }</style>'
}
$enriched | ConvertTo-Html @htmlParams | Out-File $htmlFile -Encoding utf8
Write-Host " 已写入: $htmlFile"

if ($IncludeSummary) {
Write-Host "`n=== ETL 汇总 ===" -ForegroundColor Cyan
Write-Host (" 服务器总数: {0}" -f $enriched.Count)
Write-Host (" 运行中: {0}" -f ($enriched | Where-Object Status -eq 'Running').Count)
Write-Host (" 已停机: {0}" -f ($enriched | Where-Object Status -eq 'Stopped').Count)
Write-Host (" 触发告警的服务器: {0}" -f ($enriched | Where-Object AlertCount -gt 0).Count)
Write-Host (" 平均 CPU 使用率: {0:N1}%" -f (
($enriched | Where-Object CPU_Usage | Measure-Object -Property CPU_Usage -Average).Average
))
Write-Host (" 平均内存使用率: {0:N1}%" -f (
($enriched | Where-Object MEM_Usage | Measure-Object -Property MEM_Usage -Average).Average
))
}

return $enriched
}

# 执行完整 ETL 管道
$result = Invoke-DataEtlPipeline -IncludeSummary

Write-Host "`n=== 最终报表 ===" -ForegroundColor Cyan
$result | Format-Table Hostname, IPAddress, OS, Status,
@{N='CPU%';E={'{0:N1}' -f $_.CPU_Usage}},
@{N='MEM%';E={'{0:N1}' -f $_.MEM_Usage}},
@{N='DISK%';E={'{0:N1}' -f $_.DISK_Usage}},
AlertCount -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[Extract] 从 CSV 提取服务器资产数据...
提取 5 条服务器记录
[Extract] 从 JSON 提取监控指标数据...
提取 4 条监控指标
[Extract] 从 XML 提取告警配置...
提取 3 条告警规则

[Transform] 关联服务器资产与监控指标...

[Load] 导出为 CSV 报表...
已写入: ./etl-output/server-report.csv
[Load] 导出为 JSON...
已写入: ./etl-output/server-report.json
[Load] 导出为 HTML 报表...
已写入: ./etl-output/server-report.html

=== ETL 汇总 ===
服务器总数: 5
运行中: 4
已停机: 1
触发告警的服务器: 2
平均 CPU 使用率: 39.9%
平均内存使用率: 71.0%

=== 最终报表 ===

Hostname IPAddress OS Status CPU% MEM% DISK% AlertCount
-------- --------- -- ------ ---- ---- ----- ----------
WEB-01 192.168.1.10 Ubuntu 22.04 Running 45.2 62.1 33.7 0
WEB-02 192.168.1.11 Ubuntu 22.04 Running 12.8 45.3 28.9 0
DB-01 192.168.1.20 CentOS 7 Running 78.5 85.2 71.4 3
DB-02 192.168.1.21 CentOS 7 Running 0
CACHE-01 192.168.1.30 Ubuntu 22.04 Running 23.1 91.5 15.2 1

注意事项

  1. ConvertTo-Json 的 Depth 参数:默认 Depth 只有 2,嵌套对象会被截断为 System.Object。处理复杂对象时务必显式指定足够的深度,建议设为 10 以上。

  2. CSV 的类型丢失问题:CSV 格式本质上是纯文本,所有值都会变成字符串。从 CSV 读取后需要手动对日期、数值字段做类型转换,否则后续计算会出错。

  3. 大文件的内存占用ConvertFrom-JsonConvertFrom-Csv 会一次性将全部数据加载到内存。处理数百 MB 以上的文件时,考虑使用流式处理或分批读取,避免内存溢出。

  4. XML 命名空间处理:真实的 XML 文档通常带有命名空间(如 xmlns),直接用 Select-Xml 可能匹配不到节点。需要使用 NamespaceManager 注册前缀,或者用 [xml] 类型加速器的 dot 访问绕过命名空间。

  5. 编码一致性:导出文件时显式指定 -Encoding utf8(PowerShell 7 默认即为 UTF-8,但在 Windows PowerShell 5.1 中默认是 ASCII)。跨平台场景中统一用 UTF-8 可以避免中文乱码。

  6. 管道中的去重性能HashSet 去重在数据量小时效率很高,但当记录达到数十万条时,拼接去重键的字符串操作会成为瓶颈。大数据量场景建议改用基于主键的字典查找。

PowerShell 技能连载 - 数据迁移与转换技巧

适用于 PowerShell 5.1 及以上版本

在 IT 运维和开发工作中,数据迁移和格式转换是高频需求。系统升级时需要将用户数据从旧格式迁移到新格式,报表汇总时需要合并多个 CSV 文件并转换字段,跨系统对接时需要在 JSON、CSV、XML、YAML 之间来回转换。这些任务看似简单,但涉及编码问题、类型映射、空值处理、增量同步等细节时往往让人头疼。

PowerShell 内置了 ConvertTo-CsvConvertFrom-JsonConvertTo-Xml 等丰富的转换 cmdlet,配合管道操作可以快速构建 ETL(Extract-Transform-Load)流程。本文将从实际场景出发,讲解数据迁移和转换中的常用技巧。

CSV 数据读取与清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 场景:从遗留系统导出的 CSV 数据质量差,需要清洗后再导入新系统

$rawCsv = @"
EmployeeID,Name,Department,Salary,HireDate,Status
E001,张三,技术部,15000.50,2020-03-15,Active
E002,李四,市场部,,2021-07-20,Active
E003,王五,技术部,18000,2022-01-10,Inactive
E004,,财务部,12000,2023/06/01,Active
E005,赵六,技术部,20000.99,invalid,Active
E006,钱七,,16500,2020-11-30,Active
"@

function Import-AndCleanCsv {
param(
[Parameter(Mandatory)]
[string[]]$CsvLines,

[string]$DefaultDepartment = "未分配",

[decimal]$DefaultSalary = 0
)

# 导入 CSV
$data = $CsvLines | ConvertFrom-Csv

$cleaned = foreach ($row in $data) {
# 清洗姓名:去除空值
$name = if ([string]::IsNullOrWhiteSpace($row.Name)) {
"未知员工($($row.EmployeeID))"
} else {
$row.Name.Trim()
}

# 清洗部门:空值使用默认值
$dept = if ([string]::IsNullOrWhiteSpace($row.Department)) {
$DefaultDepartment
} else {
$row.Department.Trim()
}

# 清洗薪资:非数字使用默认值
$salary = if ($row.Salary -match '^\d+\.?\d*$') {
[decimal]$row.Salary
} else {
$DefaultSalary
}

# 清洗日期:多种格式统一转换
$hireDate = $null
if ($row.HireDate -match '^\d{4}-\d{2}-\d{2}$') {
$hireDate = [datetime]::ParseExact($row.HireDate, "yyyy-MM-dd", $null)
} elseif ($row.HireDate -match '^\d{4}/\d{2}/\d{2}$') {
$hireDate = [datetime]::ParseExact($row.HireDate, "yyyy/MM/dd", $null)
}

# 输出清洗后的对象
[PSCustomObject]@{
EmployeeID = $row.EmployeeID
Name = $name
Department = $dept
Salary = $salary
HireDate = if ($hireDate) { $hireDate.ToString("yyyy-MM-dd") } else { "N/A" }
Status = $row.Status
Cleaned = $true
}
}

return $cleaned
}

$cleanData = Import-AndCleanCsv -CsvLines ($rawCsv -split "`n")
$cleanData | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
EmployeeID Name          Department Salary   HireDate   Status  Cleaned
---------- ---- ---------- ------ -------- ------ -------
E001 张三 技术部 15000.50 2020-03-15 Active True
E002 李四 市场部 0.00 2021-07-20 Active True
E003 王五 技术部 18000.00 2022-01-10 Inactive True
E004 未知员工(E004) 财务部 12000.00 2023-06-01 Active True
E005 赵六 技术部 20000.99 N/A Active True
E006 钱七 未分配 16500.00 2020-11-30 Active True

CSV 转 JSON 与嵌套结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 场景:将扁平的 CSV 数据转换为嵌套的 JSON 结构供 API 消费

$flatData = @"
ID,Name,Email,DeptID,DeptName,RoleID,RoleName
U001,张三,zhangsan@contoso.com,D001,技术部,R001,开发工程师
U002,李四,lisi@contoso.com,D001,技术部,R002,测试工程师
U003,王五,wangwu@contoso.com,D002,市场部,R003,市场经理
U004,赵六,zhaoliu@contoso.com,D002,市场部,R003,市场经理
"@

function ConvertFrom-FlatToNestedJson {
param(
[Parameter(Mandatory)]
[string[]]$CsvLines
)

$data = $CsvLines | ConvertFrom-Csv

# 按部门分组,构建嵌套结构
$nested = $data | Group-Object -Property DeptID | ForEach-Object {
$deptGroup = $_

# 按角色分组
$roles = $deptGroup.Group | Group-Object -Property RoleID | ForEach-Object {
$roleGroup = $_
$members = $roleGroup.Group | ForEach-Object {
@{
id = $_.ID
name = $_.Name
email = $_.Email
}
}

@{
roleId = $roleGroup.Name
roleName = ($roleGroup.Group | Select-Object -First 1).RoleName
members = @($members)
}
}

@{
departmentId = $deptGroup.Name
departmentName = ($deptGroup.Group | Select-Object -First 1).DeptName
roles = @($roles)
}
}

$result = @{
exportDate = (Get-Date).ToString("yyyy-MM-ddTHH:mm:ss")
totalCount = $data.Count
departments = @($nested)
}

return $result | ConvertTo-Json -Depth 5
}

$jsonOutput = ConvertFrom-FlatToNestedJson -CsvLines ($flatData -split "`n")
Write-Host $jsonOutput

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
"exportDate": "2025-08-06T08:30:00",
"totalCount": 4,
"departments": [
{
"departmentId": "D001",
"departmentName": "技术部",
"roles": [
{
"roleId": "R001",
"roleName": "开发工程师",
"members": [
{
"id": "U001",
"name": "张三",
"email": "zhangsan@contoso.com"
}
]
},
{
"roleId": "R002",
"roleName": "测试工程师",
"members": [
{
"id": "U002",
"name": "李四",
"email": "lisi@contoso.com"
}
]
}
]
},
{
"departmentId": "D002",
"departmentName": "市场部",
"roles": [
{
"roleId": "R003",
"roleName": "市场经理",
"members": [
{ "id": "U003", "name": "王五", "email": "wangwu@contoso.com" },
{ "id": "U004", "name": "赵六", "email": "zhaoliu@contoso.com" }
]
}
]
}
]
}

JSON 转 CSV 与字段展开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 场景:从 API 获取的嵌套 JSON 需要展开为扁平的 CSV 供报表使用

$apiJson = @"
{
"users": [
{
"id": "U001",
"profile": {
"name": "张三",
"age": 28,
"address": {
"city": "北京",
"district": "海淀区"
}
},
"skills": ["PowerShell", "Python", "Docker"],
"active": true
},
{
"id": "U002",
"profile": {
"name": "李四",
"age": 32,
"address": {
"city": "上海",
"district": "浦东新区"
}
},
"skills": ["C#", "SQL Server"],
"active": false
}
]
}
"@

function Expand-NestedJsonToCsv {
param(
[Parameter(Mandatory)]
[string]$JsonString
)

$data = $JsonString | ConvertFrom-Json

$flatRows = foreach ($user in $data.users) {
[PSCustomObject]@{
ID = $user.id
Name = $user.profile.name
Age = $user.profile.age
City = $user.profile.address.city
District = $user.profile.address.district
Skills = ($user.skills -join "; ")
SkillCount = $user.skills.Count
Active = $user.active
}
}

return $flatRows
}

$flatRows = Expand-NestedJsonToCsv -JsonString $apiJson
$flatRows | Export-Csv -Path "$env:TEMP\users_flat.csv" -NoTypeInformation -Encoding UTF8
Write-Host "已导出到 $env:TEMP\users_flat.csv"
Get-Content "$env:TEMP\users_flat.csv"

执行结果示例:

1
2
3
4
已导出到 C:\Users\admin\AppData\Local\Temp\users_flat.csv
"ID","Name","Age","City","District","Skills","SkillCount","Active"
"U001","张三","28","北京","海淀区","PowerShell; Python; Docker","3","True"
"U002","李四","32","上海","浦东新区","C#; SQL Server","2","False"

增量数据迁移与变更追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# 场景:定期从源系统迁移数据到目标系统,只迁移新增或变更的记录

function Start-IncrementalMigration {
param(
[Parameter(Mandatory)]
[string]$SourcePath,

[Parameter(Mandatory)]
[string]$TargetPath,

[Parameter(Mandatory)]
[string]$KeyField,

[string]$CompareField = "UpdatedAt"
)

# 加载源数据
$source = Import-Csv -Path $SourcePath -Encoding UTF8
$target = if (Test-Path $TargetPath) {
Import-Csv -Path $TargetPath -Encoding UTF8
} else {
@()
}

# 构建目标数据的索引
$targetIndex = @{}
foreach ($row in $target) {
$targetIndex[$row.$KeyField] = $row
}

$stats = @{
Total = $source.Count
Inserted = 0
Updated = 0
Skipped = 0
}

$result = foreach ($srcRow in $source) {
$key = $srcRow.$KeyField

if ($targetIndex.ContainsKey($key)) {
# 记录已存在,检查是否需要更新
$tgtRow = $targetIndex[$key]
$srcVal = $srcRow.$CompareField
$tgtVal = $tgtRow.$CompareField

if ($srcVal -gt $tgtVal) {
# 源数据更新,覆盖目标
$srcRow | Add-Member -MemberType NoteProperty -Name "_action" -Value "UPDATE" -Force
$stats.Updated++
$srcRow
} else {
# 无变化,跳过
$stats.Skipped++
}
} else {
# 新记录,插入
$srcRow | Add-Member -MemberType NoteProperty -Name "_action" -Value "INSERT" -Force
$stats.Inserted++
$srcRow
}
}

# 合并数据:保留未变更的目标记录 + 更新/新增的记录
$unchanged = $target | Where-Object {
$key = $_.$KeyField
$srcKeys = $source | ForEach-Object { $_.$KeyField }
$key -in $srcKeys -and -not ($result | Where-Object { $_.$KeyField -eq $key })
}

$finalData = @($unchanged) + @($result | Where-Object { $_._action -eq "UPDATE" }) +
@($result | Where-Object { $_._action -eq "INSERT" })

# 移除内部字段后导出
$finalData | ForEach-Object { $_.PSObject.Properties.Remove("_action") }
$finalData | Export-Csv -Path $TargetPath -NoTypeInformation -Encoding UTF8

Write-Host "迁移完成:" -ForegroundColor Green
Write-Host " 总记录数:$($stats.Total)"
Write-Host " 新增:$($stats.Inserted)" -ForegroundColor Cyan
Write-Host " 更新:$($stats.Updated)" -ForegroundColor Yellow
Write-Host " 跳过(无变化):$($stats.Skipped)" -ForegroundColor Gray

return $stats
}

# 使用示例(模拟源数据)
$sourceData = @"
ID,Name,Department,UpdatedAt
E001,张三,技术部,2025-08-01
E002,李四,市场部,2025-08-05
E003,王五,财务部,2025-08-06
"@

$targetData = @"
ID,Name,Department,UpdatedAt
E001,张三,技术部,2025-07-15
E002,李四,市场部,2025-08-05
"@

$sourceData | Set-Content "$env:TEMP\source.csv" -Encoding UTF8
$targetData | Set-Content "$env:TEMP\target.csv" -Encoding UTF8

Start-IncrementalMigration -SourcePath "$env:TEMP\source.csv" `
-TargetPath "$env:TEMP\target.csv" `
-KeyField "ID" `
-CompareField "UpdatedAt"

执行结果示例:

1
2
3
4
5
迁移完成:
总记录数:3
新增:1
更新:1
跳过(无变化):1

多文件合并与格式批量转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 场景:批量合并多个 CSV 文件,统一格式后输出为 JSON

function Merge-MultipleCsv {
param(
[Parameter(Mandatory)]
[string]$InputFolder,

[string]$Filter = "*.csv",

[string]$OutputPath,

[string[]]$FieldMapping
)

$allData = @()
$files = Get-ChildItem -Path $InputFolder -Filter $Filter -File

Write-Host "找到 $($files.Count) 个文件待合并" -ForegroundColor Cyan

foreach ($file in $files) {
Write-Host " 处理:$($file.Name)" -ForegroundColor Gray

$csv = Import-Csv -Path $file.FullName -Encoding UTF8

foreach ($row in $csv) {
# 如果指定了字段映射,只提取需要的字段
if ($FieldMapping) {
$mappedRow = @{}
foreach ($field in $FieldMapping) {
if ($row.PSObject.Properties[$field]) {
$mappedRow[$field] = $row.$field
}
}
$mappedRow["_sourceFile"] = $file.Name
$allData += [PSCustomObject]$mappedRow
} else {
$row | Add-Member -MemberType NoteProperty -Name "_sourceFile" `
-Value $file.Name -Force
$allData += $row
}
}
}

Write-Host "合并完成,共 $($allData.Count) 条记录" -ForegroundColor Green

# 根据输出路径的扩展名决定格式
if ($OutputPath) {
$ext = [System.IO.Path]::GetExtension($OutputPath).ToLower()
switch ($ext) {
".json" {
$allData | ConvertTo-Json -Depth 3 | Set-Content $OutputPath -Encoding UTF8
}
".csv" {
$allData | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
".xml" {
$allData | ConvertTo-Xml -NoTypeInformation | Select-Object -ExpandProperty OuterXml |
Set-Content $OutputPath -Encoding UTF8
}
default {
$allData | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
}
Write-Host "已输出到:$OutputPath" -ForegroundColor Green
}

return $allData
}

# 批量格式转换工具
function Convert-DataFormat {
param(
[Parameter(Mandatory)]
[string]$InputPath,

[Parameter(Mandatory)]
[string]$OutputPath
)

$ext = [System.IO.Path]::GetExtension($InputPath).ToLower()

# 读取数据
$data = switch ($ext) {
".csv" { Import-Csv -Path $InputPath -Encoding UTF8 }
".json" { Get-Content $InputPath -Raw | ConvertFrom-Json }
default { throw "不支持的输入格式:$ext" }
}

# 如果 JSON 顶层是数组,ConvertFrom-Json 已经返回数组
# 如果是单个对象,包装为数组
if ($data -isnot [System.Array]) {
$data = @($data)
}

# 写入目标格式
$outExt = [System.IO.Path]::GetExtension($OutputPath).ToLower()
switch ($outExt) {
".json" {
$data | ConvertTo-Json -Depth 5 | Set-Content $OutputPath -Encoding UTF8
}
".csv" {
$data | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
default {
throw "不支持的输出格式:$outExt"
}
}

Write-Host "转换完成:$InputPath -> $OutputPath" -ForegroundColor Green
}

# 示例
Convert-DataFormat -InputPath "$env:TEMP\source.csv" -OutputPath "$env:TEMP\source.json"

执行结果示例:

1
2
3
4
5
6
找到 3 个文件待合并
处理:employees_dept_a.csv
处理:employees_dept_b.csv
处理:employees_dept_c.csv
合并完成,共 15 条记录
转换完成:C:\Users\admin\AppData\Local\Temp\source.csv -> C:\Users\admin\AppData\Local\Temp\source.json

注意事项

  1. 编码一致性:读写文件时始终显式指定 -Encoding UTF8,避免中文乱码,尤其在不同 Windows 版本间迁移时
  2. 大文件性能:超过 10 万行的 CSV 不要用 Import-Csv 一次性加载到内存,应逐行流式处理或使用 StreamReader
  3. 日期格式标准化:迁移前统一日期格式(推荐 ISO 8601),避免 2025/08/062025-08-06 混用导致比较失败
  4. 空值策略:明确区分空字符串、$null 和缺失值,迁移前定义好每种情况的处理规则(跳过、默认值、报错)
  5. 事务性保证:关键数据迁移应在导入前备份目标数据,失败时能回滚到迁移前的状态
  6. 字段映射验证:源和目标的字段名、类型、长度可能不同,迁移前应做字段映射表并逐条校验