mirror of
https://git.friendi.ca/friendica/friendica.git
synced 2025-06-07 13:16:32 +02:00
New logger for Jetstream / new Worker-Id per task
This commit is contained in:
parent
3e3f49219d
commit
d462ade8f6
7 changed files with 310 additions and 18 deletions
27
src/App.php
27
src/App.php
|
@ -203,7 +203,7 @@ class App
|
|||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function processConsole(array $serverParams): void
|
||||
public function processConsole(array $serverParams, bool $testmode = false): void
|
||||
{
|
||||
$argv = $serverParams['argv'] ?? [];
|
||||
|
||||
|
@ -231,6 +231,10 @@ class App
|
|||
|
||||
$this->registerTemplateEngine();
|
||||
|
||||
if ($testmode) {
|
||||
return;
|
||||
}
|
||||
|
||||
(\Friendica\Core\Console::create($this->container, $argv))->execute();
|
||||
}
|
||||
|
||||
|
@ -290,16 +294,18 @@ class App
|
|||
{
|
||||
$command = strtolower($argv[1] ?? '');
|
||||
|
||||
if ($command === 'daemon' || $command === 'jetstream') {
|
||||
if ($command === 'daemon') {
|
||||
return LogChannel::DAEMON;
|
||||
}
|
||||
|
||||
if ($command === 'jetstream') {
|
||||
return LogChannel::JETSTREAM;
|
||||
}
|
||||
|
||||
if ($command === 'worker') {
|
||||
return LogChannel::WORKER;
|
||||
}
|
||||
|
||||
// @TODO Add support for jetstream
|
||||
|
||||
return LogChannel::CONSOLE;
|
||||
}
|
||||
|
||||
|
@ -453,11 +459,13 @@ class App
|
|||
|
||||
if (!$this->mode->isInstall()) {
|
||||
// Force SSL redirection
|
||||
if ($this->config->get('system', 'force_ssl') &&
|
||||
if (
|
||||
$this->config->get('system', 'force_ssl') &&
|
||||
(empty($serverVars['HTTPS']) || $serverVars['HTTPS'] === 'off') &&
|
||||
(empty($serverVars['HTTP_X_FORWARDED_PROTO']) || $serverVars['HTTP_X_FORWARDED_PROTO'] === 'http') &&
|
||||
!empty($serverVars['REQUEST_METHOD']) &&
|
||||
$serverVars['REQUEST_METHOD'] === 'GET') {
|
||||
$serverVars['REQUEST_METHOD'] === 'GET'
|
||||
) {
|
||||
System::externalRedirect($this->baseURL . '/' . $this->args->getQueryString());
|
||||
}
|
||||
|
||||
|
@ -478,7 +486,8 @@ class App
|
|||
// Only continue when the given profile link seems valid.
|
||||
// Valid profile links contain a path with "/profile/" and no query parameters
|
||||
if ((parse_url($queryVars['zrl'], PHP_URL_QUERY) == '') &&
|
||||
strpos(parse_url($queryVars['zrl'], PHP_URL_PATH) ?? '', '/profile/') !== false) {
|
||||
strpos(parse_url($queryVars['zrl'], PHP_URL_PATH) ?? '', '/profile/') !== false
|
||||
) {
|
||||
$this->auth->setUnauthenticatedVisitor($queryVars['zrl']);
|
||||
OpenWebAuth::zrlInit();
|
||||
} else {
|
||||
|
@ -649,8 +658,8 @@ class App
|
|||
@file_put_contents(
|
||||
$logfile,
|
||||
DateTimeFormat::utcNow() . "\t" . round($duration, 3) . "\t" .
|
||||
$this->requestId . "\t" . $code . "\t" .
|
||||
$request . "\t" . $agent . "\n",
|
||||
$this->requestId . "\t" . $code . "\t" .
|
||||
$request . "\t" . $agent . "\n",
|
||||
FILE_APPEND
|
||||
);
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ interface LogChannel
|
|||
public const DAEMON = 'daemon';
|
||||
/** @var string channel for worker execution */
|
||||
public const WORKER = 'worker';
|
||||
/** @var string channel for jetstream execution */
|
||||
public const JETSTREAM = 'jetstream';
|
||||
/** @var string channel for frontend app executions */
|
||||
public const APP = 'app';
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ namespace Friendica\Core\Logger;
|
|||
use Friendica\Core\Config\Capability\IManageConfigValues;
|
||||
use Friendica\Core\Logger\Capability\LogChannel;
|
||||
use Friendica\Core\Logger\Factory\LoggerFactory;
|
||||
use Friendica\Core\Logger\Type\JetstreamLogger;
|
||||
use Friendica\Core\Logger\Type\ProfilerLogger;
|
||||
use Friendica\Core\Logger\Type\WorkerLogger;
|
||||
use Friendica\Util\Profiler;
|
||||
|
@ -66,6 +67,11 @@ final class LoggerManager
|
|||
self::$logger = null;
|
||||
}
|
||||
|
||||
public function getLogChannel(): string
|
||||
{
|
||||
return self::$logChannel;
|
||||
}
|
||||
|
||||
/**
|
||||
* (Creates and) Returns the logger instance
|
||||
*/
|
||||
|
@ -78,6 +84,11 @@ final class LoggerManager
|
|||
return self::$logger;
|
||||
}
|
||||
|
||||
public function setLogger(LoggerInterface $logger): void
|
||||
{
|
||||
self::$logger = $logger;
|
||||
}
|
||||
|
||||
private function createLogger(): LoggerInterface
|
||||
{
|
||||
// Always create NullLogger if debug is disabled
|
||||
|
@ -98,6 +109,10 @@ final class LoggerManager
|
|||
$logger = new WorkerLogger($logger);
|
||||
}
|
||||
|
||||
if (self::$logChannel === LogChannel::JETSTREAM) {
|
||||
$logger = new JetstreamLogger($logger);
|
||||
}
|
||||
|
||||
return $logger;
|
||||
}
|
||||
}
|
||||
|
|
219
src/Core/Logger/Type/JetstreamLogger.php
Normal file
219
src/Core/Logger/Type/JetstreamLogger.php
Normal file
|
@ -0,0 +1,219 @@
|
|||
<?php
|
||||
|
||||
// Copyright (C) 2010-2024, the Friendica project
|
||||
// SPDX-FileCopyrightText: 2010-2024 the Friendica project
|
||||
//
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
namespace Friendica\Core\Logger\Type;
|
||||
|
||||
use Friendica\Core\Logger\Exception\LoggerException;
|
||||
use Friendica\Util\Strings;
|
||||
use Psr\Log\LoggerInterface;
|
||||
|
||||
/**
|
||||
* A Logger for specific jetstream tasks, which adds a jetstream id to it.
|
||||
* Uses the decorator pattern (https://en.wikipedia.org/wiki/Decorator_pattern)
|
||||
*/
|
||||
class JetstreamLogger implements LoggerInterface
|
||||
{
|
||||
/** @var int Length of the unique jetstream id */
|
||||
const JETSTREAM_ID_LENGTH = 7;
|
||||
|
||||
/**
|
||||
* @var LoggerInterface The original Logger instance
|
||||
*/
|
||||
private $logger;
|
||||
|
||||
/**
|
||||
* @var string the current jetstream ID
|
||||
*/
|
||||
private $jetstreamId;
|
||||
|
||||
/**
|
||||
* @param LoggerInterface $logger The logger for jetstream tasks
|
||||
*
|
||||
* @throws LoggerException
|
||||
*/
|
||||
public function __construct(LoggerInterface $logger)
|
||||
{
|
||||
$this->logger = $logger;
|
||||
try {
|
||||
$this->jetstreamId = Strings::getRandomHex(self::JETSTREAM_ID_LENGTH);
|
||||
} catch (\Exception $exception) {
|
||||
throw new LoggerException('Cannot generate random Hex.', $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the jetstream context for each log entry
|
||||
*
|
||||
* @param array $context
|
||||
*/
|
||||
private function addContext(array &$context)
|
||||
{
|
||||
$context['jetstream_id'] = $this->jetstreamId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the jetstream ID
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function getJetstreamId(): string
|
||||
{
|
||||
return $this->jetstreamId;
|
||||
}
|
||||
|
||||
public function setJetstreamId(string $jetstreamId): void
|
||||
{
|
||||
$this->jetstreamId = $jetstreamId;
|
||||
}
|
||||
|
||||
public function initJetstreamId(): void
|
||||
{
|
||||
try {
|
||||
$this->jetstreamId = Strings::getRandomHex(self::JETSTREAM_ID_LENGTH);
|
||||
} catch (\Exception $exception) {
|
||||
throw new LoggerException('Cannot generate random Hex.', $exception);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* System is unusable.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function emergency($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->emergency($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Action must be taken immediately.
|
||||
*
|
||||
* Example: Entire website down, database unavailable, etc. This should
|
||||
* trigger the SMS alerts and wake you up.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function alert($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->alert($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Critical conditions.
|
||||
*
|
||||
* Example: Application component unavailable, unexpected exception.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function critical($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->critical($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Runtime errors that do not require immediate action but should typically
|
||||
* be logged and monitored.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function error($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->error($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exceptional occurrences that are not errors.
|
||||
*
|
||||
* Example: Use of deprecated APIs, poor use of an API, undesirable things
|
||||
* that are not necessarily wrong.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function warning($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->warning($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Normal but significant events.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function notice($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->notice($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Interesting events.
|
||||
*
|
||||
* Example: User logs in, SQL logs.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function info($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->info($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Detailed debug information.
|
||||
*
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function debug($message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->debug($message, $context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs with an arbitrary level.
|
||||
*
|
||||
* @param mixed $level
|
||||
* @param string $message
|
||||
* @param array $context
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function log($level, $message, array $context = [])
|
||||
{
|
||||
$this->addContext($context);
|
||||
$this->logger->log($level, $message, $context);
|
||||
}
|
||||
}
|
|
@ -57,7 +57,7 @@ class WorkerLogger implements LoggerInterface
|
|||
*
|
||||
* @throws LoggerException
|
||||
*/
|
||||
public function setFunctionName(string $functionName)
|
||||
public function setFunctionName(?string $functionName)
|
||||
{
|
||||
$this->functionName = $functionName;
|
||||
try {
|
||||
|
@ -67,6 +67,11 @@ class WorkerLogger implements LoggerInterface
|
|||
}
|
||||
}
|
||||
|
||||
public function getFunctionName(): ?string
|
||||
{
|
||||
return $this->functionName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the worker context for each log entry
|
||||
*
|
||||
|
@ -88,6 +93,11 @@ class WorkerLogger implements LoggerInterface
|
|||
return $this->workerId;
|
||||
}
|
||||
|
||||
public function setWorkerId(string $workerId): void
|
||||
{
|
||||
$this->workerId = $workerId;
|
||||
}
|
||||
|
||||
/**
|
||||
* System is unusable.
|
||||
*
|
||||
|
|
|
@ -82,7 +82,7 @@ class Worker
|
|||
// Kill stale processes every 5 minutes
|
||||
$last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0;
|
||||
if (time() > ($last_cleanup + 300)) {
|
||||
DI::keyValue()->set( 'worker_last_cleaned', time());
|
||||
DI::keyValue()->set('worker_last_cleaned', time());
|
||||
Worker\Cron::killStaleWorkers();
|
||||
}
|
||||
|
||||
|
@ -399,7 +399,7 @@ class Worker
|
|||
|
||||
require_once $include;
|
||||
|
||||
$funcname = str_replace('.php', '', basename($argv[0])) .'_run';
|
||||
$funcname = str_replace('.php', '', basename($argv[0])) . '_run';
|
||||
|
||||
if (function_exists($funcname)) {
|
||||
// We constantly update the "executed" date every minute to avoid being killed too soon
|
||||
|
@ -541,7 +541,19 @@ class Worker
|
|||
|
||||
self::coolDown();
|
||||
|
||||
DI::loggerManager()->changeLogChannel(LogChannel::WORKER);
|
||||
$loggerManager = DI::loggerManager();
|
||||
|
||||
$previousLogger = $loggerManager->getLogger();
|
||||
$previousChannel = $loggerManager->getLogChannel();
|
||||
if ($previousLogger instanceof WorkerLogger) {
|
||||
$previousId = $previousLogger->getWorkerId();
|
||||
$previousFunction = $previousLogger->getFunctionName();
|
||||
} else {
|
||||
$previousId = null;
|
||||
$previousFunction = null;
|
||||
}
|
||||
|
||||
$loggerManager->changeLogChannel(LogChannel::WORKER);
|
||||
|
||||
$logger = DI::logger();
|
||||
|
||||
|
@ -624,6 +636,14 @@ class Worker
|
|||
|
||||
DI::logger()->info('Process done.', ['function' => $funcname, 'priority' => $queue['priority'], 'retrial' => $queue['retrial'], 'id' => $queue['id'], 'duration' => round($duration, 3)]);
|
||||
|
||||
$loggerManager->changeLogChannel($previousChannel);
|
||||
$loggerManager->setLogger($previousLogger);
|
||||
|
||||
if ($previousLogger instanceof WorkerLogger) {
|
||||
$previousLogger->setWorkerId($previousId);
|
||||
$previousLogger->setFunctionName($previousFunction);
|
||||
}
|
||||
|
||||
DI::profiler()->saveLog(DI::logger(), 'ID ' . $queue['id'] . ': ' . $funcname);
|
||||
}
|
||||
|
||||
|
@ -684,7 +704,7 @@ class Worker
|
|||
$level = ($used / $max) * 100;
|
||||
|
||||
if ($level >= $maxlevel) {
|
||||
DI::logger()->warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used .'/' . $max);
|
||||
DI::logger()->warning('Maximum level (' . $maxlevel . '%) of user connections reached: ' . $used . '/' . $max);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -765,7 +785,7 @@ class Worker
|
|||
self::$db_duration_stat += (microtime(true) - $stamp);
|
||||
$jobs_per_minute[$interval] = number_format($jobs / $interval, 0);
|
||||
}
|
||||
$processlist = ' - jpm: '.implode('/', $jobs_per_minute);
|
||||
$processlist = ' - jpm: ' . implode('/', $jobs_per_minute);
|
||||
}
|
||||
|
||||
// Create a list of queue entries grouped by their priority
|
||||
|
@ -810,7 +830,7 @@ class Worker
|
|||
|
||||
$listitem[0] = '0:' . max(0, $idle_workers);
|
||||
|
||||
$processlist .= ' ('.implode(', ', $listitem).')';
|
||||
$processlist .= ' (' . implode(', ', $listitem) . ')';
|
||||
|
||||
if (DI::config()->get('system', 'worker_fastlane', false) && ($queues > 0) && ($active >= $queues) && self::entriesExists()) {
|
||||
$top_priority = self::highestPriority();
|
||||
|
@ -1305,8 +1325,13 @@ class Worker
|
|||
}
|
||||
|
||||
if (empty($queue)) {
|
||||
if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created,
|
||||
'priority' => $priority, 'next_try' => $delayed])) {
|
||||
if (!DBA::insert('workerqueue', [
|
||||
'command' => $command,
|
||||
'parameter' => $parameters,
|
||||
'created' => $created,
|
||||
'priority' => $priority,
|
||||
'next_try' => $delayed
|
||||
])) {
|
||||
return 0;
|
||||
}
|
||||
$added = DBA::lastInsertId();
|
||||
|
|
|
@ -12,6 +12,7 @@ namespace Friendica\Protocol\ATProtocol;
|
|||
|
||||
use Friendica\Core\Config\Capability\IManageConfigValues;
|
||||
use Friendica\Core\KeyValueStorage\Capability\IManageKeyValuePairs;
|
||||
use Friendica\Core\Logger\Type\JetstreamLogger;
|
||||
use Friendica\Core\Protocol;
|
||||
use Friendica\Core\System;
|
||||
use Friendica\Model\Contact;
|
||||
|
@ -276,6 +277,13 @@ class Jetstream
|
|||
*/
|
||||
private function route(stdClass $data): void
|
||||
{
|
||||
if ($this->logger instanceof JetstreamLogger) {
|
||||
$previousId = $this->logger->getJetstreamId();
|
||||
$this->logger->initJetstreamId();
|
||||
} else {
|
||||
$previousId = null;
|
||||
}
|
||||
|
||||
Item::incrementInbound(Protocol::BLUESKY);
|
||||
|
||||
switch ($data->kind) {
|
||||
|
@ -293,6 +301,10 @@ class Jetstream
|
|||
$this->routeCommits($data);
|
||||
break;
|
||||
}
|
||||
|
||||
if ($this->logger instanceof JetstreamLogger) {
|
||||
$this->logger->setJetstreamId($previousId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue