python的分布式任務huey如何實現異步化任務講解

本文我們來分享一個python的輕型的任務隊列程序,他可以讓python的分布式任務huey實現異步化任務,感興趣的朋友可以看看。

一個輕型的任務隊列,功能和相關的broker沒有celery強大,重在輕型,而且代碼讀起來也比較的簡單。
關于huey的介紹: (比celery輕型,比mrq、rq要好用 !)
a lightweight alternative.
written in python
no deps outside stdlib, except redis (or roll your own backend)
support for django
supports:
multi-threaded task execution
scheduled execution at a given time
periodic execution, like a crontab
retrying tasks that fail
task result storage
安裝:

代碼如下
Installing
huey can be installed very easily using pip.

pip install huey
huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.

pip install redis
Using git
If you want to run the very latest, feel free to pull down the repo from github and install by hand.

git clone https://github.com/coleifer/huey.git
cd huey
python setup.py install
You can run the tests using the test-runner:

python setup.py test


關于huey的api,下面有詳細的介紹及參數介紹的。

代碼如下
from huey import RedisHuey, crontab

huey = RedisHuey('my-app', host='redis.myapp.com')

@huey.task()
def add_numbers(a, b):
return a + b

@huey.periodic_task(crontab(minute='0', hour='3'))
def nightly_backup():
sync_all_data()


juey作爲woker的時候,一些cli參數。
常用的是:
-l 關于日志文件的執行 。
-w workers的數目,-w的數值大了,肯定是增加任務的處理能力
-p --periodic 啓動huey worker的時候,他會從tasks.py裏面找到 需要crontab的任務,會派出幾個線程專門處理這些事情。
-n 不啓動關于crontab裏面的預周期執行,只有你觸發的時候,才會執行周期星期的任務。
--threads 意思你懂的。
1

代碼如下
# 原文:
The following table lists the options available for the consumer as well as their default values.

-l, --logfile
Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.
-v, --verbose
Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.
-q, --quiet
Only log errors. The default loglevel for the consumer is INFO.
-w, --workers
Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.
-p, --periodic
Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.
-n, --no-periodic
Indicate that this consumer process should not enqueue periodic tasks.
-d, --delay
When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.
-m, --max-delay
The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.
-b, --backoff
The amount to back-off when polling for results. Must be greater than one. Default is 1.15.
-u, --utc
Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.
--localtime
Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.
Examples

Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:

huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0


任務隊列huey 是靠著redis來實現queue的任務存儲,所以需要咱們提前先把redis-server和redis-py都裝好。 安裝的方法就不說了,自己搜搜吧。
我們首先創建下huey的鏈接實例 :

代碼如下
# config.py
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
huey = Huey(queue)


然後就是關于任務的,也就是你想讓誰到任務隊列這個圈子裏面,和celey、rq,mrq一樣,都是用tasks.py表示的。

代碼如下
from config import huey # import the huey we instantiated in config.py


@huey.task()
def count_beans(num):
print '-- counted %s beans --' % num


再來一個真正去執行的 。 main.py 相當于生産者,tasks.py相當于消費者的關系。 main.py負責喂數據。

代碼如下
main.py
from config import huey # import our "huey" object
from tasks import count_beans # import our task


if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print 'Enqueued job to count %s beans' % beans
Ensure you have Redis running locally
Ensure you have installed huey
Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).
Run the main program: python main.py


和celery、rq一樣,他的結果獲取是需要在你的config.py或者主代碼裏面指明他的存儲的方式,現在huey還僅僅是支持redis,但相對他的特點和體積,這已經很足夠了 !
只是那幾句話而已,導入RedisDataStore庫,申明下存儲的地址。

代碼如下
from huey import Huey
from huey.backends.redis_backend import RedisBlockingQueue
from huey.backends.redis_backend import RedisDataStore # ADD THIS LINE


queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)
result_store = RedisDataStore('results', host='localhost', port=6379) # ADDED

huey = Huey(queue, result_store=result_store) # ADDED result store


這個時候,我們在ipython再次去嘗試的時候,會發現可以獲取到tasks.py裏面的return值了 其實你在main.py裏面獲取的時候,他還是通過uuid從redis裏面取出來的。

代碼如下
>>> from main import count_beans
>>> res = count_beans(100)
>>> res # what is "res" ?
<huey.api.AsyncData object at 0xb7471a4c>
>>> res.get() # get the result of this task
'Counted 100 beans'


huey也是支持celey的延遲執行和crontab的功能 。 這些功能很是重要,可以自定義的優先級或者不用再借助linux本身的crontab。
用法很簡單,多加一個delay的時間就行了,看了下huey的源碼,他默認是立馬執行的。當然還是要看你的線程是否都是待執行的狀態了。

代碼如下
>>> import datetime
>>> res = count_beans.schedule(args=(100,), delay=60)
>>> res
<huey.api.AsyncData object at 0xb72915ec>
>>> res.get() # this returns None, no data is ready
>>> res.get() # still no data...
>>> res.get(blocking=True) # ok, let's just block until its ready
'Counted 100 beans'


python的分布式任務huey如何實現異步化任務講解

再來一個重試retry的介紹,huey也是有retry,這個很是實用的東西。 如果大家有看到我的上面文章關于celery重試機制的介紹,應該也能明白huey是個怎麽個回事了。 是的,他其實也是在tasks裏具體函數的前面做了裝飾器,裝飾器裏面有個func try 異常重試的邏輯 。 大家懂的。

代碼如下
# tasks.py
from datetime import datetime

from config import huey

@huey.task(retries=3, retry_delay=10)
def try_thrice():
print 'trying....%s' % datetime.now()
raise Exception('nope')


python的分布式任務huey如何實現異步化任務講解

huey是給你反悔的機會餓 ~ 也就是說,你做了deley的計劃任務後,如果你又想取消,那好看,直接revoke就可以了。

代碼如下
# count some beans
res = count_beans(10000000)

res.revoke()
The same applies to tasks that are scheduled in the future:

res = count_beans.schedule(args=(100000,), eta=in_the_future)
res.revoke()

@huey.task(crontab(minute='*'))
def print_time():
print datetime.now()


task() - 透明的裝飾器,讓你的函數變得優美點。
periodic_task() - 這個是周期性的任務
crontab() - 啓動worker的時候,附帶的crontab的周期任務。
BaseQueue - 任務隊列
BaseDataStore - 任務執行後,可以把 結果塞入進去。 BAseDataStore可以自己重寫。


官方的huey的git庫裏面是提供了相關的測試代碼的:
main.py

代碼如下
from config import huey
from tasks import count_beans


if __name__ == '__main__':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)


tasks.py

代碼如下
import random
import time
from huey import crontab

from config import huey


@huey.task()
def count_beans(num):
print "start..."
print('-- counted %s beans --' % num)
time.sleep(3)
print "end..."
return 'Counted %s beans' % num

@huey.periodic_task(crontab(minute='*/5'))
def every_five_mins():
print('Consumer prints this every 5 mins')

@huey.task(retries=3, retry_delay=10)
def try_thrice():
if random.randint(1, 3) == 1:
print('OK')
else:
print('About to fail, will retry in 10 seconds')
raise Exception('Crap something went wrong')

@huey.task()
def slow(n):
time.sleep(n)
print('slept %s' % n)


run.sh

代碼如下
#!/bin/bash
echo "HUEY CONSUMER"
echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --threads=2
=>


咱們可以先clone下huey的代碼庫。 裏面有個examples例子目錄,可以看到他是支持django的,但是這不是重點 !

代碼如下
[[email protected] /tmp ]$ git clone https://github.com/coleifer/huey.git
Cloning into 'huey'...
remote: Counting objects: 1423, done.
remote: Compressing objects: 100% (9/9), done.
Receiving objects: 34% (497/1423), 388.00 KiB | 29.00 KiB/s KiB/s

Receiving objects: 34% (498/1423), 628.00 KiB | 22.00 KiB/s


remote: Total 1423 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (1423/1423), 2.24 MiB | 29.00 KiB/s, done.
Resolving deltas: 100% (729/729), done.
Checking connectivity... done.
[[email protected] /tmp ]$cd huey/examples/simple
[[email protected] simple (master)]$ ll
total 40
-rw-r--r-- 1 xiaorui wheel 79B 9 8 08:49 README
-rw-r--r-- 1 xiaorui wheel 0B 9 8 08:49 __init__.py
-rw-r--r-- 1 xiaorui wheel 56B 9 8 08:49 config.py
-rwxr-xr-x 1 xiaorui wheel 227B 9 8 08:49 cons.sh
-rw-r--r-- 1 xiaorui wheel 205B 9 8 08:49 main.py
-rw-r--r-- 1 xiaorui wheel 607B 9 8 08:49 tasks.py
[[email protected] simple (master)]$


python的分布式任務huey如何實現異步化任務講解

更多相關文章
  • 本文章來爲各位同學介紹一篇關于 基于Django與Celery實現異步對列任務的教程,希望這篇教程能夠爲各位同學帶來幫助,具體操作如下所示. 在運營系統中經常用到異步方式來處理我們的任務,比如將業務上線流程串成任務再寫入隊列,通過後台作業節點去調度執行.比較典型的案例爲騰訊的藍鯨.織雲.雲智慧等平台 ...
  • PyQt是一個創建GUI應用程序的工具包.它是Python編程語言和Qt庫的成功融合.Qt庫是目前最強大的庫之一.PyQt是由Phil Thompson 開發. PyQt實現了一個Python模塊集.下面來看看PyQ
  • 來自國外的一款優秀jQuery插件,主要功能是批量上傳文件,此插件在專案中已被廣泛之用,uploadfiy這個插件是基于js裏面的jquery庫寫的.結合了ajax和flash,實現了這個多線程上傳的功能.1.首先到官網下載Uploadify插件.2.在頁面引入uploadify.css.jquer ...
  • 文章介紹了在Linux中的Crontab實現PHP計劃任務和php獨立的實現計劃任務,他們兩各有各的特點,有需要了解的朋友可以參考一下.PHP程序有時候需要定時執行,使用Linux的Crontab執行PHP腳本,完成PHP計劃任務.譬如,微博上用得很多的一款APP應用——皮皮時光機,就是實現定時發送
  • 本文章來給大家介紹關于rsync實現linux文件同步到windows配置詳解,希望此方法對各位同學會有所幫助呀.說明:本篇文章實現linux定時將www,msyql,svn目錄rsync到win2003指定目錄上.win2003安裝CWRsync,做爲rsync的server端,運行CWRsync
  • 本文章介紹一篇關于Sqlserver in 句語實現參數化查詢 XML類型解決方案教程,有需了解的朋友可以參考一下下.1:如果參數是int類型: 代碼如下 declare @a xmlset @a='<row><id>1</id></row><r ...
  • 文章利用python來實現郵箱的發送功能,有需要掌握gmail發郵件的同學可以參考本程序. 代碼如下 View Code import os import smtplib import mimetypes from
  • 本文章來給各位介紹在python中twisted實現異步采集的一個實現過程,希望此代碼能對各位需要的朋友帶來幫助.對于大量的數據采集除了多線程,就只有異步來實現了.本文是通過twisted框架來實現異步采集,Asyn
一周排行