Skip to content

Commit fd405d9

Browse files
committed
[PDR-13483][fix(dir)]修复dir模式下数据有漏采多采的问题
1 parent 93c2c43 commit fd405d9

File tree

2 files changed

+56
-30
lines changed

2 files changed

+56
-30
lines changed

reader/bufreader/bufreader.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,7 @@ func (b *BufReader) readSlice(delim []byte) (line []byte, err error) {
315315
defer b.mux.Unlock()
316316
for {
317317
if atomic.LoadInt32(&b.stopped) > 0 {
318-
if !IsSelfRunner(b.Meta.RunnerName) {
319-
log.Warn("BufReader was stopped while reading...")
320-
} else {
321-
log.Debug("BufReader was stopped while reading...")
322-
}
318+
log.Debug("BufReader was stopped while reading...")
323319
return
324320
}
325321
// Search buffer.

reader/seqfile/seqfile.go

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"io"
7+
"io/ioutil"
78
"os"
89
"path/filepath"
910
"strconv"
@@ -149,13 +150,45 @@ func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine boo
149150
sf.f = nil
150151
sf.offset = 0
151152
}
152-
sf.inodeOffset = meta.GetDoneFileInode(sf.inodeSensitive)
153153
sf.dir = dir
154+
sf.inodeOffset = meta.GetDoneFileInode(sf.inodeSensitive)
155+
sf.updateInodeOffset(whence)
154156
sf.currFile = currFile
155157
sf.name = "SeqFile:" + dir
156158
return sf, nil
157159
}
158160

161+
func (sf *SeqFile) updateInodeOffset(whence string) {
162+
if whence != config.WhenceNewest {
163+
return
164+
}
165+
if len(sf.inodeOffset) != 0 {
166+
return
167+
}
168+
files, err := ioutil.ReadDir(sf.dir)
169+
if err != nil {
170+
log.Errorf("Runner[%v] %v read dir error %v", sf.meta.RunnerName, sf.dir, err)
171+
return
172+
}
173+
for _, f := range files {
174+
if f.IsDir() {
175+
continue
176+
}
177+
inode, err := utilsos.GetIdentifyIDByPath(filepath.Join(sf.dir, f.Name()))
178+
if err != nil {
179+
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, f.Name(), err)
180+
continue
181+
}
182+
var key string
183+
if sf.inodeSensitive {
184+
key = reader.JoinFileInode(f.Name(), strconv.FormatUint(inode, 10))
185+
} else {
186+
key = filepath.Base(f.Name())
187+
}
188+
sf.inodeOffset[key] = f.Size()
189+
}
190+
}
191+
159192
func (sf *SeqFile) getIgnoreCondition() func(os.FileInfo) bool {
160193
return func(fi os.FileInfo) bool {
161194

@@ -424,7 +457,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
424457
}
425458
return
426459
}
427-
currFi, err := os.Stat(sf.currFile)
460+
_, err = os.Stat(sf.currFile)
428461
if err != nil {
429462
if !os.IsNotExist(err) {
430463
// 日志读取错误
@@ -436,9 +469,6 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
436469
log.Debugf("Runner[%v] stat current file [%v] error %v, start to find the oldest file", sf.meta.RunnerName, sf.currFile, err)
437470
return
438471
}
439-
newerThanCurrFile := func(f os.FileInfo) bool {
440-
return f.ModTime().UnixNano() >= currFi.ModTime().UnixNano()
441-
}
442472

443473
isNewFile := func(f os.FileInfo) bool {
444474
inode, err := utilsos.GetIdentifyIDByPath(filepath.Join(sf.dir, f.Name()))
@@ -465,7 +495,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
465495
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
466496
}
467497

468-
condition = reader.AndCondition(reader.AndCondition(newerThanCurrFile, sf.getIgnoreCondition()), isNewFile)
498+
condition = reader.AndCondition(sf.getIgnoreCondition(), isNewFile)
469499
return
470500
}
471501

@@ -572,30 +602,11 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
572602
doneFileInode := sf.inode
573603
doneFileOffset := sf.offset
574604
sf.lastFile = doneFile
575-
fname := fi.Name()
576-
sf.currFile = filepath.Join(sf.dir, fname)
577-
f, err := os.Open(sf.currFile)
578-
if err != nil {
579-
log.Warnf("Runner[%v] os.Open %s: %v", sf.meta.RunnerName, fname, err)
580-
return err
581-
}
582605

583606
//开新的之前关掉老的
584607
if sf.ratereader != nil {
585608
sf.ratereader.Close()
586609
}
587-
if sf.meta.Readlimit > 0 {
588-
sf.ratereader = rateio.NewRateReader(f, sf.meta.Readlimit)
589-
} else {
590-
sf.ratereader = f
591-
}
592-
sf.offset = sf.getOffset(f, 0, true)
593-
sf.f = f
594-
sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
595-
if err != nil {
596-
return err
597-
}
598-
log.Infof("Runner[%v] %s - start tail new file: %s", sf.meta.RunnerName, sf.dir, fname)
599610
if sf.inodeOffset == nil {
600611
sf.inodeOffset = make(map[string]int64)
601612
}
@@ -621,6 +632,25 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
621632
}
622633
break
623634
}
635+
filename := fi.Name()
636+
sf.currFile = filepath.Join(sf.dir, filename)
637+
f, err := os.Open(sf.currFile)
638+
if err != nil {
639+
log.Warnf("Runner[%v] os.Open %s: %v", sf.meta.RunnerName, filename, err)
640+
return err
641+
}
642+
if sf.meta.Readlimit > 0 {
643+
sf.ratereader = rateio.NewRateReader(f, sf.meta.Readlimit)
644+
} else {
645+
sf.ratereader = f
646+
}
647+
sf.offset = sf.getOffset(f, 0, true)
648+
sf.f = f
649+
sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
650+
if err != nil {
651+
return err
652+
}
653+
log.Infof("Runner[%v] %s - start tail new file: %s", sf.meta.RunnerName, sf.dir, filename)
624654
return
625655
}
626656

0 commit comments

Comments
 (0)