LLMAgent Tasks Module Redesign Proposal
Executive Summary
This document proposes a redesign of the LLMAgent Tasks module to seamlessly integrate with the Flows system. The redesign aims to leverage LLM capabilities for dynamic task execution management, enable parallel tool execution, and provide a more intuitive developer experience.
Design Philosophy
The redesigned Tasks module is guided by five principles:
- Flow Integration: Tasks are a natural extension of the Flow system rather than a separate subsystem
- Tool-Centric: Long-running operations are defined as task-aware tools rather than standalone tasks
- LLM-Driven: The LLM makes context-aware decisions about task management and scheduling
- Progressive Disclosure: Users receive appropriate feedback during long-running operations
- Parallelism: Multiple tools can execute concurrently when appropriate
Architecture Components
1. Task-Aware Tools
Tools that may take significant time to complete will be marked with task-related attributes:
%{
  name: "complex_analysis",
  description: "Analyze large datasets",
  parameters: %{...},
  execute: &MyApp.Tools.analyze_dataset/1,
  # Task-specific attributes
  task_attributes: %{
    async: true,                # Run asynchronously
    estimated_duration: :long,  # Indicates to the LLM that this is a long operation
    provides_progress: true,    # Tool can report progress
    cancelable: true,           # Can be canceled mid-execution
    priority: :normal           # Execution priority (:high, :normal, :low)
  }
}
2. Task Signals
Introducing new signal types for task management:
# Task started signal
Signals.task_started(task_id, tool_name, args)
# Task progress signal
Signals.task_progress(task_id, tool_name, percentage, message)
# Task complete signal
Signals.task_completed(task_id, tool_name, result)
# Task error signal
Signals.task_error(task_id, tool_name, error)
# Task cancellation signal
Signals.task_cancel(task_id)
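These constructors do not exist yet; a minimal sketch, assuming signals are plain maps with `:type` and `:data` keys (the real Flow signal shape may differ):

```elixir
defmodule LLMAgent.Signals do
  # Hypothetical constructors: each builds a map-shaped signal.
  # The real Signals module may wrap these in a dedicated struct.

  def task_started(task_id, tool_name, args) do
    %{type: :task_started, data: %{task_id: task_id, tool_name: tool_name, args: args}}
  end

  def task_progress(task_id, tool_name, percentage, message) do
    %{
      type: :task_progress,
      data: %{task_id: task_id, tool_name: tool_name, percentage: percentage, message: message}
    }
  end

  def task_completed(task_id, tool_name, result) do
    %{type: :task_completed, data: %{task_id: task_id, tool_name: tool_name, result: result}}
  end

  def task_error(task_id, tool_name, error) do
    %{type: :task_error, data: %{task_id: task_id, tool_name: tool_name, error: error}}
  end

  def task_cancel(task_id) do
    %{type: :task_cancel, data: %{task_id: task_id}}
  end
end
```

Keeping the shape uniform (`:type` plus a `:data` map) lets the handlers below pattern match on `%{type: :task_progress} = signal` without special cases.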
3. Enhanced Tool Handler
A modified tool handler that understands task-aware tools:
def tool_handler(%{type: :tool_call} = signal, state) do
  tool_name = signal.data.name
  tool_args = signal.data.args
  store_name = get_store_name(state)

  # Find tool definition
  tool = find_tool(tool_name, state.available_tools)
  task_attributes = Map.get(tool, :task_attributes, %{})

  # Determine execution strategy
  if Map.get(task_attributes, :async, false) do
    # Generate unique task ID
    task_id = "task_#{System.unique_integer([:positive, :monotonic])}"

    # Log task start in store
    Store.add_task(store_name, %{
      id: task_id,
      tool_name: tool_name,
      args: tool_args,
      started_at: DateTime.utc_now(),
      status: "running",
      progress: 0
    })

    # Start task in background
    Task.start(fn ->
      execute_async_tool(task_id, tool, tool_args, store_name)
    end)

    # Generate start signal
    start_signal = Signals.task_started(task_id, tool_name, tool_args)
    {{:emit, start_signal}, state}
  else
    # For non-task tools, use the existing synchronous execution
    # (this is the current tool_handler implementation)
    execute_synchronous_tool(tool, tool_args, state)
  end
end
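The handler above delegates to `execute_async_tool/4`, which is not defined in this proposal. A self-contained sketch follows; `FakeStore` is a hypothetical stand-in for the real Store module so both the success and error paths can be shown, and how the completion signal re-enters the flow (message to a flow process, direct emit) is left open:

```elixir
defmodule FakeStore do
  # Hypothetical Agent-backed stand-in for Store, keyed by task id.
  def start, do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  def update_task(_store_name, task_id, updates) do
    Agent.update(__MODULE__, fn tasks ->
      Map.update(tasks, task_id, updates, &Map.merge(&1, updates))
    end)
  end

  def get_task(task_id), do: Agent.get(__MODULE__, &Map.get(&1, task_id))
end

defmodule AsyncExec do
  # In the real handler module this would be a defp.
  def execute_async_tool(task_id, tool, args, store_name) do
    try do
      # Run the tool's execute function with the given arguments
      result = tool.execute.(args)

      FakeStore.update_task(store_name, task_id, %{
        status: "completed",
        completed_at: DateTime.utc_now(),
        result: result
      })

      {:ok, result}
    catch
      kind, error ->
        # Record the failure so task_error handling can pick it up
        FakeStore.update_task(store_name, task_id, %{
          status: "error",
          error: Exception.format(kind, error, __STACKTRACE__)
        })

        {:error, error}
    end
  end
end
```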
4. Task Progress Handling
New handler for task progress signals:
def task_progress_handler(%{type: :task_progress} = signal, state) do
  task_id = signal.data.task_id
  percentage = signal.data.percentage
  message = signal.data.message
  store_name = get_store_name(state)

  # Update task progress in store
  Store.update_task_progress(store_name, task_id, percentage, message)

  # Check if we should notify the LLM about progress
  if should_notify_progress?(percentage, state) do
    # Generate thinking signal with progress update
    thinking =
      Signals.thinking(
        "Task #{task_id} is #{percentage}% complete: #{message}",
        System.unique_integer([:positive])
      )

    {{:emit, thinking}, state}
  else
    # Continue without emitting a new signal
    {:skip, state}
  end
end
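The `should_notify_progress?/2` predicate is left undefined above. One simple policy, assumed here purely for illustration, is to surface progress to the LLM only at fixed milestones unless the state asks for verbose updates:

```elixir
defmodule ProgressPolicy do
  # Hypothetical throttling policy; in the real handlers module this
  # would be a defp. Notify only at 25% milestones, or always when the
  # (assumed) :verbose_progress flag is set in state.
  def should_notify_progress?(percentage, state) do
    Map.get(state, :verbose_progress, false) or rem(percentage, 25) == 0
  end
end
```

This keeps chatty tools from flooding the LLM context with near-duplicate progress updates.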
5. Task Completion Handling
Handler for task completion signals:
def task_completed_handler(%{type: :task_completed} = signal, state) do
  task_id = signal.data.task_id
  tool_name = signal.data.tool_name
  result = signal.data.result
  store_name = get_store_name(state)

  # Update task status in store
  Store.update_task(store_name, task_id, %{
    status: "completed",
    completed_at: DateTime.utc_now(),
    result: result
  })

  # Convert to a tool result for normal processing
  tool_result = Signals.tool_result(tool_name, result)
  {{:emit, tool_result}, state}
end
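Cancellation is implied by the `task_cancel` signal and the `cancelable` attribute, but no handler is shown for it. A minimal sketch, assuming `tool_handler` also stored the background task's pid in the task record when it called `Task.start/1`:

```elixir
defmodule CancelSketch do
  # Hypothetical cancellation helper: kills the stored pid and marks
  # the task record canceled. A real handler would also persist the
  # updated record via Store.update_task/3.
  def cancel_task(%{pid: pid} = task) when is_pid(pid) do
    Process.exit(pid, :kill)
    Map.merge(task, %{status: "canceled", canceled_at: DateTime.utc_now()})
  end

  # Nothing to kill (task already finished, or pid never recorded)
  def cancel_task(task), do: task
end
```

Brutal kills like this assume the tool has no external side effects to roll back; tools that do would need a cooperative shutdown protocol instead.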
6. Enhanced Store Functions
Addition of task management functions to the Store module:
# Add a new task
def add_task(store_name, task) do
update_tasks(store_name, fn tasks -> [task | tasks] end)
end
# Update a task
def update_task(store_name, task_id, updates) do
update_tasks(store_name, fn tasks ->
Enum.map(tasks, fn task ->
if task.id == task_id, do: Map.merge(task, updates), else: task
end)
end)
end
# Update task progress
def update_task_progress(store_name, task_id, percentage, message \\ nil) do
update_task(store_name, task_id, %{
progress: percentage,
last_update: DateTime.utc_now(),
last_message: message
})
end
# Get all running tasks
def get_running_tasks(store_name) do
{:ok, tasks} = get(store_name, :tasks, [])
Enum.filter(tasks, &(&1.status == "running"))
end
# Helper function to update tasks list
defp update_tasks(store_name, updater) do
{:ok, tasks} = get(store_name, :tasks, [])
put(store_name, :tasks, updater.(tasks))
end
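These functions assume `get/3` and `put/3` primitives in the Store module. To make the semantics concrete, here is a minimal Agent-backed sketch (the real Store may use ETS or a GenServer; `MiniStore` is a hypothetical name):

```elixir
defmodule MiniStore do
  # Key-value store per named Agent; get/3 mirrors the {:ok, value}
  # shape the task functions above pattern match on.
  def start_link(name), do: Agent.start_link(fn -> %{} end, name: name)

  def get(name, key, default), do: {:ok, Agent.get(name, &Map.get(&1, key, default))}

  def put(name, key, value), do: Agent.update(name, &Map.put(&1, key, value))

  # Task helpers built on the primitives, as in the proposal
  def add_task(name, task) do
    {:ok, tasks} = get(name, :tasks, [])
    put(name, :tasks, [task | tasks])
  end

  def get_running_tasks(name) do
    {:ok, tasks} = get(name, :tasks, [])
    Enum.filter(tasks, &(&1.status == "running"))
  end
end
```

An Agent serializes all reads and writes through one process, which avoids lost updates when several background tasks report progress concurrently.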
7. LLM Context Enhancement
Modified thinking handler to include task context:
def thinking_handler(%{type: :thinking} = signal, state) do
  # Get running tasks
  store_name = get_store_name(state)
  running_tasks = Store.get_running_tasks(store_name)

  # Include task context in the LLM call
  history = Store.get_llm_history(store_name)

  # If there are running tasks, include their status in the context
  llm_options =
    if running_tasks != [] do
      Map.put(state.llm_options || %{}, :context, %{
        running_tasks: format_tasks_for_llm(running_tasks)
      })
    else
      state.llm_options
    end

  # Call LLM with enhanced context
  llm_result =
    LLMAgent.Plugin.call_llm(%{
      "provider" => state.provider,
      "messages" => history,
      "tools" => state.available_tools,
      "options" => llm_options
    })

  # Rest of the handler remains the same
  # ...
end

# Format tasks for LLM consumption
defp format_tasks_for_llm(tasks) do
  Enum.map(tasks, fn task ->
    %{
      id: task.id,
      tool: task.tool_name,
      progress: task.progress,
      started_at: task.started_at,
      elapsed_seconds: DateTime.diff(DateTime.utc_now(), task.started_at),
      last_update: Map.get(task, :last_message)
    }
  end)
end
8. System Prompt Enhancement
Additional system prompt content to guide the LLM on task management:
system_prompt_addon = """
You can work with tools that run asynchronously for long operations. When using such tools:
1. You will receive progress updates that you should relay to the user
2. You can decide to wait for completion or work on other tasks in parallel
3. You can provide partial information while waiting for results
4. You should adapt based on the importance of the task and user needs
When multiple tasks are running:
- Prioritize addressing user questions about running tasks
- Consider providing updates on long-running tasks
- Suggest next steps while waiting for completion
"""
Implementation Plan
Phase 1: Core Components
- Define task-aware tool structure
- Implement task-related signals
- Enhance Store with task management functions
- Modify tool_handler to support async execution
Phase 2: Signal Handling
- Implement task_progress_handler
- Implement task_completed_handler
- Implement task_error_handler
- Enhance thinking_handler with task context
Phase 3: Testing & Integration
- Develop comprehensive test suite
- Create example implementations
- Update documentation
- Implement migration path from existing Tasks
Usage Examples
Defining Task-Aware Tools
tools = [
  %{
    name: "quick_search",
    description: "Quick web search for recent information",
    execute: &MyApp.Tools.quick_search/1
  },
  %{
    name: "deep_analysis",
    description: "Perform deep analysis on large datasets",
    execute: &MyApp.Tools.analyze_data/1,
    task_attributes: %{
      async: true,
      estimated_duration: :long,
      provides_progress: true
    }
  }
]

system_prompt = """
You are an assistant capable of both quick searches and deep data analysis.
For deep analysis, please keep the user informed about progress.
"""

{flow, state} = LLMAgent.Flows.conversation(system_prompt, tools)
Task Implementation With Progress
def analyze_data(args) do
  # Extract dataset and parameters
  dataset = args["dataset"]
  params = args["parameters"]

  # Get task context
  task_id = Process.get(:current_task_id)
  store_name = Process.get(:store_name)

  try do
    # Initial processing
    send_progress(10, "Loading dataset", task_id, store_name)

    # Data preparation
    send_progress(30, "Preparing data", task_id, store_name)

    # Main analysis
    send_progress(60, "Running analysis algorithms", task_id, store_name)

    # Result compilation
    send_progress(90, "Compiling results", task_id, store_name)

    # Final results
    result = %{
      summary: "Analysis complete",
      key_metrics: %{...},
      trends: [...],
      recommendations: [...]
    }

    # Return final result
    %{
      status: "success",
      result: result
    }
  catch
    kind, error ->
      %{
        status: "error",
        error: Exception.format(kind, error, __STACKTRACE__)
      }
  end
end
defp send_progress(percentage, message, task_id, store_name) do
  # Create progress signal
  progress_signal = Signals.task_progress(task_id, "deep_analysis", percentage, message)

  # Send to the process running the flow
  send(Process.get(:flow_process), {:task_progress, progress_signal})

  # Also update the store directly
  Store.update_task_progress(store_name, task_id, percentage, message)

  # Add some delay to simulate work
  Process.sleep(1000)
end
Migration Strategy
- Identify existing Tasks module usage in the codebase
- Create adapter functions to maintain backward compatibility
- Update examples and tests to use the new approach
- Document changes and migration path
- Deprecate standalone Tasks module functions in favor of integrated approach
Testing Strategy
- Unit tests for each new component:
  - Task signal creators
  - Task-specific handlers
  - Store task management functions
- Integration tests for end-to-end flows:
  - Asynchronous tool execution
  - Progress reporting
  - Task cancellation
  - Parallel task execution
- Mock LLM tests:
  - Verify LLM receives task context
  - Test LLM response to task updates
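To give a flavor of the unit tests, a sketch for the signal creators; `SignalsStub` is a hypothetical stand-in with the map shape assumed throughout this proposal:

```elixir
ExUnit.start()

# Stub of the signal creator under test
defmodule SignalsStub do
  def task_progress(task_id, tool_name, percentage, message) do
    %{
      type: :task_progress,
      data: %{task_id: task_id, tool_name: tool_name, percentage: percentage, message: message}
    }
  end
end

defmodule TaskSignalsTest do
  use ExUnit.Case

  test "task_progress carries id, percentage and message" do
    signal = SignalsStub.task_progress("t1", "deep_analysis", 60, "Running analysis")
    assert signal.type == :task_progress
    assert signal.data.task_id == "t1"
    assert signal.data.percentage == 60
    assert signal.data.message == "Running analysis"
  end
end
```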
Conclusion
This redesign transforms the Tasks module from a standalone system into an integrated part of the Flow execution model. By making tools task-aware rather than creating a separate task concept, the system becomes more intuitive, flexible, and aligned with the dynamic nature of LLM-driven workflows.
The key benefits include:
- Simplified Mental Model: Developers can think in terms of tools that may be quick or long-running
- Improved User Experience: Progress updates and parallel execution
- LLM-Aware Design: Leverages the LLM's context understanding for better task management
- Architectural Consistency: Maintains the signal-driven design pattern
This approach addresses the limitations of the current Tasks module while enhancing the system's ability to handle complex, long-running operations in a natural way.