我们在Sanic HTTP 响应中已经学习了响应的流式传输。实际上,Sanic还支持请求的流式传输。
请求流
下面的例子,当请求结束,await request.stream.read()
返回 None
。只有post, put, patch
修饰器有流参数。
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')
class SimpleView(HTTPMethodView):
@stream_decorator
async def post(self, request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
return text(result)
@app.post('/stream', stream=True)
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.read()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
await response.write(body)
return stream(streaming)
@bp.put('/bp_stream', stream=True)
async def bp_put_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
# You can also use `bp.add_route()` with stream argument
async def bp_post_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
bp.add_route(bp_post_handler, '/bp_stream', methods=['POST'], stream=True)
async def post_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
return text(result)
app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)
响应流
Sanic 允许我们使用stream
方法向客户端流式传输内容。该方法接受一个带有StreamingHTTPResponse
对象参数用以写操作的协程回调函数。比如下面的例子:
from sanic import Sanic
from sanic.response import stream
app = Sanic(__name__)
@app.route("/")
async def test(request):
async def sample_streaming_fn(response):
await response.write('foo,')
await response.write('bar')
return stream(sample_streaming_fn, content_type='text/csv')
这在将源自外部服务(如数据库)的内容流式传输给客户端的情况下非常有用。例如,我们可以使用asyncpg
提供的异步游标将数据库记录流式传输到客户端:
@app.route("/")
async def index(request):
async def stream_from_db(response):
conn = await asyncpg.connect(database='test')
async with conn.transaction():
async for record in conn.cursor('SELECT generate_series(0, 10)'):
await response.write(record[0])
return stream(stream_from_db)

我的公众号:猿人学 Python 上会分享更多心得体会,敬请关注。
***版权申明:若没有特殊说明,文章皆是猿人学 yuanrenxue.con 原创,没有猿人学授权,请勿以任何形式转载。***
说点什么吧...