Skip to content

Commit 39d089a

Browse files
author
liukai2012
authored
Merge pull request #1155 from PapaPiya/fix_dir_dup
修复dir会重复读取的问题
2 parents eb7207a + fd405d9 commit 39d089a

File tree

8 files changed

+121
-143
lines changed

8 files changed

+121
-143
lines changed

reader/bufreader/bufreader.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -315,11 +315,7 @@ func (b *BufReader) readSlice(delim []byte) (line []byte, err error) {
315315
defer b.mux.Unlock()
316316
for {
317317
if atomic.LoadInt32(&b.stopped) > 0 {
318-
if !IsSelfRunner(b.Meta.RunnerName) {
319-
log.Warn("BufReader was stopped while reading...")
320-
} else {
321-
log.Debug("BufReader was stopped while reading...")
322-
}
318+
log.Debug("BufReader was stopped while reading...")
323319
return
324320
}
325321
// Search buffer.
@@ -391,7 +387,7 @@ func (b *BufReader) readBytes(delim []byte) ([]byte, error) {
391387
}
392388
nolog = true
393389
lineLen = 0
394-
full = make([][]byte, 0, len(full)) // 重新申请空间
390+
full = make([][]byte, 0, len(full)) // 重新申请空间
395391
continue
396392
}
397393
buf := make([]byte, fragLen)
@@ -646,7 +642,7 @@ func NewFileDirReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Reade
646642
newfileNewLine, _ := conf.GetBoolOr(KeyNewFileNewLine, false)
647643
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
648644
readSameInode, _ := conf.GetBoolOr(KeyReadSameInode, false)
649-
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, nil, inodeSensitive)
645+
fr, err := seqfile.NewSeqFile(meta, logpath, ignoreHidden, newfileNewLine, ignoreFileSuffix, validFilesRegex, whence, inodeSensitive)
650646
if err != nil {
651647
return
652648
}

reader/cloudtrail/cloudtrail.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
8585
validFilePattern, _ := conf.GetStringOr(KeyValidFilePattern, "*")
8686
bufSize, _ := conf.GetIntOr(KeyBufSize, bufreader.DefaultBufSize)
8787
skipFirstLine, _ := conf.GetBoolOr(KeySkipFileFirstLine, false)
88-
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, nil, true)
88+
sf, err := seqfile.NewSeqFile(meta, opts.directory, true, true, ignoredSuffixes, validFilePattern, WhenceOldest, true)
8989
if err != nil {
9090
return nil, err
9191
}

reader/dirx/dir_reader.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,8 +351,6 @@ type newReaderOptions struct {
351351
Whence string
352352
BufferSize int
353353

354-
expireMap map[string]int64
355-
356354
MsgChan chan<- message
357355
ErrChan chan<- error
358356

@@ -385,7 +383,7 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi
385383
return nil, fmt.Errorf("new extract reader: %v", err)
386384
}
387385
} else {
388-
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, opts.expireMap, true)
386+
fr, err := seqfile.NewSeqFile(subMeta, opts.LogPath, opts.IgnoreHidden, opts.NewFileNewLine, opts.IgnoreFileSuffixes, opts.ValidFilesRegex, opts.Whence, true)
389387
if err != nil {
390388
return nil, fmt.Errorf("new sequence file: %v", err)
391389
}

reader/dirx/dirx.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,6 @@ func (r *Reader) statLogPath() {
303303
MsgChan: r.msgChan,
304304
ErrChan: r.errChan,
305305
ReadSameInode: r.readSameInode,
306-
expireMap: r.expireMap,
307306
}, r.notFirstTime, r.maxLineLen)
308307
if err != nil {
309308
if err == ErrAlreadyExist {

reader/dirx/dirx_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
)
2222

2323
func createFileWithContent(filepathn, lines string) {
24-
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY, DefaultFilePerm)
24+
file, err := os.OpenFile(filepathn, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
2525
if err != nil {
2626
log.Error(err)
2727
return
@@ -641,7 +641,6 @@ func multiReaderNewestOffsetTest(t *testing.T) {
641641

642642
createDirWithName(dir1)
643643
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\n")
644-
time.Sleep(15 * time.Second)
645644
expectResults := map[string]int{
646645
"abc\nx\n": 1,
647646
"abc\ny\n": 1,
@@ -728,6 +727,10 @@ func multiReaderNewestOffsetTest(t *testing.T) {
728727
}
729728

730729
func multiReaderSameInodeTest(t *testing.T) {
730+
731+
}
732+
733+
func TestMultiReaderSameInodeTest(t *testing.T) {
731734
dirname := "multiReaderSameInodeTest"
732735
dir1 := filepath.Join(dirname, "logs/abc")
733736
dir2 := filepath.Join(dirname, "logs/xyz")
@@ -745,8 +748,7 @@ func multiReaderSameInodeTest(t *testing.T) {
745748
"abc124\n": 3,
746749
"abc125\n": 3,
747750
"abc126\n": 3,
748-
"abc127\n": 3,
749-
"abc128\n": 1,
751+
"abc127\n": 2,
750752
"abc\nx\n": 1,
751753
"abc\ny\n": 1,
752754
"abc\nz\n": 1,
@@ -823,7 +825,7 @@ func multiReaderSameInodeTest(t *testing.T) {
823825
}
824826
}
825827
t.Log("Reader has finished reading two")
826-
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\nabc127\nabc128\n")
828+
createFileWithContent(dir1file1, "abc123\nabc124\nabc125\nabc126\n")
827829
time.Sleep(5 * time.Second)
828830
assert.Equal(t, 2, dr.dirReaders.Num(), "Number of readers")
829831

reader/meta.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package reader
22

33
import (
4+
"bytes"
45
"errors"
56
"fmt"
67
"io/ioutil"
@@ -515,6 +516,25 @@ func (m *Meta) AppendDoneFileInode(path string, inode uint64, offset int64) (err
515516
return
516517
}
517518

519+
func (m *Meta) SyncDoneFileInode(inodeOffset map[string]int64) (err error) {
520+
f, err := os.OpenFile(m.DoneFile(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, DefaultFilePerm)
521+
if err != nil {
522+
return
523+
}
524+
defer f.Close()
525+
526+
var data bytes.Buffer
527+
for inodeFile, offset := range inodeOffset {
528+
str := strings.Split(inodeFile, "_")
529+
if len(str) != 2 {
530+
continue
531+
}
532+
data.WriteString(fmt.Sprintf("%s\t%v\t%v\t%s\n", str[0], str[1], offset, time.Now().Format(time.RFC3339Nano)))
533+
}
534+
_, err = fmt.Fprintf(f, data.String())
535+
return
536+
}
537+
518538
func (m *Meta) GetDoneFileContent() ([]string, error) {
519539
return m.getDoneFileContent()
520540
}

0 commit comments

Comments
 (0)