Skip to content

storage

Class info

Classes

Name Children Inherits
StorageManager
llmling_agent.storage.manager
Manages multiple storage providers.

    🛈 DocStrings

    Storage package.

    StorageManager

    Manages multiple storage providers.

    Handles: provider initialization and cleanup; message distribution to providers; history loading from capable providers; global logging filters.

    Source code in src/llmling_agent/storage/manager.py
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    class StorageManager:
        """Manages multiple storage providers.
    
        Handles:
        - Provider initialization and cleanup
        - Message distribution to providers
        - History loading from capable providers
        - Global logging filters
        """
    
        def __init__(self, config: StorageConfig) -> None:
            """Initialize storage manager.
    
            Args:
                config: Storage configuration including providers and filters
            """
            self.config = config
            self.task_manager = TaskManager()
            self.providers = [self._create_provider(cfg) for cfg in self.config.effective_providers]
    
        async def __aenter__(self) -> Self:
            """Initialize all providers."""
            for provider in self.providers:
                await provider.__aenter__()
            return self
    
        async def __aexit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Clean up all providers."""
            errors = []
            for provider in self.providers:
                try:
                    await provider.__aexit__(exc_type, exc_val, exc_tb)
                except Exception as e:
                    errors.append(e)
                    logger.exception("Error cleaning up provider", provider=provider)
    
            await self.task_manager.cleanup_tasks()
    
            if errors:
                msg = "Provider cleanup errors"
                raise ExceptionGroup(msg, errors)
    
        def cleanup(self) -> None:
            """Clean up all providers."""
            for provider in self.providers:
                try:
                    provider.cleanup()
                except Exception:
                    logger.exception("Error cleaning up provider", provider=provider)
            self.providers.clear()
    
        def _create_provider(self, config: BaseStorageProviderConfig) -> StorageProvider:
            """Create provider instance from configuration."""
            # Extract common settings from BaseStorageProviderConfig
            match self.config.filter_mode:
                case "and" if self.config.agents and config.agents:
                    logged_agents: set[str] | None = self.config.agents & config.agents
                case "and":
                    # If either is None, use the other; if both None, use None (log all)
                    if self.config.agents is None and config.agents is None:
                        logged_agents = None
                    else:
                        logged_agents = self.config.agents or config.agents or set()
                case "override":
                    logged_agents = config.agents if config.agents is not None else self.config.agents
    
            provider_config = config.model_copy(
                update={
                    "log_messages": config.log_messages and self.config.log_messages,
                    "log_conversations": config.log_conversations and self.config.log_conversations,
                    "log_commands": config.log_commands and self.config.log_commands,
                    "log_context": config.log_context and self.config.log_context,
                    "agents": logged_agents,
                }
            )
    
            match provider_config:
                case SQLStorageConfig() as config:
                    from llmling_agent_storage.sql_provider import SQLModelProvider
    
                    return SQLModelProvider(provider_config)
                case FileStorageConfig():
                    from llmling_agent_storage.file_provider import FileProvider
    
                    return FileProvider(provider_config)
                case TextLogConfig():
                    from llmling_agent_storage.text_log_provider import TextLogProvider
    
                    return TextLogProvider(provider_config)
    
                case MemoryStorageConfig():
                    from llmling_agent_storage.memory_provider import MemoryStorageProvider
    
                    return MemoryStorageProvider(provider_config)
                case _:
                    msg = f"Unknown provider type: {provider_config}"
                    raise ValueError(msg)
    
        def get_history_provider(self, preferred: str | None = None) -> StorageProvider:
            """Get provider for loading history.
    
            Args:
                preferred: Optional preferred provider name
    
            Returns:
                First capable provider based on priority:
                1. Preferred provider if specified and capable
                2. Default provider if specified and capable
                3. First capable provider
                4. Raises error if no capable provider found
            """
    
            # Function to find capable provider by name
            def find_provider(name: str) -> StorageProvider | None:
                for p in self.providers:
                    if (
                        not getattr(p, "write_only", False)
                        and p.can_load_history
                        and p.__class__.__name__.lower() == name.lower()
                    ):
                        return p
                return None
    
            # Try preferred provider
            if preferred and (provider := find_provider(preferred)):
                return provider
    
            # Try default provider
            if self.config.default_provider:
                if provider := find_provider(self.config.default_provider):
                    return provider
                msg = "Default provider not found or not capable of loading history"
                logger.warning(msg, provider=self.config.default_provider)
    
            # Find first capable provider
            for provider in self.providers:
                if not getattr(provider, "write_only", False) and provider.can_load_history:
                    return provider
    
            msg = "No capable provider found for loading history"
            raise RuntimeError(msg)
    
        @method_spawner
        async def filter_messages(
            self,
            query: SessionQuery,
            preferred_provider: str | None = None,
        ) -> list[ChatMessage[str]]:
            """Get messages matching query.
    
            Args:
                query: Filter criteria
                preferred_provider: Optional preferred provider to use
            """
            provider = self.get_history_provider(preferred_provider)
            return await provider.filter_messages(query)
    
        @method_spawner
        async def log_message(self, message: ChatMessage[Any]) -> None:
            """Log message to all providers."""
            if not self.config.log_messages:
                return
    
            for provider in self.providers:
                if provider.should_log_agent(message.name or "no name"):
                    await provider.log_message(
                        conversation_id=message.conversation_id or "",
                        message_id=message.message_id,
                        content=str(message.content),
                        role=message.role,
                        name=message.name,
                        cost_info=message.cost_info,
                        model=message.model_name,
                        response_time=message.response_time,
                        forwarded_from=message.forwarded_from,
                        provider_name=message.provider_name,
                        provider_response_id=message.provider_response_id,
                        messages=serialize_messages(message.messages),
                        finish_reason=message.finish_reason,
                    )
    
        @method_spawner
        async def log_conversation(
            self,
            *,
            conversation_id: str,
            node_name: str,
            start_time: datetime | None = None,
        ) -> None:
            """Log conversation to all providers."""
            if not self.config.log_conversations:
                return
    
            for provider in self.providers:
                await provider.log_conversation(
                    conversation_id=conversation_id,
                    node_name=node_name,
                    start_time=start_time,
                )
    
        @method_spawner
        async def log_command(
            self,
            *,
            agent_name: str,
            session_id: str,
            command: str,
            context_type: type | None = None,
            metadata: dict[str, JsonValue] | None = None,
        ) -> None:
            """Log command to all providers."""
            if not self.config.log_commands:
                return
    
            for provider in self.providers:
                await provider.log_command(
                    agent_name=agent_name,
                    session_id=session_id,
                    command=command,
                    context_type=context_type,
                    metadata=metadata,
                )
    
        @method_spawner
        async def log_context_message(
            self,
            *,
            conversation_id: str,
            content: str,
            role: str,
            name: str | None = None,
            model: str | None = None,
        ) -> None:
            """Log context message to all providers."""
            for provider in self.providers:
                await provider.log_context_message(
                    conversation_id=conversation_id,
                    content=content,
                    role=role,
                    name=name,
                    model=model,
                )
    
        @method_spawner
        async def reset(
            self,
            *,
            agent_name: str | None = None,
            hard: bool = False,
        ) -> tuple[int, int]:
            """Reset storage in all providers concurrently."""
    
            async def reset_provider(provider: StorageProvider) -> tuple[int, int]:
                try:
                    return await provider.reset(agent_name=agent_name, hard=hard)
                except Exception:
                    cls_name = provider.__class__.__name__
                    logger.exception("Error resetting provider", provider=cls_name)
                    return (0, 0)
    
            results = await asyncio.gather(*(reset_provider(provider) for provider in self.providers))
            # Return the counts from the last provider (maintaining existing behavior)
            return results[-1] if results else (0, 0)
    
        @method_spawner
        async def get_conversation_counts(
            self,
            *,
            agent_name: str | None = None,
        ) -> tuple[int, int]:
            """Get counts from primary provider."""
            provider = self.get_history_provider()
            return await provider.get_conversation_counts(agent_name=agent_name)
    
        @method_spawner
        async def get_commands(
            self,
            agent_name: str,
            session_id: str,
            *,
            limit: int | None = None,
            current_session_only: bool = False,
            preferred_provider: str | None = None,
        ) -> list[str]:
            """Get command history."""
            if not self.config.log_commands:
                return []
    
            provider = self.get_history_provider(preferred_provider)
            return await provider.get_commands(
                agent_name=agent_name,
                session_id=session_id,
                limit=limit,
                current_session_only=current_session_only,
            )
    

    __aenter__ async

    __aenter__() -> Self
    

    Initialize all providers.

    Source code in src/llmling_agent/storage/manager.py
    54
    55
    56
    57
    58
    async def __aenter__(self) -> Self:
        """Initialize all providers.

        Providers are entered in list order. If one of them fails to enter, the
        providers that were already entered are exited again (in reverse order)
        before the error propagates, because ``__aexit__`` is not invoked when
        ``__aenter__`` raises.

        Returns:
            The manager itself.
        """
        entered = []
        try:
            for provider in self.providers:
                await provider.__aenter__()
                entered.append(provider)
        except BaseException as exc:
            # Roll back the partial initialization so no provider is left open.
            for provider in reversed(entered):
                try:
                    await provider.__aexit__(type(exc), exc, exc.__traceback__)
                except Exception:
                    logger.exception("Error cleaning up provider", provider=provider)
            raise
        return self
    

    __aexit__ async

    __aexit__(
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None
    

    Clean up all providers.

    Source code in src/llmling_agent/storage/manager.py
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit every provider, then clean up the task manager's tasks.

        A provider that raises while exiting does not prevent the remaining
        providers from being exited.

        Raises:
            ExceptionGroup: If at least one provider raised while exiting.
        """
        failures = []
        for current in self.providers:
            try:
                await current.__aexit__(exc_type, exc_val, exc_tb)
            except Exception as err:
                # Remember the failure and keep going with the other providers.
                failures.append(err)
                logger.exception("Error cleaning up provider", provider=current)

        await self.task_manager.cleanup_tasks()

        if not failures:
            return
        raise ExceptionGroup("Provider cleanup errors", failures)
    

    __init__

    __init__(config: StorageConfig) -> None
    

    Initialize storage manager.

    Parameters:

    Name Type Description Default
    config StorageConfig

    Storage configuration including providers and filters

    required
    Source code in src/llmling_agent/storage/manager.py
    44
    45
    46
    47
    48
    49
    50
    51
    52
    def __init__(self, config: StorageConfig) -> None:
        """Set up the manager and build one provider per effective provider config.

        Args:
            config: Storage configuration including providers and filters
        """
        self.config = config
        self.task_manager = TaskManager()
        created = []
        for provider_cfg in self.config.effective_providers:
            created.append(self._create_provider(provider_cfg))
        self.providers = created
    

    cleanup

    cleanup() -> None
    

    Clean up all providers.

    Source code in src/llmling_agent/storage/manager.py
    81
    82
    83
    84
    85
    86
    87
    88
    def cleanup(self) -> None:
        """Run ``cleanup()`` on each provider and then empty the provider list.

        A failing provider is logged; the remaining providers are still cleaned up.
        """
        for current in self.providers:
            try:
                current.cleanup()
            except Exception:
                logger.exception("Error cleaning up provider", provider=current)
        self.providers.clear()
    

    filter_messages async

    filter_messages(
        query: SessionQuery, preferred_provider: str | None = None
    ) -> list[ChatMessage[str]]
    

    Get messages matching query.

    Parameters:

    Name Type Description Default
    query SessionQuery

    Filter criteria

    required
    preferred_provider str | None

    Optional preferred provider to use

    None
    Source code in src/llmling_agent/storage/manager.py
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    @method_spawner
    async def filter_messages(
        self,
        query: SessionQuery,
        preferred_provider: str | None = None,
    ) -> list[ChatMessage[str]]:
        """Return the messages that match a query.

        The query is delegated to the provider chosen by ``get_history_provider``.

        Args:
            query: Filter criteria
            preferred_provider: Optional preferred provider to use
        """
        history_source = self.get_history_provider(preferred_provider)
        matching = await history_source.filter_messages(query)
        return matching
    

    get_commands async

    get_commands(
        agent_name: str,
        session_id: str,
        *,
        limit: int | None = None,
        current_session_only: bool = False,
        preferred_provider: str | None = None
    ) -> list[str]
    

    Get command history.

    Source code in src/llmling_agent/storage/manager.py
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    @method_spawner
    async def get_commands(
        self,
        agent_name: str,
        session_id: str,
        *,
        limit: int | None = None,
        current_session_only: bool = False,
        preferred_provider: str | None = None,
    ) -> list[str]:
        """Fetch the command history from the history provider.

        Returns an empty list when command logging is disabled globally.
        """
        if self.config.log_commands:
            history_source = self.get_history_provider(preferred_provider)
            return await history_source.get_commands(
                agent_name=agent_name,
                session_id=session_id,
                limit=limit,
                current_session_only=current_session_only,
            )
        return []
    

    get_conversation_counts async

    get_conversation_counts(*, agent_name: str | None = None) -> tuple[int, int]
    

    Get counts from primary provider.

    Source code in src/llmling_agent/storage/manager.py
    303
    304
    305
    306
    307
    308
    309
    310
    311
    @method_spawner
    async def get_conversation_counts(
        self,
        *,
        agent_name: str | None = None,
    ) -> tuple[int, int]:
        """Return the counts reported by the primary provider.

        The primary provider is the one ``get_history_provider()`` selects when no
        preference is given.
        """
        return await self.get_history_provider().get_conversation_counts(agent_name=agent_name)
    

    get_history_provider

    get_history_provider(preferred: str | None = None) -> StorageProvider
    

    Get provider for loading history.

    Parameters:

    Name Type Description Default
    preferred str | None

    Optional preferred provider name

    None

    Returns:

    Type Description
    StorageProvider

    First capable provider based on priority:

    StorageProvider
    1. Preferred provider if specified and capable
    StorageProvider
    2. Default provider if specified and capable
    StorageProvider
    3. First capable provider
    StorageProvider
    4. Raises error if no capable provider found
    Source code in src/llmling_agent/storage/manager.py
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    def get_history_provider(self, preferred: str | None = None) -> StorageProvider:
        """Pick the provider that history should be loaded from.

        A provider is capable when it is not marked ``write_only`` and reports
        ``can_load_history``. Names are matched case-insensitively against the
        provider's class name.

        Args:
            preferred: Optional preferred provider name

        Returns:
            First capable provider based on priority:
            1. Preferred provider if specified and capable
            2. Default provider if specified and capable
            3. First capable provider

        Raises:
            RuntimeError: If no capable provider is found.
        """

        def can_serve(candidate: StorageProvider) -> bool:
            return not getattr(candidate, "write_only", False) and candidate.can_load_history

        def lookup(name: str) -> StorageProvider | None:
            named = (
                candidate
                for candidate in self.providers
                if can_serve(candidate) and candidate.__class__.__name__.lower() == name.lower()
            )
            return next(named, None)

        # 1. Explicitly requested provider
        if preferred:
            requested = lookup(preferred)
            if requested:
                return requested

        # 2. Configured default provider
        default_name = self.config.default_provider
        if default_name:
            configured = lookup(default_name)
            if configured:
                return configured
            logger.warning(
                "Default provider not found or not capable of loading history",
                provider=default_name,
            )

        # 3. Any capable provider, in list order
        for candidate in self.providers:
            if can_serve(candidate):
                return candidate

        raise RuntimeError("No capable provider found for loading history")
    

    log_command async

    log_command(
        *,
        agent_name: str,
        session_id: str,
        command: str,
        context_type: type | None = None,
        metadata: dict[str, JsonValue] | None = None
    ) -> None
    

    Log command to all providers.

    Source code in src/llmling_agent/storage/manager.py
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    @method_spawner
    async def log_command(
        self,
        *,
        agent_name: str,
        session_id: str,
        command: str,
        context_type: type | None = None,
        metadata: dict[str, JsonValue] | None = None,
    ) -> None:
        """Forward a command record to every provider.

        Nothing is logged when command logging is disabled globally.
        """
        if self.config.log_commands:
            for target in self.providers:
                await target.log_command(
                    agent_name=agent_name,
                    session_id=session_id,
                    command=command,
                    context_type=context_type,
                    metadata=metadata,
                )
    

    log_context_message async

    log_context_message(
        *,
        conversation_id: str,
        content: str,
        role: str,
        name: str | None = None,
        model: str | None = None
    ) -> None
    

    Log context message to all providers.

    Source code in src/llmling_agent/storage/manager.py
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    @method_spawner
    async def log_context_message(
        self,
        *,
        conversation_id: str,
        content: str,
        role: str,
        name: str | None = None,
        model: str | None = None,
    ) -> None:
        """Log context message to all providers.

        Does nothing when context logging is disabled globally, mirroring the
        guards of the other ``log_*`` methods.
        """
        if not self.config.log_context:
            return

        for provider in self.providers:
            await provider.log_context_message(
                conversation_id=conversation_id,
                content=content,
                role=role,
                name=name,
                model=model,
            )
    

    log_conversation async

    log_conversation(
        *, conversation_id: str, node_name: str, start_time: datetime | None = None
    ) -> None
    

    Log conversation to all providers.

    Source code in src/llmling_agent/storage/manager.py
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    @method_spawner
    async def log_conversation(
        self,
        *,
        conversation_id: str,
        node_name: str,
        start_time: datetime | None = None,
    ) -> None:
        """Forward a conversation record to every provider.

        Nothing is logged when conversation logging is disabled globally.
        """
        if self.config.log_conversations:
            for target in self.providers:
                await target.log_conversation(
                    conversation_id=conversation_id,
                    node_name=node_name,
                    start_time=start_time,
                )
    

    log_message async

    log_message(message: ChatMessage[Any]) -> None
    

    Log message to all providers.

    Source code in src/llmling_agent/storage/manager.py
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    @method_spawner
    async def log_message(self, message: ChatMessage[Any]) -> None:
        """Forward a chat message to every provider that logs its agent.

        Nothing is logged when message logging is disabled globally.

        Args:
            message: Message to log
        """
        if not self.config.log_messages:
            return

        for target in self.providers:
            # Unnamed messages are filtered under the placeholder "no name".
            if not target.should_log_agent(message.name or "no name"):
                continue
            await target.log_message(
                conversation_id=message.conversation_id or "",
                message_id=message.message_id,
                content=str(message.content),
                role=message.role,
                name=message.name,
                cost_info=message.cost_info,
                model=message.model_name,
                response_time=message.response_time,
                forwarded_from=message.forwarded_from,
                provider_name=message.provider_name,
                provider_response_id=message.provider_response_id,
                messages=serialize_messages(message.messages),
                finish_reason=message.finish_reason,
            )
    

    reset async

    reset(*, agent_name: str | None = None, hard: bool = False) -> tuple[int, int]
    

    Reset storage in all providers concurrently.

    Source code in src/llmling_agent/storage/manager.py
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    @method_spawner
    async def reset(
        self,
        *,
        agent_name: str | None = None,
        hard: bool = False,
    ) -> tuple[int, int]:
        """Reset the storage of all providers at the same time.

        A provider whose reset fails is logged and contributes ``(0, 0)``.

        Returns:
            The result of the last provider in the list, or ``(0, 0)`` when there
            are no providers.
        """

        async def guarded_reset(target: StorageProvider) -> tuple[int, int]:
            try:
                counts = await target.reset(agent_name=agent_name, hard=hard)
            except Exception:
                logger.exception("Error resetting provider", provider=target.__class__.__name__)
                return (0, 0)
            return counts

        outcomes = await asyncio.gather(*map(guarded_reset, self.providers))
        if not outcomes:
            return (0, 0)
        # Only the last provider's counts are reported (existing behavior).
        return outcomes[-1]
    

    deserialize_messages

    deserialize_messages(messages_json: str | None) -> list[ModelMessage]
    

    Deserialize pydantic-ai ModelMessage list from JSON string.

    Parameters:

    Name Type Description Default
    messages_json str | None

    JSON string representation of messages or None if empty

    required

    Returns:

    Type Description
    list[ModelMessage]

    List of ModelMessage objects, empty if deserialization fails

    Source code in src/llmling_agent/storage/serialization.py
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    def deserialize_messages(messages_json: str | None) -> list[ModelMessage]:
        """Turn a JSON string back into a list of pydantic-ai ModelMessage objects.

        Args:
            messages_json: JSON string representation of messages or None if empty

        Returns:
            List of ModelMessage objects; empty for empty input or when
            deserialization fails (the failure is logged as a warning)
        """
        if messages_json:
            try:
                # Validation is delegated to the module's messages adapter.
                return messages_adapter.validate_json(messages_json.encode())
            except Exception as exc:  # noqa: BLE001
                logger.warning("Failed to deserialize model messages", error=exc)
        return []
    

    deserialize_parts

    deserialize_parts(parts_json: str | None) -> Sequence[ModelResponsePart]
    

    Deserialize pydantic-ai message parts from JSON string.

    Parameters:

    Name Type Description Default
    parts_json str | None

    JSON string representation of parts or None if empty

    required

    Returns:

    Type Description
    Sequence[ModelResponsePart]

    Sequence of ModelResponsePart objects, empty if deserialization fails

    Source code in src/llmling_agent/storage/serialization.py
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    def deserialize_parts(parts_json: str | None) -> Sequence[ModelResponsePart]:
        """Turn a JSON string back into pydantic-ai message parts.

        Args:
            parts_json: JSON string representation of parts or None if empty

        Returns:
            Sequence of ModelResponsePart objects; empty for empty input or when
            deserialization fails (the failure is logged as a warning)
        """
        if parts_json:
            try:
                # Validation is delegated to the module's parts adapter.
                return parts_adapter.validate_json(parts_json.encode())
            except Exception as exc:  # noqa: BLE001
                logger.warning("Failed to deserialize message parts", error=exc)
        return []
    

    serialize_messages

    serialize_messages(messages: Sequence[ModelMessage]) -> str | None
    

    Serialize pydantic-ai ModelMessage list to JSON string.

    Parameters:

    Name Type Description Default
    messages Sequence[ModelMessage]

    Sequence of ModelMessage objects from ChatMessage.messages

    required

    Returns:

    Type Description
    str | None

    JSON string representation of messages or None if empty

    Source code in src/llmling_agent/storage/serialization.py
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    def serialize_messages(messages: Sequence[ModelMessage]) -> str | None:
        """Serialize pydantic-ai ModelMessage list to JSON string.

        Note:
            ``ctx`` dictionaries found in the list content of ``RetryPromptPart``
            parts are rewritten in place with stringified values, so the passed
            messages are modified.

        Args:
            messages: Sequence of ModelMessage objects from ChatMessage.messages

        Returns:
            JSON string representation of messages, or None if empty. If
            serialization fails, a warning is logged and ``str(messages)`` is
            returned instead, which is not guaranteed to be valid JSON.
        """
        if not messages:
            return None

        try:
            # Imported lazily, but once per call instead of once per message.
            from pydantic_ai import ModelRequest, RetryPromptPart

            for message in messages:
                if not isinstance(message, ModelRequest):
                    continue
                # Handle RetryPromptPart context serialization issues
                for part in message.parts:
                    if isinstance(part, RetryPromptPart) and isinstance(part.content, list):
                        for content in part.content:
                            if isinstance(content, dict) and "ctx" in content:
                                content["ctx"] = {k: str(v) for k, v in content["ctx"].items()}

            # Serialize using pydantic's JSON serialization
            return messages_adapter.dump_json(list(messages)).decode()
        except Exception as e:  # noqa: BLE001
            logger.warning("Failed to serialize model messages", error=e)
            return str(messages)  # Fallback to string representation
    

    serialize_parts

    serialize_parts(parts: Sequence[ModelResponsePart | ModelRequestPart]) -> str | None
    

    Serialize pydantic-ai message parts from ChatMessage.

    Parameters:

    Name Type Description Default
    parts Sequence[ModelResponsePart | ModelRequestPart]

    Sequence of ModelResponsePart or ModelRequestPart objects from ChatMessage.parts

    required

    Returns:

    Type Description
    str | None

    JSON string representation of parts or None if empty

    Source code in src/llmling_agent/storage/serialization.py
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    def serialize_parts(parts: Sequence[ModelResponsePart | ModelRequestPart]) -> str | None:
        """Serialize pydantic-ai message parts from ChatMessage.

        Note:
            ``ctx`` dictionaries found in the list content of ``RetryPromptPart``
            parts are rewritten in place with stringified values, so the passed
            parts are modified.

        Args:
            parts: Sequence of ModelResponsePart or ModelRequestPart objects from
                ChatMessage.parts

        Returns:
            JSON string representation of parts, or None if empty. If
            serialization fails, a warning is logged and ``str(parts)`` is
            returned instead, which is not guaranteed to be valid JSON.
        """
        if not parts:
            return None

        try:
            # Imported lazily, but once per call instead of once per part.
            from pydantic_ai import RetryPromptPart

            for part in parts:
                # Handle RetryPromptPart context serialization issues
                if isinstance(part, RetryPromptPart) and isinstance(part.content, list):
                    for content in part.content:
                        if isinstance(content, dict) and "ctx" in content:
                            content["ctx"] = {k: str(v) for k, v in content["ctx"].items()}

            # Serialize using pydantic's JSON serialization
            return parts_adapter.dump_json(list(parts)).decode()
        except Exception as e:  # noqa: BLE001
            logger.warning("Failed to serialize message parts", error=e)
            return str(parts)  # Fallback to string representation