1- import copy
1+ import contextvars
22import json
33import logging
4+ import threading
45import typing
56from queue import Empty as EmptyQueue
67from queue import Queue
@@ -62,6 +63,7 @@ def __init__(
6263 self ._shutdown : bool = False
6364 for thread in self ._threads :
6465 thread .start ()
66+ logger .debug ("Exporter Thread %s started" , thread .ident )
6567
6668 def export (self , spans : trace .Sequence [ReadableSpan ]) -> SpanExportResult :
6769 def is_evaluated_file (
@@ -89,15 +91,25 @@ def is_evaluated_file(
8991 self ._upload_queue .put (
9092 (
9193 span ,
92- copy .deepcopy (evaluation_context ),
93- )
94+ contextvars .copy_context ()[self ._client .evaluation_context_variable ],
95+ ),
96+ )
97+ logger .debug (
98+ "Span %s with EvaluationContext %s added to upload queue" ,
99+ span .attributes ,
100+ contextvars .copy_context ()[self ._client .evaluation_context_variable ],
94101 )
95102 # Reset the EvaluationContext so run eval does not
96103 # create a duplicate Log
97104 if evaluation_context is not None and is_evaluated_file (
98105 spans [0 ],
99106 evaluation_context ,
100107 ):
108+ logger .debug (
109+ "EvaluationContext %s marked as exhausted for Log in Span %s" ,
110+ evaluation_context ,
111+ spans [0 ].attributes ,
112+ )
101113 # Mark the EvaluationContext as used
102114 self ._client .evaluation_context_variable .set (None )
103115 return SpanExportResult .SUCCESS
@@ -109,6 +121,7 @@ def shutdown(self) -> None:
109121 self ._shutdown = True
110122 for thread in self ._threads :
111123 thread .join ()
124+ logger .debug ("Exporter Thread %s joined" , thread .ident )
112125
113126 def force_flush (self , timeout_millis : int = 3000 ) -> bool :
114127 self ._shutdown = True
@@ -146,21 +159,37 @@ def _do_work(self):
146159 # Set the EvaluationContext for the thread so the .log action works as expected
147160 # NOTE: Expecting the evaluation thread to send a single span so we are
148161 # not resetting the EvaluationContext in the scope of the export thread
149- self ._client .evaluation_context_variable .set (
150- copy .deepcopy (evaluation_context ),
151- )
162+ self ._client .evaluation_context_variable .set (evaluation_context )
152163 except EmptyQueue :
153164 continue
154165 trace_metadata = TRACE_FLOW_CONTEXT .get (span_to_export .get_span_context ().span_id )
155166 if trace_metadata is None :
156167 # Span is not part of a Flow Log
157168 self ._export_span_dispatch (span_to_export )
169+ logger .debug (
170+ "_do_work on Thread %s: Dispatched span %s with FlowContext %s which is not part of a Flow" ,
171+ threading .get_ident (),
172+ span_to_export .attributes ,
173+ trace_metadata ,
174+ )
158175 elif trace_metadata ["trace_parent_id" ] is None :
159176 # Span is the head of a Flow Trace
160177 self ._export_span_dispatch (span_to_export )
178+ logger .debug (
179+ "Dispatched span %s which is a Flow Log with FlowContext %s" ,
180+ span_to_export .attributes ,
181+ trace_metadata ,
182+ )
161183 elif trace_metadata ["trace_parent_id" ] in self ._span_id_to_uploaded_log_id :
162184 # Span is part of a Flow and its parent has been uploaded
163185 self ._export_span_dispatch (span_to_export )
186+ logger .debug (
187+ "_do_work on Thread %s: Dispatched span %s after its parent %s with FlowContext %s" ,
188+ threading .get_ident (),
189+ span_to_export .attributes ,
190+ trace_metadata ["trace_parent_id" ],
191+ trace_metadata ,
192+ )
164193 else :
165194 # Requeue the Span to be uploaded later
166195 self ._upload_queue .put ((span_to_export , evaluation_context ))
0 commit comments