Skip to content

Conversation

@devin-ai-integration
Copy link
Contributor

📥 Pull Request

📘 Description

Fixes #1262 - Resolves TypeError: 'async for' requires an object with __aiter__ method, got coroutine when using Agno v2 async streaming with AgentOps instrumentation.

Root Cause: When Team.arun(stream=True) is called, it returns an async generator directly. The previous wrapper was defined as async def, which caused Python to wrap the returned async generator in a coroutine, breaking async for loops.

Solution:

  1. Added AsyncStreamingResultWrapper class that properly implements the async iteration protocol (__aiter__ and __anext__)
  2. Changed create_team_async_wrapper from async def wrapper to def wrapper - this allows async generators to be returned directly without coroutine wrapping
  3. Added inspect.isasyncgen() checks to detect and properly wrap async generators
  4. Updated create_streaming_agent_async_wrapper to check for __aiter__ before __iter__ for correct async detection

🧪 Testing

Created test script examples/agno/test_async_streaming_team.py that reproduces the original issue:

  • ✅ Async iteration successfully started (no more coroutine error)
  • ✅ Received RunStartedEvent confirming async streaming works
  • ✅ Linting passed: ruff check and ruff format
  • ⚠️ Hit OpenAI quota during full test (unrelated to fix)

🔍 Human Review Checklist

Critical items:

  1. Verify inspect.isasyncgen(result) is the correct check for detecting async generators at runtime
  2. Confirm the synchronous wrapper pattern (non-async function returning async generator) is correct
  3. Review cleanup logic in AsyncStreamingResultWrapper.__anext__ - does span cleanup happen correctly in all cases?
  4. Check if other async wrappers (e.g., create_workflow_async_wrapper) need similar fixes

Additional considerations:
5. Verify context attachment/detachment is correct and doesn't leak contexts
6. Consider if this pattern change should have automated tests added


Session Info:

Fixes #1262

The issue was that Team.arun() with stream=True returns an async generator
directly, not a coroutine. The team async wrapper was defined as 'async def'
which caused it to wrap the async generator in a coroutine, leading to the
error: 'async for' requires an object with __aiter__ method, got coroutine.

Changes:
- Added AsyncStreamingResultWrapper class to properly wrap async generators
- Modified create_team_async_wrapper to be a regular function (not async)
- Added inspect.isasyncgen() check to detect async generators
- Updated create_streaming_agent_async_wrapper to check for __aiter__ first

The fix ensures that async generators are returned directly without being
wrapped in a coroutine, allowing proper async iteration with 'async for'.

Co-Authored-By: Alex <meta.alex.r@gmail.com>
@devin-ai-integration
Copy link
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

Also fixes Agent.arun() async streaming in addition to Team.arun().

Changes:
- Changed create_streaming_agent_async_wrapper from async def to def
- Added inspect.isasyncgen() check for detecting async generators
- For non-streaming case, created inner async_wrapper function
- Both Team and Agent async streaming now work correctly

Co-Authored-By: Alex <meta.alex.r@gmail.com>
@areibman areibman merged commit 89b980f into main Oct 23, 2025
2 of 9 checks passed
@areibman areibman deleted the devin/1761154738-fix-agno-async-streaming-clean branch October 23, 2025 01:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: AgentOps breaks async streaming in Agno framework

2 participants