PMCORE-949 Use of buffered query on cron's calculate parameter/task exhausts php's memory limit
This commit is contained in:
committed by
Julio Cesar Laura Avendaño
parent
c48f006c6a
commit
809a1feeef
@@ -0,0 +1,88 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace Tests\unit\workflow\engine\src\ProcessMaker\Util;
|
||||||
|
|
||||||
|
use ProcessMaker\Util\BatchProcessWithIndexes;
|
||||||
|
use Tests\TestCase;
|
||||||
|
|
||||||
|
class BatchProcessWithIndexesTest extends TestCase
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Testing object.
|
||||||
|
* @var BatchProcessWithIndexesTest
|
||||||
|
*/
|
||||||
|
protected $batchProcessWithIndexes = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up method.
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function setUp(): void
|
||||||
|
{
|
||||||
|
parent::setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tear down method.
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function tearDown(): void
|
||||||
|
{
|
||||||
|
parent::tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test the setLimit() method.
|
||||||
|
* @test
|
||||||
|
* @covers \ProcessMaker\Util\BatchProcessWithIndexes::setLimit()
|
||||||
|
*/
|
||||||
|
public function it_should_test_setLimit_method()
|
||||||
|
{
|
||||||
|
$this->batchProcessWithIndexes = new BatchProcessWithIndexes(1);
|
||||||
|
$result = $this->batchProcessWithIndexes->setLimit(1);
|
||||||
|
|
||||||
|
$this->assertInstanceOf(BatchProcessWithIndexes::class, $result);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test the process() method.
|
||||||
|
* @test
|
||||||
|
* @covers \ProcessMaker\Util\BatchProcessWithIndexes::process()
|
||||||
|
*/
|
||||||
|
public function it_should_test_process_method()
|
||||||
|
{
|
||||||
|
//for 10 elements will be expect the next indexes
|
||||||
|
$expected = [
|
||||||
|
[0, 2], [2, 2], [4, 2], [6, 2], [8, 2]
|
||||||
|
];
|
||||||
|
|
||||||
|
$size = 10;
|
||||||
|
$limit = 2;
|
||||||
|
$result = [];
|
||||||
|
$this->batchProcessWithIndexes = new BatchProcessWithIndexes($size);
|
||||||
|
$this->batchProcessWithIndexes->setLimit($limit);
|
||||||
|
$this->batchProcessWithIndexes->process(function ($start, $limit) use (&$result) {
|
||||||
|
$result[] = [$start, $limit];
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->assertEquals(count($expected), count($result));
|
||||||
|
$this->assertEquals(json_encode($expected), json_encode($result));
|
||||||
|
|
||||||
|
//for 17 elements will be expect the next indexes
|
||||||
|
$expected = [
|
||||||
|
[0, 3], [3, 3], [6, 3], [9, 3], [12, 3], [15, 3]
|
||||||
|
];
|
||||||
|
|
||||||
|
$size = 17;
|
||||||
|
$limit = 3;
|
||||||
|
$result = [];
|
||||||
|
$this->batchProcessWithIndexes = new BatchProcessWithIndexes($size);
|
||||||
|
$this->batchProcessWithIndexes->setLimit($limit);
|
||||||
|
$this->batchProcessWithIndexes->process(function ($start, $limit) use (&$result) {
|
||||||
|
$result[] = [$start, $limit];
|
||||||
|
});
|
||||||
|
|
||||||
|
$this->assertEquals(count($expected), count($result));
|
||||||
|
$this->assertEquals(json_encode($expected), json_encode($result));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ use ProcessMaker\Core\JobsManager;
|
|||||||
use ProcessMaker\Core\System;
|
use ProcessMaker\Core\System;
|
||||||
use ProcessMaker\Model\Application;
|
use ProcessMaker\Model\Application;
|
||||||
use ProcessMaker\Model\Fields;
|
use ProcessMaker\Model\Fields;
|
||||||
|
use ProcessMaker\Util\BatchProcessWithIndexes;
|
||||||
|
|
||||||
require_once 'classes/model/om/BaseAdditionalTables.php';
|
require_once 'classes/model/om/BaseAdditionalTables.php';
|
||||||
|
|
||||||
@@ -782,19 +783,17 @@ class AdditionalTables extends BaseAdditionalTables
|
|||||||
$reportTableBatchRegeneration = $config['report_table_batch_regeneration'];
|
$reportTableBatchRegeneration = $config['report_table_batch_regeneration'];
|
||||||
|
|
||||||
// Initializing more variables
|
// Initializing more variables
|
||||||
$size = $n;
|
$batch = new BatchProcessWithIndexes($n);
|
||||||
$start = 0;
|
$batch->setLimit($reportTableBatchRegeneration);
|
||||||
$limit = $reportTableBatchRegeneration;
|
|
||||||
|
|
||||||
// Creating jobs
|
// Creating jobs
|
||||||
for ($i = 1; $start < $size; $i++) {
|
$batch->process(function ($start, $limit) use ($workspace, $tableName, $type, $processUid, $gridKey, $addTabUid) {
|
||||||
$closure = function () use ($workspace, $tableName, $type, $processUid, $gridKey, $addTabUid, $start, $limit) {
|
$closure = function () use ($workspace, $tableName, $type, $processUid, $gridKey, $addTabUid, $start, $limit) {
|
||||||
$workspaceTools = new WorkspaceTools($workspace);
|
$workspaceTools = new WorkspaceTools($workspace);
|
||||||
$workspaceTools->generateDataReport($tableName, $type, $processUid, $gridKey, $addTabUid, $start, $limit);
|
$workspaceTools->generateDataReport($tableName, $type, $processUid, $gridKey, $addTabUid, $start, $limit);
|
||||||
};
|
};
|
||||||
JobsManager::getSingleton()->dispatch(GenerateReportTable::class, $closure);
|
JobsManager::getSingleton()->dispatch(GenerateReportTable::class, $closure);
|
||||||
$start = $i * $limit;
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -1,32 +1,9 @@
|
|||||||
<?php
|
<?php
|
||||||
/**
|
|
||||||
* AppDelegation.php
|
|
||||||
*
|
|
||||||
* @package workflow.engine.classes.model
|
|
||||||
*
|
|
||||||
* ProcessMaker Open Source Edition
|
|
||||||
* Copyright (C) 2004 - 2011 Colosa Inc.
|
|
||||||
*
|
|
||||||
* This program is free software: you can redistribute it and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License as
|
|
||||||
* published by the Free Software Foundation, either version 3 of the
|
|
||||||
* License, or (at your option) any later version.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful,
|
|
||||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
* GNU Affero General Public License for more details.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*
|
|
||||||
* For more information, contact Colosa Inc, 2566 Le Jeune Rd.,
|
|
||||||
* Coral Gables, FL, 33134, USA, or email info@colosa.com.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
use Illuminate\Database\Eloquent\Builder;
|
||||||
use ProcessMaker\Model\Delegation;
|
use ProcessMaker\Model\Delegation;
|
||||||
use ProcessMaker\Plugins\PluginRegistry;
|
use ProcessMaker\Plugins\PluginRegistry;
|
||||||
|
use ProcessMaker\Util\BatchProcessWithIndexes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Skeleton subclass for representing a row from the 'APP_DELEGATION' table.
|
* Skeleton subclass for representing a row from the 'APP_DELEGATION' table.
|
||||||
@@ -569,35 +546,97 @@ class AppDelegation extends BaseAppDelegation
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//usually this function is called when routing in the flow, so by default cron =0
|
/**
|
||||||
public function calculateDuration($cron = 0)
|
* Usually this function is called when routing in the flow, so by default cron = 0
|
||||||
|
* @param int $cron
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function calculateDuration($cron = 0): void
|
||||||
{
|
{
|
||||||
$this->writeFileIfCalledFromCronForCalculateDuration($cron);
|
$this->writeFileIfCalledFromCronForCalculateDuration($cron);
|
||||||
$this->patchDataWithValuesForCalculateDuration();
|
$this->patchDataWithValuesForCalculateDuration();
|
||||||
$rs = $this->recordSetForCalculateDuration();
|
|
||||||
$rs->next();
|
|
||||||
$row = $rs->getRow();
|
|
||||||
$i = 0;
|
|
||||||
$calendar = new Calendar();
|
|
||||||
$now = new DateTime();
|
|
||||||
while (is_array($row)) {
|
|
||||||
$oAppDel = AppDelegationPeer::retrieveByPk($row['APP_UID'], $row['DEL_INDEX']);
|
|
||||||
$calendar = new Calendar();
|
|
||||||
$calendar->getCalendar($row['USR_UID'], $row['PRO_UID'], $row['TAS_UID']);
|
|
||||||
$calData = $calendar->getCalendarData();
|
|
||||||
$calculatedValues = $this->getValuesToStoreForCalculateDuration($row, $calendar, $calData, $now);
|
|
||||||
|
|
||||||
$oAppDel->setDelStarted($calculatedValues['isStarted']);
|
$builder = $this->getAppDelegationTask();
|
||||||
$oAppDel->setDelFinished($calculatedValues['isFinished']);
|
$count = $builder->count();
|
||||||
$oAppDel->setDelDelayed($calculatedValues['isDelayed']);
|
$now = new DateTime();
|
||||||
$oAppDel->setDelQueueDuration($calculatedValues['queueTime']);
|
|
||||||
$oAppDel->setDelDelayDuration($calculatedValues['delayTime']);
|
$batch = new BatchProcessWithIndexes($count);
|
||||||
$oAppDel->setDelDuration($calculatedValues['durationTime']);
|
$batch->process(function ($start, $limit) use ($builder, $now) {
|
||||||
$oAppDel->setAppOverduePercentage($calculatedValues['percentDelay']);
|
$results = $builder
|
||||||
$RES = $oAppDel->save();
|
->offset($start)
|
||||||
$rs->next();
|
->limit($limit)
|
||||||
$row = $rs->getRow();
|
->get();
|
||||||
|
foreach ($results as $object) {
|
||||||
|
$appDelegationTask = $object->toArray();
|
||||||
|
$this->updateAppDelegationWithCalendar($appDelegationTask, $now);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get APP_DELEGATION and TASK tables where 'started' and 'finished' are 0.
|
||||||
|
* @return iterable
|
||||||
|
*/
|
||||||
|
private function getAppDelegationTask(): Builder
|
||||||
|
{
|
||||||
|
$columns = [
|
||||||
|
'APP_DELEGATION.APP_UID',
|
||||||
|
'APP_DELEGATION.DEL_INDEX',
|
||||||
|
'APP_DELEGATION.USR_UID',
|
||||||
|
'APP_DELEGATION.PRO_UID',
|
||||||
|
'APP_DELEGATION.TAS_UID',
|
||||||
|
'APP_DELEGATION.DEL_DELEGATE_DATE',
|
||||||
|
'APP_DELEGATION.DEL_INIT_DATE',
|
||||||
|
'APP_DELEGATION.DEL_TASK_DUE_DATE',
|
||||||
|
'APP_DELEGATION.DEL_FINISH_DATE',
|
||||||
|
'APP_DELEGATION.DEL_DURATION',
|
||||||
|
'APP_DELEGATION.DEL_QUEUE_DURATION',
|
||||||
|
'APP_DELEGATION.DEL_DELAY_DURATION',
|
||||||
|
'APP_DELEGATION.DEL_STARTED',
|
||||||
|
'APP_DELEGATION.DEL_FINISHED',
|
||||||
|
'APP_DELEGATION.DEL_DELAYED',
|
||||||
|
'TASK.TAS_DURATION',
|
||||||
|
'TASK.TAS_TIMEUNIT',
|
||||||
|
'TASK.TAS_TYPE_DAY'
|
||||||
|
];
|
||||||
|
$builder = Delegation::query()
|
||||||
|
->select($columns)
|
||||||
|
->leftjoin('TASK', function ($join) {
|
||||||
|
$join->on('APP_DELEGATION.TAS_UID', '=', 'TASK.TAS_UID');
|
||||||
|
})
|
||||||
|
->where(function ($query) {
|
||||||
|
$query->where('APP_DELEGATION.DEL_STARTED', '=', 0)
|
||||||
|
->orWhere('APP_DELEGATION.DEL_FINISHED', '=', 0);
|
||||||
|
})
|
||||||
|
->orderBy('DELEGATION_ID', 'asc');
|
||||||
|
return $builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the APP_DELEGATION table with the calculated calendar results.
|
||||||
|
* @param array $appDelegationTask
|
||||||
|
* @param DateTime $date
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
private function updateAppDelegationWithCalendar(array $appDelegationTask, DateTime $date): void
|
||||||
|
{
|
||||||
|
$calendar = new Calendar();
|
||||||
|
$calendar->getCalendar($appDelegationTask['USR_UID'], $appDelegationTask['PRO_UID'], $appDelegationTask['TAS_UID']);
|
||||||
|
$calData = $calendar->getCalendarData();
|
||||||
|
$calculatedValues = $this->getValuesToStoreForCalculateDuration($appDelegationTask, $calendar, $calData, $date);
|
||||||
|
|
||||||
|
Delegation::select()
|
||||||
|
->where('APP_UID', '=', $appDelegationTask['APP_UID'])
|
||||||
|
->where('DEL_INDEX', '=', $appDelegationTask['DEL_INDEX'])
|
||||||
|
->update([
|
||||||
|
'DEL_STARTED' => $calculatedValues['isStarted'],
|
||||||
|
'DEL_FINISHED' => $calculatedValues['isFinished'],
|
||||||
|
'DEL_DELAYED' => $calculatedValues['isDelayed'],
|
||||||
|
'DEL_QUEUE_DURATION' => $calculatedValues['queueTime'],
|
||||||
|
'DEL_DELAY_DURATION' => $calculatedValues['delayTime'],
|
||||||
|
'DEL_DURATION' => $calculatedValues['durationTime'],
|
||||||
|
'APP_OVERDUE_PERCENTAGE' => $calculatedValues['percentDelay']
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
public function getValuesToStoreForCalculateDuration($row, $calendar, $calData, $nowDate)
|
public function getValuesToStoreForCalculateDuration($row, $calendar, $calData, $nowDate)
|
||||||
@@ -706,39 +745,6 @@ class AppDelegation extends BaseAppDelegation
|
|||||||
return new DateTime($stringDate);
|
return new DateTime($stringDate);
|
||||||
}
|
}
|
||||||
|
|
||||||
private function recordSetForCalculateDuration()
|
|
||||||
{
|
|
||||||
//walk in all rows with DEL_STARTED = 0 or DEL_FINISHED = 0
|
|
||||||
$c = new Criteria('workflow');
|
|
||||||
$c->clearSelectColumns();
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::APP_UID);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_INDEX);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::USR_UID);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::PRO_UID);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::TAS_UID);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_DELEGATE_DATE);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_INIT_DATE);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_TASK_DUE_DATE);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_FINISH_DATE);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_DURATION);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_QUEUE_DURATION);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_DELAY_DURATION);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_STARTED);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_FINISHED);
|
|
||||||
$c->addSelectColumn(AppDelegationPeer::DEL_DELAYED);
|
|
||||||
$c->addSelectColumn(TaskPeer::TAS_DURATION);
|
|
||||||
$c->addSelectColumn(TaskPeer::TAS_TIMEUNIT);
|
|
||||||
$c->addSelectColumn(TaskPeer::TAS_TYPE_DAY);
|
|
||||||
|
|
||||||
$c->addJoin(AppDelegationPeer::TAS_UID, TaskPeer::TAS_UID, Criteria::LEFT_JOIN);
|
|
||||||
$cton1 = $c->getNewCriterion(AppDelegationPeer::DEL_STARTED, 0);
|
|
||||||
$cton2 = $c->getNewCriterion(AppDelegationPeer::DEL_FINISHED, 0);
|
|
||||||
$cton1->addOR($cton2);
|
|
||||||
$c->add($cton1);
|
|
||||||
$rs = AppDelegationPeer::doSelectRS($c);
|
|
||||||
$rs->setFetchmode(ResultSet::FETCHMODE_ASSOC);
|
|
||||||
return $rs;
|
|
||||||
}
|
|
||||||
private function writeFileIfCalledFromCronForCalculateDuration($cron)
|
private function writeFileIfCalledFromCronForCalculateDuration($cron)
|
||||||
{
|
{
|
||||||
if ($cron == 1) {
|
if ($cron == 1) {
|
||||||
|
|||||||
@@ -0,0 +1,57 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace ProcessMaker\Util;
|
||||||
|
|
||||||
|
class BatchProcessWithIndexes
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Start of the query.
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $start = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Limit of the query.
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $limit = 1000;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total size of the query.
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
private $size = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor of the class.
|
||||||
|
* @param int $size
|
||||||
|
*/
|
||||||
|
public function __construct(int $size)
|
||||||
|
{
|
||||||
|
$this->size = $size;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set custom limit of the query.
|
||||||
|
* @param int $limit
|
||||||
|
* @return self
|
||||||
|
*/
|
||||||
|
public function setLimit(int $limit): self
|
||||||
|
{
|
||||||
|
$this->limit = $limit;
|
||||||
|
return $this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Batch process returning the index for query.
|
||||||
|
* @param callable $callback
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function process(callable $callback): void
|
||||||
|
{
|
||||||
|
for ($batch = 1; $this->start < $this->size; $batch++) {
|
||||||
|
$callback($this->start, $this->limit);
|
||||||
|
$this->start = $batch * $this->limit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user