1use anyhow::{Result, anyhow};
17
18use crate::domain::agent::LlmStreamEvent;
19
20#[derive(Debug)]
26pub enum SseParseResult {
27 Done,
29 Events(Vec<LlmStreamEvent>),
31}
32
33pub fn parse_sse_frame(data: &str) -> Result<SseParseResult> {
49 if data == "[DONE]" {
50 return Ok(SseParseResult::Done);
51 }
52
53 let parsed: serde_json::Value = serde_json::from_str(data)
54 .map_err(|e| anyhow!("SSE frame JSON parse error: {e} — data: {data}"))?;
55
56 if let Some(pp) = parsed.get("prompt_progress") {
61 let processed = u32::try_from(pp["processed"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
62 let total = u32::try_from(pp["total"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
63 let cached = u32::try_from(pp["cache"].as_u64().unwrap_or(0)).unwrap_or(u32::MAX);
64 let time_ms = pp["time_ms"].as_u64().unwrap_or(0);
65 return Ok(SseParseResult::Events(vec![
66 LlmStreamEvent::PromptProgress {
67 processed,
68 total,
69 cached,
70 time_ms,
71 },
72 ]));
73 }
74
75 let choices = &parsed["choices"];
80 if choices.as_array().is_none_or(Vec::is_empty) {
81 tracing::debug!(data = %data, "SSE frame has no 'choices' entries — skipping");
82 return Ok(SseParseResult::Events(vec![]));
83 }
84 let choice = &choices[0];
85 let delta = &choice["delta"];
86
87 let mut events: Vec<LlmStreamEvent> = Vec::new();
88
89 if let Some(reasoning) = delta["reasoning_content"].as_str()
95 && !reasoning.is_empty()
96 {
97 events.push(LlmStreamEvent::ReasoningDelta {
98 content: reasoning.to_owned(),
99 });
100 }
101
102 if let Some(content) = delta["content"].as_str()
104 && !content.is_empty()
105 {
106 events.push(LlmStreamEvent::TextDelta {
107 content: content.to_owned(),
108 });
109 }
110
111 if let Some(tool_calls) = delta["tool_calls"].as_array() {
113 for (sequential, tc) in tool_calls.iter().enumerate() {
114 let index = tc["index"]
120 .as_u64()
121 .and_then(|i| usize::try_from(i).ok())
122 .unwrap_or(sequential);
123 let id = tc["id"].as_str().map(str::to_owned);
124 let name = tc["function"]["name"].as_str().map(str::to_owned);
125 let arguments = tc["function"]["arguments"].as_str().map(str::to_owned);
126 events.push(LlmStreamEvent::ToolCallDelta {
127 index,
128 id,
129 name,
130 arguments,
131 });
132 }
133 }
134
135 if let Some(finish_reason) = choice["finish_reason"].as_str()
137 && !finish_reason.is_empty()
138 {
139 events.push(LlmStreamEvent::Done {
140 finish_reason: finish_reason.to_owned(),
141 });
142 }
143
144 Ok(SseParseResult::Events(events))
145}
146
147#[cfg(test)]
152mod tests {
153 use super::*;
154
155 fn text_frame(content: &str) -> String {
158 serde_json::json!({
159 "choices": [{ "delta": { "content": content }, "finish_reason": null }]
160 })
161 .to_string()
162 }
163
164 fn finish_frame(reason: &str) -> String {
165 serde_json::json!({
166 "choices": [{ "delta": {}, "finish_reason": reason }]
167 })
168 .to_string()
169 }
170
171 fn tool_frame(index: usize, id: &str, name: &str, args: &str) -> String {
172 serde_json::json!({
173 "choices": [{
174 "delta": {
175 "tool_calls": [{
176 "index": index,
177 "id": id,
178 "function": { "name": name, "arguments": args }
179 }]
180 },
181 "finish_reason": null
182 }]
183 })
184 .to_string()
185 }
186
187 fn tool_frame_no_index(id: &str, name: &str, args: &str) -> String {
190 serde_json::json!({
191 "choices": [{
192 "delta": {
193 "tool_calls": [
194 { "id": id, "function": { "name": name, "arguments": args } }
195 ]
196 },
197 "finish_reason": null
198 }]
199 })
200 .to_string()
201 }
202
203 fn two_tool_frames_no_index() -> String {
205 serde_json::json!({
206 "choices": [{
207 "delta": {
208 "tool_calls": [
209 { "id": "c1", "function": { "name": "search", "arguments": "{}" } },
210 { "id": "c2", "function": { "name": "read_file", "arguments": "{}" } }
211 ]
212 },
213 "finish_reason": null
214 }]
215 })
216 .to_string()
217 }
218
219 #[test]
222 fn done_sentinel_returns_done_variant() {
223 assert!(matches!(
224 parse_sse_frame("[DONE]"),
225 Ok(SseParseResult::Done)
226 ));
227 }
228
229 #[test]
230 fn text_delta_frame_produces_text_event() {
231 let events = match parse_sse_frame(&text_frame("hello")) {
232 Ok(SseParseResult::Events(e)) => e,
233 other => panic!("unexpected: {other:?}"),
234 };
235 assert_eq!(events.len(), 1);
236 assert!(matches!(
237 &events[0],
238 LlmStreamEvent::TextDelta { content } if content == "hello"
239 ));
240 }
241
242 #[test]
243 fn empty_content_produces_no_text_event() {
244 let frame = serde_json::json!({
245 "choices": [{ "delta": { "content": "" }, "finish_reason": null }]
246 })
247 .to_string();
248 let events = match parse_sse_frame(&frame) {
249 Ok(SseParseResult::Events(e)) => e,
250 other => panic!("unexpected: {other:?}"),
251 };
252 assert!(
253 events.is_empty(),
254 "empty content should not produce TextDelta"
255 );
256 }
257
258 #[test]
259 fn finish_reason_produces_done_event() {
260 let events = match parse_sse_frame(&finish_frame("stop")) {
261 Ok(SseParseResult::Events(e)) => e,
262 other => panic!("unexpected: {other:?}"),
263 };
264 assert_eq!(events.len(), 1);
265 assert!(matches!(
266 &events[0],
267 LlmStreamEvent::Done { finish_reason } if finish_reason == "stop"
268 ));
269 }
270
271 #[test]
272 fn tool_call_delta_frame_is_parsed() {
273 let events = match parse_sse_frame(&tool_frame(0, "tc1", "search", r#"{"q":"rust"}"#)) {
274 Ok(SseParseResult::Events(e)) => e,
275 other => panic!("unexpected: {other:?}"),
276 };
277 assert_eq!(events.len(), 1);
278 assert!(matches!(
279 &events[0],
280 LlmStreamEvent::ToolCallDelta {
281 index: 0,
282 id: Some(id),
283 name: Some(n),
284 arguments: Some(a),
285 } if id == "tc1" && n == "search" && a == r#"{"q":"rust"}"#
286 ));
287 }
288
289 #[test]
290 fn tool_call_delta_with_no_index_defaults_to_sequential_position() {
291 let events = match parse_sse_frame(&tool_frame_no_index("tc1", "search", r#"{"q":"rust"}"#))
292 {
293 Ok(SseParseResult::Events(e)) => e,
294 other => panic!("unexpected: {other:?}"),
295 };
296 assert_eq!(events.len(), 1);
297 assert!(matches!(
298 &events[0],
299 LlmStreamEvent::ToolCallDelta { index: 0, id: Some(id), .. } if id == "tc1"
300 ));
301 }
302
303 #[test]
304 fn two_tool_calls_with_no_index_get_distinct_sequential_slots() {
305 let events = match parse_sse_frame(&two_tool_frames_no_index()) {
306 Ok(SseParseResult::Events(e)) => e,
307 other => panic!("unexpected: {other:?}"),
308 };
309 assert_eq!(events.len(), 2, "both tool-call deltas must be emitted");
310 assert!(matches!(
311 &events[0],
312 LlmStreamEvent::ToolCallDelta { index: 0, id: Some(id), .. } if id == "c1"
313 ));
314 assert!(matches!(
315 &events[1],
316 LlmStreamEvent::ToolCallDelta { index: 1, id: Some(id), .. } if id == "c2"
317 ));
318 }
319
320 #[test]
321 fn malformed_json_returns_error() {
322 assert!(
323 parse_sse_frame("{ broken json }").is_err(),
324 "malformed JSON should return Err"
325 );
326 }
327
328 #[test]
329 fn frame_with_text_and_finish_reason_produces_both_events() {
330 let frame = serde_json::json!({
331 "choices": [{ "delta": { "content": "hi" }, "finish_reason": "stop" }]
332 })
333 .to_string();
334 let events = match parse_sse_frame(&frame) {
335 Ok(SseParseResult::Events(e)) => e,
336 other => panic!("unexpected: {other:?}"),
337 };
338 assert_eq!(events.len(), 2);
339 assert!(matches!(&events[0], LlmStreamEvent::TextDelta { .. }));
340 assert!(matches!(&events[1], LlmStreamEvent::Done { .. }));
341 }
342
343 #[test]
344 fn reasoning_content_produces_reasoning_delta_event() {
345 let frame = serde_json::json!({
346 "choices": [{ "delta": { "reasoning_content": "I should check..." }, "finish_reason": null }]
347 })
348 .to_string();
349 let events = match parse_sse_frame(&frame) {
350 Ok(SseParseResult::Events(e)) => e,
351 other => panic!("unexpected: {other:?}"),
352 };
353 assert_eq!(events.len(), 1);
354 assert!(matches!(
355 &events[0],
356 LlmStreamEvent::ReasoningDelta { content } if content == "I should check..."
357 ));
358 }
359
360 #[test]
361 fn empty_reasoning_content_produces_no_event() {
362 let frame = serde_json::json!({
363 "choices": [{ "delta": { "reasoning_content": "" }, "finish_reason": null }]
364 })
365 .to_string();
366 let events = match parse_sse_frame(&frame) {
367 Ok(SseParseResult::Events(e)) => e,
368 other => panic!("unexpected: {other:?}"),
369 };
370 assert!(
371 events.is_empty(),
372 "empty reasoning_content should not produce ReasoningDelta"
373 );
374 }
375
376 #[test]
377 fn frame_with_reasoning_and_text_reasoning_emitted_first() {
378 let frame = serde_json::json!({
379 "choices": [{ "delta": { "content": "ok", "reasoning_content": "think" }, "finish_reason": null }]
380 })
381 .to_string();
382 let events = match parse_sse_frame(&frame) {
383 Ok(SseParseResult::Events(e)) => e,
384 other => panic!("unexpected: {other:?}"),
385 };
386 assert_eq!(events.len(), 2);
387 assert!(
388 matches!(&events[0], LlmStreamEvent::ReasoningDelta { content } if content == "think"),
389 "ReasoningDelta must come first"
390 );
391 assert!(
392 matches!(&events[1], LlmStreamEvent::TextDelta { content } if content == "ok"),
393 "TextDelta must come second"
394 );
395 }
396
397 #[test]
398 fn prompt_progress_frame_produces_progress_event() {
399 let frame = serde_json::json!({
400 "prompt_progress": {
401 "processed": 2048,
402 "total": 8192,
403 "cache": 512,
404 "time_ms": 1234
405 }
406 })
407 .to_string();
408 let events = match parse_sse_frame(&frame) {
409 Ok(SseParseResult::Events(e)) => e,
410 other => panic!("unexpected: {other:?}"),
411 };
412 assert_eq!(events.len(), 1);
413 assert!(matches!(
414 &events[0],
415 LlmStreamEvent::PromptProgress {
416 processed: 2048,
417 total: 8192,
418 cached: 512,
419 time_ms: 1234
420 }
421 ));
422 }
423
424 #[test]
425 fn prompt_progress_frame_not_confused_with_choices() {
426 let frame = serde_json::json!({
427 "prompt_progress": {
428 "processed": 100,
429 "total": 100,
430 "cache": 0,
431 "time_ms": 50
432 }
433 })
434 .to_string();
435 let events = match parse_sse_frame(&frame) {
436 Ok(SseParseResult::Events(e)) => e,
437 other => panic!("unexpected: {other:?}"),
438 };
439 assert!(
440 !events.is_empty(),
441 "prompt_progress frame must not be skipped"
442 );
443 }
444}