
LLMAgent Simple Rate Limiter Based On Plugin - Implementation Guide #5

Open
@madawei2699

Description


Overview

This document describes a lightweight rate limiting implementation for the LLMAgent Plugin module. The rate limiter provides basic protection against excessive API usage while maintaining the minimalist design philosophy of LLMAgent.

Design Principles

  • Simplicity: No external dependencies, minimal code footprint
  • Process-based: Uses the process dictionary for state storage
  • Lightweight: Adds minimal overhead to API calls
  • Developer-friendly: Simple interface that wraps existing functionality

Implementation Specification

1. Rate Limiter Module

Create a new module LLMAgent.Plugin.RateLimiter with the following functionality:

defmodule LLMAgent.Plugin.RateLimiter do
  @moduledoc """
  A minimalist rate limiter for LLM API calls.
  
  Provides basic protection against excessive API usage with:
  - Requests per minute limits
  - Daily token usage limits
  
  Uses the process dictionary for tracking usage, making it lightweight
  but process-specific.
  """
  
  # Default rate limits
  @default_limits %{
    requests_per_minute: 20,
    tokens_per_day: 10_000
  }
  
  # Process dictionary key
  @rate_bucket :llm_agent_rate_bucket
  
  @doc """
  Calls an LLM with rate limiting applied.
  
  Wraps the standard LLMAgent.Plugin.call_llm/1 function with rate limiting
  checks based on configured limits.
  
  ## Parameters
  
  - `params` - The parameters to pass to call_llm
  - `opts` - Options including custom rate limits
  
  ## Options
  
  - `:limits` - Custom rate limits map (optional)
  
  ## Returns
  
  Same return value as LLMAgent.Plugin.call_llm/1 or an error if rate limited.
  
  ## Examples
  
      # With default limits
      LLMAgent.Plugin.RateLimiter.with_rate_limit(%{
        "provider" => :openai,
        "messages" => messages
      })
      
      # With custom limits
      LLMAgent.Plugin.RateLimiter.with_rate_limit(params, 
        limits: %{
          requests_per_minute: 10,
          tokens_per_day: 5_000
        }
      )
  """
  def with_rate_limit(params, opts \\ []) do
    # Initialize and get bucket
    init_bucket()
    
    # Get custom limits or use defaults
    limits = Keyword.get(opts, :limits, @default_limits)
    
    # Check if we're within limits
    case check_limits(limits) do
      :ok ->
        # Not rate limited, make the actual call
        result = LLMAgent.Plugin.call_llm(params)
        
        # Record usage
        record_usage(params, result)
        
        # Return original result
        result
        
      {:error, reason} ->
        # Rate limited, return error
        {:error, %{
          error: "rate_limit_exceeded",
          message: reason
        }}
    end
  end
  
  # Initialize the rate bucket if needed
  defp init_bucket do
    bucket = Process.get(@rate_bucket)
    
    if is_nil(bucket) do
      # Create new bucket with initial values
      bucket = %{
        minute_start: current_minute(),
        minute_count: 0,
        day_start: current_day(),
        day_tokens: 0
      }
      
      Process.put(@rate_bucket, bucket)
    end
    
    # Reset expired counters
    reset_expired_counters()
  end
  
  # Check if we've exceeded any limits
  defp check_limits(limits) do
    bucket = Process.get(@rate_bucket)
    
    cond do
      # Check requests per minute limit
      bucket.minute_count >= limits.requests_per_minute ->
        # Seconds remaining until the minute window resets
        {_, {_, _, seconds}} = :calendar.local_time()
        seconds_to_next_minute = 60 - seconds
        {:error, "Rate limit exceeded: maximum requests per minute reached. Retry after #{seconds_to_next_minute} seconds."}
        
      # Check daily token limit
      bucket.day_tokens >= limits.tokens_per_day ->
        {:error, "Rate limit exceeded: maximum daily token usage reached."}
        
      # All limits are fine
      true ->
        :ok
    end
  end
  
  # Record API usage after a successful call
  defp record_usage(params, result) do
    bucket = Process.get(@rate_bucket)
    
    # Increment request count
    bucket = %{bucket | minute_count: bucket.minute_count + 1}
    
    # Estimate token usage
    token_count = estimate_tokens(params, result)
    bucket = %{bucket | day_tokens: bucket.day_tokens + token_count}
    
    # Save updated bucket
    Process.put(@rate_bucket, bucket)
  end
  
  # Reset counters if their time periods have expired
  defp reset_expired_counters do
    bucket = Process.get(@rate_bucket)
    current_min = current_minute()
    current_d = current_day()
    
    # Reset minute counter if we're in a new minute
    bucket = 
      if bucket.minute_start != current_min do
        %{bucket | minute_start: current_min, minute_count: 0}
      else
        bucket
      end
      
    # Reset daily counter if we're in a new day
    bucket = 
      if bucket.day_start != current_d do
        %{bucket | day_start: current_d, day_tokens: 0}
      else
        bucket
      end
      
    Process.put(@rate_bucket, bucket)
  end
  
  # Get current minute timestamp
  defp current_minute do
    {{y, m, d}, {h, min, _}} = :calendar.local_time()
    "#{y}-#{m}-#{d}-#{h}-#{min}"
  end
  
  # Get current day timestamp
  defp current_day do
    {{y, m, d}, _} = :calendar.local_time()
    "#{y}-#{m}-#{d}"
  end
  
  # Estimate token usage from request and response
  defp estimate_tokens(params, result) do
    # Estimate input tokens (approx. 1 token per 4 chars).
    # Guard against a missing "messages" key, which would otherwise
    # crash Enum.reduce/3 with a nil enumerable.
    messages = get_in(params, ["messages"]) || []

    input_tokens =
      Enum.reduce(messages, 0, fn msg, acc ->
        content = get_message_content(msg)
        acc + div(String.length(content), 4)
      end)
    
    # Estimate output tokens
    output_tokens = estimate_output_tokens(result)
    
    # Return total estimated tokens
    input_tokens + output_tokens
  end
  
  # Extract message content from various formats
  defp get_message_content(message) when is_map(message) do
    cond do
      is_binary(message["content"]) -> message["content"]
      is_binary(message[:content]) -> message[:content]
      true -> ""
    end
  end
  
  defp get_message_content(_), do: ""
  
  # Estimate tokens in the output/response
  defp estimate_output_tokens(result) do
    case result do
      {:ok, response} when is_map(response) ->
        # Try to extract content from various response formats
        cond do
          # OpenAI-style usage info is available
          is_map(response["usage"]) and is_integer(response["usage"]["completion_tokens"]) ->
            response["usage"]["completion_tokens"]
            
          # Try to extract from content
          is_binary(response["content"]) ->
            div(String.length(response["content"]), 4)
            
          # Try to extract from choices array
          is_list(response["choices"]) and length(response["choices"]) > 0 ->
            choice = List.first(response["choices"])
            content = get_in(choice, ["message", "content"]) || ""
            div(String.length(content), 4)
            
          # Default estimation
          true ->
            100
        end
        
      # Default estimation for other cases
      _ ->
        100
    end
  end
end

2. Plugin Documentation Enhancement

Update the LLMAgent.Plugin module documentation to include information about the rate limiter:

@moduledoc """
Implements the AgentForge.Plugin behavior for LLM integrations.

This module provides a plugin implementation that registers LLM-specific tools
and allows LLMAgent to integrate with different LLM providers.

## Rate Limiting

To protect against excessive API usage, you can use the rate limiter module:

    # Use with default limits (20 requests/minute, 10,000 tokens/day)
    LLMAgent.Plugin.RateLimiter.with_rate_limit(%{
      "provider" => :openai,
      "messages" => messages
    })

    # Use with custom limits
    LLMAgent.Plugin.RateLimiter.with_rate_limit(params,
      limits: %{
        requests_per_minute: 10,
        tokens_per_day: 5_000
      }
    )

Note that the rate limiter uses the process dictionary for storage, so limits are specific to the current process.
"""
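Because the counters live in the process dictionary, each BEAM process starts with a fresh bucket. A minimal sketch of that isolation, using the limiter's bucket key directly for illustration:

```elixir
# The limiter stores its counters under this process-dictionary key.
bucket_key = :llm_agent_rate_bucket

# Simulate usage recorded in the current process.
Process.put(bucket_key, %{minute_count: 19, day_tokens: 9_500})

# A spawned process has its own, empty dictionary, so it starts
# with fresh limits regardless of the parent's usage.
task = Task.async(fn -> Process.get(bucket_key) end)
Task.await(task)
# => nil
```

If shared, cross-process limits are ever needed, an ETS table or a GenServer holding the bucket would be the natural next step.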

3. Usage Examples

Create a simple example demonstrating how to use the rate limiter:

defmodule LLMAgent.Examples.RateLimitedExample do
  @moduledoc """
  Example showing how to use LLMAgent with rate limiting.
  """
  
  def run do
    # Define a system prompt for our agent
    system_prompt = "You are a helpful assistant."
    
    # Create a flow
    {flow, initial_state} = LLMAgent.Flows.conversation(system_prompt, [])
    
    # Process a message with rate limiting
    process_with_rate_limit(flow, initial_state, "Hello! How are you?")
  end
  
  def process_with_rate_limit(flow, state, message) do
    # Create a user message signal
    signal = LLMAgent.Signals.user_message(message)
    
    # Get the handler function from the flow
    case flow.(signal, state) do
      {:emit, %{type: :thinking} = thinking_signal, new_state} ->
        # We need to call the LLM for the thinking step, so wrap
        # LLMAgent.Plugin.call_llm/1 with the rate limiter.
        new_call_llm = fn params ->
          LLMAgent.Plugin.RateLimiter.with_rate_limit(params)
        end

        # Apply the replacement. Note: :meck is a testing library;
        # add {:meck, "~> 0.9"} to your deps before using this approach
        # outside of tests.
        :meck.new(LLMAgent.Plugin, [:passthrough])
        :meck.expect(LLMAgent.Plugin, :call_llm, new_call_llm)
        
        # Continue processing with the flow
        result = flow.(thinking_signal, new_state)
        
        # Clean up the mock
        :meck.unload(LLMAgent.Plugin)
        
        # Return the result
        result
        
      other ->
        # For other signal types, continue normally
        other
    end
  end
end

Integration Steps

  1. Create the Rate Limiter Module

    • Add the LLMAgent.Plugin.RateLimiter module
    • Implement all helper functions
    • Test with basic usage
  2. Update Documentation

    • Enhance the Plugin module documentation
    • Add inline documentation to the rate limiter functions
  3. Add Usage Example

    • Implement the example
    • Test with various rate limit settings
  4. Implement Monitoring (Optional)

    • Add basic logging of rate limiting events
    • Create a simple function to query current usage
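For the optional monitoring step, a usage query can read the same process-dictionary bucket the limiter writes. A sketch, assuming the bucket key and field names from the module above (the `Usage` module name itself is hypothetical):

```elixir
defmodule LLMAgent.Plugin.RateLimiter.Usage do
  @moduledoc """
  Hypothetical helper for inspecting the current process's rate bucket.
  """

  # Must match the key used by LLMAgent.Plugin.RateLimiter.
  @rate_bucket :llm_agent_rate_bucket

  @doc "Returns request and token counters, or zeros before any usage."
  def current_usage do
    @rate_bucket
    |> Process.get(%{minute_count: 0, day_tokens: 0})
    |> Map.take([:minute_count, :day_tokens])
  end
end
```

Logging a rate-limit event could then be a one-liner with `Logger.warning/1` inside the `{:error, reason}` branch of `with_rate_limit/2`.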

Testing Guidelines

Test the rate limiter with the following scenarios:

  1. Basic Functionality

    • Verify requests within limits succeed
    • Verify requests exceeding per-minute limit are blocked
    • Verify requests exceeding daily token limit are blocked
  2. Counter Reset Logic

    • Test minute counter reset works correctly
    • Test daily counter reset works correctly
  3. Token Estimation

    • Test token estimation with various message sizes
    • Verify estimation is reasonably accurate
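The token-estimation scenario can be exercised with a small ExUnit sketch. Since `estimate_tokens/2` is private, the test below mirrors the ~4-characters-per-token heuristic rather than calling the limiter directly:

```elixir
ExUnit.start()

defmodule RateLimiterEstimateTest do
  use ExUnit.Case

  # Mirrors the limiter's character-based heuristic (~1 token / 4 chars).
  defp estimate(text), do: div(String.length(text), 4)

  test "estimates scale with message length" do
    assert estimate("") == 0
    assert estimate("Hello, world") == 3
    assert estimate(String.duplicate("a", 400)) == 100
  end
end
```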

Limitations

This implementation has some intentional limitations to maintain simplicity:

  1. Process-Specific: Limits apply to each process individually, not globally
  2. No Persistence: Counters reset when the process terminates
  3. Approximate Token Counting: Uses character-based estimation
  4. Simple Time Windows: Uses fixed minute/day windows, not sliding windows

These limitations are acceptable for personal projects and development usage. For production systems with critical rate limiting needs, consider a more robust implementation.

Conclusion

This minimalist rate limiter provides basic protection against accidental API overuse while maintaining the lightweight design philosophy of LLMAgent. It's easy to integrate, requires no external dependencies, and adds minimal overhead to API calls.
