本文所說的定時任務或者說計划任務並不是很多人想象中的那樣,比如說每天凌晨三點自動運行起來跑一個腳本。這種都已經爛大街了,隨便一個 Crontab 就能搞定了。
這里所說的定時任務可以說是計時器任務,比如說用戶觸發了某個動作,那么從這個點開始過二十四小時我們要對這個動作做點什么。那么如果有 1000 個用戶觸發了這個動作,就會有 1000 個定時任務。於是這就不是 Cron 范疇里面的內容了。
舉個最簡單的例子,一個用戶推薦了另一個用戶,我們定一個二十四小時之后的任務,看看被推薦的用戶有沒有來注冊,如果沒注冊就給他搞一條短信過去
設置了生存時間的Key,在過期時能不能有所提示?
如果能對過期Key有個監聽,如何對過期Key進行一個回調處理?
如何使用 Redis 來實現定時任務?
更具體需求:
現在需要做一個拍賣活動,如何在拍賣結束那一刻,就執行任務進行相關邏輯;
如何在訂單交易有效期時間結束的那一刻,進行相關邏輯
在 Redis 的 2.8.0 版本之后,其推出了一個新的特性——鍵空間消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成這個定時任務
的操作了,不過定時的單位是秒。
(1)Publish / Subscribe
Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是說一邊給 Redis 的特定頻道發送消息,另一邊從 Redis 的特定頻道取值——形成了一個簡易的消息隊列。
(2)Redis Keyspace Notifications
在 Redis 里面有一些事件,比如鍵到期、鍵被刪除等。然后我們可以通過配置一些東西來讓 Redis 一旦觸發這些事件的時候就往特定的 Channel 推一條消息。
大致的流程就是我們給 Redis 的某一個 db 設置過期事件,使其鍵一旦過期就會往特定頻道推消息,我在自己的客戶端這邊就一直消費這個頻道就好了。
以后一來一條定時任務,我們就把這個任務狀態壓縮成一個鍵,並且過期時間為距這個任務執行的時間差。那么當鍵一旦到期,就到了任務該執行的時間,Redis 自然會把過期消息推去,我們的客戶端就能接收到了。這樣一來就起到了定時任務的作用。
因為開啟鍵空間通知功能需要消耗一些 CPU , 所以在默認配置下, 該功能處於關閉狀態。
可以通過修改 redis.conf
文件, 或者直接使用 CONFIG SET
命令來開啟或關閉鍵空間通知功能:
當 notify-keyspace-events
選項的參數為空字符串時,功能關閉。
另一方面,當參數不是空字符串時,功能開啟。
notify-keyspace-events
的參數可以是以下字符的任意組合, 它指定了服務器該發送哪些類型的通知:
字符 | 發送的通知 |
---|---|
K |
鍵空間通知,所有通知以 __keyspace@<db>__ 為前綴 |
E |
鍵事件通知,所有通知以 __keyevent@<db>__ 為前綴 |
g |
DEL 、 EXPIRE 、 RENAME 等類型無關的通用命令的通知 |
$ |
字符串命令的通知 |
l |
列表命令的通知 |
s |
集合命令的通知 |
h |
哈希命令的通知 |
z |
有序集合命令的通知 |
x |
過期事件:每當有過期鍵被刪除時發送 |
e |
驅逐(evict)事件:每當有鍵因為 maxmemory 政策而被刪除時發送 |
A |
參數 g$lshzxe 的別名 |
輸入的參數中至少要有一個 K
或者 E
, 否則的話, 不管其余的參數是什么, 都不會有任何通知被分發。
舉個例子, 如果只想訂閱鍵空間中和列表相關的通知, 那么參數就應該設為 Kl
, 諸如此類。
將參數設為字符串 "AKE"
表示發送所有類型的通知。
監聽過期事件需要設置Redis 配置文件
notify-keyspace-events "Ex"
以下列表記錄了不同命令所產生的不同通知:
[DEL key key …] 命令為每個被刪除的鍵產生一個 del
通知。
RENAME key newkey 產生兩個通知:為來源鍵(source key)產生一個 rename_from
通知,並為目標鍵(destination key)產生一個 rename_to
通知。
EXPIRE key seconds 和 EXPIREAT key timestamp 在鍵被正確設置過期時間時產生一個 expire
通知。當 EXPIREAT key timestamp 設置的時間已經過期,或者 EXPIRE key seconds傳入的時間為負數值時,鍵被刪除,並產生一個 del
通知。
[SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern …]] [ASC | DESC] [ALPHA] [STORE destination]](http://redisdoc.com/database/sort.html#sort) 在命令帶有 STORE
參數時產生一個 sortstore
事件。如果 STORE
指示的用於保存排序結果的鍵已經存在,那么程序還會發送一個 del
事件。
SET key value [EX seconds] [PX milliseconds] [NX|XX] 以及它的所有變種(SETEX key seconds value 、 SETNX key value 和 GETSET key value)都產生 set
通知。其中 SETEX key seconds value 還會產生 expire
通知。
[MSET key value key value …] 為每個鍵產生一個 set
通知。
SETRANGE key offset value 產生一個 setrange
通知。
INCR key 、 DECR key 、 INCRBY key increment 和 DECRBY key decrement 都產生 incrby
通知。
INCRBYFLOAT key increment 產生 incrbyfloat
通知。
APPEND key value 產生 append
通知。
[LPUSH key value value …] 和 LPUSHX key value 都產生單個 lpush
通知,即使有多個輸入元素時,也是如此。
[RPUSH key value value …] 和 RPUSHX key value 都產生單個 rpush
通知,即使有多個輸入元素時,也是如此。
RPOP key 產生 rpop
通知。如果被彈出的元素是列表的最后一個元素,那么還會產生一個 del
通知。
LPOP key 產生 lpop
通知。如果被彈出的元素是列表的最后一個元素,那么還會產生一個 del
通知。
LINSERT key BEFORE|AFTER pivot value 產生一個 linsert
通知。
LSET key index value 產生一個 lset
通知。
LTRIM key start stop 產生一個 ltrim
通知。如果 LTRIM key start stop 執行之后,列表鍵被清空,那么還會產生一個 del
通知。
RPOPLPUSH source destination 和 BRPOPLPUSH source destination timeout 產生一個 rpop
通知,以及一個 lpush
通知。兩個命令都會保證 rpop
的通知在 lpush
的通知之前分發。如果從鍵彈出元素之后,被彈出的列表鍵被清空,那么還會產生一個 del
通知。
HSET hash field value 、 HSETNX hash field value 和 HMSET 都只產生一個 hset
通知。
HINCRBY 產生一個 hincrby
通知。
HINCRBYFLOAT 產生一個 hincrbyfloat
通知。
[SADD key member member …] 產生一個 sadd
通知,即使有多個輸入元素時,也是如此。
[SREM key member member …] 產生一個 srem
通知,如果執行 [SREM key member member …] 之后,集合鍵被清空,那么還會產生一個 del
通知。
SMOVE source destination member 為來源鍵(source key)產生一個 srem
通知,並為目標鍵(destination key)產生一個 sadd
事件。
SPOP key 產生一個 spop
事件。如果執行 SPOP key 之后,集合鍵被清空,那么還會產生一個 del
通知。
[SINTERSTORE destination key key …] 、 [SUNIONSTORE destination key key …] 和 [SDIFFSTORE destination key key …] 分別產生 sinterstore
、 sunionostore
和 sdiffstore
三種通知。如果用於保存結果的鍵已經存在,那么還會產生一個 del
通知。
ZINCRBY key increment member 產生一個 zincr
通知。(譯注:非對稱,請注意。)
[ZADD key score member [[score member] [score member] …]](http://redisdoc.com/sorted_set/zadd.html#zadd) 產生一個 zadd
通知,即使有多個輸入元素時,也是如此。
[ZREM key member member …] 產生一個 zrem
通知,即使有多個輸入元素時,也是如此。如果執行 [ZREM key member member …] 之后,有序集合鍵被清空,那么還會產生一個 del
通知。
ZREMRANGEBYSCORE key min max 產生一個 zrembyscore
通知。(譯注:非對稱,請注意。)如果用於保存結果的鍵已經存在,那么還會產生一個 del
通知。
ZREMRANGEBYRANK key start stop 產生一個 zrembyrank
通知。(譯注:非對稱,請注意。)如果用於保存結果的鍵已經存在,那么還會產生一個 del
通知。
[ZINTERSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]](http://redisdoc.com/sorted_set/zinterstore.html#zinterstore) 和 [ZUNIONSTORE destination numkeys key [key …] [WEIGHTS weight [weight …]] [AGGREGATE SUM|MIN|MAX]](http://redisdoc.com/sorted_set/zunionstore.html#zunionstore) 分別產生 zinterstore
和 zunionstore
兩種通知。如果用於保存結果的鍵已經存在,那么還會產生一個 del
通知。
每當一個鍵因為過期而被刪除時,產生一個 expired
通知。
每當一個鍵因為 maxmemory
政策而被刪除以回收內存時,產生一個 evicted
通知。
所有命令都只在鍵真的被改動了之后,才會產生通知。
比如說,當 [SREM key member member …] 試圖刪除不存在於集合的元素時,刪除操作會執行失敗,因為沒有真正的改動鍵,所以這一操作不會發送通知。
如果對命令所產生的通知有疑問, 最好還是使用以下命令, 自己來驗證一下:
$ redis-cli config set notify-keyspace-events KEA $ redis-cli --csv psubscribe '__key*__:*' Reading messages... (press Ctrl-C to quit) "psubscribe","__key*__:*",1
然后, 只要在其他終端里用 Redis 客戶端發送命令, 就可以看到產生的通知了:
"pmessage","__key*__:*","__keyspace@0__:foo","set" "pmessage","__key*__:*","__keyevent@0__:set","foo" ...
Redis 使用以下兩種方式刪除過期的鍵:
當一個鍵被訪問時,程序會對這個鍵進行檢查,如果鍵已經過期,那么該鍵將被刪除。
底層系統會在后台漸進地查找並刪除那些過期的鍵,從而處理那些已經過期、但是不會被訪問到的鍵。
當過期鍵被以上兩個程序的任意一個發現、 並且將鍵從數據庫中刪除時, Redis 會產生一個 expired
通知。
Redis 並不保證生存時間(TTL)變為 0
的鍵會立即被刪除: 如果程序沒有訪問這個過期鍵, 或者帶有生存時間的鍵非常多的話, 那么在鍵的生存時間變為 0
, 直到鍵真正被刪除這中間, 可能會有一段比較顯著的時間間隔。
因此, Redis 產生 expired
通知的時間為過期鍵被刪除的時候, 而不是鍵的生存時間變為 0
的時候。
因為 Redis 目前的訂閱與發布功能采取的是發送即忘(fire and forget)策略, 所以如果你的程序需要可靠事件通知(reliable notification of events), 那么目前的鍵空間通知可能並不適合你:當訂閱事件的客戶端斷線時, 它會丟失所有在斷線期間分發給它的事件。並不能確保消息送達。未來有計划允許更可靠的事件傳遞,但可能這將在更一般的層面上解決,或者為Pub / Sub本身帶來可靠性,或者允許Lua腳本攔截Pub / Sub消息來執行諸如推送將事件列入清單。
對於每個修改數據庫的操作,鍵空間通知都會發送兩種不同類型的事件消息:keyspace 和 keyevent。以 keyspace 為前綴的頻道被稱為鍵空間通知(key-space notification), 而以 keyevent 為前綴的頻道則被稱為鍵事件通知(key-event notification)。
事件是用 __keyspace@DB__:KeyPattern 或者 __keyevent@DB__:OpsType 的格式來發布消息的。
DB表示在第幾個庫;KeyPattern則是表示需要監控的鍵模式(可以用通配符,如:__key*__:*);OpsType則表示操作類型。因此,如果想要訂閱特殊的Key上的事件,應該是訂閱keyspace。
比如說,對 0 號數據庫的鍵 mykey 執行 DEL 命令時, 系統將分發兩條消息, 相當於執行以下兩個 PUBLISH 命令:
PUBLISH __keyspace@0__:sampleKey del
PUBLISH __keyevent@0__:del sampleKey
訂閱第一個頻道 __keyspace@0__:mykey 可以接收 0 號數據庫中所有修改鍵 mykey 的事件, 而訂閱第二個頻道 __keyevent@0__:del 則可以接收 0 號數據庫中所有執行 del 命令的鍵。
為了高可用性,為了確保解決過期事件的執行,將 定時事件存入MySQL數據庫。觸發鍵過期事件后,再查詢一次數據庫,檢查一下過期事件是否全部執行了。
CREATE TABLE `tb_time_limit_task` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Redis鍵', `status` tinyint(3) unsigned NOT NULL COMMENT '狀態,0未處理,1已處理', `start_time` decimal(13,3) unsigned NOT NULL COMMENT '開始時間(小數部分為毫秒)', `end_time` decimal(13,3) unsigned NOT NULL COMMENT '結束時間(小數部分為毫秒)', PRIMARY KEY (`id`), KEY `we` (`key`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='這個表用於記錄需要時間控制的任務Key,配合Redis、以及回調腳本使用'; key存儲規則是 類名@方法名@參數... (參數可為空,多個參數以@分隔) 例子: PTCountdown@countdown@218
(查詢數據庫)任務狀態檢查,執行未正常執行的任務
任務狀態檢查
查詢 ”結束時間 < 當前時間“ 的未處理的任務
如果存在,則執行任務,
1.先解析key,類名@方法名@參數... 2.然后根據類名去執行相應方法
連接redis
連接成功
(查詢數據庫)任務狀態檢查,查看在腳本未運行期間是否有部分任務未處理,可能很長時間才連上redis,需要查看連接時間內的任務狀況;
可能會永遠連不上,則每10s,嘗試重連
生成訂閱消息丟失控制鍵
向redis初始新增 10個有效期(900/1800/...)的鍵
#SILCK`1 900
#SILCK`2 1800
#SILCK`3 2700
...
#SILCK`10 9000
這一步的目的是 每900秒(15)分鍾,查詢數據庫,檢查任務執行情況
訂閱過期事件
正常鍵過期
執行任務
訂閱消息控制鍵過期
檢查任務狀態
如果超過一半的控制鍵都過期了,那么重新生成10個
<?php /** * Description:時間結點任務監聽 * Created by dong.cx * DateTime: 2019-03-15 10:58 */ namespace wladmin\cmd; \think\Loader::addNamespace('wlmis', './wlmis/'); use wlmis\logic\timeLimitTask\base\TimeLimitTaskLogic; use think\Config; use think\console\Input; use think\console\Output; use think\console\Command; use think\Log; use wlmis\common\redis\Redis; use wlmis\logic\timeLimitTask\base\LogRecord; class TimeLimitTask extends Command { use LogRecord; /** * 訂閱信息丟失控制鍵最大數量 * @var int */ protected $subscription_info_loss_control_key_max = 10; /** * 訂閱信息丟失控制鍵最后執行的索引,鍵的索引從1開始,為0表示未執行過,這個變量用於控制訂閱信息控制鍵自動生成 * @var int */ protected $subscription_info_loss_control_key_last = 0; public function __construct($name = null) { parent::__construct($name); // 日志記錄初始化 Log::init([ 'type' => 'File', 'path' => RUNTIME_PATH . 'redis-logs/', // error和sql日志單獨記錄 'apart_level' => ['log', 'error', 'sql', 'debug', 'info', 'notice'], ]); } /** * 運行方式 php tp5cornnew.php TimeLimitTask * @author dong.cx 2019-04-02 10:59 */ protected function configure() { $this->setName('TimeLimitTask')->setDescription('Redis keyspace notification subscription script'); } protected function execute(Input $input, Output $output) { // 配置斷線重連 Config::set('database.break_reconnect', true); $config = Config::get('redis_db'); $reconnect_str = ''; RedisReconnect: try { $this->logRecord('info', "ThinkPHP Version: " . THINK_VERSION); $this->logRecord('info', $reconnect_str . "Redis host: " . $config['host'], true, true); // 進行任務狀態檢查 $this->taskStatusCheck(); $redis = new Redis(get_class($this), true); if ($redis->ping() == '+PONG') { $this->logRecord('info', 'Connection succeeded', true, true); // 查看在腳本未運行期間是否有部分任務未處理 $this->taskStatusCheck(); } // 生成訂閱消息丟失控制鍵 $this->subscription_info_loss_control(true); $this->logRecord('info', 'Start listening', true, true); // 訂閱消息 $redis->psubscribe(array( '__keyevent@' . $config['db'] . '__:expired' ), function ($redis, $pattern, $channelName, $message) { $msg_split = explode('`', $message); if (count($msg_split) == 2 && $msg_split[0] == '#SILCK' && is_numeric($msg_split[1])) { $this->subscription_info_loss_control_key_last = $msg_split[1]; $this->taskStatusCheck(); if ($this->subscription_info_loss_control_key_last > ($this->subscription_info_loss_control_key_max / 2)) { $this->subscription_info_loss_control(); $this->subscription_info_loss_control_key_last = 0; } } else { // 這里代表是Redis回調執行 $this->task($message); } }); } catch (\RedisException $redisThrow) { // Redis拋出異常,一般的情況是失去連接,執行重新連接 $this->logRecord('notice', "Redis loses connection and is reconnecting...", true, true); try { $redis->close(); } catch (\Exception $ee) { } sleep(10); $reconnect_str = 'Reconnect '; goto RedisReconnect; } catch (\Exception $e) { // 運行錯誤,這里拋出錯誤的原因為這個文件中的代碼有誤,其他任務執行代碼拋出錯誤,不會導致運行中斷 - 執行到這里運行中斷 $this->logRecord('error', 'Run-time error' . PHP_EOL . 'File location: ' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL, true, true); } } /** * 任務執行 * @param string $key 任務鍵名,記錄於Redis中的鍵名 * 鍵名規則:類名@方法名@參數...(后續的多個參數都用@分隔),在時間限制任務基類中有生成鍵的封裝函數 * @author: dong.cx */ private function task($key) { try { $params = explode('@', $key, 3); if (count($params) < 2) { return; } $class = new \ReflectionClass('wlmis\\logic\\timeLimitTask\\' . $params[0]); $instance = $class->newInstance(); $transfer = array(); if (count($params) == 3) { $transfer = explode('@', $params[2]); } $instance->call_func($params[1], $transfer); } catch (\Exception $e) { $this->logRecord('notice', 'Task execution class or method not found! Or call the method to throw an error.' . PHP_EOL . 'Pass Key Parameter: ' . $key . PHP_EOL . 'File location: ' . get_class($this) . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL . PHP_EOL); } } /** * 任務狀態檢查,執行未正常執行的任務 * @author dong.cx 2019-04-02 10:57 */ private function taskStatusCheck() { try { $result = (new TimeLimitTaskLogic())->getNotPerformedTask(); if (!empty($result)) { $this->logRecord('info', 'Find ' . count($result) . ' unprocessed task:'); foreach ($result as $value) { $this->task($value['key']); } } } catch (\Exception $e) { $this->logRecord('notice', 'An exception occurred during task status checking.'); } } /** * 生成訂閱消息丟失控制鍵 * @param boolean $always_output_screen 不管不否在調試模式都輸出到屏幕 * * @author dong.cx 2019-04-02 10:58 */ private function subscription_info_loss_control($always_output_screen = false) { try { $this->logRecord('info', 'Generates subscription information loss control keys.', true, $always_output_screen); $success = 0; $error = 0; $redis = new Redis(); for ($i = 1; $i <= $this->subscription_info_loss_control_key_max; $i++) { $redis->setex('#SILCK`' . $i, $i * 900, '') ? $success++ : $error++; } $this->logRecord('info', 'Generates loss control keys: ' . $this->subscription_info_loss_control_key_max . ' total, ' . $success . ' success, ' . $error . ' error', true, $always_output_screen); $redis->close(); } catch (\Exception $e) { $this->logRecord('notice', 'An exception occurs when the subscription information loss control key is created.', true, $always_output_screen); } } }
<?php /** * Description:拍賣倒計時操作 * Created by dong.cx * DateTime: 2019-03-18 10:04 */ namespace wlmis\logic\timeLimitTask; use think\Config; use think\Exception; use wlmis\common\redis\Redis; use wlmis\dao\addons\auction\AuctionGoodsDao; use wlmis\logic\oper\addons\auction\AuctionLogic; use wlmis\logic\timeLimitTask\base\TimeLimitBaseLogic; class AuctionCutDownLogic extends TimeLimitBaseLogic { private $auctionGoodsDao; public function __construct() { parent::__construct(); $this->auctionGoodsDao = new AuctionGoodsDao(); } /** * 拍賣結束, 更新拍品表/保單表 操作 * @param $params * * @author dong.cx 2019-03-18 18:39 */ public function auctionEndCutDown($params) { $auctionId = $params[0]; $auctionLogic = new AuctionLogic(); try { if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error'); $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time'); if (!$goodsInfo) { $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '不存在'); } else { parent::startTrans(); // 拍賣結束 $result = $auctionLogic->auctionEnded($auctionId); if ($result['code'] == 0) { $this->logRecord('notice', $result['msg']); } // 更改mysql中鍵的狀態為已處理 $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId])); // 刪除 redis 當前價 $redis = new Redis(); $redis->del('auction_gid@' . $auctionId . '@current_bid'); websocket_send($auctionId . 'bid/index', true, 2, '拍賣結束'); } parent::commit(); } catch (Exception $e) { parent::rollback(); $this->throw_message(__FUNCTION__, $e); } } /** * 拍賣交易結束 * 無訂單/未付款,不釋放保證金 * @param $params * * @author dong.cx 2019-03-18 20:15 */ public function dealCutDown($params) { $auctionId = $params[0]; $auctionLogic = new AuctionLogic(); try { parent::startTrans(); if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error'); $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time'); if (!$goodsInfo) { $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '不存在'); } elseif (!$goodsInfo['final_end_time']) { $this->logRecord('notice', 'tb_auction_goods主鍵:' . $auctionId . '的拍品還未結束或最終結束時間為空'); } else { $result = $auctionLogic->checkStatus($auctionId); if ($result['code'] == 0) $this->logRecord('notice', $result['msg']); // 更改mysql中鍵的狀態為已處理 $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId])); } parent::commit(); } catch (Exception $e) { parent::rollback(); $this->throw_message(__FUNCTION__, $e); } } /** * 創建拍賣結束倒計時任務 * @param $auctionId * @param int $ttl * * @throws Exception * @author dong.cx 2019-04-01 09:49 */ public function auction_end_countdown_create($auctionId, $ttl=0) { return $this->create('auctionEndCutDown', $ttl, [$auctionId]); } /** * 刪除拍賣結束倒計時任務 * @param int $auctionId 拍賣商品表主鍵 * * @return bool|int * @throws Exception * @author dong.cx 2019-04-01 10:08:49 */ public function auction_end_countdown_delete($auctionId) { return $this->del_key('auctionEndCutDown', [$auctionId]); } /** * 創建交易倒計時任務 * @param int $auctionId 拍賣商品表主鍵 * @param int $ttl 生存時間 * * @throws Exception * 異常代碼: * 500 redis操作失敗 * @author dong.cx 2019-03-22 15:36 */ public function deal_countdown_create($auctionId, $ttl=0) { $this->create('dealCutDown', $ttl + Config::get('auction_deal_limit_time'), [$auctionId]); } /** * 刪除交易倒計時任務 * @param int $auctionId 拍賣商品表主鍵 * * @return bool|int * @throws Exception * @author dong.cx 2019-03-22 15:36 */ public function deal_countdown_delete($auctionId) { return $this->del_key('countdown', [$auctionId]); } }
<?php /** * Created by dong.cx * Date: 2019/3/27 17:13 * Description: 時間限制任務基類 * 每一個子類繼承這個基類實現時間任務調度 * 子類中開放給Redis調度的函數設置訪問權限為protected,防止外部誤觸發 * 子類中其他開放給內部調用的訪問權限為public * ************************************************ * 存儲到Redis中的鍵名規則為:類名@方法名@參數...(參數可為空,多個參數則以@分隔) key_splice 函數可生成鍵 * 所有的參數通過一個數組傳入方法(一維索引數組,跟存儲函數 create 傳入參數時一樣) * 類名、方法名,盡量精簡,能節約帶寬以及Redis查詢速度 * 參數設計也盡量精簡,所有操作都在服務端內部完成,所以能用1個條件准確查詢數據庫的,不要用兩個條件查詢 * * 存儲鍵直接使用 create 方法,以秒為單位,會自動拼接鍵鍵 * 如果以毫秒為單位則 create_ms 方法 * ************************************************ */ namespace wlmis\logic\timeLimitTask\base; use think\Exception; use wlmis\common\redis\Redis; use wlmis\model\sys\TimeLimitTaskModel; use wlmis\logic\BaseLogic; class TimeLimitBaseLogic extends BaseLogic { use LogRecord; /** * Redis連接實例 * @var Redis */ protected $redis; /** * TimeLimitBaseLogic constructor. * @author dong.cx */ public function __construct() { parent::__construct(); $this->redis = new Redis(); } /** * 任務調度入口 * @param string $funcName 調用方法名 * @param array $params 傳遞參數 * @author: dong.cx */ public function call_func($funcName, $params = array()) { call_user_func(array($this, $funcName), $params); } /** * 鍵拼接 * 鍵用 @ 符號作為分隔符,所以方法名、參數中不可出現 * 鍵名規則中的類名會自動生成 * @param string $funcName 方法名 * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序) * @return string 返回鍵 * @author: dong.cx */ protected function key_splice($funcName, $params = array()) { $class = explode('\\', get_class($this)); $paramsStr = ''; foreach ($params as $value) { $paramsStr .= '@' . $value; } return $class[count($class) - 1] . '@' . $funcName . $paramsStr; } /** * 向Redis存儲鍵(延時單位秒) * 會自動將參數進行拼接,然后存入Redis * @param string $funcName 調用方法名 * @param int $ttl 延時(秒) * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序) * @throws Exception * ********************* * 異常代碼: * 500 redis操作失敗 * ********************* * @author: dong.cx */ public function create($funcName, $ttl = 0, $params = array()) { $key = $this->key_splice($funcName, $params); $this->recording_mysql($key, $ttl); if (!($this->redis->setex($key, $ttl, ''))) { throw new Exception('Redis存儲失敗', 500); } } /** * 向Redis存儲鍵(延時單位毫秒) * 會自動將參數進行拼接,然后存入Redis * @param string $funcName 調用方法名 * @param int $ttl 延時(毫秒) * @param array $params 參數(必須傳入一維索引數組,請勿傳入關聯數組,按照順序生成參數,關聯數組不保證順序) * @throws Exception * ********************* * 異常代碼: * 500 redis操作失敗 * ********************* * @author: dong.cx */ public function create_ms($funcName, $ttl = 0, $params = array()) { $key = $this->key_splice($funcName, $params); $this->recording_mysql($key, $ttl, true); if (!($this->redis->psetex($key, $ttl, ''))) { throw new Exception('Redis存儲失敗', 500); } } /** * 獲取指定鍵的剩余生存時間(秒) * @param string $funcName 任務方法名 * @param array $params 任務參數 * @return bool|int 如果為false,說明Redis連接失敗 * 如果為-1,說明改鍵不是定時鍵 * 如果為-2,說明鍵不存在(已消失) * 其他為剩余生存時間(秒) * @throws Exception * @author: dong.cx */ protected function getTTL($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); return $this->redis->ttl($key); } /** * 獲取指定鍵的剩余生存時間(毫秒) * @param string $funcName 任務方法名 * @param array $params 任務參數 * @return bool|int 如果為false,說明Redis連接失敗 * 如果為-1,說明改鍵不是定時鍵 * 如果為-2,說明鍵不存在(已消失) * 其他為剩余生存時間(秒) * @throws Exception * @author: dong.cx */ protected function getPTTL($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); return $this->redis->pttl($key); } /** * 刪除指定鍵 * *********************************************** * 刪除不會觸發事件,用於無用記錄的刪除 * 如生成支付訂單二次提交時刪除前面一個未處理任務。 * 一般在設計任務處理流程時需要考慮到無用任務的觸發,並進行規避,必要時進行主動刪除任務可以減輕服務器負擔 * 任務處理流程應該做到無用記錄的觸發不會影響到系統正常運行 * *********************************************** * @param $funcName * @param array $params * @return bool|int 返回false則Redis實例獲取失敗,連接不上,返回int則為影響的記錄條數 * @throws Exception * @author: dong.cx */ protected function del_key($funcName, $params = array()) { $key = $this->key_splice($funcName, $params); TimeLimitTaskModel::where('key', $key)->update([ 'sts' => 1 ]); return $this->redis->del($key); } /** * 記錄鍵到mysql中, * @param string $key 鍵 * @param int $ttl 觸發時間 * @param bool $mode 當為false時,觸發時間為秒,當為true時,觸發時間為毫秒 * @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\ModelNotFoundException * @throws \think\exception\DbException * @author: dong.cx */ private function recording_mysql($key, $ttl, $mode = false) { if ($mode) { // 這里說明 TTL 以毫秒為單位 $currentTime = bcmul(microtime(true), '1', 3); $endTime = bcadd($currentTime, bcdiv($ttl, '1000', 3), 3); } else { // 這里說明 TTL 以秒為單位 $currentTime = time(); $endTime = $currentTime + $ttl; } if (TimeLimitTaskModel::field('id')->where('key', $key)->find() !== null) { TimeLimitTaskModel::where('key', $key)->update([ 'status' => 0, 'start_time' => $currentTime, 'end_time' => $endTime ]); } else { TimeLimitTaskModel::create([ 'key' => $key, 'status' => 0, 'start_time' => $currentTime, 'end_time' => $endTime, 'sts' => 0 ]); } } /** * 更改鍵在mysql中的狀態為已處理 * @param $key * @author: dong.cx */ protected function recording_process_mysql($key) { $tlm = new TimeLimitTaskModel(); $tlm->where('key', $key)->update([ 'status' => 1 ]); } /** * 拋出錯誤信息 * @param string $funcName 出錯方法名(__FUNCTION__) * @param \Exception $e 錯誤信息 * @author: dong.cx */ protected function throw_message($funcName, \Exception $e) { $this->logRecord('error', 'The task logic has made an error:' . PHP_EOL . 'Class:' . get_class($this) . PHP_EOL . 'Method name:' . $funcName . PHP_EOL . 'File:' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message:' . $e->getMessage() . PHP_EOL); } /** * 析構函數 * @author dong.cx */ public function __destruct() { $this->redis->close(); } }
✘ ~/Documents/card253 php tp5cornnew.php TimeLimitTask 【2019-04-08 11:40:02】ThinkPHP Version: 5.0.7 【2019-04-08 11:40:02】Redis host: 127.0.0.1 【2019-04-08 11:40:02】Connection succeeded 【2019-04-08 11:40:02】Generates subscription information loss control keys. 【2019-04-08 11:40:02】Generates loss control keys: 10 total, 10 success, 0 error 【2019-04-08 11:40:02】Start listening
使用:
只需要啟動腳本,
在需要的時候,新增任務即可
參考資料:
Redis實踐操作之—— keyspace notification(鍵空間通知)
本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。