Skip to content

Commit 11c3e43

Browse files
author
liukai2012
authored
Merge pull request #1163 from shangmin-001/shangmin01
file、fileauto 模式,inode不变的情况下增加数据,采集时一行数据会被拆分为多行发送
2 parents f3eebb9 + 87370c5 commit 11c3e43

File tree

4 files changed

+41
-19
lines changed

4 files changed

+41
-19
lines changed

mgr/dataflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ func Test_RawData_MultiLines(t *testing.T) {
679679
assert.Nil(t, err)
680680
assert.Equal(t, []string{"abc\n", "abc\n"}, actual)
681681

682-
readConfig[readerconf.KeyRawDataTimeout] = "3"
682+
readConfig[readerconf.KeyRawDataTimeout] = "40"
683683
os.RemoveAll(fileName)
684684
createRawDataFile(fileName, "abc\n")
685685
actual, err = RawData(readConfig)

mgr/runner_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,7 +1469,7 @@ func TestAddDatatags(t *testing.T) {
14691469
assert.NoError(t, err)
14701470
go rr.Run()
14711471

1472-
time.Sleep(2 * time.Second)
1472+
time.Sleep(60 * time.Second)
14731473
data, err := ioutil.ReadFile("./TestAddDatatags/filesend.json")
14741474
assert.Nil(t, err)
14751475
var res []Data
@@ -1539,7 +1539,7 @@ func TestRunWithExtra(t *testing.T) {
15391539
assert.NoError(t, err)
15401540
go rr.Run()
15411541

1542-
time.Sleep(2 * time.Second)
1542+
time.Sleep(60 * time.Second)
15431543
data, err := ioutil.ReadFile("./TestRunWithExtra/filesend.json")
15441544
assert.Nil(t, err)
15451545
var res []Data
@@ -1671,7 +1671,7 @@ func TestRunWithDataSourceFail(t *testing.T) {
16711671
assert.NotNil(t, rr)
16721672
go rr.Run()
16731673

1674-
time.Sleep(2 * time.Second)
1674+
time.Sleep(60 * time.Second)
16751675
data, err := ioutil.ReadFile("./TestRunWithDataSourceFail/filesend.json")
16761676
assert.Nil(t, err)
16771677
var res []Data
@@ -2076,21 +2076,21 @@ func TestTailxCleaner(t *testing.T) {
20762076
assert.NotNil(t, rr)
20772077
go rr.Run()
20782078

2079-
time.Sleep(2 * time.Second)
2079+
time.Sleep(60 * time.Second)
20802080

20812081
logPatha1 := filepath.Join(dira, "a.log.1")
20822082
assert.NoError(t, os.Rename(logPatha, logPatha1))
20832083

20842084
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666))
20852085

2086-
time.Sleep(5 * time.Second)
2086+
time.Sleep(60 * time.Second)
20872087

20882088
logPatha2 := filepath.Join(dira, "a.log.2")
20892089
assert.NoError(t, os.Rename(logPatha, logPatha2))
20902090

20912091
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666))
20922092

2093-
time.Sleep(2 * time.Second)
2093+
time.Sleep(60 * time.Second)
20942094

20952095
assert.NotNil(t, rr.Cleaner())
20962096

@@ -2113,6 +2113,7 @@ DONE:
21132113
break
21142114
}
21152115
}
2116+
time.Sleep(50 * time.Second)
21162117
assert.Equal(t, 1, ret)
21172118
}
21182119

reader/bufreader/bufreader.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,17 @@ type LastSync struct {
5454

5555
// BufReader implements buffering for an FileReader object.
5656
type BufReader struct {
57-
stopped int32
58-
buf []byte
59-
delim []byte
60-
mutiLineCache *LineCache
61-
rd reader.FileReader // reader provided by the client
62-
r, w int // buf read and write positions
63-
err error
64-
lastByte int
65-
lastRuneSize int
66-
lastSync LastSync
57+
stopped int32
58+
buf []byte
59+
delim []byte
60+
mutiLineCache *LineCache
61+
waitForWholeLine bool //readWholeLine
62+
rd reader.FileReader // reader provided by the client
63+
r, w int // buf read and write positions
64+
err error
65+
lastByte int
66+
lastRuneSize int
67+
lastSync LastSync
6768

6869
runTime reader.RunTime
6970

@@ -88,7 +89,7 @@ type BufReader struct {
8889
const minReadBufferSize = 16
8990

9091
//最大连续读到空的尝试次数
91-
const maxConsecutiveEmptyReads = 10
92+
const maxConsecutiveEmptyReads = 40
9293

9394
// NewReaderSize returns a new Reader whose buffer has at least the specified
9495
// size. If the argument FileReader is already a Reader with large enough
@@ -189,6 +190,11 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) {
189190
return
190191
}
191192

193+
func (b *BufReader) SetWaitFlagForWholeLine() {
194+
b.waitForWholeLine = true
195+
return
196+
}
197+
192198
func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) {
193199
b.runTime, err = reader.ParseRunTimeWithMode(mode, v)
194200
return err
@@ -280,13 +286,25 @@ func (b *BufReader) fill() {
280286
}
281287

282288
b.w += n
289+
290+
if err == io.EOF && b.waitForWholeLine {
291+
if i == 1 { //when last attempts,return err info;
292+
b.err = err
293+
return
294+
}
295+
296+
time.Sleep(1 * time.Second)
297+
continue
298+
}
299+
283300
if err != nil {
284301
b.err = err
285302
return
286303
}
287304
if n > 0 {
288305
return
289306
}
307+
290308
}
291309
b.err = io.ErrNoProgress
292310
}
@@ -666,7 +684,9 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re
666684
return
667685
}
668686
maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0)
669-
return NewReaderSize(fr, meta, bufSize, maxLineLen)
687+
r, err := NewReaderSize(fr, meta, bufSize, maxLineLen)
688+
r.SetWaitFlagForWholeLine()
689+
return r, err
670690
}
671691

672692
func init() {

reader/dirx/dirx_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,7 @@ func readerExpireDeleteTarTest(t *testing.T) {
10791079
}
10801080
}
10811081
t.Log("maxNum ", maxNum, "emptyNum", emptyNum)
1082+
time.Sleep(60 * time.Second)
10821083
assert.EqualValues(t, len(expectResults), len(actualResults))
10831084
for k, v := range expectResults {
10841085
actualV, ok := actualResults[k]

0 commit comments

Comments
 (0)