一、Beanstalkd是什么?
Beanstalkd是一个高性能,轻量级的分布式内存队列
二、Beanstalkd特性
1、支持优先级(支持任务插队)
2、延迟(实现定时任务)
3、持久化(定时把内存中的数据刷到binlog日志)
4、预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
5、任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)
三、Beanstalkd核心元素
生产者 -> 管道(tube) -> 任务(job) -> 消费者
Beanstalkd可以创建多个管道,管道里面存了很多任务,消费者从管道中取出任务进行处理。
四、任务job状态
delayed 延迟状态
ready 准备好状态
reserved 消费者把任务读出来,处理时
buried 预留状态
delete 删除状态
五、安装Beanstalkd
下载beanstalkd-1.10.tar.gz
1
2
3
|
> tar -xf beanstalkd-1.10.tar.gz
> cd beanstalkd-1.10
> make
|
查看beanstalkd参数信息
启动beanstalkd
1
|
> ./beanstalkd -l 127.0.0.1 -p 11300 -b /data/beanstalkd/binlog &
|
-b表示开启binlog,断电后重启自动恢复任务
六、下载Pheanstalk类
首先安装composer
1
2
3
|
> curl -sS https:
> mv composer.phar /usr/local/bin/composer
> composer require pda/pheanstalk
|
编写一个简单脚本查看信息
1
2
3
4
5
6
7
8
|
<?php
require './vendor/autoload.php' ;
use Pheanstalk\Pheanstalk;
$p = new Pheanstalk( '127.0.0.1' , 11300);
var_dump( $p ->stats());
|
七、Pheanstalk使用方法
维护方法
1
2
3
4
5
6
7
|
stats() 查看状态方法
listTubes() 目前存在的管道
listTubesWatched() 目前监听的管道
statsTube() 管道的状态
useTube() 指定使用的管道
statsJob() 查看任务的详细信息
peek() 通过任务ID获取任务
|
生产者方法
1
2
|
putInTube() 往管道中写入数据
put() 配合useTube()使用
|
消费者方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
watch() 监听管道,可以同时监听多个管道
ignore() 不监听管道
reserve() 以阻塞方式监听管道,获取任务
reserveFromTube()
release() 把任务重新放回管道
bury() 把任务预留
peekBuried() 把预留任务读取出来
kickJob() 把buried状态的任务设置成ready
kick() 批量把buried状态的任务设置成ready
peekReady() 把准备好的任务读取出来
peekDelayed() 把延迟的任务读取出来
pauseTube() 给管道设置延迟
resumeTube() 取消管道延迟
touch() 让任务重新计算ttr时间,给任务续命
|
生产者producer.php代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<?php
require './vendor/autoload.php' ;
use Pheanstalk\Pheanstalk;
$p = new Pheanstalk( '192.168.1.222' , 11300);
$data = array (
'id' => 1,
'name' => 'test' ,
);
$id = $p ->useTube( 'userReg' )->put(json_encode( $data ));
$job = $p ->peek( $id );
print_r( $p ->statsJob( $job ));
|
消费者consumer.php代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<?php
require './vendor/autoload.php' ;
use Pheanstalk\Pheanstalk;
$p = new Pheanstalk( '192.168.1.222' , 11300);
$job = $p ->watch( 'userReg' )->ignore( 'default' )->reserve();
$data = json_decode( $job ->getData());
print_r( $data );
$p -> delete ( $job );
|
|