Файловый менеджер - Редактировать - /opt/cloudlinux/venv/lib/python3.11/site-packages/xray/agent/daemon.py
Назад
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
This module contains classes implementing X-Ray Agent behaviour
"""
import collections
import io
import json
import logging
import os
import queue
import re
import signal
import socket
import subprocess
import time
import typing
from threading import Thread, current_thread, Lock
from typing import Any, Optional, Dict, Tuple
from dataclasses import dataclass

import psutil

from .executor import BoundedThreadExecutor
from xray import gettext as _
from xray.apiclient import get_client
from xray.internal.constants import local_tasks_storage, tasks_base_storage, safe_id_pattern
from xray.internal.exceptions import XRayError, XRayAPIError
from .fault_detector import FaultDetector
from xray.internal.local_counters import (
    open_local_storage,
    flush_memory_storage,
    get_task_ids
)
from xray.internal.types import Task
from xray.internal.user_plugin_utils import extract_creds
from xray.internal.utils import (
    dbm_storage,
    get_current_cpu_throttling_time
)

if typing.TYPE_CHECKING:
    from xray.apiclient.api_client import (
        SendClient, SmartAdviceAPIClient, APIClient
    )


@dataclass
class APIDataContainer:
    """Cache entry pairing a task with the API client bound to it."""
    client: 'APIClient'
    task: Task


logger = logging.getLogger(__name__)


class Agent:
    """
    X-Ray Agent class
    """
    COUNTERS_FLUSH_INTERVAL = 15
    MONGO_FLUSH_INTERVAL = 60
    CLEANUP_INTERVAL = 43200  # once in 12 hours

    def __init__(self, system_id,
                 # keep max_connections quite big to handle spikes
                 max_connections=psutil.cpu_count() * 8,
                 # max_workers can also be quite big because they are not cpu-bound
                 max_workers=psutil.cpu_count() * 4,
                 maxqueuesize=psutil.cpu_count() * 16,
                 max_connections_per_uid=None):
        self.sys_id = system_id
        self.maxqueuesize = maxqueuesize
        self.max_connections = max_connections
        self.max_workers = max_workers
        self.max_connections_per_uid = max_connections_per_uid \
            if max_connections_per_uid is not None else max_connections // 2
        # don't process SIGUSR2 with default handler
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGUSR2})
        # initialize ClickHouse API client to send requests data
        clickhouse_client_object: typing.Type[SendClient] = get_client('api_req')
        self.send_client: SendClient = clickhouse_client_object(system_id=self.sys_id)
        # initialize Adviser API client to send requests data
        adviser_client_object: typing.Type[SmartAdviceAPIClient] = get_client('adviser')
        self.adviser_client: SmartAdviceAPIClient = adviser_client_object()
        # initial state of MongoDB API client to interact with tasks
        self.task_client_object: typing.Type[APIClient] = get_client()
        # initialize storage for cache of remote API data
        self.api_data_cache_lock = Lock()
        self.api_data_cache: Dict[str, APIDataContainer] = dict()
        # initialize Fault Detector
        self.fault_detector = FaultDetector()
        # per-UID concurrent connection counter (N6 hardening)
        self._uid_connections: Dict[int, int] = collections.defaultdict(int)
        self._uid_connections_lock = Lock()
        self.signal_handler_thread: Optional[Thread] = None
        self.flusher_thread: Optional[Thread] = None

    def _wait_for_sigusr2(self):
        """
        Block until SIGUSR2 arrives, flush counters/storage on disk and
        signal SIGUSR2 back to the requesting process as acknowledgement.
        """
        siginfo = signal.sigwaitinfo({signal.SIGUSR2})
        logger.info('Received SIGUSR2 from pid=%s, '
                    'flushing database storage on disk', siginfo.si_pid)
        self._flush_mongodb_counters()
        flush_memory_storage()
        logger.info('Sending signal back to process that requested storage flush')
        try:
            os.kill(siginfo.si_pid, signal.SIGUSR2)
        except OSError:
            # requester may have exited while we were flushing
            logger.warning('Process that requested storage flush no longer exists')

    def _setup_signal_handler(self):
        """
        Setup SIGUSR2 handler that starts in-memory storage flush
        when received. When flushed, send SIGUSR2 back to the process
        that send signal.
        """
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGUSR2})
        while True:
            try:
                self._wait_for_sigusr2()
            except Exception:
                logger.exception('Unable to process signal, see traceback for details.')

    def _flush_mongodb_counters(self, task_id=None):
        """
        Push request counters for one task (or all known tasks when
        task_id is None) into MongoDB via the cached API clients.
        """
        tasks_to_flush = [task_id] if task_id is not None else get_task_ids()
        for task_id in tasks_to_flush:
            logger.info('Updating task requests counters in mongo for task_id=%s', task_id)
            try:
                apiclient, task = self.get_cached_or_load(task_id)
            except XRayError:
                logger.warning('Unable to get client and task %s', task_id)
                continue
            # read stored request_id
            with open_local_storage(task_id) as storage:
                task.update_with_local_data(next_request_id=storage.next_request_id)
            if task.tracing_by == 'time':
                # tracing_count for task by time represents number of minutes
                # left to active tracing and is updated by stop|continue
                # task routines only
                self.update_counts(apiclient, task.request_count)
            else:
                # tracing_count for task by request_qty depends on number of
                # collected requests, thus should be updated alongside
                self.update_counts(apiclient, task.request_count, task.tracing_count)

    def _flusher(self):
        """
        This method flushes data from memory to local storage periodically.
        """
        last_mongo_flush_time = 0
        last_api_data_cache_cleanup = 0
        while True:
            time.sleep(self.COUNTERS_FLUSH_INTERVAL)
            if time.time() - last_mongo_flush_time > self.MONGO_FLUSH_INTERVAL:
                self._flush_mongodb_counters()
                flush_memory_storage()
                last_mongo_flush_time = time.time()
                # we should cleanup API data cache only after flushing counters
                # in order not to lose counters for already inactive tasks
                if time.time() - last_api_data_cache_cleanup > self.CLEANUP_INTERVAL:
                    self.cleanup_api_data_cache()
                    last_api_data_cache_cleanup = time.time()
            else:
                flush_memory_storage(remove=False)

    def start(self, sock: socket.socket,
              background_routine: bool = False,
              loops: Optional[int] = None) -> None:
        """
        Start listening socket

        :param sock: already-bound listening UNIX/TCP socket
        :param background_routine: when True also start signal-handler
            and periodic-flusher threads
        :param loops: accept only this many connections (used in tests);
            None means loop forever
        """
        logger.info('Starting daemon')
        if background_routine:
            # setup signal handlers
            self.signal_handler_thread = Thread(target=self._setup_signal_handler)
            self.signal_handler_thread.start()
            # start periodical database flushing
            self.flusher_thread = Thread(target=self._flusher)
            self.flusher_thread.start()

        with BoundedThreadExecutor(
                max_workers=self.max_workers,
                maxqueuesize=self.maxqueuesize) as workers_pool, \
            BoundedThreadExecutor(
                max_workers=self.max_connections,
                # turn off the queue for connections because we
                # don't want the php processes to wait in queue
                # and slow down the php processes
                maxqueuesize=0
            ) as connections_pool:
            while loops if loops is not None else True:
                connection, address = sock.accept()
                try:
                    connections_pool.submit(
                        self.handle_incoming_connection,
                        connection,
                        workers_pool)
                except queue.Full:
                    # BUGFIX: the original format string had a %s
                    # placeholder with no argument; pass the peer address
                    logger.error('Request %s was rejected because our connections thread pool '
                                 'is full of tasks. Increase max_connections in configuration.',
                                 address)
                    connection.close()
                if loops is not None:
                    loops -= 1

    def add_limit_faults(self, data, t_key, cpu_value):
        """
        Calculate throttling values and update given data
        with resulting throttling stat
        """
        data['hitting_limits'], data['throttled_time'] = self.fault_detector(
            t_key, cpu_value)

    def _handle_request_init(self, php_pid: int, cpu_usage: int,
                             caller_uid: Optional[int] = None):
        """
        Called when php request starts and sends us welcome request
        meaning that request started on the php side.
        """
        logger.info('Received request init trigger from php=%s uid=%s cpu_usage=%s',
                    php_pid, caller_uid, cpu_usage)
        # save current CPU throttling time and timestamp
        self.fault_detector.save(php_pid, cpu_usage)
        # attempt to flush expired entries
        self.fault_detector.flush()

    def _handle_request_end(self, php_pid: int, cpu_usage: int,
                            request_data: dict, caller_uid: int):
        """
        Called when php request finishes: complete throttling stat
        and forward the collected trace for processing.
        """
        # otherwise calculate throttling fact, add it to data
        # and send gathered stat to CH
        if request_data.get('hitting_limits') is None:
            # only calculate faults if extension failed to get them itself
            self.add_limit_faults(request_data, php_pid, cpu_usage)
        logger.info('[%s] Processing trace for task %s (%s)',
                    current_thread().name,
                    request_data.get('tracing_task_id'),
                    request_data.get('url'))
        self.process_request_data(request_data, caller_uid)

    # Maximum payload size for a single request.
    # The PHP C extension can emit up to ~3 MB in worst-case configuration:
    # 331 spans (100 mysql + 100 external + 30 plugin + 100 slow + 1 shortcode)
    # × ~9 KB each (4 KB query + 20-frame backtrace with ~200 B/frame + overhead).
    # 4 MB provides 2× headroom over the calculated worst case while still
    # bounding memory against a malicious or misbehaving client.
    _MAX_READ_SIZE = 4 * 1024 * 1024
    # Socket read timeout in seconds
    _READ_TIMEOUT = 5.0

    def handle_incoming_connection(self,
                                   connection: socket.socket,
                                   workers_pool: BoundedThreadExecutor) -> None:
        """
        Handle incoming connection
        :param connection: socket object usable to send and receive data
            on the connection
        :param workers_pool: pool where we can place tasks for the further processing
        """
        try:
            _pid, _uid, _gid = extract_creds(connection)
        except OSError:
            logger.warning('Failed to extract credentials, closing connection')
            connection.close()
            return
        # enforce the per-UID concurrent connection cap before doing any work
        with self._uid_connections_lock:
            if self._uid_connections.get(_uid, 0) >= self.max_connections_per_uid:
                logger.warning('Too many concurrent connections from uid=%s, rejecting', _uid)
                connection.close()
                return
            self._uid_connections[_uid] += 1
        try:
            self._process_connection(connection, workers_pool, _pid, _uid)
        finally:
            # always release the per-UID slot, dropping empty counters
            with self._uid_connections_lock:
                self._uid_connections[_uid] -= 1
                if self._uid_connections[_uid] <= 0:
                    del self._uid_connections[_uid]

    def _process_connection(self, connection: socket.socket,
                            workers_pool: BoundedThreadExecutor,
                            _pid: int, _uid: int) -> None:
        """
        Read one JSON payload from the connection and dispatch it to the
        workers pool: no payload means "request init", a payload means
        "request end" carrying the trace data.
        """
        current_cpu = get_current_cpu_throttling_time(_uid)
        connection.settimeout(self._READ_TIMEOUT)
        fileobj: io.TextIOBase = connection.makefile(errors='ignore')
        try:
            input_data = self.read_input(fileobj)
        except json.JSONDecodeError as e:
            logger.error('JSON decode failed: %s', str(e),
                         extra={'t_name': current_thread().name})
            return
        except (OSError, socket.timeout):
            logger.warning('Connection read timed out or failed for pid=%s uid=%s',
                           _pid, _uid)
            return
        finally:
            # close connection as soon as possible to
            # allow new clients to be connected
            # (BUGFIX: also close the makefile() wrapper, it was leaked)
            fileobj.close()
            connection.close()
        try:
            # continue in different pool and threads because we don't want
            # our php processes to wait until processing is complete
            if input_data is None:
                workers_pool.submit(self._handle_request_init,
                                    php_pid=_pid,
                                    cpu_usage=current_cpu,
                                    caller_uid=_uid)
            else:
                workers_pool.submit(self._handle_request_end,
                                    php_pid=_pid,
                                    cpu_usage=current_cpu,
                                    request_data=input_data,
                                    caller_uid=_uid)
        except queue.Full:
            # BUGFIX: the original format string had a %s placeholder
            # with no argument; pass the php pid
            logger.error('Request %s was rejected because our workers thread pool '
                         'is full of tasks. Increase queuemaxsize or max_threads in configuration.',
                         _pid)

    def read_input(self, fileio: io.TextIOBase) -> Any:
        """
        Read input data and return decoded json
        :param fileio: a file-like object providing read method
        :return: decoded JSON payload, or None for an empty payload
        """
        data = fileio.read(self._MAX_READ_SIZE)
        logger.debug('I received data: %s', data)
        if len(data.strip()) == 0:
            return
        return json.loads(data.strip(), strict=False)

    def instantiate_mongo_client(self, fake_task_id: str) -> 'APIClient':
        """
        Initialize MongoDB client for current task

        :raises XRayError: when the fake id cannot be resolved to a real
            tracing_task_id or the local storage is unavailable
        """
        try:
            with dbm_storage(local_tasks_storage) as task_storage:
                try:
                    real_id = task_storage[fake_task_id].decode()
                except KeyError:
                    raise XRayError(
                        _("Cannot resolve tracing_task_id: no match found in storage"),
                        extra={'id': fake_task_id,
                               'all_ids': task_storage.keys()})
        except RuntimeError as e:
            raise XRayError(_("Cannot resolve tracing_task_id: %s") % str(e))
        return self.task_client_object(system_id=self.sys_id,
                                       tracing_task_id=real_id)

    def get_cached_or_load(self, fake_task_id: str) -> Tuple['APIClient', Task]:
        """
        Returns a client and task from cache of API data or
        initialize client and GET task from MongoDB and add to cache
        """
        logger.debug('Cached API data: %s', self.api_data_cache)
        # BUGFIX: cache reads/writes are now guarded by api_data_cache_lock,
        # consistently with cleanup_api_data_cache(); the lock is NOT held
        # across the remote get_task() call
        with self.api_data_cache_lock:
            cached_data = self.api_data_cache.get(fake_task_id)
            if cached_data is not None:
                return cached_data.client, cached_data.task
        apiclient = self.instantiate_mongo_client(fake_task_id)
        _t = apiclient.get_task()
        logger.debug('Adding new container in cache: %s --> %s, %s',
                     fake_task_id, _t, apiclient)
        with self.api_data_cache_lock:
            self.api_data_cache[fake_task_id] = APIDataContainer(client=apiclient,
                                                                 task=_t)
        return apiclient, _t

    def cleanup_api_data_cache(self) -> None:
        """
        Cleanup an API data im-memory cache dict in order
        not store inactive (stopped, already completed) tasks there
        """
        try:
            with dbm_storage(local_tasks_storage) as task_storage:
                active_tasks = [k.decode() for k in task_storage.keys()]
        except RuntimeError:
            logger.warning(
                'Unable to cleanup cache, storage unavailable')
            return
        for _task in list(self.api_data_cache.keys()):
            with self.api_data_cache_lock:
                # re-check membership: another thread may have removed it
                if _task in self.api_data_cache and _task not in active_tasks:
                    logger.info('Cleaning up inactive container %s', _task)
                    self.api_data_cache.pop(_task)

    def process_request_data(self, request_data: dict,
                             caller_uid: Optional[int] = None) -> None:
        """
        Increment request ID in /usr/share/alt-php-xray/requests/{tracing_task_id} file
        Substitute request_id and tracing_task_id in request_data.
        Send request_data to ClickHouse
        :param request_data: original request data
        :param caller_uid: UID of the connecting process (from SO_PEERCRED)
        """
        fake_task_id = request_data['tracing_task_id']
        # Verify the caller owns this task to prevent cross-user data injection.
        # Task files live at /usr/share/alt-php-xray-tasks/{uid}/{fake_id},
        # created by the manager at task start. Only root can write there,
        # so the presence of the file is a reliable ownership proof.
        if caller_uid is not None:
            if not safe_id_pattern.match(fake_task_id):
                logger.warning('Rejecting request with invalid task id: %s',
                               fake_task_id)
                return
            owner_task_file = os.path.join(
                tasks_base_storage, str(caller_uid), fake_task_id)
            if not os.path.isfile(owner_task_file):
                logger.warning(
                    'Rejecting request from uid=%s for task %s: '
                    'no matching task file in per-UID storage',
                    caller_uid, fake_task_id)
                return
        _, task = self.get_cached_or_load(fake_task_id)
        logger.info('Processing task: %s', task.task_id)
        with open_local_storage(request_data['tracing_task_id'],
                                flush=task.is_manual) as storage:
            # read stored request_id
            task.update_with_local_data(next_request_id=storage.next_request_id)
            if task.tracing_count <= 0:
                logger.info('Tracing count is 0, nothing should be done')
                return
            # update input data with stored request_id
            updated_request_data = self.update_request_data(request_data, task)
            # send data with updated ids
            logger.info('Sending to ClickHouse')
            self.send_client(updated_request_data)
            try:
                logger.info('Sending to SmartAdvice')
                self.adviser_client(updated_request_data)
            except XRayAPIError:
                # ignore all errors occurring within smart advice
                # microservice intercommunication
                pass
            # then increment request_id counter
            storage.next_request_id += 1
        # locally recalculate how much requests left to process
        task.update_with_local_data(next_request_id=storage.next_request_id)
        if task.is_manual:
            self._flush_mongodb_counters(task.fake_id)
        if task.tracing_by != 'time' and task.tracing_count <= 0:
            self.complete_task(task)

    def update_request_data(self, data: dict, task: Task) -> dict:
        """
        Substitute request_id and tracing_task_id
        :param data: original input
        :param task: a Task instance
        :return: updated input
        """
        data['request_id'] = task.request_count + 1
        data['tracing_task_id'] = task.task_id
        for item in data['data']:
            item['request_id'] = task.request_count + 1
            item['tracing_task_id'] = task.task_id
            if item['type'] == 'mysql_query':
                item['query'] = self.hide_symbols(item['query'])
        logger.info('Input updated: tracing_task_id = %s & request_id = %s',
                    data.get('tracing_task_id'), data.get('request_id'))
        logger.debug('Full updated input %s', data)
        return data

    def update_counts(self, client: 'APIClient',
                      request_count: int,
                      tracing_count: Optional[int] = None) -> None:
        """
        Update task counters in mongodb instance
        """
        client.update_counts_only(tracing_count=tracing_count,
                                  request_count=request_count)

    def complete_task(self, _task: Task) -> None:
        """
        Stop and complete request_qty task
        :param _task: tracing task to stop
        """
        logger.info('Task %s should be completed', _task.task_id)
        # delay for MongoDB to process counts, received lately (see XRAY-87)
        time.sleep(1)
        self._run_complete_task_cmd(_task.task_id)

    def _run_complete_task_cmd(self, task_id):
        """
        Invoke the external manager to stop the given tracing task;
        failures are logged, never raised.
        """
        try:
            subprocess.check_output([
                'cloudlinux-xray-manager', 'stop',
                '--system_id', self.sys_id,
                '--tracing_task_id', task_id
            ], timeout=120, stderr=subprocess.PIPE)
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as e:
            logger.error('Failed to complete task %s: %s', task_id, e)

    @staticmethod
    def hide_symbols(mysql_query: str) -> str:
        """
        Sanitize data in single quotes from MySQL query
        """
        def replacer(m):
            """
            Works with whole string in single or double quotes
            """
            q = m.group('quote')
            t = m.group('trunc')

            def inner_repl(inner_m):
                """
                Works with characters inside quotes
                """
                if inner_m.group('digit'):
                    return '0'
                elif inner_m.group('symbol'):
                    return 'x'

            sanitized = re.sub(r"((?P<digit>\d)|(?P<symbol>[^0-9_:;\-/',. \\]))",
                               inner_repl, m.group('in_quote'))
            # wrap sanitized string back with originally detected characters
            # (quotes/truncation marker)
            return f'{q}{sanitized}{t or q}'

        # string either wrapped in quotes (single or double) or
        # starting from quote and finishing with ... (truncation marker)
        # including escaped with either / or \ quote
        pattern = re.compile(
            r"""(?P<quote>['"])(?P<in_quote>.*?)((?<![\\|/])(?P=quote)|(?P<trunc>\.{3}))""")
        return re.sub(pattern, replacer, mysql_query)
| ver. 1.6 |
Github
|
.
| PHP 8.3.30 | Генерация страницы: 0 |
proxy
|
phpinfo
|
Настройка