Skip to content

aggregating

Class info

Classes

Name Children Inherits
AggregatingResourceProvider
llmling_agent.resource_providers.aggregating
Provider that combines resources from multiple providers.
ResourceProvider
llmling_agent.resource_providers.base
Base class for resource providers.

🛈 DocStrings

Aggregating resource provider.

AggregatingResourceProvider

Bases: ResourceProvider

Provider that combines resources from multiple providers.

Source code in src/llmling_agent/resource_providers/aggregating.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class AggregatingResourceProvider(ResourceProvider):
    """Provider that combines resources from multiple providers."""

    def __init__(self, providers: list[ResourceProvider], name: str = "aggregating") -> None:
        """Initialize provider with list of providers to aggregate.

        Args:
            providers: Resource providers to aggregate (stores reference to list)
            name: Name for this provider
        """
        super().__init__(name=name)
        # Store reference to the providers list for dynamic updates
        self.providers = providers

    async def get_tools(self) -> list[Tool]:
        """Get tools from all providers."""
        return [t for provider in self.providers for t in await provider.get_tools()]

    async def get_prompts(self) -> list[BasePrompt]:
        """Get prompts from all providers."""
        return [p for provider in self.providers for p in await provider.get_prompts()]

    async def get_resources(self) -> list[ResourceInfo]:
        """Get resources from all providers."""
        return [r for provider in self.providers for r in await provider.get_resources()]

    async def get_request_parts(
        self, name: str, arguments: dict[str, str] | None = None
    ) -> list[ModelRequestPart]:
        """Try to get prompt from first provider that has it."""
        for provider in self.providers:
            try:
                return await provider.get_request_parts(name, arguments)
            except KeyError:
                continue
        msg = f"Prompt {name!r} not found in any provider"
        raise KeyError(msg)

__init__

__init__(providers: list[ResourceProvider], name: str = 'aggregating') -> None

Initialize provider with list of providers to aggregate.

Parameters:

Name Type Description Default
providers list[ResourceProvider]

Resource providers to aggregate (stores reference to list)

required
name str

Name for this provider

'aggregating'
Source code in src/llmling_agent/resource_providers/aggregating.py
21
22
23
24
25
26
27
28
29
30
def __init__(self, providers: list[ResourceProvider], name: str = "aggregating") -> None:
    """Initialize provider with list of providers to aggregate.

    Args:
        providers: Resource providers to aggregate (stores reference to list)
        name: Name for this provider
    """
    super().__init__(name=name)
    # Store reference to the providers list for dynamic updates
    self.providers = providers

get_prompts async

get_prompts() -> list[BasePrompt]

Get prompts from all providers.

Source code in src/llmling_agent/resource_providers/aggregating.py
36
37
38
async def get_prompts(self) -> list[BasePrompt]:
    """Get prompts from all providers."""
    return [p for provider in self.providers for p in await provider.get_prompts()]

get_request_parts async

get_request_parts(
    name: str, arguments: dict[str, str] | None = None
) -> list[ModelRequestPart]

Try to get prompt from first provider that has it.

Source code in src/llmling_agent/resource_providers/aggregating.py
44
45
46
47
48
49
50
51
52
53
54
async def get_request_parts(
    self, name: str, arguments: dict[str, str] | None = None
) -> list[ModelRequestPart]:
    """Try to get prompt from first provider that has it."""
    for provider in self.providers:
        try:
            return await provider.get_request_parts(name, arguments)
        except KeyError:
            continue
    msg = f"Prompt {name!r} not found in any provider"
    raise KeyError(msg)

get_resources async

get_resources() -> list[ResourceInfo]

Get resources from all providers.

Source code in src/llmling_agent/resource_providers/aggregating.py
40
41
42
async def get_resources(self) -> list[ResourceInfo]:
    """Get resources from all providers."""
    return [r for provider in self.providers for r in await provider.get_resources()]

get_tools async

get_tools() -> list[Tool]

Get tools from all providers.

Source code in src/llmling_agent/resource_providers/aggregating.py
32
33
34
async def get_tools(self) -> list[Tool]:
    """Get tools from all providers."""
    return [t for provider in self.providers for t in await provider.get_tools()]