'NdgGddlZddlZddlZddlZddlZddlZddlZddlmZddl m Z m Z ddl m Z ddlmZmZddlmZmZmZddlmZddlmZmZmZdd lmZejeZeZ ej!d d d gZ"Gd de Z#GddeZ$dZ%Gdde&Z'Gdde&Z(Gddej)Z*dS)N) attrgetter)MessageReject)BaseMessageProcessor) is_enabledmqtt_tracked_methods)Genmessage_id_gen publisher)safe_cancel_task)DAY ServiceBase rate_limit)gProcessingMessagemessage start_timec2eZdZdZdZdZdZdZdZdS)TheSinkct|td|_||_t |t |j|_|t_dS)NPROCESSING_ORDER)key) sortedr_sinks_ordered_loop TaskManagerMessageProcessor _task_managerrsink)self sink_listloops W/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/the_sink.py__init__zTheSink.__init__#s`$ :&899    ( "4#677  c8|jjd|jjS)N.) __class__ __module____name__r s r#__repr__zTheSink.__repr__-s .333T^5L5LMMr%cfd|jD}t|dks Jdtt|dS)ze introspection: decompose a specific role :return classobj: instance or None c4g|]}t||S) isinstance).0rclassobjs r# z%TheSink.decompose..5s8   JtX4N4N    r%zAmbiguous requestN)rlennextiter)r r2optionss ` r# decomposezTheSink.decompose0sf     !0   7||q   "5   DMM4(((r%c8|jdS)z Make sure to run message processing bus only when every MessageSource (or MessageSource+MessageSink mix) got initialized N)rstartr+s r#r;z TheSink.start;s   """""r%cXKtd|jtd|jdd{Vtd|jd{VdS)Nzshutdown the sink startedzwait for current taskstimeoutzfinish wait task)loggerinfor should_stopwait_current_taskswaitr+s r#shutdownzTheSink.shutdownCs /000 &&((( ,--- 33A3>>>>>>>>> &''' %%'''''''''''r%cJK|j|d{VdSN)rpush_msg)r rs r#process_messagezTheSink.process_messageKs5 ))'22222222222r%N) r*r) __qualname__r$r,r9r;rErIr/r%r#rr"sqNNN ) ) )###(((33333r%rcpeZdZdZdZdZfdZdZedZ d dZ d Z d Z e d ZxZS) rir=c~t|t|j|_|j|_|j|_||_ tj |_ tttj|_|tj|_dS)N)maxsize)periodon_drop)superr$ MessageQueueMAXSIZE_queue CONCURRENCY _concurrencyTIMEOUT_process_message_timeout_msg_processorweakrefWeakSettasksrr r@warning_throttled_loggererrorthrottled_log_error)r r" msg_processorr(s r#r$zTaskManager.__init__Ws "4<888  ,(, %+_&& !+3!O!O!O#'#9#9&,#G#G   r%c,K|js/|jt|d{VdS|jjrd|j|j|f}n"d|j|j|f}|j|dS)z&Push message unless the queue is full.NzNMessage queue is full %s. Current processing messages: %s. Message ignored: %szZMessage queue is full. Queue size: %s Current processing messages: %s. Message ignored: %s) rTfullputMessageComparabler^should_be_calledcurrent_processing_messagesqsizer`)r msgargss r#rHzTaskManager.push_msgas{!! ,+//"3C"8"899 9 9 9 9 9 9 9 9 9%6 OK4OK%%''4 %D $d + + + +r%c>td|jDS)Nc3K|]R}||jjtt j|jjz dfVSdS)N)doneprocessing_msgrroundtime monotonicr)r1tasks r# z:TaskManager.current_processing_messages..sq  99;;  #+dn&&)<)GGKK       r%)tupler\r+s r#rgz'TaskManager.current_processing_messages|s6         r%NcK|jrOd|jD}td|t j|j|d{VdSdS)Ncjg|]0\}}|d|d|f1S)method message_id)get)r1mlastings r#r3z2TaskManager.wait_current_tasks..sIAwx!%% "5"5w?r%z#Waiting for %r processing to finishr>)r\rgr@rAasynciorD)r r?msg_to_processs r#rCzTaskManager.wait_current_taskss : <"&"BN KK5   ,tz7;;; ; ; ; ; ; ; ; ; ; < z"TaskManager._run..si.?.?.A.Ar%z3There is still %s unprocessed messages in the queue Error during message processing:)r}BoundedSemaphorerV _should_stopr@debugrTrh_TaskManager__limit_concurrencyrzCancelledErrorr create_taskrYrirrqrrroadd_done_callback_on_msg_processedr\addr] exception)r msg_comparablet unprocessedrs @r#_runzTaskManager._runs,T->??  A' " 5t{7H7H7J7JKKK229=========+/;??+<+<%<%<%<%<%<%Nz+Message hasn't been processed in %s seconds)r}wait_foracquirerX TimeoutErrorr`)r rs r#__limit_concurrencyzTaskManager.__limit_concurrencys  $-%%'' 9'   ((A1  s28*A%$A%cn|}|rtd|dSdS)Nr)exc_info)rr@)futurees r#rzTaskManager._on_msg_processedsH       M   ?!  L L L L L M Mr%rG)r*r)rJrSrUrWr$rHpropertyrgrCrr staticmethodr __classcell__r(s@r#rrOsGKGHHHHH,,,6   X   < < < <AAA8   MM\MMMMMr%rc`K|st|d{VdSdSrG)rnr )rss r# cancel_taskrsF 99;;%t$$$$$$$$$$$%%r%c$eZdZdZdZdZdZdS)rrLc||_tj|_t dt j|_dS)NrL)rO)sinksrZWeakValueDictionarylocksrr@r_r`)r rs r#r$zMessageProcessor.__init__sC 022 #=:W#=#=#= L$ $    r%cZK|d}|rv|j|tj}|4d{V||d{Vdddd{VdS#1d{VswxYwYdS||d{VdS)N attackers_ip)rzr setdefaultr}Lock_call_unlocked)r riiplocks r#__call__zMessageProcessor.__call__sT WW^ $ $  +:((W\^^<zRejected: %s -> %r)filerzCMessage %r was not processed in the %r plugin in %ss; Traceback: %szError processing %r in %rz%s processed in %.4f secondszE%s message took longer to process than expected (%.4f sec > %.4f sec))$rrzrr enrichr report_reporter_gen_sinkrqrrrr}rrIrshieldTIMEOUT_TO_SINK_PROCESSr0rrrrr@rAstrrioStringIO print_stackseekr_read ExceptionrPROCESSING_TIME_THRESHOLDr`) r rir;rprocess_message_task processedrstackprocessing_times r#rzMessageProcessor._call_unlockeds  ' ' '!!%9%;%;;;%S00  !# & & &08MNNNN  J* 8* 8D) 8'.':((--(($#*"2 N#788 8 ### Di11$#C7)   8""6777777777777    0#a&&#>>>2""67777777777771'     $00e0<<< 1  $0JJLL ""6777777777777     !N)r*r)rJrr$rrr/r%r#rrsL"   +++EEEEEr%rcBeZdZdZdZefdZdZdZxZ S)rez#Wrapper to make message comparable.c|xjdz c_t|}|j|jf|_||_|SNr4)indexrQ__new__PRIORITYpriorityri)clsrirvr(s r#rzMessageComparable.__new__/sC Q WW__S ! !lCI-  r%c@|j|jSrG)r__lt__)r others r#rzMessageComparable.__lt__7s}##EN333r%cZd|jj|j|jS)Nz'<{klass}({msg!r}), priority={priority}>)klassrir)formatr(r*rirr+s r#r,zMessageComparable.__repr__:s28??.)]@   r%) r*r)rJ__doc__rrrrr,rrs@r#rere)sm-- E\444       r%rec4eZdZfdZdeffd ZdZxZS)rRctj|i|tj|_d|j_d|j_dS)N2i)rQr$reprlibRepr_repr maxstringmaxtuple)r rjkwargsr(s r#r$zMessageQueue.__init__CsF$)&)))\^^ ! " r%itemcVKt|d{VSrG)rQrd)r rr(s r#rdzMessageQueue.putIs/WW[[&&&&&&&&&r%cttjd|jDdd}d|jd|d|j|dS) Nc0g|]}|jjjSr/)rir(rJ)r1rs r#r3z(MessageQueue.__str__..Ps IIIT#0IIIr%c|dSrr/)rs r#rz&MessageQueue.__str__..Rs T!Wr%T)rreversez) r collectionsCounterrTitemsrNrhrrepr)r msg_countss r#__str__zMessageQueue.__str__Ls  IIT[III  egg$$     rsh  >>>>>>>>BBBBBB 322222EEEEEEEEEE444444  8 $ $SUU*K*)\2 *3*3*3*3*3"*3*3*3ZwMwMwMwMwM+wMwMwMt%%% XXXXXvXXXv        2     7(     r%