diff --git a/examples/01_standalone_sdk/31_insight_tool.py b/examples/01_standalone_sdk/31_insight_tool.py
new file mode 100644
index 0000000000..1d55783ae5
--- /dev/null
+++ b/examples/01_standalone_sdk/31_insight_tool.py
@@ -0,0 +1,114 @@
+"""Example demonstrating the Insight tool for session analysis.
+
+This example shows how to use the Insight tool to analyze conversation
+history and generate usage reports with optimization suggestions.
+
+The Insight tool can be triggered by the agent when the user types '/insight'.
+"""
+
+import os
+
+from pydantic import SecretStr
+
+from openhands.sdk import LLM, Agent, Conversation
+from openhands.sdk.tool import Tool
+from openhands.tools.insight import InsightAction, InsightObservation, InsightTool
+from openhands.tools.preset.default import get_default_tools
+
+
+# Configure LLM
+api_key: str | None = os.getenv("LLM_API_KEY")
+assert api_key is not None, "LLM_API_KEY environment variable is not set."
+
+llm: LLM = LLM(
+    model=os.getenv("LLM_MODEL", "anthropic/claude-sonnet-4-5-20250929"),
+    api_key=SecretStr(api_key),
+    base_url=os.getenv("LLM_BASE_URL", None),
+    usage_id="agent",
+    drop_params=True,
+)
+
+# Build tools list with Insight tool
+tools = get_default_tools(enable_browser=False)
+
+# Configure Insight tool with parameters
+insight_params: dict[str, bool | str] = {}
+
+# Add LLM configuration for Insight tool (uses same LLM as main agent)
+insight_params["llm_model"] = llm.model
+if llm.api_key:
+    if isinstance(llm.api_key, SecretStr):
+        insight_params["api_key"] = llm.api_key.get_secret_value()
+    else:
+        insight_params["api_key"] = llm.api_key
+if llm.base_url:
+    insight_params["api_base"] = llm.base_url
+
+# Add Insight tool to the agent
+tools.append(Tool(name=InsightTool.name, params=insight_params))
+
+# Create agent with Insight capabilities
+agent: Agent = Agent(llm=llm, tools=tools)
+
+# Start conversation
+cwd: str = os.getcwd()
+PERSISTENCE_DIR = os.path.expanduser("~/.openhands")
+CONVERSATIONS_DIR = os.path.join(PERSISTENCE_DIR, "conversations")
+conversation = Conversation(
+    agent=agent, workspace=cwd, persistence_dir=CONVERSATIONS_DIR
+)
+
+# Run insight analysis directly using execute_tool
+print("\nRunning insight analysis on conversation history...")
+try:
+    insight_result = conversation.execute_tool(
+        "insight",
+        InsightAction(
+            generate_html=True,
+            suggest_skills=True,
+            max_sessions=20,
+        ),
+    )
+
+    # Cast to the expected observation type for type-safe access
+    if isinstance(insight_result, InsightObservation):
+        print(f"\n{insight_result.summary}")
+        print(f"\nSessions analyzed: {insight_result.sessions_analyzed}")
+
+        if insight_result.common_patterns:
+            print("\nCommon Patterns:")
+            for pattern in insight_result.common_patterns:
+                print(f"  - {pattern}")
+
+        if insight_result.bottlenecks:
+            print("\nIdentified Bottlenecks:")
+            for bottleneck in insight_result.bottlenecks:
+                print(f"  - {bottleneck}")
+
+        if insight_result.suggestions:
+            print("\nOptimization Suggestions:")
+            for i, suggestion in enumerate(insight_result.suggestions, 1):
+                print(f"  {i}. {suggestion}")
+
+        if insight_result.report_path:
+            print(f"\nHTML Report generated: {insight_result.report_path}")
+    else:
+        print(f"Result: {insight_result.text}")
+
+except KeyError as e:
+    print(f"Tool not available: {e}")
+
+print("\n" + "=" * 80)
+print("Insight tool example completed!")
+print("=" * 80)
+
+# Report cost
+cost = llm.metrics.accumulated_cost
+print(f"EXAMPLE_COST: {cost}")
+
+
+# Alternative: Use conversation to trigger insight via message
+# Uncomment to test triggering via conversation:
+#
+# conversation.send_message("Please analyze my usage with /insight")
+# conversation.run()
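A possible follow-up to the example above (not part of the diff): when a report path is returned, the generated dashboard could be opened locally. This is a minimal sketch using only the standard library and the `insight_result` / `InsightObservation` names from the example.

import webbrowser

# Sketch: open the generated dashboard in the default browser (local runs only).
if isinstance(insight_result, InsightObservation) and insight_result.report_path:
    webbrowser.open(f"file://{insight_result.report_path}")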
diff --git a/openhands-tools/openhands/tools/insight/__init__.py b/openhands-tools/openhands/tools/insight/__init__.py
new file mode 100644
index 0000000000..4b8f49ddcc
--- /dev/null
+++ b/openhands-tools/openhands/tools/insight/__init__.py
@@ -0,0 +1,18 @@
+"""Insight tool for session analysis and personalization.
+
+This tool provides session analysis capabilities by scanning historical
+conversation data and generating usage reports with optimization suggestions.
+"""
+
+from openhands.tools.insight.definition import (
+    InsightAction,
+    InsightObservation,
+    InsightTool,
+)
+
+
+__all__ = [
+    "InsightTool",
+    "InsightAction",
+    "InsightObservation",
+]
diff --git a/openhands-tools/openhands/tools/insight/definition.py b/openhands-tools/openhands/tools/insight/definition.py
new file mode 100644
index 0000000000..f01483ab25
--- /dev/null
+++ b/openhands-tools/openhands/tools/insight/definition.py
@@ -0,0 +1,168 @@
+"""Insight tool definition.
+
+This module provides the InsightTool for analyzing conversation sessions
+and generating usage reports with optimization suggestions.
+"""
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Any, override
+
+from pydantic import Field
+
+from openhands.sdk.io import LocalFileStore
+from openhands.sdk.llm import ImageContent, TextContent
+from openhands.sdk.tool import Action, Observation, ToolDefinition, register_tool
+
+
+if TYPE_CHECKING:
+    from openhands.sdk.conversation.state import ConversationState
+
+
+# ==================== Action Schema ====================
+
+
+class InsightAction(Action):
+    """Action to generate session insights and usage report."""
+
+    generate_html: bool = Field(
+        default=True,
+        description="Whether to generate an HTML report dashboard",
+    )
+    suggest_skills: bool = Field(
+        default=True,
+        description="Whether to suggest new skills based on usage patterns",
+    )
+    max_sessions: int = Field(
+        default=50,
+        description="Maximum number of recent sessions to analyze",
+    )
+
+
+# ==================== Observation Schema ====================
+
+
+class InsightObservation(Observation):
+    """Observation from insight analysis."""
+
+    summary: str = Field(
+        default="", description="Summary of the session analysis"
+    )
+    sessions_analyzed: int = Field(
+        default=0, description="Number of sessions analyzed"
+    )
+    common_patterns: list[str] = Field(
+        default_factory=list, description="Common usage patterns identified"
+    )
+    bottlenecks: list[str] = Field(
+        default_factory=list, description="Identified bottlenecks or issues"
+    )
+    suggestions: list[str] = Field(
+        default_factory=list, description="Optimization suggestions"
+    )
+    report_path: str | None = Field(
+        default=None, description="Path to generated HTML report"
+    )
+
+    @property
+    @override
+    def to_llm_content(self) -> Sequence[TextContent | ImageContent]:
+        """Convert observation to LLM-readable content."""
+        parts = []
+
+        if self.summary:
+            parts.append(f"## Session Analysis Summary\n{self.summary}")
+
+        parts.append(f"\n**Sessions Analyzed:** {self.sessions_analyzed}")
+
+        if self.common_patterns:
+            parts.append("\n### Common Patterns")
+            for pattern in self.common_patterns:
+                parts.append(f"- {pattern}")
+
+        if self.bottlenecks:
+            parts.append("\n### Identified Bottlenecks")
+            for bottleneck in self.bottlenecks:
+                parts.append(f"- {bottleneck}")
+
+        if self.suggestions:
+            parts.append("\n### Optimization Suggestions")
+            for i, suggestion in enumerate(self.suggestions, 1):
+                parts.append(f"{i}. {suggestion}")
+
+        if self.report_path:
+            parts.append(f"\n**HTML Report:** {self.report_path}")
+
+        return [TextContent(text="\n".join(parts))]
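For reference, a minimal sketch of what the `to_llm_content` property above produces (not part of the diff; the field values below are made up, and the imports assume the package layout from `__init__.py`):

from openhands.tools.insight import InsightObservation

obs = InsightObservation(
    summary="Mostly shell-heavy sessions with occasional edit loops.",  # sample value
    sessions_analyzed=3,
    common_patterns=["Most used tools: execute_bash, str_replace_editor"],
    suggestions=["Create a custom skill to automate 'ExecuteBashAction' workflows"],
)
# to_llm_content is a property returning a sequence of content blocks;
# the first (and only) block is a TextContent with the rendered markdown.
print(obs.to_llm_content[0].text)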
+
+
+# ==================== Tool Description ====================
+
+_INSIGHT_DESCRIPTION = """Analyze conversation history and generate usage insights.
+
+This tool scans historical session data to identify:
+- Common usage patterns and workflows
+- Bottlenecks and recurring issues
+- Opportunities for optimization
+
+Use this tool when:
+- User requests '/insight' or wants to analyze their usage
+- You need to understand user patterns for personalization
+- User wants suggestions for workflow improvements
+
+The tool can generate an HTML dashboard report and suggest new skills
+to automate repetitive tasks."""
+
+
+# ==================== Tool Definition ====================
+
+
+class InsightTool(ToolDefinition[InsightAction, InsightObservation]):
+    """Tool for analyzing sessions and generating insights."""
+
+    @classmethod
+    @override
+    def create(
+        cls,
+        conv_state: "ConversationState",
+        llm_model: str | None = None,
+        api_key: str | None = None,
+        api_base: str | None = None,
+    ) -> Sequence[ToolDefinition[Any, Any]]:
+        """Initialize insight tool with executor parameters.
+
+        Args:
+            conv_state: Conversation state (required by registry)
+            llm_model: LLM model to use for analysis
+            api_key: API key for LLM
+            api_base: Base URL for LLM
+
+        Returns:
+            Sequence containing InsightTool instance
+        """
+        # conv_state required by registry but not used - state passed at runtime
+        _ = conv_state
+
+        # Import here to avoid circular imports
+        from openhands.tools.insight.executor import InsightExecutor
+
+        file_store = LocalFileStore(root="~/.openhands")
+
+        executor = InsightExecutor(
+            file_store=file_store,
+            llm_model=llm_model,
+            api_key=api_key,
+            api_base=api_base,
+        )
+
+        return [
+            cls(
+                description=_INSIGHT_DESCRIPTION,
+                action_type=InsightAction,
+                observation_type=InsightObservation,
+                executor=executor,
+            )
+        ]
+
+
+# Automatically register the tool when this module is imported
+register_tool(InsightTool.name, InsightTool)
diff --git a/openhands-tools/openhands/tools/insight/executor.py b/openhands-tools/openhands/tools/insight/executor.py
new file mode 100644
index 0000000000..cc87ee99a4
--- /dev/null
+++ b/openhands-tools/openhands/tools/insight/executor.py
@@ -0,0 +1,523 @@
+"""Executor for Insight tool."""
+
+import json
+from collections import Counter
+from datetime import datetime
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from openhands.sdk.conversation.event_store import EventLog
+from openhands.sdk.event import ActionEvent, ObservationEvent
+from openhands.sdk.io import FileStore
+from openhands.sdk.logger import get_logger
+from openhands.sdk.tool import ToolExecutor
+from openhands.tools.insight.definition import (
+    InsightAction,
+    InsightObservation,
+)
+
+
+if TYPE_CHECKING:
+    from openhands.sdk.conversation.base import BaseConversation
+
+logger = get_logger(__name__)
+
+
+class InsightExecutor(ToolExecutor[InsightAction, InsightObservation]):
+    """Executor for analyzing sessions and generating insights.
+
+    This executor scans conversation history to identify usage patterns,
+    bottlenecks, and optimization opportunities.
+    """
+
+    def __init__(
+        self,
+        file_store: FileStore,
+        llm_model: str | None = None,
+        api_key: str | None = None,
+        api_base: str | None = None,
+    ):
+        """Initialize Insight executor.
+
+        Args:
+            file_store: File store for accessing conversation data
+            llm_model: LLM model to use for analysis (optional)
+            api_key: API key for LLM (optional)
+            api_base: Base URL for LLM (optional)
+        """
+        self.file_store: FileStore = file_store
+        self.llm_model: str | None = llm_model
+        self.api_key: str | None = api_key
+        self.api_base: str | None = api_base
+        self.conversations_dir: str = "conversations"
+        self.usage_data_dir: str = "usage-data"
+
+    def __call__(
+        self,
+        action: InsightAction,
+        conversation: "BaseConversation | None" = None,  # noqa: ARG002
+    ) -> InsightObservation:
+        """Execute insight analysis.
+
+        Args:
+            action: The insight action with configuration
+            conversation: Conversation context (unused, for interface compatibility)
+
+        Returns:
+            InsightObservation with analysis results
+        """
+        logger.info("Starting session insight analysis")
+
+        try:
+            # Collect session data
+            sessions_data = self._collect_sessions(action.max_sessions)
+
+            if not sessions_data:
+                return InsightObservation(
+                    summary="No conversation sessions found to analyze.",
+                    sessions_analyzed=0,
+                )
+
+            # Analyze patterns
+            analysis = self._analyze_sessions(sessions_data)
+
+            # Generate HTML report if requested
+            report_path = None
+            if action.generate_html:
+                report_path = self._generate_html_report(analysis, sessions_data)
+
+            # Generate skill suggestions if requested
+            suggestions = []
+            if action.suggest_skills:
+                suggestions = self._generate_suggestions(analysis)
+
+            logger.info(
+                f"Insight analysis complete: {len(sessions_data)} sessions analyzed"
+            )
+
+            return InsightObservation(
+                summary=analysis.get("summary", ""),
+                sessions_analyzed=len(sessions_data),
+                common_patterns=analysis.get("patterns", []),
+                bottlenecks=analysis.get("bottlenecks", []),
+                suggestions=suggestions,
+                report_path=report_path,
+            )
+
+        except Exception as e:
+            logger.error(f"Error during insight analysis: {e}")
+            return InsightObservation(
+                summary=f"Error during analysis: {str(e)}",
+                sessions_analyzed=0,
+            )
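A quick local sanity check for the executor (not part of the diff): it can be called directly without an agent, assuming the same `LocalFileStore` root that `definition.py` uses and that some conversation data exists under `~/.openhands/conversations`.

from openhands.sdk.io import LocalFileStore
from openhands.tools.insight.definition import InsightAction
from openhands.tools.insight.executor import InsightExecutor

# Point the executor at the persistence root used elsewhere in this PR.
executor = InsightExecutor(file_store=LocalFileStore(root="~/.openhands"))
obs = executor(InsightAction(generate_html=False, suggest_skills=True, max_sessions=5))
print(obs.sessions_analyzed, obs.summary)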
+
+    def _collect_sessions(self, max_sessions: int) -> list[dict[str, Any]]:
+        """Collect session data from conversation history.
+
+        Args:
+            max_sessions: Maximum number of sessions to collect
+
+        Returns:
+            List of session data dictionaries
+        """
+        sessions = []
+
+        try:
+            session_paths = self.file_store.list(self.conversations_dir)
+            all_sessions = [
+                Path(path).name
+                for path in session_paths
+                if not Path(path).name.startswith(".")
+            ]
+
+            # Limit the number of sessions analyzed. Note: the file store
+            # listing order is not guaranteed to be most-recent-first.
+            all_sessions = all_sessions[:max_sessions]
+
+            for session_id in all_sessions:
+                session_data = self._extract_session_data(session_id)
+                if session_data:
+                    sessions.append(session_data)
+
+        except Exception as e:
+            logger.warning(f"Error collecting sessions: {e}")
+
+        return sessions
+
+    def _extract_session_data(self, session_id: str) -> dict[str, Any] | None:
+        """Extract data from a single session.
+
+        Args:
+            session_id: The session ID to extract
+
+        Returns:
+            Dictionary with session data or None if extraction fails
+        """
+        try:
+            events_dir = f"{self.conversations_dir}/{session_id}/events"
+
+            # First check if there are any event files
+            try:
+                event_files = self.file_store.list(events_dir)
+                # Filter out non-event files like .eventlog.lock
+                event_files = [
+                    f for f in event_files
+                    if f.endswith(".json") and "event-" in f
+                ]
+                if not event_files:
+                    return None
+            except Exception:
+                return None
+
+            # Count event types
+            action_counts: Counter[str] = Counter()
+            error_count = 0
+            tool_usage: Counter[str] = Counter()
+            event_count = 0
+            first_timestamp = None
+            last_timestamp = None
+
+            # Try to load and iterate events, handling deserialization errors
+            # (older sessions may have incompatible schemas)
+            try:
+                events = EventLog(self.file_store, events_dir)
+                for event in events:
+                    try:
+                        event_count += 1
+                        if first_timestamp is None:
+                            first_timestamp = getattr(event, "timestamp", None)
+                        last_timestamp = getattr(event, "timestamp", None)
+
+                        if isinstance(event, ActionEvent):
+                            if event.action:
+                                action_type = type(event.action).__name__
+                                action_counts[action_type] += 1
+                                # Track tool usage
+                                if hasattr(event.action, "tool_name"):
+                                    tool_usage[event.action.tool_name] += 1
+                        elif isinstance(event, ObservationEvent):
+                            if event.observation:
+                                # Check for errors
+                                obs_text = getattr(event.observation, "text", "")
+                                if "error" in obs_text.lower():
+                                    error_count += 1
+                    except Exception:
+                        # Skip events that fail to process; they were already
+                        # counted by the increment at the top of this block.
+                        continue
+            except Exception:
+                # EventLog iteration failed (schema incompatibility)
+                # Use file count as fallback
+                pass
+
+            # If we couldn't parse any events, use file count as estimate
+            if event_count == 0:
+                event_count = len(event_files)
+
+            if event_count == 0:
+                return None
+
+            return {
+                "session_id": session_id,
+                "event_count": event_count,
+                "action_counts": dict(action_counts),
+                "tool_usage": dict(tool_usage),
+                "error_count": error_count,
+                "start_time": first_timestamp,
+                "end_time": last_timestamp,
+            }
+
+        except Exception as e:
+            logger.debug(f"Failed to extract session {session_id}: {e}")
+            return None
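If most-recent-first ordering matters for `_collect_sessions`, one option is to sort the session directories by modification time before applying the limit. A sketch for the local-filesystem case only (not part of the diff; it assumes conversations live under `~/.openhands/conversations` and that the directory exists, since the generic `FileStore` interface used above does not expose timestamps):

from pathlib import Path

conversations_root = Path.home() / ".openhands" / "conversations"
session_dirs = [
    p for p in conversations_root.iterdir()
    if p.is_dir() and not p.name.startswith(".")
]
# Most recently modified sessions first, then apply the max_sessions limit.
recent = sorted(session_dirs, key=lambda p: p.stat().st_mtime, reverse=True)[:20]
session_ids = [p.name for p in recent]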
+
+    def _analyze_sessions(
+        self, sessions_data: list[dict[str, Any]]
+    ) -> dict[str, Any]:
+        """Analyze collected session data.
+
+        Args:
+            sessions_data: List of session data dictionaries
+
+        Returns:
+            Analysis results dictionary
+        """
+        # Aggregate statistics
+        total_events = sum(s.get("event_count", 0) for s in sessions_data)
+        total_errors = sum(s.get("error_count", 0) for s in sessions_data)
+
+        # Aggregate action types
+        all_actions: Counter[str] = Counter()
+        all_tools: Counter[str] = Counter()
+
+        for session in sessions_data:
+            all_actions.update(session.get("action_counts", {}))
+            all_tools.update(session.get("tool_usage", {}))
+
+        # Identify patterns
+        patterns = []
+        most_common_actions = all_actions.most_common(5)
+        if most_common_actions:
+            patterns.append(
+                f"Most used actions: {', '.join(a[0] for a in most_common_actions)}"
+            )
+
+        most_common_tools = all_tools.most_common(5)
+        if most_common_tools:
+            patterns.append(
+                f"Most used tools: {', '.join(t[0] for t in most_common_tools)}"
+            )
+
+        # Calculate average session length
+        avg_events = total_events / len(sessions_data) if sessions_data else 0
+        patterns.append(f"Average events per session: {avg_events:.1f}")
+
+        # Identify bottlenecks
+        bottlenecks = []
+        error_rate = total_errors / total_events if total_events > 0 else 0
+        if error_rate > 0.1:
+            bottlenecks.append(
+                f"High error rate detected: {error_rate:.1%} of operations"
+            )
+
+        # Check for repetitive patterns
+        for action, count in most_common_actions:
+            if count > len(sessions_data) * 3:
+                bottlenecks.append(
+                    f"Repetitive action pattern: '{action}' used {count} times"
+                )
+
+        # Generate summary
+        summary = (
+            f"Analyzed {len(sessions_data)} sessions with {total_events} total events. "
+            f"Found {len(patterns)} usage patterns and {len(bottlenecks)} potential "
+            f"optimization opportunities."
+        )
+
+        return {
+            "summary": summary,
+            "patterns": patterns,
+            "bottlenecks": bottlenecks,
+            "total_events": total_events,
+            "total_errors": total_errors,
+            "action_counts": dict(all_actions),
+            "tool_usage": dict(all_tools),
+        }
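To make the thresholds in `_analyze_sessions` concrete, a small illustration with made-up numbers (not part of the diff):

# Illustration only: 10 sessions, 400 events, 52 "error" observations,
# and one action type seen 45 times.
sessions = 10
total_events, total_errors = 400, 52
error_rate = total_errors / total_events   # 0.13, i.e. 13%
assert error_rate > 0.1                    # reported as a high error rate
top_action_count = 45
assert top_action_count > sessions * 3     # 45 > 30, reported as repetitive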
+
+    def _generate_suggestions(
+        self, analysis: dict[str, Any]
+    ) -> list[str]:
+        """Generate optimization suggestions based on analysis.
+
+        Args:
+            analysis: Analysis results dictionary
+
+        Returns:
+            List of suggestion strings
+        """
+        suggestions = []
+
+        # Suggest based on error rate
+        total_events = analysis.get("total_events", 0)
+        total_errors = analysis.get("total_errors", 0)
+        if total_events > 0 and total_errors / total_events > 0.1:
+            suggestions.append(
+                "Consider creating error-handling skills for common failure patterns"
+            )
+
+        # Suggest based on repetitive actions
+        action_counts = analysis.get("action_counts", {})
+        for action, count in action_counts.items():
+            if count > 20:
+                suggestions.append(
+                    f"Create a custom skill to automate '{action}' workflows"
+                )
+
+        # Suggest based on tool usage
+        tool_usage = analysis.get("tool_usage", {})
+        if len(tool_usage) < 3:
+            suggestions.append(
+                "Explore additional tools to expand capabilities"
+            )
+
+        # Default suggestion if none found
+        if not suggestions:
+            suggestions.append(
+                "Your usage patterns look efficient! "
+                "Consider documenting your workflows as skills for sharing."
+            )
+
+        return suggestions[:5]  # Limit to top 5 suggestions
+
+    def _generate_html_report(
+        self,
+        analysis: dict[str, Any],
+        sessions_data: list[dict[str, Any]],
+    ) -> str | None:
+        """Generate an HTML dashboard report.
+
+        Args:
+            analysis: Analysis results dictionary
+            sessions_data: Raw session data
+
+        Returns:
+            Path to generated HTML report or None if generation fails
+        """
+        try:
+            # Ensure usage-data directory exists
+            report_dir = Path.home() / ".openhands" / self.usage_data_dir
+            report_dir.mkdir(parents=True, exist_ok=True)
+
+            # Generate report filename with timestamp
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            report_filename = f"insight_report_{timestamp}.html"
+            report_path = report_dir / report_filename
+
+            # Generate HTML content
+            html_content = self._build_html_report(analysis, sessions_data)
+
+            # Write report
+            report_path.write_text(html_content)
+
+            logger.info(f"Generated HTML report: {report_path}")
+            return str(report_path)
+
+        except Exception as e:
+            logger.error(f"Failed to generate HTML report: {e}")
+            return None
+
+    def _build_html_report(
+        self,
+        analysis: dict[str, Any],
+        sessions_data: list[dict[str, Any]],
+    ) -> str:
+        """Build HTML content for the report.
+
+        Args:
+            analysis: Analysis results dictionary
+            sessions_data: Raw session data
+
+        Returns:
+            HTML string
+        """
+        patterns_html = "\n".join(
+            f"
{analysis.get("summary", "")}
+