博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用python的分布式任务队列huey实现任务的异步化
阅读量:6587 次
发布时间:2019-06-24

本文共 8521 字,大约阅读时间需要 28 分钟。

前言:

    一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单。 

这次算是原文的翻译了....  一开始看到这个东西的时候,想看看有没有中文的资料,能立马的入门,结果一看老外用的倒是挺多的,算了 既然是看文档,顺便也按照自己的意思翻译下把。 

关于huey的介绍:  (比celery轻型,比mrq、rq要好用 !)

a lightweight alternative.

  • written in python

  • no deps outside stdlib, except redis (or roll your own backend)

  • support for django

supports:

  • multi-threaded task execution

  • scheduled execution at a given time

  • periodic execution, like a crontab

  • retrying tasks that fail

  • task result storage

安装:

Installinghuey can be installed very easily using pip.pip install hueyhuey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.pip install redisUsing gitIf you want to run the very latest, feel free to pull down the repo from github and install by hand.git clone https://github.com/coleifer/huey.gitcd hueypython setup.py installYou can run the tests using the test-runner:python setup.py test

关于huey的api,下面有详细的介绍及参数介绍的。 

from huey import RedisHuey, crontabhuey = RedisHuey('my-app', host='redis.myapp.com')@huey.task()def add_numbers(a, b):    return a + b@huey.periodic_task(crontab(minute='0', hour='3'))def nightly_backup():    sync_all_data()

juey作为woker的时候,一些cli参数。 

常用的是:  

-l                  关于日志文件的执行 。

-w                 workers的数目,-w的数值大了,肯定是增加任务的处理能力

-p --periodic     启动huey worker的时候,他会从tasks.py里面找到 需要crontab的任务,会派出几个线程专门处理这些事情。 

-n                  不启动关于crontab里面的预周期执行,只有你触发的时候,才会执行周期星期的任务。 

--threads   意思你懂的。

# 原文:     The following table lists the options available for the consumer as well as their default values.-l, --logfilePath to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.-v, --verboseVerbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.-q, --quietOnly log errors. The default loglevel for the consumer is INFO.-w, --workersNumber of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.-p, --periodicIndicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.-n, --no-periodicIndicate that this consumer process should not enqueue periodic tasks.-d, --delayWhen using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.-m, --max-delayThe maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.-b, --backoffThe amount to back-off when polling for results. Must be greater than one. Default is 1.15.-u, --utcIndicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.--localtimeIndicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.ExamplesRunning the consumer with 8 threads, a logfile for errors only, and a very short polling interval:huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0

任务队列huey 是靠着redis来实现queue的任务存储,所以需要咱们提前先把redis-server和redis-py都装好。 安装的方法就不说了,自己搜搜吧。 

我们首先创建下huey的链接实例 :

# config.pyfrom huey import Hueyfrom huey.backends.redis_backend import RedisBlockingQueuequeue = RedisBlockingQueue('test-queue', host='localhost', port=6379)huey = Huey(queue)

然后就是关于任务的,也就是你想让谁到任务队列这个圈子里面,和celey、rq,mrq一样,都是用tasks.py表示的。

原文:   

from config import huey # import the huey we instantiated in config.py@huey.task()def count_beans(num):    print '-- counted %s beans --' % num

再来一个真正去执行的 。  main.py 相当于生产者,tasks.py相当于消费者的关系。  main.py负责喂数据。 

原文:   

 main.pyfrom config import huey  # import our "huey" objectfrom tasks import count_beans  # import our taskif __name__ == '__main__':    beans = raw_input('How many beans? ')    count_beans(int(beans))    print 'Enqueued job to count %s beans' % beans

Ensure you have Redis running locally

Ensure you have installed huey

Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).

Run the main program: python main.py

和celery、rq一样,他的结果获取是需要在你的config.py或者主代码里面指明他的存储的方式,现在huey还仅仅是支持redis,但相对他的特点和体积,这已经很足够了 !

只是那几句话而已,导入RedisDataStore库,申明下存储的地址。 

from huey import Hueyfrom huey.backends.redis_backend import RedisBlockingQueuefrom huey.backends.redis_backend import RedisDataStore  # ADD THIS LINEqueue = RedisBlockingQueue('test-queue', host='localhost', port=6379)result_store = RedisDataStore('results', host='localhost', port=6379)  # ADDEDhuey = Huey(queue, result_store=result_store) # ADDED result store

这个时候,我们在ipython再次去尝试的时候,会发现可以获取到tasks.py里面的return值了 其实你在main.py里面获取的时候,他还是通过uuid从redis里面取出来的。 

>>> from main import count_beans>>> res = count_beans(100)>>> res  # what is "res" ?
>>> res.get()  # get the result of this task'Counted 100 beans'

huey也是支持celey的延迟执行和crontab的功能 。  这些功能很是重要,可以自定义的优先级或者不用再借助linux本身的crontab。

用法很简单,多加一个delay的时间就行了,看了下huey的源码,他默认是立马执行的。当然还是要看你的线程是否都是待执行的状态了。 

>>> import datetime>>> res = count_beans.schedule(args=(100,), delay=60)>>> res
>>> res.get()  # this returns None, no data is ready>>> res.get()  # still no data...>>> res.get(blocking=True)  # ok, let's just block until its ready'Counted 100 beans'

再来一个重试retry的介绍,huey也是有retry,这个很是实用的东西。 如果大家有看到我的上面文章关于celery重试机制的介绍,应该也能明白huey是个怎么个回事了。  是的,他其实也是在tasks里具体函数的前面做了装饰器,装饰器里面有个func try 异常重试的逻辑 。 大家懂的。 

# tasks.pyfrom datetime import datetimefrom config import huey@huey.task(retries=3, retry_delay=10)def try_thrice():    print 'trying....%s' % datetime.now()    raise Exception('nope')

huey是给你反悔的机会饿 ~  也就是说,你做了deley的计划任务后,如果你又想取消,那好看,直接revoke就可以了。 

# count some beansres = count_beans(10000000)res.revoke()The same applies to tasks that are scheduled in the future:res = count_beans.schedule(args=(100000,), eta=in_the_future)res.revoke()@huey.task(crontab(minute='*'))def print_time():    print datetime.now()

task() - 透明的装饰器,让你的函数变得优美点。 

periodic_task() - 这个是周期性的任务

crontab() - 启动worker的时候,附带的crontab的周期任务。 

BaseQueue - 任务队列

BaseDataStore - 任务执行后,可以把 结果塞入进去。  BAseDataStore可以自己重写。 

原文:  http://xiaorui.cc

官方的huey的git库里面是提供了相关的测试代码的: 

main.py

from config import hueyfrom tasks import count_beansif __name__ == '__main__':    beans = raw_input('How many beans? ')    count_beans(int(beans))    print('Enqueued job to count %s beans' % beans)

tasks.py

import randomimport timefrom huey import crontabfrom config import huey@huey.task()def count_beans(num):    print "start..."    print('-- counted %s beans --' % num)    time.sleep(3)    print "end..."    return 'Counted %s beans' % num@huey.periodic_task(crontab(minute='*/5'))def every_five_mins():    print('Consumer prints this every 5 mins')@huey.task(retries=3, retry_delay=10)def try_thrice():    if random.randint(1, 3) == 1:        print('OK')    else:        print('About to fail, will retry in 10 seconds')        raise Exception('Crap something went wrong')@huey.task()def slow(n):    time.sleep(n)    print('slept %s' % n)

run.sh

#!/bin/bashecho "HUEY CONSUMER"echo "-------------"echo "In another terminal, run 'python main.py'"echo "Stop the consumer using Ctrl+C"PYTHONPATH=.:$PYTHONPATHpython ../../huey/bin/huey_consumer.py main.huey --threads=2

=>

咱们可以先clone下huey的代码库。 里面有个examples例子目录,可以看到他是支持django的,但是这不是重点 !

[xiaorui@devops /tmp ]$ git clone https://github.com/coleifer/huey.gitCloning into 'huey'...remote: Counting objects: 1423, done.remote: Compressing objects: 100% (9/9), done.Receiving objects:  34% (497/1423), 388.00 KiB | 29.00 KiB/s   KiB/sReceiving objects:  34% (498/1423), 628.00 KiB | 22.00 KiB/sremote: Total 1423 (delta 0), reused 0 (delta 0)Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.Resolving deltas: 100% (729/729), done.Checking connectivity... done.[xiaorui@devops /tmp ]$cd huey/examples/simple[xiaorui@devops simple (master)]$ lltotal 40-rw-r--r--  1 xiaorui  wheel    79B  9  8 08:49 README-rw-r--r--  1 xiaorui  wheel     0B  9  8 08:49 __init__.py-rw-r--r--  1 xiaorui  wheel    56B  9  8 08:49 config.py-rwxr-xr-x  1 xiaorui  wheel   227B  9  8 08:49 cons.sh-rw-r--r--  1 xiaorui  wheel   205B  9  8 08:49 main.py-rw-r--r--  1 xiaorui  wheel   607B  9  8 08:49 tasks.py[xiaorui@devops simple (master)]$

转载地址:http://jhqno.baihongyu.com/

你可能感兴趣的文章
分布式搜索引擎Elasticsearch的查询与过滤
查看>>
SolidEdge 工程图中如何给零件添加纹理或贴图
查看>>
【Java面试题】14 super.getClass()方法调用
查看>>
六种流行的语言---C、C++、python、Java、php、C#比较[转]
查看>>
AP INVOICES IMPORT API(NOT request)
查看>>
怎样面试程序猿
查看>>
Redhat6.5安装DB2 Express-C版本
查看>>
php的http数据传输get/post...
查看>>
【剑指Offer面试题】 九度OJ1368:二叉树中和为某一值的路径
查看>>
checkbox的name与JavaBean的交互时发现的一个现象
查看>>
基于Token的身份验证——JWT(转)
查看>>
Maven(五)之Maven配置阿里云镜像飞快下jar包
查看>>
Mysql加锁过程详解(5)-innodb 多版本并发控制原理详解
查看>>
script 里写 html 模版
查看>>
vue2.0 + vux (三)MySettings 页
查看>>
ASP.NET Core 使用 Alipay.AopSdk.Core 常见问题解答
查看>>
spring @Value 设置默认值
查看>>
带你从零学ReactNative开发跨平台App开发(十一)
查看>>
java 生成zip文件并导出
查看>>
atitit.userService 用户系统设计 v4 q316 .doc
查看>>