@@ -54,25 +54,26 @@ type LastSync struct {
5454
5555// BufReader implements buffering for an FileReader object.
5656type BufReader struct {
57- stopped int32
58- buf []byte
59- delim []byte
60- mutiLineCache * LineCache
61- waitForWholeLine bool //readWholeLine
62- rd reader.FileReader // reader provided by the client
63- r , w int // buf read and write positions
64- err error
65- lastByte int
66- lastRuneSize int
67- lastSync LastSync
57+ stopped int32
58+ buf []byte
59+ delim []byte
60+ mutiLineCache * LineCache
61+ rd reader.FileReader // reader provided by the client
62+ r , w int // buf read and write positions
63+ err error
64+ lastByte int
65+ lastRuneSize int
66+ lastSync LastSync
6867
6968 runTime reader.RunTime
7069
7170 mux sync.Mutex
7271 decoder mahonia.Decoder
7372
74- Meta * reader.Meta // 存放offset的元信息
75- multiLineRegexp * regexp.Regexp
73+ Meta * reader.Meta // 存放offset的元信息
74+ multiLineRegexp * regexp.Regexp
75+ lastLineSource string // 存放多行上一次source信息
76+ lastSecondLineSource string // 存放多行上上次source信息
7677
7778 stats StatsInfo
7879 statsLock sync.RWMutex
@@ -89,7 +90,7 @@ type BufReader struct {
8990const minReadBufferSize = 16
9091
9192//最大连续读到空的尝试次数
92- const maxConsecutiveEmptyReads = 40
93+ const maxConsecutiveEmptyReads = 10
9394
9495// NewReaderSize returns a new Reader whose buffer has at least the specified
9596// size. If the argument FileReader is already a Reader with large enough
@@ -190,11 +191,6 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) {
190191 return
191192}
192193
193- func (b * BufReader ) SetWaitFlagForWholeLine () {
194- b .waitForWholeLine = true
195- return
196- }
197-
198194func (b * BufReader ) SetRunTime (mode string , v interface {}) (err error ) {
199195 b .runTime , err = reader .ParseRunTimeWithMode (mode , v )
200196 return err
@@ -249,7 +245,6 @@ func (b *BufReader) fill() {
249245 b .updateLastRdSource ()
250246 b .r = 0
251247 }
252-
253248 if b .w >= len (b .buf ) {
254249 panic (fmt .Sprintf ("Runner[%v] bufio: tried to fill full buffer" , b .Meta .RunnerName ))
255250 }
@@ -264,10 +259,14 @@ func (b *BufReader) fill() {
264259 if n < 0 {
265260 panic (errNegativeRead )
266261 }
267- if b . latestSource != b . rd . Source () {
268- //这个情况表示文件的数据源出现了变化,在buf中已经出现了至少2个数据源的数据,要定位是哪个位置的数据出现的分隔
262+
263+ if n > 0 {
269264 if rc , ok := b .rd .(reader.NewSourceRecorder ); ok {
270265 SIdx := rc .NewSourceIndex ()
266+ if len (b .lastRdSource ) > 0 && SIdx [0 ].Source == b .lastRdSource [len (b .lastRdSource )- 1 ].Source {
267+ b .lastRdSource [len (b .lastRdSource )- 1 ].Index = SIdx [0 ].Index + b .w
268+ SIdx = SIdx [1 :]
269+ }
271270 for _ , v := range SIdx {
272271 // 从 NewSourceIndex 函数中返回的index值就是本次读取的批次中上一个DataSource的数据量,加上b.w就是上个DataSource的整体数据
273272 b .lastRdSource = append (b .lastRdSource , reader.SourceIndex {
@@ -276,37 +275,24 @@ func (b *BufReader) fill() {
276275 })
277276 }
278277 } else {
279- //如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件
280- b .lastRdSource = append (b .lastRdSource , reader.SourceIndex {
281- Source : b .latestSource ,
282- Index : b .w ,
283- })
278+ if b .latestSource != b .rd .Source () {
279+ //如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件
280+ b .lastRdSource = append (b .lastRdSource , reader.SourceIndex {
281+ Source : b .latestSource ,
282+ Index : b .w ,
283+ })
284+ }
284285 }
285286 b .latestSource = b .rd .Source ()
286- }
287-
288- b .w += n
289287
290- if err == io .EOF && b .waitForWholeLine {
291- if i == 1 { //when last attempts,return err info;
292- b .err = err
293- return
294- }
295-
296- time .Sleep (1 * time .Second )
297- continue
288+ b .w += n
298289 }
299290
300- if err != nil {
301- b .err = err
302- return
303- }
304- if n > 0 {
291+ b .err = err
292+ if err != nil && err != io .EOF {
305293 return
306294 }
307-
308295 }
309- b .err = io .ErrNoProgress
310296}
311297
312298func (b * BufReader ) readErr () error {
@@ -455,6 +441,7 @@ func (b *BufReader) ReadPattern() (string, error) {
455441 //读取到line的情况
456442 if len (line ) > 0 {
457443 if b .mutiLineCache .Size () <= 0 {
444+ b .lastSecondLineSource = b .lineSource ()
458445 b .mutiLineCache .Set ([]string {line })
459446 continue
460447 }
@@ -464,6 +451,10 @@ func (b *BufReader) ReadPattern() (string, error) {
464451 line = string (b .mutiLineCache .Combine ())
465452 b .mutiLineCache .Set (make ([]string , 0 , 16 ))
466453 b .mutiLineCache .Append (tmp )
454+ if b .lastLineSource != "" {
455+ b .lastSecondLineSource = b .lastLineSource
456+ }
457+ b .lastLineSource = b .lineSource ()
467458 return line , err
468459 }
469460 b .mutiLineCache .Append (line )
@@ -472,6 +463,9 @@ func (b *BufReader) ReadPattern() (string, error) {
472463 if err != nil {
473464 line = string (b .mutiLineCache .Combine ())
474465 b .mutiLineCache .Set (make ([]string , 0 , 16 ))
466+ if b .lastLineSource != "" {
467+ b .lastSecondLineSource = b .lastLineSource
468+ }
475469 return line , err
476470 }
477471 maxTimes ++
@@ -550,6 +544,13 @@ func (b *BufReader) Name() string {
550544}
551545
552546func (b * BufReader ) Source () string {
547+ if b .multiLineRegexp != nil && b .lastSecondLineSource != "" {
548+ return b .lastSecondLineSource
549+ }
550+ return b .lineSource ()
551+ }
552+
553+ func (b * BufReader ) lineSource () string {
553554 //如果我当前读取的位置在上个数据源index之前,则返回上个数据源
554555 for _ , v := range b .lastRdSource {
555556 if (b .r < v .Index ) || (v .Index > 0 && b .r == v .Index ) {
@@ -586,9 +587,11 @@ func (b *BufReader) Lag() (rl *LagInfo, err error) {
586587}
587588
588589func (b * BufReader ) ReadDone () bool {
590+ b .mux .Lock ()
591+ defer b .mux .Unlock ()
589592 lr , ok := b .rd .(reader.OnceReader )
590593 if ok {
591- return lr .ReadDone ()
594+ return lr .ReadDone () && b . r == 0 && b . w == 0
592595 }
593596 return false
594597}
@@ -684,9 +687,7 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re
684687 return
685688 }
686689 maxLineLen , _ := conf .GetInt64Or (KeyRunnerMaxLineLen , 0 )
687- r , err := NewReaderSize (fr , meta , bufSize , maxLineLen )
688- r .SetWaitFlagForWholeLine ()
689- return r , err
690+ return NewReaderSize (fr , meta , bufSize , maxLineLen )
690691}
691692
692693func init () {
@@ -704,3 +705,7 @@ func getDelimByEncodingWay(encodingWay string) []byte {
704705 return []byte ("\n " )
705706 }
706707}
708+
709+ func (b * BufReader ) GetDelimiter () []byte {
710+ return b .delim
711+ }
0 commit comments