Fix async streaming support for Agno Team.arun() - Resolves #1262 #1264
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
📥 Pull Request
📘 Description
Fixes #1262 - Resolves TypeError when using
async forwith Agno Team.arun() streaming.Root Cause: When
Team.arun(stream=True)is called, it returns an async generator directly (not a coroutine). The team async wrapper was defined asasync def, which wrapped the returned async generator in a coroutine, causing the error:'async for' requires an object with __aiter__ method, got coroutine.Solution:
AsyncStreamingResultWrapperclass that properly implements__aiter__and__anext__for async generator wrappingcreate_team_async_wrapperfromasync defto regulardefto prevent wrapping async generators in coroutinesinspect.isasyncgen()checks to detect and handle async generators correctlycreate_streaming_agent_async_wrapperto check for__aiter__before__iter__Changes (Core Fix):
agentops/instrumentation/agentic/agno/instrumentor.py: Added AsyncStreamingResultWrapper class and modified team/agent async wrappers to properly handle async generators🧪 Testing
Created test script
test_async_streaming_team.pythat reproduces the issue:RunStartedEventshowing async streaming is workingruff checkandruff formatboth passEvidence of Fix:
RuntimeWarning: coroutine 'create_team_async_wrapper.<locals>.wrapper' was never awaitedTypeError: 'async for' requires an object with __aiter__ methodThis PR contains many unrelated files that should NOT be included:
.envrcAction Required: These files should be excluded from this PR and submitted separately if needed.
🔍 Human Review Checklist
High Priority:
AsyncStreamingResultWrapper.__anext__correctly maintains span contextcreate_team_async_wrapperfromasync deftodefis the correct solutioninspect.isasyncgen()is the right check for detecting async generatorsMedium Priority:
5. Verify the fix works for both Team.arun() and Agent.arun() streaming
6. Confirm span cleanup happens correctly in StopAsyncIteration case
7. Check if similar issues exist in other async wrappers (workflow, team_internal)
Session Info: