Skip to content

tasks

Class info

Classes

Name Children Inherits
PrioritizedTask
llmling_agent.utils.tasks
Task with priority and optional delay.
    TaskManager
    llmling_agent.utils.tasks
    Mixin for managing async tasks.

      🛈 DocStrings

      Task management mixin.

      PrioritizedTask dataclass

      Task with priority and optional delay.

      Source code in src/llmling_agent/utils/tasks.py
      22
      23
      24
      25
      26
      27
      28
      29
      @dataclass(order=True)
      class PrioritizedTask:
          """Task with priority and optional delay.

          ``order=True`` generates the comparison methods, so instances can be kept
          in a ``heapq``. Only ``priority`` and ``execute_at`` take part in
          comparisons (in that order); ``coroutine`` and ``name`` are excluded.
          """
      
          # Primary sort key; smaller values sort first.
          priority: int
          # Point in time at which the task becomes due; secondary sort key.
          execute_at: datetime
          # Coroutine to run once the task is due (ignored by comparisons).
          coroutine: Coroutine[Any, Any, Any] = field(compare=False)
          # Optional task name (ignored by comparisons).
          name: str | None = field(default=None, compare=False)
      

      TaskManager

      Mixin for managing async tasks.

      Provides utilities for creating and tracking tasks, fire-and-forget task execution, running coroutines in a sync context, and cleanup of pending tasks.

      Source code in src/llmling_agent/utils/tasks.py
       32
       33
       34
       35
       36
       37
       38
       39
       40
       41
       42
       43
       44
       45
       46
       47
       48
       49
       50
       51
       52
       53
       54
       55
       56
       57
       58
       59
       60
       61
       62
       63
       64
       65
       66
       67
       68
       69
       70
       71
       72
       73
       74
       75
       76
       77
       78
       79
       80
       81
       82
       83
       84
       85
       86
       87
       88
       89
       90
       91
       92
       93
       94
       95
       96
       97
       98
       99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      171
      172
      173
      174
      175
      176
      177
      178
      179
      180
      181
      182
      183
      184
      185
      186
      187
      188
      class TaskManager:
          """Mixin for managing async tasks.
      
          Provides utilities for:
          - Creating and tracking tasks
          - Fire-and-forget task execution
          - Running coroutines in sync context
          - Cleanup of pending tasks
          """
      
          def __init__(self, *args: Any, **kwargs: Any):
              self._pending_tasks: set[asyncio.Task[Any]] = set()
              self._task_queue: list[PrioritizedTask] = []  # heap queue
              self._scheduler_task: asyncio.Task[Any] | None = None
      
          def create_task[T](
              self,
              coro: Coroutine[Any, Any, T],
              *,
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
          ) -> asyncio.Task[T]:
              """Create and track a new task with optional priority and delay.
      
              Args:
                  coro: Coroutine to run
                  name: Optional name for the task
                  priority: Priority (lower = higher priority, default 0)
                  delay: Optional delay before execution
              """
              task = asyncio.create_task(coro, name=name)
              logger.debug(
                  "Created task: %s (priority=%d, delay=%s)", task.get_name(), priority, delay
              )
      
              def _done_callback(t: asyncio.Task[Any]):
                  logger.debug("Task completed: %s", t.get_name())
                  self._pending_tasks.discard(t)
                  if t.exception():
                      logger.error("Task failed with error: %s", t.exception())
      
              task.add_done_callback(_done_callback)
              self._pending_tasks.add(task)
      
              if delay is not None:
                  execute_at = get_now() + delay
                  # Store the coroutine instead of the task
                  heapq.heappush(
                      self._task_queue, PrioritizedTask(priority, execute_at, coro, name)
                  )
                  # Start scheduler if not running
                  if not self._scheduler_task:
                      self._scheduler_task = asyncio.create_task(self._run_scheduler())
                  # Cancel the original task since we'll run it later
                  task.cancel()
                  return task
      
              return task
      
          async def _run_scheduler(self):
              """Run scheduled tasks when their time comes."""
              try:
                  while self._task_queue:
                      # Get next task without removing
                      next_task = self._task_queue[0]
                      now = get_now()
      
                      if now >= next_task.execute_at:
                          # Remove and execute
                          heapq.heappop(self._task_queue)
                          # Create new task from stored coroutine
                          new_task = asyncio.create_task(
                              next_task.coroutine,
                              name=next_task.name,
                          )
                          self._pending_tasks.add(new_task)
                          new_task.add_done_callback(self._pending_tasks.discard)
                      else:
                          # Wait until next task is due
                          await asyncio.sleep((next_task.execute_at - now).total_seconds())
      
              except Exception:
                  logger.exception("Task scheduler error")
              finally:
                  self._scheduler_task = None
      
          def fire_and_forget(self, coro: Coroutine[Any, Any, Any]):
              """Run coroutine without waiting for result."""
              try:
                  loop = asyncio.get_running_loop()
                  task = loop.create_task(coro)
                  self._pending_tasks.add(task)
                  task.add_done_callback(self._pending_tasks.discard)
              except RuntimeError:
                  # No running loop - use new loop
                  loop = asyncio.new_event_loop()
                  try:
                      loop.run_until_complete(coro)
                  finally:
                      loop.close()
      
          def run_task_sync[T](self, coro: Coroutine[Any, Any, T]) -> T:
              """Run coroutine synchronously."""
              try:
                  loop = asyncio.get_running_loop()
                  if loop.is_running():
                      # Running loop - use thread pool
                      import concurrent.futures
      
                      msg = "Running coroutine %r in Executor due to active event loop"
                      logger.debug(msg, coro.__name__)
                      with concurrent.futures.ThreadPoolExecutor() as pool:
                          future = pool.submit(lambda: asyncio.run(coro))
                          return future.result()
      
                  # Existing but not running loop - use task tracking
                  task = loop.create_task(coro)
                  self._pending_tasks.add(task)
                  task.add_done_callback(self._pending_tasks.discard)
                  return loop.run_until_complete(task)
              except RuntimeError:
                  # No loop - create new one
                  return asyncio.run(coro)
      
          def run_background(
              self,
              coro: Coroutine[Any, Any, Any],
              name: str | None = None,
              priority: int = 0,
              delay: timedelta | None = None,
          ):
              """Run a coroutine in the background and track it."""
              try:
                  self.create_task(coro, name=name, priority=priority, delay=delay)
      
              except RuntimeError:
                  # No running loop - use fire_and_forget
                  self.fire_and_forget(coro)
      
          def is_busy(self) -> bool:
              """Check if we have any tasks pending."""
              return bool(self._pending_tasks)
      
          async def cleanup_tasks(self):
              """Wait for all pending tasks to complete."""
              if self._pending_tasks:
                  await asyncio.gather(*self._pending_tasks, return_exceptions=True)
              self._pending_tasks.clear()
      
          async def complete_tasks(self, cancel: bool = False):
              """Wait for all pending tasks to complete."""
              if cancel:
                  for task in self._pending_tasks:
                      task.cancel()
              if self._pending_tasks:
                  await asyncio.wait(self._pending_tasks)
      

      cleanup_tasks async

      cleanup_tasks()
      

      Wait for all pending tasks to complete.

      Source code in src/llmling_agent/utils/tasks.py
      176
      177
      178
      179
      180
      async def cleanup_tasks(self):
          """Await every tracked task, then empty the tracking set.

          Exceptions raised by the tasks are collected by ``gather`` rather than
          re-raised.
          """
          if tracked := tuple(self._pending_tasks):
              await asyncio.gather(*tracked, return_exceptions=True)
          self._pending_tasks.clear()
      

      complete_tasks async

      complete_tasks(cancel: bool = False)
      

      Wait for all pending tasks to complete, optionally cancelling them first.

      Source code in src/llmling_agent/utils/tasks.py
      182
      183
      184
      185
      186
      187
      188
      async def complete_tasks(self, cancel: bool = False):
          """Block until every tracked task has finished.

          Args:
              cancel: When true, request cancellation of each tracked task before
                  waiting for them.
          """
          if cancel:
              for pending in self._pending_tasks:
                  pending.cancel()
          if not self._pending_tasks:
              return
          await asyncio.wait(self._pending_tasks)
      

      create_task

      create_task(
          coro: Coroutine[Any, Any, T],
          *,
          name: str | None = None,
          priority: int = 0,
          delay: timedelta | None = None,
      ) -> Task[T]
      

      Create and track a new task with optional priority and delay.

      Parameters:

      Name Type Description Default
      coro Coroutine[Any, Any, T]

      Coroutine to run

      required
      name str | None

      Optional name for the task

      None
      priority int

      Priority (lower = higher priority, default 0)

      0
      delay timedelta | None

      Optional delay before execution

      None
      Source code in src/llmling_agent/utils/tasks.py
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      def create_task[T](
          self,
          coro: Coroutine[Any, Any, T],
          *,
          name: str | None = None,
          priority: int = 0,
          delay: timedelta | None = None,
      ) -> asyncio.Task[T]:
          """Create and track a new task with optional priority and delay.
      
          Args:
              coro: Coroutine to run
              name: Optional name for the task
              priority: Priority (lower = higher priority, default 0)
              delay: Optional delay before execution
          """
          task = asyncio.create_task(coro, name=name)
          logger.debug(
              "Created task: %s (priority=%d, delay=%s)", task.get_name(), priority, delay
          )
      
          def _done_callback(t: asyncio.Task[Any]):
              logger.debug("Task completed: %s", t.get_name())
              self._pending_tasks.discard(t)
              if t.exception():
                  logger.error("Task failed with error: %s", t.exception())
      
          task.add_done_callback(_done_callback)
          self._pending_tasks.add(task)
      
          if delay is not None:
              execute_at = get_now() + delay
              # Store the coroutine instead of the task
              heapq.heappush(
                  self._task_queue, PrioritizedTask(priority, execute_at, coro, name)
              )
              # Start scheduler if not running
              if not self._scheduler_task:
                  self._scheduler_task = asyncio.create_task(self._run_scheduler())
              # Cancel the original task since we'll run it later
              task.cancel()
              return task
      
          return task
      

      fire_and_forget

      fire_and_forget(coro: Coroutine[Any, Any, Any])
      

      Run coroutine without waiting for result.

      Source code in src/llmling_agent/utils/tasks.py
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      def fire_and_forget(self, coro: Coroutine[Any, Any, Any]):
          """Start a coroutine without handing its result back to the caller.

          With a running loop the coroutine becomes a tracked background task.
          Without one it is driven to completion on a temporary event loop, so the
          call blocks until the coroutine finishes.
          """
          try:
              background = asyncio.get_running_loop().create_task(coro)
              self._pending_tasks.add(background)
              background.add_done_callback(self._pending_tasks.discard)
          except RuntimeError:
              # Nothing is running in this thread: use a throwaway loop instead.
              fallback_loop = asyncio.new_event_loop()
              try:
                  fallback_loop.run_until_complete(coro)
              finally:
                  fallback_loop.close()
      

      is_busy

      is_busy() -> bool
      

      Check if we have any tasks pending.

      Source code in src/llmling_agent/utils/tasks.py
      172
      173
      174
      def is_busy(self) -> bool:
          """Report whether at least one tracked task is still pending."""
          return len(self._pending_tasks) > 0
      

      run_background

      run_background(
          coro: Coroutine[Any, Any, Any],
          name: str | None = None,
          priority: int = 0,
          delay: timedelta | None = None,
      )
      

      Run a coroutine in the background and track it.

      Source code in src/llmling_agent/utils/tasks.py
      157
      158
      159
      160
      161
      162
      163
      164
      165
      166
      167
      168
      169
      170
      def run_background(
          self,
          coro: Coroutine[Any, Any, Any],
          name: str | None = None,
          priority: int = 0,
          delay: timedelta | None = None,
      ):
          """Run a coroutine in the background and track it."""
          try:
              self.create_task(coro, name=name, priority=priority, delay=delay)
      
          except RuntimeError:
              # No running loop - use fire_and_forget
              self.fire_and_forget(coro)
      

      run_task_sync

      run_task_sync(coro: Coroutine[Any, Any, T]) -> T
      

      Run coroutine synchronously.

      Source code in src/llmling_agent/utils/tasks.py
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      def run_task_sync[T](self, coro: Coroutine[Any, Any, T]) -> T:
          """Run coroutine synchronously."""
          try:
              loop = asyncio.get_running_loop()
              if loop.is_running():
                  # Running loop - use thread pool
                  import concurrent.futures
      
                  msg = "Running coroutine %r in Executor due to active event loop"
                  logger.debug(msg, coro.__name__)
                  with concurrent.futures.ThreadPoolExecutor() as pool:
                      future = pool.submit(lambda: asyncio.run(coro))
                      return future.result()
      
              # Existing but not running loop - use task tracking
              task = loop.create_task(coro)
              self._pending_tasks.add(task)
              task.add_done_callback(self._pending_tasks.discard)
              return loop.run_until_complete(task)
          except RuntimeError:
              # No loop - create new one
              return asyncio.run(coro)