PowerShell 技能连载 - 数据转换工具集

适用于 PowerShell 7.0 及以上版本

在运维自动化的日常工作中,数据格式转换几乎无处不在。你可能需要把 CSV 报表转成 JSON 传给 API,把 XML 配置文件解析成对象做比较,或者把 API 返回的数据加工成 CSV 交给业务方。这些看似零碎的操作,实际上构成了数据处理的基础链条。

PowerShell 作为一门面向对象的脚本语言,天然擅长处理结构化数据。它的管道机制让格式转换变得流畅——对象在管道中传递,随时可以在不同格式之间切换。配合 ConvertTo-JsonConvertFrom-JsonConvertTo-XmlConvertFrom-Csv 等内置 cmdlet,一条命令就能完成其他语言需要几十行代码才能实现的转换。

本文将围绕三个核心场景展开:基础格式互转、数据清洗与标准化,以及完整的 ETL 管道实战。掌握这些技巧后,你可以用 PowerShell 构建轻量级的数据处理管道,替代许多需要专门 ETL 工具才能完成的任务。

基础格式转换

PowerShell 内置了多种格式转换 cmdlet,可以在 CSV、JSON、XML 之间自由切换。下面的工具函数封装了常见的转换场景,支持链式调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
function Convert-DataFormat {
[CmdletBinding()]
param(
[Parameter(Mandatory, ValueFromPipeline)]
$InputObject,

[Parameter(Mandatory)]
[ValidateSet('Json', 'Csv', 'Xml', 'Yaml', 'HashTable')]
[string]$ToFormat,

[int]$Depth = 10
)

process {
switch ($ToFormat) {
'Json' {
$InputObject | ConvertTo-Json -Depth $Depth -EnumsAsStrings
}
'Csv' {
if ($InputObject -is [string]) {
$InputObject | ConvertFrom-Csv | ConvertTo-Csv -NoTypeInformation
}
else {
$InputObject | ConvertTo-Csv -NoTypeInformation
}
}
'Xml' {
$InputObject | ConvertTo-Xml -NoTypeInformation -As Stream
}
'HashTable' {
if ($InputObject -is [string]) {
$json = $InputObject
}
else {
$json = $InputObject | ConvertTo-Json -Depth $Depth -Compress
}
[System.Text.Json.JsonSerializer]::Deserialize(
$json,
[System.Collections.Generic.Dictionary[string, object]],
(New-Object System.Text.Json.JsonSerializerOptions -Property @{
PropertyNameCaseInsensitive = $true
})
)
}
'Yaml' {
# YAML 没有内置 cmdlet,通过 JSON 中转后手动格式化
$json = if ($InputObject -is [string]) { $InputObject } else {
$InputObject | ConvertTo-Json -Depth $Depth
}
$obj = $json | ConvertFrom-Json
ConvertTo-YamlString -InputObject $obj -Indent 0
}
}
}
}

function ConvertTo-YamlString {
param($InputObject, [int]$Indent = 0)
$space = ' ' * $Indent
$sb = [System.Text.StringBuilder]::new()

if ($InputObject -is [System.Collections.IEnumerable] -and
$InputObject -isnot [string]) {
foreach ($item in $InputObject) {
$null = $sb.Append("${space}- ")
if ($item -is [System.Collections.IDictionary] -or
$item -is [PSCustomObject]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $item ($Indent + 1)))
}
else {
$null = $sb.AppendLine("'$item'")
}
}
}
elseif ($InputObject -is [System.Collections.IDictionary] -or
$InputObject -is [PSCustomObject]) {
$props = if ($InputObject -is [System.Collections.IDictionary]) {
$InputObject.GetEnumerator()
}
else {
$InputObject.PSObject.Properties
}
foreach ($prop in $props) {
$name = $prop.Key ?? $prop.Name
$val = $prop.Value ?? $prop
$null = $sb.Append("${space}${name}: ")
if ($val -is [System.Collections.IEnumerable] -and
$val -isnot [string]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $val ($Indent + 1)))
}
elseif ($val -is [System.Collections.IDictionary] -or
$val -is [PSCustomObject]) {
$null = $sb.AppendLine()
$null = $sb.Append((ConvertTo-YamlString $val ($Indent + 1)))
}
else {
$null = $sb.AppendLine("'$val'")
}
}
}
else {
$null = $sb.AppendLine("${space}'$InputObject'")
}

$sb.ToString()
}

# 示例:CSV 转 JSON
$csvData = @'
Name,Department,Salary,StartDate
张三,工程部,25000,2024-03-15
李四,市场部,18000,2023-08-20
王五,工程部,30000,2022-11-01
赵六,人事部,22000,2025-01-10
'@

Write-Host "=== CSV 转 JSON ===" -ForegroundColor Cyan
$jsonOutput = $csvData | ConvertFrom-Csv | Convert-DataFormat -ToFormat Json
$jsonOutput

Write-Host "`n=== JSON 转 HashTable ===" -ForegroundColor Cyan
$jsonOutput | Convert-DataFormat -ToFormat HashTable |
ForEach-Object { $_.GetEnumerator() } |
Format-Table Key, Value -AutoSize

Write-Host "`n=== CSV 转 XML ===" -ForegroundColor Cyan
$xmlOutput = $csvData | ConvertFrom-Csv | Convert-DataFormat -ToFormat Xml
$xmlOutput | Select-Object -First 10

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
=== CSVJSON ===
[
{
"Name": "张三",
"Department": "工程部",
"Salary": "25000",
"StartDate": "2024-03-15"
},
{
"Name": "李四",
"Department": "市场部",
"Salary": "18000",
"StartDate": "2023-08-20"
},
{
"Name": "王五",
"Department": "工程部",
"Salary": "30000",
"StartDate": "2022-11-01"
},
{
"Name": "赵六",
"Department": "人事部",
"Salary": "22000",
"StartDate": "2025-01-10"
}
]

=== JSONHashTable ===
Key Value
--- -----
Name 张三
Department 工程部
Salary 25000
StartDate 2024-03-15

=== CSVXML ===
<?xml version="1.0" encoding="utf-8"?>
<Objects>
<Object Type="System.Management.Automation.PSCustomObject">
<Property Name="Name" Type="System.String">张三</Property>
<Property Name="Department" Type="System.String">工程部</Property>
<Property Name="Salary" Type="System.String">25000</Property>
<Property Name="StartDate" Type="System.String">2024-03-15</Property>
</Object>
...
</Objects>

数据清洗与标准化

原始数据往往不规范——字段名不统一、存在重复记录、类型混杂、空值缺失。在转换之前先做清洗,是保证数据质量的关键步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
function Invoke-DataCleanse {
[CmdletBinding()]
param(
[Parameter(Mandatory, ValueFromPipeline)]
[PSCustomObject]$InputObject,

# 字段映射表:旧名 → 新名
[Parameter(Mandatory)]
[hashtable]$FieldMap,

# 需要转换为 DateTime 的字段
[string[]]$DateFields,

# 需要转换为数值的字段
[string[]]$NumericFields,

# 默认空值填充
$DefaultNullValue = 'N/A'
)

begin {
$seen = [System.Collections.Generic.HashSet[string]]::new()
$duplicateCount = 0
$totalProcessed = 0
}

process {
$totalProcessed++

# 构建去重键(使用所有字段值的拼接)
$keyParts = $InputObject.PSObject.Properties.Value |
Where-Object { $_ } |
ForEach-Object { $_.ToString() }
$dedupKey = $keyParts -join '|'

if (-not $seen.Add($dedupKey)) {
$duplicateCount++
return
}

$result = [ordered]@{}

foreach ($prop in $InputObject.PSObject.Properties) {
$fieldName = $prop.Name
$value = $prop.Value

# 字段名映射
if ($FieldMap.ContainsKey($fieldName)) {
$fieldName = $FieldMap[$fieldName]
}

# 空值处理
if ([string]::IsNullOrWhiteSpace($value)) {
$value = $DefaultNullValue
}
else {
$value = $value.Trim()

# 日期字段转换
if ($DateFields -and $fieldName -in $DateFields) {
if ($value -ne $DefaultNullValue) {
if ([datetime]::TryParse($value, [ref]$parsed)) {
$value = $parsed
}
}
}

# 数值字段转换
if ($NumericFields -and $fieldName -in $NumericFields) {
if ($value -ne $DefaultNullValue) {
$cleaned = $value -replace '[,,]', ''
if ([double]::TryParse($cleaned, [ref]$num)) {
$value = $num
}
}
}
}

$result[$fieldName] = $value
}

[PSCustomObject]$result
}

end {
Write-Verbose "处理完成: 总计 $totalProcessed 条, 去除重复 $duplicateCount 条, 输出 $($totalProcessed - $duplicateCount) 条"
}
}

# 模拟原始脏数据
$rawData = @'
emp_name,dept,salary,start_date,email
张三,工程部,25000,2024-03-15,zhangsan@corp.com
李四,市场部,18000,2023-08-20,lisi@corp.com
张三,工程部,25000,2024-03-15,zhangsan@corp.com
王五,,30000,,
赵六,人事部,22,000,2025-01-10,zhaoliu@corp.com
孙七,工程部,28000,invalid-date,sunqi@corp.com
'@

$fieldMapping = @{
'emp_name' = 'EmployeeName'
'dept' = 'Department'
'salary' = 'AnnualSalary'
'start_date' = 'StartDate'
'email' = 'ContactEmail'
}

$cleaned = $rawData | ConvertFrom-Csv |
Invoke-DataCleanse `
-FieldMap $fieldMapping `
-DateFields 'StartDate' `
-NumericFields 'AnnualSalary' `
-DefaultNullValue '未知' `
-Verbose

Write-Host "=== 清洗结果 ===" -ForegroundColor Cyan
$cleaned | Format-Table -AutoSize

Write-Host "`n=== 数据类型验证 ===" -ForegroundColor Cyan
$cleaned | Get-Member -MemberType NoteProperty |
Select-Object Name, @{
N = 'SampleType'
E = {
$val = $cleaned[0].($_.Name)
if ($null -ne $val) { $val.GetType().Name } else { 'null' }
}
} | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
VERBOSE: 处理完成: 总计 6 条, 去除重复 1 条, 输出 5 条
=== 清洗结果 ===

EmployeeName Department AnnualSalary StartDate ContactEmail
------------ ---------- ------------ --------- ------------
张三 工程部 25000 2024/3/15 0:00:00 zhangsan@corp.com
李四 市场部 18000 2023/8/20 0:00:00 lisi@corp.com
王五 未知 30000 未知 未知
赵六 人事部 22000 2025/1/10 0:00:00 zhaoliu@corp.com
孙七 工程部 28000 未知 sunqi@corp.com

=== 数据类型验证 ===

Name SampleType
---- ----------
EmployeeName String
Department String
AnnualSalary Double
StartDate DateTime
ContactEmail String

ETL 管道实战

下面模拟一个真实场景:从多个数据源(CSV 文件、JSON API 响应、XML 配置)提取数据,统一转换后合并,最终输出为标准报表格式。这种轻量级 ETL 管道在运维报表、配置审计等场景中非常实用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
function Invoke-DataEtlPipeline {
[CmdletBinding()]
param(
[string]$OutputPath = './etl-output',
[switch]$IncludeSummary
)

# ---- Extract 阶段:从多源提取数据 ----

Write-Host "[Extract] 从 CSV 提取服务器资产数据..." -ForegroundColor Yellow
$serverCsv = @'
Hostname,IP,OS,CPU_CORES,RAM_GB,STATUS
WEB-01,192.168.1.10,Ubuntu 22.04,4,16,Running
WEB-02,192.168.1.11,Ubuntu 22.04,4,16,Running
DB-01,192.168.1.20,CentOS 7,8,64,Running
DB-02,192.168.1.21,CentOS 7,8,64,Stopped
CACHE-01,192.168.1.30,Ubuntu 22.04,2,8,Running
'@

$servers = $serverCsv | ConvertFrom-Csv
Write-Host " 提取 $($servers.Count) 条服务器记录"

Write-Host "[Extract] 从 JSON 提取监控指标数据..." -ForegroundColor Yellow
$metricsJson = @'
[
{"host": "WEB-01", "cpu_pct": 45.2, "mem_pct": 62.1, "disk_pct": 33.7, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "WEB-02", "cpu_pct": 12.8, "mem_pct": 45.3, "disk_pct": 28.9, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "DB-01", "cpu_pct": 78.5, "mem_pct": 85.2, "disk_pct": 71.4, "timestamp": "2026-02-23T08:00:00Z"},
{"host": "CACHE-01", "cpu_pct": 23.1, "mem_pct": 91.5, "disk_pct": 15.2, "timestamp": "2026-02-23T08:00:00Z"}
]
'@

$metrics = $metricsJson | ConvertFrom-Json
Write-Host " 提取 $($metrics.Count) 条监控指标"

Write-Host "[Extract] 从 XML 提取告警配置..." -ForegroundColor Yellow
$alertXml = @'
<alerts>
<rule name="cpu_high" threshold="70" severity="critical" target_pattern="DB-*"/>
<rule name="mem_warning" threshold="80" severity="warning" target_pattern="*"/>
<rule name="disk_high" threshold="60" severity="warning" target_pattern="DB-*"/>
</alerts>
'@

[xml]$xmlDoc = $alertXml
$alertRules = $xmlDoc.alerts.rule | ForEach-Object {
[PSCustomObject]@{
RuleName = $_.name
Threshold = [int]$_.threshold
Severity = $_.severity
TargetPattern = $_.target_pattern
}
}
Write-Host " 提取 $($alertRules.Count) 条告警规则"

# ---- Transform 阶段:关联、转换、计算 ----

Write-Host "`n[Transform] 关联服务器资产与监控指标..." -ForegroundColor Green

$enriched = foreach ($srv in $servers) {
$metric = $metrics | Where-Object { $_.host -eq $srv.Hostname }
$triggeredAlerts = @()

foreach ($rule in $alertRules) {
$pattern = $rule.TargetPattern -replace '\*', '.*'
if ($srv.Hostname -match "^$pattern$") {
$violated = $false
$fieldMap = @{
'cpu_high' = 'cpu_pct'
'mem_warning' = 'mem_pct'
'disk_high' = 'disk_pct'
}
$fieldName = $fieldMap[$rule.RuleName]
if ($fieldName -and $metric -and
$metric.$fieldName -gt $rule.Threshold) {
$violated = $true
}
if ($violated) {
$triggeredAlerts += "[{0}] {1} ({2}% > {3}%)" -f
$rule.Severity, $rule.RuleName,
$metric.$fieldName, $rule.Threshold
}
}
}

[PSCustomObject][ordered]@{
Hostname = $srv.Hostname
IPAddress = $srv.IP
OS = $srv.OS
CPU_Cores = [int]$srv.CPU_CORES
RAM_GB = [int]$srv.RAM_GB
Status = $srv.STATUS
CPU_Usage = if ($metric) { $metric.cpu_pct } else { $null }
MEM_Usage = if ($metric) { $metric.mem_pct } else { $null }
DISK_Usage = if ($metric) { $metric.disk_pct } else { $null }
AlertCount = $triggeredAlerts.Count
Alerts = if ($triggeredAlerts) {
$triggeredAlerts -join '; '
} else { '无' }
}
}

# ---- Load 阶段:输出到多种目标格式 ----

$null = New-Item -ItemType Directory -Path $OutputPath -Force -ErrorAction SilentlyContinue

Write-Host "`n[Load] 导出为 CSV 报表..." -ForegroundColor Magenta
$csvFile = Join-Path $OutputPath 'server-report.csv'
$enriched | ConvertTo-Csv -NoTypeInformation | Out-File $csvFile -Encoding utf8
Write-Host " 已写入: $csvFile"

Write-Host "[Load] 导出为 JSON..." -ForegroundColor Magenta
$jsonFile = Join-Path $OutputPath 'server-report.json'
$enriched | ConvertTo-Json -Depth 5 | Out-File $jsonFile -Encoding utf8
Write-Host " 已写入: $jsonFile"

Write-Host "[Load] 导出为 HTML 报表..." -ForegroundColor Magenta
$htmlFile = Join-Path $OutputPath 'server-report.html'
$htmlParams = @{
Title = '服务器状态报表'
Body = '<h1>服务器状态报表</h1>' +
'<p>生成时间: {0}</p>' -f (Get-Date -Format 'yyyy-MM-dd HH:mm:ss')
Head = '<style>table { border-collapse: collapse; } ' +
'td, th { border: 1px solid #ccc; padding: 6px; } ' +
'.critical { color: red; font-weight: bold; }</style>'
}
$enriched | ConvertTo-Html @htmlParams | Out-File $htmlFile -Encoding utf8
Write-Host " 已写入: $htmlFile"

if ($IncludeSummary) {
Write-Host "`n=== ETL 汇总 ===" -ForegroundColor Cyan
Write-Host (" 服务器总数: {0}" -f $enriched.Count)
Write-Host (" 运行中: {0}" -f ($enriched | Where-Object Status -eq 'Running').Count)
Write-Host (" 已停机: {0}" -f ($enriched | Where-Object Status -eq 'Stopped').Count)
Write-Host (" 触发告警的服务器: {0}" -f ($enriched | Where-Object AlertCount -gt 0).Count)
Write-Host (" 平均 CPU 使用率: {0:N1}%" -f (
($enriched | Where-Object CPU_Usage | Measure-Object -Property CPU_Usage -Average).Average
))
Write-Host (" 平均内存使用率: {0:N1}%" -f (
($enriched | Where-Object MEM_Usage | Measure-Object -Property MEM_Usage -Average).Average
))
}

return $enriched
}

# 执行完整 ETL 管道
$result = Invoke-DataEtlPipeline -IncludeSummary

Write-Host "`n=== 最终报表 ===" -ForegroundColor Cyan
$result | Format-Table Hostname, IPAddress, OS, Status,
@{N='CPU%';E={'{0:N1}' -f $_.CPU_Usage}},
@{N='MEM%';E={'{0:N1}' -f $_.MEM_Usage}},
@{N='DISK%';E={'{0:N1}' -f $_.DISK_Usage}},
AlertCount -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[Extract] 从 CSV 提取服务器资产数据...
提取 5 条服务器记录
[Extract] 从 JSON 提取监控指标数据...
提取 4 条监控指标
[Extract] 从 XML 提取告警配置...
提取 3 条告警规则

[Transform] 关联服务器资产与监控指标...

[Load] 导出为 CSV 报表...
已写入: ./etl-output/server-report.csv
[Load] 导出为 JSON...
已写入: ./etl-output/server-report.json
[Load] 导出为 HTML 报表...
已写入: ./etl-output/server-report.html

=== ETL 汇总 ===
服务器总数: 5
运行中: 4
已停机: 1
触发告警的服务器: 2
平均 CPU 使用率: 39.9%
平均内存使用率: 71.0%

=== 最终报表 ===

Hostname IPAddress OS Status CPU% MEM% DISK% AlertCount
-------- --------- -- ------ ---- ---- ----- ----------
WEB-01 192.168.1.10 Ubuntu 22.04 Running 45.2 62.1 33.7 0
WEB-02 192.168.1.11 Ubuntu 22.04 Running 12.8 45.3 28.9 0
DB-01 192.168.1.20 CentOS 7 Running 78.5 85.2 71.4 3
DB-02 192.168.1.21 CentOS 7 Running 0
CACHE-01 192.168.1.30 Ubuntu 22.04 Running 23.1 91.5 15.2 1

注意事项

  1. ConvertTo-Json 的 Depth 参数:默认 Depth 只有 2,嵌套对象会被截断为 System.Object。处理复杂对象时务必显式指定足够的深度,建议设为 10 以上。

  2. CSV 的类型丢失问题:CSV 格式本质上是纯文本,所有值都会变成字符串。从 CSV 读取后需要手动对日期、数值字段做类型转换,否则后续计算会出错。

  3. 大文件的内存占用ConvertFrom-JsonConvertFrom-Csv 会一次性将全部数据加载到内存。处理数百 MB 以上的文件时,考虑使用流式处理或分批读取,避免内存溢出。

  4. XML 命名空间处理:真实的 XML 文档通常带有命名空间(如 xmlns),直接用 Select-Xml 可能匹配不到节点。需要使用 NamespaceManager 注册前缀,或者用 [xml] 类型加速器的 dot 访问绕过命名空间。

  5. 编码一致性:导出文件时显式指定 -Encoding utf8(PowerShell 7 默认即为 UTF-8,但在 Windows PowerShell 5.1 中默认是 ASCII)。跨平台场景中统一用 UTF-8 可以避免中文乱码。

  6. 管道中的去重性能HashSet 去重在数据量小时效率很高,但当记录达到数十万条时,拼接去重键的字符串操作会成为瓶颈。大数据量场景建议改用基于主键的字典查找。

PowerShell 技能连载 - 对 CSV 执行 SQL 风格查询

适用于 PowerShell 5.1 及以上版本

在日常运维和数据处理工作中,CSV 是最常见的文件格式之一。日志导出、监控报表、资产清单等数据通常都以 CSV 形式存储。虽然 PowerShell 原生的 Import-Csv 配合 Where-ObjectSort-ObjectSelect-Object 可以完成基本的数据筛选,但当查询逻辑复杂时——多表关联、聚合统计、分组排序——原生管道写法既冗长又难以维护。

SQL 作为数据查询的标准语言,其表达力远超管道命令。借助 .NET 内置的 OleDb 或 JDBC 方式,PowerShell 可以直接对 CSV 文件执行 SQL 查询语句,从而用熟悉的 SELECT、JOIN、GROUP BY 语法完成复杂的数据分析任务,性能也比管道过滤高出不少。

本文将介绍三种在 PowerShell 中对 CSV 执行 SQL 查询的方式:OLE DB Provider、ADO.NET 内存表,以及第三方模块。

方式一:使用 OLE DB Provider 查询 CSV

Windows 系统内置的 Microsoft.ACE.OLEDB 提供程序可以直接将 CSV 文件当作数据库表来查询。这种方式性能优秀,适合处理大量数据。

首先创建示例 CSV 文件用于后续演示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建示例 CSV:员工信息表
$employeeCsv = @"
Id,Name,Department,Salary,City
1,张三,工程部,25000,北京
2,李四,市场部,18000,上海
3,王五,工程部,30000,北京
4,赵六,人事部,15000,广州
5,钱七,市场部,22000,深圳
6,孙八,工程部,28000,北京
7,周九,人事部,16000,上海
8,吴十,市场部,20000,广州
"@ | Out-String

$employeeCsv.Trim() | Set-Content -Path ".\employees.csv" -Encoding UTF8

# 创建示例 CSV:部门预算表
$budgetCsv = @"
Department,Budget,Year
工程部,500000,2025
市场部,300000,2025
人事部,150000,2025
"@ | Out-String

$budgetCsv.Trim() | Set-Content -Path ".\budgets.csv" -Encoding UTF8

Write-Host "示例 CSV 文件已创建"

执行结果示例:

1
示例 CSV 文件已创建

接下来使用 OLE DB 连接 CSV 文件并执行 SQL 查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
function Invoke-CsvSqlQuery {
param(
[Parameter(Mandatory)]
[string[]]$CsvPath,

[Parameter(Mandatory)]
[string]$Query,

[string]$Delimiter = ","
)

# 获取 CSV 文件所在目录作为数据源
$directory = (Resolve-Path (Split-Path $CsvPath[0] -Parent)).Path

# 构建 OLE DB 连接字符串
$connectionString = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=$directory;Extended Properties=`"Text;HDR=YES;FMT=Delimited($Delimiter);Charset=UTF8`""

try {
$connection = New-Object System.Data.OleDb.OleDbConnection($connectionString)
$connection.Open()

$command = New-Object System.Data.OleDb.OleDbCommand($Query, $connection)
$adapter = New-Object System.Data.OleDb.OleDbDataAdapter($command)
$dataset = New-Object System.Data.DataSet
$adapter.Fill($dataset) | Out-Null

$dataset.Tables[0]
}
finally {
if ($connection.State -eq 'Open') {
$connection.Close()
}
}
}

# 查询薪资大于 20000 的员工
$results = Invoke-CsvSqlQuery -CsvPath ".\employees.csv" -Query "SELECT Name, Department, Salary FROM employees.csv WHERE Salary > 20000 ORDER BY Salary DESC"
$results | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
Name Department Salary
---- ---------- ------
王五 工程部 30000
孙八 工程部 28000
钱七 市场部 22000

方式二:使用 DataTable 内存查询

当 OLE DB 提供程序不可用时(例如在非 Windows 系统或没有安装 ACE 组件的环境中),可以将 CSV 导入 DataTable 后使用 DataTable.Select() 方法执行 SQL 风格的过滤表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
function Invoke-CsvDataTableQuery {
param(
[Parameter(Mandatory)]
[string]$CsvPath,

[Parameter(Mandatory)]
[string]$FilterExpression,

[string[]]$SortColumns
)

# 将 CSV 导入 DataTable
$data = Import-Csv -Path $CsvPath
$dataTable = [System.Data.DataTable]::new("CsvData")

# 根据第一行数据推断列类型
$firstRow = $data[0]
foreach ($property in $firstRow.PSObject.Properties) {
$column = [System.Data.DataColumn]::new($property.Name)
$dataTable.Columns.Add($column)
}

# 填充数据行
foreach ($row in $data) {
$dataRow = $dataTable.NewRow()
foreach ($property in $row.PSObject.Properties) {
$dataRow[$property.Name] = $property.Value
}
$dataTable.Rows.Add($dataRow)
}

# 执行查询
$sortExpression = if ($SortColumns) { $SortColumns -join ", " } else { $null }
$dataTable.Select($FilterExpression, $sortExpression) |
ForEach-Object {
$hashtable = @{}
foreach ($col in $dataTable.Columns.ColumnName) {
$hashtable[$col] = $_[$col]
}
[PSCustomObject]$hashtable
}
}

# 查询北京的工程部员工并按薪资降序排列
$results = Invoke-CsvDataTableQuery -CsvPath ".\employees.csv" -FilterExpression "City = '北京' AND Department = '工程部'" -SortColumns "Salary DESC"
$results | Format-Table -AutoSize

执行结果示例:

Department Salary City
1
2
3
4
---- ---------- ------ ----
王五 工程部 30000 北京
孙八 工程部 28000 北京
张三 工程部 25000 北京

方式三:实现聚合与分组统计

结合 DataTable 方式,我们可以进一步实现 GROUP BY 和聚合函数的效果。以下函数支持对 CSV 数据执行分组统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
function Get-CsvAggregate {
param(
[Parameter(Mandatory)]
[string]$CsvPath,

[Parameter(Mandatory)]
[string]$GroupByColumn,

[ValidateSet("Sum", "Average", "Count", "Min", "Max")]
[string]$AggregateFunction = "Count",

[string]$AggregateColumn
)

$data = Import-Csv -Path $CsvPath

# 验证聚合列是否存在
if ($AggregateColumn -and $AggregateFunction -ne "Count") {
$sample = $data[0].PSObject.Properties.Name
if ($AggregateColumn -notin $sample) {
throw "列 '$AggregateColumn' 在 CSV 中不存在。可用列: $($sample -join ', ')"
}
}

# 分组
$groups = $data | Group-Object -Property $GroupByColumn

$results = foreach ($group in $groups) {
$aggregateValue = switch ($AggregateFunction) {
"Count" { $group.Count }
"Sum" { ($group.Group | Measure-Object -Property $AggregateColumn -Sum).Sum }
"Average" { [math]::Round(($group.Group | Measure-Object -Property $AggregateColumn -Average).Average, 2) }
"Min" { ($group.Group | Measure-Object -Property $AggregateColumn -Minimum).Minimum }
"Max" { ($group.Group | Measure-Object -Property $AggregateColumn -Maximum).Maximum }
}

[PSCustomObject]@{
$GroupByColumn = $group.Name
"$AggregateFunction`($AggregateColumn)" = $aggregateValue
RecordCount = $group.Count
}
}

$results
}

# 按部门统计平均薪资
Write-Host "`n=== 按部门统计平均薪资 ==="
Get-CsvAggregate -CsvPath ".\employees.csv" -GroupByColumn Department -AggregateFunction Average -AggregateColumn Salary |
Format-Table -AutoSize

# 按城市统计人数
Write-Host "`n=== 按城市统计人数 ==="
Get-CsvAggregate -CsvPath ".\employees.csv" -GroupByColumn City -AggregateFunction Count |
Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
=== 按部门统计平均薪资 ===

Department Average(Salary) RecordCount
---------- --------------- -----------
工程部 27666.67 3
市场部 20000 3
人事部 15500 2

=== 按城市统计人数 ===

City Count() RecordCount
---- ------- -----------
北京 3 3
上海 2 2
广州 2 2
深圳 1 1

方式四:模拟 JOIN 关联查询

将两个 CSV 文件关联起来是常见需求。以下函数模拟了 SQL 的 INNER JOIN 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
function Join-CsvData {
param(
[Parameter(Mandatory)]
[string]$LeftCsvPath,

[Parameter(Mandatory)]
[string]$RightCsvPath,

[Parameter(Mandatory)]
[string]$LeftJoinColumn,

[Parameter(Mandatory)]
[string]$RightJoinColumn,

[ValidateSet("Inner", "Left")]
[string]$JoinType = "Inner"
)

$leftData = Import-Csv -Path $LeftCsvPath
$rightData = Import-Csv -Path $RightCsvPath

# 构建右侧数据的哈希表索引
$rightIndex = @{}
foreach ($rightRow in $rightData) {
$key = $rightRow.$RightJoinColumn
if (-not $rightIndex.ContainsKey($key)) {
$rightIndex[$key] = @()
}
$rightIndex[$key] += $rightRow
}

# 获取列名集合
$leftColumns = $leftData[0].PSObject.Properties.Name
$rightColumns = $rightData[0].PSObject.Properties.Name | Where-Object { $_ -ne $RightJoinColumn }

$results = foreach ($leftRow in $leftData) {
$key = $leftRow.$LeftJoinColumn
$matched = $rightIndex[$key]

if ($matched) {
foreach ($rightRow in $matched) {
$row = [ordered]@{}
foreach ($col in $leftColumns) {
$row[$col] = $leftRow.$col
}
foreach ($col in $rightColumns) {
$row[$col] = $rightRow.$col
}
[PSCustomObject]$row
}
}
elseif ($JoinType -eq "Left") {
$row = [ordered]@{}
foreach ($col in $leftColumns) {
$row[$col] = $leftRow.$col
}
foreach ($col in $rightColumns) {
$row[$col] = $null
}
[PSCustomObject]$row
}
}

$results
}

# 关联员工表和部门预算表
Write-Host "`n=== 员工与部门预算关联 ==="
Join-CsvData -LeftCsvPath ".\employees.csv" -RightCsvPath ".\budgets.csv" -LeftJoinColumn Department -RightJoinColumn Department |
Format-Table Name, Department, Salary, Budget -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
=== 员工与部门预算关联 ===

Name Department Salary Budget
---- ---------- ------ ------
张三 工程部 25000 500000
李四 市场部 18000 300000
王五 工程部 30000 500000
赵六 人事部 15000 150000
钱七 市场部 22000 300000
孙八 工程部 28000 500000
周九 人事部 16000 150000
吴十 市场部 20000 300000

实战场景:分析 IIS 日志

以下是一个实际运维场景:将 IIS 日志转换为 CSV 后执行 SQL 查询,快速统计访问热点和异常状态码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
function Get-IisLogStatistics {
param(
[Parameter(Mandatory)]
[string]$LogFilePath,

[int]$TopN = 10
)

# 解析 IIS 日志(跳过注释行)
$logData = Get-Content -Path $LogFilePath |
Where-Object { $_ -notlike "#*" -and $_.Trim() -ne "" } |
ConvertFrom-Csv -Delimiter " "

Write-Host "`n=== 状态码分布 ==="
$logData | Group-Object -Property "sc-status" |
Sort-Object Count -Descending |
Select-Object Count, Name -First 5 |
Format-Table @{Label="次数"; Expression={$_.Count}}, @{Label="状态码"; Expression={$_.Name}} -AutoSize

Write-Host "`n=== 访问量前 $TopN 的 URI ==="
$logData | Group-Object -Property "cs-uri-stem" |
Sort-Object Count -Descending |
Select-Object Count, Name -First $TopN |
Format-Table @{Label="次数"; Expression={$_.Count}}, @{Label="URI"; Expression={$_.Name}} -AutoSize

Write-Host "`n=== 5xx 错误详情 ==="
$logData | Where-Object { $_."sc-status" -like "5*" } |
Select-Object -First $TopN |
Format-Table date, time, "c-ip", "cs-method", "cs-uri-stem", "sc-status" -AutoSize
}

# 模拟 IIS 日志进行分析(实际使用时传入真实日志路径)
$logContent = @"
date time c-ip cs-method cs-uri-stem sc-status time-taken
2025-08-11 08:01:10 192.168.1.100 GET /api/users 200 45
2025-08-11 08:01:11 192.168.1.101 GET /api/orders 500 1200
2025-08-11 08:01:12 192.168.1.100 GET /api/users 200 38
2025-08-11 08:01:13 10.0.0.50 POST /api/login 401 15
2025-08-11 08:01:14 192.168.1.102 GET /api/products 200 92
2025-08-11 08:01:15 192.168.1.101 GET /api/orders 503 3500
"@

$logContent | Set-Content -Path ".\iis-test.log" -Encoding UTF8
Get-IisLogStatistics -LogFilePath ".\iis-test.log" -TopN 5

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
=== 状态码分布 ===
次数 状态码
---- ------
3 200
2 500
1 401

=== 访问量前 5 的 URI ===
次数 URI
---- ---
2 /api/users
2 /api/orders
1 /api/login
1 /api/products

=== 5xx 错误详情 ===
date time c-ip cs-method cs-uri-stem sc-status time-taken
---- ---- ---- --------- ----------- --------- ----------
2025-08-11 08:01:11 192.168.1.101 GET /api/orders 500 1200
2025-08-11 08:01:15 192.168.1.101 GET /api/orders 503 3500

注意事项

  1. OLE DB 提供程序依赖:Microsoft.ACE.OLEDB.12.0 需要在 Windows 上安装 Microsoft Access Database Engine,64 位 PowerShell 需要安装 64 位版本的引擎。如果系统没有安装,可以使用 DataTable 方式作为替代方案。

  2. CSV 文件编码问题:OLE DB Provider 对 CSV 的编码支持有限,包含中文的 CSV 文件建议使用 UTF-8 with BOM 编码,否则可能出现乱码。可以在 schema.ini 文件中指定字符集。

  3. 内存消耗:DataTable 方式将整个 CSV 加载到内存,处理超大文件(数百 MB 以上)时可能导致内存不足。对于大数据量场景,建议使用流式处理或分批加载。

  4. 列类型推断:CSV 是纯文本格式,所有值都是字符串。进行数值比较或聚合计算时,需要确保数据能正确转换为数值类型,否则 SUM、AVG 等操作会失败。

  5. SQL 注入风险:如果 SQL 查询语句中包含用户输入的内容,务必进行参数化处理或严格的输入验证,防止 SQL 注入攻击。尤其是在将脚本发布为共享工具时。

  6. 性能对比:对于简单过滤(单列、单条件),PowerShell 原生管道 (Where-Object) 的性能足够;对于复杂查询(多表关联、聚合统计),SQL 方式在代码可读性和执行效率上都有明显优势。根据实际场景选择合适的方式。

PowerShell 技能连载 - 数据迁移与转换技巧

适用于 PowerShell 5.1 及以上版本

在 IT 运维和开发工作中,数据迁移和格式转换是高频需求。系统升级时需要将用户数据从旧格式迁移到新格式,报表汇总时需要合并多个 CSV 文件并转换字段,跨系统对接时需要在 JSON、CSV、XML、YAML 之间来回转换。这些任务看似简单,但涉及编码问题、类型映射、空值处理、增量同步等细节时往往让人头疼。

PowerShell 内置了 ConvertTo-CsvConvertFrom-JsonConvertTo-Xml 等丰富的转换 cmdlet,配合管道操作可以快速构建 ETL(Extract-Transform-Load)流程。本文将从实际场景出发,讲解数据迁移和转换中的常用技巧。

CSV 数据读取与清洗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 场景:从遗留系统导出的 CSV 数据质量差,需要清洗后再导入新系统

$rawCsv = @"
EmployeeID,Name,Department,Salary,HireDate,Status
E001,张三,技术部,15000.50,2020-03-15,Active
E002,李四,市场部,,2021-07-20,Active
E003,王五,技术部,18000,2022-01-10,Inactive
E004,,财务部,12000,2023/06/01,Active
E005,赵六,技术部,20000.99,invalid,Active
E006,钱七,,16500,2020-11-30,Active
"@

function Import-AndCleanCsv {
param(
[Parameter(Mandatory)]
[string[]]$CsvLines,

[string]$DefaultDepartment = "未分配",

[decimal]$DefaultSalary = 0
)

# 导入 CSV
$data = $CsvLines | ConvertFrom-Csv

$cleaned = foreach ($row in $data) {
# 清洗姓名:去除空值
$name = if ([string]::IsNullOrWhiteSpace($row.Name)) {
"未知员工($($row.EmployeeID))"
} else {
$row.Name.Trim()
}

# 清洗部门:空值使用默认值
$dept = if ([string]::IsNullOrWhiteSpace($row.Department)) {
$DefaultDepartment
} else {
$row.Department.Trim()
}

# 清洗薪资:非数字使用默认值
$salary = if ($row.Salary -match '^\d+\.?\d*$') {
[decimal]$row.Salary
} else {
$DefaultSalary
}

# 清洗日期:多种格式统一转换
$hireDate = $null
if ($row.HireDate -match '^\d{4}-\d{2}-\d{2}$') {
$hireDate = [datetime]::ParseExact($row.HireDate, "yyyy-MM-dd", $null)
} elseif ($row.HireDate -match '^\d{4}/\d{2}/\d{2}$') {
$hireDate = [datetime]::ParseExact($row.HireDate, "yyyy/MM/dd", $null)
}

# 输出清洗后的对象
[PSCustomObject]@{
EmployeeID = $row.EmployeeID
Name = $name
Department = $dept
Salary = $salary
HireDate = if ($hireDate) { $hireDate.ToString("yyyy-MM-dd") } else { "N/A" }
Status = $row.Status
Cleaned = $true
}
}

return $cleaned
}

$cleanData = Import-AndCleanCsv -CsvLines ($rawCsv -split "`n")
$cleanData | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
EmployeeID Name          Department Salary   HireDate   Status  Cleaned
---------- ---- ---------- ------ -------- ------ -------
E001 张三 技术部 15000.50 2020-03-15 Active True
E002 李四 市场部 0.00 2021-07-20 Active True
E003 王五 技术部 18000.00 2022-01-10 Inactive True
E004 未知员工(E004) 财务部 12000.00 2023-06-01 Active True
E005 赵六 技术部 20000.99 N/A Active True
E006 钱七 未分配 16500.00 2020-11-30 Active True

CSV 转 JSON 与嵌套结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# 场景:将扁平的 CSV 数据转换为嵌套的 JSON 结构供 API 消费

$flatData = @"
ID,Name,Email,DeptID,DeptName,RoleID,RoleName
U001,张三,zhangsan@contoso.com,D001,技术部,R001,开发工程师
U002,李四,lisi@contoso.com,D001,技术部,R002,测试工程师
U003,王五,wangwu@contoso.com,D002,市场部,R003,市场经理
U004,赵六,zhaoliu@contoso.com,D002,市场部,R003,市场经理
"@

function ConvertFrom-FlatToNestedJson {
param(
[Parameter(Mandatory)]
[string[]]$CsvLines
)

$data = $CsvLines | ConvertFrom-Csv

# 按部门分组,构建嵌套结构
$nested = $data | Group-Object -Property DeptID | ForEach-Object {
$deptGroup = $_

# 按角色分组
$roles = $deptGroup.Group | Group-Object -Property RoleID | ForEach-Object {
$roleGroup = $_
$members = $roleGroup.Group | ForEach-Object {
@{
id = $_.ID
name = $_.Name
email = $_.Email
}
}

@{
roleId = $roleGroup.Name
roleName = ($roleGroup.Group | Select-Object -First 1).RoleName
members = @($members)
}
}

@{
departmentId = $deptGroup.Name
departmentName = ($deptGroup.Group | Select-Object -First 1).DeptName
roles = @($roles)
}
}

$result = @{
exportDate = (Get-Date).ToString("yyyy-MM-ddTHH:mm:ss")
totalCount = $data.Count
departments = @($nested)
}

return $result | ConvertTo-Json -Depth 5
}

$jsonOutput = ConvertFrom-FlatToNestedJson -CsvLines ($flatData -split "`n")
Write-Host $jsonOutput

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
"exportDate": "2025-08-06T08:30:00",
"totalCount": 4,
"departments": [
{
"departmentId": "D001",
"departmentName": "技术部",
"roles": [
{
"roleId": "R001",
"roleName": "开发工程师",
"members": [
{
"id": "U001",
"name": "张三",
"email": "zhangsan@contoso.com"
}
]
},
{
"roleId": "R002",
"roleName": "测试工程师",
"members": [
{
"id": "U002",
"name": "李四",
"email": "lisi@contoso.com"
}
]
}
]
},
{
"departmentId": "D002",
"departmentName": "市场部",
"roles": [
{
"roleId": "R003",
"roleName": "市场经理",
"members": [
{ "id": "U003", "name": "王五", "email": "wangwu@contoso.com" },
{ "id": "U004", "name": "赵六", "email": "zhaoliu@contoso.com" }
]
}
]
}
]
}

JSON 转 CSV 与字段展开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 场景:从 API 获取的嵌套 JSON 需要展开为扁平的 CSV 供报表使用

$apiJson = @"
{
"users": [
{
"id": "U001",
"profile": {
"name": "张三",
"age": 28,
"address": {
"city": "北京",
"district": "海淀区"
}
},
"skills": ["PowerShell", "Python", "Docker"],
"active": true
},
{
"id": "U002",
"profile": {
"name": "李四",
"age": 32,
"address": {
"city": "上海",
"district": "浦东新区"
}
},
"skills": ["C#", "SQL Server"],
"active": false
}
]
}
"@

function Expand-NestedJsonToCsv {
param(
[Parameter(Mandatory)]
[string]$JsonString
)

$data = $JsonString | ConvertFrom-Json

$flatRows = foreach ($user in $data.users) {
[PSCustomObject]@{
ID = $user.id
Name = $user.profile.name
Age = $user.profile.age
City = $user.profile.address.city
District = $user.profile.address.district
Skills = ($user.skills -join "; ")
SkillCount = $user.skills.Count
Active = $user.active
}
}

return $flatRows
}

$flatRows = Expand-NestedJsonToCsv -JsonString $apiJson
$flatRows | Export-Csv -Path "$env:TEMP\users_flat.csv" -NoTypeInformation -Encoding UTF8
Write-Host "已导出到 $env:TEMP\users_flat.csv"
Get-Content "$env:TEMP\users_flat.csv"

执行结果示例:

1
2
3
4
已导出到 C:\Users\admin\AppData\Local\Temp\users_flat.csv
"ID","Name","Age","City","District","Skills","SkillCount","Active"
"U001","张三","28","北京","海淀区","PowerShell; Python; Docker","3","True"
"U002","李四","32","上海","浦东新区","C#; SQL Server","2","False"

增量数据迁移与变更追踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# 场景:定期从源系统迁移数据到目标系统,只迁移新增或变更的记录

function Start-IncrementalMigration {
param(
[Parameter(Mandatory)]
[string]$SourcePath,

[Parameter(Mandatory)]
[string]$TargetPath,

[Parameter(Mandatory)]
[string]$KeyField,

[string]$CompareField = "UpdatedAt"
)

# 加载源数据
$source = Import-Csv -Path $SourcePath -Encoding UTF8
$target = if (Test-Path $TargetPath) {
Import-Csv -Path $TargetPath -Encoding UTF8
} else {
@()
}

# 构建目标数据的索引
$targetIndex = @{}
foreach ($row in $target) {
$targetIndex[$row.$KeyField] = $row
}

$stats = @{
Total = $source.Count
Inserted = 0
Updated = 0
Skipped = 0
}

$result = foreach ($srcRow in $source) {
$key = $srcRow.$KeyField

if ($targetIndex.ContainsKey($key)) {
# 记录已存在,检查是否需要更新
$tgtRow = $targetIndex[$key]
$srcVal = $srcRow.$CompareField
$tgtVal = $tgtRow.$CompareField

if ($srcVal -gt $tgtVal) {
# 源数据更新,覆盖目标
$srcRow | Add-Member -MemberType NoteProperty -Name "_action" -Value "UPDATE" -Force
$stats.Updated++
$srcRow
} else {
# 无变化,跳过
$stats.Skipped++
}
} else {
# 新记录,插入
$srcRow | Add-Member -MemberType NoteProperty -Name "_action" -Value "INSERT" -Force
$stats.Inserted++
$srcRow
}
}

# 合并数据:保留未变更的目标记录 + 更新/新增的记录
$unchanged = $target | Where-Object {
$key = $_.$KeyField
$srcKeys = $source | ForEach-Object { $_.$KeyField }
$key -in $srcKeys -and -not ($result | Where-Object { $_.$KeyField -eq $key })
}

$finalData = @($unchanged) + @($result | Where-Object { $_._action -eq "UPDATE" }) +
@($result | Where-Object { $_._action -eq "INSERT" })

# 移除内部字段后导出
$finalData | ForEach-Object { $_.PSObject.Properties.Remove("_action") }
$finalData | Export-Csv -Path $TargetPath -NoTypeInformation -Encoding UTF8

Write-Host "迁移完成:" -ForegroundColor Green
Write-Host " 总记录数:$($stats.Total)"
Write-Host " 新增:$($stats.Inserted)" -ForegroundColor Cyan
Write-Host " 更新:$($stats.Updated)" -ForegroundColor Yellow
Write-Host " 跳过(无变化):$($stats.Skipped)" -ForegroundColor Gray

return $stats
}

# 使用示例(模拟源数据)
$sourceData = @"
ID,Name,Department,UpdatedAt
E001,张三,技术部,2025-08-01
E002,李四,市场部,2025-08-05
E003,王五,财务部,2025-08-06
"@

$targetData = @"
ID,Name,Department,UpdatedAt
E001,张三,技术部,2025-07-15
E002,李四,市场部,2025-08-05
"@

$sourceData | Set-Content "$env:TEMP\source.csv" -Encoding UTF8
$targetData | Set-Content "$env:TEMP\target.csv" -Encoding UTF8

Start-IncrementalMigration -SourcePath "$env:TEMP\source.csv" `
-TargetPath "$env:TEMP\target.csv" `
-KeyField "ID" `
-CompareField "UpdatedAt"

执行结果示例:

1
2
3
4
5
迁移完成:
总记录数:3
新增:1
更新:1
跳过(无变化):1

多文件合并与格式批量转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 场景:批量合并多个 CSV 文件,统一格式后输出为 JSON

function Merge-MultipleCsv {
param(
[Parameter(Mandatory)]
[string]$InputFolder,

[string]$Filter = "*.csv",

[string]$OutputPath,

[string[]]$FieldMapping
)

$allData = @()
$files = Get-ChildItem -Path $InputFolder -Filter $Filter -File

Write-Host "找到 $($files.Count) 个文件待合并" -ForegroundColor Cyan

foreach ($file in $files) {
Write-Host " 处理:$($file.Name)" -ForegroundColor Gray

$csv = Import-Csv -Path $file.FullName -Encoding UTF8

foreach ($row in $csv) {
# 如果指定了字段映射,只提取需要的字段
if ($FieldMapping) {
$mappedRow = @{}
foreach ($field in $FieldMapping) {
if ($row.PSObject.Properties[$field]) {
$mappedRow[$field] = $row.$field
}
}
$mappedRow["_sourceFile"] = $file.Name
$allData += [PSCustomObject]$mappedRow
} else {
$row | Add-Member -MemberType NoteProperty -Name "_sourceFile" `
-Value $file.Name -Force
$allData += $row
}
}
}

Write-Host "合并完成,共 $($allData.Count) 条记录" -ForegroundColor Green

# 根据输出路径的扩展名决定格式
if ($OutputPath) {
$ext = [System.IO.Path]::GetExtension($OutputPath).ToLower()
switch ($ext) {
".json" {
$allData | ConvertTo-Json -Depth 3 | Set-Content $OutputPath -Encoding UTF8
}
".csv" {
$allData | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
".xml" {
$allData | ConvertTo-Xml -NoTypeInformation | Select-Object -ExpandProperty OuterXml |
Set-Content $OutputPath -Encoding UTF8
}
default {
$allData | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
}
Write-Host "已输出到:$OutputPath" -ForegroundColor Green
}

return $allData
}

# 批量格式转换工具
function Convert-DataFormat {
param(
[Parameter(Mandatory)]
[string]$InputPath,

[Parameter(Mandatory)]
[string]$OutputPath
)

$ext = [System.IO.Path]::GetExtension($InputPath).ToLower()

# 读取数据
$data = switch ($ext) {
".csv" { Import-Csv -Path $InputPath -Encoding UTF8 }
".json" { Get-Content $InputPath -Raw | ConvertFrom-Json }
default { throw "不支持的输入格式:$ext" }
}

# 如果 JSON 顶层是数组,ConvertFrom-Json 已经返回数组
# 如果是单个对象,包装为数组
if ($data -isnot [System.Array]) {
$data = @($data)
}

# 写入目标格式
$outExt = [System.IO.Path]::GetExtension($OutputPath).ToLower()
switch ($outExt) {
".json" {
$data | ConvertTo-Json -Depth 5 | Set-Content $OutputPath -Encoding UTF8
}
".csv" {
$data | Export-Csv -Path $OutputPath -NoTypeInformation -Encoding UTF8
}
default {
throw "不支持的输出格式:$outExt"
}
}

Write-Host "转换完成:$InputPath -> $OutputPath" -ForegroundColor Green
}

# 示例
Convert-DataFormat -InputPath "$env:TEMP\source.csv" -OutputPath "$env:TEMP\source.json"

执行结果示例:

1
2
3
4
5
6
找到 3 个文件待合并
处理:employees_dept_a.csv
处理:employees_dept_b.csv
处理:employees_dept_c.csv
合并完成,共 15 条记录
转换完成:C:\Users\admin\AppData\Local\Temp\source.csv -> C:\Users\admin\AppData\Local\Temp\source.json

注意事项

  1. 编码一致性:读写文件时始终显式指定 -Encoding UTF8,避免中文乱码,尤其在不同 Windows 版本间迁移时
  2. 大文件性能:超过 10 万行的 CSV 不要用 Import-Csv 一次性加载到内存,应逐行流式处理或使用 StreamReader
  3. 日期格式标准化:迁移前统一日期格式(推荐 ISO 8601),避免 2025/08/062025-08-06 混用导致比较失败
  4. 空值策略:明确区分空字符串、$null 和缺失值,迁移前定义好每种情况的处理规则(跳过、默认值、报错)
  5. 事务性保证:关键数据迁移应在导入前备份目标数据,失败时能回滚到迁移前的状态
  6. 字段映射验证:源和目标的字段名、类型、长度可能不同,迁移前应做字段映射表并逐条校验

PowerShell 技能连载 - CSV 高级处理

适用于 PowerShell 5.1 及以上版本

CSV(逗号分隔值)是运维中最常见的数据交换格式——导出用户清单、导入配置数据、处理监控报表、批量操作清单。虽然 PowerShell 的 Import-CsvExport-Csv 命令简单易用,但面对大数据量、复杂转换、编码问题、多文件合并等场景时,需要掌握更多技巧。

本文将讲解 CSV 处理的高级技巧和实用场景。

CSV 读写基础

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 创建测试数据
$employees = @(
[PSCustomObject]@{ Name = "张三"; Department = "工程部"; Salary = 25000; JoinDate = "2023-03-15" }
[PSCustomObject]@{ Name = "李四"; Department = "市场部"; Salary = 20000; JoinDate = "2023-06-01" }
[PSCustomObject]@{ Name = "王五"; Department = "工程部"; Salary = 30000; JoinDate = "2022-01-10" }
[PSCustomObject]@{ Name = "赵六"; Department = "人事部"; Salary = 22000; JoinDate = "2024-02-20" }
[PSCustomObject]@{ Name = "孙七"; Department = "工程部"; Salary = 28000; JoinDate = "2023-09-05" }
)

$employees | Export-Csv -Path "C:\Data\employees.csv" -NoTypeInformation -Encoding UTF8
Write-Host "已导出 CSV" -ForegroundColor Green

# 读取 CSV
$data = Import-Csv -Path "C:\Data\employees.csv" -Encoding UTF8
Write-Host "读取 $($data.Count) 条记录" -ForegroundColor Cyan

# CSV 数据天然就是对象集合,可以直接操作
$data | Where-Object { $_.Department -eq "工程部" } |
Sort-Object Salary -Descending |
Format-Table Name, Salary -AutoSize

# 使用 -Delimiter 处理非逗号分隔符
# Import-Csv -Path "data.tsv" -Delimiter "`t"

# 指定列名(文件没有标题行时)
# Import-Csv -Path "data.csv" -Header "Name","Dept","Salary" | Select-Object -Skip 1

执行结果示例:

1
2
3
4
5
6
7
已导出 CSV
读取 5 条记录
Name Salary
---- ------
王五 30000
孙七 28000
张三 25000

数据转换与计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 读取并转换数据
$data = Import-Csv "C:\Data\employees.csv"

# 添加计算列
$enhanced = $data | Select-Object *, @{
N = 'SalaryK'
E = { [math]::Round([int]$_.Salary / 1000, 1) }
}, @{
N = 'YearsOfService'
E = { [math]::Round(((Get-Date) - [datetime]$_.JoinDate).TotalDays / 365, 1) }
}, @{
N = 'AnnualSalary'
E = { [int]$_.Salary * 12 }
}

$enhanced | Format-Table Name, Department, SalaryK, YearsOfService, AnnualSalary -AutoSize

# 分组统计
$deptStats = $data | Group-Object Department | ForEach-Object {
$avgSalary = [math]::Round(($_.Group | ForEach-Object { [int]$_.Salary } | Measure-Object -Average).Average)
$maxSalary = ($_.Group | ForEach-Object { [int]$_.Salary } | Measure-Object -Maximum).Maximum

[PSCustomObject]@{
Department = $_.Name
Count = $_.Count
AvgSalary = $avgSalary
MaxSalary = $maxSalary
}
}

Write-Host "`n部门统计:" -ForegroundColor Cyan
$deptStats | Format-Table -AutoSize

# 数据透视
$pivot = $data | Group-Object Department | ForEach-Object {
$names = ($_.Group | Select-Object -ExpandProperty Name) -join ', '
[PSCustomObject]@{
Department = $_.Name
Count = $_.Count
Members = $names
}
}
$pivot | Format-Table -AutoSize

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Name Department SalaryK YearsOfService AnnualSalary
---- ---------- ------- -------------- ------------
张三 工程部 25.0 2.3 300000
李四 市场部 20.0 2.1 240000
王五 工程部 30.0 3.5 360000
赵六 人事部 22.0 1.4 264000
孙七 工程部 28.0 1.9 336000

部门统计:
Department Count AvgSalary MaxSalary
---------- ----- ---------- ---------
工程部 3 27666 30000
市场部 1 20000 20000
人事部 1 22000 22000

Department Count Members
---------- ----- -------
工程部 3 张三, 王五, 孙七
市场部 1 李四
人事部 1 赵六

大文件高效处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# 流式处理大 CSV(不一次性加载到内存)
function Process-LargeCsv {
param(
[Parameter(Mandatory)]
[string]$Path,

[scriptblock]$ProcessBlock,

[int]$ReportInterval = 10000
)

$reader = [System.IO.StreamReader]::new($Path, [System.Text.Encoding]::UTF8)
$header = $reader.ReadLine()

if (-not $header) {
$reader.Close()
return
}

$columns = $header -split ','
$count = 0
$output = @()

while ($null -ne ($line = $reader.ReadLine())) {
$count++
$values = $line -split ','
$obj = [ordered]@{}
for ($i = 0; $i -lt $columns.Count; $i++) {
$obj[$columns[$i].Trim('"')] = if ($i -lt $values.Count) { $values[$i].Trim('"') } else { "" }
}

$result = & $ProcessBlock ([PSCustomObject]$obj)
if ($result) { $output += $result }

if ($count % $ReportInterval -eq 0) {
Write-Host " 已处理 $count 行..." -ForegroundColor DarkGray
}
}

$reader.Close()
Write-Host "处理完成:$count 行" -ForegroundColor Green
return $output
}

# 使用流式处理筛选数据
$filtered = Process-LargeCsv -Path "C:\Data\large-dataset.csv" -ProcessBlock {
param($row)
if ([int]$row.Amount -gt 10000) {
$row
}
} -ReportInterval 50000

Write-Host "筛选结果:$($filtered.Count) 条"

# 使用 StreamWriter 高效输出 CSV
function Export-CsvFast {
param(
[Parameter(Mandatory)]
[object[]]$Data,

[Parameter(Mandatory)]
[string]$Path
)

$writer = [System.IO.StreamWriter]::new($Path, $false, [System.Text.Encoding]::UTF8)

try {
if ($Data.Count -gt 0) {
$properties = $Data[0].PSObject.Properties.Name
$writer.WriteLine(($properties | ForEach-Object { "`"$_`"" }) -join ',')

foreach ($item in $Data) {
$values = foreach ($prop in $properties) {
$val = $item.$prop
if ($null -eq $val) { '""' }
elseif ($val -match '[,""\r\n]') { "`"$($val -replace '"', '""')`"" }
else { "`"$val`"" }
}
$writer.WriteLine(($values -join ','))
}
}
} finally {
$writer.Close()
}
}

执行结果示例:

1
2
3
4
5
  已处理 10000 行...
已处理 20000 行...
已处理 30000 行...
处理完成:32500
筛选结果:1250

多文件合并与对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 合并多个 CSV 文件
function Merge-CsvFiles {
param(
[Parameter(Mandatory)]
[string[]]$Paths,

[string]$OutputPath
)

$allData = @()
foreach ($path in $Paths) {
if (Test-Path $path) {
$data = Import-Csv $path -Encoding UTF8
$source = [System.IO.Path]::GetFileName($path)
$data | ForEach-Object {
$_ | Add-Member -NotePropertyName "Source" -NotePropertyValue $source -Force
}
$allData += $data
Write-Host "已加载:$source ($($data.Count) 行)" -ForegroundColor Green
}
}

$allData | Export-Csv $OutputPath -NoTypeInformation -Encoding UTF8
Write-Host "合并完成:$($allData.Count) 行 => $OutputPath" -ForegroundColor Cyan
}

# 按日期范围合并
$dateFiles = 1..5 | ForEach-Object {
"C:\Data\report-$(Get-Date).AddDays(-$_).ToString('yyyyMMdd').csv"
}
Merge-CsvFiles -Paths $dateFiles -OutputPath "C:\Data\report-weekly.csv"

# CSV 对比
function Compare-CsvData {
param(
[Parameter(Mandatory)][string]$ReferenceFile,
[Parameter(Mandatory)][string]$DifferenceFile,
[Parameter(Mandatory)][string]$KeyColumn
)

$ref = Import-Csv $ReferenceFile -Encoding UTF8
$diff = Import-Csv $DifferenceFile -Encoding UTF8

$refKeys = $ref | Select-Object -ExpandProperty $KeyColumn
$diffKeys = $diff | Select-Object -ExpandProperty $KeyColumn

$added = $diffKeys | Where-Object { $_ -notin $refKeys }
$removed = $refKeys | Where-Object { $_ -notin $diffKeys }
$common = $refKeys | Where-Object { $_ -in $diffKeys }

Write-Host "CSV 对比结果:" -ForegroundColor Cyan
Write-Host " 新增:$($added.Count) 条" -ForegroundColor Green
Write-Host " 删除:$($removed.Count) 条" -ForegroundColor Red
Write-Host " 共有:$($common.Count) 条" -ForegroundColor Gray

if ($added) { Write-Host "`n新增记录:" -ForegroundColor Green; $added | ForEach-Object { Write-Host " + $_" } }
if ($removed) { Write-Host "`n删除记录:" -ForegroundColor Red; $removed | ForEach-Object { Write-Host " - $_" } }
}

Compare-CsvData -ReferenceFile "C:\Data\servers-old.csv" -DifferenceFile "C:\Data\servers-new.csv" -KeyColumn "ServerName"

执行结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
CSV 对比结果:
新增:3 条
删除:1 条
共有:47 条

新增记录:
+ SRV-NEW-01
+ SRV-NEW-02
+ SRV-NEW-03

删除记录:
- SRV-OLD-05

注意事项

  1. 编码问题Import-Csv 在 PowerShell 5.1 中默认使用 ANSI 编码,中文 CSV 务必指定 -Encoding UTF8
  2. 引号处理:CSV 中包含逗号的字段应该用双引号包裹,Import-Csv 会自动处理
  3. 大文件性能Import-Csv 会将整个文件加载到内存,超大文件使用流式处理
  4. 类型转换:CSV 所有值都是字符串,数值比较和计算前需要显式转换类型
  5. NoTypeInformationExport-Csv 务必加 -NoTypeInformation,否则第一行会输出 .NET 类型信息
  6. 日期格式:CSV 中的日期格式不统一时,使用 [datetime]::ParseExact() 指定格式解析