gglib_core/domain/agent/events.rs
//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.

use serde::Serialize;

use super::tool_types::{ToolCall, ToolResult};

// =============================================================================
// Agent events (SSE stream units)
// =============================================================================

/// An observable event emitted by the agentic loop.
///
/// These events are the unit of SSE emission: every observable state change
/// in the loop produces exactly one event. Axum SSE handlers serialise these
/// to `data: <json>\n\n` frames; CLI consumers may log or render them
/// directly.
///
/// # Serde tag
///
/// `#[serde(tag = "type", rename_all = "snake_case")]` produces e.g.
/// `{"type":"tool_call_start","tool_call":{...}}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// An incremental text fragment from the model's response.
    TextDelta {
        /// The new text fragment (append to the current buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
    ///
    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
    /// their chain-of-thought via a separate `reasoning_content` SSE field.
    /// These fragments are forwarded to the UI as they arrive but are **not**
    /// included in the conversation history sent back to the model.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// The model has requested execution of a tool.
    ToolCallStart {
        /// The tool call that is about to be dispatched.
        tool_call: ToolCall,
        /// Human-readable tool name (prefix stripped, title-cased).
        display_name: String,
        /// One-line argument summary (e.g. file path, search pattern).
        args_summary: Option<String>,
    },

    /// A tool execution has completed (success or failure).
    ToolCallComplete {
        /// Name of the tool that was executed.
        tool_name: String,
        /// The outcome of the tool execution.
        result: ToolResult,
        /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
        wait_ms: u64,
        /// Wall-clock time taken to execute the tool after acquiring the permit,
        /// in milliseconds.
        execute_duration_ms: u64,
        /// Human-readable tool name (prefix stripped, title-cased).
        display_name: String,
        /// Human-readable duration (e.g. "125ms", "1.5s").
        duration_display: String,
    },

    /// One full LLM→tool-execution cycle has completed.
    IterationComplete {
        /// The 1-based iteration index that just finished.
        iteration: usize,
        /// Number of tool calls executed during this iteration.
        tool_calls: usize,
    },

    /// The loop has concluded and produced a definitive answer.
    FinalAnswer {
        /// The complete final response text.
        content: String,
    },

    /// Prompt-processing progress from the LLM backend.
    ///
    /// Emitted during the pre-fill phase while llama-server is streaming
    /// `prompt_progress` frames. Surfaces token-level progress so the UI
    /// can show "processing prompt: 2048 / 8192 tokens".
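    ///
    /// A UI might render each frame along the lines of this (illustrative)
    /// sketch:
    ///
    /// ```ignore
    /// format!("processing prompt: {processed} / {total} tokens ({cached} cached)")
    /// ```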
    PromptProgress {
        /// Number of tokens processed so far.
        processed: u32,
        /// Total number of tokens in the prompt.
        total: u32,
        /// Number of tokens served from KV cache (already processed).
        cached: u32,
        /// Elapsed wall-clock time in milliseconds since processing began.
        time_ms: u64,
    },

    /// A non-fatal system-level warning surfaced by the loop itself.
    ///
    /// Unlike [`AgentEvent::Error`], a `SystemWarning` does **not** terminate
    /// the loop — it informs the user that the loop encountered a recoverable
    /// condition (e.g. the model requested more parallel tool calls than the
    /// configured limit, and the loop is auto-retrying with a synthetic
    /// error fed back to the model).
    ///
    /// `suggested_action`, when present, contains an actionable hint the UI
    /// can render verbatim (e.g. a CLI command to permanently raise a limit).
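    ///
    /// # Example
    ///
    /// When `suggested_action` is `None`, `skip_serializing_if` omits the key
    /// from the JSON entirely, so consumers can key off its presence; a
    /// sketch (the message text is illustrative):
    ///
    /// ```ignore
    /// let evt = AgentEvent::SystemWarning {
    ///     message: "parallel tool-call limit exceeded; retrying".into(),
    ///     suggested_action: None,
    /// };
    /// let json = serde_json::to_value(&evt).unwrap();
    /// assert_eq!(json["type"], "system_warning");
    /// assert!(json.get("suggested_action").is_none());
    /// ```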
    SystemWarning {
        /// Human-readable description of the recoverable condition.
        message: String,
        /// Optional actionable hint (e.g. CLI command) the UI can show to the
        /// user to permanently address the cause.
        #[serde(skip_serializing_if = "Option::is_none")]
        suggested_action: Option<String>,
    },

    /// A fatal error has terminated the loop.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}

// =============================================================================
// LLM stream events (consumed by LlmCompletionPort implementors)
// =============================================================================

/// A single event produced by a streaming LLM response.
///
/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
/// they are parsed by adapter crates from raw SSE frames and handed to
/// `gglib-agent`'s stream collector, which:
///
/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items directly to the
///   caller's [`AgentEvent`] channel so text appears in real time.
/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
///   until the stream ends, then assembles them into [`ToolCall`] values.
/// - Waits for [`Done`](LlmStreamEvent::Done) before triggering tool execution.
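///
/// # Example
///
/// A minimal sketch of the accumulation step for a single tool call,
/// assuming a `futures`-style stream; the real collector lives in
/// `gglib-agent` and handles multiple indices:
///
/// ```ignore
/// let (mut id, mut name, mut args) = (None, None, String::new());
/// while let Some(event) = stream.next().await {
///     match event {
///         LlmStreamEvent::ToolCallDelta { id: i, name: n, arguments: a, .. } => {
///             // id/name arrive only in the first delta; arguments accumulate.
///             id = id.or(i);
///             name = name.or(n);
///             if let Some(fragment) = a {
///                 args.push_str(&fragment);
///             }
///         }
///         LlmStreamEvent::Done { .. } => break,
///         _ => {}
///     }
/// }
/// let call = ToolCall {
///     id: id.unwrap_or_default(),
///     name: name.unwrap_or_default(),
///     arguments: serde_json::from_str(&args)?,
/// };
/// ```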
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LlmStreamEvent {
    /// An incremental text fragment from the model's response.
    TextDelta {
        /// The new text fragment (append to the running content buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment (`CoT` tokens).
    ///
    /// Produced by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
    /// llama-server is started with `--reasoning-format deepseek`. The
    /// runtime adapter maps `delta["reasoning_content"]` frames to this
    /// variant; the stream collector forwards them as
    /// [`AgentEvent::ReasoningDelta`] and accumulates them in a separate
    /// buffer that is never sent back to the LLM as context.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// An incremental fragment of a tool-call request.
    ///
    /// The adapter crate streams these before the model has finished
    /// generating the full arguments JSON. The stream collector accumulates
    /// all deltas for a given `index` into a single [`ToolCall`].
    ToolCallDelta {
        /// Zero-based index of the tool call within the current response.
        index: usize,
        /// Call identifier (only present in the first delta for this index).
        id: Option<String>,
        /// Tool name (only present in the first delta for this index).
        name: Option<String>,
        /// Partial arguments JSON string fragment (accumulate with `push_str`).
        arguments: Option<String>,
    },

    /// Prompt-processing progress from llama-server.
    ///
    /// Emitted when the request includes `return_progress: true`. These
    /// frames arrive during the pre-fill phase (before any `TextDelta`),
    /// giving real-time visibility into how far along token ingestion is.
    PromptProgress {
        /// Number of tokens processed so far.
        processed: u32,
        /// Total number of tokens in the prompt.
        total: u32,
        /// Number of tokens served from KV cache (already processed).
        cached: u32,
        /// Elapsed wall-clock time in milliseconds since processing began.
        time_ms: u64,
    },

    /// Signals the end of the stream.
    ///
    /// Every conforming stream must end with exactly one `Done` item.
    Done {
        /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
        /// `"length"`).
        finish_reason: String,
    },
}

// =============================================================================
// Channel sizing
// =============================================================================

/// [`tokio::sync::mpsc`] channel capacity for streaming [`AgentEvent`]s
/// produced by a single [`crate::ports::AgentLoopPort::run`] call.
///
/// Sized so that a full run at the **maximum ceiling configuration** fits
/// without back-pressure on the hot streaming path. With
/// `MAX_ITERATIONS_CEILING = 50` and `MAX_PARALLEL_TOOLS_CEILING = 50`, the
/// structural-event bound is `MAX_ITERATIONS_CEILING ×
/// (MAX_PARALLEL_TOOLS_CEILING × 2 + 1) + 1 = 50 × 101 + 1 = 5 051`. Any
/// value ≥ 5 051 satisfies the structural ceiling test; 8 192 leaves
/// comfortable headroom for `TextDelta` bursts.
///
/// Filling the channel causes `tx.send().await` to block on every token
/// (back-pressure with measurable latency impact), so the constant is set
/// well above the hard floor.
///
/// 8 192 fits comfortably in under a megabyte of memory per active agent
/// session.
///
/// All callers (SSE handlers, CLI REPL) should use this constant instead of
/// a magic literal.
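///
/// # Example
///
/// A sketch of how a caller sizes its channel (surrounding wiring elided):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::channel::<AgentEvent>(AGENT_EVENT_CHANNEL_CAPACITY);
/// ```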
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 8_192;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_event_serde_tag_matches_wire_format() {
        let evt = AgentEvent::FinalAnswer {
            content: "done".into(),
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "final_answer");
        assert_eq!(json["content"], "done");
    }

    #[test]
    fn tool_call_start_serialises_correctly() {
        let evt = AgentEvent::ToolCallStart {
            tool_call: ToolCall {
                id: "c1".into(),
                name: "search".into(),
                arguments: serde_json::json!({"q": "rust"}),
            },
            display_name: "Search".into(),
            args_summary: None,
        };
        let json = serde_json::to_value(&evt).unwrap();
        assert_eq!(json["type"], "tool_call_start");
        assert_eq!(json["tool_call"]["name"], "search");
    }

    /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be positive and must be at least
    /// large enough for a full run at the maximum ceiling configuration
    /// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
    /// structural events), so that back-pressure never occurs on the hot
    /// streaming path for any valid configuration.
    #[test]
    fn agent_event_channel_capacity_is_sufficient_for_max_config() {
        use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};

        // Minimum structural events for a run at ceiling config
        // (no TextDelta headroom included — this is the hard lower bound).
        let structural_per_iter = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
        let minimum_structural = MAX_ITERATIONS_CEILING * structural_per_iter + 1;
        assert!(
            AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
            "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
             the minimum required for ceiling config ({minimum_structural}); \
             increase the constant"
        );
    }
}