gglib_core/sse/
decoder.rs

1//! Stateful SSE byte-stream decoder.
2//!
3//! [`SseStreamDecoder`] accumulates raw bytes from an HTTP response into a line
4//! buffer, drains complete `data:` lines, and delegates frame parsing to
5//! [`super::parser`].  Its explicit state makes it straightforward to unit-
6//! test without standing up an actual HTTP server or wrapping everything in an
7//! `async_stream` macro block.
8
9use anyhow::Result;
10use tracing::debug;
11
12use crate::LlmStreamEvent;
13
14use super::parser::{SseParseResult, parse_sse_frame};
15
16/// Stateful decoder that turns a sequence of raw SSE byte chunks into a
17/// sequence of [`LlmStreamEvent`] values.
18///
19/// # Usage
20///
21/// ```ignore
22/// let mut decoder = SseStreamDecoder::default();
23/// while let Some(chunk) = byte_stream.next().await { … }
24///     let (events, stop) = decoder.feed_bytes(&chunk);
25///     for event in events { … }
26///     if stop { break; }
27/// }
28/// if let Some(fallback) = decoder.finish() { … }
29/// ```
30#[derive(Default)]
31pub struct SseStreamDecoder {
32    buf: String,
33    /// Set to `true` once a [`LlmStreamEvent::Done`] has been yielded, so the
34    /// `[DONE]` sentinel doesn't generate a duplicate.
35    done_sent: bool,
36}
37
38impl SseStreamDecoder {
39    /// Feed one raw byte chunk into the decoder.
40    ///
41    /// Returns `(events, should_stop)`.
42    ///
43    /// - `events` — zero or more parsed [`LlmStreamEvent`] values (or stream
44    ///   errors) extracted from the bytes fed so far.
45    /// - `should_stop` — `true` when the SSE stream has reached its natural end
46    ///   (a `[DONE]` sentinel or an unrecoverable parse error).  The caller
47    ///   must not feed any further chunks once this flag is `true`.
48    pub fn feed_bytes(&mut self, bytes: &[u8]) -> (Vec<Result<LlmStreamEvent>>, bool) {
49        let text = match std::str::from_utf8(bytes) {
50            Ok(t) => t,
51            Err(e) => {
52                return (
53                    vec![Err(anyhow::anyhow!("invalid UTF-8 in LLM SSE stream: {e}"))],
54                    true,
55                );
56            }
57        };
58        self.buf.push_str(text);
59        let mut events = Vec::new();
60
61        while let Some(newline_pos) = self.buf.find('\n') {
62            let line = self.buf[..newline_pos].trim_end_matches('\r').to_owned();
63            self.buf.drain(..=newline_pos);
64
65            // Skip blank lines and SSE comment lines.
66            let Some(data) = line.strip_prefix("data: ") else {
67                continue;
68            };
69
70            match parse_sse_frame(data) {
71                Ok(SseParseResult::Done) => {
72                    if !self.done_sent {
73                        debug!(
74                            "LLM stream ended with [DONE] but no prior finish_reason \
75                             — emitting fallback Done"
76                        );
77                        events.push(Ok(LlmStreamEvent::Done {
78                            finish_reason: "stop".to_owned(),
79                        }));
80                    }
81                    self.done_sent = true;
82                    return (events, true);
83                }
84                Ok(SseParseResult::Events(parsed_events)) => {
85                    for event in parsed_events {
86                        if matches!(event, LlmStreamEvent::Done { .. }) {
87                            self.done_sent = true;
88                        }
89                        events.push(Ok(event));
90                    }
91                }
92                Err(e) => {
93                    events.push(Err(e));
94                    return (events, true);
95                }
96            }
97        }
98
99        (events, false)
100    }
101
102    /// Emit a fallback `Done` event if the byte stream ended without one.
103    ///
104    /// Call this once after the upstream byte stream is fully exhausted.
105    /// Returns `None` if a `Done` was already yielded by [`feed_bytes`].
106    #[must_use]
107    pub fn finish(self) -> Option<LlmStreamEvent> {
108        if self.done_sent {
109            None
110        } else {
111            debug!("LLM byte-stream ended without [DONE] sentinel — emitting fallback Done");
112            Some(LlmStreamEvent::Done {
113                finish_reason: "stop".to_owned(),
114            })
115        }
116    }
117}
118
119// =============================================================================
120// Tests
121// =============================================================================
122
#[cfg(test)]
mod tests {
    use anyhow::Result;

    use super::SseStreamDecoder;
    use crate::LlmStreamEvent;

    // ---- fixtures -----------------------------------------------------------

    /// The terminating sentinel line as it appears on the wire.
    const DONE_LINE: &str = "data: [DONE]\n";

    /// Wrap a JSON payload into a single newline-terminated `data:` line.
    fn data_line(payload: &serde_json::Value) -> String {
        format!("data: {payload}\n")
    }

    /// A chunk line whose delta carries `text` and no finish reason.
    fn delta_line(text: &str) -> String {
        data_line(&serde_json::json!({
            "choices": [{
                "delta": { "content": text },
                "finish_reason": null
            }]
        }))
    }

    /// A chunk line with an empty delta and `finish_reason` set to `"stop"`.
    fn stop_line() -> String {
        data_line(&serde_json::json!({
            "choices": [{
                "delta": {},
                "finish_reason": "stop"
            }]
        }))
    }

    // ---- helpers ------------------------------------------------------------

    /// Feed `input` to `decoder` and unwrap every event, panicking on the
    /// first stream error.
    fn decode(decoder: &mut SseStreamDecoder, input: &str) -> (Vec<LlmStreamEvent>, bool) {
        let (results, should_stop) = decoder.feed_bytes(input.as_bytes());
        let unwrapped = results.into_iter().collect::<Result<Vec<_>>>().unwrap();
        (unwrapped, should_stop)
    }

    /// Number of `Done` events in `events`.
    fn count_done(events: &[LlmStreamEvent]) -> usize {
        events
            .iter()
            .filter(|ev| matches!(ev, LlmStreamEvent::Done { .. }))
            .count()
    }

    // ---- tests --------------------------------------------------------------

    #[test]
    fn text_delta_is_emitted() {
        let mut decoder = SseStreamDecoder::default();

        let (events, should_stop) = decode(&mut decoder, &delta_line("hello"));

        assert!(!should_stop);
        let saw_hello = events
            .iter()
            .any(|ev| matches!(ev, LlmStreamEvent::TextDelta { content } if content == "hello"));
        assert!(saw_hello);
    }

    #[test]
    fn done_sentinel_signals_stop_and_emits_fallback() {
        let mut decoder = SseStreamDecoder::default();

        let (events, should_stop) = decode(&mut decoder, DONE_LINE);

        assert!(should_stop, "decoder should signal stop on [DONE]");
        assert!(
            count_done(&events) > 0,
            "fallback Done should be emitted when no prior finish_reason"
        );
        assert!(
            decoder.finish().is_none(),
            "finish() must return None after a [DONE] sentinel — done_sent must be set"
        );
    }

    #[test]
    fn finish_reason_then_done_no_duplicate_done() {
        let mut decoder = SseStreamDecoder::default();
        let mut input = stop_line();
        input.push_str(DONE_LINE);

        let (events, should_stop) = decode(&mut decoder, &input);

        assert!(should_stop);
        assert_eq!(count_done(&events), 1, "exactly one Done should be emitted");
    }

    #[test]
    fn finish_emits_fallback_when_stream_ends_without_done() {
        let mut decoder = SseStreamDecoder::default();
        let _ = decode(&mut decoder, &delta_line("partial"));

        assert!(
            decoder.finish().is_some(),
            "finish() should return a fallback Done when stream ends without one"
        );
    }

    #[test]
    fn finish_returns_none_when_done_already_sent() {
        let mut decoder = SseStreamDecoder::default();
        let _ = decode(&mut decoder, &stop_line());

        assert!(
            decoder.finish().is_none(),
            "finish() must not emit a second Done"
        );
    }

    #[test]
    fn partial_line_buffered_until_newline_arrives() {
        let mut decoder = SseStreamDecoder::default();
        let line = delta_line("world");
        // Cut the line roughly in half so the first chunk has no newline.
        let (head, tail) = line.split_at(line.len() / 2);

        let (head_events, head_stop) = decode(&mut decoder, head);
        assert!(!head_stop);
        assert!(head_events.is_empty(), "no complete line yet");

        let (tail_events, tail_stop) = decode(&mut decoder, tail);
        assert!(!tail_stop);
        assert!(
            tail_events
                .iter()
                .any(|ev| matches!(ev, LlmStreamEvent::TextDelta { .. })),
            "TextDelta should be emitted once the newline arrives"
        );
    }
}