streams
Class info
🛈 DocStrings
Stream utilities for merging async iterators.
merge_queue_into_iterator (async)
    merge_queue_into_iterator(
        primary_stream: AsyncIterator[T], secondary_queue: Queue[V]
    ) -> AsyncIterator[AsyncIterator[T | V]]
Merge a primary async stream with events from a secondary queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `primary_stream` | `AsyncIterator[T]` | The main async iterator (e.g., provider events) | required |
| `secondary_queue` | `Queue[V]` | Queue containing secondary events (e.g., progress events) | required |
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[AsyncIterator[T \| V]]` | Async iterator that yields events from both sources in real-time |
Example
    progress_queue: asyncio.Queue[ProgressEvent] = asyncio.Queue()
    async with merge_queue_into_iterator(provider_stream, progress_queue) as events:
        async for event in events:
            print(f"Got event: {event}")
Source code in src/llmling_agent/utils/streams.py