使用 Beanstalk 实现微信支付的异步通知

Beanstalk介绍

Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。

使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。

相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:

  • 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
  • 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
  • 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
  • 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick).

应用场景

对接过微信支付的应该会知道,用户支付成功后,微信会给我们发一个异步通知,如果我们没有正确处理,这个通知会发多次,直到我们返回正确的标识。

今天我们就用 Beanstalk 实现一下这个通知(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 总计 24h4m)

先看下结果,如下图,15s/15s/30s/3m都是正常的,第10m出现了1s误差,这个也算正常。后面的就不展示了,时间太长

微信图片_20210925131906.png

目录结构

测试

composer up -d

访问 producer.php,向队列中推一条任务

执行 php consumer.php,结果如上图

代码

docker-compose.yml

version: '3'

networks:
  web-network:

services:
  docker-beanstalkd:
    # registry.cn-hangzhou.aliyuncs.com/cuiw/beanstalkd:20210923
    image: "beanstalkd:20210923"1️⃣
    restart: always
    tty: true
    volumes:
      - ./beanstalkd/data:/var/lib/beanstalkd
    ports:
      - "11300:11300"
    networks:
      - web-network

composer.json

{
  "require": {
    "pda/pheanstalk": "^4.0"
  }
}

beanstalkd.php

<?php
require __DIR__ . '/../../vendor/autoload.php';

use Pheanstalk\Pheanstalk;

class beanstalkd{
    public $conf=[
        'host'=>'docker-beanstalkd',
        'port'=>11300,
        'timeout'=>10,
    ];

    public static function factory(){
        return new self();
    }

    public function handle(): Pheanstalk {
        return Pheanstalk::create($this->conf['host'], $this->conf['port'], $this->conf['timeout']);
    }
}

producer.php

<?php
require './beanstalkd.php';

$pheanstalk = beanstalkd::factory()->handle();

$re=$pheanstalk
    ->useTube('testtube')
    ->put(
        json_encode(['test' => 'data'], JSON_UNESCAPED_UNICODE),
        1024,
        30,
        60
    );
echo json_encode(['id'=>$re->getId(), 'data'=>json_decode($re->getData(), true)], JSON_UNESCAPED_UNICODE);

consumer.php

<?php
require './beanstalkd.php';

$pheanstalk = beanstalkd::factory()->handle();
$pheanstalk->watch('testtube');
while (true) {
    $job = $pheanstalk->reserve();

    //echo $job->getData().PHP_EOL;
    //处理任务
    exec('php result.php', $re, $status);
    $data=json_decode($re[0], true);
    if ($data['err']==0) {
        //删除任务
        $pheanstalk->delete($job);
    } else {
        $stats = $pheanstalk->statsJob($job);
        echo date("Y-m-d H:i:s").':'.json_encode($stats)."\n".PHP_EOL;
        if ($stats['releases'] >=0 && $stats['releases'] <15) {
            //15次以下延时返回队列,通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 总计 24h4m
            $timer = [15,15,30,180,600,1200,1800,1800,1800,3600,10800,10800,10800,21600,21600];
            $pheanstalk->release($job, 0, $timer[$stats['releases']]);

        }else{
            //错误次数过多时 bury
            $pheanstalk->bury($job);
        }

    }

}

result.php

<?php
//处理过程,err==0为成功,
echo json_encode(['err'=>1, 'data'=>[]]);

其他

1️⃣ 构建 beanstalkd 容器 我已经build一个并上传到阿里云,可以直接使用:registry.cn-hangzhou.aliyuncs.com/cuiw/beanstalkd:20210923

git clone git@github.com:beanstalkd/beanstalkd.git

cd beanstalkd

docker build -t beanstalkd:20210923 .

感谢阅读这篇文章,如果你喜欢,或者遇到了问题,可以关注我的公众号