Skip to content

Commit cf86a55

Browse files
committed
protocol: Fix multi-row decode
1 parent 7b31e13 commit cf86a55

File tree

2 files changed

+63
-38
lines changed

2 files changed

+63
-38
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All changes in this project will be noted in this file.
44

5+
## 0.8.10
6+
7+
### Fixes
8+
9+
- Fixed decode of multi-row responses
10+
511
## 0.8.9
612

713
### Additions

src/protocol/mod.rs

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -218,21 +218,17 @@ impl<T, U: ProtocolObjectState<Value = T>> ProtocolObjectDecodeState<T, U> {
218218
decoder: &mut Decoder,
219219
) -> ProtocolResult<ProtocolObjectDecodeState<T, U>> {
220220
match self {
221-
Self::Completed(c) => Ok(ProtocolObjectDecodeState::Completed(c)),
221+
Self::Completed(c) => Ok(Self::Completed(c)),
222222
Self::Pending(pv) => match pv.complete(decoder)? {
223-
ProtocolObjectDecodeState::Completed(c) => {
224-
Ok(ProtocolObjectDecodeState::Completed(c.into_value()))
225-
}
226-
ProtocolObjectDecodeState::Pending(pv) => {
227-
Ok(ProtocolObjectDecodeState::Pending(pv))
228-
}
223+
ProtocolObjectDecodeState::Completed(c) => Ok(Self::Completed(c.into_value())),
224+
ProtocolObjectDecodeState::Pending(pv) => Ok(Self::Pending(pv)),
229225
},
230226
}
231227
}
232228
}
233229

234230
#[cfg(test)]
235-
impl<T: ProtocolObjectState> ProtocolObjectDecodeState<T> {
231+
impl<T: ProtocolObjectState + core::fmt::Debug> ProtocolObjectDecodeState<T> {
236232
fn into_completed(self) -> Option<T> {
237233
match self {
238234
Self::Completed(c) => Some(c),
@@ -279,7 +275,7 @@ impl<T: LfsObject> ProtocolObjectState for LfsValue<T> {
279275
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
280276
let mut stop = decoder.cursor_eq(b'\n');
281277
let mut error = false;
282-
while !decoder.eof() && !error && !stop {
278+
while (!decoder.eof()) && (!error) && (!stop) {
283279
let byte = decoder.next();
284280
error = !self.v.update(&mut self.state, byte);
285281
stop = decoder.cursor_eq(b'\n');
@@ -587,32 +583,17 @@ impl AsValueStream for Row {
587583

588584
#[derive(Debug, PartialEq)]
589585
pub(crate) struct ValueStream {
590-
size: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
586+
element_count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
591587
items: Vec<Value>,
592588
pending: Option<Box<PendingValue>>,
593589
}
594590

595-
impl ProtocolObjectState for ValueStream {
596-
type Value = Vec<Value>;
597-
fn initialize(decoder: &Decoder) -> Self {
598-
Self {
599-
size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
600-
items: vec![],
601-
pending: None,
602-
}
603-
}
604-
fn complete(
591+
impl ValueStream {
592+
fn _complete(
605593
mut self,
606594
decoder: &mut Decoder,
595+
size: usize,
607596
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
608-
let size = match self.size.try_complete(decoder)? {
609-
ProtocolObjectDecodeState::Completed(c) => c,
610-
ProtocolObjectDecodeState::Pending(pv) => {
611-
self.size = ProtocolObjectDecodeState::Pending(pv);
612-
return Ok(ProtocolObjectDecodeState::Pending(self));
613-
}
614-
};
615-
self.size = ProtocolObjectDecodeState::Completed(size);
616597
while self.items.len() != size {
617598
if decoder.eof() {
618599
return Ok(ProtocolObjectDecodeState::Pending(self));
@@ -633,6 +614,31 @@ impl ProtocolObjectState for ValueStream {
633614
}
634615
Ok(ProtocolObjectDecodeState::Completed(self))
635616
}
617+
}
618+
619+
impl ProtocolObjectState for ValueStream {
620+
type Value = Vec<Value>;
621+
fn initialize(decoder: &Decoder) -> Self {
622+
Self {
623+
element_count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
624+
items: vec![],
625+
pending: None,
626+
}
627+
}
628+
fn complete(
629+
mut self,
630+
decoder: &mut Decoder,
631+
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
632+
let size = match self.element_count.try_complete(decoder)? {
633+
ProtocolObjectDecodeState::Completed(c) => c,
634+
ProtocolObjectDecodeState::Pending(pv) => {
635+
self.element_count = ProtocolObjectDecodeState::Pending(pv);
636+
return Ok(ProtocolObjectDecodeState::Pending(self));
637+
}
638+
};
639+
self.element_count = ProtocolObjectDecodeState::Completed(size);
640+
self._complete(decoder, size)
641+
}
636642
fn into_value(self) -> Self::Value {
637643
self.items
638644
}
@@ -644,7 +650,8 @@ impl ProtocolObjectState for ValueStream {
644650

645651
#[derive(Debug, PartialEq)]
646652
pub(crate) struct MultiValueStream {
647-
count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
653+
stream_count: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
654+
stream_size: ProtocolObjectDecodeState<usize, LfsValue<usize>>,
648655
items: Vec<Vec<Value>>,
649656
pending: Option<ValueStream>,
650657
}
@@ -653,7 +660,8 @@ impl ProtocolObjectState for MultiValueStream {
653660
type Value = Vec<Vec<Value>>;
654661
fn initialize(decoder: &Decoder) -> Self {
655662
Self {
656-
count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
663+
stream_count: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
664+
stream_size: ProtocolObjectDecodeState::Pending(LfsValue::initialize(decoder)),
657665
items: vec![],
658666
pending: None,
659667
}
@@ -662,18 +670,29 @@ impl ProtocolObjectState for MultiValueStream {
662670
mut self,
663671
decoder: &mut Decoder,
664672
) -> ProtocolResult<ProtocolObjectDecodeState<Self>> {
665-
let size = match self.count.try_complete(decoder)? {
673+
// get number of streams
674+
let stream_count = match self.stream_count.try_complete(decoder)? {
666675
ProtocolObjectDecodeState::Completed(sz) => sz,
667676
ProtocolObjectDecodeState::Pending(pv) => {
668-
self.count = ProtocolObjectDecodeState::Pending(pv);
677+
self.stream_count = ProtocolObjectDecodeState::Pending(pv);
669678
return Ok(ProtocolObjectDecodeState::Pending(self));
670679
}
671680
};
672-
self.count = ProtocolObjectDecodeState::Completed(size);
673-
while self.items.len() != size {
681+
self.stream_count = ProtocolObjectDecodeState::Completed(stream_count);
682+
// get per stream size
683+
let stream_size = match self.stream_size.try_complete(decoder)? {
684+
ProtocolObjectDecodeState::Completed(sz) => sz,
685+
ProtocolObjectDecodeState::Pending(pv) => {
686+
self.stream_size = ProtocolObjectDecodeState::Pending(pv);
687+
return Ok(ProtocolObjectDecodeState::Pending(self));
688+
}
689+
};
690+
self.stream_size = ProtocolObjectDecodeState::Completed(stream_size);
691+
// load items
692+
while self.items.len() != stream_count {
674693
match match self.pending.take() {
675-
Some(pending_vs) => pending_vs.complete(decoder),
676-
None => ValueStream::initialize(decoder).complete(decoder),
694+
Some(pending_vs) => pending_vs._complete(decoder, stream_size),
695+
None => ValueStream::initialize(decoder)._complete(decoder, stream_size),
677696
}? {
678697
ProtocolObjectDecodeState::Completed(vs) => {
679698
self.items.push(vs.items);
@@ -850,8 +869,8 @@ fn decode_value_stream() {
850869
#[test]
851870
fn decode_multi_value_stream() {
852871
let packet = [
853-
b"5\n".to_vec(),
854-
"8\n\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij".repeat(5).into_bytes()
872+
b"5\n8\n".to_vec(),
873+
"\x00\x01\x01\x0518446744073709551615\n\x09-9223372036854775808\n\x0A-3.141592654\n\x0C5\nabcde\x0D5\nfghij\x0E2\n\x0C5\nabcde\x0D5\nfghij".repeat(5).into_bytes()
855874
].concat();
856875
for i in 1..packet.len() {
857876
let mut decoder = Decoder::new(&packet[..i], 0);

0 commit comments

Comments
 (0)