gglib_core/sse/
parser.rs

1//! Parser for OpenAI-compatible SSE `data:` frames.
2//!
3//! Isolated so the frame-parsing logic and its tests are self-contained and
4//! do not require an HTTP client or async runtime.
5//!
6//! # Frame ordering for reasoning models
7//!
8//! When a single SSE frame carries both `reasoning_content` (chain-of-thought)
9//! **and** `content` (answer text), the [`ReasoningDelta`] event is emitted
10//! first.  This matches the temporal semantics of reasoning models such as
11//! `DeepSeek` R1 and `QwQ`, where the chain-of-thought is always produced before
12//! the answer — even if llama-server coalesces both into the same frame.
13//!
14//! [`ReasoningDelta`]: crate::LlmStreamEvent::ReasoningDelta
15
16use anyhow::{Result, anyhow};
17
18use crate::domain::agent::LlmStreamEvent;
19
20// =============================================================================
21// Public types
22// =============================================================================
23
24/// Result of parsing a single SSE `data:` payload.
25#[derive(Debug)]
26pub enum SseParseResult {
27    /// The value `[DONE]` — stream terminator, no events.
28    Done,
29    /// One or more events decoded from the JSON frame.
30    Events(Vec<LlmStreamEvent>),
31}
32
33// =============================================================================
34// Parser
35// =============================================================================
36
37/// Parse a single SSE `data:` payload into zero or more [`LlmStreamEvent`]s.
38///
39/// Returns:
40/// - `Ok(SseParseResult::Done)` when `data == "[DONE]"`
41/// - `Ok(SseParseResult::Events(…))` for a valid JSON frame (may be empty
42///   when the frame carries no content or tool-call deltas)
43/// - `Err(…)` when the frame is not valid JSON
44///
45/// # Errors
46///
47/// Returns an error if the `data` payload is not valid JSON.
48pub fn parse_sse_frame(data: &str) -> Result<SseParseResult> {
49    if data == "[DONE]" {
50        return Ok(SseParseResult::Done);
51    }
52
53    let parsed: serde_json::Value = serde_json::from_str(data)
54        .map_err(|e| anyhow!("SSE frame JSON parse error: {e} — data: {data}"))?;
55
56    // ── Prompt-progress frames (llama-server `return_progress: true`) ────
57    // These arrive during the pre-fill phase and have no `choices` array.
58    // We check for them *before* the choices guard so they aren't silently
59    // dropped as "no choices" frames.
60    if let Some(pp) = parsed.get("prompt_progress") {
61        let processed = u32::try_from(pp["processed"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
62        let total = u32::try_from(pp["total"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
63        let cached = u32::try_from(pp["cache"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
64        let time_ms = pp["time_ms"].as_u64().unwrap_or(0);
65        return Ok(SseParseResult::Events(vec![
66            LlmStreamEvent::PromptProgress {
67                processed,
68                total,
69                cached,
70                time_ms,
71            },
72        ]));
73    }
74
75    // Guard against keepalive / error frames that carry no `choices` array.
76    // Without this check every field access falls through to `Value::Null`,
77    // events are silently dropped, and a `finish_reason: "stop"` in such a
78    // frame would mean the stream never emits `Done`.
79    let choices = &parsed["choices"];
80    if choices.as_array().is_none_or(Vec::is_empty) {
81        tracing::debug!(data = %data, "SSE frame has no 'choices' entries — skipping");
82        return Ok(SseParseResult::Events(vec![]));
83    }
84    let choice = &choices[0];
85    let delta = &choice["delta"];
86
87    let mut events: Vec<LlmStreamEvent> = Vec::new();
88
89    // ── Reasoning/CoT content delta (DeepSeek R1 / QwQ) ────────────────────
90    // Emitted FIRST: chain-of-thought semantically precedes answer text, so
91    // even when both fields appear in the same frame we preserve this order.
92    // llama-server emits `delta["reasoning_content"]` when started with
93    // `--reasoning-format deepseek`.
94    if let Some(reasoning) = delta["reasoning_content"].as_str()
95        && !reasoning.is_empty()
96    {
97        events.push(LlmStreamEvent::ReasoningDelta {
98            content: reasoning.to_owned(),
99        });
100    }
101
102    // ── Text content delta ──────────────────────────────────────────────────
103    if let Some(content) = delta["content"].as_str()
104        && !content.is_empty()
105    {
106        events.push(LlmStreamEvent::TextDelta {
107            content: content.to_owned(),
108        });
109    }
110
111    // ── Tool-call deltas ────────────────────────────────────────────────────
112    if let Some(tool_calls) = delta["tool_calls"].as_array() {
113        for (sequential, tc) in tool_calls.iter().enumerate() {
114            // Prefer the explicit `index` field; fall back to the element's
115            // position in the array when `index` is absent.  A server that
116            // omits `index` on every element is non-compliant with the OpenAI
117            // spec, but we handle it gracefully rather than silently collapsing
118            // all calls onto slot 0.
119            let index = tc["index"]
120                .as_u64()
121                .and_then(|i| usize::try_from(i).ok())
122                .unwrap_or(sequential);
123            let id = tc["id"].as_str().map(str::to_owned);
124            let name = tc["function"]["name"].as_str().map(str::to_owned);
125            let arguments = tc["function"]["arguments"].as_str().map(str::to_owned);
126            events.push(LlmStreamEvent::ToolCallDelta {
127                index,
128                id,
129                name,
130                arguments,
131            });
132        }
133    }
134
135    // ── Finish reason → Done ────────────────────────────────────────────────
136    if let Some(finish_reason) = choice["finish_reason"].as_str()
137        && !finish_reason.is_empty()
138    {
139        events.push(LlmStreamEvent::Done {
140            finish_reason: finish_reason.to_owned(),
141        });
142    }
143
144    Ok(SseParseResult::Events(events))
145}
146
147// =============================================================================
148// Tests
149// =============================================================================
150
#[cfg(test)]
mod tests {
    use super::*;

    // ── Frame builders ─────────────────────────────────────────────────────

    /// Wrap `delta` in a single-choice chunk.  A `None` finish reason is
    /// serialised as JSON `null`.
    fn chunk(delta: serde_json::Value, finish_reason: Option<&str>) -> String {
        serde_json::json!({
            "choices": [{ "delta": delta, "finish_reason": finish_reason }]
        })
        .to_string()
    }

    /// Chunk carrying only answer text.
    fn text_frame(content: &str) -> String {
        chunk(serde_json::json!({ "content": content }), None)
    }

    /// Chunk carrying only chain-of-thought text.
    fn reasoning_frame(reasoning: &str) -> String {
        chunk(serde_json::json!({ "reasoning_content": reasoning }), None)
    }

    /// Chunk with an empty delta and the given finish reason.
    fn finish_frame(reason: &str) -> String {
        chunk(serde_json::json!({}), Some(reason))
    }

    /// Chunk with one fully specified tool-call element.
    fn tool_frame(index: usize, id: &str, name: &str, args: &str) -> String {
        chunk(
            serde_json::json!({
                "tool_calls": [{
                    "index": index,
                    "id": id,
                    "function": { "name": name, "arguments": args }
                }]
            }),
            None,
        )
    }

    /// Chunk whose single tool-call element intentionally omits `index`,
    /// simulating a non-compliant server.
    fn tool_frame_no_index(id: &str, name: &str, args: &str) -> String {
        chunk(
            serde_json::json!({
                "tool_calls": [
                    { "id": id, "function": { "name": name, "arguments": args } }
                ]
            }),
            None,
        )
    }

    /// Chunk with two tool-call elements that both omit `index`.
    fn two_tool_frames_no_index() -> String {
        chunk(
            serde_json::json!({
                "tool_calls": [
                    { "id": "c1", "function": { "name": "search", "arguments": "{}" } },
                    { "id": "c2", "function": { "name": "read_file", "arguments": "{}" } }
                ]
            }),
            None,
        )
    }

    /// Frame consisting solely of a `prompt_progress` object (no `choices`).
    fn progress_frame(processed: u64, total: u64, cache: u64, time_ms: u64) -> String {
        serde_json::json!({
            "prompt_progress": {
                "processed": processed,
                "total": total,
                "cache": cache,
                "time_ms": time_ms
            }
        })
        .to_string()
    }

    /// Parse `frame` and hand back its events; anything other than
    /// `Ok(Events(..))` fails the calling test.
    fn events_of(frame: &str) -> Vec<LlmStreamEvent> {
        match parse_sse_frame(frame) {
            Ok(SseParseResult::Events(events)) => events,
            other => panic!("unexpected: {other:?}"),
        }
    }

    // ── Tests ──────────────────────────────────────────────────────────────

    #[test]
    fn done_sentinel_returns_done_variant() {
        let outcome = parse_sse_frame("[DONE]");
        assert!(matches!(outcome, Ok(SseParseResult::Done)));
    }

    #[test]
    fn text_delta_frame_produces_text_event() {
        let events = events_of(&text_frame("hello"));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::TextDelta { content }] if content == "hello"
        ));
    }

    #[test]
    fn empty_content_produces_no_text_event() {
        let events = events_of(&text_frame(""));
        assert!(
            events.is_empty(),
            "empty content should not produce TextDelta"
        );
    }

    #[test]
    fn finish_reason_produces_done_event() {
        let events = events_of(&finish_frame("stop"));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::Done { finish_reason }] if finish_reason == "stop"
        ));
    }

    #[test]
    fn tool_call_delta_frame_is_parsed() {
        let raw_args = r#"{"q":"rust"}"#;
        let events = events_of(&tool_frame(0, "tc1", "search", raw_args));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::ToolCallDelta {
                index: 0,
                id: Some(id),
                name: Some(n),
                arguments: Some(a),
            }] if id == "tc1" && n == "search" && a == raw_args
        ));
    }

    #[test]
    fn tool_call_delta_with_no_index_defaults_to_sequential_position() {
        let events = events_of(&tool_frame_no_index("tc1", "search", r#"{"q":"rust"}"#));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::ToolCallDelta { index: 0, id: Some(id), .. }] if id == "tc1"
        ));
    }

    #[test]
    fn two_tool_calls_with_no_index_get_distinct_sequential_slots() {
        let events = events_of(&two_tool_frames_no_index());
        assert_eq!(events.len(), 2, "both tool-call deltas must be emitted");

        let (first, second) = (&events[0], &events[1]);
        assert!(matches!(
            first,
            LlmStreamEvent::ToolCallDelta { index: 0, id: Some(id), .. } if id == "c1"
        ));
        assert!(matches!(
            second,
            LlmStreamEvent::ToolCallDelta { index: 1, id: Some(id), .. } if id == "c2"
        ));
    }

    #[test]
    fn malformed_json_returns_error() {
        let outcome = parse_sse_frame("{ broken json }");
        assert!(outcome.is_err(), "malformed JSON should return Err");
    }

    #[test]
    fn frame_with_text_and_finish_reason_produces_both_events() {
        let frame = chunk(serde_json::json!({ "content": "hi" }), Some("stop"));
        let events = events_of(&frame);
        assert!(matches!(
            events.as_slice(),
            [
                LlmStreamEvent::TextDelta { .. },
                LlmStreamEvent::Done { .. }
            ]
        ));
    }

    #[test]
    fn reasoning_content_produces_reasoning_delta_event() {
        let events = events_of(&reasoning_frame("I should check..."));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::ReasoningDelta { content }] if content == "I should check..."
        ));
    }

    #[test]
    fn empty_reasoning_content_produces_no_event() {
        let events = events_of(&reasoning_frame(""));
        assert!(
            events.is_empty(),
            "empty reasoning_content should not produce ReasoningDelta"
        );
    }

    #[test]
    fn frame_with_reasoning_and_text_reasoning_emitted_first() {
        let frame = chunk(
            serde_json::json!({ "content": "ok", "reasoning_content": "think" }),
            None,
        );
        let events = events_of(&frame);
        assert_eq!(events.len(), 2);

        let (first, second) = (&events[0], &events[1]);
        assert!(
            matches!(first, LlmStreamEvent::ReasoningDelta { content } if content == "think"),
            "ReasoningDelta must come first"
        );
        assert!(
            matches!(second, LlmStreamEvent::TextDelta { content } if content == "ok"),
            "TextDelta must come second"
        );
    }

    #[test]
    fn prompt_progress_frame_produces_progress_event() {
        let events = events_of(&progress_frame(2048, 8192, 512, 1234));
        assert!(matches!(
            events.as_slice(),
            [LlmStreamEvent::PromptProgress {
                processed: 2048,
                total: 8192,
                cached: 512,
                time_ms: 1234
            }]
        ));
    }

    #[test]
    fn prompt_progress_frame_not_confused_with_choices() {
        let events = events_of(&progress_frame(100, 100, 0, 50));
        assert!(
            !events.is_empty(),
            "prompt_progress frame must not be skipped"
        );
    }
}