前言-面试小记

前段时间面试遇到的问题。单台服务器以http请求的方式如何一个小时内把200W的数据完整的接收下来。自己的工作中很少碰到类似这样的问题,一般牵扯到大批量数据时已经开始思考要弄几台服务器做负载均衡了,或者使用TCP/UDP的方式进行数据传输了,当然最终的方案拍板也不是我能控制的,毕竟大项目有架构师输出方案而小项目压根不用考虑到这么多。虽然也有思考过一台性能一般的服务器到底能承受多少数据量但从未实践过,既然现在又碰到这个问题了那就来尝试一下。

正文

既然是大批量数据那http反复建立连接的代价就太大了,反复弄个头、反复建立上下文、反复建立响应,所以数据的上传就要设计成支持批量上传的模式(如每批1-10000),这样减少http的请求数量。当然也可以对数据进行压缩传输,这样也能减少网络传输量。

数据传输已经考虑好了,那就该想想如何安稳的接收数据了。 在此我介绍一个类Channel,Channel是C#中用于生产者-消费者场景的高性能线程安全队列,属于System.Threading.Channels命名空间,是.NET Core 3.0后引入的现代并发编程工具

Channel的核心特点

1、线程安全:内置线程同步机制,无需额外锁; 2、高性能:优化过的内存模型,比传统BlockingCollection更高效; 3、丰富API:支持同步/异步读写、批量操作、完成通知; 4、背压支持:通过容量控制实现流量控制; 5、取消支持:与CancellationToken深度集成; 总之Channel特别适合现代异步编程场景,尤其是在ASP.NET Core等高并发环境中,能有效解耦生产者和消费者,同时提供良好的流量控制机制。想了解详情可以去System.Threading.Channels/DeepSeek/GPT 处了解。 了解完channel后我们直接看代码:

 /// <summary>
    /// 接收数据
    /// </summary>
    /// <returns></returns>
    [HttpPost("ReceiveTest")]
    [Tags("测试接口")]
    public async Task<IActionResult> ReceiveStreamParallel()
    {
        const int totalExpected = 2_000_000; // 预期总数
        var totalTimer = Stopwatch.StartNew();
        var concurrencyCounter = new ConcurrentCounter();
        var processedCount = 0L;
        var batchSize = 10_000;
        var lostItems = new ConcurrentBag<TestDataModel>();
        try
        {
            // 1. 创建处理管道
            var channel = Channel.CreateBounded<TestDataModel>(new BoundedChannelOptions(batchSize *10)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            // 2. 启动多个消费者
            var consumers = Enumerable.Range(0, Environment.ProcessorCount)
                .Select(_ => Task.Run(async () =>
                {
                    var dataList = new List<TestDataModel>(batchSize);
                    await foreach (var item in channel.Reader.ReadAllAsync())
                    {
                        using (concurrencyCounter.Track())
                        {
                            dataList.Add(item);
                            if (dataList.Count >= batchSize)
                            {

                                await ProcessBatchWithRetry(dataList, lostItems);
                                Interlocked.Add(ref processedCount, dataList.Count);
                                dataList.Clear();
                                _logger.LogInformation($"已处理 {processedCount} 条");
                            }
                        }
                    }
                    // 处理最后一批
                    if (dataList.Count > 0)
                    {
                        await ProcessBatchWithRetry(dataList, lostItems);
                        Interlocked.Add(ref processedCount, dataList.Count);
                    }
                }))
                .ToArray();

            // 3. 生产者(带背压控制)
            await ProduceItemsAsync(channel.Writer,channel.Reader,batchSize, totalExpected);
            
            //等待完成并处理残留
            channel.Writer.Complete();
            await Task.WhenAll(consumers);
            totalTimer.Stop();
            
            // 5. 处理丢失数据(第二次机会)
            if (!lostItems.IsEmpty)
            {
                await _storeDataService.SaveListToDatabaseAsync(lostItems.ToList());
                Interlocked.Add(ref processedCount, lostItems.Count);
            }

            // 验证完整性
            if (processedCount != totalExpected)
            {
                _logger.LogWarning($"数据不完整! 预期:{totalExpected} 实际:{processedCount}");
            }

            // 4. 返回性能报告
            return Ok(new {
                TotalTime = totalTimer.Elapsed.ToString(),
                TotalItems = processedCount,
                AvgItemsPerSecond = processedCount / totalTimer.Elapsed.TotalSeconds,
                MaxConcurrency = concurrencyCounter.MaxCount
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理失败");
            return StatusCode(500, new { 
                Error = ex.Message,
                Processed = processedCount,
                Elapsed = totalTimer.Elapsed.ToString() 
            });
        }
    }
    
    // 带重试的批次处理
    private async Task ProcessBatchWithRetry(List<TestDataModel> batch, ConcurrentBag<TestDataModel> lostItems, int maxRetries = 3)
    {
        for (int i = 0; i < maxRetries; i++)
        {
            try
            {
                await _storeDataService.SaveListToDatabaseAsync(new List<TestDataModel>(batch));
                return;
            }
            catch (Exception ex)
            {
                _logger.LogError($"批次插入失败(重试 {i+1}): {ex.Message}");
                if (i == maxRetries - 1)
                {
                    foreach (var item in batch) lostItems.Add(item);
                }
                await Task.Delay(100 * (i + 1));
            }
        }
    }
    
    // 受控的生产者
    private async Task ProduceItemsAsync(ChannelWriter<TestDataModel> writer, ChannelReader<TestDataModel> reader,int batchSize ,int totalExpected)
    {
        await using var stream = Request.Body;
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
    
        var itemsRead = 0;
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<TestDataModel>(stream, options))
        {

            // 修正后的背压控制:通过reader获取队列积压量
            while (reader.Count > batchSize * 8) 
            {
                var delay = (int)(reader.Count * 0.1); // 动态延迟
                await Task.Delay(Math.Min(delay, 500));
                _logger.LogTrace($"背压控制: 积压 {reader.Count} 条,延迟 {delay}ms");
            }

            await writer.WriteAsync(item);
            itemsRead++;

            if (itemsRead % 100_000 == 0)
            {
                var memUsage = Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024;
                _logger.LogInformation(
                    $"进度: {itemsRead:N0}/{totalExpected:N0} | " +
                    $"积压: {reader.Count:N0} | " +
                    $"内存: {memUsage}MB");
            }
        }
    }

既然是大批量数据,那也要记得解开服务器服务器限制:

// 配置Kestrel服务器限制(解决默认4MB限制)
builder.WebHost.ConfigureKestrel(options =>
{
    options.Limits.MaxRequestBodySize =2L* 1024 * 1024 * 1024; // 2GB
});

这样我们就完成了一个简单的单服务器接收大批量数据的API,另外在实现这个.NET 9 WebAPI的过程中发现了一个API管理平台Scalar,这是微软用来代替Swagger。初步接触Scalar页面更现代化,还能生成各种语言的测试代码。 Scalar

测试平台

这次准备了一个2核2G2M的服务器,将webapi服务部署上去开测,从图中可以看到小水管被塞满了。 服务器带宽使用率 不过这次准备的数据不大,200W条数据写到SQLite中也才300多M。数据完全处理完时间也还行,当然代码还有优化空间而且也没做任何安全验证。这么来看一台一般的服务器承载的业务量也是相当可观的。

返回数据: {"totalTime":"00:00:01.4694604","totalItems":10000,"avgItemsPerSecond":6805.219113083959,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.2923632","totalItems":10000,"avgItemsPerSecond":7737.762882756178,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.1822560","totalItems":10000,"avgItemsPerSecond":8458.404947828558,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.5288595","totalItems":10000,"avgItemsPerSecond":6540.823404635939,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:00.8823993","totalItems":10000,"avgItemsPerSecond":11332.737911283475,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.5933753","totalItems":10000,"avgItemsPerSecond":6275.985324989035,"maxConcurrency":2}
数据处理完成,总耗时: 283.9990324秒

处理时间