From b98b1001a56c3460d05423e9026424a64d4c2f2e Mon Sep 17 00:00:00 2001 From: fern-api <115122769+fern-api[bot]@users.noreply.github.com> Date: Fri, 28 Feb 2025 15:25:35 +0000 Subject: [PATCH 1/7] Release 0.8.25 --- src/humanloop/flows/client.py | 454 ++++++++++++++++++---------------- 1 file changed, 244 insertions(+), 210 deletions(-) diff --git a/src/humanloop/flows/client.py b/src/humanloop/flows/client.py index 45187f6c..6452b27b 100644 --- a/src/humanloop/flows/client.py +++ b/src/humanloop/flows/client.py @@ -14,13 +14,13 @@ from ..types.http_validation_error import HttpValidationError from json.decoder import JSONDecodeError from ..core.api_error import ApiError -from ..types.flow_response import FlowResponse +from ..types.flow_log_response import FlowLogResponse from ..core.jsonable_encoder import jsonable_encoder +from ..types.flow_response import FlowResponse from ..types.project_sort_by import ProjectSortBy from ..types.sort_order import SortOrder from ..core.pagination import SyncPager from ..types.paginated_data_flow_response import PaginatedDataFlowResponse -from ..types.flow_log_response import FlowLogResponse from ..types.version_status import VersionStatus from ..types.list_flows import ListFlows from ..types.file_environment_response import FileEnvironmentResponse @@ -79,6 +79,9 @@ def log( You can use query parameters `version_id`, or `environment`, to target an existing version of the Flow. Otherwise, the default deployed version will be chosen. + If you create the Flow Log with a `trace_status` of `incomplete`, you should later update it to `complete` + in order to trigger Evaluators. + Parameters ---------- version_id : typing.Optional[str] @@ -202,10 +205,10 @@ def log( output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", trace_status="incomplete", start_time=datetime.datetime.fromisoformat( - "2024-07-08 22:40:35+00:00", + "2024-07-08 21:40:35+00:00", ), end_time=datetime.datetime.fromisoformat( - "2024-07-08 22:40:39+00:00", + "2024-07-08 21:40:39+00:00", ), ) """ @@ -279,6 +282,118 @@ def log( raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) + def update_log( + self, + log_id: str, + *, + trace_status: TraceStatus, + messages: typing.Optional[typing.Sequence[ChatMessageParams]] = OMIT, + output_message: typing.Optional[ChatMessageParams] = OMIT, + inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, + output: typing.Optional[str] = OMIT, + error: typing.Optional[str] = OMIT, + request_options: typing.Optional[RequestOptions] = None, + ) -> FlowLogResponse: + """ + Update the status, inputs, output of a Flow Log. + + Marking a Flow Log as complete will trigger any monitoring Evaluators to run. + Inputs and output (or error) must be provided in order to mark it as complete. + + The end_time log attribute will be set to match the time the log is marked as complete. + + Parameters + ---------- + log_id : str + Unique identifier of the Flow Log. + + trace_status : TraceStatus + Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. + + messages : typing.Optional[typing.Sequence[ChatMessageParams]] + List of chat messages that were used as an input to the Flow. + + output_message : typing.Optional[ChatMessageParams] + The output message returned by this Flow. + + inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + The inputs passed to the Flow Log. + + output : typing.Optional[str] + The output of the Flow Log. Provide None to unset existing `output` value. Provide either this, `output_message` or `error`. + + error : typing.Optional[str] + The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this, `output_message` or `output`. + + request_options : typing.Optional[RequestOptions] + Request-specific configuration. + + Returns + ------- + FlowLogResponse + Successful Response + + Examples + -------- + from humanloop import Humanloop + + client = Humanloop( + api_key="YOUR_API_KEY", + ) + client.flows.update_log( + log_id="medqa_experiment_0001", + inputs={ + "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." + }, + output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", + trace_status="complete", + ) + """ + _response = self._client_wrapper.httpx_client.request( + f"flows/logs/{jsonable_encoder(log_id)}", + method="PATCH", + json={ + "messages": convert_and_respect_annotation_metadata( + object_=messages, annotation=typing.Sequence[ChatMessageParams], direction="write" + ), + "output_message": convert_and_respect_annotation_metadata( + object_=output_message, annotation=ChatMessageParams, direction="write" + ), + "inputs": inputs, + "output": output, + "error": error, + "trace_status": trace_status, + }, + headers={ + "content-type": "application/json", + }, + request_options=request_options, + omit=OMIT, + ) + try: + if 200 <= _response.status_code < 300: + return typing.cast( + FlowLogResponse, + construct_type( + type_=FlowLogResponse, # type: ignore + object_=_response.json(), + ), + ) + if _response.status_code == 422: + raise UnprocessableEntityError( + typing.cast( + HttpValidationError, + construct_type( + type_=HttpValidationError, # type: ignore + object_=_response.json(), + ), + ) + ) + _response_json = _response.json() + except JSONDecodeError: + raise ApiError(status_code=_response.status_code, body=_response.text) + raise ApiError(status_code=_response.status_code, body=_response_json) + def get( self, id: str, @@ -701,104 +816,6 @@ def upsert( raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) - def update_log( - self, - log_id: str, - *, - trace_status: TraceStatus, - inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, - output: typing.Optional[str] = OMIT, - error: typing.Optional[str] = OMIT, - request_options: typing.Optional[RequestOptions] = None, - ) -> FlowLogResponse: - """ - Update the status, inputs, output of a Flow Log. - - Marking a Flow Log as complete will trigger any monitoring Evaluators to run. - Inputs and output (or error) must be provided in order to mark it as complete. - - The end_time log attribute will be set to match the time the log is marked as complete. - - Parameters - ---------- - log_id : str - Unique identifier of the Flow Log. - - trace_status : TraceStatus - Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. - - inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - The inputs passed to the Flow Log. - - output : typing.Optional[str] - The output of the Flow Log. Provide None to unset existing `output` value. Provide either this or `error`. - - error : typing.Optional[str] - The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this or `output`. - - request_options : typing.Optional[RequestOptions] - Request-specific configuration. - - Returns - ------- - FlowLogResponse - Successful Response - - Examples - -------- - from humanloop import Humanloop - - client = Humanloop( - api_key="YOUR_API_KEY", - ) - client.flows.update_log( - log_id="medqa_experiment_0001", - inputs={ - "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." - }, - output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", - trace_status="complete", - ) - """ - _response = self._client_wrapper.httpx_client.request( - f"flows/logs/{jsonable_encoder(log_id)}", - method="PATCH", - json={ - "inputs": inputs, - "output": output, - "error": error, - "trace_status": trace_status, - }, - headers={ - "content-type": "application/json", - }, - request_options=request_options, - omit=OMIT, - ) - try: - if 200 <= _response.status_code < 300: - return typing.cast( - FlowLogResponse, - construct_type( - type_=FlowLogResponse, # type: ignore - object_=_response.json(), - ), - ) - if _response.status_code == 422: - raise UnprocessableEntityError( - typing.cast( - HttpValidationError, - construct_type( - type_=HttpValidationError, # type: ignore - object_=_response.json(), - ), - ) - ) - _response_json = _response.json() - except JSONDecodeError: - raise ApiError(status_code=_response.status_code, body=_response.text) - raise ApiError(status_code=_response.status_code, body=_response_json) - def list_versions( self, id: str, @@ -1323,6 +1340,9 @@ async def log( You can use query parameters `version_id`, or `environment`, to target an existing version of the Flow. Otherwise, the default deployed version will be chosen. + If you create the Flow Log with a `trace_status` of `incomplete`, you should later update it to `complete` + in order to trigger Evaluators. + Parameters ---------- version_id : typing.Optional[str] @@ -1450,10 +1470,10 @@ async def main() -> None: output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", trace_status="incomplete", start_time=datetime.datetime.fromisoformat( - "2024-07-08 22:40:35+00:00", + "2024-07-08 21:40:35+00:00", ), end_time=datetime.datetime.fromisoformat( - "2024-07-08 22:40:39+00:00", + "2024-07-08 21:40:39+00:00", ), ) @@ -1530,6 +1550,126 @@ async def main() -> None: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) + async def update_log( + self, + log_id: str, + *, + trace_status: TraceStatus, + messages: typing.Optional[typing.Sequence[ChatMessageParams]] = OMIT, + output_message: typing.Optional[ChatMessageParams] = OMIT, + inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, + output: typing.Optional[str] = OMIT, + error: typing.Optional[str] = OMIT, + request_options: typing.Optional[RequestOptions] = None, + ) -> FlowLogResponse: + """ + Update the status, inputs, output of a Flow Log. + + Marking a Flow Log as complete will trigger any monitoring Evaluators to run. + Inputs and output (or error) must be provided in order to mark it as complete. + + The end_time log attribute will be set to match the time the log is marked as complete. + + Parameters + ---------- + log_id : str + Unique identifier of the Flow Log. + + trace_status : TraceStatus + Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. + + messages : typing.Optional[typing.Sequence[ChatMessageParams]] + List of chat messages that were used as an input to the Flow. + + output_message : typing.Optional[ChatMessageParams] + The output message returned by this Flow. + + inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + The inputs passed to the Flow Log. + + output : typing.Optional[str] + The output of the Flow Log. Provide None to unset existing `output` value. Provide either this, `output_message` or `error`. + + error : typing.Optional[str] + The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this, `output_message` or `output`. + + request_options : typing.Optional[RequestOptions] + Request-specific configuration. + + Returns + ------- + FlowLogResponse + Successful Response + + Examples + -------- + import asyncio + + from humanloop import AsyncHumanloop + + client = AsyncHumanloop( + api_key="YOUR_API_KEY", + ) + + + async def main() -> None: + await client.flows.update_log( + log_id="medqa_experiment_0001", + inputs={ + "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." + }, + output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", + trace_status="complete", + ) + + + asyncio.run(main()) + """ + _response = await self._client_wrapper.httpx_client.request( + f"flows/logs/{jsonable_encoder(log_id)}", + method="PATCH", + json={ + "messages": convert_and_respect_annotation_metadata( + object_=messages, annotation=typing.Sequence[ChatMessageParams], direction="write" + ), + "output_message": convert_and_respect_annotation_metadata( + object_=output_message, annotation=ChatMessageParams, direction="write" + ), + "inputs": inputs, + "output": output, + "error": error, + "trace_status": trace_status, + }, + headers={ + "content-type": "application/json", + }, + request_options=request_options, + omit=OMIT, + ) + try: + if 200 <= _response.status_code < 300: + return typing.cast( + FlowLogResponse, + construct_type( + type_=FlowLogResponse, # type: ignore + object_=_response.json(), + ), + ) + if _response.status_code == 422: + raise UnprocessableEntityError( + typing.cast( + HttpValidationError, + construct_type( + type_=HttpValidationError, # type: ignore + object_=_response.json(), + ), + ) + ) + _response_json = _response.json() + except JSONDecodeError: + raise ApiError(status_code=_response.status_code, body=_response.text) + raise ApiError(status_code=_response.status_code, body=_response_json) + async def get( self, id: str, @@ -1992,112 +2132,6 @@ async def main() -> None: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) - async def update_log( - self, - log_id: str, - *, - trace_status: TraceStatus, - inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, - output: typing.Optional[str] = OMIT, - error: typing.Optional[str] = OMIT, - request_options: typing.Optional[RequestOptions] = None, - ) -> FlowLogResponse: - """ - Update the status, inputs, output of a Flow Log. - - Marking a Flow Log as complete will trigger any monitoring Evaluators to run. - Inputs and output (or error) must be provided in order to mark it as complete. - - The end_time log attribute will be set to match the time the log is marked as complete. - - Parameters - ---------- - log_id : str - Unique identifier of the Flow Log. - - trace_status : TraceStatus - Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. - - inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - The inputs passed to the Flow Log. - - output : typing.Optional[str] - The output of the Flow Log. Provide None to unset existing `output` value. Provide either this or `error`. - - error : typing.Optional[str] - The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this or `output`. - - request_options : typing.Optional[RequestOptions] - Request-specific configuration. - - Returns - ------- - FlowLogResponse - Successful Response - - Examples - -------- - import asyncio - - from humanloop import AsyncHumanloop - - client = AsyncHumanloop( - api_key="YOUR_API_KEY", - ) - - - async def main() -> None: - await client.flows.update_log( - log_id="medqa_experiment_0001", - inputs={ - "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." - }, - output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", - trace_status="complete", - ) - - - asyncio.run(main()) - """ - _response = await self._client_wrapper.httpx_client.request( - f"flows/logs/{jsonable_encoder(log_id)}", - method="PATCH", - json={ - "inputs": inputs, - "output": output, - "error": error, - "trace_status": trace_status, - }, - headers={ - "content-type": "application/json", - }, - request_options=request_options, - omit=OMIT, - ) - try: - if 200 <= _response.status_code < 300: - return typing.cast( - FlowLogResponse, - construct_type( - type_=FlowLogResponse, # type: ignore - object_=_response.json(), - ), - ) - if _response.status_code == 422: - raise UnprocessableEntityError( - typing.cast( - HttpValidationError, - construct_type( - type_=HttpValidationError, # type: ignore - object_=_response.json(), - ), - ) - ) - _response_json = _response.json() - except JSONDecodeError: - raise ApiError(status_code=_response.status_code, body=_response.text) - raise ApiError(status_code=_response.status_code, body=_response_json) - async def list_versions( self, id: str, From 7cd018c2836edc370ac6eb96d7e7c72918eab526 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Sun, 2 Mar 2025 12:30:02 +0000 Subject: [PATCH 2/7] deduplicate logging --- poetry.lock | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index 3e292bfe..401fb3cd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3704,4 +3704,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9,<4" -content-hash = "a4ea51a34af494df07d810d923e6856a0418fb138c6357ba0cf8358713440219" +content-hash = "d8e2b0f969008add4ec774eeaf1d5e766c66cd8f9264acb256c265dfddd1cfd7" diff --git a/pyproject.toml b/pyproject.toml index b863571d..5a4fed83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ Repository = 'https://github.com/humanloop/humanloop-python' [tool.poetry.dependencies] python = ">=3.9,<4" -deepdiff = "^8.2.0" +deepdiff = "^6.7.1" httpx = ">=0.21.2" httpx-sse = "0.4.0" mmh3 = "^5.1.0" From d91f0a2abb20642fcf9cff8bcc4e301961b7cf0e Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Sun, 2 Mar 2025 19:30:40 +0000 Subject: [PATCH 3/7] Colors on print messages, hanging threads, allow no type in file, cancel run on program abort --- src/humanloop/eval_utils/run.py | 57 ++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index 358c072c..f4e548f9 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -19,7 +19,7 @@ import time import types import typing -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from functools import partial from logging import INFO @@ -204,7 +204,10 @@ def _overload_log( response = self._log(**kwargs) except Exception as e: error_message = str(e).replace("\n", " ") - sys.stderr.write(f"{RED}Failed to log: {error_message[:100]}...{RESET}\n") + if len(error_message) > 100: + sys.stderr.write(f"{RED}Failed to log: {error_message[:100]}...{RESET}\n") + else: + sys.stderr.write(f"{RED}Failed to log: {error_message}{RESET}\n") raise e # Notify the run_eval utility about one Log being created @@ -362,13 +365,25 @@ def upload_callback(log_id: str): end_time=datetime.now(), ) error_message = str(e).replace("\n", " ") - sys.stderr.write( - f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message[:100]}...{RESET}\n" - ) + if len(error_message) > 100: + sys.stderr.write( + f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message[:100]}...{RESET}\n" + ) + else: + sys.stderr.write( + f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message}{RESET}\n" + ) with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [] for datapoint in hl_dataset.datapoints: - executor.submit(_process_datapoint, datapoint) + futures.append(executor.submit(_process_datapoint, datapoint)) + # Program hangs if any uncaught exceptions are not handled here + for future in as_completed(futures): + try: + future.result() + except Exception: + pass stats = _wait_for_evaluation_to_complete( client=client, @@ -901,13 +916,8 @@ def _run_local_evaluators( progress_bar: _SimpleProgressBar, ): """Run local Evaluators on the Log and send the judgments to Humanloop.""" - # If there are no local evaluators, we don't need to do the log lookup. - if len(local_evaluators) == 0: - progress_bar.increment() - return - + # Need to get the full log to pass to the evaluators try: - # Need to get the full log to pass to the evaluators log = client.logs.get(id=log_id) if not isinstance(log, dict): log_dict = log.dict() @@ -921,6 +931,7 @@ def _run_local_evaluators( log_dict = log.dict() else: log_dict = log + time.sleep(2) datapoint_dict = datapoint.dict() if datapoint else None for local_evaluator_tuple in local_evaluators: @@ -953,14 +964,24 @@ def _run_local_evaluators( end_time=datetime.now(), ) error_message = str(e).replace("\n", " ") - sys.stderr.write( - f"{RED}Evaluator {local_evaluator.path} failed with error {error_message[:100]}...{RESET}\n" - ) + if len(error_message) > 100: + sys.stderr.write( + f"{RED}Evaluator {local_evaluator.path} failed with error {error_message[:100]}...{RESET}\n" + ) + else: + sys.stderr.write( + f"{RED}Evaluator {local_evaluator.path} failed with error {error_message}{RESET}\n" + ) except Exception as e: error_message = str(e).replace("\n", " ") - sys.stderr.write( - f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message[:100]}...{RESET}\n" - ) + if len(error_message) > 100: + sys.stderr.write( + f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message[:100]}...{RESET}\n" + ) + else: + sys.stderr.write( + f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message}{RESET}\n" + ) pass finally: progress_bar.increment() From 2fb90860a95e0d3b8ed2cbab7429e82abfb84c69 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 3 Mar 2025 12:22:35 +0000 Subject: [PATCH 4/7] fix output on @flow error, @flow + logging inside edge case on evals --- poetry.lock | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/poetry.lock b/poetry.lock index 401fb3cd..2f8b7b08 100644 --- a/poetry.lock +++ b/poetry.lock @@ -482,21 +482,21 @@ cron = ["capturer (>=2.4)"] [[package]] name = "deepdiff" -version = "8.2.0" +version = "6.7.1" description = "Deep Difference and Search of any Python object/data. Recreate objects by adding adding deltas to each other." optional = false -python-versions = ">=3.8" +python-versions = ">=3.7" groups = ["main"] files = [ - {file = "deepdiff-8.2.0-py3-none-any.whl", hash = "sha256:5091f2cdfd372b1b9f6bfd8065ba323ae31118dc4e42594371b38c8bea3fd0a4"}, - {file = "deepdiff-8.2.0.tar.gz", hash = "sha256:6ec78f65031485735545ffbe7a61e716c3c2d12ca6416886d5e9291fc76c46c3"}, + {file = "deepdiff-6.7.1-py3-none-any.whl", hash = "sha256:58396bb7a863cbb4ed5193f548c56f18218060362311aa1dc36397b2f25108bd"}, + {file = "deepdiff-6.7.1.tar.gz", hash = "sha256:b367e6fa6caac1c9f500adc79ada1b5b1242c50d5f716a1a4362030197847d30"}, ] [package.dependencies] -orderly-set = ">=5.3.0,<6" +ordered-set = ">=4.0.2,<4.2.0" [package.extras] -cli = ["click (==8.1.8)", "pyyaml (==6.0.2)"] +cli = ["click (==8.1.3)", "pyyaml (==6.0.1)"] optimize = ["orjson"] [[package]] @@ -1876,17 +1876,20 @@ files = [ ] [[package]] -name = "orderly-set" -version = "5.3.0" -description = "Orderly set" +name = "ordered-set" +version = "4.1.0" +description = "An OrderedSet is a custom MutableSet that remembers its order, so that every" optional = false -python-versions = ">=3.8" +python-versions = ">=3.7" groups = ["main"] files = [ - {file = "orderly_set-5.3.0-py3-none-any.whl", hash = "sha256:c2c0bfe604f5d3d9b24e8262a06feb612594f37aa3845650548befd7772945d1"}, - {file = "orderly_set-5.3.0.tar.gz", hash = "sha256:80b3d8fdd3d39004d9aad389eaa0eab02c71f0a0511ba3a6d54a935a6c6a0acc"}, + {file = "ordered-set-4.1.0.tar.gz", hash = "sha256:694a8e44c87657c59292ede72891eb91d34131f6531463aab3009191c77364a8"}, + {file = "ordered_set-4.1.0-py3-none-any.whl", hash = "sha256:046e1132c71fcf3330438a539928932caf51ddbc582496833e23de611de14562"}, ] +[package.extras] +dev = ["black", "mypy", "pytest"] + [[package]] name = "orjson" version = "3.10.15" @@ -3704,4 +3707,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9,<4" -content-hash = "d8e2b0f969008add4ec774eeaf1d5e766c66cd8f9264acb256c265dfddd1cfd7" +content-hash = "5e19a9be29dc9cd9a134a4178edf7d3b99970959a8951acc1759c9bbd9cbbcf5" From 099ae1de5dc14be625c76788b7a51a36af5ca395 Mon Sep 17 00:00:00 2001 From: James Baskerville Date: Mon, 3 Mar 2025 12:24:45 +0000 Subject: [PATCH 5/7] Don't query if no local evaluators; cleanup error printing --- src/humanloop/eval_utils/run.py | 45 ++++++++++++--------------------- 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index f4e548f9..6aa2b05c 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -204,10 +204,7 @@ def _overload_log( response = self._log(**kwargs) except Exception as e: error_message = str(e).replace("\n", " ") - if len(error_message) > 100: - sys.stderr.write(f"{RED}Failed to log: {error_message[:100]}...{RESET}\n") - else: - sys.stderr.write(f"{RED}Failed to log: {error_message}{RESET}\n") + sys.stderr.write(f"{RED}Failed to log: {error_message[:100]}...{RESET}\n") raise e # Notify the run_eval utility about one Log being created @@ -365,14 +362,9 @@ def upload_callback(log_id: str): end_time=datetime.now(), ) error_message = str(e).replace("\n", " ") - if len(error_message) > 100: - sys.stderr.write( - f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message[:100]}...{RESET}\n" - ) - else: - sys.stderr.write( - f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message}{RESET}\n" - ) + sys.stderr.write( + f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message[:100]}...{RESET}\n" + ) with ThreadPoolExecutor(max_workers=workers) as executor: futures = [] @@ -916,8 +908,13 @@ def _run_local_evaluators( progress_bar: _SimpleProgressBar, ): """Run local Evaluators on the Log and send the judgments to Humanloop.""" - # Need to get the full log to pass to the evaluators + # If there are no local evaluators, we don't need to do the log lookup. + if len(local_evaluators) == 0: + progress_bar.increment() + return + try: + # Need to get the full log to pass to the evaluators log = client.logs.get(id=log_id) if not isinstance(log, dict): log_dict = log.dict() @@ -964,24 +961,14 @@ def _run_local_evaluators( end_time=datetime.now(), ) error_message = str(e).replace("\n", " ") - if len(error_message) > 100: - sys.stderr.write( - f"{RED}Evaluator {local_evaluator.path} failed with error {error_message[:100]}...{RESET}\n" - ) - else: - sys.stderr.write( - f"{RED}Evaluator {local_evaluator.path} failed with error {error_message}{RESET}\n" - ) + sys.stderr.write( + f"{RED}Evaluator {local_evaluator.path} failed with error {error_message[:100]}...{RESET}\n" + ) except Exception as e: error_message = str(e).replace("\n", " ") - if len(error_message) > 100: - sys.stderr.write( - f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message[:100]}...{RESET}\n" - ) - else: - sys.stderr.write( - f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message}{RESET}\n" - ) + sys.stderr.write( + f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message[:100]}...{RESET}\n" + ) pass finally: progress_bar.increment() From c78906d65375f37e74e928077fc6b8a05124a69d Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 3 Mar 2025 13:12:17 +0000 Subject: [PATCH 6/7] PR feedback --- poetry.lock | 29 +- pyproject.toml | 2 +- src/humanloop/eval_utils/run.py | 11 +- src/humanloop/flows/client.py | 454 +++++++++++++++----------------- 4 files changed, 226 insertions(+), 270 deletions(-) diff --git a/poetry.lock b/poetry.lock index 2f8b7b08..3e292bfe 100644 --- a/poetry.lock +++ b/poetry.lock @@ -482,21 +482,21 @@ cron = ["capturer (>=2.4)"] [[package]] name = "deepdiff" -version = "6.7.1" +version = "8.2.0" description = "Deep Difference and Search of any Python object/data. Recreate objects by adding adding deltas to each other." optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" groups = ["main"] files = [ - {file = "deepdiff-6.7.1-py3-none-any.whl", hash = "sha256:58396bb7a863cbb4ed5193f548c56f18218060362311aa1dc36397b2f25108bd"}, - {file = "deepdiff-6.7.1.tar.gz", hash = "sha256:b367e6fa6caac1c9f500adc79ada1b5b1242c50d5f716a1a4362030197847d30"}, + {file = "deepdiff-8.2.0-py3-none-any.whl", hash = "sha256:5091f2cdfd372b1b9f6bfd8065ba323ae31118dc4e42594371b38c8bea3fd0a4"}, + {file = "deepdiff-8.2.0.tar.gz", hash = "sha256:6ec78f65031485735545ffbe7a61e716c3c2d12ca6416886d5e9291fc76c46c3"}, ] [package.dependencies] -ordered-set = ">=4.0.2,<4.2.0" +orderly-set = ">=5.3.0,<6" [package.extras] -cli = ["click (==8.1.3)", "pyyaml (==6.0.1)"] +cli = ["click (==8.1.8)", "pyyaml (==6.0.2)"] optimize = ["orjson"] [[package]] @@ -1876,20 +1876,17 @@ files = [ ] [[package]] -name = "ordered-set" -version = "4.1.0" -description = "An OrderedSet is a custom MutableSet that remembers its order, so that every" +name = "orderly-set" +version = "5.3.0" +description = "Orderly set" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" groups = ["main"] files = [ - {file = "ordered-set-4.1.0.tar.gz", hash = "sha256:694a8e44c87657c59292ede72891eb91d34131f6531463aab3009191c77364a8"}, - {file = "ordered_set-4.1.0-py3-none-any.whl", hash = "sha256:046e1132c71fcf3330438a539928932caf51ddbc582496833e23de611de14562"}, + {file = "orderly_set-5.3.0-py3-none-any.whl", hash = "sha256:c2c0bfe604f5d3d9b24e8262a06feb612594f37aa3845650548befd7772945d1"}, + {file = "orderly_set-5.3.0.tar.gz", hash = "sha256:80b3d8fdd3d39004d9aad389eaa0eab02c71f0a0511ba3a6d54a935a6c6a0acc"}, ] -[package.extras] -dev = ["black", "mypy", "pytest"] - [[package]] name = "orjson" version = "3.10.15" @@ -3707,4 +3704,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9,<4" -content-hash = "5e19a9be29dc9cd9a134a4178edf7d3b99970959a8951acc1759c9bbd9cbbcf5" +content-hash = "a4ea51a34af494df07d810d923e6856a0418fb138c6357ba0cf8358713440219" diff --git a/pyproject.toml b/pyproject.toml index 5a4fed83..b863571d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ Repository = 'https://github.com/humanloop/humanloop-python' [tool.poetry.dependencies] python = ">=3.9,<4" -deepdiff = "^6.7.1" +deepdiff = "^8.2.0" httpx = ">=0.21.2" httpx-sse = "0.4.0" mmh3 = "^5.1.0" diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index 6aa2b05c..b66b1008 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -19,7 +19,7 @@ import time import types import typing -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor from datetime import datetime from functools import partial from logging import INFO @@ -367,15 +367,8 @@ def upload_callback(log_id: str): ) with ThreadPoolExecutor(max_workers=workers) as executor: - futures = [] for datapoint in hl_dataset.datapoints: - futures.append(executor.submit(_process_datapoint, datapoint)) - # Program hangs if any uncaught exceptions are not handled here - for future in as_completed(futures): - try: - future.result() - except Exception: - pass + executor.submit(_process_datapoint, datapoint) stats = _wait_for_evaluation_to_complete( client=client, diff --git a/src/humanloop/flows/client.py b/src/humanloop/flows/client.py index 6452b27b..45187f6c 100644 --- a/src/humanloop/flows/client.py +++ b/src/humanloop/flows/client.py @@ -14,13 +14,13 @@ from ..types.http_validation_error import HttpValidationError from json.decoder import JSONDecodeError from ..core.api_error import ApiError -from ..types.flow_log_response import FlowLogResponse -from ..core.jsonable_encoder import jsonable_encoder from ..types.flow_response import FlowResponse +from ..core.jsonable_encoder import jsonable_encoder from ..types.project_sort_by import ProjectSortBy from ..types.sort_order import SortOrder from ..core.pagination import SyncPager from ..types.paginated_data_flow_response import PaginatedDataFlowResponse +from ..types.flow_log_response import FlowLogResponse from ..types.version_status import VersionStatus from ..types.list_flows import ListFlows from ..types.file_environment_response import FileEnvironmentResponse @@ -79,9 +79,6 @@ def log( You can use query parameters `version_id`, or `environment`, to target an existing version of the Flow. Otherwise, the default deployed version will be chosen. - If you create the Flow Log with a `trace_status` of `incomplete`, you should later update it to `complete` - in order to trigger Evaluators. - Parameters ---------- version_id : typing.Optional[str] @@ -205,10 +202,10 @@ def log( output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", trace_status="incomplete", start_time=datetime.datetime.fromisoformat( - "2024-07-08 21:40:35+00:00", + "2024-07-08 22:40:35+00:00", ), end_time=datetime.datetime.fromisoformat( - "2024-07-08 21:40:39+00:00", + "2024-07-08 22:40:39+00:00", ), ) """ @@ -282,118 +279,6 @@ def log( raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) - def update_log( - self, - log_id: str, - *, - trace_status: TraceStatus, - messages: typing.Optional[typing.Sequence[ChatMessageParams]] = OMIT, - output_message: typing.Optional[ChatMessageParams] = OMIT, - inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, - output: typing.Optional[str] = OMIT, - error: typing.Optional[str] = OMIT, - request_options: typing.Optional[RequestOptions] = None, - ) -> FlowLogResponse: - """ - Update the status, inputs, output of a Flow Log. - - Marking a Flow Log as complete will trigger any monitoring Evaluators to run. - Inputs and output (or error) must be provided in order to mark it as complete. - - The end_time log attribute will be set to match the time the log is marked as complete. - - Parameters - ---------- - log_id : str - Unique identifier of the Flow Log. - - trace_status : TraceStatus - Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. - - messages : typing.Optional[typing.Sequence[ChatMessageParams]] - List of chat messages that were used as an input to the Flow. - - output_message : typing.Optional[ChatMessageParams] - The output message returned by this Flow. - - inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - The inputs passed to the Flow Log. - - output : typing.Optional[str] - The output of the Flow Log. Provide None to unset existing `output` value. Provide either this, `output_message` or `error`. - - error : typing.Optional[str] - The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this, `output_message` or `output`. - - request_options : typing.Optional[RequestOptions] - Request-specific configuration. - - Returns - ------- - FlowLogResponse - Successful Response - - Examples - -------- - from humanloop import Humanloop - - client = Humanloop( - api_key="YOUR_API_KEY", - ) - client.flows.update_log( - log_id="medqa_experiment_0001", - inputs={ - "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." - }, - output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", - trace_status="complete", - ) - """ - _response = self._client_wrapper.httpx_client.request( - f"flows/logs/{jsonable_encoder(log_id)}", - method="PATCH", - json={ - "messages": convert_and_respect_annotation_metadata( - object_=messages, annotation=typing.Sequence[ChatMessageParams], direction="write" - ), - "output_message": convert_and_respect_annotation_metadata( - object_=output_message, annotation=ChatMessageParams, direction="write" - ), - "inputs": inputs, - "output": output, - "error": error, - "trace_status": trace_status, - }, - headers={ - "content-type": "application/json", - }, - request_options=request_options, - omit=OMIT, - ) - try: - if 200 <= _response.status_code < 300: - return typing.cast( - FlowLogResponse, - construct_type( - type_=FlowLogResponse, # type: ignore - object_=_response.json(), - ), - ) - if _response.status_code == 422: - raise UnprocessableEntityError( - typing.cast( - HttpValidationError, - construct_type( - type_=HttpValidationError, # type: ignore - object_=_response.json(), - ), - ) - ) - _response_json = _response.json() - except JSONDecodeError: - raise ApiError(status_code=_response.status_code, body=_response.text) - raise ApiError(status_code=_response.status_code, body=_response_json) - def get( self, id: str, @@ -816,6 +701,104 @@ def upsert( raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) + def update_log( + self, + log_id: str, + *, + trace_status: TraceStatus, + inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, + output: typing.Optional[str] = OMIT, + error: typing.Optional[str] = OMIT, + request_options: typing.Optional[RequestOptions] = None, + ) -> FlowLogResponse: + """ + Update the status, inputs, output of a Flow Log. + + Marking a Flow Log as complete will trigger any monitoring Evaluators to run. + Inputs and output (or error) must be provided in order to mark it as complete. + + The end_time log attribute will be set to match the time the log is marked as complete. + + Parameters + ---------- + log_id : str + Unique identifier of the Flow Log. + + trace_status : TraceStatus + Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. + + inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + The inputs passed to the Flow Log. + + output : typing.Optional[str] + The output of the Flow Log. Provide None to unset existing `output` value. Provide either this or `error`. + + error : typing.Optional[str] + The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this or `output`. + + request_options : typing.Optional[RequestOptions] + Request-specific configuration. + + Returns + ------- + FlowLogResponse + Successful Response + + Examples + -------- + from humanloop import Humanloop + + client = Humanloop( + api_key="YOUR_API_KEY", + ) + client.flows.update_log( + log_id="medqa_experiment_0001", + inputs={ + "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." + }, + output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", + trace_status="complete", + ) + """ + _response = self._client_wrapper.httpx_client.request( + f"flows/logs/{jsonable_encoder(log_id)}", + method="PATCH", + json={ + "inputs": inputs, + "output": output, + "error": error, + "trace_status": trace_status, + }, + headers={ + "content-type": "application/json", + }, + request_options=request_options, + omit=OMIT, + ) + try: + if 200 <= _response.status_code < 300: + return typing.cast( + FlowLogResponse, + construct_type( + type_=FlowLogResponse, # type: ignore + object_=_response.json(), + ), + ) + if _response.status_code == 422: + raise UnprocessableEntityError( + typing.cast( + HttpValidationError, + construct_type( + type_=HttpValidationError, # type: ignore + object_=_response.json(), + ), + ) + ) + _response_json = _response.json() + except JSONDecodeError: + raise ApiError(status_code=_response.status_code, body=_response.text) + raise ApiError(status_code=_response.status_code, body=_response_json) + def list_versions( self, id: str, @@ -1340,9 +1323,6 @@ async def log( You can use query parameters `version_id`, or `environment`, to target an existing version of the Flow. Otherwise, the default deployed version will be chosen. - If you create the Flow Log with a `trace_status` of `incomplete`, you should later update it to `complete` - in order to trigger Evaluators. - Parameters ---------- version_id : typing.Optional[str] @@ -1470,10 +1450,10 @@ async def main() -> None: output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", trace_status="incomplete", start_time=datetime.datetime.fromisoformat( - "2024-07-08 21:40:35+00:00", + "2024-07-08 22:40:35+00:00", ), end_time=datetime.datetime.fromisoformat( - "2024-07-08 21:40:39+00:00", + "2024-07-08 22:40:39+00:00", ), ) @@ -1550,126 +1530,6 @@ async def main() -> None: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) - async def update_log( - self, - log_id: str, - *, - trace_status: TraceStatus, - messages: typing.Optional[typing.Sequence[ChatMessageParams]] = OMIT, - output_message: typing.Optional[ChatMessageParams] = OMIT, - inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, - output: typing.Optional[str] = OMIT, - error: typing.Optional[str] = OMIT, - request_options: typing.Optional[RequestOptions] = None, - ) -> FlowLogResponse: - """ - Update the status, inputs, output of a Flow Log. - - Marking a Flow Log as complete will trigger any monitoring Evaluators to run. - Inputs and output (or error) must be provided in order to mark it as complete. - - The end_time log attribute will be set to match the time the log is marked as complete. - - Parameters - ---------- - log_id : str - Unique identifier of the Flow Log. - - trace_status : TraceStatus - Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. - - messages : typing.Optional[typing.Sequence[ChatMessageParams]] - List of chat messages that were used as an input to the Flow. - - output_message : typing.Optional[ChatMessageParams] - The output message returned by this Flow. - - inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - The inputs passed to the Flow Log. - - output : typing.Optional[str] - The output of the Flow Log. Provide None to unset existing `output` value. Provide either this, `output_message` or `error`. - - error : typing.Optional[str] - The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this, `output_message` or `output`. - - request_options : typing.Optional[RequestOptions] - Request-specific configuration. - - Returns - ------- - FlowLogResponse - Successful Response - - Examples - -------- - import asyncio - - from humanloop import AsyncHumanloop - - client = AsyncHumanloop( - api_key="YOUR_API_KEY", - ) - - - async def main() -> None: - await client.flows.update_log( - log_id="medqa_experiment_0001", - inputs={ - "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." - }, - output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", - trace_status="complete", - ) - - - asyncio.run(main()) - """ - _response = await self._client_wrapper.httpx_client.request( - f"flows/logs/{jsonable_encoder(log_id)}", - method="PATCH", - json={ - "messages": convert_and_respect_annotation_metadata( - object_=messages, annotation=typing.Sequence[ChatMessageParams], direction="write" - ), - "output_message": convert_and_respect_annotation_metadata( - object_=output_message, annotation=ChatMessageParams, direction="write" - ), - "inputs": inputs, - "output": output, - "error": error, - "trace_status": trace_status, - }, - headers={ - "content-type": "application/json", - }, - request_options=request_options, - omit=OMIT, - ) - try: - if 200 <= _response.status_code < 300: - return typing.cast( - FlowLogResponse, - construct_type( - type_=FlowLogResponse, # type: ignore - object_=_response.json(), - ), - ) - if _response.status_code == 422: - raise UnprocessableEntityError( - typing.cast( - HttpValidationError, - construct_type( - type_=HttpValidationError, # type: ignore - object_=_response.json(), - ), - ) - ) - _response_json = _response.json() - except JSONDecodeError: - raise ApiError(status_code=_response.status_code, body=_response.text) - raise ApiError(status_code=_response.status_code, body=_response_json) - async def get( self, id: str, @@ -2132,6 +1992,112 @@ async def main() -> None: raise ApiError(status_code=_response.status_code, body=_response.text) raise ApiError(status_code=_response.status_code, body=_response_json) + async def update_log( + self, + log_id: str, + *, + trace_status: TraceStatus, + inputs: typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] = OMIT, + output: typing.Optional[str] = OMIT, + error: typing.Optional[str] = OMIT, + request_options: typing.Optional[RequestOptions] = None, + ) -> FlowLogResponse: + """ + Update the status, inputs, output of a Flow Log. + + Marking a Flow Log as complete will trigger any monitoring Evaluators to run. + Inputs and output (or error) must be provided in order to mark it as complete. + + The end_time log attribute will be set to match the time the log is marked as complete. + + Parameters + ---------- + log_id : str + Unique identifier of the Flow Log. + + trace_status : TraceStatus + Status of the Trace. When a Trace is marked as `complete`, no more Logs can be added to it. Monitoring Evaluators will only run on completed Traces. + + inputs : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + The inputs passed to the Flow Log. + + output : typing.Optional[str] + The output of the Flow Log. Provide None to unset existing `output` value. Provide either this or `error`. + + error : typing.Optional[str] + The error message of the Flow Log. Provide None to unset existing `error` value. Provide either this or `output`. + + request_options : typing.Optional[RequestOptions] + Request-specific configuration. + + Returns + ------- + FlowLogResponse + Successful Response + + Examples + -------- + import asyncio + + from humanloop import AsyncHumanloop + + client = AsyncHumanloop( + api_key="YOUR_API_KEY", + ) + + + async def main() -> None: + await client.flows.update_log( + log_id="medqa_experiment_0001", + inputs={ + "question": "Patient with a history of diabetes and normal tension presents with chest pain and shortness of breath." + }, + output="The patient is likely experiencing a myocardial infarction. Immediate medical attention is required.", + trace_status="complete", + ) + + + asyncio.run(main()) + """ + _response = await self._client_wrapper.httpx_client.request( + f"flows/logs/{jsonable_encoder(log_id)}", + method="PATCH", + json={ + "inputs": inputs, + "output": output, + "error": error, + "trace_status": trace_status, + }, + headers={ + "content-type": "application/json", + }, + request_options=request_options, + omit=OMIT, + ) + try: + if 200 <= _response.status_code < 300: + return typing.cast( + FlowLogResponse, + construct_type( + type_=FlowLogResponse, # type: ignore + object_=_response.json(), + ), + ) + if _response.status_code == 422: + raise UnprocessableEntityError( + typing.cast( + HttpValidationError, + construct_type( + type_=HttpValidationError, # type: ignore + object_=_response.json(), + ), + ) + ) + _response_json = _response.json() + except JSONDecodeError: + raise ApiError(status_code=_response.status_code, body=_response.text) + raise ApiError(status_code=_response.status_code, body=_response_json) + async def list_versions( self, id: str, From 03038eea44e112fe986d14cd356ca805d64a3983 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 3 Mar 2025 15:33:07 +0000 Subject: [PATCH 7/7] Fix polling with no wait time --- pyproject.toml | 2 +- src/humanloop/eval_utils/run.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b863571d..d6f0ff94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "humanloop" [tool.poetry] name = "humanloop" -version = "0.8.25" +version = "0.8.26" description = "" readme = "README.md" authors = [] diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index b66b1008..60dd7105 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -308,6 +308,7 @@ def handle_exit_signal(signum, frame): def _process_datapoint(dp: Datapoint): def upload_callback(log_id: str): """Logic ran after the Log has been created.""" + # Need to get the full log to pass to the evaluators evaluators_worker_pool.submit( _run_local_evaluators, client=client, @@ -901,11 +902,6 @@ def _run_local_evaluators( progress_bar: _SimpleProgressBar, ): """Run local Evaluators on the Log and send the judgments to Humanloop.""" - # If there are no local evaluators, we don't need to do the log lookup. - if len(local_evaluators) == 0: - progress_bar.increment() - return - try: # Need to get the full log to pass to the evaluators log = client.logs.get(id=log_id) @@ -915,7 +911,9 @@ def _run_local_evaluators( log_dict = log # Wait for the Flow trace to complete before running evaluators - while file_type == "flow" and log_dict["trace_status"] != "complete": + while True: + if file_type != "flow" or log_dict["trace_status"] == "complete": + break log = client.logs.get(id=log_id) if not isinstance(log, dict): log_dict = log.dict()