Skip to content

Commit 93c2c43

Browse files
committed
修复dir会重复读取的问题
1 parent d60f1b5 commit 93c2c43

File tree

8 files changed

+66
-114
lines changed

8 files changed

+66
-114
lines changed

reader/bufreader/bufreader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ func (b *BufReader) readBytes(delim []byte) ([]byte, error) {
391391
}
392392
nolog = true
393393
lineLen = 0
394-
full = make([][]byte, 0, len(full)) // 重新申请空间
394+
full = make([][]byte, 0, len(full)) // 重新申请空间
395395
continue
396396
}
397397
buf := make([]byte, fragLen)
@@ -646,7 +646,7 @@ func NewFileDirReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Reade
646646
newfileNewLine, _ := conf.GetBoolOr(KeyNewFileNewLine, false)
647647
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
648648
readSameInode, _ := conf.GetBoolOr(KeyReadSameInode, false)
649-
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, nil, inodeSensitive)
649+
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, inodeSensitive)
650650
if err != nil {
651651
return
652652
}

reader/cloudtrail/cloudtrail.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
8585
validFilePattern, _ := conf.GetStringOr(KeyValidFilePattern, "*")
8686
bufSize, _ := conf.GetIntOr(KeyBufSize, bufreader.DefaultBufSize)
8787
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
88-
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, nil, true)
88+
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, true)
8989
if err != nil {
9090
return nil, err
9191
}

reader/dirx/dir_reader.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,6 @@ type newReaderOptions struct {
296296
Whence string
297297
BufferSize int
298298

299-
expireMap map[string]int64
300-
301299
MsgChan chan<- message
302300
ErrChan chan<- error
303301

@@ -330,7 +328,7 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi
330328
return nil, fmt.Errorf("new extract reader: %v", err)
331329
}
332330
} else {
333-
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, opts.expireMap, true)
331+
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, true)
334332
if err != nil {
335333
return nil, fmt.Errorf("new sequence file: %v", err)
336334
}

reader/dirx/dirx.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,6 @@ func (r *Reader) statLogPath() {
303303
MsgChan: r.msgChan,
304304
ErrChan: r.errChan,
305305
ReadSameInode: r.readSameInode,
306-
expireMap: r.expireMap,
307306
}, r.notFirstTime, r.maxLineLen)
308307
if err != nil {
309308
if err == ErrAlreadyExist {

reader/dirx/dirx_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
)
2222

2323
func createFileWithContent(filepathn, lines string) {
24-
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY, DefaultFilePerm)
24+
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
2525
if err != nil {
2626
log.Error(err)
2727
return
@@ -641,7 +641,6 @@ func multiReaderNewestOffsetTest(t *testing.T) {
641641

642642
createDirWithName(dir1)
643643
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\n")
644-
time.Sleep(15 * time.Second)
645644
expectResults := map[string]int{
646645
"abc\nx\n": 1,
647646
"abc\ny\n": 1,
@@ -728,6 +727,10 @@ func multiReaderNewestOffsetTest(t *testing.T) {
728727
}
729728

730729
func multiReaderSameInodeTest(t *testing.T) {
730+
731+
}
732+
733+
func TestMultiReaderSameInodeTest(t *testing.T) {
731734
dirname := "multiReaderSameInodeTest"
732735
dir1 := filepath.Join(dirname, "logs/abc")
733736
dir2 := filepath.Join(dirname, "logs/xyz")
@@ -745,8 +748,7 @@ func multiReaderSameInodeTest(t *testing.T) {
745748
"abc124\n": 3,
746749
"abc125\n": 3,
747750
"abc126\n": 3,
748-
"abc127\n": 3,
749-
"abc128\n": 1,
751+
"abc127\n": 2,
750752
"abc\nx\n": 1,
751753
"abc\ny\n": 1,
752754
"abc\nz\n": 1,
@@ -823,7 +825,7 @@ func multiReaderSameInodeTest(t *testing.T) {
823825
}
824826
}
825827
t.Log("Reader has finished reading two")
826-
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\nabc128\n")
828+
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\n")
827829
time.Sleep(5 * time.Second)
828830
assert.Equal(t, 2, dr.dirReaders.Num(), "Number of readers")
829831

reader/meta.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package reader
22

33
import (
4+
"bytes"
45
"errors"
56
"fmt"
67
"io/ioutil"
@@ -515,6 +516,25 @@ func (m *Meta) AppendDoneFileInode(path string, inode uint64, offset int64) (err
515516
return
516517
}
517518

519+
func (m *Meta) SyncDoneFileInode(inodeOffset map[string]int64) (err error) {
520+
f, err := os.OpenFile(m.DoneFile(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
521+
if err != nil {
522+
return
523+
}
524+
defer f.Close()
525+
526+
var data bytes.Buffer
527+
for inodeFile, offset := range inodeOffset {
528+
str := strings.Split(inodeFile, "_")
529+
if len(str) != 2 {
530+
continue
531+
}
532+
data.WriteString(fmt.Sprintf("%s\t%v\t%v\t%s\n", str[0], str[1], offset, time.Now().Format(time.RFC3339Nano)))
533+
}
534+
_, err = fmt.Fprintf(f, data.String())
535+
return
536+
}
537+
518538
func (m *Meta) GetDoneFileContent() ([]string, error) {
519539
return m.getDoneFileContent()
520540
}

reader/seqfile/seqfile.go

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,12 @@ type SeqFile struct {
4949
SkipFileFirstLine bool //跳过新文件的第一行,常用于带title的csv文件,title与实际格式不同
5050
hasSkiped bool
5151

52-
inodeOffset map[string]int64 //记录filename_inode是否已经读过
53-
inodeSensitive bool // 是否以inode信息作为 inodeDone 和 expireMap 的key值
52+
inodeOffset map[string]int64 //记录filename_inode是否已经读过
53+
inodeSensitive bool // 是否以inode信息作为 inodeDone 和 inodeOffset 的key值
5454

5555
lastSyncPath string
5656
lastSyncOffset int64
5757

58-
expireMap map[string]int64
59-
6058
ReadSameInode bool //记录已经读过的filename_inode是否继续读
6159
}
6260

@@ -106,7 +104,7 @@ func getStartFile(path, whence string, meta *reader.Meta, sf *SeqFile) (f *os.Fi
106104
return
107105
}
108106

109-
func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string, expireMap map[string]int64, inodeSensitive bool) (sf *SeqFile, err error) {
107+
func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine bool, suffixes []string, validFileRegex, whence string, inodeSensitive bool) (sf *SeqFile, err error) {
110108
sf = &SeqFile{
111109
ignoreFileSuffix: suffixes,
112110
ignoreHidden: ignoreHidden,
@@ -115,7 +113,6 @@ func NewSeqFile(meta *reader.Meta, path string, ignoreHidden, newFileNewLine boo
115113
newFileAsNewLine: newFileNewLine,
116114
meta: meta,
117115
inodeOffset: make(map[string]int64),
118-
expireMap: expireMap,
119116
inodeSensitive: inodeSensitive,
120117
}
121118
//原来的for循环替换成单次执行,启动的时候出错就直接报错给用户即可,不需要等待重试。
@@ -465,7 +462,7 @@ func (sf *SeqFile) getNextFileCondition() (condition func(os.FileInfo) bool, err
465462
key = filepath.Base(f.Name())
466463
}
467464
offset, ok := sf.inodeOffset[key]
468-
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
465+
return !ok || (sf.ReadSameInode && offset != -1 && f.Size() != offset)
469466
}
470467

471468
condition = reader.AndCondition(reader.AndCondition(newerThanCurrFile, sf.getIgnoreCondition()), isNewFile)
@@ -582,7 +579,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
582579
log.Warnf("Runner[%v] os.Open %s: %v", sf.meta.RunnerName, fname, err)
583580
return err
584581
}
585-
sf.f = f
582+
586583
//开新的之前关掉老的
587584
if sf.ratereader != nil {
588585
sf.ratereader.Close()
@@ -593,6 +590,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
593590
sf.ratereader = f
594591
}
595592
sf.offset = sf.getOffset(f, 0, true)
593+
sf.f = f
596594
sf.inode, err = utilsos.GetIdentifyIDByPath(sf.currFile)
597595
if err != nil {
598596
return err
@@ -610,7 +608,7 @@ func (sf *SeqFile) open(fi os.FileInfo) (err error) {
610608
sf.inodeOffset[key] = doneFileOffset
611609
tryTime := 0
612610
for {
613-
err := sf.meta.AppendDoneFileInode(doneFile, doneFileInode, doneFileOffset)
611+
err := sf.meta.SyncDoneFileInode(sf.inodeOffset)
614612
if err != nil {
615613
if tryTime > 3 {
616614
log.Errorf("Runner[%v] cannot write done file %s, err:%v, ignore this noefi", sf.meta.RunnerName, doneFile, err)
@@ -692,59 +690,41 @@ type LineSkipper interface {
692690
}
693691

694692
func (sf *SeqFile) getOffset(f *os.File, offset int64, seek bool) int64 {
695-
if len(sf.expireMap) == 0 || offset != 0 || f == nil {
693+
if len(sf.inodeOffset) == 0 || offset != 0 || f == nil {
696694
return offset
697695
}
698696

699-
if sf.meta.IsExist() {
700-
deleteNotExist(filepath.Dir(f.Name()), sf.expireMap, sf.inodeSensitive)
697+
fileName := f.Name()
698+
fileInfo, err := f.Stat()
699+
if err != nil {
700+
log.Errorf("Runner[%s] NewSeqFile get file %s info error %v, ignore...", sf.meta.RunnerName, fileName, err)
701701
return offset
702702
}
703703

704-
fileName := f.Name()
705704
inode, err := utilsos.GetIdentifyIDByPath(fileName)
706705
if err != nil {
707706
log.Errorf("Runner[%s] NewSeqFile get file %s inode error %v, ignore...", sf.meta.RunnerName, fileName, err)
708707
return offset
709708
}
710-
inodeStr := strconv.FormatUint(inode, 10)
709+
var key string
711710
if sf.inodeSensitive {
712-
offset = sf.expireMap[inodeStr+"_"+fileName]
711+
key = reader.JoinFileInode(fileName, strconv.FormatUint(inode, 10))
713712
} else {
714-
offset = sf.expireMap[fileName]
713+
key = filepath.Base(fileName)
714+
}
715+
offset = sf.inodeOffset[key]
716+
if fileInfo.Size() < offset {
717+
offset = 0
715718
}
716719
if seek {
717-
_, err = f.Seek(sf.offset, io.SeekStart)
720+
_, err = f.Seek(offset, io.SeekStart)
718721
if err != nil {
719722
log.Errorf("Runner[%s] file: %s seek offset: %d failed: %v", sf.meta.RunnerName, f.Name(), sf.offset, err)
720723
}
721724
}
722725
return offset
723726
}
724727

725-
func deleteNotExist(dir string, expireMap map[string]int64, inodeSensitive bool) {
726-
if dir == "" {
727-
return
728-
}
729-
var arr []string
730-
for inodeFile := range expireMap {
731-
if inodeSensitive {
732-
arr = strings.SplitN(inodeFile, "_", 2)
733-
if len(arr) < 2 {
734-
continue
735-
}
736-
if filepath.Dir(arr[1]) != dir {
737-
continue
738-
}
739-
} else {
740-
if filepath.Dir(inodeFile) != dir {
741-
continue
742-
}
743-
}
744-
delete(expireMap, inodeFile)
745-
}
746-
}
747-
748728
var (
749729
_ LineSkipper = new(SeqFile)
750730
_ reader.NewSourceRecorder = new(SeqFile)

0 commit comments

Comments
 (0)