Skip to content

LLMAgent Tasks Module Redesign Proposal #4

Open
@madawei2699

Description

@madawei2699

LLMAgent Tasks Module Redesign Proposal

Executive Summary

This document proposes a redesign of the LLMAgent Tasks module to seamlessly integrate with the Flows system. The redesign aims to leverage LLM capabilities for dynamic task execution management, enable parallel tool execution, and provide a more intuitive developer experience.

Design Philosophy

The redesigned Tasks module will:

  1. Flow Integration: Tasks will be integrated as a natural extension of the Flow system rather than a separate subsystem
  2. Tool-Centric: Long-running operations will be defined as task-aware tools rather than standalone tasks
  3. LLM-Driven: The LLM will make context-aware decisions about task management and scheduling
  4. Progressive Disclosure: Users will receive appropriate feedback during long-running operations
  5. Parallelism: Support concurrent execution of multiple tools when appropriate

Architecture Components

1. Task-Aware Tools

Tools that may take significant time to complete will be marked with task-related attributes:

%{
  name: "complex_analysis",
  description: "Analyze large datasets",
  parameters: %{...},
  execute: &MyApp.Tools.analyze_dataset/1,
  
  # Task-specific attributes
  task_attributes: %{
    async: true,               # Run asynchronously
    estimated_duration: :long, # Indicates to LLM this is a long operation
    provides_progress: true,   # Tool can report progress
    cancelable: true,          # Can be canceled mid-execution
    priority: :normal          # Execution priority (:high, :normal, :low)
  }
}

2. Task Signals

Introducing new signal types for task management:

# Task started signal
Signals.task_started(task_id, tool_name, args)

# Task progress signal
Signals.task_progress(task_id, tool_name, percentage, message)

# Task complete signal
Signals.task_completed(task_id, tool_name, result)

# Task error signal
Signals.task_error(task_id, tool_name, error)

# Task cancellation signal 
Signals.task_cancel(task_id)

3. Enhanced Tool Handler

A modified tool handler that understands task-aware tools:

def tool_handler(%{type: :tool_call} = signal, state) do
  tool_name = signal.data.name
  tool_args = signal.data.args
  store_name = get_store_name(state)
  
  # Find tool definition
  tool = find_tool(tool_name, state.available_tools)
  task_attributes = Map.get(tool, :task_attributes, %{})
  
  # Determine execution strategy
  if Map.get(task_attributes, :async, false) do
    # Generate unique task ID
    task_id = "task_#{System.unique_integer([:positive, :monotonic])}"
    
    # Log task start in store
    Store.add_task(store_name, %{
      id: task_id,
      tool_name: tool_name,
      args: tool_args,
      started_at: DateTime.utc_now(),
      status: "running",
      progress: 0
    })
    
    # Start task in background
    Task.start(fn ->
      execute_async_tool(task_id, tool, tool_args, store_name)
    end)
    
    # Generate start signal
    start_signal = Signals.task_started(task_id, tool_name, tool_args)
    {{:emit, start_signal}, state}
  else
    # For non-task tools, use the existing synchronous execution
    # (This is the current tool_handler implementation)
    execute_synchronous_tool(tool, tool_args, state)
  end
end

4. Task Progress Handling

New handler for task progress signals:

def task_progress_handler(%{type: :task_progress} = signal, state) do
  task_id = signal.data.task_id
  percentage = signal.data.percentage
  message = signal.data.message
  store_name = get_store_name(state)
  
  # Update task progress in store
  Store.update_task_progress(store_name, task_id, percentage, message)
  
  # Check if we should notify the LLM about progress
  if should_notify_progress?(percentage, state) do
    # Generate thinking signal with progress update
    thinking = Signals.thinking(
      "Task #{task_id} is #{percentage}% complete: #{message}",
      System.unique_integer([:positive])
    )
    {{:emit, thinking}, state}
  else
    # Continue without emitting a new signal
    {:skip, state}
  end
end

5. Task Completion Handling

Handler for task completion signals:

def task_completed_handler(%{type: :task_completed} = signal, state) do
  task_id = signal.data.task_id
  tool_name = signal.data.tool_name
  result = signal.data.result
  store_name = get_store_name(state)
  
  # Update task status in store
  Store.update_task(store_name, task_id, %{
    status: "completed",
    completed_at: DateTime.utc_now(),
    result: result
  })
  
  # Convert to tool result for normal processing
  tool_result = Signals.tool_result(tool_name, result)
  {{:emit, tool_result}, state}
end

6. Enhanced Store Functions

Addition of task management functions to the Store module:

# Add a new task
def add_task(store_name, task) do
  update_tasks(store_name, fn tasks -> [task | tasks] end)
end

# Update a task
def update_task(store_name, task_id, updates) do
  update_tasks(store_name, fn tasks ->
    Enum.map(tasks, fn task ->
      if task.id == task_id, do: Map.merge(task, updates), else: task
    end)
  end)
end

# Update task progress
def update_task_progress(store_name, task_id, percentage, message \\ nil) do
  update_task(store_name, task_id, %{
    progress: percentage,
    last_update: DateTime.utc_now(),
    last_message: message
  })
end

# Get all running tasks
def get_running_tasks(store_name) do
  {:ok, tasks} = get(store_name, :tasks, [])
  Enum.filter(tasks, &(&1.status == "running"))
end

# Helper function to update tasks list
defp update_tasks(store_name, updater) do
  {:ok, tasks} = get(store_name, :tasks, [])
  put(store_name, :tasks, updater.(tasks))
end

7. LLM Context Enhancement

Modified thinking handler to include task context:

def thinking_handler(%{type: :thinking} = signal, state) do
  # Get running tasks
  store_name = get_store_name(state)
  running_tasks = Store.get_running_tasks(store_name)
  
  # Include task context in LLM call
  history = Store.get_llm_history(store_name)
  
  # If there are running tasks, include their status in the context
  llm_options = 
    if length(running_tasks) > 0 do
      Map.put(state.llm_options || %{}, :context, %{
        running_tasks: format_tasks_for_llm(running_tasks)
      })
    else
      state.llm_options
    end
  
  # Call LLM with enhanced context
  llm_result = LLMAgent.Plugin.call_llm(%{
    "provider" => state.provider,
    "messages" => history,
    "tools" => state.available_tools,
    "options" => llm_options
  })
  
  # Rest of the handler remains the same
  # ...
end

# Format tasks for LLM consumption
defp format_tasks_for_llm(tasks) do
  Enum.map(tasks, fn task ->
    %{
      id: task.id,
      tool: task.tool_name,
      progress: task.progress,
      started_at: task.started_at,
      elapsed_seconds: DateTime.diff(DateTime.utc_now(), task.started_at),
      last_update: Map.get(task, :last_message)
    }
  end)
end

8. System Prompt Enhancement

Additional system prompt content to guide the LLM on task management:

system_prompt_addon = """
You can work with tools that run asynchronously for long operations. When using such tools:

1. You will receive progress updates that you should relay to the user
2. You can decide to wait for completion or work on other tasks in parallel
3. You can provide partial information while waiting for results
4. You should adapt based on the importance of the task and user needs

When multiple tasks are running:
- Prioritize addressing user questions about running tasks
- Consider providing updates on long-running tasks
- Suggest next steps while waiting for completion
"""

Implementation Plan

Phase 1: Core Components

  1. Define task-aware tool structure
  2. Implement task-related signals
  3. Enhance Store with task management functions
  4. Modify tool_handler to support async execution

Phase 2: Signal Handling

  1. Implement task_progress_handler
  2. Implement task_completed_handler
  3. Implement task_error_handler
  4. Enhance thinking_handler with task context

Phase 3: Testing & Integration

  1. Develop comprehensive test suite
  2. Create example implementations
  3. Update documentation
  4. Implement migration path from existing Tasks

Usage Examples

Defining Task-Aware Tools

tools = [
  %{
    name: "quick_search",
    description: "Quick web search for recent information",
    execute: &MyApp.Tools.quick_search/1
  },
  %{
    name: "deep_analysis",
    description: "Perform deep analysis on large datasets",
    execute: &MyApp.Tools.analyze_data/1,
    task_attributes: %{
      async: true,
      estimated_duration: :long,
      provides_progress: true
    }
  }
]

system_prompt = """
You are an assistant capable of both quick searches and deep data analysis.
For deep analysis, please keep the user informed about progress.
"""

{flow, state} = LLMAgent.Flows.conversation(system_prompt, tools)

Task Implementation With Progress

def analyze_data(args) do
  # Extract dataset and parameters
  dataset = args["dataset"]
  params = args["parameters"]
  
  # Get task context
  task_id = Process.get(:current_task_id)
  store_name = Process.get(:store_name)
  
  try
    # Initial processing
    send_progress(10, "Loading dataset", task_id, store_name)
    
    # Data preparation
    send_progress(30, "Preparing data", task_id, store_name)
    
    # Main analysis
    send_progress(60, "Running analysis algorithms", task_id, store_name)
    
    # Result compilation
    send_progress(90, "Compiling results", task_id, store_name)
    
    # Final results
    result = %{
      summary: "Analysis complete",
      key_metrics: %{...},
      trends: [...],
      recommendations: [...]
    }
    
    # Return final result
    %{
      status: "success",
      result: result
    }
  catch
    kind, error ->
      %{
        status: "error",
        error: Exception.format(kind, error, __STACKTRACE__)
      }
  end
end

defp send_progress(percentage, message, task_id, store_name) do
  # Create progress signal
  progress_signal = Signals.task_progress(task_id, "deep_analysis", percentage, message)
  
  # Send to process running the flow
  send(Process.get(:flow_process), {:task_progress, progress_signal})
  
  # Also update store directly
  Store.update_task_progress(store_name, task_id, percentage, message)
  
  # Add some delay to simulate work
  Process.sleep(1000)
end

Migration Strategy

  1. Identify existing Tasks module usage in the codebase
  2. Create adapter functions to maintain backward compatibility
  3. Update examples and tests to use the new approach
  4. Document changes and migration path
  5. Deprecate standalone Tasks module functions in favor of integrated approach

Testing Strategy

  1. Unit tests for each new component:

    • Task signal creators
    • Task-specific handlers
    • Store task management functions
  2. Integration tests for end-to-end flows:

    • Asynchronous tool execution
    • Progress reporting
    • Task cancellation
    • Parallel task execution
  3. Mock LLM tests:

    • Verify LLM receives task context
    • Test LLM response to task updates

Conclusion

This redesign transforms the Tasks module from a standalone system into an integrated part of the Flow execution model. By making tools task-aware rather than creating a separate task concept, the system becomes more intuitive, flexible, and aligned with the dynamic nature of LLM-driven workflows.

The key benefits include:

  1. Simplified Mental Model: Developers can think in terms of tools that may be quick or long-running
  2. Improved User Experience: Progress updates and parallel execution
  3. LLM-Aware Design: Leverages the LLM's context understanding for better task management
  4. Architectural Consistency: Maintains the signal-driven design pattern

This approach addresses the limitations of the current Tasks module while enhancing the system's ability to handle complex, long-running operations in a natural way.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions