gglib_core/domain/agent/events.rs

//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.

use serde::Serialize;

use super::tool_types::{ToolCall, ToolResult};

// =============================================================================
// Agent events (SSE stream units)
// =============================================================================

/// An observable event emitted by the agentic loop.
///
/// These events are the unit of SSE emission: every state change in the loop
/// produces exactly one variant. Axum SSE handlers serialise these to
/// `data: <json>\n\n` frames; CLI consumers may log or render them directly.
///
/// # Serde tag
///
/// `#[serde(tag = "type", rename_all = "snake_case")]` produces e.g.
/// `{"type":"tool_call_start","tool_call":{...}}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// An incremental text fragment from the model's response.
    TextDelta {
        /// The new text fragment (append to the current buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
    ///
    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
    /// their chain-of-thought via a separate `reasoning_content` SSE field.
    /// These fragments are forwarded to the UI as they arrive but are **not**
    /// included in the conversation history sent back to the model.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// The model has requested execution of a tool.
    ToolCallStart {
        /// The tool call that is about to be dispatched.
        tool_call: ToolCall,
        /// Human-readable tool name (prefix stripped, title-cased).
        display_name: String,
        /// One-line argument summary (e.g. file path, search pattern).
        args_summary: Option<String>,
    },

    /// A tool execution has completed (success or failure).
    ToolCallComplete {
        /// Name of the tool that was executed.
        tool_name: String,
        /// The outcome of the tool execution.
        result: ToolResult,
        /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
        wait_ms: u64,
        /// Wall-clock time taken to execute the tool after acquiring the permit,
        /// in milliseconds.
        execute_duration_ms: u64,
        /// Human-readable tool name (prefix stripped, title-cased).
        display_name: String,
        /// Human-readable duration (e.g. "125ms", "1.5s").
        duration_display: String,
    },

    /// One full LLM→tool-execution cycle has completed.
    IterationComplete {
        /// The 1-based iteration index that just finished.
        iteration: usize,
        /// Number of tool calls executed during this iteration.
        tool_calls: usize,
    },

    /// The loop has concluded and produced a definitive answer.
    FinalAnswer {
        /// The complete final response text.
        content: String,
    },

    /// Prompt-processing progress from the LLM backend.
    ///
    /// Emitted during the pre-fill phase when llama-server is streaming
    /// `prompt_progress` frames.  Surfaces token-level progress so the UI
    /// can show "processing prompt: 2048 / 8192 tokens".
    PromptProgress {
        /// Number of tokens processed so far.
        processed: u32,
        /// Total number of tokens in the prompt.
        total: u32,
        /// Number of tokens served from KV cache (already processed).
        cached: u32,
        /// Elapsed wall-clock time in milliseconds since processing began.
        time_ms: u64,
    },

    /// A non-fatal system-level warning surfaced by the loop itself.
    ///
    /// Unlike [`AgentEvent::Error`], a `SystemWarning` does **not** terminate
    /// the loop — it informs the user that the loop encountered a recoverable
    /// condition (e.g. the model requested more parallel tool calls than the
    /// configured limit, and the loop is auto-retrying with a synthetic
    /// error fed back to the model).
    ///
    /// `suggested_action`, when present, contains an actionable hint the UI
    /// can render verbatim (e.g. a CLI command to permanently raise a limit).
    SystemWarning {
        /// Human-readable description of the recoverable condition.
        message: String,
        /// Optional actionable hint (e.g. CLI command) the UI can show to the
        /// user to permanently address the cause.
        #[serde(skip_serializing_if = "Option::is_none")]
        suggested_action: Option<String>,
    },

    /// A fatal error has terminated the loop.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}

// =============================================================================
// LLM stream events (consumed by LlmCompletionPort implementors)
// =============================================================================

/// A single event produced by a streaming LLM response.
///
/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
/// they are parsed by adapter crates from raw SSE frames and handed to
/// `gglib-agent`'s stream collector, which:
///
/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items directly to the
///   caller's [`AgentEvent`] channel so text appears in real time.
/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
///   until the stream ends, then assembles them into [`ToolCall`] values.
/// - Waits for [`Done`](LlmStreamEvent::Done) before triggering tool execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LlmStreamEvent {
    /// An incremental text fragment from the model's response.
    TextDelta {
        /// The new text fragment (append to the running content buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment (`CoT` tokens).
    ///
    /// Produced by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
    /// llama-server is started with `--reasoning-format deepseek`.  The
    /// runtime adapter maps `delta["reasoning_content"]` frames to this
    /// variant; the stream collector forwards them as
    /// [`AgentEvent::ReasoningDelta`] and accumulates them in a separate
    /// buffer that is never sent back to the LLM as context.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// An incremental fragment of a tool-call request.
    ///
    /// The adapter crate streams these before the model has finished
    /// generating the full arguments JSON. The stream collector accumulates
    /// all deltas for a given `index` into a single [`ToolCall`]; a minimal
    /// accumulation sketch lives in this module's tests.
    ToolCallDelta {
        /// Zero-based index of the tool call within the current response.
        index: usize,
        /// Call identifier (only present in the first delta for this index).
        id: Option<String>,
        /// Tool name (only present in the first delta for this index).
        name: Option<String>,
        /// Partial arguments JSON string fragment (accumulate with `push_str`).
        arguments: Option<String>,
    },

    /// Prompt-processing progress from llama-server.
    ///
    /// Emitted when the request includes `return_progress: true`.  These
    /// frames arrive during the pre-fill phase (before any `TextDelta`),
    /// giving real-time visibility into how far along token ingestion is.
    PromptProgress {
        /// Number of tokens processed so far.
        processed: u32,
        /// Total number of tokens in the prompt.
        total: u32,
        /// Number of tokens served from KV cache (already processed).
        cached: u32,
        /// Elapsed wall-clock time in milliseconds since processing began.
        time_ms: u64,
    },

    /// Signals the end of the stream.
    ///
    /// Every conforming stream must end with exactly one `Done` item.
    Done {
        /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
        /// `"length"`).
        finish_reason: String,
    },
}

// =============================================================================
// Channel sizing
// =============================================================================

/// [`tokio::sync::mpsc`] channel capacity for streaming [`AgentEvent`]s
/// produced by a single [`crate::ports::AgentLoopPort::run`] call.
///
/// Sized so that a full run at the **maximum ceiling configuration**
/// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
/// structural events = 5 051 with `MAX_ITERATIONS_CEILING = 50` and
/// `MAX_PARALLEL_TOOLS_CEILING = 50`: two events per tool call, one
/// `IterationComplete` per iteration, one `FinalAnswer` per run) fits without
/// back-pressure on the hot streaming path.  Any value ≥ 5 051 satisfies the
/// structural ceiling test; 8 192 leaves comfortable headroom for `TextDelta`
/// bursts.
///
/// Filling the channel causes `tx.send().await` to back-pressure on every
/// token, with measurable latency impact, so the constant is set well above
/// the hard floor.
///
/// At 8 192 entries the buffer fits comfortably in under a megabyte of
/// memory per active agent session.
///
/// All callers (SSE handlers, CLI REPL) should use this constant instead of
/// a magic literal.
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 8_192;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_event_serde_tag_matches_wire_format() {
        let evt = AgentEvent::FinalAnswer {
            content: "done".into(),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "final_answer");
        assert_eq!(json["content"], "done");
    }
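
    /// A minimal sketch of the SSE framing described on [`AgentEvent`]: one
    /// serialised event per `data: <json>\n\n` frame. The `format!` call
    /// stands in for the Axum handler (which lives outside this crate); only
    /// the frame shape is illustrated here.
    #[test]
    fn agent_event_maps_to_one_sse_frame() {
        let evt = AgentEvent::TextDelta {
            content: "hi".into(),
        };
        let frame = format!("data: {}\n\n", serde_json::to_string(&evt).unwrap());
        assert!(frame.starts_with("data: {"));
        assert!(frame.ends_with("\n\n"));
        // Round-trip the payload to check the tagged wire shape.
        let json: serde_json::Value =
            serde_json::from_str(frame.strip_prefix("data: ").unwrap().trim_end()).unwrap();
        assert_eq!(json["type"], "text_delta");
        assert_eq!(json["content"], "hi");
    }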

    #[test]
    fn tool_call_start_serialises_correctly() {
        let evt = AgentEvent::ToolCallStart {
            tool_call: ToolCall {
                id: "c1".into(),
                name: "search".into(),
                arguments: serde_json::json!({"q": "rust"}),
            },
            display_name: "Search".into(),
            args_summary: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "tool_call_start");
        assert_eq!(json["tool_call"]["name"], "search");
    }
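
    /// `suggested_action` carries `skip_serializing_if = "Option::is_none"`,
    /// so an absent hint must not appear on the wire at all; consumers can
    /// test for key presence rather than `null`.
    #[test]
    fn system_warning_omits_absent_suggested_action() {
        let evt = AgentEvent::SystemWarning {
            message: "parallel tool-call limit exceeded".into(),
            suggested_action: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "system_warning");
        assert!(json.as_object().unwrap().get("suggested_action").is_none());
    }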

    /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be at least large enough for a
    /// full run at the maximum ceiling configuration
    /// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
    /// structural events), so that back-pressure never occurs on the hot
    /// streaming path for any valid configuration.
    #[test]
    fn agent_event_channel_capacity_is_sufficient_for_max_config() {
        use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};

        // Minimum structural events for a run at ceiling config
        // (no TextDelta headroom included — this is the hard lower bound).
        let structural_per_iter = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
        let minimum_structural = MAX_ITERATIONS_CEILING * structural_per_iter + 1;
        assert!(
            AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
            "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
             the minimum required for ceiling config ({minimum_structural}); \
             increase the constant"
        );
    }
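
    /// A minimal sketch of the accumulation contract documented on
    /// [`LlmStreamEvent::ToolCallDelta`]: `id` and `name` arrive only in the
    /// first delta for an index, and argument fragments are concatenated with
    /// `push_str`. The real collector lives in `gglib-agent`; this only
    /// demonstrates the shape of the data, not that implementation.
    #[test]
    fn tool_call_deltas_assemble_into_a_tool_call() {
        let deltas = vec![
            LlmStreamEvent::ToolCallDelta {
                index: 0,
                id: Some("c1".into()),
                name: Some("search".into()),
                arguments: Some("{\"q\":".into()),
            },
            LlmStreamEvent::ToolCallDelta {
                index: 0,
                id: None,
                name: None,
                arguments: Some("\"rust\"}".into()),
            },
        ];

        let (mut id, mut name, mut args) = (String::new(), String::new(), String::new());
        for delta in deltas {
            if let LlmStreamEvent::ToolCallDelta {
                id: d_id,
                name: d_name,
                arguments,
                ..
            } = delta
            {
                // The first delta for an index carries the identifiers.
                if let Some(d) = d_id {
                    id = d;
                }
                if let Some(n) = d_name {
                    name = n;
                }
                // Argument JSON arrives as string fragments.
                if let Some(a) = arguments {
                    args.push_str(&a);
                }
            }
        }

        let call = ToolCall {
            id,
            name,
            arguments: serde_json::from_str(&args).unwrap(),
        };
        assert_eq!(call.id, "c1");
        assert_eq!(call.name, "search");
        assert_eq!(call.arguments, serde_json::json!({"q": "rust"}));
    }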
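
    /// A usage sketch for [`AGENT_EVENT_CHANNEL_CAPACITY`]: callers size the
    /// event channel with the constant rather than a magic literal. Assumes
    /// `tokio` is available in this crate (the doc link on the constant
    /// implies it); constructing a channel requires no async runtime.
    #[test]
    fn agent_event_channel_is_sized_by_the_constant() {
        let (tx, _rx) = tokio::sync::mpsc::channel::<AgentEvent>(AGENT_EVENT_CHANNEL_CAPACITY);
        assert_eq!(tx.max_capacity(), AGENT_EVENT_CHANNEL_CAPACITY);
    }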
}