gglib_core/domain/agent/events.rs
1//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.
2
3use serde::Serialize;
4
5use super::tool_types::{ToolCall, ToolResult};
6use crate::normalize::NormalizationErrorKind;
7
8// =============================================================================
9// Agent events (SSE stream units)
10// =============================================================================
11
12/// An observable event emitted by the agentic loop.
13///
14/// These events are the unit of SSE emission: every state change in the loop
15/// produces exactly one variant. Axum SSE handlers serialise these to
16/// `data: <json>\n\n` frames; CLI consumers may log or render them directly.
17///
18/// # Serde tag
19///
20/// `#[serde(tag = "type", rename_all = "snake_case")]` produces e.g.
21/// `{"type":"tool_call_start","tool_call":{...}}`.
22#[derive(Debug, Clone, Serialize)]
23#[serde(tag = "type", rename_all = "snake_case")]
24pub enum AgentEvent {
25 /// An incremental text fragment from the model's response.
26 TextDelta {
27 /// The new text fragment (append to the current buffer).
28 content: String,
29 },
30
31 /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
32 ///
33 /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
34 /// their chain-of-thought via a separate `reasoning_content` SSE field.
35 /// These fragments are forwarded to the UI as they arrive but are **not**
36 /// included in the conversation history sent back to the model.
37 ReasoningDelta {
38 /// The new reasoning fragment (append to the current reasoning buffer).
39 content: String,
40 },
41
42 /// The model has requested execution of a tool.
43 ToolCallStart {
44 /// The tool call that is about to be dispatched.
45 tool_call: ToolCall,
46 /// Human-readable tool name (prefix stripped, title-cased).
47 display_name: String,
48 /// One-line argument summary (e.g. file path, search pattern).
49 args_summary: Option<String>,
50 },
51
52 /// A tool execution has completed (success or failure).
53 ToolCallComplete {
54 /// Name of the tool that was executed.
55 tool_name: String,
56 /// The outcome of the tool execution.
57 result: ToolResult,
58 /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
59 wait_ms: u64,
60 /// Wall-clock time taken to execute the tool after acquiring the permit,
61 /// in milliseconds.
62 execute_duration_ms: u64,
63 /// Human-readable tool name (prefix stripped, title-cased).
64 display_name: String,
65 /// Human-readable duration (e.g. "125ms", "1.5s").
66 duration_display: String,
67 },
68
69 /// One full LLM→tool-execution cycle has completed.
70 IterationComplete {
71 /// The 1-based iteration index that just finished.
72 iteration: usize,
73 /// Number of tool calls executed during this iteration.
74 tool_calls: usize,
75 },
76
77 /// The loop has concluded and produced a definitive answer.
78 FinalAnswer {
79 /// The complete final response text.
80 content: String,
81 },
82
83 /// Prompt-processing progress from the LLM backend.
84 ///
85 /// Emitted during the pre-fill phase when llama-server is streaming
86 /// `prompt_progress` frames. Surfaces token-level progress so the UI
87 /// can show "processing prompt: 2048 / 8192 tokens".
88 PromptProgress {
89 /// Number of tokens processed so far.
90 processed: u32,
91 /// Total number of tokens in the prompt.
92 total: u32,
93 /// Number of tokens served from KV cache (already processed).
94 cached: u32,
95 /// Elapsed wall-clock time in milliseconds since processing began.
96 time_ms: u64,
97 },
98
99 /// A non-fatal system-level warning surfaced by the loop itself.
100 ///
101 /// Unlike [`AgentEvent::Error`], a `SystemWarning` does **not** terminate
102 /// the loop — it informs the user that the loop encountered a recoverable
103 /// condition (e.g. the model requested more parallel tool calls than the
104 /// configured limit, and the loop is auto-retrying with a synthetic
105 /// error fed back to the model).
106 ///
107 /// `suggested_action`, when present, contains an actionable hint the UI
108 /// can render verbatim (e.g. a CLI command to permanently raise a limit).
109 SystemWarning {
110 /// Human-readable description of the recoverable condition.
111 message: String,
112 /// Optional actionable hint (e.g. CLI command) the UI can show to the
113 /// user to permanently address the cause.
114 #[serde(skip_serializing_if = "Option::is_none")]
115 suggested_action: Option<String>,
116 },
117
118 /// A fatal error has terminated the loop.
119 Error {
120 /// Human-readable description of the failure.
121 message: String,
122 },
123}
124
125// =============================================================================
126// LLM stream events (consumed by LlmCompletionPort implementors)
127// =============================================================================
128
129/// A single event produced by a streaming LLM response.
130///
131/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
132/// they are parsed by adapter crates from raw SSE frames and handed to
133/// `gglib-agent`'s stream collector, which:
134///
135/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items directly to the
136/// caller's [`AgentEvent`] channel so text appears in real time.
137/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
138/// until the stream ends, then assembles them into [`ToolCall`] values.
139/// - Waits for [`Done`](LlmStreamEvent::Done) before triggering tool execution.
140#[derive(Debug, Clone, PartialEq, Eq)]
141pub enum LlmStreamEvent {
142 /// An incremental text fragment from the model's response.
143 TextDelta {
144 /// The new text fragment (append to the running content buffer).
145 content: String,
146 },
147
148 /// An incremental reasoning/thinking fragment (`CoT` tokens).
149 ///
150 /// Produced by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
151 /// llama-server is started with `--reasoning-format deepseek`. The
152 /// runtime adapter maps `delta["reasoning_content"]` frames to this
153 /// variant; the stream collector forwards them as
154 /// [`AgentEvent::ReasoningDelta`] and accumulates them in a separate
155 /// buffer that is never sent back to the LLM as context.
156 ReasoningDelta {
157 /// The new reasoning fragment (append to the current reasoning buffer).
158 content: String,
159 },
160
161 /// An incremental fragment of a tool-call request.
162 ///
163 /// The adapter crate streams these before the model has finished
164 /// generating the full arguments JSON. The stream collector accumulates
165 /// all deltas for a given `index` into a single [`ToolCall`].
166 ToolCallDelta {
167 /// Zero-based index of the tool call within the current response.
168 index: usize,
169 /// Call identifier (only present in the first delta for this index).
170 id: Option<String>,
171 /// Tool name (only present in the first delta for this index).
172 name: Option<String>,
173 /// Partial arguments JSON string fragment (accumulate with `push_str`).
174 arguments: Option<String>,
175 },
176
177 /// Prompt-processing progress from llama-server.
178 ///
179 /// Emitted when the request includes `return_progress: true`. These
180 /// frames arrive during the pre-fill phase (before any `TextDelta`),
181 /// giving real-time visibility into how far along token ingestion is.
182 PromptProgress {
183 /// Number of tokens processed so far.
184 processed: u32,
185 /// Total number of tokens in the prompt.
186 total: u32,
187 /// Number of tokens served from KV cache (already processed).
188 cached: u32,
189 /// Elapsed wall-clock time in milliseconds since processing began.
190 time_ms: u64,
191 },
192
193 /// Signals the end of the stream.
194 ///
195 /// Every conforming stream must end with exactly one `Done` item.
196 Done {
197 /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
198 /// `"length"`).
199 finish_reason: String,
200 },
201
202 /// A non-fatal normalization issue surfaced by the
203 /// [`crate::normalize`] layer.
204 ///
205 /// Emitted when a dialect-specific parser detects malformed markup
206 /// (e.g. a Qwen `<tool_call>` whose body is not valid JSON, or a tag
207 /// that the stream ended without closing). The stream is **not**
208 /// aborted; the offending bytes are simply discarded or surfaced via
209 /// this event so consumers can log diagnostics.
210 NormalizationError {
211 /// Structured detail about what went wrong.
212 kind: NormalizationErrorKind,
213 /// Short, human-readable excerpt of the offending input.
214 raw: String,
215 },
216}
217
218// =============================================================================
219// Channel sizing
220// =============================================================================
221
/// Capacity of the [`tokio::sync::mpsc`] channel that carries the
/// [`AgentEvent`]s produced by a single [`crate::ports::AgentLoopPort::run`]
/// call.
///
/// # Sizing
///
/// The hard floor is the number of structural events emitted by a run at the
/// **maximum ceiling configuration**:
///
/// `MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
///
/// With `MAX_ITERATIONS_CEILING = 50` and `MAX_PARALLEL_TOOLS_CEILING = 50`,
/// this is exactly 50 × 101 + 1 = 5 051. Any value of at least 5 051 passes
/// the structural ceiling test. 8 192 leaves ample headroom on top of that
/// for bursts of `TextDelta` events.
///
/// The constant sits well above the floor on purpose. A full channel makes
/// `tx.send().await` back-pressure on every token, which measurably adds
/// latency to the hot streaming path.
///
/// The capacity is expected to stay under roughly a megabyte of memory per
/// active agent session. NOTE(review): that estimate depends on
/// `size_of::<AgentEvent>()` and on the heap payloads of queued events, so
/// re-check it if the event variants grow.
///
/// SSE handlers, the CLI REPL and any other caller should use this constant
/// rather than a magic literal.
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 8 * 1024;
242
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_event_serde_tag_matches_wire_format() {
        let evt = AgentEvent::FinalAnswer {
            content: "done".into(),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "final_answer");
        assert_eq!(json["content"], "done");
    }

    /// Multi-word variant names must be tagged in `snake_case`.
    #[test]
    fn reasoning_delta_uses_snake_case_tag() {
        let evt = AgentEvent::ReasoningDelta {
            content: "thinking".into(),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "reasoning_delta");
        assert_eq!(json["content"], "thinking");
    }

    #[test]
    fn tool_call_start_serialises_correctly() {
        let evt = AgentEvent::ToolCallStart {
            tool_call: ToolCall {
                id: "c1".into(),
                name: "search".into(),
                arguments: serde_json::json!({"q": "rust"}),
            },
            display_name: "Search".into(),
            args_summary: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "tool_call_start");
        assert_eq!(json["tool_call"]["name"], "search");
        assert_eq!(json["display_name"], "Search");
        // `args_summary` has no `skip_serializing_if`, so `None` must appear as
        // an explicit `null`. Indexing (`json["args_summary"]`) cannot tell
        // `null` from a missing key, hence `get`.
        assert_eq!(json.get("args_summary"), Some(&serde_json::Value::Null));
    }

    /// `suggested_action` is marked `skip_serializing_if = "Option::is_none"`,
    /// so a `None` must be left out of the JSON rather than sent as `null`.
    #[test]
    fn system_warning_omits_absent_suggested_action() {
        let evt = AgentEvent::SystemWarning {
            message: "limit exceeded".into(),
            suggested_action: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "system_warning");
        assert_eq!(json["message"], "limit exceeded");
        assert!(json.get("suggested_action").is_none());
    }

    #[test]
    fn system_warning_includes_present_suggested_action() {
        let evt = AgentEvent::SystemWarning {
            message: "limit exceeded".into(),
            suggested_action: Some("raise the limit".into()),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "system_warning");
        assert_eq!(json["suggested_action"], "raise the limit");
    }

    /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be at least large enough for a
    /// full run at the maximum ceiling configuration
    /// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
    /// structural events). That way back-pressure never occurs on the hot
    /// streaming path for any valid configuration. Because the minimum is at
    /// least 1, this also guarantees the capacity is positive.
    #[test]
    fn agent_event_channel_capacity_is_sufficient_for_max_config() {
        use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};

        // Minimum structural events for a run at ceiling config
        // (no TextDelta headroom included — this is the hard lower bound).
        let structural_per_iter = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
        let minimum_structural = MAX_ITERATIONS_CEILING * structural_per_iter + 1;
        assert!(
            AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
            "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
             the minimum required for ceiling config ({minimum_structural}); \
             increase the constant"
        );
    }
}