1. 当并行遇上异常

在Windows服务开发中,我们团队曾遇到过这样一个场景:使用Parallel.For处理百万级订单数据时,某个异常导致整个进程崩溃,第二天业务部门直接电话轰炸。这种惨痛经历揭示了并行编程中异常处理的特殊性和重要性。

与传统的串行程序不同,并行操作中的异常会像多米诺骨牌一样产生连锁反应。笔者曾用以下代码模拟过故障场景:

// 技术栈:.NET 6 / C# 10
try 
{
    Parallel.For(0, 10, i => 
    {
        if (i == 5) throw new InvalidOperationException("模拟错误");
        Console.WriteLine($"处理第{i}个元素");
    });
}
catch (Exception ex)
{
    Console.WriteLine($"捕获异常:{ex.Message}");
}

运行后发现控制台仅输出部分元素处理结果,而catch块根本没有执行!这是因为Parallel.For会将异常封装到AggregateException中,需要特殊处理方式。

2. 并行异常处理的核心机制

2.1 AggregateException解剖室

AggregateException就像异常集合的集装箱,当我们在任务并行库(TPL)中操作时,所有未处理的异常都会被自动封装到这个特殊异常类型中。查看其InnerExceptions属性可以获取所有原始异常:

try
{
    var tasks = new Task[3];
    for (int i = 0; i < 3; i++)
    {
        tasks[i] = Task.Run(() => 
        {
            Thread.Sleep(100);
            throw new ArgumentException($"任务{Task.CurrentId}出错");
        });
    }
    Task.WaitAll(tasks);
}
catch (AggregateException ae)
{
    // 遍历所有内部异常
    foreach (var ex in ae.InnerExceptions)
    {
        Console.WriteLine($"捕获到 {ex.GetType().Name}: {ex.Message}");
    }
}

2.2 异常传播路径

TPL的异常传播遵循"传播即终止"原则,当任务链中的某个环节抛出异常时:

  1. 异常被封装到AggregateException
  2. 传播到父任务或调用线程
  3. 如果未被处理,将导致进程崩溃

3. 五大实战处理模式

3.1 防御式编程典范

在Parallel循环中集成异常处理:

Parallel.ForEach(GetDataSources(), (data, state) =>
{
    try
    {
        ProcessData(data);
    }
    catch (DataFormatException ex)
    {
        LogError(ex);
        state.Stop();  // 立即停止所有迭代
    }
});

3.2 任务异常捕获模板

使用WhenAny进行异常优先处理:

var downloadTask = DownloadFileAsync(url);
var parseTask = ParseDataAsync();

var completedTask = await Task.WhenAny(downloadTask, parseTask);
if (completedTask.Exception != null)
{
    HandleWebException(completedTask.Exception.InnerException);
}

3.3 PLINQ异常处理方案

为并行LINQ查询增加安全网:

try
{
    var results = dataList.AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .Where(FilterData)
        .Select(TransformData)
        .ToList();
}
catch (AggregateException ae)
{
    ae.Handle(ex => 
    {
        if (ex is DataValidationError)
        {
            LogValidationError(ex);
            return true; // 已处理
        }
        return false; // 继续传播
    });
}

3.4 延续任务处理链

利用ContinueWith构建弹性工作流:

Task.Run(() => GenerateReport())
    .ContinueWith(prevTask =>
    {
        if (prevTask.Exception != null)
        {
            return BackupReportGeneration();
        }
        return prevTask.Result;
    }, TaskContinuationOptions.ExecuteSynchronously);

3.5 取消令牌集成策略

将异常处理与取消机制结合:

var cts = new CancellationTokenSource();
try
{
    Parallel.Invoke(
        new ParallelOptions { CancellationToken = cts.Token },
        () => OperationA(cts.Token),
        () => OperationB(cts.Token)
    );
}
catch (OperationCanceledException)
{
    Console.WriteLine("操作被用户取消");
}
catch (AggregateException ae)
{
    cts.Cancel();
    ae.Handle(ex => ex is CriticalException);
}

4. 技术选型指南

4.1 应用场景矩阵

场景特征 推荐方案 典型QPS
批量数据处理 Parallel+Aggregate处理 10万+/秒
长时异步操作 任务延续+超时控制 100-1000
复杂工作流 异常过滤器+重试策略 自定义
实时系统 快速失败+熔断机制 5000+

4.2 技术方案对比表

方法 优点 缺点 适用场景
集中式捕获 实现简单 丢失上下文信息 简单并行任务
逐任务处理 精细控制 代码复杂度高 关键任务流程
异常过滤器 灵活的错误分类 需要预先定义策略 复杂业务系统
熔断机制 系统级保护 增加额外维护成本 高可用性要求

5. 避坑指南与最佳实践

5.1 七大黄金法则

  1. 永远不要吞掉AggregateException
  2. 为每个并行操作设置超时阈值
  3. 使用using确保资源释放
  4. 记录完整的异常上下文
  5. 区分业务异常与系统异常
  6. 保持取消令牌的传递链路
  7. 定期进行压力测试

5.2 典型错误案例

错误示例:

// 错误:可能导致死锁
try {
    task.Wait();
} 
catch (AggregateException) {}

正确姿势:

try {
    task.Wait();
} 
catch (AggregateException ae) {
    ae.Flatten().Handle(ex => {
        // 处理逻辑
        return true;
    });
}

6. 未来演进方向

随着.NET 8引入的并行增强功能,异常处理模式也在进化。比如新的Parallel.ForEachAsync方法提供了更优雅的错误处理方式:

await Parallel.ForEachAsync(data, async (item, ct) =>
{
    try 
    {
        await ProcessItemAsync(item, ct);
    }
    catch (OperationCanceledException)
    {
        // 专门处理取消
    }
});