tornado_process
tornado中使用多进程处理任务
在最近的工作中遇到需要在服务中有重cpu计算的任务
现有两种方案选择
celery 实现分布式计算
程序内部实现多进程消费
考虑到轻量级以及响应速度选着后者
原因如下:
该项目中需要传输数据较大,celery在消息处理上消耗较大
搞项目初始化需要加载较多资源,这里不太了解celery是否可以有全局的配置
celery 涉及服务组件较多,维护成本较高
代码展示
run_on_executor_decorator
这个方法只适用于多线程 (这里涉及到进程之间的序列化)
1 2 3 4 5 6 7 8 9 10 11 12 13
| def run_on_executor_decorator(fn): executor = kwargs.get("executor", "executor") io_loop = kwargs.get("io_loop", "io_loop")
@functools.wraps(fn) def wrapper(self, *args, **kwargs): callback = kwargs.pop("callback", None) future = getattr(self, executor).submit(fn, self, *args, **kwargs) if callback: getattr(self, io_loop).add_future( future, lambda future: callback(future.result())) return future
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import functools import os import time from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import tornado.ioloop import tornado.web
def do_something(*args): print(args) time.sleep(5) print('fafasfasfacscs')
class FutureHandler(tornado.web.RequestHandler): executor = ProcessPoolExecutor(10)
@tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs):
url = 'www.google.com'
self.executor.submit(do_something, url) print('works') self.finish('It works')
def do_something(self): pass
application = tornado.web.Application([ (r"/", FutureHandler), ])
if __name__ == "__main__": application.listen(7777) tornado.ioloop.IOLoop.instance().start()
|
参考资料
关于并行计算可以参考
Python并行编程 中文版
TODO
async 的方式如何使用
分布式计算的实现