Merge remote-tracking branch 'upstream/develop' into no-frontend-worker

This commit is contained in:
Michael 2021-01-07 10:44:12 +00:00
commit df135c31fe
62 changed files with 306 additions and 14176 deletions

View file

@ -50,6 +50,7 @@ class Worker
private static $lock_duration = 0;
private static $last_update;
private static $state;
private static $daemon_mode = null;
/**
* Processes the tasks that are in the workerqueue table
@ -96,6 +97,10 @@ class Worker
// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
if (self::IPCJobsExists(getmypid())) {
self::IPCDeleteJobState(getmypid());
}
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
@ -146,13 +151,17 @@ class Worker
if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
Logger::info('Process lifetime reached, respawning.');
self::unclaimProcess();
self::spawnWorker();
if (self::isDaemonMode()) {
self::IPCSetJobState(true);
} else {
self::spawnWorker();
}
return;
}
}
// Cleaning up. Possibly not needed, but it doesn't harm anything.
if (DI::config()->get('system', 'worker_daemon_mode', false)) {
if (self::isDaemonMode()) {
self::IPCSetJobState(false);
}
Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
@ -190,7 +199,7 @@ class Worker
Logger::warning('Maximum processes reached, quitting.');
return false;
}
return true;
}
@ -412,6 +421,12 @@ class Worker
{
$a = DI::app();
$cooldown = DI::config()->get("system", "worker_cooldown", 0);
if ($cooldown > 0) {
Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
sleep($cooldown);
}
Logger::enableWorker($funcname);
Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
@ -484,10 +499,8 @@ class Worker
DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname);
$cooldown = DI::config()->get("system", "worker_cooldown", 0);
if ($cooldown > 0) {
Logger::info('Cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
sleep($cooldown);
}
}
@ -771,7 +784,7 @@ class Worker
// Are there fewer workers running as possible? Then fork a new one.
if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
if (DI::config()->get('system', 'worker_daemon_mode', false)) {
if (self::isDaemonMode()) {
self::IPCSetJobState(true);
} else {
self::spawnWorker();
@ -780,7 +793,7 @@ class Worker
}
// if there are too much worker, we don't spawn a new one.
if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) {
if (self::isDaemonMode() && ($active > $queues)) {
self::IPCSetJobState(false);
}
@ -1100,6 +1113,11 @@ class Worker
*/
private static function forkProcess(bool $do_cron)
{
if (DI::process()->isMinMemoryReached()) {
Logger::warning('Memory limit reached - quitting');
return;
}
// Children inherit their parent's database connection.
// To avoid problems we disconnect and connect both parent and child
DBA::disconnect();
@ -1111,22 +1129,40 @@ class Worker
} elseif ($pid) {
// The parent process continues here
DBA::connect();
Logger::info('Spawned new worker', ['cron' => $do_cron, 'pid' => $pid]);
self::IPCSetJobState(true, $pid);
Logger::info('Spawned new worker', ['pid' => $pid]);
$cycles = 0;
while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
usleep(10000);
}
Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]);
return;
}
// We now are in the new worker
DBA::connect();
Logger::info('Worker spawned', ['cron' => $do_cron, 'pid' => getmypid()]);
DI::process()->start();
// We now are in the new worker
$pid = getmypid();
DBA::connect();
/// @todo Reinitialize the logger to set a new process_id and uid
DI::process()->setPid($pid);
$cycles = 0;
while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
usleep(10000);
}
Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
self::processQueue($do_cron);
self::unclaimProcess();
self::IPCSetJobState(false, $pid);
DI::process()->end();
Logger::info('Worker ended', ['cron' => $do_cron, 'pid' => getmypid()]);
Logger::info('Worker ended', ['pid' => $pid]);
exit();
}
@ -1139,18 +1175,14 @@ class Worker
*/
public static function spawnWorker($do_cron = false)
{
// Worker and daemon are started from the command line.
// This means that this is executed by a PHP interpreter without runtime limitations
if (function_exists('pcntl_fork') && in_array(DI::mode()->getExecutor(), [Mode::DAEMON, Mode::WORKER])) {
if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
self::forkProcess($do_cron);
} else {
$process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
DI::modelProcess(), DI::app()->getBasePath(), getmypid());
$process->run('bin/worker.php', ['no_cron' => !$do_cron]);
}
// after spawning we have to remove the flag.
if (DI::config()->get('system', 'worker_daemon_mode', false)) {
if (self::isDaemonMode()) {
self::IPCSetJobState(false);
}
}
@ -1242,7 +1274,7 @@ class Worker
}
// Set the IPC flag to ensure an immediate process execution via daemon
if (DI::config()->get('system', 'worker_daemon_mode', false)) {
if (self::isDaemonMode()) {
self::IPCSetJobState(true);
}
@ -1267,7 +1299,7 @@ class Worker
}
// Quit on daemon mode
if (DI::config()->get('system', 'worker_daemon_mode', false)) {
if (self::isDaemonMode()) {
return $added;
}
@ -1361,12 +1393,27 @@ class Worker
* Set the flag if some job is waiting
*
* @param boolean $jobs Is there a waiting job?
* @param int $key Key number
* @throws \Exception
*/
public static function IPCSetJobState($jobs)
public static function IPCSetJobState(bool $jobs, int $key = 0)
{
$stamp = (float)microtime(true);
DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
/**
* Delete a key entry
*
* @param int $key Key number
* @throws \Exception
*/
public static function IPCDeleteJobState(int $key)
{
$stamp = (float)microtime(true);
DBA::delete('worker-ipc', ['key' => $key]);
self::$db_duration += (microtime(true) - $stamp);
self::$db_duration_write += (microtime(true) - $stamp);
}
@ -1374,13 +1421,14 @@ class Worker
/**
* Checks if some worker job waits to be executed
*
* @param int $key Key number
* @return bool
* @throws \Exception
*/
public static function IPCJobsExists()
public static function IPCJobsExists(int $key = 0)
{
$stamp = (float)microtime(true);
$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
$row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
self::$db_duration += (microtime(true) - $stamp);
// When we don't have a row, no job is running
@ -1391,6 +1439,51 @@ class Worker
return (bool)$row['jobs'];
}
/**
* Checks if the worker is running in the daemon mode.
*
* @return boolean
*/
public static function isDaemonMode()
{
if (!is_null(self::$daemon_mode)) {
return self::$daemon_mode;
}
if (DI::mode()->getExecutor() == Mode::DAEMON) {
return true;
}
$daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
if ($daemon_mode) {
return $daemon_mode;
}
if (!function_exists('pcntl_fork')) {
self::$daemon_mode = false;
return false;
}
$pidfile = DI::config()->get('system', 'pidfile');
if (empty($pidfile)) {
// No pid file, no daemon
self::$daemon_mode = false;
return false;
}
if (!is_readable($pidfile)) {
// No pid file. We assume that the daemon had been intentionally stopped.
self::$daemon_mode = false;
return false;
}
$pid = intval(file_get_contents($pidfile));
$running = posix_kill($pid, 0);
self::$daemon_mode = $running;
return $running;
}
/**
* Test if the daemon is running. If not, it will be started
*