Tornado 异步框架:长连接与 WebSocket
Tornado 异步框架入门:长连接与 WebSocket 实战
Tornado 是一个用 Python 编写的可扩展、非阻塞式 Web 服务器与异步网络库。它最初为 FriendFeed 而开发,因其出色的长连接处理能力而被广泛用于实时应用、WebSocket 服务和高并发场景。本教程面向初学者,从 Tornado 的异步核心讲起,逐步掌握如何构建支持长轮询和 WebSocket 的实时通信服务。
为什么选择 Tornado 处理长连接
传统同步 Web 框架(如 Flask、Django 默认模式)一个请求对应一个线程或进程。当客户端保持大量长连接时,操作系统资源会被迅速耗尽。Tornado 使用单线程事件循环和 epoll/kqueue 等 I/O 多路复用机制,一个进程即可维护数万个空闲连接,只在连接真正有数据时才会触发回调处理。
安装与环境
确保你的 Python 版本为 3.8 或更高,然后使用 pip 安装 Tornado:
pip install tornado
验证安装:
import tornado
print(tornado.version) # 应输出类似 '6.3.3' 的版本号
理解 Tornado 异步核心
事件循环与协程
Tornado 的所有异步操作都依赖 IOLoop(事件循环)。在较新版本中,推荐使用原生的 async/await 语法编写异步代码。
一个最简单的异步请求处理器:
import tornado.ioloop
import tornado.web
import asyncio
class MainHandler(tornado.web.RequestHandler):
async def get(self):
# 模拟一个异步操作(如数据库查询、HTTP 请求)
await asyncio.sleep(0.5)
self.write("Hello, async world!")
def make_app():
return tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
访问 http://localhost:8888,0.5 秒后返回字符串。整个过程中服务器并未阻塞,可同时处理其他请求。
非阻塞的关键
Tornado 通过 async/await 配合特殊的异步客户端(如 AsyncHTTPClient)实现非阻塞 I/O。任何可能导致阻塞的库(如标准 time.sleep()、同步数据库驱动)都会破坏事件循环,必须替换为异步版本或用线程池执行。
长轮询:长连接的基础形态
长轮询是 WebSocket 普及前常用的实时通信方案:客户端发送 HTTP 请求,服务器挂起该请求直到有新数据才返回响应。客户端收到响应后立即再发起一个新请求,形成近乎实时的推送效果。
实现一个简单的长轮询聊天室
我们使用 Tornado 实现一个基于长轮询的消息等待接口。服务端保存一条最新消息,客户端通过 /poll 接口阻塞等待新消息。
import tornado.ioloop
import tornado.web
import tornado.escape
import asyncio
# 存储当前等待的 Future 对象
waiters = set()
latest_message = None
class PollHandler(tornado.web.RequestHandler):
async def get(self):
# 创建一个 Future 并加入等待集合
future = asyncio.get_event_loop().create_future()
waiters.add(future)
# 如果已经有消息,立即返回
if latest_message is not None:
self.write({"message": latest_message})
waiters.remove(future)
return
# 否则挂起请求,直到有消息被推送
try:
message = await future
self.write({"message": message})
finally:
waiters.discard(future)
class SendHandler(tornado.web.RequestHandler):
async def post(self):
global latest_message
msg = self.get_argument("msg")
latest_message = msg
# 通知所有等待的轮询请求
for future in list(waiters):
future.set_result(msg)
waiters.clear()
self.write({"status": "sent"})
def make_app():
return tornado.web.Application([
(r"/poll", PollHandler),
(r"/send", SendHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
print("Server running on port 8888")
tornado.ioloop.IOLoop.current().start()
- PollHandler:客户端 GET 请求到达后,创建一个
asyncio.Future对象。如果当前没有新消息,则await future会挂起请求。当有人发送消息时,SendHandler对所有等待的future调用set_result(),从而唤醒所有挂起的请求,返回消息。 - SendHandler:接收 POST 参数
msg,更新全局变量并通知所有等待者。
这种方式能支撑较多客户端,但每个长连接仍然会占用一个 HTTP 请求通道,且每次消息推送后客户端需要重新连接。WebSocket 提供了更高效的双向通道。
WebSocket:全双工实时通信
WebSocket 是 HTML5 开始提供的在单个 TCP 连接上进行全双工通讯的协议。Tornado 原生支持 WebSocket,通过继承 tornado.websocket.WebSocketHandler 即可。使用 WebSocket 后,服务器可以主动向客户端推送数据,且无需反复重建连接。
基础 WebSocket 处理器
import tornado.ioloop
import tornado.web
import tornado.websocket
class EchoWebSocket(tornado.websocket.WebSocketHandler):
# 维护所有连接的客户端
connections = set()
def open(self):
print("WebSocket opened")
self.connections.add(self)
def on_message(self, message):
# 回显消息给发送者
self.write_message(f"Echo: {message}")
def on_close(self):
self.connections.remove(self)
print("WebSocket closed")
# 可选:检查来源,允许跨域
def check_origin(self, origin):
return True # 在生产环境中应根据需要限制
def make_app():
return tornado.web.Application([
(r"/ws", EchoWebSocket),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
客户端测试(浏览器控制台):
const ws = new WebSocket("ws://localhost:8888/ws");
ws.onopen = () => ws.send("Hello!");
ws.onmessage = (e) => console.log(e.data);
构建 WebSocket 实时广播聊天室
将前面的长轮询改造为 WebSocket 版本:所有连接的客户端都会收到别人发送的消息。
class ChatWebSocket(tornado.websocket.WebSocketHandler):
clients = set()
def open(self):
self.clients.add(self)
print(f"New client connected, total: {len(self.clients)}")
def on_message(self, message):
# 向所有客户端广播(包括发送者)
for client in self.clients:
client.write_message(message)
def on_close(self):
self.clients.remove(self)
print(f"Client disconnected, total: {len(self.clients)}")
def check_origin(self, origin):
return True
处理 Ping/Pong 与心跳
WebSocket 协议内置 Ping/Pong 帧用于保持连接存活和检测断开。Tornado 在默认情况下会自动回复 Ping 帧,但我们可以添加服务端主动 Ping 的逻辑来确保连接健康。
class ChatWebSocket(tornado.websocket.WebSocketHandler):
clients = set()
def open(self):
self.clients.add(self)
# 每 30 秒发送一次 Ping
self.ping_interval = 30
# 允许 3 次未收到 Pong 后主动断开
self.ping_timeout = 10
def on_pong(self, data):
# 可选:记录 Pong 延迟等
pass
def on_message(self, message):
# 广播消息
for c in self.clients:
if c != self: # 可选择不发给自己
c.write_message(message)
Tornado 在 write_message 时会自动处理文本/二进制帧,也可以传递字典等对象,它会自动 JSON 序列化。
生产环境建议
- 反向代理:使用 Nginx 做前端代理,处理静态文件、SSL 终止,并转发 WebSocket 请求给 Tornado。Nginx 配置需支持
Upgrade头。 - 部署方式:Tornado 单进程也能处理大量长连接,但可利用多进程(
fork_processes)绑定同一端口提升多核 CPU 利用率。 - 监控与日志:开启 Tornado 的日志记录,监控当前连接数、内存使用。
- 安全考虑:务必校验 WebSocket 的来源 origin,防止跨站 WebSocket 劫持;对消息内容进行转义或安全处理。
高级话题:结合 async/await 实现复杂业务
在现代 Tornado 中,所有异步操作都可以统一在 async/await 下进行。例如,在 WebSocket 处理函数中异步查询数据库:
class DataWebSocket(tornado.websocket.WebSocketHandler):
async def on_message(self, message):
# 假设有一个异步数据库访问函数
result = await async_db_query(message)
self.write_message(result)
Tornado 自身不提供异步数据库驱动,你需要使用如 aiomysql、asyncpg、motor(MongoDB)等库,它们与 Tornado 事件循环完美兼容。
快速对比:长轮询 vs WebSocket
| 特性 | 长轮询 | WebSocket |
|---|---|---|
| 通信方向 | 客户端拉取 | 全双工,服务器可主动推送 |
| 连接开销 | 每次请求建立新 HTTP 连接 | 一次握手,连接持续保持 |
| 实时性 | 依赖轮询间隔,近实时 | 即时推送 |
| 浏览器兼容 | 所有浏览器 | 现代浏览器,IE10+ |
| 服务器资源 | 中等,连接为短时挂起 | 节省资源,但需维护长连接 |
下一步学习
- 阅读 Tornado 官方文档中的异步网络编程指南。
- 尝试使用 Tornado 的
tornado.gen模块(兼容旧式协程写法)。 - 将 WebSocket 结合 Redis Pub/Sub 实现跨进程实时推送。
- 了解
tornado.httpclient.AsyncHTTPClient进行非阻塞外部 API 调用。
通过本教程,你已经掌握了 Tornado 异步框架处理长连接和 WebSocket 的核心方法。构建一个支持高并发实时通信的服务,正是 Tornado 最擅长的领域。