Skip to content

tasks

Class info

Classes

Name Children Inherits
PrioritizedTask
llmling_agent.utils.tasks
Task with priority and optional delay.
    TaskManagerMixin
    llmling_agent.utils.tasks
    Mixin for managing async tasks.

    🛈 DocStrings

    PrioritizedTask dataclass

    Task with priority and optional delay.

    Source code in src/llmling_agent/utils/tasks.py
    20
    21
    22
    23
    24
    25
    26
    27
    @dataclass(order=True)
    class PrioritizedTask:
        """Task with priority and optional delay.

        Instances are pushed onto a ``heapq`` heap by the task manager.
        ``order=True`` makes them compare field by field in declaration order,
        i.e. as ``(priority, execute_at)``; ``coroutine`` and ``name`` are
        declared with ``compare=False`` and therefore take no part in ordering
        or equality.
        """
    
        # Primary sort key: the entry with the lowest number sorts first.
        priority: int
        # Point in time from which the coroutine may be started; used as the
        # tie-breaker between entries of equal priority.
        execute_at: datetime
        # Coroutine object to start once the entry is due (excluded from comparison).
        coroutine: Coroutine[Any, Any, Any] = field(compare=False)
        # Optional name given to the asyncio task created for the coroutine.
        name: str | None = field(default=None, compare=False)
    

    TaskManagerMixin

    Mixin for managing async tasks.

    Provides utilities for creating and tracking tasks, fire-and-forget task execution, running coroutines in a sync context, and cleaning up pending tasks.

    Source code in src/llmling_agent/utils/tasks.py
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    class TaskManagerMixin:
        """Mixin for managing async tasks.
    
        Provides utilities for:
        - Creating and tracking tasks
        - Fire-and-forget task execution
        - Running coroutines in sync context
        - Cleanup of pending tasks
        """
    
        def __init__(self, *args: Any, **kwargs: Any):
            super().__init__(*args, **kwargs)
            self._pending_tasks: set[asyncio.Task[Any]] = set()
            self._task_queue: list[PrioritizedTask] = []  # heap queue
            self._scheduler_task: asyncio.Task[Any] | None = None
    
        def create_task(
            self,
            coro: Coroutine[Any, Any, T],
            *,
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
        ) -> asyncio.Task[T]:
            """Create and track a new task with optional priority and delay.
    
            Args:
                coro: Coroutine to run
                name: Optional name for the task
                priority: Priority (lower = higher priority, default 0)
                delay: Optional delay before execution
            """
            task = asyncio.create_task(coro, name=name)
            logger.debug(
                "Created task: %s (priority=%d, delay=%s)", task.get_name(), priority, delay
            )
    
            def _done_callback(t: asyncio.Task[Any]):
                logger.debug("Task completed: %s", t.get_name())
                self._pending_tasks.discard(t)
                if t.exception():
                    logger.error("Task failed with error: %s", t.exception())
    
            task.add_done_callback(_done_callback)
            self._pending_tasks.add(task)
    
            if delay is not None:
                execute_at = datetime.now() + delay
                # Store the coroutine instead of the task
                heapq.heappush(
                    self._task_queue, PrioritizedTask(priority, execute_at, coro, name)
                )
                # Start scheduler if not running
                if not self._scheduler_task:
                    self._scheduler_task = asyncio.create_task(self._run_scheduler())
                # Cancel the original task since we'll run it later
                task.cancel()
                return task
    
            return task
    
        async def _run_scheduler(self):
            """Run scheduled tasks when their time comes."""
            try:
                while self._task_queue:
                    # Get next task without removing
                    next_task = self._task_queue[0]
                    now = datetime.now()
    
                    if now >= next_task.execute_at:
                        # Remove and execute
                        heapq.heappop(self._task_queue)
                        # Create new task from stored coroutine
                        new_task = asyncio.create_task(
                            next_task.coroutine,
                            name=next_task.name,
                        )
                        self._pending_tasks.add(new_task)
                        new_task.add_done_callback(self._pending_tasks.discard)
                    else:
                        # Wait until next task is due
                        await asyncio.sleep((next_task.execute_at - now).total_seconds())
    
            except Exception:
                logger.exception("Task scheduler error")
            finally:
                self._scheduler_task = None
    
        def fire_and_forget(self, coro: Coroutine[Any, Any, Any]):
            """Run coroutine without waiting for result."""
            try:
                loop = asyncio.get_running_loop()
                task = loop.create_task(coro)
                self._pending_tasks.add(task)
                task.add_done_callback(self._pending_tasks.discard)
            except RuntimeError:
                # No running loop - use new loop
                loop = asyncio.new_event_loop()
                try:
                    loop.run_until_complete(coro)
                finally:
                    loop.close()
    
        def run_task_sync(self, coro: Coroutine[Any, Any, T]) -> T:
            """Run coroutine synchronously."""
            try:
                loop = asyncio.get_running_loop()
                if loop.is_running():
                    # Running loop - use thread pool
                    import concurrent.futures
    
                    msg = "Running coroutine %r in Executor due to active event loop"
                    logger.debug(msg, coro.__name__)
                    with concurrent.futures.ThreadPoolExecutor() as pool:
                        future = pool.submit(lambda: asyncio.run(coro))
                        return future.result()
    
                # Existing but not running loop - use task tracking
                task = loop.create_task(coro)
                self._pending_tasks.add(task)
                task.add_done_callback(self._pending_tasks.discard)
                return loop.run_until_complete(task)
            except RuntimeError:
                # No loop - create new one
                return asyncio.run(coro)
    
        def run_background(
            self,
            coro: Coroutine[Any, Any, Any],
            name: str | None = None,
            priority: int = 0,
            delay: timedelta | None = None,
        ):
            """Run a coroutine in the background and track it."""
            try:
                self.create_task(coro, name=name, priority=priority, delay=delay)
    
            except RuntimeError:
                # No running loop - use fire_and_forget
                self.fire_and_forget(coro)
    
        def is_busy(self) -> bool:
            """Check if we have any tasks pending."""
            return bool(self._pending_tasks)
    
        async def cleanup_tasks(self):
            """Wait for all pending tasks to complete."""
            if self._pending_tasks:
                await asyncio.gather(*self._pending_tasks, return_exceptions=True)
            self._pending_tasks.clear()
    
        async def complete_tasks(self, cancel: bool = False):
            """Wait for all pending tasks to complete."""
            if cancel:
                for task in self._pending_tasks:
                    task.cancel()
            if self._pending_tasks:
                await asyncio.wait(self._pending_tasks)
    

    _run_scheduler async

    _run_scheduler()
    

    Run scheduled tasks when their time comes.

    Source code in src/llmling_agent/utils/tasks.py
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    async def _run_scheduler(self):
        """Run scheduled tasks when their time comes."""
        try:
            while self._task_queue:
                # Get next task without removing
                next_task = self._task_queue[0]
                now = datetime.now()
    
                if now >= next_task.execute_at:
                    # Remove and execute
                    heapq.heappop(self._task_queue)
                    # Create new task from stored coroutine
                    new_task = asyncio.create_task(
                        next_task.coroutine,
                        name=next_task.name,
                    )
                    self._pending_tasks.add(new_task)
                    new_task.add_done_callback(self._pending_tasks.discard)
                else:
                    # Wait until next task is due
                    await asyncio.sleep((next_task.execute_at - now).total_seconds())
    
        except Exception:
            logger.exception("Task scheduler error")
        finally:
            self._scheduler_task = None
    

    cleanup_tasks async

    cleanup_tasks()
    

    Wait for all pending tasks to complete.

    Source code in src/llmling_agent/utils/tasks.py
    175
    176
    177
    178
    179
    async def cleanup_tasks(self):
        """Wait for all pending tasks to complete."""
        if self._pending_tasks:
            await asyncio.gather(*self._pending_tasks, return_exceptions=True)
        self._pending_tasks.clear()
    

    complete_tasks async

    complete_tasks(cancel: bool = False)
    

    Wait for all pending tasks to complete, optionally cancelling them first.

    Source code in src/llmling_agent/utils/tasks.py
    181
    182
    183
    184
    185
    186
    187
    async def complete_tasks(self, cancel: bool = False):
        """Wait for all pending tasks to complete."""
        if cancel:
            for task in self._pending_tasks:
                task.cancel()
        if self._pending_tasks:
            await asyncio.wait(self._pending_tasks)
    

    create_task

    create_task(
        coro: Coroutine[Any, Any, T],
        *,
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
    ) -> Task[T]
    

    Create and track a new task with optional priority and delay.

    Parameters:

    Name Type Description Default
    coro Coroutine[Any, Any, T]

    Coroutine to run

    required
    name str | None

    Optional name for the task

    None
    priority int

    Priority (lower = higher priority, default 0)

    0
    delay timedelta | None

    Optional delay before execution

    None
    Source code in src/llmling_agent/utils/tasks.py
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    def create_task(
        self,
        coro: Coroutine[Any, Any, T],
        *,
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
    ) -> asyncio.Task[T]:
        """Create and track a new task with optional priority and delay.
    
        Args:
            coro: Coroutine to run
            name: Optional name for the task
            priority: Priority (lower = higher priority, default 0)
            delay: Optional delay before execution
        """
        task = asyncio.create_task(coro, name=name)
        logger.debug(
            "Created task: %s (priority=%d, delay=%s)", task.get_name(), priority, delay
        )
    
        def _done_callback(t: asyncio.Task[Any]):
            logger.debug("Task completed: %s", t.get_name())
            self._pending_tasks.discard(t)
            if t.exception():
                logger.error("Task failed with error: %s", t.exception())
    
        task.add_done_callback(_done_callback)
        self._pending_tasks.add(task)
    
        if delay is not None:
            execute_at = datetime.now() + delay
            # Store the coroutine instead of the task
            heapq.heappush(
                self._task_queue, PrioritizedTask(priority, execute_at, coro, name)
            )
            # Start scheduler if not running
            if not self._scheduler_task:
                self._scheduler_task = asyncio.create_task(self._run_scheduler())
            # Cancel the original task since we'll run it later
            task.cancel()
            return task
    
        return task
    

    fire_and_forget

    fire_and_forget(coro: Coroutine[Any, Any, Any])
    

    Run coroutine without waiting for result.

    Source code in src/llmling_agent/utils/tasks.py
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    def fire_and_forget(self, coro: Coroutine[Any, Any, Any]):
        """Run coroutine without waiting for result."""
        try:
            loop = asyncio.get_running_loop()
            task = loop.create_task(coro)
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)
        except RuntimeError:
            # No running loop - use new loop
            loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(coro)
            finally:
                loop.close()
    

    is_busy

    is_busy() -> bool
    

    Check if we have any tasks pending.

    Source code in src/llmling_agent/utils/tasks.py
    171
    172
    173
    def is_busy(self) -> bool:
        """Check if we have any tasks pending."""
        return bool(self._pending_tasks)
    

    run_background

    run_background(
        coro: Coroutine[Any, Any, Any],
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
    )
    

    Run a coroutine in the background and track it.

    Source code in src/llmling_agent/utils/tasks.py
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    def run_background(
        self,
        coro: Coroutine[Any, Any, Any],
        name: str | None = None,
        priority: int = 0,
        delay: timedelta | None = None,
    ):
        """Run a coroutine in the background and track it."""
        try:
            self.create_task(coro, name=name, priority=priority, delay=delay)
    
        except RuntimeError:
            # No running loop - use fire_and_forget
            self.fire_and_forget(coro)
    

    run_task_sync

    run_task_sync(coro: Coroutine[Any, Any, T]) -> T
    

    Run coroutine synchronously.

    Source code in src/llmling_agent/utils/tasks.py
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    def run_task_sync(self, coro: Coroutine[Any, Any, T]) -> T:
        """Run coroutine synchronously."""
        try:
            loop = asyncio.get_running_loop()
            if loop.is_running():
                # Running loop - use thread pool
                import concurrent.futures
    
                msg = "Running coroutine %r in Executor due to active event loop"
                logger.debug(msg, coro.__name__)
                with concurrent.futures.ThreadPoolExecutor() as pool:
                    future = pool.submit(lambda: asyncio.run(coro))
                    return future.result()
    
            # Existing but not running loop - use task tracking
            task = loop.create_task(coro)
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)
            return loop.run_until_complete(task)
        except RuntimeError:
            # No loop - create new one
            return asyncio.run(coro)