Skip to content

message_flow_tracker

Class info

Classes

Name Children Inherits
MessageFlowTracker
llmling_agent.delegation.message_flow_tracker

    🛈 DocStrings

    Agent pool management for collaboration.

    MessageFlowTracker

    Source code in src/llmling_agent/delegation/message_flow_tracker.py
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    class MessageFlowTracker:
        """Collect connection events and report on them per conversation.

        Events are stored in the order they are passed to ``track``. Both
        query methods select events whose message shares the
        ``conversation_id`` of the message given as argument.
        """

        def __init__(self):
            # Every tracked event, oldest first.
            self.events: list[Talk.ConnectionProcessed] = []

        def track(self, event: Talk.ConnectionProcessed):
            """Record ``event`` at the end of the event list."""
            self.events.append(event)

        def filter(self, message: ChatMessage) -> list[ChatMessage]:
            """Return the messages of tracked events from ``message``'s conversation.

            Args:
                message: Message whose ``conversation_id`` selects the events.

            Returns:
                The ``message`` of each matching event, in tracking order.
            """
            tracked_messages = (tracked.message for tracked in self.events)
            return [
                candidate
                for candidate in tracked_messages
                if candidate.conversation_id == message.conversation_id
            ]

        def visualize(self, message: ChatMessage) -> str:
            """Render the flow of ``message``'s conversation as flowchart text.

            Args:
                message: Message whose ``conversation_id`` selects the events.

            Returns:
                A ``flowchart LR`` header followed by one ``source-->target``
                line per target of every matching event, joined by newlines.
            """
            lines = ["flowchart LR"]
            for tracked in self.events:
                if tracked.message.conversation_id == message.conversation_id:
                    # One edge per target; the sender name is read once per event.
                    sender = tracked.message.name
                    lines.extend(
                        f"    {sender}-->{receiver.name}" for receiver in tracked.targets
                    )
            return "\n".join(lines)
    

    filter

    filter(message: ChatMessage) -> list[ChatMessage]
    

    Filter events for a specific conversation.

    Source code in src/llmling_agent/delegation/message_flow_tracker.py
    20
    21
    22
    23
    24
    25
    26
    def filter(self, message: ChatMessage) -> list[ChatMessage]:
        """Filter events for specific conversation."""
        return [
            e.message
            for e in self.events
            if e.message.conversation_id == message.conversation_id
        ]
    

    visualize

    visualize(message: ChatMessage) -> str
    

    Get flow visualization for a specific conversation.

    Source code in src/llmling_agent/delegation/message_flow_tracker.py
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    def visualize(self, message: ChatMessage) -> str:
        """Get flow visualization for specific conversation."""
        # Filter events for this conversation
        conv_events = [
            e for e in self.events if e.message.conversation_id == message.conversation_id
        ]
        lines = ["flowchart LR"]
        for event in conv_events:
            source = event.message.name
            for target in event.targets:
                lines.append(f"    {source}-->{target.name}")  # noqa: PERF401
        return "\n".join(lines)