在 C 中实现秒杀(抢购)功能,控制并发是核心挑战。想象一下,当服务器收到成千上万个用户请求,大家都在争夺同一件商品时,如果不加控制,会发生什么?
场景还原:
设想我们有一件商品,库存只有 10 件。秒杀开始的瞬间,可能有 10000 个用户同时发起请求。
不加控制的灾难:
1. 超卖(Overselling): 如果没有并发控制,多个用户可能同时读取到库存为 10。当他们进行扣减操作时,可能都成功了,最终卖出了 11 件、12 件,甚至更多,这绝对是灾难性的。
2. 数据不一致: 即使不超卖,因为多个请求同时修改库存,数据库或其他存储的记录可能会出现混乱,导致最终的库存数字不准确。
3. 系统崩溃: 大量未被有效处理的请求堆积,耗尽服务器资源(CPU、内存、网络带宽),导致系统响应缓慢甚至宕机。
那么,我们该如何巧妙地“驯服”这股并发洪流呢?
在 C 中,我们可以从多个层面着手,组合使用各种策略来构建一个健壮的秒杀系统。
1. 原子性操作:确保“一步到位”
最根本的控制在于,当用户抢购时,读取库存、判断库存、扣减库存这三个动作必须成为一个“不可分割”的整体。也就是说,在执行完这三个步骤之前,不允许任何其他请求来干扰。
想象一下,你是一个收银员,正在给顾客打包商品。你不能在检查到商品还有货之后,跑去处理另一个顾客,然后才回来打包。你需要一步到位,处理完一个顾客的交易,再进行下一个。
在 C 中,这通常意味着利用数据库的事务或者某些特定的锁定机制。
数据库事务 (Transactions): 这是最常用的手段。当你开始一个数据库事务,然后在其中执行读取库存、判断库存、扣减库存的操作,并最终提交事务时,数据库系统会保证这些操作的原子性。如果在事务过程中有其他操作试图修改同一份库存数据,数据库会阻塞或回滚其中一个事务,确保数据的一致性。
例子:
```csharp
using (var transaction = _dbContext.Database.BeginTransaction())
{
try
{
// 1. 获取当前商品信息,包括库存
var product = await _dbContext.Products
.Where(p => p.Id == productId && p.Stock > 0)
.FirstOrDefaultAsync();
if (product == null)
{
// 库存不足,直接返回
transaction.Rollback(); // 即使没有修改,也回滚一下
return "库存不足";
}
// 2. 扣减库存
product.Stock;
await _dbContext.SaveChangesAsync();
// 3. 记录抢购订单(这一步也可以包含在事务内)
await _dbContext.Orders.AddAsync(new Order { UserId = userId, ProductId = productId });
await _dbContext.SaveChangesAsync();
// 4. 提交事务
await transaction.CommitAsync();
return "抢购成功!";
}
catch (DbUpdateConcurrencyException)
{
// 处理并发冲突,例如:另一个用户在同一时间修改了库存
transaction.Rollback();
return "抢购失败,请重试"; // 提示用户重试
}
catch (Exception ex)
{
transaction.Rollback();
// 记录日志等
return $"抢购出现错误: {ex.Message}";
}
}
```
这里的关键是 `DbUpdateConcurrencyException`。如果两个事务在读取库存后,都尝试更新同一件商品,数据库会检测到这种情况,并抛出这个异常。我们捕获它,然后回滚事务,并让用户重试。
数据库的行级锁 (RowLevel Locking): 许多数据库也支持在读取数据时加上锁,比如 `SELECT ... FOR UPDATE`。这会锁定那一行数据,直到事务结束。
例子(SQL Server):
```sql
BEGIN TRANSACTION;
DECLARE @CurrentStock INT;
SELECT @CurrentStock = Stock FROM Products WITH (UPDLOCK, ROWLOCK) WHERE Id = @ProductId; 使用 UPdLOKC 获取排他锁
IF @CurrentStock > 0
BEGIN
UPDATE Products
SET Stock = Stock 1
WHERE Id = @ProductId;
插入订单记录
INSERT INTO Orders (UserId, ProductId) VALUES (@UserId, @ProductId);
COMMIT TRANSACTION;
SELECT '抢购成功!';
END
ELSE
BEGIN
ROLLBACK TRANSACTION;
SELECT '库存不足';
END
```
这种方式直接在数据库层面锁定了数据行,能非常有效地防止超卖。
2. 速率限制 (Rate Limiting):过滤掉“捣乱的”
在秒杀开始时,真正的用户可能只有几千人,但恶意攻击(如爬虫、黄牛脚本)可能会发起几十万甚至上百万的请求。如果不对这些请求进行限制,它们会瞬间压垮系统,让真正的用户也无法参与。
想象一下,入口只有一个,如果前面挤满了无意义的队伍,后面真正想买东西的人就进不去了。
IP 地址限制: 限制来自同一 IP 地址在短时间内的请求次数。
用户 ID 限制: 限制同一用户在短时间内的请求次数。
令牌桶 (Token Bucket) 或漏桶 (Leaky Bucket) 算法: 这些是经典的速率限制算法。
令牌桶: 想象一个水桶,可以不断地往里面加水(令牌)。每当一个请求过来,就从桶里拿走一个令牌。如果桶空了,请求就被拒绝。桶里的令牌满了,就不会再加了。这允许一定程度的突发流量。
漏桶: 想象一个漏斗,请求就像水一样倒入。水会以恒定的速率从漏斗底部漏出(被处理)。如果水倒得太快,漏斗会溢出,请求就被拒绝。这主要用于平滑流量。
在 C Web API 中,可以使用像 `AspNetCoreRateLimit` 这样的库来方便地实现。
示例(概念性,通常会集成到中间件):
```csharp
// 在 Startup.cs 或 Program.cs 中配置
services.AddMemoryCache(); // 需要缓存来存储速率限制信息
services.Configure(Configuration.GetSection("IpRateLimiting"));
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
// ... 将 RateLimiterMiddleware 添加到管道中
```
你需要在 `app.Use` 语句中加入这个中间件,它会在请求处理早期生效,过滤掉不符合规则的请求。
3. 乐观并发控制 (Optimistic Concurrency) / 版本号:检测“后来者”
与前面提到的悲观锁(直接锁定数据)不同,乐观并发控制假设冲突发生的概率较低。它允许请求“同时”进行,但在最终提交时,会检查数据是否在自己读取之后被其他请求修改过。
想象一下,你借一本杂志,大家都以为还在那里。你拿到杂志,开始阅读,但回家后发现,你借的那本其实被另一个朋友不小心拿走,并且已经放回去了,但是杂志的内容可能已经被修改了(比如撕了一页)。你发现内容不对,就知道这本杂志被别人动过了。
在 C 中,这通常通过在数据库表中添加一个 `Version` 或 `RowVersion` 列来实现。
1. 读取数据时: 同时读取商品的库存和它的版本号。
2. 更新数据时: 在 `UPDATE` 语句中,同时指定 `Stock = Stock 1` 和 `WHERE Id = @ProductId AND Version = @OriginalVersion`。
3. 数据库检查: 如果数据库中的 `Version` 值与你读取到的 `OriginalVersion` 不符,说明数据已被其他事务修改,`UPDATE` 语句会影响 0 行,你就知道发生了并发冲突。
例子:
数据库表结构可能如下:
```sql
CREATE TABLE Products (
Id INT PRIMARY KEY,
Name VARCHAR(255),
Stock INT,
Version BIGINT NOT NULL 版本号
);
```
C 代码(使用 EF Core):
```csharp
// 假设 Product.Version 是一个 long 类型属性
var product = await _dbContext.Products
.Where(p => p.Id == productId && p.Stock > 0)
.AsNoTracking() // 避免 EF Core 跟踪,以模拟独立读取
.FirstOrDefaultAsync();
if (product == null)
{
return "库存不足";
}
// 尝试更新,同时检查版本号
product.Stock;
product.Version++; // 递增版本号
try
{
// EF Core 会自动根据主键和并发令牌(如果配置了)来生成 WHERE 子句
// 如果配置了 RowVersion,EF Core 会自动处理
// 如果手动管理版本号,你需要确保 SQL 包含 Version = @OriginalVersion
await _dbContext.SaveChangesAsync(); // EF Core 会生成类似 UPDATE ... WHERE Id = @ProductId AND Version = @OriginalVersion
return "抢购成功!";
}
catch (DbUpdateConcurrencyException)
{
// 版本号不匹配,说明在读取后,库存已被其他用户修改
return "抢购失败,请重试";
}
catch (Exception ex)
{
// ... 其他错误处理
return $"抢购出现错误: {ex.Message}";
}
```
EF Core 的 `RowVersion` 属性(通常映射到数据库的 `rowversion` 或 `timestamp` 类型)会自动处理这个版本比较。如果手动管理 `Version` 列,你需要确保在 `SaveChangesAsync` 之前,你的实体状态是 `Modified`,并且 `Version` 属性的值是读取时的那个旧值(EF Core 默认不会跟踪非主键/外键属性的旧值,所以可能需要一些手动技巧,或者确保 EF Core 知道这个属性是用于并发控制的)。
4. 请求队列和异步处理:让“排队”变合理
即使有了上述控制,如果同一时间涌入的用户数远超系统处理能力,仍然可能导致请求积压。这时,将请求放入一个队列,然后由工作线程以可控的速度处理,是一种有效的方法。
想象一个繁忙的餐厅,服务员(处理能力)有限。客人(请求)到了,如果没有位置,就在门口排队等候。服务员忙完一个桌子,再去叫下一批等候的客人。
内存队列 (Memory Queue): 对于单个服务器实例,可以使用 `ConcurrentQueue` 或 TPL Dataflow 的 `BufferBlock` 等内存数据结构来构建简单的队列。
分布式队列 (Distributed Queue): 如果是分布式系统,可以使用 Redis 的 List/Stream、RabbitMQ、Kafka 等消息队列。
工作流程:
1. 接收请求: Web API 接收到秒杀请求。
2. 入队: 将请求信息(用户ID、商品ID)放入消息队列。
3. 消费者处理: 后台有专门的消费者服务(或者一个独立的后台任务),从队列中拉取请求。
4. 执行业务逻辑: 消费者执行前面提到的带有原子性操作的抢购逻辑。
5. 结果反馈(可选): 将处理结果(成功/失败)通过某种方式(如 WebSocket、通知服务)返回给用户。
C 结合 Redis 实现队列(概念性):
```csharp
// 假设 RedisHelper 提供了 LPush 和 BLPop 方法
public class SeckillService
{
private readonly RedisHelper _redisHelper;
private readonly ProductService _productService; // 包含原子性抢购逻辑
private readonly ConcurrentBag _results; // 简单示例,实际应有更复杂的反馈机制
public SeckillService(RedisHelper redisHelper, ProductService productService)
{
_redisHelper = redisHelper;
_productService = productService;
_results = new ConcurrentBag();
}
// 用户发起抢购
public async Task EnqueueSeckillRequest(string userId, int productId)
{
var requestData = $"{userId}:{productId}";
await _redisHelper.LPush("seckill_queue", requestData); // 将请求放入 Redis 队列
}
// 后台消费者启动一个长时间运行的任务
public void StartConsumer()
{
Task.Run(async () =>
{
while (true)
{
// BLPop 会阻塞直到有数据,避免忙轮询
var result = await _redisHelper.BLPop("seckill_queue", TimeSpan.FromSeconds(5));
if (result != null && result.Length > 0)
{
var requestData = result[1]; // BLPop 返回 [key, value]
var parts = requestData.Split(':');
var userId = parts[0];
var productId = int.Parse(parts[1]);
// 执行抢购逻辑
var outcome = await _productService.ProcessSeckill(userId, productId);
_results.Add($"{requestData} > {outcome}"); // 记录结果
}
}
});
}
}
```
这种方式将请求的接收与处理分离,Web 服务器可以快速响应用户“已加入队列”,而实际的抢购逻辑由后台工作者异步处理,大大降低了 Web 服务器的压力。
5. 预减库存或分布式锁:提前锁定
在一些场景下,我们甚至可以在用户真正执行扣减操作之前,就做一些预处理。
预减库存 (Predecrement Stock): 在用户进入抢购环节,或者通过某种验证(如验证码、登录状态)后,先从总库存中预先扣减,但这个操作本身也要考虑并发。比如,使用 Redis 的 `INCRBY` 命令(带负数)来原子性地减少一个计数器。
思路:
1. 一个全局的 Redis 键 `product:productId:available_stock`,初始值是总库存。
2. 当用户通过验证,准备抢购时,尝试 `DECRBY product:productId:available_stock 1`。
3. 如果返回值大于等于 0,说明预减成功,可以继续后续的数据库操作(此时数据库中的库存可能还没来得及更新,但已经有一个“名额”被锁定)。
4. 如果返回值小于 0,说明已经没有预扣减的名额了,直接拒绝。
5. 后续的数据库操作依然需要事务保证,以处理最终的数据一致性。
分布式锁: 在抢购开始前,对商品加一个分布式锁。只有持有锁的进程或线程才能执行抢购逻辑。
Redis 分布式锁 (SETNX + EXPIRE):
```csharp
// 尝试获取锁
bool lockAcquired = await redisClient.SetAsync($"lock:product:{productId}", "locked_value", TimeSpan.FromSeconds(10), When.NotExists);
if (lockAcquired)
{
try
{
// 抢购逻辑...
}
finally
{
// 释放锁
await redisClient.KeyDeleteAsync($"lock:product:{productId}");
}
}
else
{
// 未获取到锁,说明已经被别人抢占了,或者锁已超时
return "请稍后重试";
}
```
注意: 分布式锁的实现需要谨慎,要考虑锁的过期、续期、以及客户端故障导致死锁等问题。
6. 前端协同:降低后端压力
虽然这是后端技术讨论,但前端的配合同样重要。
秒杀开始前禁用按钮: 在秒杀开始前,按钮应置灰,防止用户在没到时间时就提交请求。
倒计时: 精确的倒计时能让用户有预期,避免在错误时间发起请求。
请求节流 (Throttling): 前端可以对用户的点击行为进行节流,例如,用户连续点击按钮,只在短时间内发送一个请求。
总结一下,控制 C 秒杀并发的核心在于:
1. 原子性: 确保“读取判断修改”这一系列动作不可分割。数据库事务是关键。
2. 隔离性: 阻止其他请求干扰正在进行的抢购操作。锁(数据库行锁、分布式锁)或版本号是手段。
3. 过滤: 拒绝无效或恶意的请求。速率限制是必不可少的。
4. 解耦: 将高并发的请求处理异步化。消息队列是常用方案。
最终的秒杀系统,往往是这些策略的组合使用。 比如,先用速率限制过滤掉大部分无效请求,然后对通过验证的用户使用分布式锁或预减库存来锁定名额,再通过数据库事务来完成最终的扣减和订单创建,同时将高并发的请求放入消息队列由后台工作者处理。每一步都像一个关卡,确保只有真正有能力且按规则来的用户,才能成功抢到商品。