Файловый менеджер - Редактировать - /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/api/server/send_message.py
Назад
import asyncio
import base64
import json
import os
import time
import urllib.error
import urllib.request
import uuid
from abc import ABC, abstractmethod
from logging import getLogger
from typing import Optional

# nats-py is optional: NATSGatewayAPI reports a clear error when it is absent.
try:
    import nats

    _has_nats = True
except ImportError:
    _has_nats = False

from defence360agent.api.server import (
    API,
    APIError,
    APITokenError,
    FGWSendMessgeException,
    NATSSendMessageException,
)
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import Message
from defence360agent.internals.global_scope import g
from defence360agent.internals.iaid import (
    IndependentAgentIDAPI,
    IAIDTokenError,
)
from defence360agent.utils.async_utils import AsyncIterate
from defence360agent.utils.json import ServerJSONEncoder

logger = getLogger(__name__)


async def _nats_error_cb(ex: Exception) -> None:
    """Downgrade nats-py internal errors to DEBUG.

    Transient errors (ConnectionRefused, AuthorizationViolation) are
    expected during agent restarts.  Our code already logs a WARNING
    with context, so the nats-py default ERROR + traceback is noise.
    """
    logger.debug("nats: %s", ex)


class BaseSendMessageAPI(API, ABC):
    """Common logic for posting a message to the send-message endpoint.

    Subclasses provide the transport by implementing ``_send_request``.
    """

    URL = "/api/v2/send-message/{method}"

    @abstractmethod
    async def _send_request(self, message_method, headers, post_data) -> dict:
        """Send *post_data* for *message_method* and return the response."""
        pass  # pragma: no cover

    def check_response(self, result: dict) -> None:
        """Validate a server response.

        Raises:
            APIError: if ``result`` has no ``"status"`` key or the status
                is not ``"ok"``.
        """
        if "status" not in result:
            raise APIError("unexpected server response: {!r}".format(result))
        if result["status"] != "ok":
            raise APIError("server error: {}".format(result.get("msg")))

    async def send_data(self, method: str, post_data: bytes) -> None:
        """Send already-serialized *post_data* authenticated by IAID token.

        Raises:
            APITokenError: if the IAID token cannot be obtained.
            APIError: if the server response fails ``check_response``.
        """
        try:
            token = await IndependentAgentIDAPI.get_token()
        except IAIDTokenError as e:
            # Chain the cause so the original token failure stays visible.
            raise APITokenError(f"IAID token error occurred {e}") from e
        headers = {
            "Content-Type": "application/json",
            "X-Auth": token,
        }
        result = await self._send_request(method, headers, post_data)
        self.check_response(result)


class SendMessageAPI(BaseSendMessageAPI):
    """Sends messages to the server with an HTTP POST request."""

    _SOCKET_TIMEOUT = Core.DEFAULT_SOCKET_TIMEOUT

    def __init__(
        self, rpm_ver: str, base_url: Optional[str] = None, executor=None
    ):
        self._executor = executor
        self.rpm_ver = rpm_ver
        self.product_name = ""
        self.server_id = None  # type: Optional[str]
        self.license = {}  # type: dict
        if base_url:
            self.base_url = base_url
        else:
            # _BASE_URL is not defined in this file; presumably inherited
            # from API -- verify against the base class.
            self.base_url = self._BASE_URL

    def set_product_name(self, product_name: str) -> None:
        """Set the product name reported in outgoing messages."""
        self.product_name = product_name

    def set_server_id(self, server_id: Optional[str]) -> None:
        """Set the server id reported in outgoing messages."""
        self.server_id = server_id

    def set_license(self, license: dict) -> None:
        """Store the license dict on the instance."""
        self.license = license

    async def _send_request(self, message_method, headers, post_data):
        """POST *post_data* to the endpoint for *message_method*."""
        request = urllib.request.Request(
            self.base_url + self.URL.format(method=message_method),
            data=post_data,
            headers=headers,
            method="POST",
        )
        return await self.async_request(request, executor=self._executor)

    async def send_message(self, message: Message) -> None:
        """Fill in missing message fields, serialize and send *message*."""
        # add message handling time if it does not exist, so that
        # the server does not depend on the time it was received
        if "timestamp" not in message:
            message["timestamp"] = time.time()
        if "message_id" not in message:
            message["message_id"] = uuid.uuid4().hex
        if "method" not in message:
            message["method"] = "INCIDENT_LIST"
        data2send = {
            "payload": message.payload,
            "rpm_ver": self.rpm_ver,
            "message_id": message.message_id,
            "server_id": self.server_id,
            "name": self.product_name,
        }
        post_data = json.dumps(data2send, cls=ServerJSONEncoder).encode()
        await self.send_data(message.method, post_data)


class FileBasedGatewayAPI(SendMessageAPI):
    """Sends batches of messages through the imunify-message-gateway binary."""

    async def _prepare_message(self, message, semaphore) -> dict:
        """Parse one serialized message into ``{"method", "data"}`` form.

        The semaphore bounds how many messages are parsed in worker
        threads at the same time.
        """
        async with semaphore:
            loaded = await asyncio.to_thread(json.loads, message)
            return {
                "method": loaded["method"],
                "data": {k: v for k, v in loaded.items() if k != "method"},
            }

    async def send_messages(self, messages: list[tuple[float, bytes]]) -> None:
        """Send *messages* in one ``send-many`` call of the gateway binary.

        Each item is a 2-tuple whose second element is the serialized
        message; the first element is ignored here.

        Raises:
            FGWSendMessgeException: if the binary exits with a non-zero
                return code.
        """
        max_threads = 5
        semaphore = asyncio.Semaphore(max_threads)
        tasks = [
            self._prepare_message(msg, semaphore)
            async for _, msg in AsyncIterate(messages)
        ]
        prepared_messages = await asyncio.gather(*tasks)
        dumped_messages = await asyncio.to_thread(
            json.dumps, prepared_messages
        )
        bin_file_path = os.getenv(
            "I360_MESSAGE_GATEWAY_BIN_PATH", "/usr/libexec/"
        )
        bin_file = os.path.join(bin_file_path, "imunify-message-gateway")
        command = [
            bin_file,
            "send-many",
            "--producer=i360-agent-non-resident",
        ]
        process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # The batch is passed to the binary on stdin, base64-encoded.
        b64data = base64.b64encode(dumped_messages.encode())
        stdout, stderr = await process.communicate(input=b64data)
        if g.get("DEBUG"):
            logger.info(
                "Message sent to fgw: %s %s %s", len(messages), stdout, stderr
            )
        if process.returncode != 0:
            # Decode leniently: undecodable bytes in stderr must not turn
            # the gateway failure into an unrelated UnicodeDecodeError.
            error_text = "Error sending message: {}".format(
                stderr.decode(errors="replace")
            )
            logger.error("%s", error_text)
            raise FGWSendMessgeException(error_text)


class NATSGatewayAPI:
    """Publishes messages to the embedded NATS server via localhost TCP.

    Connects to nats://127.0.0.1:<port> with an auth token read from a
    file written by the resident-agent on startup.
    """

    NATS_SUBJECT_PREFIX = "imunify.api."
    DEFAULT_PORT = 44222
    DEFAULT_TOKEN_PATH = "/var/run/imunify360/nats.token"
    CONNECT_TIMEOUT = 5
    MIN_RECONNECT_INTERVAL = 5

    def __init__(self):
        self._nc = None
        # time.monotonic() has an undefined reference point, so "never
        # attempted" is -inf rather than 0: the first connect attempt is
        # then never mistaken for one made within the backoff window.
        self._last_connect_attempt = float("-inf")

    async def _ensure_connected(self):
        """Connect to NATS unless a live connection already exists.

        Raises:
            NATSSendMessageException: if nats-py is not installed, if the
                previous attempt was less than MIN_RECONNECT_INTERVAL
                seconds ago, or if reading the token or connecting fails.
        """
        if self._nc is not None and self._nc.is_connected:
            return
        if not _has_nats:
            raise NATSSendMessageException("nats-py is not installed")
        now = time.monotonic()
        since_last = now - self._last_connect_attempt
        if since_last < self.MIN_RECONNECT_INTERVAL:
            raise NATSSendMessageException(
                "NATS reconnect backoff"
                f" ({self.MIN_RECONNECT_INTERVAL - since_last:.1f}s remaining)"
            )
        self._last_connect_attempt = now
        # Clean up stale connection before reconnecting
        await self._close()
        port = int(os.getenv("I360_NATS_PORT", str(self.DEFAULT_PORT)))
        token_path = os.getenv("I360_NATS_TOKEN_PATH", self.DEFAULT_TOKEN_PATH)
        try:
            with open(token_path) as f:
                token = f.read().strip()
            self._nc = await nats.connect(
                f"nats://127.0.0.1:{port}",
                token=token,
                connect_timeout=self.CONNECT_TIMEOUT,
                max_reconnect_attempts=0,
                error_cb=_nats_error_cb,
            )
            logger.info("Connected to NATS at 127.0.0.1:%s", port)
        except Exception as e:
            raise NATSSendMessageException(
                f"Failed to connect to NATS: {e}"
            ) from e

    async def send_messages(self, messages: list[tuple[float, bytes]]) -> None:
        """Publish each message to JetStream under its method's subject.

        Each item is a 2-tuple whose second element is the JSON-serialized
        message; the first element is ignored here.  Messages that cannot
        be parsed, are not JSON objects, or carry a non-string ``method``
        are skipped and counted as handled.

        Raises:
            NATSSendMessageException: if connecting or publishing fails;
                on a publish failure it carries ``published``, the number
                of messages handled before the error.
        """
        await self._ensure_connected()
        published = 0
        try:
            js = self._nc.jetstream()
            for _, msg_bytes in messages:
                try:
                    loaded = json.loads(msg_bytes)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    logger.warning("Skipping malformed message: %s", e)
                    published += 1  # count as handled, not re-queued
                    continue
                # Valid JSON that is not an object can never be published;
                # skip it instead of failing the whole batch on it.
                if not isinstance(loaded, dict):
                    logger.warning(
                        "Skipping malformed message: %s",
                        "expected a JSON object, got "
                        + type(loaded).__name__,
                    )
                    published += 1  # count as handled, not re-queued
                    continue
                method = loaded.pop("method", "UNKNOWN")
                if not isinstance(method, str):
                    logger.warning(
                        "Skipping malformed message: %s",
                        "method must be a string, got "
                        + type(method).__name__,
                    )
                    published += 1  # count as handled, not re-queued
                    continue
                message_id = loaded.get("message_id")
                payload = json.dumps(loaded).encode()
                subject = self.NATS_SUBJECT_PREFIX + method
                headers = {}
                if message_id:
                    headers["Nats-Msg-Id"] = message_id
                ack = await js.publish(
                    subject,
                    payload,
                    headers=headers if headers else None,
                )
                published += 1
                if g.get("DEBUG"):
                    logger.debug(
                        "Published to %s, stream=%s, seq=%s",
                        subject,
                        ack.stream,
                        ack.seq,
                    )
        except Exception as e:
            # Drop the connection so the next call reconnects from scratch.
            await self._close()
            logger.warning(
                "NATS publish failed after %d/%d messages: %s",
                published,
                len(messages),
                e,
            )
            raise NATSSendMessageException(
                f"Failed to publish messages: {e}",
                published=published,
            ) from e

    async def _close(self):
        """Drain and forget the current connection, if any (best effort)."""
        if self._nc is None:
            return
        try:
            await self._nc.drain()
        except Exception as e:
            # Best effort: the connection is being discarded anyway, but
            # keep the reason visible at DEBUG instead of swallowing it.
            logger.debug("nats: error while draining connection: %s", e)
        finally:
            self._nc = None

    async def close(self):
        """Close the NATS connection."""
        await self._close()
| ver. 1.6 |
Github
|
.
| PHP 8.3.30 | Генерация страницы: 0 |
proxy
|
phpinfo
|
Настройка