Commit ce09d3d

Author: Andrei Bratu
Commit message: wip
Parent: bc5fc8a

8 files changed: +305, -1289 lines

poetry.lock

Lines changed: 133 additions & 1179 deletions
Generated file; diff not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -51,6 +51,7 @@ parse = ">=1"
 pydantic = ">= 1.9.2"
 pydantic-core = "^2.18.2"
 typing_extensions = ">= 4.0.0"
+opentelemetry-proto = "^1.30.0"
 
 [tool.poetry.dev-dependencies]
 mypy = "1.0.1"
@@ -59,7 +60,6 @@ pytest-asyncio = "^0.23.5"
 python-dateutil = "^2.9.0"
 types-python-dateutil = "^2.9.0.20240316"
 anthropic = ">=0.37.1"
-chromadb = "^0.6.3"
 cohere = "^5.11.2"
 groq = ">=0.11.0"
 jsonschema = "^4.23.0"

src/humanloop/context.py

Lines changed: 34 additions & 20 deletions
@@ -1,6 +1,7 @@
+from contextlib import contextmanager
 from dataclasses import dataclass
 import threading
-from typing import Callable, Optional
+from typing import Callable, Generator, Optional
 from opentelemetry import context as context_api
 
 from humanloop.otel.constants import (
@@ -10,20 +11,16 @@
 )
 
 
-ResetToken = object
-
-
 def get_trace_id() -> Optional[str]:
     key = hash((HUMANLOOP_CONTEXT_TRACE_ID, threading.get_ident()))
     return context_api.get_value(key=key)
 
 
-def set_trace_id(flow_log_id: str) -> ResetToken:
+@contextmanager
+def set_trace_id(flow_log_id: str) -> Generator[None, None, None]:
     key = hash((HUMANLOOP_CONTEXT_TRACE_ID, threading.get_ident()))
-    return context_api.attach(context_api.set_value(key=key, value=flow_log_id))
-
-
-def reset_trace_id_context(token: ResetToken):
+    token = context_api.attach(context_api.set_value(key=key, value=flow_log_id))
+    yield
     context_api.detach(token=token)
 
 
@@ -33,37 +30,54 @@ class PromptContext:
     template: Optional[str]
 
 
-def set_prompt_context(prompt_context: PromptContext) -> ResetToken:
+@contextmanager
+def set_prompt_context(prompt_context: PromptContext) -> Generator[None, None, None]:
     key = hash((HUMANLOOP_CONTEXT_PROMPT, threading.get_ident()))
-    return context_api.attach(
+    reset_token = context_api.attach(
         context_api.set_value(
             key=key,
             value=prompt_context,
         )
     )
-
-
-def reset_prompt_context(token: ResetToken):
-    context_api.detach(token=token)
+    yield
+    context_api.detach(token=reset_token)
 
 
 def get_prompt_context() -> Optional[PromptContext]:
     key = hash((HUMANLOOP_CONTEXT_PROMPT, threading.get_ident()))
     return context_api.get_value(key)
 
 
-@dataclass
 class EvaluationContext:
     source_datapoint_id: str
     run_id: str
     callback: Callable[[str], None]
     file_id: str
     path: str
-
-
-def set_evaluation_context(evaluation_context: EvaluationContext) -> ResetToken:
+    logging_counter: int
+
+    def __init__(
+        self,
+        source_datapoint_id: str,
+        run_id: str,
+        callback: Callable[[str], None],
+        file_id: str,
+        path: str,
+    ):
+        self.source_datapoint_id = source_datapoint_id
+        self.run_id = run_id
+        self.callback = callback
+        self.file_id = file_id
+        self.path = path
+        self.logging_counter = 0
+
+
+@contextmanager
+def set_evaluation_context(evaluation_context: EvaluationContext) -> Generator[None, None, None]:
     key = hash((HUMANLOOP_CONTEXT_EVALUATION, threading.get_ident()))
-    return context_api.attach(context_api.set_value(key, evaluation_context))
+    reset_token = context_api.attach(context_api.set_value(key, evaluation_context))
+    yield
+    context_api.detach(token=reset_token)
 
 
 def get_evaluation_context() -> Optional[EvaluationContext]:
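
Editorial note: the change above replaces the token-returning set_*/reset_* pairs with @contextmanager helpers that attach a value on entry and detach it on exit. A minimal standalone sketch of that attach/yield/detach pattern, assuming only the opentelemetry-api package (the key and values below are illustrative, not the SDK's real constants); wrapping the yield in try/finally would additionally guarantee the detach when the body raises:

from contextlib import contextmanager
from typing import Generator, Optional

from opentelemetry import context as context_api

EXAMPLE_KEY = "example_context_key"  # illustrative key, not one of the SDK's constants


@contextmanager
def set_example_value(value: str) -> Generator[None, None, None]:
    # Attach a copy of the current Context with the value set, keep the
    # reset token, and detach again when the with-block exits.
    token = context_api.attach(context_api.set_value(EXAMPLE_KEY, value))
    yield
    context_api.detach(token=token)


def get_example_value() -> Optional[str]:
    return context_api.get_value(EXAMPLE_KEY)


if __name__ == "__main__":
    assert get_example_value() is None
    with set_example_value("fl_log_123"):
        assert get_example_value() == "fl_log_123"
    assert get_example_value() is None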

src/humanloop/eval_utils/run.py

Lines changed: 27 additions & 28 deletions
@@ -167,46 +167,45 @@ def upload_callback(log_id: str):
         )
 
         # Set the Evaluation Context for current datapoint
-        set_evaluation_context(
+        with set_evaluation_context(
             EvaluationContext(
                 source_datapoint_id=dp.id,
                 callback=upload_callback,
                 file_id=hl_file.id,
                 run_id=run.id,
                 path=hl_file.path,
             )
-        )
-
-        log_func = _get_log_func(
-            client=client,
-            file_type=hl_file.type,
-            file_id=hl_file.id,
-            version_id=hl_file.version_id,
-            run_id=run.id,
-        )
-        start_time = datetime.now()
-        try:
-            output = _call_function(function_, hl_file.type, dp)
-            if not _callable_is_decorated(file):
-                # function_ is a plain callable so we need to create a Log
+        ):
+            log_func = _get_log_func(
+                client=client,
+                file_type=hl_file.type,
+                file_id=hl_file.id,
+                version_id=hl_file.version_id,
+                run_id=run.id,
+            )
+            start_time = datetime.now()
+            try:
+                output = _call_function(function_, hl_file.type, dp)
+                if not _callable_is_decorated(file):
+                    # function_ is a plain callable so we need to create a Log
+                    log_func(
+                        inputs=dp.inputs,
+                        output=output,
+                        start_time=start_time,
+                        end_time=datetime.now(),
+                    )
+            except Exception as e:
                 log_func(
                     inputs=dp.inputs,
-                    output=output,
+                    error=str(e),
+                    source_datapoint_id=dp.id,
+                    run_id=run.id,
                     start_time=start_time,
                     end_time=datetime.now(),
                 )
-        except Exception as e:
-            log_func(
-                inputs=dp.inputs,
-                error=str(e),
-                source_datapoint_id=dp.id,
-                run_id=run.id,
-                start_time=start_time,
-                end_time=datetime.now(),
-            )
-            logger.warning(
-                msg=f"\nYour {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. \n Error: {str(e)}"
-            )
+                logger.warning(
+                    msg=f"\nYour {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. \n Error: {str(e)}"
+                )
 
     with ThreadPoolExecutor(max_workers=workers) as executor:
         for datapoint in hl_dataset.datapoints:
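
Editorial note: the control-flow change above keeps the per-datapoint logging inside the with set_evaluation_context(...) block and records a failing callable as an error Log rather than letting the exception escape. A rough standalone sketch of that shape, using stand-in names rather than the SDK's real helpers:

from contextlib import contextmanager
from datetime import datetime
from typing import Any, Callable, Dict


@contextmanager
def evaluation_scope(datapoint_id: str):
    # Stand-in for set_evaluation_context(EvaluationContext(...)).
    yield


def run_datapoint(dp: Dict[str, Any], fn: Callable[..., Any], log: Callable[..., None]) -> None:
    with evaluation_scope(dp["id"]):
        start_time = datetime.now()
        try:
            output = fn(**dp["inputs"])
            log(inputs=dp["inputs"], output=output, start_time=start_time, end_time=datetime.now())
        except Exception as e:
            # A failed callable becomes an error Log; the evaluation run continues.
            log(inputs=dp["inputs"], error=str(e), start_time=start_time, end_time=datetime.now())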

src/humanloop/otel/exporter.py

Lines changed: 52 additions & 1 deletion
@@ -17,6 +17,14 @@
     HUMANLOOP_PATH_KEY,
 )
 from humanloop.otel.helpers import read_from_opentelemetry_span, write_to_opentelemetry_span
+from opentelemetry.proto.common.v1.common_pb2 import KeyValue, Link
+from opentelemetry.proto.trace.v1.trace_pb2 import (
+    TracesData,
+    ResourceSpans,
+    InstrumentationScope,
+    ScopeSpans,
+    Span as ProtoBufferSpan,
+)
 
 if typing.TYPE_CHECKING:
     from humanloop.client import Humanloop
@@ -118,15 +126,58 @@ def _do_work(self):
                 value=log_args,
             )
 
+            payload = TracesData(
+                resource_spans=[
+                    ResourceSpans(
+                        scope_spans=ScopeSpans(
+                            scope=InstrumentationScope(
+                                name="humanloop-otel",
+                                version="0.1.0",
+                            ),
+                            spans=[
+                                ProtoBufferSpan(
+                                    trace_id=span_to_export.context.trace_id,
+                                    span_id=span_to_export.span_id,
+                                    name=span_to_export.name,
+                                    kind=span_to_export.kind,
+                                    start_time_unix_nano=span_to_export.start_time,
+                                    end_time_unix_nano=span_to_export.end_time,
+                                    attributes=[
+                                        KeyValue(
+                                            key=key,
+                                            value=value,
+                                        )
+                                        for key, value in span_to_export.attributes.items()
+                                    ],
+                                    dropped_attributes_count=len(span_to_export.dropped_attributes),
+                                    dropped_events_count=len(span_to_export.dropped_events),
+                                    dropped_links_count=len(span_to_export.dropped_links),
+                                    links=[
+                                        Link(
+                                            trace_id=link.trace_id,
+                                            span_id=link.span_id,
+                                            attributes=link.attributes,
+                                        )
+                                        for link in span_to_export.links
+                                    ],
+                                    events=[],
+                                )
+                            ],
+                        )
+                    )
+                ]
+            )
+
             response = requests.post(
-                f"{self._client._client_wrapper.get_base_url()}/import/otel",
+                f"{self._client._client_wrapper.get_base_url()}/import/otel/v1/traces",
                 headers=self._client._client_wrapper.get_headers(),
                 data=span_to_export.to_json().encode("ascii"),
             )
             if response.status_code != 200:
                 # TODO: handle
                 pass
             else:
+                print("FOO", response.json())
                 if evaluation_context and file_path == evaluation_context.path:
                     log_id = response.json()["log_id"]
                     evaluation_context.callback(log_id)
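
Editorial note: for orientation, this is roughly how a single-span TracesData payload is assembled and serialised with the opentelemetry-proto classes imported above. It is a standalone sketch, not the exporter's code: the ids, timestamps and attribute are made-up values, and in the OTLP schema scope_spans is a repeated field, trace_id/span_id are raw bytes, and attribute values are AnyValue messages:

from opentelemetry.proto.common.v1.common_pb2 import AnyValue, InstrumentationScope, KeyValue
from opentelemetry.proto.trace.v1.trace_pb2 import ResourceSpans, ScopeSpans, Span, TracesData

# Build a one-span OTLP payload by hand.
span = Span(
    trace_id=(0x1).to_bytes(16, "big"),  # 16-byte trace id
    span_id=(0x2).to_bytes(8, "big"),    # 8-byte span id
    name="example-span",
    kind=Span.SPAN_KIND_INTERNAL,
    start_time_unix_nano=1_700_000_000_000_000_000,
    end_time_unix_nano=1_700_000_000_500_000_000,
    attributes=[KeyValue(key="example.key", value=AnyValue(string_value="example"))],
)
payload = TracesData(
    resource_spans=[
        ResourceSpans(
            scope_spans=[
                ScopeSpans(
                    scope=InstrumentationScope(name="humanloop-otel", version="0.1.0"),
                    spans=[span],
                )
            ]
        )
    ]
)
body = payload.SerializeToString()  # bytes suitable for a protobuf OTLP POST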

src/humanloop/utilities/flow.py

Lines changed: 48 additions & 50 deletions
@@ -1,6 +1,6 @@
 import logging
 from functools import wraps
-from typing import Any, Callable, Mapping, Optional, Sequence, TypeVar
+from typing import Any, Callable, Optional, TypeVar
 from typing_extensions import ParamSpec
 
 from opentelemetry.trace import Span, Tracer
@@ -67,56 +67,54 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
             # **log_inputs,
             # log_status="incomplete",
             # )
-            token = set_trace_id(init_log["id"])
-
-            span.set_attribute(HUMANLOOP_PATH_KEY, decorator_path)
-            span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
-
-            func_output: Optional[R]
-            log_output: str
-            log_error: Optional[str]
-            log_output_message: ChatMessage
-            try:
-                func_output = func(*args, **kwargs)
-                if (
-                    isinstance(func_output, dict)
-                    and len(func_output.keys()) == 2
-                    and "role" in func_output
-                    and "content" in func_output
-                ):
-                    log_output_message = ChatMessage(**func_output)
-                    log_output = None
-                else:
-                    log_output = process_output(func=func, output=func_output)
+            with set_trace_id(init_log["id"]):
+                span.set_attribute(HUMANLOOP_PATH_KEY, decorator_path)
+                span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
+
+                func_output: Optional[R]
+                log_output: str
+                log_error: Optional[str]
+                log_output_message: ChatMessage
+                try:
+                    func_output = func(*args, **kwargs)
+                    if (
+                        isinstance(func_output, dict)
+                        and len(func_output.keys()) == 2
+                        and "role" in func_output
+                        and "content" in func_output
+                    ):
+                        log_output_message = ChatMessage(**func_output)
+                        log_output = None
+                    else:
+                        log_output = process_output(func=func, output=func_output)
+                        log_output_message = None
+                    log_error = None
+                except Exception as e:
+                    logger.error(f"Error calling {func.__name__}: {e}")
+                    output = None
                     log_output_message = None
-                log_error = None
-            except Exception as e:
-                logger.error(f"Error calling {func.__name__}: {e}")
-                output = None
-                log_output_message = None
-                log_error = str(e)
-
-            flow_log = {
-                "inputs": {k: v for k, v in args_to_func.items() if k != "messages"},
-                "messages": args_to_func.get("messages"),
-                "log_status": "complete",
-                "output": log_output,
-                "error": log_error,
-                "output_message": log_output_message,
-                "id": init_log["id"],
-            }
-
-            # Write the Flow Log to the Span on HL_LOG_OT_KEY
-            if flow_log:
-                write_to_opentelemetry_span(
-                    span=span,
-                    key=HUMANLOOP_LOG_KEY,
-                    value=flow_log, # type: ignore
-                )
-
-            context_api.detach(token=token)
-            # Return the output of the decorated function
-            return output
+                    log_error = str(e)
+
+                flow_log = {
+                    "inputs": {k: v for k, v in args_to_func.items() if k != "messages"},
+                    "messages": args_to_func.get("messages"),
+                    "log_status": "complete",
+                    "output": log_output,
+                    "error": log_error,
+                    "output_message": log_output_message,
+                    "id": init_log["id"],
+                }
+
+                # Write the Flow Log to the Span on HL_LOG_OT_KEY
+                if flow_log:
+                    write_to_opentelemetry_span(
+                        span=span,
+                        key=HUMANLOOP_LOG_KEY,
+                        value=flow_log, # type: ignore
+                    )
+
+                # Return the output of the decorated function
+                return func_output
 
     wrapper.file = File( # type: ignore
         path=decorator_path,
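
Editorial note: the net effect in the decorator is that the whole body now runs inside with set_trace_id(...) (so the manual context_api.detach disappears) and the wrapper returns the wrapped function's own output. A condensed sketch of that pattern with stand-in names, not the real decorator:

from contextlib import contextmanager
from functools import wraps
from typing import Callable, Generator, Optional, TypeVar

from typing_extensions import ParamSpec

P = ParamSpec("P")
R = TypeVar("R")


@contextmanager
def trace_scope(log_id: str) -> Generator[None, None, None]:
    # Stand-in for set_trace_id(); the real helper stores the id in the
    # OpenTelemetry context for the current thread and detaches it on exit.
    yield


def flow_like(func: Callable[P, R]) -> Callable[P, Optional[R]]:
    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
        with trace_scope("fl_log_123"):
            func_output: Optional[R]
            try:
                func_output = func(*args, **kwargs)
            except Exception:
                func_output = None  # the error would be recorded on the Log
            # ... build the flow Log and write it to the span here ...
            return func_output  # return the wrapped function's own output

    return wrapper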
