gglib_core/domain/agent/events.rs
1//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.
2
3use serde::Serialize;
4
5use super::tool_types::{ToolCall, ToolResult};
6
7// =============================================================================
8// Agent events (SSE stream units)
9// =============================================================================
10
/// An observable event emitted by the agentic loop.
///
/// These events are the unit of SSE emission: every state change in the loop
/// produces exactly one variant. Axum SSE handlers serialise these to
/// `data: <json>\n\n` frames; CLI consumers may log or render them directly.
///
/// # Serde tag
///
/// `#[serde(tag = "type", rename_all = "snake_case")]` yields serde's
/// internally-tagged representation: the variant name becomes a snake_case
/// `"type"` field and the variant's own fields sit beside it, e.g.
/// `{"type":"tool_call_start","tool_call":{...}}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// An incremental text fragment from the model's response.
    ///
    /// Wire form: `{"type":"text_delta","content":"..."}`.
    TextDelta {
        /// The new text fragment (append to the current buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
    ///
    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
    /// their chain-of-thought via a separate `reasoning_content` SSE field.
    /// These fragments are forwarded to the UI as they arrive but are **not**
    /// included in the conversation history sent back to the model.
    ///
    /// Wire form: `{"type":"reasoning_delta","content":"..."}`.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// The model has requested execution of a tool.
    ///
    /// Wire form: `{"type":"tool_call_start","tool_call":{...}}`.
    ToolCallStart {
        /// The tool call that is about to be dispatched.
        tool_call: ToolCall,
    },

    /// A tool execution has completed (success or failure).
    ///
    /// The two timing fields let consumers separate queueing delay from
    /// actual execution time when rendering per-tool latency.
    ToolCallComplete {
        /// The outcome of the tool execution.
        result: ToolResult,
        /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
        wait_ms: u64,
        /// Wall-clock time taken to execute the tool after acquiring the permit,
        /// in milliseconds.
        execute_duration_ms: u64,
    },

    /// One full LLM→tool-execution cycle has completed.
    IterationComplete {
        /// The 1-based iteration index that just finished.
        iteration: usize,
        /// Number of tool calls executed during this iteration.
        tool_calls: usize,
    },

    /// The loop has concluded and produced a definitive answer.
    ///
    /// Wire form: `{"type":"final_answer","content":"..."}`.
    FinalAnswer {
        /// The complete final response text.
        content: String,
    },

    /// A fatal error has terminated the loop.
    ///
    /// Wire form: `{"type":"error","message":"..."}`.
    Error {
        /// Human-readable description of the failure.
        message: String,
    },
}
78
79// =============================================================================
80// LLM stream events (consumed by LlmCompletionPort implementors)
81// =============================================================================
82
/// A single event produced by a streaming LLM response.
///
/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
/// they are parsed by adapter crates from raw SSE frames and handed to
/// `gglib-agent`'s stream collector, which:
///
/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items directly to the
///   caller's [`AgentEvent`] channel so text appears in real time.
/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
///   until the stream ends, then assembles them into [`ToolCall`] values.
/// - Waits for [`Done`](LlmStreamEvent::Done) before triggering tool execution.
///
/// Unlike [`AgentEvent`], this type deliberately does not derive `Serialize`:
/// it is an internal protocol between adapter and collector and is never
/// emitted over the wire itself. `PartialEq`/`Eq` are derived so collectors
/// and tests can compare events directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LlmStreamEvent {
    /// An incremental text fragment from the model's response.
    TextDelta {
        /// The new text fragment (append to the running content buffer).
        content: String,
    },

    /// An incremental reasoning/thinking fragment (`CoT` tokens).
    ///
    /// Produced by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
    /// llama-server is started with `--reasoning-format deepseek`. The
    /// runtime adapter maps `delta["reasoning_content"]` frames to this
    /// variant; the stream collector forwards them as
    /// [`AgentEvent::ReasoningDelta`] and accumulates them in a separate
    /// buffer that is never sent back to the LLM as context.
    ReasoningDelta {
        /// The new reasoning fragment (append to the current reasoning buffer).
        content: String,
    },

    /// An incremental fragment of a tool-call request.
    ///
    /// The adapter crate streams these before the model has finished
    /// generating the full arguments JSON. The stream collector accumulates
    /// all deltas for a given `index` into a single [`ToolCall`].
    ToolCallDelta {
        /// Zero-based index of the tool call within the current response.
        index: usize,
        /// Call identifier (only present in the first delta for this index).
        id: Option<String>,
        /// Tool name (only present in the first delta for this index).
        name: Option<String>,
        /// Partial arguments JSON string fragment (accumulate with `push_str`).
        arguments: Option<String>,
    },

    /// Signals the end of the stream.
    ///
    /// Every conforming stream must end with exactly one `Done` item.
    Done {
        /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
        /// `"length"`).
        finish_reason: String,
    },
}
140
141// =============================================================================
142// Channel sizing
143// =============================================================================
144
/// [`tokio::sync::mpsc`] channel capacity for streaming [`AgentEvent`]s
/// produced by a single [`crate::ports::AgentLoopPort::run`] call.
///
/// Set to a generous static ceiling (4 096) rather than a formula tied to
/// default config values. The formula-based value (~532 for default config)
/// is too small for callers that use non-default settings such as
/// `max_iterations = 50` + `max_parallel_tools = 20`, which would produce
/// up to ~2 061 structural events per run before any `TextDelta` headroom.
/// Filling the channel causes `tx.send().await` to back-pressure on every
/// token in the hot streaming path, with measurable latency impact.
///
/// 4 096 fits comfortably in a few hundred kilobytes of memory per active
/// agent session and is sufficient for any realistic configuration.
///
/// All callers (SSE handlers, CLI REPL) should use this constant instead of
/// a magic literal, i.e. `mpsc::channel(AGENT_EVENT_CHANNEL_CAPACITY)`.
///
/// The lower bound for the ceiling configuration is enforced by a unit test
/// in this module, so shrinking this constant below the worst-case structural
/// event count fails the build's test suite rather than regressing latency.
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 4_096;
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166
167 #[test]
168 fn agent_event_serde_tag_matches_wire_format() {
169 let evt = AgentEvent::FinalAnswer {
170 content: "done".into(),
171 };
172 let json = serde_json::to_value(&evt).unwrap();
173 assert_eq!(json["type"], "final_answer");
174 assert_eq!(json["content"], "done");
175 }
176
177 #[test]
178 fn tool_call_start_serialises_correctly() {
179 let evt = AgentEvent::ToolCallStart {
180 tool_call: ToolCall {
181 id: "c1".into(),
182 name: "search".into(),
183 arguments: serde_json::json!({"q": "rust"}),
184 },
185 };
186 let json = serde_json::to_value(&evt).unwrap();
187 assert_eq!(json["type"], "tool_call_start");
188 assert_eq!(json["tool_call"]["name"], "search");
189 }
190
191 /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be positive and must be at least
192 /// large enough for a full run at the maximum ceiling configuration
193 /// (`MAX_ITERATIONS_CEILING` × (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1
194 /// structural events), so that back-pressure never occurs on the hot
195 /// streaming path for any valid configuration.
196 #[test]
197 fn agent_event_channel_capacity_is_sufficient_for_max_config() {
198 use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};
199
200 // Minimum structural events for a run at ceiling config
201 // (no TextDelta headroom included — this is the hard lower bound).
202 let structural_per_iter = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
203 let minimum_structural = MAX_ITERATIONS_CEILING * structural_per_iter + 1;
204 assert!(
205 AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
206 "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
207 the minimum required for ceiling config ({minimum_structural}); \
208 increase the constant"
209 );
210 }
211}