利用Redis keyspace notification(鍵空間通知)實現過期提醒


一、序言:

本文所說的定時任務或者說計划任務並不是很多人想象中的那樣,比如說每天凌晨三點自動運行起來跑一個腳本。這種都已經爛大街了,隨便一個 Crontab 就能搞定了。

這里所說的定時任務可以說是計時器任務,比如說用戶觸發了某個動作,那么從這個點開始過二十四小時我們要對這個動作做點什么。那么如果有 1000 個用戶觸發了這個動作,就會有 1000 個定時任務。於是這就不是 Cron 范疇里面的內容了。

舉個最簡單的例子,一個用戶推薦了另一個用戶,我們定一個二十四小時之后的任務,看看被推薦的用戶有沒有來注冊,如果沒注冊就給他搞一條短信過去

二、需求分析:

  1. 設置了生存時間的Key,在過期時能不能有所提示?

  2. 如果能對過期Key有個監聽,如何對過期Key進行一個回調處理?

  3. 如何使用 Redis 來實現定時任務?

  4. 更具體需求:

    現在需要做一個拍賣活動,如何在拍賣結束那一刻,就執行任務進行相關邏輯;

    如何在訂單交易有效期時間結束的那一刻,進行相關邏輯

三、Redis介紹

在 Redis 的 2.8.0 版本之后,其推出了一個新的特性——鍵空間消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成這個定時任務

的操作了,不過定時的單位是秒。

(1)Publish / Subscribe

Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是說一邊給 Redis 的特定頻道發送消息,另一邊從 Redis 的特定頻道取值——形成了一個簡易的消息隊列。

(2)Redis Keyspace Notifications

在 Redis 里面有一些事件,比如鍵到期、鍵被刪除等。然后我們可以通過配置一些東西來讓 Redis 一旦觸發這些事件的時候就往特定的 Channel 推一條消息。

大致的流程就是我們給 Redis 的某一個 db 設置過期事件,使其鍵一旦過期就會往特定頻道推消息,我在自己的客戶端這邊就一直消費這個頻道就好了。

以后一來一條定時任務,我們就把這個任務狀態壓縮成一個鍵,並且過期時間為距這個任務執行的時間差。那么當鍵一旦到期,就到了任務該執行的時間,Redis 自然會把過期消息推去,我們的客戶端就能接收到了。這樣一來就起到了定時任務的作用。

配置

因為開啟鍵空間通知功能需要消耗一些 CPU , 所以在默認配置下, 該功能處於關閉狀態。

可以通過修改 redis.conf 文件, 或者直接使用 CONFIG SET 命令來開啟或關閉鍵空間通知功能:

  • notify-keyspace-events 選項的參數為空字符串時,功能關閉。

  • 另一方面,當參數不是空字符串時,功能開啟。

notify-keyspace-events 的參數可以是以下字符的任意組合, 它指定了服務器該發送哪些類型的通知:

字符 發送的通知
  K 鍵空間通知,所有通知以 __keyspace@<db>__ 為前綴
  E 鍵事件通知,所有通知以 __keyevent@<db>__ 為前綴
  g DELEXPIRERENAME 等類型無關的通用命令的通知
  $ 字符串命令的通知
  l 列表命令的通知
  s 集合命令的通知
  h 哈希命令的通知
  z 有序集合命令的通知
  x 過期事件:每當有過期鍵被刪除時發送
  e 驅逐(evict)事件:每當有鍵因為 maxmemory 政策而被刪除時發送
  A 參數 g$lshzxe 的別名

輸入的參數中至少要有一個 K 或者 E , 否則的話, 不管其余的參數是什么, 都不會有任何通知被分發。

舉個例子, 如果只想訂閱鍵空間中和列表相關的通知, 那么參數就應該設為 Kl , 諸如此類。

將參數設為字符串 "AKE" 表示發送所有類型的通知。

監聽過期事件需要設置Redis 配置文件

notify-keyspace-events "Ex"

命令產生的通知

以下列表記錄了不同命令所產生的不同通知:

Note

所有命令都只在鍵真的被改動了之后,才會產生通知。

比如說,當 [SREM key member member …] 試圖刪除不存在於集合的元素時,刪除操作會執行失敗,因為沒有真正的改動鍵,所以這一操作不會發送通知。

如果對命令所產生的通知有疑問, 最好還是使用以下命令, 自己來驗證一下:

$ redis-cli config set notify-keyspace-events KEA
$ redis-cli --csv psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","__key*__:*",1

然后, 只要在其他終端里用 Redis 客戶端發送命令, 就可以看到產生的通知了:

"pmessage","__key*__:*","__keyspace@0__:foo","set"
"pmessage","__key*__:*","__keyevent@0__:set","foo"
...

過期通知的發送時間

Redis 使用以下兩種方式刪除過期的鍵:

  • 當一個鍵被訪問時,程序會對這個鍵進行檢查,如果鍵已經過期,那么該鍵將被刪除。

  • 底層系統會在后台漸進地查找並刪除那些過期的鍵,從而處理那些已經過期、但是不會被訪問到的鍵。

當過期鍵被以上兩個程序的任意一個發現、 並且將鍵從數據庫中刪除時, Redis 會產生一個 expired 通知。

Redis 並不保證生存時間(TTL)變為 0 的鍵會立即被刪除: 如果程序沒有訪問這個過期鍵, 或者帶有生存時間的鍵非常多的話, 那么在鍵的生存時間變為 0 , 直到鍵真正被刪除這中間, 可能會有一段比較顯著的時間間隔。

因此, Redis 產生 expired 通知的時間為過期鍵被刪除的時候, 而不是鍵的生存時間變為 0 的時候。

四、高可用性

因為 Redis 目前的訂閱與發布功能采取的是發送即忘(fire and forget)策略, 所以如果你的程序需要可靠事件通知(reliable notification of events), 那么目前的鍵空間通知可能並不適合你:當訂閱事件的客戶端斷線時, 它會丟失所有在斷線期間分發給它的事件。並不能確保消息送達。未來有計划允許更可靠的事件傳遞,但可能這將在更一般的層面上解決,或者為Pub / Sub本身帶來可靠性,或者允許Lua腳本攔截Pub / Sub消息來執行諸如推送將事件列入清單。

事件類型

對於每個修改數據庫的操作,鍵空間通知都會發送兩種不同類型的事件消息:keyspace 和 keyevent。以 keyspace 為前綴的頻道被稱為鍵空間通知(key-space notification), 而以 keyevent 為前綴的頻道則被稱為鍵事件通知(key-event notification)。

事件是用  __keyspace@DB__:KeyPattern 或者  __keyevent@DB__:OpsType 的格式來發布消息的。
DB表示在第幾個庫;KeyPattern則是表示需要監控的鍵模式(可以用通配符,如:__key*__:*);OpsType則表示操作類型。因此,如果想要訂閱特殊的Key上的事件,應該是訂閱keyspace。
比如說,對 0 號數據庫的鍵 mykey 執行 DEL 命令時, 系統將分發兩條消息, 相當於執行以下兩個 PUBLISH 命令:
PUBLISH __keyspace@0__:sampleKey del
PUBLISH __keyevent@0__:del sampleKey
訂閱第一個頻道 __keyspace@0__:mykey 可以接收 0 號數據庫中所有修改鍵 mykey 的事件, 而訂閱第二個頻道 __keyevent@0__:del 則可以接收 0 號數據庫中所有執行 del 命令的鍵。

五、實現步驟

為了高可用性,為了確保解決過期事件的執行,將 定時事件存入MySQL數據庫。觸發鍵過期事件后,再查詢一次數據庫,檢查一下過期事件是否全部執行了。

數據表結構

CREATE TABLE `tb_time_limit_task` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Redis鍵',
  `status` tinyint(3) unsigned NOT NULL COMMENT '狀態,0未處理,1已處理',
  `start_time` decimal(13,3) unsigned NOT NULL COMMENT '開始時間(小數部分為毫秒)',
  `end_time` decimal(13,3) unsigned NOT NULL COMMENT '結束時間(小數部分為毫秒)',
  PRIMARY KEY (`id`),
  KEY `we` (`key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='這個表用於記錄需要時間控制的任務Key,配合Redis、以及回調腳本使用';
​
key存儲規則是 類名@方法名@參數...   (參數可為空,多個參數以@分隔) 
例子: PTCountdown@countdown@218

 

實現思路:

  1. (查詢數據庫)任務狀態檢查,執行未正常執行的任務

    任務狀態檢查

    查詢 ”結束時間 < 當前時間“ 的未處理的任務

    如果存在,則執行任務,

    1.先解析key,類名@方法名@參數... 2.然后根據類名去執行相應方法

  2. 連接redis

    • 連接成功

      • (查詢數據庫)任務狀態檢查,查看在腳本未運行期間是否有部分任務未處理,可能很長時間才連上redis,需要查看連接時間內的任務狀況;

    • 可能會永遠連不上,則每10s,嘗試重連

  3. 生成訂閱消息丟失控制鍵

    向redis初始新增 10個有效期(900/1800/...)的鍵

    #SILCK`1 900
    #SILCK`2 1800
    #SILCK`3 2700
    ...
    #SILCK`10 9000

這一步的目的是 每900秒(15)分鍾,查詢數據庫,檢查任務執行情況

  1. 訂閱過期事件

    • 正常鍵過期

      • 執行任務

    • 訂閱消息控制鍵過期

      • 檢查任務狀態

        • 如果超過一半的控制鍵都過期了,那么重新生成10個

具體代碼:

監聽腳本
<?php
/**
 * Description:時間結點任務監聽
 * Created by dong.cx
 * DateTime: 2019-03-15 10:58
 */

namespace wladmin\cmd;

\think\Loader::addNamespace('wlmis', './wlmis/');

use wlmis\logic\timeLimitTask\base\TimeLimitTaskLogic;
use think\Config;
use think\console\Input;
use think\console\Output;
use think\console\Command;
use think\Log;
use wlmis\common\redis\Redis;
use wlmis\logic\timeLimitTask\base\LogRecord;

class TimeLimitTask extends Command
{
    use LogRecord;
    /**
     * 訂閱信息丟失控制鍵最大數量
     * @var int
     */
    protected $subscription_info_loss_control_key_max = 10;

    /**
     * 訂閱信息丟失控制鍵最后執行的索引,鍵的索引從1開始,為0表示未執行過,這個變量用於控制訂閱信息控制鍵自動生成
     * @var int
     */
    protected $subscription_info_loss_control_key_last = 0;

    public function __construct($name = null)
    {
        parent::__construct($name);
        // 日志記錄初始化
        Log::init([
            'type' => 'File',
            'path' => RUNTIME_PATH . 'redis-logs/',
            // error和sql日志單獨記錄
            'apart_level' => ['log', 'error', 'sql', 'debug', 'info', 'notice'],
        ]);
    }

    /**
     * 運行方式 php tp5cornnew.php TimeLimitTask
     * @author dong.cx 2019-04-02 10:59
     */
    protected function configure()
    {
        $this->setName('TimeLimitTask')->setDescription('Redis keyspace notification subscription script');
    }

    protected function execute(Input $input, Output $output)
    {
        // 配置斷線重連
        Config::set('database.break_reconnect', true);
        $config = Config::get('redis_db');
        $reconnect_str = '';
        RedisReconnect:
        try {
            $this->logRecord('info', "ThinkPHP Version: " . THINK_VERSION);
            $this->logRecord('info', $reconnect_str . "Redis host: " . $config['host'], true, true);
            // 進行任務狀態檢查
            $this->taskStatusCheck();
            $redis = new Redis(get_class($this), true);
            if ($redis->ping() == '+PONG') {
                $this->logRecord('info', 'Connection succeeded', true, true);
                // 查看在腳本未運行期間是否有部分任務未處理
                $this->taskStatusCheck();
            }
            // 生成訂閱消息丟失控制鍵
            $this->subscription_info_loss_control(true);
            $this->logRecord('info', 'Start listening', true, true);
            // 訂閱消息
            $redis->psubscribe(array(
                '__keyevent@' . $config['db'] . '__:expired'
            ), function ($redis, $pattern, $channelName, $message) {
                $msg_split = explode('`', $message);
                if (count($msg_split) == 2 && $msg_split[0] == '#SILCK' && is_numeric($msg_split[1])) {
                    $this->subscription_info_loss_control_key_last = $msg_split[1];
                    $this->taskStatusCheck();
                    if ($this->subscription_info_loss_control_key_last > ($this->subscription_info_loss_control_key_max / 2)) {
                        $this->subscription_info_loss_control();
                        $this->subscription_info_loss_control_key_last = 0;
                    }
                } else {
                    // 這里代表是Redis回調執行
                    $this->task($message);
                }
            });
        } catch (\RedisException $redisThrow) {
            // Redis拋出異常,一般的情況是失去連接,執行重新連接
            $this->logRecord('notice', "Redis loses connection and is reconnecting...", true, true);
            try {
                $redis->close();
            } catch (\Exception $ee) {
            }
            sleep(10);
            $reconnect_str = 'Reconnect ';
            goto RedisReconnect;
        } catch (\Exception $e) {
            // 運行錯誤,這里拋出錯誤的原因為這個文件中的代碼有誤,其他任務執行代碼拋出錯誤,不會導致運行中斷 - 執行到這里運行中斷
            $this->logRecord('error', 'Run-time error' . PHP_EOL . 'File location: ' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL, true, true);
        }
    }

    /**
     * 任務執行
     * @param string $key 任務鍵名,記錄於Redis中的鍵名
     *                         鍵名規則:類名@方法名@參數...(后續的多個參數都用@分隔),在時間限制任務基類中有生成鍵的封裝函數
     * @author: dong.cx
     */
    private function task($key)
    {
        try {
            $params = explode('@', $key, 3);
            if (count($params) < 2) {
                return;
            }
            $class = new \ReflectionClass('wlmis\\logic\\timeLimitTask\\' . $params[0]);
            $instance = $class->newInstance();
            $transfer = array();
            if (count($params) == 3) {
                $transfer = explode('@', $params[2]);
            }
            $instance->call_func($params[1], $transfer);
        } catch (\Exception $e) {
            $this->logRecord('notice', 'Task execution class or method not found! Or call the method to throw an error.'
                . PHP_EOL . 'Pass Key Parameter: ' . $key . PHP_EOL . 'File location: ' . get_class($this)
                . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL . PHP_EOL);
        }
    }

    /**
     * 任務狀態檢查,執行未正常執行的任務
     * @author dong.cx 2019-04-02 10:57
     */
    private function taskStatusCheck()
    {
        try {
            $result = (new TimeLimitTaskLogic())->getNotPerformedTask();
            if (!empty($result)) {
                $this->logRecord('info', 'Find ' . count($result) . ' unprocessed task:');
                foreach ($result as $value) {
                    $this->task($value['key']);
                }
            }
        } catch (\Exception $e) {
            $this->logRecord('notice', 'An exception occurred during task status checking.');
        }
    }

    /**
     * 生成訂閱消息丟失控制鍵
     * @param boolean $always_output_screen 不管不否在調試模式都輸出到屏幕
     *
     * @author dong.cx 2019-04-02 10:58
     */
    private function subscription_info_loss_control($always_output_screen = false)
    {
        try {
            $this->logRecord('info', 'Generates subscription information loss control keys.', true, $always_output_screen);
            $success = 0;
            $error = 0;
            $redis = new Redis();
            for ($i = 1; $i <= $this->subscription_info_loss_control_key_max; $i++) {
                $redis->setex('#SILCK`' . $i, $i * 900, '') ? $success++ : $error++;
            }
            $this->logRecord('info', 'Generates loss control keys: ' . $this->subscription_info_loss_control_key_max . ' total, ' . $success . ' success, ' . $error . ' error', true, $always_output_screen);
            $redis->close();
        } catch (\Exception $e) {
            $this->logRecord('notice', 'An exception occurs when the subscription information loss control key is created.', true, $always_output_screen);
        }
    }
}

 

鍵事件回調操作
<?php
/**
 * Description:拍賣倒計時操作
 * Created by dong.cx
 * DateTime: 2019-03-18 10:04
 */

namespace wlmis\logic\timeLimitTask;


use think\Config;
use think\Exception;
use wlmis\common\redis\Redis;
use wlmis\dao\addons\auction\AuctionGoodsDao;
use wlmis\logic\oper\addons\auction\AuctionLogic;
use wlmis\logic\timeLimitTask\base\TimeLimitBaseLogic;

class AuctionCutDownLogic extends TimeLimitBaseLogic
{
    private $auctionGoodsDao;
    public function __construct()
    {
        parent::__construct();
        $this->auctionGoodsDao = new AuctionGoodsDao();
    }

    /**
     * 拍賣結束, 更新拍品表/保單表 操作
     * @param $params
     *
     * @author dong.cx 2019-03-18 18:39
     */
    public function auctionEndCutDown($params)
    {
        $auctionId = $params[0];
        $auctionLogic = new AuctionLogic();
        try {
            if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error');
            $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time');
            if (!$goodsInfo) {
                $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '不存在');
            } else {
                parent::startTrans();
                // 拍賣結束
                $result = $auctionLogic->auctionEnded($auctionId);
                if ($result['code'] == 0) {
                    $this->logRecord('notice', $result['msg']);
                }
                // 更改mysql中鍵的狀態為已處理
                $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId]));
                // 刪除 redis 當前價
                $redis = new Redis();
                $redis->del('auction_gid@' . $auctionId . '@current_bid');

                websocket_send($auctionId . 'bid/index', true, 2, '拍賣結束');
            }
            parent::commit();

        } catch (Exception $e) {
            parent::rollback();
            $this->throw_message(__FUNCTION__, $e);
        }
    }

    /**
     * 拍賣交易結束
     *     無訂單/未付款,不釋放保證金
     * @param $params
     *
     * @author dong.cx 2019-03-18 20:15
     */
    public function dealCutDown($params)
    {
        $auctionId = $params[0];
        $auctionLogic = new AuctionLogic();
        try {
            parent::startTrans();
            if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error');
            $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time');
            if (!$goodsInfo) {
                $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '不存在');
            } elseif (!$goodsInfo['final_end_time']) {
                $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '的拍品還未結束或最終結束時間為空');
            } else {
                $result = $auctionLogic->checkStatus($auctionId);
                if ($result['code'] == 0) $this->logRecord('notice', $result['msg']);
                // 更改mysql中鍵的狀態為已處理
                $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId]));
            }
            parent::commit();
        } catch (Exception $e) {
            parent::rollback();
            $this->throw_message(__FUNCTION__, $e);
        }
    }

    /**
     * 創建拍賣結束倒計時任務
     * @param $auctionId
     * @param int $ttl
     *
     * @throws Exception
     * @author dong.cx 2019-04-01 09:49
     */
    public function auction_end_countdown_create($auctionId, $ttl=0)
    {
        return $this->create('auctionEndCutDown', $ttl, [$auctionId]);
    }

    /**
     * 刪除拍賣結束倒計時任務
     * @param int $auctionId 拍賣商品表主鍵
     *
     * @return bool|int
     * @throws Exception
     * @author dong.cx 2019-04-01 10:08:49
     */
    public function auction_end_countdown_delete($auctionId)
    {
        return $this->del_key('auctionEndCutDown', [$auctionId]);
    }

    /**
     * 創建交易倒計時任務
     * @param int $auctionId 拍賣商品表主鍵
     * @param int $ttl 生存時間
     *
     * @throws Exception
     * 異常代碼:
     * 500     redis操作失敗
     * @author dong.cx 2019-03-22 15:36
     */
    public function deal_countdown_create($auctionId, $ttl=0)
    {
        $this->create('dealCutDown', $ttl + Config::get('auction_deal_limit_time'), [$auctionId]);
    }

    /**
     * 刪除交易倒計時任務
     * @param int $auctionId 拍賣商品表主鍵
     *
     * @return bool|int
     * @throws Exception
     * @author dong.cx 2019-03-22 15:36
     */
    public function deal_countdown_delete($auctionId)
    {
        return $this->del_key('countdown', [$auctionId]);
    }
}

 

 

任務基類
<?php
/**
 * Created by dong.cx
 * Date: 2019/3/27 17:13
 * Description: 時間限制任務基類
 *              每一個子類繼承這個基類實現時間任務調度
 *              子類中開放給Redis調度的函數設置訪問權限為protected,防止外部誤觸發
 *              子類中其他開放給內部調用的訪問權限為public
 * ************************************************
 * 存儲到Redis中的鍵名規則為:類名@方法名@參數...(參數可為空,多個參數則以@分隔) key_splice 函數可生成鍵
 * 所有的參數通過一個數組傳入方法(一維索引數組,跟存儲函數 create 傳入參數時一樣)
 * 類名、方法名,盡量精簡,能節約帶寬以及Redis查詢速度
 * 參數設計也盡量精簡,所有操作都在服務端內部完成,所以能用1個條件准確查詢數據庫的,不要用兩個條件查詢
 *
 * 存儲鍵直接使用 create 方法,以秒為單位,會自動拼接鍵鍵
 * 如果以毫秒為單位則 create_ms 方法
 * ************************************************
 */

namespace wlmis\logic\timeLimitTask\base;


use think\Exception;
use wlmis\common\redis\Redis;
use wlmis\model\sys\TimeLimitTaskModel;
use wlmis\logic\BaseLogic;

class TimeLimitBaseLogic extends BaseLogic
{
    use LogRecord;

    /**
     * Redis連接實例
     * @var Redis
     */
    protected $redis;

    /**
     * TimeLimitBaseLogic constructor.
     * @author dong.cx
     */
    public function __construct()
    {
        parent::__construct();
        $this->redis = new Redis();
    }

    /**
     * 任務調度入口
     * @param string $funcName 調用方法名
     * @param array $params 傳遞參數
     * @author: dong.cx
     */
    public function call_func($funcName, $params = array())
    {
        call_user_func(array($this, $funcName), $params);
    }

    /**
     * 鍵拼接
     * 鍵用 @ 符號作為分隔符,所以方法名、參數中不可出現
     * 鍵名規則中的類名會自動生成
     * @param string $funcName 方法名
     * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序)
     * @return string                  返回鍵
     * @author: dong.cx
     */
    protected function key_splice($funcName, $params = array())
    {
        $class = explode('\\', get_class($this));
        $paramsStr = '';
        foreach ($params as $value) {
            $paramsStr .= '@' . $value;
        }
        return $class[count($class) - 1] . '@' . $funcName . $paramsStr;
    }

    /**
     * 向Redis存儲鍵(延時單位秒)
     * 會自動將參數進行拼接,然后存入Redis
     * @param string $funcName 調用方法名
     * @param int $ttl 延時(秒)
     * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序)
     * @throws Exception
     * *********************
     * 異常代碼:
     * 500     redis操作失敗
     * *********************
     * @author: dong.cx
     */
    public function create($funcName, $ttl = 0, $params = array())
    {
        $key = $this->key_splice($funcName, $params);
        $this->recording_mysql($key, $ttl);
        if (!($this->redis->setex($key, $ttl, ''))) {
            throw new Exception('Redis存儲失敗', 500);
        }
    }

    /**
     * 向Redis存儲鍵(延時單位毫秒)
     * 會自動將參數進行拼接,然后存入Redis
     * @param string $funcName 調用方法名
     * @param int $ttl 延時(毫秒)
     * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序)
     * @throws Exception
     * *********************
     * 異常代碼:
     * 500     redis操作失敗
     * *********************
     * @author: dong.cx
     */
    public function create_ms($funcName, $ttl = 0, $params = array())
    {
        $key = $this->key_splice($funcName, $params);
        $this->recording_mysql($key, $ttl, true);
        if (!($this->redis->psetex($key, $ttl, ''))) {
            throw new Exception('Redis存儲失敗', 500);
        }
    }

    /**
     * 獲取指定鍵的剩余生存時間(秒)
     * @param string $funcName  任務方法名
     * @param array $params 任務參數
     * @return bool|int         如果為false,說明Redis連接失敗
     *                          如果為-1,說明改鍵不是定時鍵
     *                          如果為-2,說明鍵不存在(已消失)
     *                          其他為剩余生存時間(秒)
     * @throws Exception
     * @author: dong.cx
     */
    protected function getTTL($funcName, $params = array())
    {
        $key = $this->key_splice($funcName, $params);
        return $this->redis->ttl($key);
    }

    /**
     * 獲取指定鍵的剩余生存時間(毫秒)
     * @param string $funcName  任務方法名
     * @param array $params 任務參數
     * @return bool|int         如果為false,說明Redis連接失敗
     *                          如果為-1,說明改鍵不是定時鍵
     *                          如果為-2,說明鍵不存在(已消失)
     *                          其他為剩余生存時間(秒)
     * @throws Exception
     * @author: dong.cx
     */
    protected function getPTTL($funcName, $params = array())
    {
        $key = $this->key_splice($funcName, $params);
        return $this->redis->pttl($key);
    }

    /**
     * 刪除指定鍵
     * ***********************************************
     * 刪除不會觸發事件,用於無用記錄的刪除
     * 如生成支付訂單二次提交時刪除前面一個未處理任務。
     * 一般在設計任務處理流程時需要考慮到無用任務的觸發,並進行規避,必要時進行主動刪除任務可以減輕服務器負擔
     * 任務處理流程應該做到無用記錄的觸發不會影響到系統正常運行
     * ***********************************************
     * @param $funcName
     * @param array $params
     * @return bool|int         返回false則Redis實例獲取失敗,連接不上,返回int則為影響的記錄條數
     * @throws Exception
     * @author: dong.cx
     */
    protected function del_key($funcName, $params = array())
    {
        $key = $this->key_splice($funcName, $params);
        TimeLimitTaskModel::where('key', $key)->update([
            'sts' => 1
        ]);
        return $this->redis->del($key);
    }

    /**
     * 記錄鍵到mysql中,
     * @param string $key 鍵
     * @param int $ttl 觸發時間
     * @param bool $mode 當為false時,觸發時間為秒,當為true時,觸發時間為毫秒
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\ModelNotFoundException
     * @throws \think\exception\DbException
     * @author: dong.cx
     */
    private function recording_mysql($key, $ttl, $mode = false)
    {
        if ($mode) {
            // 這里說明 TTL 以毫秒為單位
            $currentTime = bcmul(microtime(true), '1', 3);
            $endTime = bcadd($currentTime, bcdiv($ttl, '1000', 3), 3);
        } else {
            // 這里說明 TTL 以秒為單位
            $currentTime = time();
            $endTime = $currentTime + $ttl;
        }
        if (TimeLimitTaskModel::field('id')->where('key', $key)->find() !== null) {
            TimeLimitTaskModel::where('key', $key)->update([
                'status'     => 0,
                'start_time' => $currentTime,
                'end_time'   => $endTime
            ]);
        } else {
            TimeLimitTaskModel::create([
                'key'        => $key,
                'status'     => 0,
                'start_time' => $currentTime,
                'end_time'   => $endTime,
                'sts'        => 0
            ]);
        }
    }

    /**
     * 更改鍵在mysql中的狀態為已處理
     * @param $key
     * @author: dong.cx
     */
    protected function recording_process_mysql($key)
    {
        $tlm = new TimeLimitTaskModel();
        $tlm->where('key', $key)->update([
            'status' => 1
        ]);
    }

    /**
     * 拋出錯誤信息
     * @param string $funcName 出錯方法名(__FUNCTION__)
     * @param \Exception $e 錯誤信息
     * @author: dong.cx
     */
    protected function throw_message($funcName, \Exception $e)
    {
        $this->logRecord('error', 'The task logic has made an error:' . PHP_EOL . 'Class:' . get_class($this)
            . PHP_EOL . 'Method name:' . $funcName . PHP_EOL . 'File:' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine()
            . PHP_EOL . 'Error Message:' . $e->getMessage() . PHP_EOL);
    }

    /**
     * 析構函數
     * @author dong.cx
     */
    public function __destruct()
    {
        $this->redis->close();
    }

}

 

運行

 ✘  ~/Documents/card253  php tp5cornnew.php TimeLimitTask
【2019-04-08 11:40:02】ThinkPHP Version: 5.0.72019-04-08 11:40:02】Redis host: 127.0.0.12019-04-08 11:40:02】Connection succeeded
【2019-04-08 11:40:02】Generates subscription information loss control keys.
【2019-04-08 11:40:02】Generates loss control keys: 10 total, 10 success, 0 error
【2019-04-08 11:40:02】Start listening

使用:

只需要啟動腳本,

在需要的時候,新增任務即可

參考資料:

redis鍵空間

Redis實踐操作之—— keyspace notification(鍵空間通知)

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com