gglib_core/domain/thinking/
accumulator.rs

1//! Stateful streaming accumulator for thinking-tag detection.
2//!
3//! An LLM may stream `<` in one SSE chunk, `thi` in the next, and `nk>` in a
4//! third.  [`ThinkingAccumulator`] handles this by buffering partial tags
5//! and only emitting classified [`ThinkingEvent`]s once enough bytes have
6//! arrived to decide.
7
8use std::fmt;
9
10use super::normalize::normalize_thinking_tags;
11use super::types::ThinkingEvent;
12
13/// Internal state of the accumulator FSM.
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15enum AccState {
16    /// Haven't seen the opening `<think…>` yet (may be buffering a partial tag).
17    AwaitingOpen,
18    /// Inside the thinking block, forwarding as `ThinkingDelta`.
19    InsideThinking,
20    /// Past the closing `</think>`, forwarding as `ContentDelta`.
21    ContentPhase,
22}
23
24/// Stateful accumulator for streaming thinking-tag detection.
25///
26/// Feed SSE text deltas one-by-one via [`push`](Self::push) and collect the
27/// returned [`ThinkingEvent`]s.  The accumulator handles tags that are split
28/// arbitrarily across chunks.
29///
30/// # Example
31///
32/// ```
33/// use gglib_core::domain::thinking::{ThinkingAccumulator, ThinkingEvent};
34///
35/// let mut acc = ThinkingAccumulator::new();
36///
37/// // Tag arrives split across three chunks
38/// let e1 = acc.push("<thi");
39/// assert!(e1.is_empty()); // buffered
40///
41/// let e2 = acc.push("nk>");
42/// assert_eq!(e2, vec![]); // open tag consumed, no content yet
43///
44/// let e3 = acc.push("hmm");
45/// assert_eq!(e3, vec![ThinkingEvent::ThinkingDelta("hmm".into())]);
46///
47/// let e4 = acc.push("</think>");
48/// assert_eq!(e4, vec![ThinkingEvent::ThinkingEnd]);
49///
50/// let e5 = acc.push("Hello!");
51/// assert_eq!(e5, vec![ThinkingEvent::ContentDelta("Hello!".into())]);
52/// ```
53pub struct ThinkingAccumulator {
54    state: AccState,
55    /// Buffer for bytes that *might* be part of an opening or closing tag.
56    buf: String,
57}
58
59impl fmt::Debug for ThinkingAccumulator {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        f.debug_struct("ThinkingAccumulator")
62            .field("state", &self.state)
63            .field("buf_len", &self.buf.len())
64            .finish()
65    }
66}
67
68/// All possible opening-tag prefixes after normalisation (lowercase).
69/// Used to determine whether the buffer *could* still become a valid tag.
70const OPEN_TAG_PREFIX: &str = "<think";
71
72/// The canonical closing tag (lowercase for matching).
73const CLOSE_TAG: &str = "</think>";
74
75impl ThinkingAccumulator {
76    /// Create a new accumulator in the initial state.
77    #[must_use]
78    pub const fn new() -> Self {
79        Self {
80            state: AccState::AwaitingOpen,
81            buf: String::new(),
82        }
83    }
84
85    /// Feed a new text chunk and return any events produced.
86    pub fn push(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
87        if chunk.is_empty() {
88            return vec![];
89        }
90        match self.state {
91            AccState::AwaitingOpen => self.handle_awaiting_open(chunk),
92            AccState::InsideThinking => self.handle_inside_thinking(chunk),
93            AccState::ContentPhase => Self::handle_content(chunk),
94        }
95    }
96
97    /// Flush any remaining buffered content.
98    ///
99    /// Call this when the stream ends to emit any buffered text that turned
100    /// out not to be a tag.
101    pub fn flush(&mut self) -> Vec<ThinkingEvent> {
102        if self.buf.is_empty() {
103            return vec![];
104        }
105
106        let text = std::mem::take(&mut self.buf);
107        match self.state {
108            AccState::AwaitingOpen => {
109                // Never completed an open tag — treat as plain content.
110                self.state = AccState::ContentPhase;
111                vec![ThinkingEvent::ContentDelta(text)]
112            }
113            AccState::InsideThinking => {
114                // Stream ended mid-thinking — flush as thinking.
115                vec![ThinkingEvent::ThinkingDelta(text)]
116            }
117            AccState::ContentPhase => {
118                vec![ThinkingEvent::ContentDelta(text)]
119            }
120        }
121    }
122
123    // -- Private state handlers -----------------------------------------------
124
125    fn handle_awaiting_open(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
126        // Normalise variant tags in the incoming chunk.
127        let normalized = normalize_thinking_tags(chunk);
128        self.buf.push_str(&normalized);
129
130        let lower = self.buf.to_lowercase();
131
132        // 1. Check if buffer contains a complete open tag (with closing '>').
133        if let Some(gt_pos) = Self::find_open_tag_end(&lower) {
134            // We have a complete open tag.  Anything before `<think` is content.
135            let lt_pos = lower.find(OPEN_TAG_PREFIX).unwrap_or(0);
136            let mut events = vec![];
137
138            if lt_pos > 0 {
139                let before = self.buf[..lt_pos].to_string();
140                if !before.is_empty() {
141                    // Text before thinking tag goes to content.
142                    self.state = AccState::ContentPhase;
143                    events.push(ThinkingEvent::ContentDelta(before));
144                    // Actually, if there's content before <think>, the thinking
145                    // tag is not at the start — treat entire thing as content.
146                    let rest = self.buf[lt_pos..].to_string();
147                    events.push(ThinkingEvent::ContentDelta(rest));
148                    self.buf.clear();
149                    return events;
150                }
151            }
152
153            // Consume the open tag and move to InsideThinking.
154            let after = self.buf[gt_pos + 1..].to_string();
155            self.buf.clear();
156            self.state = AccState::InsideThinking;
157
158            // Any text after the '>' is the first thinking chunk.
159            if !after.is_empty() {
160                events.extend(self.handle_inside_thinking(&after));
161            }
162
163            return events;
164        }
165
166        // 2. Check if buffer could still be a prefix of `<think...>`.
167        if is_potential_open_tag_prefix(&lower) {
168            // Keep buffering.
169            return vec![];
170        }
171
172        // 3. Not a thinking tag — flush buffer as content.
173        let text = std::mem::take(&mut self.buf);
174        self.state = AccState::ContentPhase;
175        vec![ThinkingEvent::ContentDelta(text)]
176    }
177
178    fn handle_inside_thinking(&mut self, chunk: &str) -> Vec<ThinkingEvent> {
179        // Normalise variant close tags (e.g. </reasoning> → </think>).
180        let normalized = normalize_thinking_tags(chunk);
181        self.buf.push_str(&normalized);
182
183        let mut events = vec![];
184
185        let lower = self.buf.to_lowercase();
186
187        // Look for </think> in the buffer.
188        if let Some(pos) = lower.find(CLOSE_TAG) {
189            // Everything before the close tag is thinking content.
190            let thinking = self.buf[..pos].to_string();
191            let after = self.buf[pos + CLOSE_TAG.len()..].to_string();
192            self.buf.clear();
193
194            if !thinking.is_empty() {
195                events.push(ThinkingEvent::ThinkingDelta(thinking));
196            }
197            events.push(ThinkingEvent::ThinkingEnd);
198
199            self.state = AccState::ContentPhase;
200            if !after.is_empty() {
201                events.push(ThinkingEvent::ContentDelta(after));
202            }
203            return events;
204        }
205
206        // Check if buffer ends with a potential partial `</think>` tag.
207        // e.g. buffer ends with `</thi` — we can't emit that yet.
208        let safe_emit_len = safe_thinking_emit_len(&self.buf);
209
210        if safe_emit_len > 0 {
211            let to_emit = self.buf[..safe_emit_len].to_string();
212            let remainder = self.buf[safe_emit_len..].to_string();
213            self.buf = remainder;
214            events.push(ThinkingEvent::ThinkingDelta(to_emit));
215        }
216
217        events
218    }
219
220    fn handle_content(chunk: &str) -> Vec<ThinkingEvent> {
221        if chunk.is_empty() {
222            return vec![];
223        }
224        vec![ThinkingEvent::ContentDelta(chunk.to_string())]
225    }
226
227    /// Find the position of `>` that closes an opening `<think...>` tag.
228    ///
229    /// Returns the byte index of `>` in `lower`.
230    fn find_open_tag_end(lower: &str) -> Option<usize> {
231        if !lower.starts_with(OPEN_TAG_PREFIX) {
232            return None;
233        }
234        // Find first '>' after "<think"
235        lower[OPEN_TAG_PREFIX.len()..]
236            .find('>')
237            .map(|p| OPEN_TAG_PREFIX.len() + p)
238    }
239}
240
241impl Default for ThinkingAccumulator {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247// ---------------------------------------------------------------------------
248// Private helpers
249// ---------------------------------------------------------------------------
250
251/// Check whether `s` is a valid prefix of any opening tag we recognise.
252///
253/// This includes the full `<think...>` pattern as well as just `<`, `<t`,
254/// `<th`, etc.  Used to decide whether to keep buffering.
255fn is_potential_open_tag_prefix(s: &str) -> bool {
256    let s = s.trim_start();
257    if s.is_empty() {
258        return false;
259    }
260
261    // Check if s is a prefix of "<think" (possibly followed by attrs + ">")
262    let tag = OPEN_TAG_PREFIX;
263    if s.len() <= tag.len() {
264        return tag.starts_with(s);
265    }
266
267    // s is longer than "<think" — it must start with "<think" and be
268    // waiting for the closing '>'.
269    if !s.starts_with(tag) {
270        return false;
271    }
272
273    // After "<think" we allow optional attributes until '>'.
274    !s[tag.len()..].contains('>')
275}
276
277/// Return how many bytes from the start of `buf` are safe to emit as
278/// thinking content — i.e. bytes that cannot be part of a `</think>` tag.
279fn safe_thinking_emit_len(buf: &str) -> usize {
280    let lower = buf.to_lowercase();
281    // Find the earliest position where a potential closing tag could start.
282    // A closing tag starts with '<'.  We need to check if '<' at position i
283    // could be the start of "</think>".
284    for (i, _) in lower.char_indices().rev() {
285        if lower[i..].starts_with('<') {
286            let tail = &lower[i..];
287            // Check if tail is a prefix of "</think>"
288            if CLOSE_TAG.starts_with(tail) {
289                // This '<' might be the start of a closing tag.
290                return i;
291            }
292        }
293    }
294    // No potential closing tag fragment found — entire buffer is safe.
295    buf.len()
296}