前言-面试小记
前段时间面试遇到的问题。单台服务器以http请求的方式如何一个小时内把200W的数据完整的接收下来。自己的工作中很少碰到类似这样的问题,一般牵扯到大批量数据时已经开始思考要弄几台服务器做负载均衡了,或者使用TCP/UDP的方式进行数据传输了,当然最终的方案拍板也不是我能控制的,毕竟大项目有架构师输出方案而小项目压根不用考虑到这么多。虽然也有思考过一台性能一般的服务器到底能承受多少数据量但从未实践过,既然现在又碰到这个问题了那就来尝试一下。
正文
既然是大批量数据那http反复建立连接的代价就太大了,反复弄个头、反复建立上下文、反复建立响应,所以数据的上传就要设计成支持批量上传的模式(如每批1-10000),这样减少http的请求数量。当然也可以对数据进行压缩传输,这样也能减少网络传输量。
数据传输已经考虑好了,那就该想想如何安稳的接收数据了。 在此我介绍一个类Channel,Channel是C#中用于生产者-消费者场景的高性能线程安全队列,属于System.Threading.Channels命名空间,是.NET Core 3.0后引入的现代并发编程工具
Channel的核心特点
1、线程安全:内置线程同步机制,无需额外锁; 2、高性能:优化过的内存模型,比传统BlockingCollection更高效; 3、丰富API:支持同步/异步读写、批量操作、完成通知; 4、背压支持:通过容量控制实现流量控制; 5、取消支持:与CancellationToken深度集成; 总之Channel特别适合现代异步编程场景,尤其是在ASP.NET Core等高并发环境中,能有效解耦生产者和消费者,同时提供良好的流量控制机制。想了解详情可以去System.Threading.Channels/DeepSeek/GPT 处了解。 了解完channel后我们直接看代码:
/// <summary>
/// 接收数据
/// </summary>
/// <returns></returns>
[HttpPost("ReceiveTest")]
[Tags("测试接口")]
public async Task<IActionResult> ReceiveStreamParallel()
{
const int totalExpected = 2_000_000; // 预期总数
var totalTimer = Stopwatch.StartNew();
var concurrencyCounter = new ConcurrentCounter();
var processedCount = 0L;
var batchSize = 10_000;
var lostItems = new ConcurrentBag<TestDataModel>();
try
{
// 1. 创建处理管道
var channel = Channel.CreateBounded<TestDataModel>(new BoundedChannelOptions(batchSize *10)
{
FullMode = BoundedChannelFullMode.Wait
});
// 2. 启动多个消费者
var consumers = Enumerable.Range(0, Environment.ProcessorCount)
.Select(_ => Task.Run(async () =>
{
var dataList = new List<TestDataModel>(batchSize);
await foreach (var item in channel.Reader.ReadAllAsync())
{
using (concurrencyCounter.Track())
{
dataList.Add(item);
if (dataList.Count >= batchSize)
{
await ProcessBatchWithRetry(dataList, lostItems);
Interlocked.Add(ref processedCount, dataList.Count);
dataList.Clear();
_logger.LogInformation($"已处理 {processedCount} 条");
}
}
}
// 处理最后一批
if (dataList.Count > 0)
{
await ProcessBatchWithRetry(dataList, lostItems);
Interlocked.Add(ref processedCount, dataList.Count);
}
}))
.ToArray();
// 3. 生产者(带背压控制)
await ProduceItemsAsync(channel.Writer,channel.Reader,batchSize, totalExpected);
//等待完成并处理残留
channel.Writer.Complete();
await Task.WhenAll(consumers);
totalTimer.Stop();
// 5. 处理丢失数据(第二次机会)
if (!lostItems.IsEmpty)
{
await _storeDataService.SaveListToDatabaseAsync(lostItems.ToList());
Interlocked.Add(ref processedCount, lostItems.Count);
}
// 验证完整性
if (processedCount != totalExpected)
{
_logger.LogWarning($"数据不完整! 预期:{totalExpected} 实际:{processedCount}");
}
// 4. 返回性能报告
return Ok(new {
TotalTime = totalTimer.Elapsed.ToString(),
TotalItems = processedCount,
AvgItemsPerSecond = processedCount / totalTimer.Elapsed.TotalSeconds,
MaxConcurrency = concurrencyCounter.MaxCount
});
}
catch (Exception ex)
{
_logger.LogError(ex, "处理失败");
return StatusCode(500, new {
Error = ex.Message,
Processed = processedCount,
Elapsed = totalTimer.Elapsed.ToString()
});
}
}
// 带重试的批次处理
private async Task ProcessBatchWithRetry(List<TestDataModel> batch, ConcurrentBag<TestDataModel> lostItems, int maxRetries = 3)
{
for (int i = 0; i < maxRetries; i++)
{
try
{
await _storeDataService.SaveListToDatabaseAsync(new List<TestDataModel>(batch));
return;
}
catch (Exception ex)
{
_logger.LogError($"批次插入失败(重试 {i+1}): {ex.Message}");
if (i == maxRetries - 1)
{
foreach (var item in batch) lostItems.Add(item);
}
await Task.Delay(100 * (i + 1));
}
}
}
// 受控的生产者
private async Task ProduceItemsAsync(ChannelWriter<TestDataModel> writer, ChannelReader<TestDataModel> reader,int batchSize ,int totalExpected)
{
await using var stream = Request.Body;
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
var itemsRead = 0;
await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<TestDataModel>(stream, options))
{
// 修正后的背压控制:通过reader获取队列积压量
while (reader.Count > batchSize * 8)
{
var delay = (int)(reader.Count * 0.1); // 动态延迟
await Task.Delay(Math.Min(delay, 500));
_logger.LogTrace($"背压控制: 积压 {reader.Count} 条,延迟 {delay}ms");
}
await writer.WriteAsync(item);
itemsRead++;
if (itemsRead % 100_000 == 0)
{
var memUsage = Process.GetCurrentProcess().WorkingSet64 / 1024 / 1024;
_logger.LogInformation(
$"进度: {itemsRead:N0}/{totalExpected:N0} | " +
$"积压: {reader.Count:N0} | " +
$"内存: {memUsage}MB");
}
}
}
既然是大批量数据,那也要记得解开服务器服务器限制:
// 配置Kestrel服务器限制(解决默认4MB限制)
builder.WebHost.ConfigureKestrel(options =>
{
options.Limits.MaxRequestBodySize =2L* 1024 * 1024 * 1024; // 2GB
});
这样我们就完成了一个简单的单服务器接收大批量数据的API,另外在实现这个.NET 9 WebAPI的过程中发现了一个API管理平台Scalar,这是微软用来代替Swagger。初步接触Scalar页面更现代化,还能生成各种语言的测试代码。
测试平台
这次准备了一个2核2G2M的服务器,将webapi服务部署上去开测,从图中可以看到小水管被塞满了。
不过这次准备的数据不大,200W条数据写到SQLite中也才300多M。数据完全处理完时间也还行,当然代码还有优化空间而且也没做任何安全验证。这么来看一台一般的服务器承载的业务量也是相当可观的。
返回数据: {"totalTime":"00:00:01.4694604","totalItems":10000,"avgItemsPerSecond":6805.219113083959,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.2923632","totalItems":10000,"avgItemsPerSecond":7737.762882756178,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.1822560","totalItems":10000,"avgItemsPerSecond":8458.404947828558,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.5288595","totalItems":10000,"avgItemsPerSecond":6540.823404635939,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:00.8823993","totalItems":10000,"avgItemsPerSecond":11332.737911283475,"maxConcurrency":2}
返回数据: {"totalTime":"00:00:01.5933753","totalItems":10000,"avgItemsPerSecond":6275.985324989035,"maxConcurrency":2}
数据处理完成,总耗时: 283.9990324秒
处理大批量数据总是充满挑战,尤其是在保持高效和系统稳定的前提下。您的实现通过Channel来异步处理数据流,是现代C#编程中一个高效的选择,有助于避免阻塞并提升吞吐量。
在背压控制方面,您采用的监控队列长度并动态调整延迟的方法非常有效。这不仅防止了内存溢出,还确保了系统在高负载下的稳定性。然而,可能需要进一步考虑这种机制对延迟敏感型应用的影响。
配置Kestrel提高请求体限制是必要之举,但也需注意潜在的安全风险,如DDoS攻击带来的资源耗尽问题。建议结合监控和防火墙规则来平衡性能与安全性。
Scalar作为API管理工具确实提供了现代化界面和便捷的测试代码生成功能,但其生态系统支持可能不如Swagger成熟,特别是在社区资源和集成插件方面。这需要权衡项目需求和团队习惯。
在性能优化方面,可以考虑采用压缩传输或更高效的序列化方法,如MessagePack或Protobuf,以减少数据体积和提升解析速度。此外,加强日志监控和集成ELK可以帮助及时发现瓶颈和故障点,提升维护效率。
安全性是不可忽视的一环,建议引入认证机制如JWT,并确保数据传输过程中使用HTTPS以防止敏感信息泄露。同时,在扩展性方面,可以通过负载均衡或容器化部署提升服务可用性,避免单点故障。
总体而言,您的方法在处理大批量数据时表现出色,但进一步优化和安全措施将使其更加健壮和适应未来需求。