🔐 Sid Gifari File Manager Pro
v8.0.5 | 2026-06-23 05:01:59 | PHP 8.2.31
📍 /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/__pycache__
Editing: the_sink.cpython-311.pyc
Read Only
[Compiled Python 3.11 bytecode; the binary is not reproducible as text. Readable strings recovered from the dump are listed below.]

Source file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/the_sink.py

Imported names: attrgetter, Message, Reject, BaseMessageProcessor, is_enabled, mqtt_tracked_methods, Gen, message_id_gen, publisher, safe_cancel_task, DAY, ServiceBase, rate_limit, g

Module-level strings: ProcessingMessage, message, start_time

Class TheSink
  Methods: __init__, __repr__, decompose, start, shutdown, process_message

  __init__(self, sink_list, loop)
    Names referenced: sorted, attrgetter, key, PROCESSING_ORDER, _sinks_ordered, _loop, TaskManager, MessageProcessor, _task_manager, g, sink

  __repr__(self)
    Names referenced: __class__, __module__, __name__ (joined with ".")

  decompose(self, classobj)
    Docstring: introspection: decompose a specific role
               :return classobj: instance or None
    Names referenced: isinstance, _sinks_ordered, len, next, iter
    Assertion message: Ambiguous request

  start(self)
    Docstring: Make sure to run message processing bus only when every
               MessageSource (or MessageSource+MessageSink mix) got initialized
    Names referenced: _task_manager, start

  shutdown(self)
    Log messages: "shutdown the sink started", "wait for current tasks", "finish wait task"
    Names referenced: logger, info, _task_manager, should_stop, wait_current_tasks, timeout, wait

  process_message(self, message)
    Names referenced: _task_manager, push_msg