streams

Stream utilities for merging async iterators.

merge_queue_into_iterator async

```python
merge_queue_into_iterator(
    primary_stream: AsyncIterator[T], secondary_queue: Queue[V]
) -> AsyncIterator[AsyncIterator[T | V]]
```

Merge a primary async stream with events from a secondary queue.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `primary_stream` | `AsyncIterator[T]` | The main async iterator (e.g., provider events) | *required* |
| `secondary_queue` | `Queue[V]` | Queue containing secondary events (e.g., progress events) | *required* |

Yields:

| Type | Description |
|------|-------------|
| `AsyncIterator[AsyncIterator[T \| V]]` | Async iterator that yields events from both sources in real-time |

Example

```python
progress_queue: asyncio.Queue[ProgressEvent] = asyncio.Queue()

async with merge_queue_into_iterator(provider_stream, progress_queue) as events:
    async for event in events:
        print(f"Got event: {event}")
```
Source code in src/llmling_agent/utils/streams.py
````python
@asynccontextmanager
async def merge_queue_into_iterator[T, V](
    primary_stream: AsyncIterator[T],
    secondary_queue: asyncio.Queue[V],
) -> AsyncIterator[AsyncIterator[T | V]]:
    """Merge a primary async stream with events from a secondary queue.

    Args:
        primary_stream: The main async iterator (e.g., provider events)
        secondary_queue: Queue containing secondary events (e.g., progress events)

    Yields:
        Async iterator that yields events from both sources in real-time

    Example:
        ```python
        progress_queue: asyncio.Queue[ProgressEvent] = asyncio.Queue()

        async with merge_queue_into_iterator(provider_stream, progress_queue) as events:
            async for event in events:
                print(f"Got event: {event}")
        ```
    """
    # Create a queue for all merged events
    event_queue: asyncio.Queue[V | T | None] = asyncio.Queue()
    primary_done = False

    # Task to read from primary stream and put into merged queue
    async def primary_task() -> None:
        nonlocal primary_done
        try:
            async for event in primary_stream:
                await event_queue.put(event)
        finally:
            primary_done = True
            # Signal end of primary stream
            await event_queue.put(None)

    # Task to read from secondary queue and put into merged queue
    async def secondary_task() -> None:
        try:
            while not primary_done:
                try:
                    secondary_event = await asyncio.wait_for(secondary_queue.get(), timeout=0.1)
                    await event_queue.put(secondary_event)
                except TimeoutError:
                    continue
        except asyncio.CancelledError:
            pass

    # Start both tasks
    primary_task_obj = asyncio.create_task(primary_task())
    secondary_task_obj = asyncio.create_task(secondary_task())

    try:
        # Create async iterator that drains the merged queue
        async def merged_events() -> AsyncIterator[V | T]:
            while True:
                event = await event_queue.get()
                if event is None:  # End of primary stream
                    break
                yield event

        yield merged_events()

    finally:
        # Clean up tasks
        secondary_task_obj.cancel()
        await asyncio.gather(primary_task_obj, secondary_task_obj, return_exceptions=True)
````
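Implementation note: the secondary task polls `secondary_queue` with a short 0.1 s timeout instead of blocking indefinitely, so it can notice `primary_done` and stop once the primary stream is exhausted; the `None` sentinel pushed by the primary task is what ends the merged iterator. As a consequence, secondary events that arrive after the primary stream has finished may not be delivered.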