API 速率限制:模式和最佳实践
每个公共 API 端点都是一个目标——机器人、抓取工具和不良行为者将在您上线时攻击您的服务器。如果没有速率限制,单个行为不当的客户端可能会耗尽您的数据库连接,增加您的云费用,并停止为每个合法用户提供服务。速率限制不是可选的;它是任何生产 API 的第一道防线。
本指南介绍了四种主要的速率限制算法、它们的权衡,以及如何在 NestJS 和 Redis 中正确实现它们。您将获得常见场景的复制粘贴配置以及用于为每个端点选择正确策略的心理模型。
要点
- 令牌桶允许受控突发,而固定窗口计数器的实现成本最低
- 滑动窗口日志最准确但占用内存;推拉窗柜台是最好的平衡
- 当您运行多个 API 副本时,Redis 是唯一正确的后备存储
- NestJS
@nestjs/throttler支持自定义存储适配器 - 通过一次配置更改即可交换到 Redis- 始终返回
Retry-After、X-RateLimit-Limit、X-RateLimit-Remaining和X-RateLimit-Reset标头- 按端点敏感度区分限制:身份验证(5/分钟)与读取 API(1000/分钟)
- 对匿名流量使用基于 IP 的限制,对经过身份验证的请求使用基于用户的限制
- 永远不要默默地删除请求 - 始终返回
429 Too Many Requests并附上有用的消息
四大核心算法
固定窗口柜台
最简单的方法:在固定时间窗口内对请求进行计数,在边界处重置。
// Fixed window: 100 requests per minute
// Window resets at :00, :01, :02 ...
const windowKey = `ratelimit:${userId}:${Math.floor(Date.now() / 60000)}`;
const count = await redis.incr(windowKey);
await redis.expire(windowKey, 60);
if (count > 100) {
throw new TooManyRequestsException();
}
弱点:边界利用。客户端可以在 12:00:59 发送 100 个请求,并在 12:01:00 发送另外 100 个请求 — 两秒内有效发送 200 个请求。对于大多数 API,这是可以接受的。对于身份验证端点来说,情况并非如此。
滑动窗口日志
存储每个请求的时间戳。对于每个请求,计算最后一个窗口中的时间戳。
const now = Date.now();
const windowStart = now - 60_000; // 60 seconds ago
// Remove old entries, add current
await redis.zremrangebyscore(key, 0, windowStart);
await redis.zadd(key, now, now.toString());
const count = await redis.zcard(key);
await redis.expire(key, 60);
if (count > 100) {
throw new TooManyRequestsException();
}
权衡:完全准确,但每个用户存储 O(n) 个条目,其中 n 是请求计数。在 10,000 个用户的 1,000 RPS 下,您的 Redis 内存会快速攀升。用于低容量、高安全性端点,例如密码重置。
滑动窗口计数器
使用两个固定窗口的近似滑动窗口——无内存爆炸。
const now = Date.now();
const currentWindow = Math.floor(now / 60000);
const previousWindow = currentWindow - 1;
const windowProgress = (now % 60000) / 60000; // 0.0 to 1.0
const [current, previous] = await redis.mget(
`rl:${userId}:${currentWindow}`,
`rl:${userId}:${previousWindow}`
);
const estimated =
(parseInt(previous ?? '0') * (1 - windowProgress)) +
parseInt(current ?? '0');
if (estimated >= 100) {
throw new TooManyRequestsException();
}
await redis.incr(`rl:${userId}:${currentWindow}`);
await redis.expire(`rl:${userId}:${currentWindow}`, 120);
这是 Cloudflare 使用的算法。它以最小的开销平滑边界峰值——每个窗口每个用户两个 Redis 键。
令牌桶
允许爆发同时保持长期利率的黄金标准。每个用户都有一个以恒定速率填充的桶。请求消耗令牌。
async function consumeToken(
redis: Redis,
userId: string,
ratePerSec: number,
capacity: number
): Promise<boolean> {
const now = Date.now() / 1000;
const key = `bucket:${userId}`;
const values = await redis.hmget(key, 'tokens', 'lastRefill');
const currentTokens = parseFloat(values[0] ?? String(capacity));
const lastRefillTime = parseFloat(values[1] ?? String(now));
// Refill tokens based on elapsed time
const elapsed = now - lastRefillTime;
const refilled = Math.min(capacity, currentTokens + elapsed * ratePerSec);
if (refilled < 1) {
return false; // No tokens available
}
await redis.hset(key, 'tokens', String(refilled - 1), 'lastRefill', String(now));
await redis.expire(key, Math.ceil(capacity / ratePerSec) + 60);
return true;
}
令牌桶非常适合需要允许短突发(一次上传 10 个文件)同时防止持续滥用的 API。
NestJS 节流器配置
@nestjs/throttler v5 附带 Redis 存储适配器。这是一个生产就绪的设置:
pnpm add @nestjs/throttler @nestjs/throttler-storage-redis ioredis
// app.module.ts
import { ThrottlerModule, ThrottlerGuard } from '@nestjs/throttler';
import { ThrottlerStorageRedisService } from '@nestjs/throttler-storage-redis';
import { APP_GUARD } from '@nestjs/core';
@Module({
imports: [
ThrottlerModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
throttlers: [
{ name: 'short', ttl: 1000, limit: 5 }, // 5 req/sec burst
{ name: 'medium', ttl: 60000, limit: 300 }, // 300 req/min
{ name: 'long', ttl: 3600000, limit: 5000 }, // 5000 req/hr
],
storage: new ThrottlerStorageRedisService({
host: config.get('REDIS_HOST'),
port: config.get('REDIS_PORT'),
}),
}),
}),
],
providers: [
{
provide: APP_GUARD,
useClass: ThrottlerGuard,
},
],
})
export class AppModule {}
覆盖每个控制器或路线的限制:
@Controller('auth')
export class AuthController {
// Authentication: very strict — 5 attempts per minute
@Post('login')
@Throttle({ medium: { ttl: 60000, limit: 5 } })
async login(@Body() dto: LoginDto) { /* ... */ }
// Refresh: moderate — 30 per minute
@Post('refresh')
@Throttle({ medium: { ttl: 60000, limit: 30 } })
async refresh(@Body() dto: RefreshDto) { /* ... */ }
// Skip throttling on the exchange endpoint (protected by one-time code TTL)
@Post('exchange')
@SkipThrottle()
async exchange(@Body() dto: ExchangeDto) { /* ... */ }
}
自定义密钥生成器
默认情况下,NestJS 节流器使用客户端 IP。在 Nginx/Cloudflare 后面的生产中,您需要 X-Real-IP 或 CF-Connecting-IP。
// throttler-behind-proxy.guard.ts
import { ThrottlerGuard } from '@nestjs/throttler';
import { Injectable, ExecutionContext } from '@nestjs/common';
@Injectable()
export class ThrottlerBehindProxyGuard extends ThrottlerGuard {
protected async getTracker(req: Record<string, any>): Promise<string> {
// Authenticated user — use userId for accurate per-user limits
if (req.user?.sub) {
return `user:${req.user.sub}`;
}
// Anonymous — use real IP from Cloudflare header
return (
req.headers['cf-connecting-ip'] ||
req.headers['x-real-ip'] ||
(req.headers['x-forwarded-for'] as string)?.split(',')[0] ||
req.ip
);
}
protected async throwThrottlingException(
context: ExecutionContext,
throttlerLimitDetail: ThrottlerLimitDetail
): Promise<void> {
const response = context.switchToHttp().getResponse();
response.header(
'Retry-After',
Math.ceil(throttlerLimitDetail.ttl / 1000)
);
await super.throwThrottlingException(context, throttlerLimitDetail);
}
}
响应头
RFC 6585 和草案 RateLimit 标头告诉客户端何时重试:
// rate-limit.interceptor.ts
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { Observable, tap } from 'rxjs';
@Injectable()
export class RateLimitHeadersInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(
tap(() => {
const res = context.switchToHttp().getResponse();
const req = context.switchToHttp().getRequest();
// Values injected by ThrottlerGuard after evaluation
if (req.rateLimit) {
res.set({
'X-RateLimit-Limit': req.rateLimit.limit,
'X-RateLimit-Remaining': Math.max(
0,
req.rateLimit.limit - req.rateLimit.current
),
'X-RateLimit-Reset': new Date(
Date.now() + req.rateLimit.ttl
).toISOString(),
'RateLimit-Policy': `${req.rateLimit.limit};w=${Math.ceil(
req.rateLimit.ttl / 1000
)}`,
});
}
})
);
}
}
特定于端点的策略
不同的端点需要不同的限制。以下是常见模式的参考表:
| 端点类型 | 算法 | 限制 | 窗口 |
|---|---|---|---|
| 登录/密码重置 | 滑动窗口日志 | 5 | 15 分钟 |
| OTP / 2FA 验证 | 固定窗 | 3 | 10 分钟 |
| 公共读取API | 令牌桶 | 1000 次突发,100 次/秒填充 | — |
| 突变 API(已验证) | 推拉窗柜台 | 300 | 300 1 分钟 |
| Webhook 摄取 | 固定窗 | 10,000 | 1 分钟 |
| 文件上传 | 令牌桶 | 10 次突发,1/秒填充 | — |
| AI / LLM 端点 | 固定窗 | 20 | 1 分钟 |
| 搜索(匿名) | 固定窗 | 30 | 1 分钟 |
用于分布式安全的原子 Lua 脚本
当您有多个 API 副本时,增量检查序列中的竞争条件可能允许超出限制的突发。使用通过 redis.defineCommand 加载的 Lua 脚本使检查和增量原子化:
// rate-limit.service.ts
import { Injectable } from '@nestjs/common';
import Redis from 'ioredis';
@Injectable()
export class RateLimitService {
constructor(private readonly redis: Redis) {
// Define atomic increment+check as a custom Redis command
this.redis.defineCommand('rateLimitCheck', {
numberOfKeys: 1,
lua: `
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local ttlMs = tonumber(ARGV[2])
local count = redis.call('INCR', key)
if count == 1 then
redis.call('PEXPIRE', key, ttlMs)
end
if count > limit then
return {0, redis.call('PTTL', key)}
end
return {1, -1}
`,
});
}
async isAllowed(
key: string,
limit: number,
windowMs: number
): Promise<{ allowed: boolean; retryAfterMs: number }> {
const result = await (this.redis as any).rateLimitCheck(
key, limit, windowMs
) as [number, number];
return {
allowed: result[0] === 1,
retryAfterMs: result[1] > 0 ? result[1] : 0,
};
}
}
优雅降级和绕过策略
速率限制不应阻止内部健康检查、监控代理或值得信赖的合作伙伴。
// trusted-bypass.guard.ts
@Injectable()
export class RateLimitWithBypassGuard extends ThrottlerBehindProxyGuard {
private readonly trustedTokens = new Set([
process.env.MONITORING_TOKEN,
process.env.PARTNER_API_TOKEN,
].filter(Boolean));
async canActivate(context: ExecutionContext): Promise<boolean> {
const req = context.switchToHttp().getRequest();
// Internal health checks bypass all rate limits
if (req.path === '/health' || req.path === '/ready') {
return true;
}
// Trusted API tokens bypass
const token = req.headers['x-bypass-token'];
if (token && this.trustedTokens.has(token)) {
return true;
}
return super.canActivate(context);
}
}
对于渐进速率限制(在硬块之前发出警告),仅在超过限制的 80% 后才返回 429 和 Retry-After 标头:
// In your custom guard, after counting requests:
if (count > limit * 0.9 && count <= limit) {
response.set('X-RateLimit-Warning', 'approaching limit');
}
if (count > limit) {
response.set('Retry-After', retryAfterSeconds.toString());
throw new HttpException('Rate limit exceeded', 429);
}
测试速率限制
// rate-limit.spec.ts
describe('Rate Limiting', () => {
it('should block after limit exceeded', async () => {
const app = moduleRef.createNestApplication();
await app.init();
// Hit the endpoint 5 times (limit for login)
for (let i = 0; i < 5; i++) {
await request(app.getHttpServer())
.post('/auth/login')
.send({ email: '[email protected]', password: 'wrongpassword' })
.expect((res) => expect(res.status).toBeLessThan(429));
}
// 6th request should be blocked
const response = await request(app.getHttpServer())
.post('/auth/login')
.send({ email: '[email protected]', password: 'wrongpassword' });
expect(response.status).toBe(429);
expect(response.headers['retry-after']).toBeDefined();
expect(response.body.message).toContain('rate limit');
});
});
监控和警报
速率限制事件是有价值的信号。将它们记录到您的可观察平台:
@Injectable()
export class RateLimitMetricsService {
async recordRateLimitHit(userId: string, endpoint: string, ip: string) {
await this.metricsService.increment('rate_limit.hits', {
endpoint,
user_type: userId ? 'authenticated' : 'anonymous',
});
// Alert on sustained attacks (>100 hits in 1 min from same IP)
const alertKey = `rl_alert:${ip}`;
const recentHits = await this.redis.incr(alertKey);
if (recentHits === 1) {
await this.redis.expire(alertKey, 60);
}
if (recentHits === 100) {
await this.alertService.send({
severity: 'high',
message: `Rate limit attack detected from IP ${ip}`,
endpoint,
});
}
}
}
创建仪表板跟踪:
- 每个端点的速率限制命中(p95、p99)
- 达到限制的热门 IP/用户
- 被阻止的请求与已服务的请求的百分比
- 重试持续时间以检测配置错误的客户端
常见问题
我应该按IP还是按用户ID进行速率限制?
两者都用。对于未经身份验证的端点,IP 是唯一可用的标识符。对于经过身份验证的端点,始终更喜欢用户 ID — 它更准确,并且可以防止一个共享 IP(如公司 NAT)阻止所有员工。实施两层检查:Nginx 级别的 IP 限制和应用程序级别的用户 ID 限制。
用于速率限制的正确 HTTP 状态代码是什么?
根据 RFC 6585 始终使用 429 Too Many Requests。切勿使用 503 Service Unavailable(意味着基础设施故障)或 403 Forbidden(意味着授权失败)。在几秒钟内包含 Retry-After 作为标头,以便客户端知道何时重试。
如何处理 Cloudflare 或负载均衡器背后的速率限制?
配置您的代理以设置 X-Real-IP 或 CF-Connecting-IP 并仅信任您的代理的 IP 范围。在 Nginx 中:set_real_ip_from 103.21.244.0/22; real_ip_header CF-Connecting-IP;。在 NestJS 中,设置 app.set('trust proxy', 1) 并读取 NestJS 从可信代理标头中解析的 req.ip 。
哪种 Redis 数据结构最适合进行速率限制?
对于固定/滑动窗口计数器,在字符串键上使用 INCR + EXPIRE — 每个请求 O(1)。对于滑动窗口日志,使用排序集 (ZADD, ZREMRANGEBYSCORE, ZCARD) — O(log n)。对于令牌桶,使用哈希 (HSET, HGET) — O(1)。 Lua 脚本使所有三种模式中的所有操作都是原子的。
我应该如何处理来自受信任提供商的 webhook 的速率限制?
Stripe、GitHub 和类似的提供商从已知的 IP 范围发送 Webhook。在 Webhook 摄取端点上维护这些 IP 的 CIDR 范围和绕过速率限制的白名单。首先验证 Webhook 签名 - 签名验证是您的实际安全层,而不是速率限制。
我可以在 Nginx 级别实施速率限制吗?
是的,您应该获得基本的 DDoS 防护。在 Nginx 中使用 limit_req_zone 进行基于 IP 的粗略限制(1000 请求/分钟)。顶层应用程序级速率限制可实现针对每个用户、每个端点的精细控制。这两层相辅相成:Nginx 可以廉价地处理容量攻击,而不会影响您的应用程序,而 NestJS 可以处理微妙的业务逻辑限制。
后续步骤
构建没有强大速率限制的生产 API 就像不锁前门一样。本指南中的模式——用于突发流量的令牌桶、用于平滑执行的滑动窗口计数器、Redis 支持的分布式存储和正确的 429 响应——构成了安全、可扩展 API 的支柱。
ECOSIRE 构建企业级 NestJS 后端,从第一天起就具有速率限制、Redis 缓存和完整的可观察性。如果您要启动新的 API 或强化现有 API,请探索我们的后端工程服务,了解我们如何加快您的交付速度。
作者
ECOSIRE TeamTechnical Writing
The ECOSIRE technical writing team covers Odoo ERP, Shopify eCommerce, AI agents, Power BI analytics, GoHighLevel automation, and enterprise software best practices. Our guides help businesses make informed technology decisions.
相关文章
电子商务人工智能欺诈检测:在不阻止销售的情况下保护收入
实施 AI 欺诈检测,捕获 95% 以上的欺诈交易,同时将误报率控制在 2% 以下。机器学习评分、行为分析和投资回报率指南。
Hepsiburada API 与 Odoo 集成:完整设置指南
通过 API 将 Hepsiburada 与 Odoo ERP 集成的完整指南。在土耳其值得信赖的市场上实现订单、库存和履行的自动化。
Shopify 集成中心:2026 年如何将 Shopify 连接到任何系统
Shopify 集成的完整指南:API、Webhooks、中间件、iPaaS 方法。将 Shopify 连接到 ERP、会计、CRM、市场和 POS 系统。