Skip to content

manager

Class info

Classes

Name Children Inherits
FileStorageConfig
llmling_agent_config.storage
File storage configuration.
    • BaseStorageProviderConfig
    MemoryStorageConfig
    llmling_agent_config.storage
    In-memory storage configuration for testing.
      • BaseStorageProviderConfig
      SQLStorageConfig
      llmling_agent_config.storage
      SQL database storage configuration.
        • BaseStorageProviderConfig
        StorageManager
        llmling_agent.storage.manager
        Manages multiple storage providers.
          TaskManager
          llmling_agent.utils.tasks
          Mixin for managing async tasks.
            TextLogConfig
            llmling_agent_config.storage
            Text log configuration.
              • BaseStorageProviderConfig

              🛈 DocStrings

              Storage manager for handling multiple providers.

              StorageManager

              Manages multiple storage providers.

              Handles:

              - Provider initialization and cleanup
              - Message distribution to providers
              - History loading from capable providers
              - Global logging filters

              Source code in src/llmling_agent/storage/manager.py
               34
               35
               36
               37
               38
               39
               40
               41
               42
               43
               44
               45
               46
               47
               48
               49
               50
               51
               52
               53
               54
               55
               56
               57
               58
               59
               60
               61
               62
               63
               64
               65
               66
               67
               68
               69
               70
               71
               72
               73
               74
               75
               76
               77
               78
               79
               80
               81
               82
               83
               84
               85
               86
               87
               88
               89
               90
               91
               92
               93
               94
               95
               96
               97
               98
               99
              100
              101
              102
              103
              104
              105
              106
              107
              108
              109
              110
              111
              112
              113
              114
              115
              116
              117
              118
              119
              120
              121
              122
              123
              124
              125
              126
              127
              128
              129
              130
              131
              132
              133
              134
              135
              136
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              180
              181
              182
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              195
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              217
              218
              219
              220
              221
              222
              223
              224
              225
              226
              227
              228
              229
              230
              231
              232
              233
              234
              235
              236
              237
              238
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              249
              250
              251
              252
              253
              254
              255
              256
              257
              258
              259
              260
              261
              262
              263
              264
              265
              266
              267
              268
              269
              270
              271
              272
              273
              274
              275
              276
              277
              278
              279
              280
              281
              282
              283
              284
              285
              286
              287
              288
              289
              290
              291
              292
              293
              294
              295
              296
              297
              298
              299
              300
              301
              302
              303
              304
              305
              306
              307
              308
              309
              310
              311
              312
              313
              314
              315
              316
              317
              318
              319
              320
              321
              322
              323
              324
              325
              326
              327
              328
              329
              330
              331
              332
              333
              class StorageManager:
                  """Manages multiple storage providers.
              
                  Handles:
                  - Provider initialization and cleanup
                  - Message distribution to providers
                  - History loading from capable providers
                  - Global logging filters
                  """
              
                  def __init__(self, config: StorageConfig) -> None:
                      """Initialize storage manager.
              
                      Args:
                          config: Storage configuration including providers and filters
                      """
                      self.config = config
                      self.task_manager = TaskManager()
                      self.providers = [self._create_provider(cfg) for cfg in self.config.effective_providers]
              
                  async def __aenter__(self) -> Self:
                      """Initialize all providers."""
                      for provider in self.providers:
                          await provider.__aenter__()
                      return self
              
                  async def __aexit__(
                      self,
                      exc_type: type[BaseException] | None,
                      exc_val: BaseException | None,
                      exc_tb: TracebackType | None,
                  ) -> None:
                      """Clean up all providers."""
                      errors = []
                      for provider in self.providers:
                          try:
                              await provider.__aexit__(exc_type, exc_val, exc_tb)
                          except Exception as e:
                              errors.append(e)
                              logger.exception("Error cleaning up provider", provider=provider)
              
                      await self.task_manager.cleanup_tasks()
              
                      if errors:
                          msg = "Provider cleanup errors"
                          raise ExceptionGroup(msg, errors)
              
                  def cleanup(self) -> None:
                      """Clean up all providers."""
                      for provider in self.providers:
                          try:
                              provider.cleanup()
                          except Exception:
                              logger.exception("Error cleaning up provider", provider=provider)
                      self.providers.clear()
              
                  def _create_provider(self, config: BaseStorageProviderConfig) -> StorageProvider:
                      """Create provider instance from configuration."""
                      # Extract common settings from BaseStorageProviderConfig
                      match self.config.filter_mode:
                          case "and" if self.config.agents and config.agents:
                              logged_agents: set[str] | None = self.config.agents & config.agents
                          case "and":
                              # If either is None, use the other; if both None, use None (log all)
                              if self.config.agents is None and config.agents is None:
                                  logged_agents = None
                              else:
                                  logged_agents = self.config.agents or config.agents or set()
                          case "override":
                              logged_agents = config.agents if config.agents is not None else self.config.agents
              
                      provider_config = config.model_copy(
                          update={
                              "log_messages": config.log_messages and self.config.log_messages,
                              "log_conversations": config.log_conversations and self.config.log_conversations,
                              "log_commands": config.log_commands and self.config.log_commands,
                              "log_context": config.log_context and self.config.log_context,
                              "agents": logged_agents,
                          }
                      )
              
                      match provider_config:
                          case SQLStorageConfig() as config:
                              from llmling_agent_storage.sql_provider import SQLModelProvider
              
                              return SQLModelProvider(provider_config)
                          case FileStorageConfig():
                              from llmling_agent_storage.file_provider import FileProvider
              
                              return FileProvider(provider_config)
                          case TextLogConfig():
                              from llmling_agent_storage.text_log_provider import TextLogProvider
              
                              return TextLogProvider(provider_config)
              
                          case MemoryStorageConfig():
                              from llmling_agent_storage.memory_provider import MemoryStorageProvider
              
                              return MemoryStorageProvider(provider_config)
                          case _:
                              msg = f"Unknown provider type: {provider_config}"
                              raise ValueError(msg)
              
                  def get_history_provider(self, preferred: str | None = None) -> StorageProvider:
                      """Get provider for loading history.
              
                      Args:
                          preferred: Optional preferred provider name
              
                      Returns:
                          First capable provider based on priority:
                          1. Preferred provider if specified and capable
                          2. Default provider if specified and capable
                          3. First capable provider
                          4. Raises error if no capable provider found
                      """
              
                      # Function to find capable provider by name
                      def find_provider(name: str) -> StorageProvider | None:
                          for p in self.providers:
                              if (
                                  not getattr(p, "write_only", False)
                                  and p.can_load_history
                                  and p.__class__.__name__.lower() == name.lower()
                              ):
                                  return p
                          return None
              
                      # Try preferred provider
                      if preferred and (provider := find_provider(preferred)):
                          return provider
              
                      # Try default provider
                      if self.config.default_provider:
                          if provider := find_provider(self.config.default_provider):
                              return provider
                          msg = "Default provider not found or not capable of loading history"
                          logger.warning(msg, provider=self.config.default_provider)
              
                      # Find first capable provider
                      for provider in self.providers:
                          if not getattr(provider, "write_only", False) and provider.can_load_history:
                              return provider
              
                      msg = "No capable provider found for loading history"
                      raise RuntimeError(msg)
              
                  @method_spawner
                  async def filter_messages(
                      self,
                      query: SessionQuery,
                      preferred_provider: str | None = None,
                  ) -> list[ChatMessage[str]]:
                      """Get messages matching query.
              
                      Args:
                          query: Filter criteria
                          preferred_provider: Optional preferred provider to use
                      """
                      provider = self.get_history_provider(preferred_provider)
                      return await provider.filter_messages(query)
              
                  @method_spawner
                  async def log_message(self, message: ChatMessage[Any]) -> None:
                      """Log message to all providers."""
                      if not self.config.log_messages:
                          return
              
                      for provider in self.providers:
                          if provider.should_log_agent(message.name or "no name"):
                              await provider.log_message(
                                  conversation_id=message.conversation_id or "",
                                  message_id=message.message_id,
                                  content=str(message.content),
                                  role=message.role,
                                  name=message.name,
                                  cost_info=message.cost_info,
                                  model=message.model_name,
                                  response_time=message.response_time,
                                  forwarded_from=message.forwarded_from,
                                  provider_name=message.provider_name,
                                  provider_response_id=message.provider_response_id,
                                  messages=serialize_messages(message.messages),
                                  finish_reason=message.finish_reason,
                              )
              
                  @method_spawner
                  async def log_conversation(
                      self,
                      *,
                      conversation_id: str,
                      node_name: str,
                      start_time: datetime | None = None,
                  ) -> None:
                      """Log conversation to all providers."""
                      if not self.config.log_conversations:
                          return
              
                      for provider in self.providers:
                          await provider.log_conversation(
                              conversation_id=conversation_id,
                              node_name=node_name,
                              start_time=start_time,
                          )
              
                  @method_spawner
                  async def log_command(
                      self,
                      *,
                      agent_name: str,
                      session_id: str,
                      command: str,
                      context_type: type | None = None,
                      metadata: dict[str, JsonValue] | None = None,
                  ) -> None:
                      """Log command to all providers."""
                      if not self.config.log_commands:
                          return
              
                      for provider in self.providers:
                          await provider.log_command(
                              agent_name=agent_name,
                              session_id=session_id,
                              command=command,
                              context_type=context_type,
                              metadata=metadata,
                          )
              
                  @method_spawner
                  async def log_context_message(
                      self,
                      *,
                      conversation_id: str,
                      content: str,
                      role: str,
                      name: str | None = None,
                      model: str | None = None,
                  ) -> None:
                      """Log context message to all providers."""
                      for provider in self.providers:
                          await provider.log_context_message(
                              conversation_id=conversation_id,
                              content=content,
                              role=role,
                              name=name,
                              model=model,
                          )
              
                  @method_spawner
                  async def reset(
                      self,
                      *,
                      agent_name: str | None = None,
                      hard: bool = False,
                  ) -> tuple[int, int]:
                      """Reset storage in all providers concurrently."""
              
                      async def reset_provider(provider: StorageProvider) -> tuple[int, int]:
                          try:
                              return await provider.reset(agent_name=agent_name, hard=hard)
                          except Exception:
                              cls_name = provider.__class__.__name__
                              logger.exception("Error resetting provider", provider=cls_name)
                              return (0, 0)
              
                      results = await asyncio.gather(*(reset_provider(provider) for provider in self.providers))
                      # Return the counts from the last provider (maintaining existing behavior)
                      return results[-1] if results else (0, 0)
              
                  @method_spawner
                  async def get_conversation_counts(
                      self,
                      *,
                      agent_name: str | None = None,
                  ) -> tuple[int, int]:
                      """Get counts from primary provider."""
                      provider = self.get_history_provider()
                      return await provider.get_conversation_counts(agent_name=agent_name)
              
                  @method_spawner
                  async def get_commands(
                      self,
                      agent_name: str,
                      session_id: str,
                      *,
                      limit: int | None = None,
                      current_session_only: bool = False,
                      preferred_provider: str | None = None,
                  ) -> list[str]:
                      """Get command history."""
                      if not self.config.log_commands:
                          return []
              
                      provider = self.get_history_provider(preferred_provider)
                      return await provider.get_commands(
                          agent_name=agent_name,
                          session_id=session_id,
                          limit=limit,
                          current_session_only=current_session_only,
                      )
              

              __aenter__ async

              __aenter__() -> Self
              

              Initialize all providers.

              Source code in src/llmling_agent/storage/manager.py
              54
              55
              56
              57
              58
              async def __aenter__(self) -> Self:
                  """Initialize all providers.

                  Providers are entered in list order. If one of them fails to enter,
                  the providers that were already entered are exited again (in reverse
                  order) before the original error is re-raised, so that no provider
                  is left open.

                  Returns:
                      The manager itself.
                  """
                  entered = []
                  try:
                      for provider in self.providers:
                          await provider.__aenter__()
                          entered.append(provider)
                  except BaseException as exc:
                      # Roll back: close whatever was opened before the failure.
                      for provider in reversed(entered):
                          try:
                              await provider.__aexit__(type(exc), exc, exc.__traceback__)
                          except Exception:
                              logger.exception("Error cleaning up provider", provider=provider)
                      raise
                  return self
              

              __aexit__ async

              __aexit__(
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ) -> None
              

              Clean up all providers.

              Source code in src/llmling_agent/storage/manager.py
              60
              61
              62
              63
              64
              65
              66
              67
              68
              69
              70
              71
              72
              73
              74
              75
              76
              77
              78
              79
              async def __aexit__(
                  self,
                  exc_type: type[BaseException] | None,
                  exc_val: BaseException | None,
                  exc_tb: TracebackType | None,
              ) -> None:
                  """Exit every provider, then clean up the task manager.

                  A failing provider does not prevent the remaining ones from being
                  exited; its error is logged and remembered.

                  Raises:
                      ExceptionGroup: If one or more providers raised while exiting.
                  """
                  failures = []
                  for backend in self.providers:
                      try:
                          await backend.__aexit__(exc_type, exc_val, exc_tb)
                      except Exception as exc:
                          failures.append(exc)
                          logger.exception("Error cleaning up provider", provider=backend)

                  await self.task_manager.cleanup_tasks()

                  if not failures:
                      return
                  # Surface all collected provider errors at once
                  raise ExceptionGroup("Provider cleanup errors", failures)
              

              __init__

              __init__(config: StorageConfig) -> None
              

              Initialize storage manager.

              Parameters:

              Name Type Description Default
              config StorageConfig

              Storage configuration including providers and filters

              required
              Source code in src/llmling_agent/storage/manager.py
              44
              45
              46
              47
              48
              49
              50
              51
              52
              def __init__(self, config: StorageConfig) -> None:
                  """Set up the manager and build one provider per effective config.

                  Args:
                      config: Storage configuration including providers and filters
                  """
                  self.config = config
                  self.task_manager = TaskManager()
                  # Build the full list first; it is only assigned once every provider exists
                  created = list(map(self._create_provider, self.config.effective_providers))
                  self.providers = created
              

              cleanup

              cleanup() -> None
              

              Clean up all providers.

              Source code in src/llmling_agent/storage/manager.py
              81
              82
              83
              84
              85
              86
              87
              88
              def cleanup(self) -> None:
                  """Run ``cleanup()`` on each provider and empty the provider list.

                  An error raised by one provider is logged; the remaining providers
                  are still cleaned up.
                  """
                  for backend in self.providers:
                      try:
                          backend.cleanup()
                      except Exception:
                          # Best effort: log and carry on with the next provider
                          logger.exception("Error cleaning up provider", provider=backend)

                  self.providers.clear()
              

              filter_messages async

              filter_messages(
                  query: SessionQuery, preferred_provider: str | None = None
              ) -> list[ChatMessage[str]]
              

              Get messages matching query.

              Parameters:

              Name Type Description Default
              query SessionQuery

              Filter criteria

              required
              preferred_provider str | None

              Optional preferred provider to use

              None
              Source code in src/llmling_agent/storage/manager.py
              181
              182
              183
              184
              185
              186
              187
              188
              189
              190
              191
              192
              193
              194
              @method_spawner
              async def filter_messages(
                  self,
                  query: SessionQuery,
                  preferred_provider: str | None = None,
              ) -> list[ChatMessage[str]]:
                  """Return the messages that match the given query.

                  Args:
                      query: Filter criteria
                      preferred_provider: Optional name of the provider to query first

                  Returns:
                      The messages returned by the selected history provider.
                  """
                  history_source = self.get_history_provider(preferred_provider)
                  matches = await history_source.filter_messages(query)
                  return matches
              

              get_commands async

              get_commands(
                  agent_name: str,
                  session_id: str,
                  *,
                  limit: int | None = None,
                  current_session_only: bool = False,
                  preferred_provider: str | None = None
              ) -> list[str]
              

              Get command history.

              Source code in src/llmling_agent/storage/manager.py
              313
              314
              315
              316
              317
              318
              319
              320
              321
              322
              323
              324
              325
              326
              327
              328
              329
              330
              331
              332
              333
              @method_spawner
              async def get_commands(
                  self,
                  agent_name: str,
                  session_id: str,
                  *,
                  limit: int | None = None,
                  current_session_only: bool = False,
                  preferred_provider: str | None = None,
              ) -> list[str]:
                  """Fetch the command history from the history provider.

                  Args:
                      agent_name: Name of the agent
                      session_id: Identifier of the session
                      limit: Optional limit passed on to the provider
                      current_session_only: Passed on to the provider unchanged
                      preferred_provider: Optional name of the provider to query first

                  Returns:
                      The provider's commands, or an empty list when command logging
                      is disabled in the configuration.
                  """
                  if self.config.log_commands:
                      history_source = self.get_history_provider(preferred_provider)
                      commands = await history_source.get_commands(
                          agent_name=agent_name,
                          session_id=session_id,
                          limit=limit,
                          current_session_only=current_session_only,
                      )
                      return commands

                  # Command logging is off, so there is nothing to look up
                  return []
              

              get_conversation_counts async

              get_conversation_counts(*, agent_name: str | None = None) -> tuple[int, int]
              

              Get counts from primary provider.

              Source code in src/llmling_agent/storage/manager.py
              303
              304
              305
              306
              307
              308
              309
              310
              311
              @method_spawner
              async def get_conversation_counts(
                  self,
                  *,
                  agent_name: str | None = None,
              ) -> tuple[int, int]:
                  """Return the counts reported by the history provider.

                  Args:
                      agent_name: Optional agent name passed on to the provider
                  """
                  counts = await self.get_history_provider().get_conversation_counts(agent_name=agent_name)
                  return counts
              

              get_history_provider

              get_history_provider(preferred: str | None = None) -> StorageProvider
              

              Get provider for loading history.

              Parameters:

              Name Type Description Default
              preferred str | None

              Optional preferred provider name

              None

              Returns:

              Type Description
              StorageProvider

              First capable provider based on priority:

              StorageProvider
              1. Preferred provider if specified and capable
              StorageProvider
              2. Default provider if specified and capable
              StorageProvider
              3. First capable provider
              StorageProvider
              4. Raises error if no capable provider found
              Source code in src/llmling_agent/storage/manager.py
              137
              138
              139
              140
              141
              142
              143
              144
              145
              146
              147
              148
              149
              150
              151
              152
              153
              154
              155
              156
              157
              158
              159
              160
              161
              162
              163
              164
              165
              166
              167
              168
              169
              170
              171
              172
              173
              174
              175
              176
              177
              178
              179
              def get_history_provider(self, preferred: str | None = None) -> StorageProvider:
                  """Pick the provider that history is loaded from.

                  Args:
                      preferred: Optional preferred provider name

                  Returns:
                      The first match in this order of priority:
                      1. Preferred provider if specified and capable
                      2. Default provider if specified and capable
                      3. First capable provider

                  Raises:
                      RuntimeError: If no provider is capable of loading history.
                  """

                  def can_serve_history(candidate: StorageProvider) -> bool:
                      # Write-only providers are never used for reading
                      if getattr(candidate, "write_only", False):
                          return False
                      return bool(candidate.can_load_history)

                  def lookup(name: str) -> StorageProvider | None:
                      # Providers are matched by class name, ignoring case
                      for candidate in self.providers:
                          if not can_serve_history(candidate):
                              continue
                          if candidate.__class__.__name__.lower() == name.lower():
                              return candidate
                      return None

                  # 1. Explicitly requested provider
                  if preferred:
                      chosen = lookup(preferred)
                      if chosen:
                          return chosen

                  # 2. Configured default provider
                  default_name = self.config.default_provider
                  if default_name:
                      chosen = lookup(default_name)
                      if chosen:
                          return chosen
                      logger.warning(
                          "Default provider not found or not capable of loading history",
                          provider=default_name,
                      )

                  # 3. Any capable provider, in list order
                  for candidate in self.providers:
                      if can_serve_history(candidate):
                          return candidate

                  raise RuntimeError("No capable provider found for loading history")
              

              log_command async

              log_command(
                  *,
                  agent_name: str,
                  session_id: str,
                  command: str,
                  context_type: type | None = None,
                  metadata: dict[str, JsonValue] | None = None
              ) -> None
              

              Log command to all providers.

              Source code in src/llmling_agent/storage/manager.py
              239
              240
              241
              242
              243
              244
              245
              246
              247
              248
              249
              250
              251
              252
              253
              254
              255
              256
              257
              258
              259
              260
              @method_spawner
              async def log_command(
                  self,
                  *,
                  agent_name: str,
                  session_id: str,
                  command: str,
                  context_type: type | None = None,
                  metadata: dict[str, JsonValue] | None = None,
              ) -> None:
                  """Forward a command record to every storage provider.

                  Nothing is logged when ``self.config.log_commands`` is falsy. Providers
                  are awaited one after another, in ``self.providers`` order.

                  Args:
                      agent_name: Passed through to each provider unchanged
                      session_id: Passed through to each provider unchanged
                      command: Passed through to each provider unchanged
                      context_type: Passed through to each provider unchanged
                      metadata: Passed through to each provider unchanged
                  """
                  if self.config.log_commands:
                      # Every provider receives the same keyword arguments.
                      record = {
                          "agent_name": agent_name,
                          "session_id": session_id,
                          "command": command,
                          "context_type": context_type,
                          "metadata": metadata,
                      }
                      for backend in self.providers:
                          await backend.log_command(**record)
              

              log_context_message async

              log_context_message(
                  *,
                  conversation_id: str,
                  content: str,
                  role: str,
                  name: str | None = None,
                  model: str | None = None
              ) -> None
              

              Log context message to all providers.

              Source code in src/llmling_agent/storage/manager.py
              262
              263
              264
              265
              266
              267
              268
              269
              270
              271
              272
              273
              274
              275
              276
              277
              278
              279
              280
              @method_spawner
              async def log_context_message(
                  self,
                  *,
                  conversation_id: str,
                  content: str,
                  role: str,
                  name: str | None = None,
                  model: str | None = None,
              ) -> None:
                  """Forward a context message to every storage provider.

                  Providers are awaited one after another, in ``self.providers`` order.
                  No config flag is consulted by this method.

                  Args:
                      conversation_id: Passed through to each provider unchanged
                      content: Passed through to each provider unchanged
                      role: Passed through to each provider unchanged
                      name: Passed through to each provider unchanged
                      model: Passed through to each provider unchanged
                  """
                  # Every provider receives the same keyword arguments.
                  entry = {
                      "conversation_id": conversation_id,
                      "content": content,
                      "role": role,
                      "name": name,
                      "model": model,
                  }
                  for backend in self.providers:
                      await backend.log_context_message(**entry)
              

              log_conversation async

              log_conversation(
                  *, conversation_id: str, node_name: str, start_time: datetime | None = None
              ) -> None
              

              Log conversation to all providers.

              Source code in src/llmling_agent/storage/manager.py
              220
              221
              222
              223
              224
              225
              226
              227
              228
              229
              230
              231
              232
              233
              234
              235
              236
              237
              @method_spawner
              async def log_conversation(
                  self,
                  *,
                  conversation_id: str,
                  node_name: str,
                  start_time: datetime | None = None,
              ) -> None:
                  """Forward a conversation record to every storage provider.

                  Nothing is logged when ``self.config.log_conversations`` is falsy.
                  Providers are awaited one after another, in ``self.providers`` order.

                  Args:
                      conversation_id: Passed through to each provider unchanged
                      node_name: Passed through to each provider unchanged
                      start_time: Passed through to each provider unchanged
                  """
                  if self.config.log_conversations:
                      # Every provider receives the same keyword arguments.
                      record = {
                          "conversation_id": conversation_id,
                          "node_name": node_name,
                          "start_time": start_time,
                      }
                      for backend in self.providers:
                          await backend.log_conversation(**record)
              

              log_message async

              log_message(message: ChatMessage[Any]) -> None
              

              Log message to all providers.

              Source code in src/llmling_agent/storage/manager.py
              196
              197
              198
              199
              200
              201
              202
              203
              204
              205
              206
              207
              208
              209
              210
              211
              212
              213
              214
              215
              216
              217
              218
              @method_spawner
              async def log_message(self, message: ChatMessage[Any]) -> None:
                  """Forward a chat message to each provider that accepts its agent.

                  Nothing is logged when ``self.config.log_messages`` is falsy. Each
                  provider is asked via ``should_log_agent`` (using the message name, or
                  ``"no name"`` when it is empty) and skipped if it declines.

                  Args:
                      message: The message whose fields are passed to the providers
                  """
                  if self.config.log_messages:
                      for backend in self.providers:
                          if not backend.should_log_agent(message.name or "no name"):
                              continue
                          # The payload is rebuilt for every accepting provider, so content
                          # stringification and message serialization run once per provider.
                          payload = {
                              "conversation_id": message.conversation_id or "",
                              "message_id": message.message_id,
                              "content": str(message.content),
                              "role": message.role,
                              "name": message.name,
                              "cost_info": message.cost_info,
                              "model": message.model_name,
                              "response_time": message.response_time,
                              "forwarded_from": message.forwarded_from,
                              "provider_name": message.provider_name,
                              "provider_response_id": message.provider_response_id,
                              "messages": serialize_messages(message.messages),
                              "finish_reason": message.finish_reason,
                          }
                          await backend.log_message(**payload)
              

              reset async

              reset(*, agent_name: str | None = None, hard: bool = False) -> tuple[int, int]
              

              Reset storage in all providers concurrently.

              Source code in src/llmling_agent/storage/manager.py
              282
              283
              284
              285
              286
              287
              288
              289
              290
              291
              292
              293
              294
              295
              296
              297
              298
              299
              300
              301
              @method_spawner
              async def reset(
                  self,
                  *,
                  agent_name: str | None = None,
                  hard: bool = False,
              ) -> tuple[int, int]:
                  """Reset every storage provider, running the resets concurrently.

                  Args:
                      agent_name: Passed through to each provider's ``reset``
                      hard: Passed through to each provider's ``reset``

                  Returns:
                      The count pair reported by the last provider in ``self.providers``,
                      or ``(0, 0)`` when there are no providers. A provider whose reset
                      raises is logged and contributes ``(0, 0)``.
                  """

                  async def safe_reset(target: StorageProvider) -> tuple[int, int]:
                      # A failing provider is logged and reported as zero counts so the
                      # remaining resets are still gathered.
                      try:
                          counts = await target.reset(agent_name=agent_name, hard=hard)
                      except Exception:
                          cls_name = target.__class__.__name__
                          logger.exception("Error resetting provider", provider=cls_name)
                          return (0, 0)
                      return counts

                  pending = [safe_reset(target) for target in self.providers]
                  outcomes = await asyncio.gather(*pending)
                  if not outcomes:
                      return (0, 0)
                  # Only the last provider's counts are returned (maintaining existing behavior)
                  return outcomes[-1]