gglib_core/sse/
decoder.rs1use anyhow::Result;
10use tracing::debug;
11
12use crate::LlmStreamEvent;
13
14use super::parser::{SseParseResult, parse_sse_frame};
15
16#[derive(Default)]
31pub struct SseStreamDecoder {
32 buf: String,
33 done_sent: bool,
36}
37
38impl SseStreamDecoder {
39 pub fn feed_bytes(&mut self, bytes: &[u8]) -> (Vec<Result<LlmStreamEvent>>, bool) {
49 let text = match std::str::from_utf8(bytes) {
50 Ok(t) => t,
51 Err(e) => {
52 return (
53 vec![Err(anyhow::anyhow!("invalid UTF-8 in LLM SSE stream: {e}"))],
54 true,
55 );
56 }
57 };
58 self.buf.push_str(text);
59 let mut events = Vec::new();
60
61 while let Some(newline_pos) = self.buf.find('\n') {
62 let line = self.buf[..newline_pos].trim_end_matches('\r').to_owned();
63 self.buf.drain(..=newline_pos);
64
65 let Some(data) = line.strip_prefix("data: ") else {
67 continue;
68 };
69
70 match parse_sse_frame(data) {
71 Ok(SseParseResult::Done) => {
72 if !self.done_sent {
73 debug!(
74 "LLM stream ended with [DONE] but no prior finish_reason \
75 — emitting fallback Done"
76 );
77 events.push(Ok(LlmStreamEvent::Done {
78 finish_reason: "stop".to_owned(),
79 }));
80 }
81 self.done_sent = true;
82 return (events, true);
83 }
84 Ok(SseParseResult::Events(parsed_events)) => {
85 for event in parsed_events {
86 if matches!(event, LlmStreamEvent::Done { .. }) {
87 self.done_sent = true;
88 }
89 events.push(Ok(event));
90 }
91 }
92 Err(e) => {
93 events.push(Err(e));
94 return (events, true);
95 }
96 }
97 }
98
99 (events, false)
100 }
101
102 #[must_use]
107 pub fn finish(self) -> Option<LlmStreamEvent> {
108 if self.done_sent {
109 None
110 } else {
111 debug!("LLM byte-stream ended without [DONE] sentinel — emitting fallback Done");
112 Some(LlmStreamEvent::Done {
113 finish_reason: "stop".to_owned(),
114 })
115 }
116 }
117}
118
#[cfg(test)]
mod tests {
    use anyhow::Result;

    use super::SseStreamDecoder;
    use crate::LlmStreamEvent;

    /// A `data:` line carrying one content delta and no finish reason.
    fn text_delta_frame(text: &str) -> String {
        let json = serde_json::json!({
            "choices": [{
                "delta": { "content": text },
                "finish_reason": null
            }]
        });
        format!("data: {json}\n")
    }

    /// The `[DONE]` sentinel line.
    fn done_frame() -> &'static str {
        "data: [DONE]\n"
    }

    /// A `data:` line with an empty delta and `finish_reason: "stop"`.
    fn finish_reason_frame() -> String {
        let json = serde_json::json!({
            "choices": [{
                "delta": {},
                "finish_reason": "stop"
            }]
        });
        format!("data: {json}\n")
    }

    /// Feed `input` as a single chunk, panicking on any decode error.
    fn collect_all(decoder: &mut SseStreamDecoder, input: &str) -> (Vec<LlmStreamEvent>, bool) {
        let (raw, stop) = decoder.feed_bytes(input.as_bytes());
        let events: Vec<_> = raw.into_iter().map(Result::unwrap).collect();
        (events, stop)
    }

    /// Index of the first `TextDelta` whose content equals `needle`.
    fn text_delta_position(events: &[LlmStreamEvent], needle: &str) -> Option<usize> {
        events
            .iter()
            .position(|e| matches!(e, LlmStreamEvent::TextDelta { content } if content == needle))
    }

    #[test]
    fn text_delta_is_emitted() {
        let mut dec = SseStreamDecoder::default();
        let (events, stop) = collect_all(&mut dec, &text_delta_frame("hello"));
        assert!(!stop);
        assert!(text_delta_position(&events, "hello").is_some());
    }

    #[test]
    fn done_sentinel_signals_stop_and_emits_fallback() {
        let mut dec = SseStreamDecoder::default();
        let (events, stop) = collect_all(&mut dec, done_frame());
        assert!(stop, "decoder should signal stop on [DONE]");
        assert!(
            events
                .iter()
                .any(|e| matches!(e, LlmStreamEvent::Done { .. })),
            "fallback Done should be emitted when no prior finish_reason"
        );
        assert!(
            dec.finish().is_none(),
            "finish() must return None after a [DONE] sentinel — done_sent must be set"
        );
    }

    #[test]
    fn finish_reason_then_done_no_duplicate_done() {
        let mut dec = SseStreamDecoder::default();
        let input = format!("{}{}", finish_reason_frame(), done_frame());
        let (events, stop) = collect_all(&mut dec, &input);
        assert!(stop);
        let done_count = events
            .iter()
            .filter(|e| matches!(e, LlmStreamEvent::Done { .. }))
            .count();
        assert_eq!(done_count, 1, "exactly one Done should be emitted");
        assert!(
            dec.finish().is_none(),
            "finish() must not add another Done after finish_reason + [DONE]"
        );
    }

    #[test]
    fn finish_emits_fallback_when_stream_ends_without_done() {
        let mut dec = SseStreamDecoder::default();
        let _ = collect_all(&mut dec, &text_delta_frame("partial"));
        let fallback = dec.finish();
        assert!(
            fallback.is_some(),
            "finish() should return a fallback Done when stream ends without one"
        );
    }

    #[test]
    fn finish_returns_none_when_done_already_sent() {
        let mut dec = SseStreamDecoder::default();
        let _ = collect_all(&mut dec, &finish_reason_frame());
        assert!(
            dec.finish().is_none(),
            "finish() must not emit a second Done"
        );
    }

    #[test]
    fn partial_line_buffered_until_newline_arrives() {
        let mut dec = SseStreamDecoder::default();
        let full_frame = text_delta_frame("world");

        let mid = full_frame.len() / 2;
        let (first_events, stop1) = collect_all(&mut dec, &full_frame[..mid]);
        assert!(!stop1);
        assert!(first_events.is_empty(), "no complete line yet");

        let (second_events, stop2) = collect_all(&mut dec, &full_frame[mid..]);
        assert!(!stop2);
        assert!(
            second_events
                .iter()
                .any(|e| matches!(e, LlmStreamEvent::TextDelta { .. })),
            "TextDelta should be emitted once the newline arrives"
        );
    }

    #[test]
    fn multiple_frames_in_one_chunk_are_emitted_in_order() {
        let mut dec = SseStreamDecoder::default();
        let input = format!("{}{}", text_delta_frame("first"), text_delta_frame("second"));
        let (events, stop) = collect_all(&mut dec, &input);
        assert!(!stop);
        let first = text_delta_position(&events, "first").expect("first delta missing");
        let second = text_delta_position(&events, "second").expect("second delta missing");
        assert!(first < second, "deltas must keep stream order");
    }

    #[test]
    fn crlf_line_endings_are_accepted() {
        let mut dec = SseStreamDecoder::default();
        let input = text_delta_frame("crlf").replace('\n', "\r\n");
        let (events, stop) = collect_all(&mut dec, &input);
        assert!(!stop);
        assert!(
            text_delta_position(&events, "crlf").is_some(),
            "a trailing \\r must not prevent frame parsing"
        );
    }

    #[test]
    fn comment_and_non_data_lines_are_ignored() {
        let mut dec = SseStreamDecoder::default();
        let input = format!(": keep-alive\nevent: message\n\n{}", text_delta_frame("after"));
        let (events, stop) = collect_all(&mut dec, &input);
        assert!(!stop);
        assert!(text_delta_position(&events, "after").is_some());
    }

    #[test]
    fn invalid_utf8_reports_error_and_stops() {
        let mut dec = SseStreamDecoder::default();
        let (events, stop) = dec.feed_bytes(b"data: \xff\xfe\n");
        assert!(stop, "invalid UTF-8 must stop the stream");
        assert!(
            matches!(events.last(), Some(Err(_))),
            "the last result must be the UTF-8 error"
        );
    }
}