gglib_core/domain/thinking/accumulator.rs
1//! Stateful streaming accumulator for thinking-tag detection.
2//!
3//! An LLM may stream `<` in one SSE chunk, `thi` in the next, and `nk>` in a
4//! third. [`ThinkingAccumulator`] handles this by buffering partial tags
5//! and only emitting classified [`ThinkingEvent`]s once enough bytes have
6//! arrived to decide.
7
8use std::fmt;
9
10use super::normalize::normalize_thinking_tags;
11use super::types::ThinkingEvent;
12
/// Internal state of the accumulator FSM.
///
/// The state only ever moves forward: `AwaitingOpen` → `InsideThinking` →
/// `ContentPhase`, or straight from `AwaitingOpen` to `ContentPhase` when the
/// stream does not start with a thinking tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AccState {
    /// Haven't seen the opening `<think…>` yet (may be buffering a partial tag).
    AwaitingOpen,
    /// Inside the thinking block, forwarding text as `ThinkingDelta`.
    InsideThinking,
    /// Past the closing `</think>` (or the stream had no thinking block at
    /// all), forwarding text as `ContentDelta`.
    ContentPhase,
}
23
/// Stateful accumulator for streaming thinking-tag detection.
///
/// Feed SSE text deltas one-by-one via [`push`](Self::push) and collect the
/// returned [`ThinkingEvent`]s. The accumulator handles tags that are split
/// arbitrarily across chunks by holding back text that could still turn out
/// to be part of a tag. When the stream ends, call [`flush`](Self::flush) to
/// release whatever is still being held back.
///
/// An opening tag is only recognised at the very start of the stream; once
/// any other text has been seen, everything is passed through as content.
///
/// # Example
///
/// ```
/// use gglib_core::domain::thinking::{ThinkingAccumulator, ThinkingEvent};
///
/// let mut acc = ThinkingAccumulator::new();
///
/// // Tag arrives split across three chunks
/// let e1 = acc.push("<thi");
/// assert!(e1.is_empty()); // buffered
///
/// let e2 = acc.push("nk>");
/// assert_eq!(e2, vec![]); // open tag consumed, no content yet
///
/// let e3 = acc.push("hmm");
/// assert_eq!(e3, vec![ThinkingEvent::ThinkingDelta("hmm".into())]);
///
/// let e4 = acc.push("</think>");
/// assert_eq!(e4, vec![ThinkingEvent::ThinkingEnd]);
///
/// let e5 = acc.push("Hello!");
/// assert_eq!(e5, vec![ThinkingEvent::ContentDelta("Hello!".into())]);
///
/// // Nothing is left buffered once plain content is flowing.
/// assert!(acc.flush().is_empty());
/// ```
pub struct ThinkingAccumulator {
    /// Current phase of the stream (before, inside, or after the thinking block).
    state: AccState,
    /// Buffer for bytes that *might* be part of an opening or closing tag.
    buf: String,
}
58
59impl fmt::Debug for ThinkingAccumulator {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 f.debug_struct("ThinkingAccumulator")
62 .field("state", &self.state)
63 .field("buf_len", &self.buf.len())
64 .finish()
65 }
66}
67
/// Lowercase prefix shared by every opening tag the accumulator recognises.
///
/// Used both to detect a complete opening tag (this prefix followed, at some
/// point, by `>`) and to decide whether a partial buffer *could* still become
/// a valid tag.
const OPEN_TAG_PREFIX: &str = "<think";

/// The canonical closing tag (lowercase; buffered text is lowercased before
/// it is compared against this).
const CLOSE_TAG: &str = "</think>";
74
75impl ThinkingAccumulator {
76 /// Create a new accumulator in the initial state.
77 #[must_use]
78 pub const fn new() -> Self {
79 Self {
80 state: AccState::AwaitingOpen,
81 buf: String::new(),
82 }
83 }
84
85 /// Feed a new text chunk and return any events produced.
86 pub fn push(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
87 if chunk.is_empty() {
88 return vec![];
89 }
90 match self.state {
91 AccState::AwaitingOpen => self.handle_awaiting_open(chunk),
92 AccState::InsideThinking => self.handle_inside_thinking(chunk),
93 AccState::ContentPhase => Self::handle_content(chunk),
94 }
95 }
96
97 /// Flush any remaining buffered content.
98 ///
99 /// Call this when the stream ends to emit any buffered text that turned
100 /// out not to be a tag.
101 pub fn flush(&mut self) -> Vec<ThinkingEvent> {
102 if self.buf.is_empty() {
103 return vec![];
104 }
105
106 let text = std::mem::take(&mut self.buf);
107 match self.state {
108 AccState::AwaitingOpen => {
109 // Never completed an open tag — treat as plain content.
110 self.state = AccState::ContentPhase;
111 vec![ThinkingEvent::ContentDelta(text)]
112 }
113 AccState::InsideThinking => {
114 // Stream ended mid-thinking — flush as thinking.
115 vec![ThinkingEvent::ThinkingDelta(text)]
116 }
117 AccState::ContentPhase => {
118 vec![ThinkingEvent::ContentDelta(text)]
119 }
120 }
121 }
122
123 // -- Private state handlers -----------------------------------------------
124
125 fn handle_awaiting_open(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
126 // Normalise variant tags in the incoming chunk.
127 let normalized = normalize_thinking_tags(chunk);
128 self.buf.push_str(&normalized);
129
130 let lower = self.buf.to_lowercase();
131
132 // 1. Check if buffer contains a complete open tag (with closing '>').
133 if let Some(gt_pos) = Self::find_open_tag_end(&lower) {
134 // We have a complete open tag. Anything before `<think` is content.
135 let lt_pos = lower.find(OPEN_TAG_PREFIX).unwrap_or(0);
136 let mut events = vec![];
137
138 if lt_pos > 0 {
139 let before = self.buf[..lt_pos].to_string();
140 if !before.is_empty() {
141 // Text before thinking tag goes to content.
142 self.state = AccState::ContentPhase;
143 events.push(ThinkingEvent::ContentDelta(before));
144 // Actually, if there's content before <think>, the thinking
145 // tag is not at the start — treat entire thing as content.
146 let rest = self.buf[lt_pos..].to_string();
147 events.push(ThinkingEvent::ContentDelta(rest));
148 self.buf.clear();
149 return events;
150 }
151 }
152
153 // Consume the open tag and move to InsideThinking.
154 let after = self.buf[gt_pos + 1..].to_string();
155 self.buf.clear();
156 self.state = AccState::InsideThinking;
157
158 // Any text after the '>' is the first thinking chunk.
159 if !after.is_empty() {
160 events.extend(self.handle_inside_thinking(&after));
161 }
162
163 return events;
164 }
165
166 // 2. Check if buffer could still be a prefix of `<think...>`.
167 if is_potential_open_tag_prefix(&lower) {
168 // Keep buffering.
169 return vec![];
170 }
171
172 // 3. Not a thinking tag — flush buffer as content.
173 let text = std::mem::take(&mut self.buf);
174 self.state = AccState::ContentPhase;
175 vec![ThinkingEvent::ContentDelta(text)]
176 }
177
178 fn handle_inside_thinking(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
179 // Normalise variant close tags (e.g. </reasoning> → </think>).
180 let normalized = normalize_thinking_tags(chunk);
181 self.buf.push_str(&normalized);
182
183 let mut events = vec![];
184
185 let lower = self.buf.to_lowercase();
186
187 // Look for </think> in the buffer.
188 if let Some(pos) = lower.find(CLOSE_TAG) {
189 // Everything before the close tag is thinking content.
190 let thinking = self.buf[..pos].to_string();
191 let after = self.buf[pos + CLOSE_TAG.len()..].to_string();
192 self.buf.clear();
193
194 if !thinking.is_empty() {
195 events.push(ThinkingEvent::ThinkingDelta(thinking));
196 }
197 events.push(ThinkingEvent::ThinkingEnd);
198
199 self.state = AccState::ContentPhase;
200 if !after.is_empty() {
201 events.push(ThinkingEvent::ContentDelta(after));
202 }
203 return events;
204 }
205
206 // Check if buffer ends with a potential partial `</think>` tag.
207 // e.g. buffer ends with `</thi` — we can't emit that yet.
208 let safe_emit_len = safe_thinking_emit_len(&self.buf);
209
210 if safe_emit_len > 0 {
211 let to_emit = self.buf[..safe_emit_len].to_string();
212 let remainder = self.buf[safe_emit_len..].to_string();
213 self.buf = remainder;
214 events.push(ThinkingEvent::ThinkingDelta(to_emit));
215 }
216
217 events
218 }
219
220 fn handle_content(chunk: &str) -> Vec<ThinkingEvent> {
221 if chunk.is_empty() {
222 return vec![];
223 }
224 vec![ThinkingEvent::ContentDelta(chunk.to_string())]
225 }
226
227 /// Find the position of `>` that closes an opening `<think...>` tag.
228 ///
229 /// Returns the byte index of `>` in `lower`.
230 fn find_open_tag_end(lower: &str) -> Option<usize> {
231 if !lower.starts_with(OPEN_TAG_PREFIX) {
232 return None;
233 }
234 // Find first '>' after "<think"
235 lower[OPEN_TAG_PREFIX.len()..]
236 .find('>')
237 .map(|p| OPEN_TAG_PREFIX.len() + p)
238 }
239}
240
/// The default accumulator is the same as the one built by
/// [`ThinkingAccumulator::new`].
impl Default for ThinkingAccumulator {
    /// Delegates to [`ThinkingAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
246
247// ---------------------------------------------------------------------------
248// Private helpers
249// ---------------------------------------------------------------------------
250
251/// Check whether `s` is a valid prefix of any opening tag we recognise.
252///
253/// This includes the full `<think...>` pattern as well as just `<`, `<t`,
254/// `<th`, etc. Used to decide whether to keep buffering.
255fn is_potential_open_tag_prefix(s: &str) -> bool {
256 let s = s.trim_start();
257 if s.is_empty() {
258 return false;
259 }
260
261 // Check if s is a prefix of "<think" (possibly followed by attrs + ">")
262 let tag = OPEN_TAG_PREFIX;
263 if s.len() <= tag.len() {
264 return tag.starts_with(s);
265 }
266
267 // s is longer than "<think" — it must start with "<think" and be
268 // waiting for the closing '>'.
269 if !s.starts_with(tag) {
270 return false;
271 }
272
273 // After "<think" we allow optional attributes until '>'.
274 !s[tag.len()..].contains('>')
275}
276
277/// Return how many bytes from the start of `buf` are safe to emit as
278/// thinking content — i.e. bytes that cannot be part of a `</think>` tag.
279fn safe_thinking_emit_len(buf: &str) -> usize {
280 let lower = buf.to_lowercase();
281 // Find the earliest position where a potential closing tag could start.
282 // A closing tag starts with '<'. We need to check if '<' at position i
283 // could be the start of "</think>".
284 for (i, _) in lower.char_indices().rev() {
285 if lower[i..].starts_with('<') {
286 let tail = &lower[i..];
287 // Check if tail is a prefix of "</think>"
288 if CLOSE_TAG.starts_with(tail) {
289 // This '<' might be the start of a closing tag.
290 return i;
291 }
292 }
293 }
294 // No potential closing tag fragment found — entire buffer is safe.
295 buf.len()
296}