Resolving new conflict with PMCORE-543
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
|
||||
namespace ProcessMaker\Core;
|
||||
|
||||
use Bootstrap;
|
||||
use Exception;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use ProcessMaker\BusinessModel\Factories\Jobs;
|
||||
@@ -140,12 +141,14 @@ class JobsManager
|
||||
|
||||
$_SESSION = $environment['session'];
|
||||
$_SERVER = $environment['server'];
|
||||
Propel::initConfiguration($environment['configuration']);
|
||||
foreach ($environment['constants'] as $key => $value) {
|
||||
if (!defined($key)) {
|
||||
define($key, $value);
|
||||
}
|
||||
}
|
||||
|
||||
Propel::close();
|
||||
Propel::init(PATH_CONFIG . "databases.php");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -190,6 +193,11 @@ class JobsManager
|
||||
$callback($environment);
|
||||
} catch (Exception $e) {
|
||||
Log::error($e->getMessage() . ": " . $e->getTraceAsString());
|
||||
$context = [
|
||||
"trace" => $e->getTraceAsString(),
|
||||
"workspace" => $environment["constants"]["SYS_SYS"]
|
||||
];
|
||||
Bootstrap::registerMonolog("queue:work", 400, $e->getMessage(), $context, "");
|
||||
throw $e;
|
||||
}
|
||||
});
|
||||
|
||||
92
workflow/engine/src/ProcessMaker/Core/MultiProcOpen.php
Normal file
92
workflow/engine/src/ProcessMaker/Core/MultiProcOpen.php
Normal file
@@ -0,0 +1,92 @@
|
||||
<?php
|
||||
|
||||
namespace ProcessMaker\Core;
|
||||
|
||||
class MultiProcOpen
|
||||
{
|
||||
/**
|
||||
* Represents the waiting time before starting the process monitoring.
|
||||
* @var integer
|
||||
*/
|
||||
private $sleepTime = 1;
|
||||
|
||||
/**
|
||||
* This method obtains a paging by returning the start and limit indexes
|
||||
* compatible with the mysql pagination in its call function.
|
||||
* The return function must return an instance of the object "ProcessMaker\Core\ProcOpen".
|
||||
* Returns an array containing the status, content, and errors generated by
|
||||
* the open process.
|
||||
* @param int $size
|
||||
* @param int $chunk
|
||||
* @param callable $callback
|
||||
* @return array
|
||||
*/
|
||||
public function chunk(int $size, int $chunk, callable $callback): array
|
||||
{
|
||||
$start = 0;
|
||||
$limit = $chunk;
|
||||
$queries = [];
|
||||
for ($i = 1; $start < $size; $i++) {
|
||||
$queries[] = $callback($size, $start, $limit);
|
||||
$start = $i * $limit;
|
||||
}
|
||||
return $this->run($queries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a set of background processes.
|
||||
* The array must contain one or more instances of the object inherited from
|
||||
* the class "ProcessMaker\Core\ProcOpen"
|
||||
* Returns an array containing the status, content, and errors generated by
|
||||
* the open process.
|
||||
* @param array $processes
|
||||
* @return array
|
||||
*/
|
||||
public function run(array $processes): array
|
||||
{
|
||||
foreach ($processes as $procOpen) {
|
||||
$procOpen->open();
|
||||
}
|
||||
return $this->processMonitoring($processes);
|
||||
}
|
||||
|
||||
/**
|
||||
* It monitors the open processes, verifying if they have ended or thrown an
|
||||
* error and later closing the resources related to the process.
|
||||
* Returns an array containing the status, content, and errors generated by
|
||||
* the open process.
|
||||
* @param array $processes
|
||||
* @return array
|
||||
*/
|
||||
private function processMonitoring(array $processes): array
|
||||
{
|
||||
sleep($this->sleepTime); //this sleep is very important
|
||||
$i = 0;
|
||||
$n = count($processes);
|
||||
if ($n === 0) {
|
||||
return [];
|
||||
}
|
||||
$outputs = [];
|
||||
do {
|
||||
$index = $i % $n;
|
||||
if (isset($processes[$index])) {
|
||||
$procOpen = $processes[$index];
|
||||
$status = $procOpen->getStatus();
|
||||
$contents = $procOpen->getContents();
|
||||
$errors = $procOpen->getErrors();
|
||||
if ($status->running === false || !empty($errors)) {
|
||||
$outputs[] = [
|
||||
"status" => $status,
|
||||
"contents" => $contents,
|
||||
"errors" => $errors,
|
||||
];
|
||||
$procOpen->terminate();
|
||||
$procOpen->close();
|
||||
unset($processes[$index]);
|
||||
}
|
||||
}
|
||||
$i = $i + 1;
|
||||
} while (!empty($processes));
|
||||
return $outputs;
|
||||
}
|
||||
}
|
||||
126
workflow/engine/src/ProcessMaker/Core/ProcOpen.php
Normal file
126
workflow/engine/src/ProcessMaker/Core/ProcOpen.php
Normal file
@@ -0,0 +1,126 @@
|
||||
<?php
|
||||
|
||||
namespace ProcessMaker\Core;
|
||||
|
||||
class ProcOpen
|
||||
{
|
||||
private $command;
|
||||
private $resource;
|
||||
private $descriptorspec;
|
||||
private $pipes;
|
||||
private $cwd;
|
||||
|
||||
/**
|
||||
* This initializes the descriptors and the command for the open process.
|
||||
* @param string $command
|
||||
*/
|
||||
public function __construct(string $command)
|
||||
{
|
||||
$this->descriptorspec = [
|
||||
['pipe', 'r'],
|
||||
['pipe', 'w'],
|
||||
['pipe', 'w']
|
||||
];
|
||||
$this->command = $command;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the resource that represents the process.
|
||||
* @return resource
|
||||
*/
|
||||
public function getResource()
|
||||
{
|
||||
return $this->resource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the process execution directory.
|
||||
* @param string $cwd
|
||||
*/
|
||||
public function setCwd(string $cwd)
|
||||
{
|
||||
$this->cwd = $cwd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a background process.
|
||||
*/
|
||||
public function open()
|
||||
{
|
||||
if (empty($this->cwd)) {
|
||||
$this->resource = proc_open($this->command, $this->descriptorspec, $this->pipes);
|
||||
} else {
|
||||
$this->resource = proc_open($this->command, $this->descriptorspec, $this->pipes, $this->cwd);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the content of the process when it is finished.
|
||||
* @return string
|
||||
*/
|
||||
public function getContents()
|
||||
{
|
||||
if (is_resource($this->pipes[1])) {
|
||||
return stream_get_contents($this->pipes[1]);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the process errors when it is finished.
|
||||
* @return string
|
||||
*/
|
||||
public function getErrors()
|
||||
{
|
||||
if (is_resource($this->pipes[2])) {
|
||||
return stream_get_contents($this->pipes[2]);
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the resources related to the open process.
|
||||
* return void
|
||||
*/
|
||||
public function close()
|
||||
{
|
||||
if (is_resource($this->resource)) {
|
||||
foreach ($this->pipes as $value) {
|
||||
fclose($value);
|
||||
}
|
||||
proc_close($this->resource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* End the process before it ends.
|
||||
*/
|
||||
public function terminate()
|
||||
{
|
||||
if (is_resource($this->resource)) {
|
||||
proc_terminate($this->resource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the status of the process.
|
||||
* @return object
|
||||
*/
|
||||
public function getStatus()
|
||||
{
|
||||
$status = [
|
||||
"command" => $this->command,
|
||||
"pid" => null,
|
||||
"running" => false,
|
||||
"signaled" => false,
|
||||
"stopped" => false,
|
||||
"exitcode" => -1,
|
||||
"termsig" => 0,
|
||||
"stopsig" => 0
|
||||
];
|
||||
if (is_resource($this->resource)) {
|
||||
$status = proc_get_status($this->resource);
|
||||
}
|
||||
return (object) $status;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user