gglib_core/domain/agent/
events.rs

1//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.
2
3use serde::Serialize;
4
5use super::tool_types::{ToolCall, ToolResult};
6use crate::normalize::NormalizationErrorKind;
7
8// =============================================================================
9// Agent events (SSE stream units)
10// =============================================================================
11
12/// An observable event emitted by the agentic loop.
13///
14/// These events are the unit of SSE emission: every state change in the loop
15/// produces exactly one variant. Axum SSE handlers serialise these to
16/// `data: <json>\n\n` frames; CLI consumers may log or render them directly.
17///
18/// # Serde tag
19///
20/// `#[serde(tag = "type", rename_all = "snake_case")]` produces e.g.
21/// `{"type":"tool_call_start","tool_call":{...}}`.
22#[derive(Debug, Clone, Serialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum AgentEvent {
25    /// An incremental text fragment from the model's response.
26    TextDelta {
27        /// The new text fragment (append to the current buffer).
28        content: String,
29    },
30
31    /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
32    ///
33    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
34    /// their chain-of-thought via a separate `reasoning_content` SSE field.
35    /// These fragments are forwarded to the UI as they arrive but are **not**
36    /// included in the conversation history sent back to the model.
37    ReasoningDelta {
38        /// The new reasoning fragment (append to the current reasoning buffer).
39        content: String,
40    },
41
42    /// The model has requested execution of a tool.
43    ToolCallStart {
44        /// The tool call that is about to be dispatched.
45        tool_call: ToolCall,
46        /// Human-readable tool name (prefix stripped, title-cased).
47        display_name: String,
48        /// One-line argument summary (e.g. file path, search pattern).
49        args_summary: Option<String>,
50    },
51
52    /// A tool execution has completed (success or failure).
53    ToolCallComplete {
54        /// Name of the tool that was executed.
55        tool_name: String,
56        /// The outcome of the tool execution.
57        result: ToolResult,
58        /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
59        wait_ms: u64,
60        /// Wall-clock time taken to execute the tool after acquiring the permit,
61        /// in milliseconds.
62        execute_duration_ms: u64,
63        /// Human-readable tool name (prefix stripped, title-cased).
64        display_name: String,
65        /// Human-readable duration (e.g. "125ms", "1.5s").
66        duration_display: String,
67    },
68
69    /// One full LLM→tool-execution cycle has completed.
70    IterationComplete {
71        /// The 1-based iteration index that just finished.
72        iteration: usize,
73        /// Number of tool calls executed during this iteration.
74        tool_calls: usize,
75    },
76
77    /// The loop has concluded and produced a definitive answer.
78    FinalAnswer {
79        /// The complete final response text.
80        content: String,
81    },
82
83    /// Prompt-processing progress from the LLM backend.
84    ///
85    /// Emitted during the pre-fill phase when llama-server is streaming
86    /// `prompt_progress` frames.  Surfaces token-level progress so the UI
87    /// can show "processing prompt: 2048 / 8192 tokens".
88    PromptProgress {
89        /// Number of tokens processed so far.
90        processed: u32,
91        /// Total number of tokens in the prompt.
92        total: u32,
93        /// Number of tokens served from KV cache (already processed).
94        cached: u32,
95        /// Elapsed wall-clock time in milliseconds since processing began.
96        time_ms: u64,
97    },
98
99    /// A non-fatal system-level warning surfaced by the loop itself.
100    ///
101    /// Unlike [`AgentEvent::Error`], a `SystemWarning` does **not** terminate
102    /// the loop — it informs the user that the loop encountered a recoverable
103    /// condition (e.g. the model requested more parallel tool calls than the
104    /// configured limit, and the loop is auto-retrying with a synthetic
105    /// error fed back to the model).
106    ///
107    /// `suggested_action`, when present, contains an actionable hint the UI
108    /// can render verbatim (e.g. a CLI command to permanently raise a limit).
109    SystemWarning {
110        /// Human-readable description of the recoverable condition.
111        message: String,
112        /// Optional actionable hint (e.g. CLI command) the UI can show to the
113        /// user to permanently address the cause.
114        #[serde(skip_serializing_if = "Option::is_none")]
115        suggested_action: Option<String>,
116    },
117
118    /// A fatal error has terminated the loop.
119    Error {
120        /// Human-readable description of the failure.
121        message: String,
122    },
123}
124
125// =============================================================================
126// LLM stream events (consumed by LlmCompletionPort implementors)
127// =============================================================================
128
129/// A single event produced by a streaming LLM response.
130///
131/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
132/// they are parsed by adapter crates from raw SSE frames and handed to
133/// `gglib-agent`'s stream collector, which:
134///
135/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items directly to the
136///   caller's [`AgentEvent`] channel so text appears in real time.
137/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
138///   until the stream ends, then assembles them into [`ToolCall`] values.
139/// - Waits for [`Done`](LlmStreamEvent::Done) before triggering tool execution.
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub enum LlmStreamEvent {
142    /// An incremental text fragment from the model's response.
143    TextDelta {
144        /// The new text fragment (append to the running content buffer).
145        content: String,
146    },
147
148    /// An incremental reasoning/thinking fragment (`CoT` tokens).
149    ///
150    /// Produced by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
151    /// llama-server is started with `--reasoning-format deepseek`.  The
152    /// runtime adapter maps `delta["reasoning_content"]` frames to this
153    /// variant; the stream collector forwards them as
154    /// [`AgentEvent::ReasoningDelta`] and accumulates them in a separate
155    /// buffer that is never sent back to the LLM as context.
156    ReasoningDelta {
157        /// The new reasoning fragment (append to the current reasoning buffer).
158        content: String,
159    },
160
161    /// An incremental fragment of a tool-call request.
162    ///
163    /// The adapter crate streams these before the model has finished
164    /// generating the full arguments JSON. The stream collector accumulates
165    /// all deltas for a given `index` into a single [`ToolCall`].
166    ToolCallDelta {
167        /// Zero-based index of the tool call within the current response.
168        index: usize,
169        /// Call identifier (only present in the first delta for this index).
170        id: Option<String>,
171        /// Tool name (only present in the first delta for this index).
172        name: Option<String>,
173        /// Partial arguments JSON string fragment (accumulate with `push_str`).
174        arguments: Option<String>,
175    },
176
177    /// Prompt-processing progress from llama-server.
178    ///
179    /// Emitted when the request includes `return_progress: true`.  These
180    /// frames arrive during the pre-fill phase (before any `TextDelta`),
181    /// giving real-time visibility into how far along token ingestion is.
182    PromptProgress {
183        /// Number of tokens processed so far.
184        processed: u32,
185        /// Total number of tokens in the prompt.
186        total: u32,
187        /// Number of tokens served from KV cache (already processed).
188        cached: u32,
189        /// Elapsed wall-clock time in milliseconds since processing began.
190        time_ms: u64,
191    },
192
193    /// Signals the end of the stream.
194    ///
195    /// Every conforming stream must end with exactly one `Done` item.
196    Done {
197        /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
198        /// `"length"`).
199        finish_reason: String,
200    },
201
202    /// A non-fatal normalization issue surfaced by the
203    /// [`crate::normalize`] layer.
204    ///
205    /// Emitted when a dialect-specific parser detects malformed markup
206    /// (e.g. a Qwen `<tool_call>` whose body is not valid JSON, or a tag
207    /// that the stream ended without closing).  The stream is **not**
208    /// aborted; the offending bytes are simply discarded or surfaced via
209    /// this event so consumers can log diagnostics.
210    NormalizationError {
211        /// Structured detail about what went wrong.
212        kind: NormalizationErrorKind,
213        /// Short, human-readable excerpt of the offending input.
214        raw: String,
215    },
216}
217
218// =============================================================================
219// Channel sizing
220// =============================================================================
221
/// Capacity of the [`tokio::sync::mpsc`] channel that carries the
/// [`AgentEvent`]s produced by a single [`crate::ports::AgentLoopPort::run`]
/// call.
///
/// # Sizing
///
/// The hard floor is the number of structural (non-text) events a run can
/// emit at the maximum ceiling configuration:
///
/// ```text
/// MAX_ITERATIONS_CEILING × (MAX_PARALLEL_TOOLS_CEILING × 2 + 1) + 1
///     = 50 × (50 × 2 + 1) + 1        (with both ceilings at 50)
///     = 5 051
/// ```
///
/// Presumably that counts one `ToolCallStart` plus one `ToolCallComplete` per
/// tool call, one `IterationComplete` per iteration, and one terminal event.
/// The `agent_event_channel_capacity_is_sufficient_for_max_config` unit test
/// recomputes the floor from the live constants, so any value ≥ that floor
/// passes it.
///
/// 8 192 sits well above the floor, leaving room for bursts of streamed
/// `TextDelta` fragments. That headroom matters: once the channel is full,
/// `tx.send().await` waits on every token, which adds measurable latency to
/// the hot streaming path.
///
/// # Memory
///
/// NOTE(review): a completely full channel holds roughly
/// `8 192 × size_of::<AgentEvent>()` bytes of slot storage plus each queued
/// event's heap payload (strings, JSON arguments). Measure
/// `size_of::<AgentEvent>()` before quoting a per-session memory bound.
///
/// All callers (SSE handlers, CLI REPL) should use this constant rather than
/// a magic literal.
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 8_192;
242
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_event_serde_tag_matches_wire_format() {
        let evt = AgentEvent::FinalAnswer {
            content: "done".into(),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "final_answer");
        assert_eq!(json["content"], "done");
    }

    #[test]
    fn tool_call_start_serialises_correctly() {
        let evt = AgentEvent::ToolCallStart {
            tool_call: ToolCall {
                id: "c1".into(),
                name: "search".into(),
                arguments: serde_json::json!({"q": "rust"}),
            },
            display_name: "Search".into(),
            args_summary: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "tool_call_start");
        assert_eq!(json["tool_call"]["name"], "search");
    }

    /// `args_summary` has no `skip_serializing_if`, so `None` must appear on
    /// the wire as an explicit `null` (the key is present). Indexing with
    /// `json["args_summary"]` cannot tell "absent" from `null`, hence the
    /// `get` check.
    #[test]
    fn tool_call_start_emits_null_args_summary() {
        let evt = AgentEvent::ToolCallStart {
            tool_call: ToolCall {
                id: "c2".into(),
                name: "read_file".into(),
                arguments: serde_json::json!({}),
            },
            display_name: "Read File".into(),
            args_summary: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        let field = json.get("args_summary");
        assert!(field.is_some(), "args_summary key must be present");
        assert!(field.unwrap().is_null(), "args_summary must be null");
    }

    /// `suggested_action` is annotated with `skip_serializing_if`, so a `None`
    /// value must be dropped from the JSON object entirely.
    #[test]
    fn system_warning_omits_absent_suggested_action() {
        let evt = AgentEvent::SystemWarning {
            message: "too many tools".into(),
            suggested_action: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "system_warning");
        assert_eq!(json["message"], "too many tools");
        assert!(
            json.get("suggested_action").is_none(),
            "suggested_action must be omitted when None"
        );
    }

    #[test]
    fn system_warning_includes_present_suggested_action() {
        let evt = AgentEvent::SystemWarning {
            message: "too many tools".into(),
            suggested_action: Some("raise the limit".into()),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "system_warning");
        assert_eq!(json["suggested_action"], "raise the limit");
    }

    /// Multi-word variant names must follow the `snake_case` tag convention.
    #[test]
    fn streaming_variants_use_snake_case_tags() {
        let reasoning = serde_json::to_value(&AgentEvent::ReasoningDelta {
            content: "hmm".into(),
        })
        .unwrap();
        assert_eq!(reasoning["type"], "reasoning_delta");
        assert_eq!(reasoning["content"], "hmm");

        let progress = serde_json::to_value(&AgentEvent::PromptProgress {
            processed: 10,
            total: 20,
            cached: 5,
            time_ms: 42,
        })
        .unwrap();
        assert_eq!(progress["type"], "prompt_progress");
        assert_eq!(progress["processed"], 10);
        assert_eq!(progress["total"], 20);
        assert_eq!(progress["cached"], 5);
        assert_eq!(progress["time_ms"], 42);
    }

    /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be positive and must be at least
    /// large enough for a full run at the maximum ceiling configuration
    /// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
    /// structural events), so that back-pressure never occurs on the hot
    /// streaming path for any valid configuration.
    #[test]
    fn agent_event_channel_capacity_is_sufficient_for_max_config() {
        use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};

        // Minimum structural events for a run at ceiling config
        // (no TextDelta headroom included — this is the hard lower bound).
        let structural_per_iter = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
        let minimum_structural = MAX_ITERATIONS_CEILING * structural_per_iter + 1;
        assert!(
            AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
            "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
             the minimum required for ceiling config ({minimum_structural}); \
             increase the constant"
        );
    }
}