From e91b4ba000f92da1cbe5c1aaf3e4fe657c43d31e Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 18 Jun 2019 13:54:26 +0000 Subject: [PATCH 01/12] Feature update --- LICENSE | 2 +- .../Pulchritudinous/Queue/Helper/Data.php | 2 +- .../Pulchritudinous/Queue/Model/Config.php | 2 +- .../Pulchritudinous/Queue/Model/Labour.php | 184 ++---------------- .../Queue/Model/Labour/Batch.php | 148 ++++++++++++++ .../Pulchritudinous/Queue/Model/Lock.php | 2 +- .../Pulchritudinous/Queue/Model/Lock/File.php | 2 +- .../Queue/Model/Lock/Interface.php | 2 +- .../Pulchritudinous/Queue/Model/Queue.php | 37 ++-- .../Queue/Model/Resource/Helper/Mysql4.php | 2 +- .../Queue/Model/Resource/Queue/Labour.php | 2 +- .../Resource/Queue/Labour/Collection.php | 2 +- .../Queue/Model/Shell/Server.php | 3 +- .../Queue/Model/Trait/Labour.php | 159 +++++++++++++++ .../Queue/Model/Trait/Queue.php | 4 +- .../Queue/Model/Worker/Abstract.php | 65 +++---- .../Queue/Model/Worker/Batch.php | 146 ++++++++++++++ .../Queue/Model/Worker/Config.php | 2 +- .../Queue/Model/Worker/Interface.php | 2 +- .../Model/Worker/Labour/Test/Exception.php | 2 +- .../Worker/Labour/Test/Recurring/Success.php | 2 +- .../Labour/Test/Reschedule/Exception.php | 2 +- .../Model/Worker/Labour/Test/Success.php | 2 +- .../Queue/RescheduleException.php | 2 +- .../Pulchritudinous/Queue/etc/config.xml | 4 +- .../Pulchritudinous/Queue/etc/worker.xml | 2 +- .../Queue/etc/worker.xml.sample | 2 +- .../pulchqueue_setup/upgrade-1.0.4-1.0.5.php | 25 +++ app/etc/modules/Pulchritudinous_Queue.xml | 2 +- .../Queue/Model/ConfigTest.php | 2 +- .../Queue/Model/Lock/DbTest.php | 2 +- .../Pulchritudinous/Queue/Model/LockTest.php | 2 +- .../Pulchritudinous/Queue/Model/QueueTest.php | 2 +- .../Queue/Model/Resource/LabourTest.php | 2 +- .../Queue/Model/Shell/ServerTest.php | 2 +- .../Queue/Model/Worker/ConfigTest.php | 2 +- shell/queue.php | 19 +- 37 files changed, 598 insertions(+), 248 deletions(-) create mode 100644 app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php create mode 100644 app/code/community/Pulchritudinous/Queue/Model/Trait/Labour.php create mode 100644 app/code/community/Pulchritudinous/Queue/Model/Worker/Batch.php create mode 100644 app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php diff --git a/LICENSE b/LICENSE index 4935e0a..9f32696 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2018 Pulchritudinous +Copyright (c) 2019 Pulchritudinous Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Helper/Data.php b/app/code/community/Pulchritudinous/Queue/Helper/Data.php index cd970a0..b24789d 100644 --- a/app/code/community/Pulchritudinous/Queue/Helper/Data.php +++ b/app/code/community/Pulchritudinous/Queue/Helper/Data.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Config.php b/app/code/community/Pulchritudinous/Queue/Model/Config.php index 03a2120..fae222e 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Config.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Config.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Labour.php index 08d8416..190b54f 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -93,6 +93,11 @@ class Pulchritudinous_Queue_Model_Labour */ use Pulchritudinous_Queue_Model_Trait_Queue; + /** + * Labour model trait. + */ + use Pulchritudinous_Queue_Model_Trait_Labour; + /** * Initial configuration. */ @@ -101,6 +106,7 @@ public function __construct() $this->_init('pulchqueue/queue_labour'); set_error_handler([$this, 'errorHandler']); + register_shutdown_function([$this, 'shutdownHandler']); } /** @@ -157,6 +163,16 @@ protected function _execute() $payload = $this->getPayload(true); $childLabour = $this->getChildLabour(); + if ('batch' === $config->getRule()) { + $collection = Mage::getModel('pulchqueue/labour') + ->getCollection() + ->addFieldToFilter('batch', ['eq' => $this->getId()]); + + $batch = Mage::getModel('pulchqueue', [$labour->getBatch(), $config->getWorker(), $collection]); + + $batch->execute(); + } + $model ->setLabour($this) ->setConfig($config) @@ -191,20 +207,6 @@ public function reschedule() $transaction = Mage::getModel('core/resource_transaction'); - if ($config->getRule() == 'batch') { - $queueCollection = $this->getBatchCollection(); - - $this->setChildLabour($queueCollection); - - foreach ($queueCollection as $bundle) { - if ($bundle->getId() != $this->getId()) { - $bundle->addData($data); - - $transaction->addObject($bundle); - } - } - } - $this->addData($data); $transaction->addObject($this); @@ -229,20 +231,6 @@ public function setAsFailed() 'finished_at' => time(), ]; - if ($config instanceof Varien_Object && $config->getRule() == 'batch') { - $queueCollection = $this->getBatchCollection(); - - $this->setChildLabour($queueCollection); - - foreach ($queueCollection as $bundle) { - if ($bundle->getId() != $this->getId()) { - $bundle->addData($data); - - $transaction->addObject($bundle); - } - } - } - $this->addData($data); $transaction->addObject($this); @@ -263,11 +251,6 @@ public function setAsUnknown() 'status' => self::STATUS_UNKNOWN, ]; - foreach ($this->getBatchCollection() as $bundle) { - $bundle->addData($data); - $transaction->addObject($bundle); - } - $this->addData($data); $transaction->addObject($this); @@ -288,11 +271,6 @@ public function setAsSkipped() 'status' => self::STATUS_SKIPPED, ]; - foreach ($this->getBatchCollection() as $bundle) { - $bundle->addData($data); - $transaction->addObject($bundle); - } - $this->addData($data); $transaction->addObject($this); @@ -320,20 +298,6 @@ protected function _beforeExecute() 'attempts' => $currentAttempts + 1, ]; - if ($config->getRule() == 'batch') { - $queueCollection = $this->getBatchCollection(); - - $this->setChildLabour($queueCollection); - - foreach ($queueCollection as $bundle) { - if ($bundle->getId() != $this->getId()) { - $bundle->addData($data); - - $transaction->addObject($bundle); - } - } - } - $this->addData($data); $transaction->addObject($this); @@ -367,11 +331,6 @@ public function setAsFinished() 'finished_at' => time(), ]; - foreach ($this->getBatchCollection() as $bundle) { - $bundle->addData($data); - $transaction->addObject($bundle); - } - $this->addData($data); $transaction->addObject($this); @@ -380,18 +339,6 @@ public function setAsFinished() return $this; } - /** - * Get batched labour collection. - * - * @return Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection - */ - public function getBatchCollection() - { - return Mage::getModel('pulchqueue/labour') - ->getCollection() - ->addFieldToFilter('parent_id', ['eq' => $this->getId()]); - } - /** * Get payload. * @@ -421,6 +368,8 @@ public function getPayload($asObject = false) */ public function getWorkerConfig() { + $this->applyWorkerConfig(); + return $this->_workerConfig; } @@ -476,100 +425,5 @@ public function applyWorkerConfig() return $this; } - - /** - * Handle any errors. - * - * @param integer $errNo - * @param string $errStr - * @param string $errFile - * @param integer $errLine - */ - public function errorHandler($errNo, $errStr, $errFile, $errLine) - { - $errno = $errNo & error_reporting(); - - if ($errno == 0) { - return false; - } - - if (!defined('E_STRICT')) { - define('E_STRICT', 2048); - } - - if (!defined('E_RECOVERABLE_ERROR')) { - define('E_RECOVERABLE_ERROR', 4096); - } - - if (!defined('E_DEPRECATED')) { - define('E_DEPRECATED', 8192); - } - - // PEAR specific message handling - if (stripos($errFile . $errStr, 'pear') !== false) { - // ignore strict and deprecated notices - if (($errno == E_STRICT) || ($errno == E_DEPRECATED)) { - return true; - } - // ignore attempts to read system files when open_basedir is set - if ($errno == E_WARNING && stripos($errStr, 'open_basedir') !== false) { - return true; - } - } - - $errorMessage = ''; - - switch($errno){ - case E_ERROR: - $errorMessage .= "Error"; - break; - case E_WARNING: - $errorMessage .= "Warning"; - break; - case E_PARSE: - $errorMessage .= "Parse Error"; - break; - case E_NOTICE: - $errorMessage .= "Notice"; - break; - case E_CORE_ERROR: - $errorMessage .= "Core Error"; - break; - case E_CORE_WARNING: - $errorMessage .= "Core Warning"; - break; - case E_COMPILE_ERROR: - $errorMessage .= "Compile Error"; - break; - case E_COMPILE_WARNING: - $errorMessage .= "Compile Warning"; - break; - case E_USER_ERROR: - $errorMessage .= "User Error"; - break; - case E_USER_WARNING: - $errorMessage .= "User Warning"; - break; - case E_USER_NOTICE: - $errorMessage .= "User Notice"; - break; - case E_STRICT: - $errorMessage .= "Strict Notice"; - break; - case E_RECOVERABLE_ERROR: - $errorMessage .= "Recoverable Error"; - break; - case E_DEPRECATED: - $errorMessage .= "Deprecated functionality"; - break; - default: - $errorMessage .= "Unknown error ($errno)"; - break; - } - - $errorMessage .= ": {$errStr} in {$errFile} on line {$errLine}"; - - Mage::log($errorMessage, Zend_Log::ERR); - } } diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php new file mode 100644 index 0000000..e1a4f11 --- /dev/null +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php @@ -0,0 +1,148 @@ + + + */ +class Pulchritudinous_Queue_Model_Labour_Batch + extends Pulchritudinous_Queue_Model_Labour_Batch_Abstract +{ + /** + * Batch ID. + * + * @var string + */ + protected $_id; + + /** + * Worker code. + * + * @var string + */ + protected $_worker; + + /** + * Labour collection. + * + * @var Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection + */ + protected $_collection; + + /** + * Labour model trait. + */ + use Pulchritudinous_Queue_Model_Trait_Labour; + + /** + * Initial configuration. + * + * @param string $id + * @param string $worker + * @param Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection + */ + public function __construct($id, $worker, Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection) + { + set_error_handler([$this, 'errorHandler']); + register_shutdown_function([$this, 'shutdownHandler']); + + $this->_id = $id; + $this->_worker = $worker; + $this->_collection = $collection; + } + + /** + * Get worker configuration. + * + * @return Varien_Object|false + */ + protected function _getWorkerConfig() + { + if (!$this->_workerConfig) { + $this->_workerConfig = Mage::getSingleton('pulchqueue/worker_config') + ->getWorkerConfigByName($this->_worker); + } + + return $this->_workerConfig; + } + + /** + * Execute labour. + */ + public function execute() + { + try { + $config = $this->_getWorkerConfig(); + + if (!($config instanceof Varien_Object)) { + Mage::throwException( + "Unable to execute batch job with ID {$this->_id} and worker code {$this->_worker}" + ); + } + + $model = $config->getWorkerModel(); + + $model::beforeBatchExecute($this); + + $this->_execute(); + + $model::afterBatchExecute($this); + } catch (Exception $e) { + Mage::logException($e); + } + } + + /** + * Execute labour. + * + * @return Pulchritudinous_Queue_Model_Labour_Batch + */ + protected function _execute() + { + $collection = $this->_collection; + + foreach ($collection as $labour) { + try { + $labour->execute(); + + if (Pulchritudinous_Queue_Model_Labour::STATUS_FAILED === $labour->getStatus()) { + throw new Exception('Reschedule'); + } + } catch (Exception $e) { + try { + $labour->setBatch(null)->save(); + $labour->reschedule(); + } catch (Exception $e) { + Mage::logException($e); + } + } + } + + return $this; + } +} + diff --git a/app/code/community/Pulchritudinous/Queue/Model/Lock.php b/app/code/community/Pulchritudinous/Queue/Model/Lock.php index 688aa41..f76d7f0 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Lock.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Lock.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Lock/File.php b/app/code/community/Pulchritudinous/Queue/Model/Lock/File.php index 46b3f56..544b3e4 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Lock/File.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Lock/File.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Lock/Interface.php b/app/code/community/Pulchritudinous/Queue/Model/Lock/Interface.php index e1269c1..bc096ab 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Lock/Interface.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Lock/Interface.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Queue.php b/app/code/community/Pulchritudinous/Queue/Model/Queue.php index 3bb42ea..e48685f 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Queue.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Queue.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -53,6 +53,10 @@ public function add($worker, array $payload = [], $options = []) $config = $configModel->getWorkerConfigByName($worker); $labourModel = Mage::getModel('pulchqueue/labour'); + if (!$worker) { + return $this; + } + if (!$config) { Mage::throwException("Unable to find worker with name {$worker}"); } @@ -116,11 +120,18 @@ public function receive() $running = []; $pageNr = 0; $runningCollection = $this->getRunning(); + $runningWorkerCount = []; foreach ($runningCollection as $labour) { $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; $running[$identity] = $identity; + + if (!isset($runningWorkerCount[$labour->getWorker()])) { + $runningWorkerCount[$labour->getWorker()] = 0; + } + + $runningWorkerCount[$labour->getWorker()]++; } $queueCollection = $this->_getQueueCollection(); @@ -136,8 +147,11 @@ public function receive() ->load(); foreach ($queueCollection as $labour) { - $config = $configModel->getWorkerConfigByName($labour->getWorker()); - $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; + $config = $configModel->getWorkerConfigByName($labour->getWorker()); + $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; + $currentRunning = isset($runningWorkerCount[$labour->getWorker()]) + ? $runningWorkerCount[$labour->getWorker()] + : 0; if (!$config) { continue; @@ -149,6 +163,10 @@ public function receive() } } + if ($config->getLimit() && $config->getLimit() <= $currentRunning) { + continue; + } + return $this->_beforeReturn($labour, $config); } @@ -187,8 +205,10 @@ protected function _getQueueCollection() protected function _beforeReturn(Pulchritudinous_Queue_Model_Labour $labour, Varien_Object $config) { $transaction = Mage::getModel('core/resource_transaction'); + $id = uniqid('', true); $data = [ - 'status' => Pulchritudinous_Queue_Model_Labour::STATUS_DEPLOYED, + 'status' => Pulchritudinous_Queue_Model_Labour::STATUS_DEPLOYED, + 'batch' => $id, ]; if ($config->getRule() == 'batch') { @@ -198,14 +218,7 @@ protected function _beforeReturn(Pulchritudinous_Queue_Model_Labour $labour, Var foreach ($queueCollection as $bundle) { if ($bundle->getId() != $labour->getId()) { - $bundle->addData( - array_merge( - $data, - [ - 'parent_id' => $labour->getId(), - ] - ) - ); + $bundle->addData($data); $transaction->addObject($bundle); } diff --git a/app/code/community/Pulchritudinous/Queue/Model/Resource/Helper/Mysql4.php b/app/code/community/Pulchritudinous/Queue/Model/Resource/Helper/Mysql4.php index 2faae90..b06bba1 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Resource/Helper/Mysql4.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Resource/Helper/Mysql4.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php index c19360c..9273b52 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour/Collection.php b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour/Collection.php index 8b774fe..b960691 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour/Collection.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour/Collection.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php b/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php index 1211399..c896ec8 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -120,6 +120,7 @@ protected function _startServer() */ protected function _canStart() { + return true; return $this->_getLockStorage()->setLock(); } diff --git a/app/code/community/Pulchritudinous/Queue/Model/Trait/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Trait/Labour.php new file mode 100644 index 0000000..0429182 --- /dev/null +++ b/app/code/community/Pulchritudinous/Queue/Model/Trait/Labour.php @@ -0,0 +1,159 @@ + + + */ +trait Pulchritudinous_Queue_Model_Trait_Labour +{ + /** + * Handle unexpected shutdown. + */ + public function shutdownHandler() + { + $error = error_get_last(); + + if (null === $error) { + return; + } + + list ($errNo, $errStr, $errFile, $errLine) = array_values($error); + + if ($this->getId()) { + $errLine .= " and caused by labour ID {$this->getId()}"; + } + + $this->errorHandler($errNo, $errStr, $errFile, $errLine); + + switch ($errNo){ + case E_ERROR: + case E_PARSE: + case E_CORE_ERROR: + case E_CORE_WARNING: + case E_COMPILE_ERROR: + case E_COMPILE_WARNING: + $this->setAsFailed(); + } + } + + /** + * Handle any errors. + * + * @param integer $errNo + * @param string $errStr + * @param string $errFile + * @param integer $errLine + */ + public function errorHandler($errNo, $errStr, $errFile, $errLine) + { + $errno = $errNo & error_reporting(); + + if ($errno == 0) { + return false; + } + + if (!defined('E_STRICT')) { + define('E_STRICT', 2048); + } + + if (!defined('E_RECOVERABLE_ERROR')) { + define('E_RECOVERABLE_ERROR', 4096); + } + + if (!defined('E_DEPRECATED')) { + define('E_DEPRECATED', 8192); + } + + // PEAR specific message handling + if (stripos($errFile . $errStr, 'pear') !== false) { + // ignore strict and deprecated notices + if (($errno == E_STRICT) || ($errno == E_DEPRECATED)) { + return true; + } + // ignore attempts to read system files when open_basedir is set + if ($errno == E_WARNING && stripos($errStr, 'open_basedir') !== false) { + return true; + } + } + + $errorMessage = ''; + + switch($errno){ + case E_ERROR: + $errorMessage .= "Error"; + break; + case E_WARNING: + $errorMessage .= "Warning"; + break; + case E_PARSE: + $errorMessage .= "Parse Error"; + break; + case E_NOTICE: + $errorMessage .= "Notice"; + break; + case E_CORE_ERROR: + $errorMessage .= "Core Error"; + break; + case E_CORE_WARNING: + $errorMessage .= "Core Warning"; + break; + case E_COMPILE_ERROR: + $errorMessage .= "Compile Error"; + break; + case E_COMPILE_WARNING: + $errorMessage .= "Compile Warning"; + break; + case E_USER_ERROR: + $errorMessage .= "User Error"; + break; + case E_USER_WARNING: + $errorMessage .= "User Warning"; + break; + case E_USER_NOTICE: + $errorMessage .= "User Notice"; + break; + case E_STRICT: + $errorMessage .= "Strict Notice"; + break; + case E_RECOVERABLE_ERROR: + $errorMessage .= "Recoverable Error"; + break; + case E_DEPRECATED: + $errorMessage .= "Deprecated functionality"; + break; + default: + $errorMessage .= "Unknown error ($errno)"; + break; + } + + $errorMessage .= ": {$errStr} in {$errFile} on line {$errLine}"; + + Mage::log($errorMessage, Zend_Log::ERR); + } +} + diff --git a/app/code/community/Pulchritudinous/Queue/Model/Trait/Queue.php b/app/code/community/Pulchritudinous/Queue/Model/Trait/Queue.php index 9a56fb1..e8e6f68 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Trait/Queue.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Trait/Queue.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -105,7 +105,7 @@ protected function _validateArrayData($data) { $return = []; - foreach ($data as $key => $value) { + foreach ((array) $data as $key => $value) { if (is_array($value)) { $return[$key] = $this->_validateArrayData($value); diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Abstract.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Abstract.php index 0460dc8..0759b2d 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Abstract.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Abstract.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -54,13 +54,6 @@ abstract class Pulchritudinous_Queue_Model_Worker_Abstract */ protected $_payload; - /** - * Child labour collection. - * - * @var Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection - */ - protected $_childLabour; - /** * Set labour model to worker. * @@ -123,16 +116,6 @@ public function setPayload(Varien_Object $payload) return $this; } - /** - * Get payload object. - * - * @return Varien_Object - */ - protected function _getPayload() - { - return $this->_payload; - } - /** * Get payload object. * @@ -144,27 +127,13 @@ public function getPayload() } /** - * Set child labour collection. - * - * @param Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $children - * - * @return Pulchritudinous_Queue_Model_Worker_Abstract - */ - public function setChildLabour(Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $children = null) - { - $this->_childLabour = $children; - - return $this; - } - - /** - * Get child labour collection. + * Get payload object. * - * @return Pulchritudinous_Queue_Model_Worker_Abstract|null + * @return Varien_Object */ - protected function _getChildLabour() + protected function _getPayload() { - return $this->_childLabour; + return $this->_payload; } /** @@ -187,6 +156,30 @@ public static function getRecurringOptions($worderConfig = []) return []; } + /** + * Called before batch job is executed. + * + * @param Pulchritudinous_Queue_Model_Labour_Batch $batch + * + * @return null + */ + public static function beforeBatchExecute(Pulchritudinous_Queue_Model_Labour_Batch $batch) + { + return null; + } + + /** + * Called after batch job is executed. + * + * @param Pulchritudinous_Queue_Model_Labour_Batch $batch + * + * @return null + */ + public static function afterBatchExecute(Pulchritudinous_Queue_Model_Labour_Batch $batch) + { + return null; + } + /** * Throw Reschedule Exception. * diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Batch.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Batch.php new file mode 100644 index 0000000..f630ff7 --- /dev/null +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Batch.php @@ -0,0 +1,146 @@ + + + */ +class Pulchritudinous_Queue_Model_Wroker_Batch + extends Pulchritudinous_Queue_Model_Labour_Batch_Abstract +{ + /** + * Batch ID. + * + * @var string + */ + protected $_id; + + /** + * Worker code. + * + * @var string + */ + protected $_worker; + + /** + * Labour collection. + * + * @var Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection + */ + protected $_collection; + + /** + * Labour model trait. + */ + use Pulchritudinous_Queue_Model_Trait_Labour; + + /** + * Initial configuration. + */ + public function __construct(array $args) + { + list($id, $worker, $collection) = $arg; + + set_error_handler([$this, 'errorHandler']); + register_shutdown_function([$this, 'shutdownHandler']); + + $this->_id = $id; + $this->_worker = $worker; + $this->_collection = $collection; + } + + /** + * Get worker configuration. + * + * @return Varien_Object|false + */ + protected function _getWorkerConfig() + { + if (!$this->_workerConfig) { + $this->_workerConfig = Mage::getSingleton('pulchqueue/worker_config') + ->getWorkerConfigByName($this->_worker); + } + + return $this->_workerConfig; + } + + /** + * Execute labour. + */ + public function execute() + { + try { + $config = $this->_getWorkerConfig(); + + if (!($config instanceof Varien_Object)) { + Mage::throwException( + "Unable to execute batch job with ID {$this->_id} and worker code {$this->_worker}" + ); + } + + $model = $config->getWorkerModel(); + + $model::beforeBatchExecute($this); + + $this->_execute(); + + $model::afterBatchExecute($this); + } catch (Exception $e) { + Mage::logException($e); + } + } + + /** + * Execute labour. + * + * @return Pulchritudinous_Queue_Model_Labour_Batch + */ + protected function _execute() + { + $collection = $this->_collection; + + foreach ($collection as $labour) { + try { + $labour->execute(); + + if (Pulchritudinous_Queue_Model_Labour::STATUS_FAILED === $labour->getStatus()) { + throw new Exception('Reschedule'); + } + } catch (Exception $e) { + try { + $labour->setBatch(null)->save(); + $labour->reschedule(); + } catch (Exception $e) { + Mage::logException($e); + } + } + } + + return $this; + } +} + diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Config.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Config.php index 2a361f0..282302a 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Config.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Config.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Interface.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Interface.php index 68312e7..d696a7b 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Interface.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Interface.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Exception.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Exception.php index c96cdcc..9a83828 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Exception.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Exception.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Recurring/Success.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Recurring/Success.php index 3b71874..4cb6f19 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Recurring/Success.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Recurring/Success.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Reschedule/Exception.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Reschedule/Exception.php index 39edcdb..d84e0c7 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Reschedule/Exception.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Reschedule/Exception.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Success.php b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Success.php index 1d7a9b5..45207a7 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Success.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Worker/Labour/Test/Success.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/RescheduleException.php b/app/code/community/Pulchritudinous/Queue/RescheduleException.php index b0f2fc9..c858def 100644 --- a/app/code/community/Pulchritudinous/Queue/RescheduleException.php +++ b/app/code/community/Pulchritudinous/Queue/RescheduleException.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/etc/config.xml b/app/code/community/Pulchritudinous/Queue/etc/config.xml index 94d84fc..8afc130 100644 --- a/app/code/community/Pulchritudinous/Queue/etc/config.xml +++ b/app/code/community/Pulchritudinous/Queue/etc/config.xml @@ -3,7 +3,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,7 @@ - 1.0.4 + 1.0.5 diff --git a/app/code/community/Pulchritudinous/Queue/etc/worker.xml b/app/code/community/Pulchritudinous/Queue/etc/worker.xml index 101dc79..997ecb1 100644 --- a/app/code/community/Pulchritudinous/Queue/etc/worker.xml +++ b/app/code/community/Pulchritudinous/Queue/etc/worker.xml @@ -3,7 +3,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/etc/worker.xml.sample b/app/code/community/Pulchritudinous/Queue/etc/worker.xml.sample index aee01a4..168feb2 100644 --- a/app/code/community/Pulchritudinous/Queue/etc/worker.xml.sample +++ b/app/code/community/Pulchritudinous/Queue/etc/worker.xml.sample @@ -3,7 +3,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php b/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php new file mode 100644 index 0000000..d11d689 --- /dev/null +++ b/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php @@ -0,0 +1,25 @@ +startSetup(); + +$tableName = $this->getTable('pulchqueue/labour'); +$adapter = $this->getConnection(); + +$adapter->addColumn( + $tableName, + 'batch', + [ + 'type' => Varien_Db_Ddl_Table::TYPE_TEXT, + 'size' => 255, + 'nullable' => false, + 'after' => 'parent_id', + 'comment' => 'Batch ID' + ] +); + +$adapter->dropColumn( + $tableName, + 'parent_id' +); + +$this->endSetup(); + diff --git a/app/etc/modules/Pulchritudinous_Queue.xml b/app/etc/modules/Pulchritudinous_Queue.xml index d40a2ed..aa4c74d 100644 --- a/app/etc/modules/Pulchritudinous_Queue.xml +++ b/app/etc/modules/Pulchritudinous_Queue.xml @@ -3,7 +3,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/ConfigTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/ConfigTest.php index fe6759f..e2c8a72 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/ConfigTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/ConfigTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Lock/DbTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Lock/DbTest.php index 654b79a..b0a6583 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Lock/DbTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Lock/DbTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/LockTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/LockTest.php index 621f94f..aa12c87 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/LockTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/LockTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/QueueTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/QueueTest.php index 03bc87e..17a6869 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/QueueTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/QueueTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Resource/LabourTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Resource/LabourTest.php index 148982d..3a67ab0 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Resource/LabourTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Resource/LabourTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Shell/ServerTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Shell/ServerTest.php index 3598104..3801c7d 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Shell/ServerTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Shell/ServerTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Worker/ConfigTest.php b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Worker/ConfigTest.php index 3ac98ff..32f4390 100644 --- a/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Worker/ConfigTest.php +++ b/dev/tests/pulchqueue/src/tests/Pulchritudinous/Queue/Model/Worker/ConfigTest.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal diff --git a/shell/queue.php b/shell/queue.php index 1028dc7..88a500f 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -2,7 +2,7 @@ /** * The MIT License (MIT) * - * Copyright (c) 2018 Pulchritudinous + * Copyright (c) 2019 Pulchritudinous * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -175,7 +175,9 @@ protected function _runServer() ); } - sleep($configData->getPoll()); + if (!$server->canStartNext($processes->count())) { + sleep($configData->getPoll()); + } } } catch (Exception $e) { exit(0); @@ -220,7 +222,10 @@ public static function validateProcesses() foreach ($processes as $process) { if (!self::validateProcess($process)) { - proc_close($process->getResource()); + if (is_resource($process->getResource())) { + proc_close($process->getResource()); + } + $processes->removeItemByKey($process->getId()); } } @@ -242,6 +247,12 @@ public static function validateProcess($process) $timeout = $config->getTimeout(); $pid = $labour->getPid(); + if (null === $resource) { + $labour->setAsUnknown(); + + return false; + } + if (0 != $timeout && (time() - $process->getStarted()) > $timeout) { $labour->setAsUnknown(); @@ -261,7 +272,7 @@ public static function validateProcess($process) $status = proc_get_status($resource); - return $status['running']; + return (bool) @$status['running']; } /** From 948178a20df837476d20cf562a4985cc4040640d Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 18 Jun 2019 20:49:27 +0200 Subject: [PATCH 02/12] Fixed typo --- .../Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php b/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php index d11d689..19aa935 100644 --- a/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php +++ b/app/code/community/Pulchritudinous/Queue/sql/pulchqueue_setup/upgrade-1.0.4-1.0.5.php @@ -9,8 +9,8 @@ 'batch', [ 'type' => Varien_Db_Ddl_Table::TYPE_TEXT, - 'size' => 255, - 'nullable' => false, + 'length' => 255, + 'nullable' => true, 'after' => 'parent_id', 'comment' => 'Batch ID' ] From a231b187075e9a92f08e784b68ef851a1ccd6ab8 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 18 Jun 2019 20:50:01 +0200 Subject: [PATCH 03/12] Remove recover function due to its unreliable --- shell/queue.php | 27 +-------------------------- 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/shell/queue.php b/shell/queue.php index 88a500f..c6de2af 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -135,7 +135,7 @@ protected function _runServer() self::$configData = $configData; self::$server = $server; - self::$processes = self::initProcessCollection($queue); + self::$processes = new Varien_Data_Collection(); $processes = self::$processes; try { @@ -186,31 +186,6 @@ protected function _runServer() exit(0); } - /** - * Init process collection. - * - * @param Pulchritudinous_Queue_Model_Queue $queue - * - * @return Varien_Data_Collection - */ - public static function initProcessCollection(Pulchritudinous_Queue_Model_Queue $queue) - { - $collection = new Varien_Data_Collection(); - $runingCollection = $queue->getRunning(true); - - foreach ($runingCollection as $labour) { - $collection->addItem( - new Varien_Object([ - 'id' => $labour->getPid(), - 'started' => $labour->getStartedAt(), - 'labour' => $labour, - ]) - ); - } - - return $collection; - } - /** * Validate all labour processes. * From 8c47cabdc5c52fddc938606b4ea05f62016eb6e8 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 18 Jun 2019 20:51:17 +0200 Subject: [PATCH 04/12] Moved more batch function to the new class --- .../Pulchritudinous/Queue/Model/Labour.php | 71 ++++++------------- .../Queue/Model/Labour/Batch.php | 48 +++++++++++-- 2 files changed, 66 insertions(+), 53 deletions(-) diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Labour.php index 190b54f..6eaebd6 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour.php @@ -168,18 +168,23 @@ protected function _execute() ->getCollection() ->addFieldToFilter('batch', ['eq' => $this->getId()]); - $batch = Mage::getModel('pulchqueue', [$labour->getBatch(), $config->getWorker(), $collection]); + $batch = Mage::getModel('pulchqueue', [ + $labour->getBatch(), + $labour->getPid(), + $config->getWorker(), + $collection + ]); $batch->execute(); + } else { + $model + ->setLabour($this) + ->setConfig($config) + ->setPayload($payload) + ->setChildLabour($childLabour) + ->execute(); } - $model - ->setLabour($this) - ->setConfig($config) - ->setPayload($payload) - ->setChildLabour($childLabour) - ->execute(); - return $this; } @@ -205,12 +210,7 @@ public function reschedule() 'execute_at' => $when, ]; - $transaction = Mage::getModel('core/resource_transaction'); - - $this->addData($data); - - $transaction->addObject($this); - $transaction->save(); + $this->addData($data)->save(); return $this; } @@ -222,19 +222,11 @@ public function reschedule() */ public function setAsFailed() { - $configModel = Mage::getSingleton('pulchqueue/worker_config'); - $config = $configModel->getWorkerConfigByName($this->getWorker()); - $transaction = Mage::getModel('core/resource_transaction'); - $data = [ + $this->addData([ 'status' => self::STATUS_FAILED, 'started_at' => time(), 'finished_at' => time(), - ]; - - $this->addData($data); - - $transaction->addObject($this); - $transaction->save(); + ])->save(); return $this; } @@ -246,15 +238,9 @@ public function setAsFailed() */ public function setAsUnknown() { - $transaction = Mage::getModel('core/resource_transaction'); - $data = [ + $this->addData([ 'status' => self::STATUS_UNKNOWN, - ]; - - $this->addData($data); - - $transaction->addObject($this); - $transaction->save(); + ])->save(); return $this; } @@ -266,15 +252,9 @@ public function setAsUnknown() */ public function setAsSkipped() { - $transaction = Mage::getModel('core/resource_transaction'); - $data = [ + $this->addData([ 'status' => self::STATUS_SKIPPED, - ]; - - $this->addData($data); - - $transaction->addObject($this); - $transaction->save(); + ])->save(); return $this; } @@ -289,19 +269,12 @@ protected function _beforeExecute() $configModel = Mage::getSingleton('pulchqueue/worker_config'); $config = $configModel->getWorkerConfigByName($this->getWorker()); $currentAttempts = ($this->getAttempts()) ? $this->getAttempts() : 0; - $transaction = Mage::getModel('core/resource_transaction'); - $data = [ + $this->addData([ 'status' => self::STATUS_RUNNING, 'started_at' => time(), - 'pid' => $this->getPid(), 'attempts' => $currentAttempts + 1, - ]; - - $this->addData($data); - - $transaction->addObject($this); - $transaction->save(); + ])->save(); return $this; } diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php index e1a4f11..4103471 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php @@ -39,6 +39,13 @@ class Pulchritudinous_Queue_Model_Labour_Batch */ protected $_id; + /** + * Process ID. + * + * @var string + */ + protected $_pid; + /** * Worker code. * @@ -61,20 +68,31 @@ class Pulchritudinous_Queue_Model_Labour_Batch /** * Initial configuration. * - * @param string $id - * @param string $worker - * @param Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection + * @param array $args */ - public function __construct($id, $worker, Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection) + public function __construct(array $args) { + list ($id, $pid, $worker, $collection) = $args; + set_error_handler([$this, 'errorHandler']); register_shutdown_function([$this, 'shutdownHandler']); $this->_id = $id; + $this->_pid = $pid; $this->_worker = $worker; $this->_collection = $collection; } + /** + * Get labour collection. + * + * @return Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection + */ + public function getCollection() + { + return $this->_collection; + } + /** * Get worker configuration. * @@ -107,6 +125,7 @@ public function execute() $model = $config->getWorkerModel(); $model::beforeBatchExecute($this); + $this->_beforeExecute(); $this->_execute(); @@ -116,6 +135,27 @@ public function execute() } } + /** + * Synchronize data before start. + * + * @return Pulchritudinous_Queue_Model_Labour + */ + protected function _beforeExecute() + { + $collection = $this->_collection; + $transaction = Mage::getModel('core/resource_transaction'); + $data = ['pid' => $this->_pid]; + + foreach ($collection as $labour) { + $labour->addData($data); + $transaction->addObject($labour); + } + + $transaction->save(); + + return $this; + } + /** * Execute labour. * From 15fdb64e0ebf7bdc12b3c8ddc192bb90b26f9485 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Wed, 19 Jun 2019 11:20:03 +0000 Subject: [PATCH 05/12] Code cleanup and bugfixes for batch worker --- .../Pulchritudinous/Queue/Model/Labour.php | 23 ++++++++++++++----- .../Queue/Model/Labour/Batch.php | 18 +++++++++++++-- .../Pulchritudinous/Queue/Model/Queue.php | 23 +++++++++---------- .../Queue/Model/Resource/Queue/Labour.php | 2 +- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Labour.php index 6eaebd6..7669845 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour.php @@ -163,15 +163,15 @@ protected function _execute() $payload = $this->getPayload(true); $childLabour = $this->getChildLabour(); - if ('batch' === $config->getRule()) { + if ('batch' === $config->getRule() && true !== $this->getIsBatchLabour()) { $collection = Mage::getModel('pulchqueue/labour') ->getCollection() - ->addFieldToFilter('batch', ['eq' => $this->getId()]); + ->addFieldToFilter('batch', ['eq' => $this->getBatch()]); - $batch = Mage::getModel('pulchqueue', [ - $labour->getBatch(), - $labour->getPid(), - $config->getWorker(), + $batch = Mage::getModel('pulchqueue/labour_batch', [ + $this->getBatch(), + $this->getPid(), + $this->getWorker(), $collection ]); @@ -270,6 +270,10 @@ protected function _beforeExecute() $config = $configModel->getWorkerConfigByName($this->getWorker()); $currentAttempts = ($this->getAttempts()) ? $this->getAttempts() : 0; + if ('batch' === $config->getRule() && true !== $this->getIsBatchLabour()) { + return $this; + } + $this->addData([ 'status' => self::STATUS_RUNNING, 'started_at' => time(), @@ -286,6 +290,13 @@ protected function _beforeExecute() */ protected function _afterExecute() { + $configModel = Mage::getSingleton('pulchqueue/worker_config'); + $config = $configModel->getWorkerConfigByName($this->getWorker()); + + if ('batch' === $config->getRule() && true !== $this->getIsBatchLabour()) { + return $this; + } + $this->setAsFinished(); return $this; diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php index 4103471..c332339 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php @@ -30,7 +30,7 @@ * @author Anton Samuelsson */ class Pulchritudinous_Queue_Model_Labour_Batch - extends Pulchritudinous_Queue_Model_Labour_Batch_Abstract + extends Varien_Object { /** * Batch ID. @@ -93,6 +93,20 @@ public function getCollection() return $this->_collection; } + /** + * Set labour collection. + * + * @param Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection + * + * @return Pulchritudinous_Queue_Model_Labour_Batch + */ + public function setCollection(Pulchritudinous_Queue_Model_Resource_Queue_Labour_Collection $collection) + { + $this->_collection = $collection; + + return $this; + } + /** * Get worker configuration. * @@ -167,7 +181,7 @@ protected function _execute() foreach ($collection as $labour) { try { - $labour->execute(); + $labour->setIsBatchLabour(true)->execute(); if (Pulchritudinous_Queue_Model_Labour::STATUS_FAILED === $labour->getStatus()) { throw new Exception('Reschedule'); diff --git a/app/code/community/Pulchritudinous/Queue/Model/Queue.php b/app/code/community/Pulchritudinous/Queue/Model/Queue.php index e48685f..349cc0a 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Queue.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Queue.php @@ -43,7 +43,7 @@ class Pulchritudinous_Queue_Model_Queue * @param array $payload * @param array $options * - * @return Pulchritudinous_Queue_Model_Labour + * @return Pulchritudinous_Queue_Model_Labour|true * * @throws Mage_Core_Exception */ @@ -80,7 +80,7 @@ public function add($worker, array $payload = [], $options = []) ); } - if ($config->getRule() == 'ignore') { + if ('ignore' === $config->getRule()) { $hasLabour = $labourModel->getResource()->hasUnprocessedWorkerIdentity( $worker, $options->getIdentity() @@ -89,7 +89,7 @@ public function add($worker, array $payload = [], $options = []) if ($hasLabour == true) { return true; } - } elseif ($config->getRule() == 'replace') { + } elseif ('replace' === $config->getRule()) { $labourModel->getResource()->setStatusOnUnprocessedByWorkerIdentity( 'replaced', $worker, @@ -157,7 +157,7 @@ public function receive() continue; } - if ($config->getRule() == 'wait') { + if ('wait' === $config->getRule()) { if (isset($running[$identity])) { continue; } @@ -205,18 +205,17 @@ protected function _getQueueCollection() protected function _beforeReturn(Pulchritudinous_Queue_Model_Labour $labour, Varien_Object $config) { $transaction = Mage::getModel('core/resource_transaction'); - $id = uniqid('', true); - $data = [ - 'status' => Pulchritudinous_Queue_Model_Labour::STATUS_DEPLOYED, - 'batch' => $id, - ]; + $data = ['status' => Pulchritudinous_Queue_Model_Labour::STATUS_DEPLOYED]; + + if ('batch' === $config->getRule()) { + $id = uniqid('', true); + $data['batch'] = $id; - if ($config->getRule() == 'batch') { - $queueCollection = $this->_getQueueCollection() + $collection = $this->_getQueueCollection() ->addFieldToFilter('identity', ['eq' => $labour->getIdentity()]) ->addFieldToFilter('worker', ['eq' => $labour->getWorker()]); - foreach ($queueCollection as $bundle) { + foreach ($collection as $bundle) { if ($bundle->getId() != $labour->getId()) { $bundle->addData($data); diff --git a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php index 9273b52..549e5fd 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Resource/Queue/Labour.php @@ -123,7 +123,7 @@ public function updateField(Pulchritudinous_Queue_Model_Labour $labour, $field, $data, new Zend_Db_Expr( $adapter->quoteInto('id = ?', (int) $labour->getId()) . ' OR ' . - $adapter->quoteInto('parent_id = ?', (int) $labour->getId()) + $adapter->quoteInto('batch = ?', (int) $labour->getBatch()) ) ); From 8b39bb0ba0fc6b06467a183c873acc4c0819925f Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Thu, 20 Jun 2019 06:42:02 +0000 Subject: [PATCH 06/12] Added shell command to list all workers --- shell/queue.php | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/shell/queue.php b/shell/queue.php index c6de2af..c0106d9 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -69,7 +69,7 @@ public function __construct() { $this->_parseArgs(); - if (!$this->getArg('labour')) { + if (!$this->getArg('labour') && !$this->getArg('list') && !$this->getArg('help')) { register_shutdown_function([$this, 'exitStrategy']); } @@ -98,6 +98,11 @@ public function run() return $this->_runLabour($id); } + if ($this->getArg('list')) { + echo $this->_listWorkers(); + exit(0); + } + if ($this->getArg('help')) { echo $this->usageHelp(); exit(0); @@ -123,6 +128,41 @@ protected function _runLabour($id) exit(0); } + /** + * List workers. + * + * @return string + */ + protected function _listWorkers() + { + $workers = Mage::getSingleton('pulchqueue/worker_config')->getWorkers(); + $list = []; + $re = []; + + foreach ($workers as $worker) { + $rec = new Varien_Object((array) $worker->getRecurring()); + + $name = $worker->getWorkerName(); + $prio = $worker->getPriority(); + $pattern = $rec->getPattern(); + + $list[$name] = $prio; + $re[$name] = $pattern; + } + + asort($list); + + $table = new Zend_Text_Table(['columnWidths' => [10, 50, 20]]); + $table->appendRow(['Prio', 'Worker', 'Regex pattern']); + + foreach ($list as $name => $prio) { + $pattern = $re[$name]; + $table->appendRow([(string) $prio, (string) $name, (string) $pattern]); + } + + return (string) $table; + } + /** * Run server. */ From f7b5cc50dfe5a71e184e3e5f13df175ad7b0e824 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Thu, 20 Jun 2019 07:06:05 +0000 Subject: [PATCH 07/12] Ensure labors reset some data before reshedule --- .../community/Pulchritudinous/Queue/Model/Labour/Batch.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php index c332339..b153d34 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour/Batch.php @@ -188,7 +188,9 @@ protected function _execute() } } catch (Exception $e) { try { - $labour->setBatch(null)->save(); + $labour->setBatch(null); + $labour->setFinishedAt(null); + $labour->setStartedAt(null); $labour->reschedule(); } catch (Exception $e) { Mage::logException($e); From 6b2b38b82d0c8684978abfdbf19a8d7ff7832994 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Thu, 20 Jun 2019 07:08:07 +0000 Subject: [PATCH 08/12] Changed default attempts to 1 --- app/code/community/Pulchritudinous/Queue/etc/worker.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/code/community/Pulchritudinous/Queue/etc/worker.xml b/app/code/community/Pulchritudinous/Queue/etc/worker.xml index 997ecb1..ca84729 100644 --- a/app/code/community/Pulchritudinous/Queue/etc/worker.xml +++ b/app/code/community/Pulchritudinous/Queue/etc/worker.xml @@ -91,7 +91,7 @@ 100 wait 0 - 0 + 1 0 60 pulchqueue_worker.log From 909aa4ab1cd602d4dd579b544b90162a14e0a8c6 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Thu, 20 Jun 2019 07:35:47 +0000 Subject: [PATCH 09/12] Remove the posibility to execute unsaved labours --- app/code/community/Pulchritudinous/Queue/Model/Labour.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/app/code/community/Pulchritudinous/Queue/Model/Labour.php b/app/code/community/Pulchritudinous/Queue/Model/Labour.php index 7669845..684a5df 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Labour.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Labour.php @@ -130,6 +130,10 @@ public function loadByWorkerIdentity($worker, $identity = '') */ public function execute() { + if (!$labour->getId()) { + Mage::throwException("Unable to load labour"); + } + try { if (!($this->getWorkerConfig() instanceof Varien_Object)) { Mage::throwException( From 14bf90b58a1c684ff03a4b1405cd3be83abfe2d7 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Mon, 1 Jul 2019 18:17:28 +0200 Subject: [PATCH 10/12] Removed unused variable --- shell/queue.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/shell/queue.php b/shell/queue.php index c0106d9..ffd97bc 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -78,8 +78,7 @@ public function __construct() Mage::app($this->_appCode, $this->_appType); } - $this->_factory = new Mage_Core_Model_Factory(); - $this->_shellFile = __FILE__; + $this->_shellFile = __FILE__; set_error_handler([$this, 'errorHandler']); From 07a0270bc04d89fac476c91ca6c83d17f1e59c6f Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 9 Jul 2019 10:41:46 +0000 Subject: [PATCH 11/12] Added interation for queue labours --- .../Pulchritudinous/Queue/Model/Iterator.php | 211 ++++++++++++++++++ .../Pulchritudinous/Queue/Model/Queue.php | 61 +++-- .../Queue/Model/Shell/Server.php | 19 +- shell/queue.php | 34 ++- 4 files changed, 271 insertions(+), 54 deletions(-) create mode 100644 app/code/community/Pulchritudinous/Queue/Model/Iterator.php diff --git a/app/code/community/Pulchritudinous/Queue/Model/Iterator.php b/app/code/community/Pulchritudinous/Queue/Model/Iterator.php new file mode 100644 index 0000000..28ff0ac --- /dev/null +++ b/app/code/community/Pulchritudinous/Queue/Model/Iterator.php @@ -0,0 +1,211 @@ + + + */ +class Pulchritudinous_Queue_Model_Iterator + implements Iterator +{ + /** + * Number of rows collected each time. + * + * @var integer + */ + const PAGE_SIZE = 50; + + /** + * Entity collection. + * + * @var Varien_Data_Collection + */ + protected $_collection; + + /** + * Current item. + * + * @var Varien_Object|null + */ + protected $_data; + + /** + * Current slice. + * + * @var array + */ + protected $_slice = []; + + /** + * Is initialized. + * + * @var boolean + */ + protected $_initiated = false; + + /** + * Is valid. + * + * @var boolean + */ + protected $_isValid = false; + + /** + * Current slice index. + * + * @var integer + */ + protected $_index = 0; + + /** + * Current page. + * + * @var integer + */ + protected $_page; + + /** + * Initial page. + * + * @var integer + */ + protected $_initPage; + + /** + * Last page. + * + * @var integer + */ + protected $_lastPage; + + /** + * Initialize collection iterator. + * + * @param Varien_Data_Collection $collection + * + * @throws Mage_Core_Exception + */ + public function __construct(Varien_Data_Collection $collection) + { + $collection->setPageSize(self::PAGE_SIZE); + + $this->_initPage = $collection->getCurPage(); + $this->_page = $collection->getCurPage(); + $this->_lastPage = $collection->getLastPageNumber(); + + $this->_collection = $collection; + } + + /** + * Load more data into slice. + * + * @return Pulchritudinous_Queue_Model_Iterator + */ + protected function _loadMore() + { + $this->_index = 0; + $this->_slice = []; + + if ($this->_page > $this->_lastPage) { + return $this; + } + + $collection = $this->_collection + ->setCurPage(++$this->_page) + ->load(); + + foreach ($collection as $item) { + $this->_slice[] = $item; + } + + $this->_lastPage = $this->_collection + ->getLastPageNumber(); + + $this->_collection->clear(); + + return $this; + } + + /** + * Reset collection to initial state. + */ + public function rewind() + { + $this->next(); + $this->_collection->setCurPage($this->_initPage); + } + + /** + * Return current item. + * + * @return Varien_Object|null + */ + public function current() + { + return ($this->valid()) ? $this->_data : null; + } + + /** + * This method returns the current entity id. + * + * @return integer + */ + public function key() + { + return ($this->valid()) ? $this->_data->getId() : null; + } + + /** + * Get next entity. + * + * @return boolean + */ + public function next() + { + $this->_data = next($this->_slice); + + if (false === $this->_data) { + $this->_loadMore(); + $this->_data = next($this->_slice); + } + + $this->_isValid = (false !== $this->_data); + + return $this->_data; + } + + /** + * This method checks if the next row is a valid row. + * + * @return boolean + */ + public function valid() + { + return $this->_isValid; + } +} + diff --git a/app/code/community/Pulchritudinous/Queue/Model/Queue.php b/app/code/community/Pulchritudinous/Queue/Model/Queue.php index 349cc0a..0353872 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Queue.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Queue.php @@ -112,15 +112,19 @@ public function add($worker, array $payload = [], $options = []) /** * Receive next job from the queue. * + * @param integer $qty + * * @return Pulchritudinous_Queue_Model_Labour|false */ - public function receive() + public function receive($qty = 1) { + $qty = max(1, (int) $qty); $configModel = Mage::getSingleton('pulchqueue/worker_config'); $running = []; $pageNr = 0; $runningCollection = $this->getRunning(); $runningWorkerCount = []; + $labours = []; foreach ($runningCollection as $labour) { $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; @@ -134,47 +138,42 @@ public function receive() $runningWorkerCount[$labour->getWorker()]++; } - $queueCollection = $this->_getQueueCollection(); - - $queueCollection->setPageSize(50); - - $pages = $queueCollection->getLastPageNumber(); - $pageNr = 1; + $queueCollection = $this->_getQueueCollection(); + $iterator = Mage::getModel('pulchqueue/iterator', $queueCollection); - do { - $queueCollection - ->setCurPage($pageNr) - ->load(); + foreach ($iterator as $labour) { + $config = $configModel->getWorkerConfigByName($labour->getWorker()); + $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; + $currentRunning = isset($runningWorkerCount[$labour->getWorker()]) + ? $runningWorkerCount[$labour->getWorker()] + : 0; - foreach ($queueCollection as $labour) { - $config = $configModel->getWorkerConfigByName($labour->getWorker()); - $identity = "{$labour->getWorker()}-{$labour->getIdentity()}"; - $currentRunning = isset($runningWorkerCount[$labour->getWorker()]) - ? $runningWorkerCount[$labour->getWorker()] - : 0; + if (!$config) { + continue; + } - if (!$config) { + if ($config->getRule() == 'wait') { + if (isset($running[$identity])) { continue; } + } - if ('wait' === $config->getRule()) { - if (isset($running[$identity])) { - continue; - } - } + if ($config->getLimit() && $config->getLimit() <= $currentRunning) { + continue; + } - if ($config->getLimit() && $config->getLimit() <= $currentRunning) { - continue; - } + $labours[] = $this->_beforeReturn($labour, $config); - return $this->_beforeReturn($labour, $config); + if (count($labours) >= $qty) { + break; } + } - $pageNr++; - $queueCollection->clear(); - } while ($pageNr <= $pages); + if (empty($labours)) { + return false; + } - return false; + return $labours; } /** diff --git a/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php b/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php index c896ec8..948992b 100644 --- a/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php +++ b/app/code/community/Pulchritudinous/Queue/Model/Shell/Server.php @@ -145,16 +145,25 @@ protected function _getLockStorage() */ public function canStartNext($processCount) { - $configModel = Mage::getSingleton('pulchqueue/config'); - $configData = $configModel->getQueueConfig(); - - if ($processCount < $configData->getThreads()) { + if (0 !== $this->canReceiveCount($processCount)) { return true; } - return false; } + /** + * Can receive number of labours. + * + * @return boolean + */ + public function canReceiveCount($processCount) + { + $configModel = Mage::getSingleton('pulchqueue/config'); + $configData = $configModel->getQueueConfig(); + + return max(0, $configData->getThreads() - $processCount); + } + /** * Add recurring labours to queue. * diff --git a/shell/queue.php b/shell/queue.php index ffd97bc..d32eade 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -188,34 +188,32 @@ protected function _runServer() continue; } - $labour = $queue->receive(); + $labours = $queue->receive($server->canReceiveCount($processes->count())); if ($labour === false) { sleep($configData->getPoll()); continue; } - $resource = $server->startChildProcess($labour); + foreach ($labours as $labour) { + $resource = $server->startChildProcess($labour); - $validateProcesses = @self::validateProcess($resource); + $validateProcesses = @self::validateProcess($resource); - if ($validateProcesses) { - $status = proc_get_status($resource); + if ($validateProcesses) { + $status = proc_get_status($resource); - $labour->getResource()->updateField($labour, 'pid', $status['pid']); + $labour->getResource()->updateField($labour, 'pid', $status['pid']); - $processes->addItem( - new Varien_Object([ - 'id' => $status['pid'], - 'resource' => $resource, - 'labour' => $labour, - 'started' => time(), - ]) - ); - } - - if (!$server->canStartNext($processes->count())) { - sleep($configData->getPoll()); + $processes->addItem( + new Varien_Object([ + 'id' => $status['pid'], + 'resource' => $resource, + 'labour' => $labour, + 'started' => time(), + ]) + ); + } } } } catch (Exception $e) { From c9c83bb511040455e8b383c108cc471fb1b064e7 Mon Sep 17 00:00:00 2001 From: Anton Samuelsson Date: Tue, 9 Jul 2019 12:13:49 +0000 Subject: [PATCH 12/12] Fixed typo --- shell/queue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shell/queue.php b/shell/queue.php index d32eade..ab9bc93 100644 --- a/shell/queue.php +++ b/shell/queue.php @@ -190,7 +190,7 @@ protected function _runServer() $labours = $queue->receive($server->canReceiveCount($processes->count())); - if ($labour === false) { + if ($labours === false) { sleep($configData->getPoll()); continue; }