// gglib_core/domain/agent/events.rs

1//! [`AgentEvent`] and [`LlmStreamEvent`] — observable events in the agentic loop.
2
3use serde::Serialize;
4
5use super::tool_types::{ToolCall, ToolResult};
6
7// =============================================================================
8// Agent events (SSE stream units)
9// =============================================================================
10
11/// An observable event emitted by the agentic loop.
12///
13/// These events are the unit of SSE emission: every state change in the loop
14/// produces exactly one variant. Axum SSE handlers serialise these to
15/// `data: <json>\n\n` frames; CLI consumers may log or render them directly.
16///
17/// # Serde tag
18///
19/// `#[serde(tag = "type", rename_all = "snake_case")]` produces e.g.
20/// `{"type":"tool_call_start","tool_call":{...}}`.
21#[derive(Debug, Clone, Serialize)]
22#[serde(tag = "type", rename_all = "snake_case")]
23pub enum AgentEvent {
24    /// An incremental text fragment from the model's response.
25    TextDelta {
26        /// The new text fragment (append to the current buffer).
27        content: String,
28    },
29
30    /// An incremental reasoning/thinking fragment from the model (`CoT` tokens).
31    ///
32    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) that expose
33    /// their chain-of-thought via a separate `reasoning_content` SSE field.
34    /// These fragments are forwarded to the UI as they arrive but are **not**
35    /// included in the conversation history sent back to the model.
36    ReasoningDelta {
37        /// The new reasoning fragment (append to the current reasoning buffer).
38        content: String,
39    },
40
41    /// The model has requested execution of a tool.
42    ToolCallStart {
43        /// The tool call that is about to be dispatched.
44        tool_call: ToolCall,
45    },
46
47    /// A tool execution has completed (success or failure).
48    ToolCallComplete {
49        /// The outcome of the tool execution.
50        result: ToolResult,
51        /// Time spent waiting for a concurrency permit (semaphore), in milliseconds.
52        wait_ms: u64,
53        /// Wall-clock time taken to execute the tool after acquiring the permit,
54        /// in milliseconds.
55        execute_duration_ms: u64,
56    },
57
58    /// One full LLM→tool-execution cycle has completed.
59    IterationComplete {
60        /// The 1-based iteration index that just finished.
61        iteration: usize,
62        /// Number of tool calls executed during this iteration.
63        tool_calls: usize,
64    },
65
66    /// The loop has concluded and produced a definitive answer.
67    FinalAnswer {
68        /// The complete final response text.
69        content: String,
70    },
71
72    /// A fatal error has terminated the loop.
73    Error {
74        /// Human-readable description of the failure.
75        message: String,
76    },
77}
78
79// =============================================================================
80// LLM stream events (consumed by LlmCompletionPort implementors)
81// =============================================================================
82
/// A single event produced by a streaming LLM response.
///
/// These low-level events are the currency of [`crate::ports::LlmCompletionPort`];
/// adapter crates parse them out of raw SSE frames and hand them to
/// `gglib-agent`'s stream collector, which:
///
/// - Forwards [`TextDelta`](LlmStreamEvent::TextDelta) items straight to the
///   caller's [`AgentEvent`] channel so text shows up in real time.
/// - Accumulates [`ToolCallDelta`](LlmStreamEvent::ToolCallDelta) fragments
///   until the stream ends, then assembles them into [`ToolCall`] values.
/// - Waits for [`Done`](LlmStreamEvent::Done) before running any tools.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LlmStreamEvent {
    /// An incremental fragment of the model's response text.
    TextDelta {
        /// New text to append to the running content buffer.
        content: String,
    },

    /// An incremental reasoning/thinking fragment (`CoT` tokens).
    ///
    /// Emitted by reasoning-capable models (e.g. `DeepSeek` R1, `QwQ`) when
    /// llama-server runs with `--reasoning-format deepseek`.  The runtime
    /// adapter maps `delta["reasoning_content"]` frames to this variant; the
    /// stream collector forwards each one as [`AgentEvent::ReasoningDelta`]
    /// and accumulates it in a separate buffer that is never sent back to
    /// the LLM as context.
    ReasoningDelta {
        /// New reasoning text to append to the current reasoning buffer.
        content: String,
    },

    /// An incremental fragment of a tool-call request.
    ///
    /// The adapter crate streams these before the model has finished
    /// generating the full arguments JSON. The stream collector merges all
    /// deltas sharing an `index` into a single [`ToolCall`].
    ToolCallDelta {
        /// Zero-based index of the tool call within the current response.
        index: usize,
        /// Call identifier (present only in the first delta for this index).
        id: Option<String>,
        /// Tool name (present only in the first delta for this index).
        name: Option<String>,
        /// Partial arguments JSON fragment (accumulate with `push_str`).
        arguments: Option<String>,
    },

    /// Signals the end of the stream.
    ///
    /// Every conforming stream must end with exactly one `Done` item.
    Done {
        /// The OpenAI-compatible finish reason (e.g. `"stop"`, `"tool_calls"`,
        /// `"length"`).
        finish_reason: String,
    },
}
140
141// =============================================================================
142// Channel sizing
143// =============================================================================
144
/// [`tokio::sync::mpsc`] channel capacity for the stream of [`AgentEvent`]s
/// produced by one [`crate::ports::AgentLoopPort::run`] call.
///
/// This is a generous static ceiling (4 096) rather than a formula derived
/// from default config values.  The formula-based figure (~532 for the
/// default config) is too small for callers with non-default settings such
/// as `max_iterations = 50` + `max_parallel_tools = 20`, which can emit
/// roughly 2 051 structural events per run before any `TextDelta` headroom.
/// A full channel makes `tx.send().await` back-pressure on every token in
/// the hot streaming path, with measurable latency impact.
///
/// 4 096 buffered events fit comfortably in a few hundred kilobytes of
/// memory per active agent session and cover any realistic configuration.
///
/// All callers (SSE handlers, CLI REPL) should use this constant instead of
/// a magic literal.
pub const AGENT_EVENT_CHANNEL_CAPACITY: usize = 4_096;
162
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn agent_event_serde_tag_matches_wire_format() {
        let json = serde_json::to_value(AgentEvent::FinalAnswer {
            content: String::from("done"),
        })
        .unwrap();
        assert_eq!(json["type"], "final_answer");
        assert_eq!(json["content"], "done");
    }

    #[test]
    fn tool_call_start_serialises_correctly() {
        let call = ToolCall {
            id: String::from("c1"),
            name: String::from("search"),
            arguments: serde_json::json!({"q": "rust"}),
        };
        let json = serde_json::to_value(AgentEvent::ToolCallStart { tool_call: call }).unwrap();
        assert_eq!(json["type"], "tool_call_start");
        assert_eq!(json["tool_call"]["name"], "search");
    }

    /// [`AGENT_EVENT_CHANNEL_CAPACITY`] must be positive and at least as
    /// large as the structural-event count of a full run at the ceiling
    /// configuration (`MAX_ITERATIONS_CEILING` ×
    /// (`MAX_PARALLEL_TOOLS_CEILING` × 2 + 1) + 1 structural events), so
    /// that back-pressure never occurs on the hot streaming path for any
    /// valid configuration.
    #[test]
    fn agent_event_channel_capacity_is_sufficient_for_max_config() {
        use super::super::config::{MAX_ITERATIONS_CEILING, MAX_PARALLEL_TOOLS_CEILING};

        // Hard lower bound: structural events only for a ceiling-config run
        // (no TextDelta headroom included).
        let per_iteration = MAX_PARALLEL_TOOLS_CEILING * 2 + 1;
        let minimum_structural = MAX_ITERATIONS_CEILING * per_iteration + 1;
        assert!(
            AGENT_EVENT_CHANNEL_CAPACITY >= minimum_structural,
            "AGENT_EVENT_CHANNEL_CAPACITY ({AGENT_EVENT_CHANNEL_CAPACITY}) is smaller than \
             the minimum required for ceiling config ({minimum_structural}); \
             increase the constant"
        );
    }
}
211}