1. 当批量操作成为必选项

在电商大促的零点秒杀场景中,我们曾遇到每秒上万条订单数据的写入需求;在物联网领域,传感器设备每分钟产生的数万条监测数据需要实时入库;在金融行业,每天收盘后的批量持仓更新直接影响着次日的交易决策。这些真实场景都在向我们传递一个明确信号:掌握高效的批量数据处理技术,是现代后端开发的必修课。

2. 技术兵器库的选择

2.1 SqlBulkCopy:批量插入的"冲锋枪"

// 使用SqlBulkCopy实现高效批量插入
public void BulkInsertOrders(List<Order> orders)
{
    using (var connection = new SqlConnection(_connectionString))
    {
        connection.Open();
        
        // 创建内存数据表结构
        DataTable orderTable = new DataTable();
        orderTable.Columns.Add("OrderID", typeof(int));
        orderTable.Columns.Add("CustomerID", typeof(string));
        orderTable.Columns.Add("OrderDate", typeof(DateTime));
        orderTable.Columns.Add("TotalAmount", typeof(decimal));

        // 填充数据(示例数据)
        foreach (var order in orders)
        {
            var row = orderTable.NewRow();
            row["OrderID"] = order.Id;
            row["CustomerID"] = order.CustomerId;
            row["OrderDate"] = order.CreateTime;
            row["TotalAmount"] = order.Amount;
            orderTable.Rows.Add(row);
        }

        // 配置批量插入参数
        using (var bulkCopy = new SqlBulkCopy(connection))
        {
            bulkCopy.DestinationTableName = "dbo.Orders";
            bulkCopy.BatchSize = 5000; // 每批5000条
            bulkCopy.BulkCopyTimeout = 600; // 超时10分钟
            bulkCopy.EnableStreaming = true; // 启用流式传输
            
            // 列映射(可选)
            bulkCopy.ColumnMappings.Add("OrderID", "OrderID");
            bulkCopy.ColumnMappings.Add("CustomerID", "CustomerCode");
            bulkCopy.ColumnMappings.Add("OrderDate", "CreateDate");
            bulkCopy.ColumnMappings.Add("TotalAmount", "Amount");

            // 执行批量插入
            bulkCopy.WriteToServer(orderTable);
        }
    }
}

技术栈:.NET Framework 4.8 / .NET Core 3.1+,System.Data.SqlClient 4.8.3

2.2 表值参数+MERGE:批量更新的"瑞士军刀"

// 使用表值参数实现批量更新
public void BulkUpdateProducts(List<Product> products)
{
    using (var connection = new SqlConnection(_connectionString))
    {
        connection.Open();
        
        // 创建内存数据表
        DataTable productTable = new DataTable();
        productTable.Columns.Add("ProductID", typeof(int));
        productTable.Columns.Add("ProductName", typeof(string));
        productTable.Columns.Add("UnitPrice", typeof(decimal));
        productTable.Columns.Add("Stock", typeof(int));

        foreach (var product in products)
        {
            productTable.Rows.Add(
                product.Id,
                product.Name,
                product.Price,
                product.Stock
            );
        }

        // 使用存储过程处理更新
        using (var command = new SqlCommand("usp_UpdateProducts", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            command.Parameters.Add(new SqlParameter
            {
                ParameterName = "@productUpdates",
                SqlDbType = SqlDbType.Structured,
                TypeName = "dbo.ProductUpdateType",
                Value = productTable
            });

            // 启用事务
            using (var transaction = connection.BeginTransaction())
            {
                command.Transaction = transaction;
                try
                {
                    command.ExecuteNonQuery();
                    transaction.Commit();
                }
                catch
                {
                    transaction.Rollback();
                    throw;
                }
            }
        }
    }
}

/* SQL Server端存储过程示例:
CREATE PROCEDURE usp_UpdateProducts
    @productUpdates dbo.ProductUpdateType READONLY
AS
BEGIN
    MERGE INTO Products AS target
    USING @productUpdates AS source
    ON target.ProductID = source.ProductID
    WHEN MATCHED THEN
        UPDATE SET 
            ProductName = source.ProductName,
            UnitPrice = source.UnitPrice,
            Stock = source.Stock
    WHEN NOT MATCHED THEN
        INSERT (ProductID, ProductName, UnitPrice, Stock)
        VALUES (source.ProductID, source.ProductName, source.UnitPrice, source.Stock);
END
*/

技术栈:SQL Server 2016+,C# 8.0,System.Data.SqlClient 4.8.3

3. 技术方案选型指南

3.1 适用场景对比

场景特征 SqlBulkCopy 表值参数+MERGE
纯插入操作 ★★★★★ ★★★☆☆
存在更新需求 不支持 ★★★★★
数据量 > 100万 ★★★★★ ★★★★☆
需要复杂业务逻辑 ★☆☆☆☆ ★★★★★
实时性要求高 ★★★★☆ ★★★☆☆

3.2 性能优化技巧

  • 批处理拆分:当单次操作超过50万条时,建议分批次处理(10-20万/批)
  • 索引策略:批量操作前禁用非聚集索引,操作后重建
  • 内存优化:使用DataTable时注意及时释放内存
  • 超时设置:根据数据量合理配置CommandTimeout和BulkCopyTimeout

4. 避坑指南:那些年我们踩过的坑

4.1 数据类型映射陷阱

在最近一个金融项目中,我们遇到Decimal精度丢失的问题。解决方案是明确指定DataTable列的精度:

DataColumn priceColumn = new DataColumn("Price", typeof(decimal));
priceColumn.ExtendedProperties.Add("Precision", 18);
priceColumn.ExtendedProperties.Add("Scale", 4);

4.2 事务管理的正确姿势

使用嵌套事务时需要注意事务隔离级别:

var options = new TransactionOptions {
    IsolationLevel = IsolationLevel.ReadCommitted,
    Timeout = TimeSpan.FromMinutes(5)
};
using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
{
    // 批量操作代码
    scope.Complete();
}

4.3 并发控制策略

在高并发场景下,建议配合使用:

  1. 行版本控制(ROWVERSION)
  2. 乐观并发控制
  3. 重试策略(Polly库)

5. 进阶之路:关联技术生态

5.1 与Dapper的集成

// 结合Dapper执行批量操作
public void HybridBulkInsert(IEnumerable<User> users)
{
    using (var conn = new SqlConnection(_connectionString))
    {
        conn.Open();
        using (var transaction = conn.BeginTransaction())
        {
            // 使用Dapper执行预处理
            conn.Execute("DELETE FROM TempUsers", transaction: transaction);
            
            // 使用SqlBulkCopy
            var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, transaction);
            // ...配置bulkCopy...
            
            // 混合事务提交
            transaction.Commit();
        }
    }
}

5.2 现代替代方案展望

虽然SqlBulkCopy仍然是.NET生态中最快的批量操作方案,但在.NET Core环境中也可以考虑:

  • Entity Framework Core的BulkExtensions
  • Npgsql的BulkCopy(PostgreSQL)
  • Azure Cosmos DB的批量执行器

6. 总结:选择你的武器

经过多个项目的实战检验,我们总结出以下经验法则:

  • 数据清洗场景:SqlBulkCopy + 临时表
  • 实时同步需求:内存表 + MERGE语句
  • 复杂业务逻辑:TVP + 存储过程
  • 超大数据量:分区切换技术

最终选择取决于你的具体场景:是更看重吞吐量,还是需要灵活的更新逻辑?是处理结构化数据,还是需要复杂转换?理解每种方案的优势边界,才能在各种业务场景中游刃有余。