Skip to content

Commit eb7207a

Browse files
author
liukai2012
authored
Merge pull request #1148 from shangmin-001/shangmin01
dirx/file 模式,inode不变的情况下增加数据,采集时一行数据会被拆分为多行发送
2 parents d60f1b5 + a9c35b5 commit eb7207a

File tree

5 files changed

+177
-62
lines changed

5 files changed

+177
-62
lines changed

mgr/runner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2109,7 +2109,7 @@ DONE:
21092109

21102110
}
21112111
time.Sleep(50 * time.Millisecond)
2112-
if dft > 200 {
2112+
if dft > 1000 {
21132113
break
21142114
}
21152115
}

reader/dirx/dir_reader.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type dirReader struct {
3737
logPath string
3838
readLock sync.RWMutex
3939
readcache string
40+
halfLineCache map[string]string //针对不同的数据源做一个缓存
4041
numEmptyLines int
4142

4243
msgChan chan<- message
@@ -103,8 +104,40 @@ func (dr *dirReader) Run() {
103104
time.Sleep(2 * time.Second)
104105
continue
105106
}
107+
source := dr.br.Source()
108+
if _, ok := dr.halfLineCache[source]; !ok {
109+
dr.readLock.Lock()
110+
dr.halfLineCache[source] = ""
111+
dr.readLock.Unlock()
112+
}
106113

107-
if len(dr.readcache) == 0 {
114+
if dr.readcache != "" && err == io.EOF {
115+
dr.readLock.Lock()
116+
if len(dr.halfLineCache[source])+len(dr.readcache) > 20*utils.Mb {
117+
log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 2k", dr.runnerName, dr.originalPath, source)
118+
dr.readcache = dr.halfLineCache[source] + dr.readcache
119+
dr.halfLineCache[source] = ""
120+
} else {
121+
dr.halfLineCache[source] += dr.readcache
122+
dr.readcache = ""
123+
}
124+
dr.readLock.Unlock()
125+
}
126+
127+
if err == nil && dr.halfLineCache[source] != "" {
128+
dr.readLock.Lock()
129+
dr.readcache += dr.halfLineCache[source]
130+
dr.halfLineCache[source] = ""
131+
dr.readLock.Unlock()
132+
}
133+
134+
if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" {
135+
if key, exist := utils.GetKeyOfNotEmptyValueInMap(dr.halfLineCache); exist {
136+
source = key
137+
}
138+
}
139+
140+
if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" {
108141
dr.numEmptyLines++
109142
// 文件 EOF,同时没有任何内容,代表不是第一次 EOF,休息时间设置长一些
110143
if err == io.EOF {
@@ -122,9 +155,19 @@ func (dr *dirReader) Run() {
122155
// 读取的结果为空,无论如何都 sleep 1s
123156
time.Sleep(time.Second)
124157
continue
158+
} else if len(dr.readcache) == 0 && dr.halfLineCache[source] != "" {
159+
dr.numEmptyLines++
160+
if err == io.EOF && dr.numEmptyLines < 40{
161+
log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", dr.runnerName, dr.originalPath)
162+
time.Sleep(1 * time.Second)
163+
continue
164+
}
165+
dr.readLock.Lock()
166+
dr.readcache = dr.halfLineCache[source]
167+
dr.halfLineCache[source] = ""
168+
dr.readLock.Unlock()
125169
}
126170
}
127-
128171
log.Debugf("Runner[%v] %v >>>>>> read cache[%v] line cache [%v]", dr.runnerName, dr.originalPath, dr.readcache, string(dr.br.FormMutiLine()))
129172
repeat := 0
130173
for {
@@ -150,6 +193,7 @@ func (dr *dirReader) Run() {
150193
case dr.msgChan <- message{result: dr.readcache, logpath: dr.originalPath, currentFile: dr.br.Source()}:
151194
dr.readLock.Lock()
152195
dr.readcache = ""
196+
153197
dr.readLock.Unlock()
154198
case <-ticker.C:
155199
}
@@ -195,9 +239,20 @@ func HasDirExpired(dir string, expire time.Duration) bool {
195239
return latestModTime.Add(expire).Before(time.Now())
196240
}
197241

198-
//对于读完的直接认为过期,因为不会追加新数据
242+
//对于读完的直接认为过期,因为不会追加新数据;最后一次需要等待30s
199243
func (dr *dirReader) ReadDone() bool {
200-
return dr.br.ReadDone()
244+
return dr.br.ReadDone() && dr.halfLineCacheIsEmpty()
245+
}
246+
247+
func (dr *dirReader) halfLineCacheIsEmpty() bool {
248+
dr.readLock.Lock()
249+
defer dr.readLock.Unlock()
250+
for _, v := range dr.halfLineCache {
251+
if v != "" {
252+
return false
253+
}
254+
}
255+
return true
201256
}
202257

203258
func (dr *dirReader) HasExpired(expire time.Duration) bool {
@@ -346,14 +401,15 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi
346401
}
347402

348403
dr := &dirReader{
349-
status: StatusInit,
350-
inactive: 1,
351-
br: br,
352-
runnerName: opts.Meta.RunnerName,
353-
originalPath: opts.OriginalPath,
354-
logPath: opts.LogPath,
355-
msgChan: opts.MsgChan,
356-
errChan: opts.ErrChan,
404+
status: StatusInit,
405+
inactive: 1,
406+
br: br,
407+
halfLineCache: make(map[string]string),
408+
runnerName: opts.Meta.RunnerName,
409+
originalPath: opts.OriginalPath,
410+
logPath: opts.LogPath,
411+
msgChan: opts.MsgChan,
412+
errChan: opts.ErrChan,
357413
}
358414

359415
drs.lock.Lock()

reader/dirx/dirx_test.go

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func multiReaderOneLineTest(t *testing.T) {
168168
if err == io.EOF {
169169
break
170170
}
171-
if maxNum >= 18 || emptyNum > 10 {
171+
if maxNum >= 60 || emptyNum > 60 {
172172
break
173173
}
174174
}
@@ -237,7 +237,7 @@ func multiReaderMultiLineTest(t *testing.T) {
237237
if err == io.EOF {
238238
break
239239
}
240-
if maxNum >= 7 || emptyNum > 10 {
240+
if maxNum >= 60 || emptyNum > 60 {
241241
break
242242
}
243243
}
@@ -264,7 +264,7 @@ func multiReaderMultiLineTest(t *testing.T) {
264264
if err == io.EOF {
265265
break
266266
}
267-
if maxNum >= 10 || emptyNum > 10 {
267+
if maxNum >= 60 || emptyNum > 60 {
268268
break
269269
}
270270
}
@@ -340,7 +340,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) {
340340
if err == io.EOF {
341341
break
342342
}
343-
if maxNum >= 2 || emptyNum > 10 {
343+
if maxNum >= 60 || emptyNum > 60 {
344344
break
345345
}
346346
}
@@ -369,7 +369,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) {
369369
if err == io.EOF {
370370
break
371371
}
372-
if maxNum >= 9 || emptyNum > 10 {
372+
if maxNum >= 60 || emptyNum > 60 {
373373
break
374374
}
375375
}
@@ -396,7 +396,7 @@ func multiReaderSyncMetaOneLineTest(t *testing.T) {
396396
if err == io.EOF {
397397
break
398398
}
399-
if maxNum >= 12 || emptyNum > 10 {
399+
if maxNum >= 60 || emptyNum > 60 {
400400
break
401401
}
402402
}
@@ -471,7 +471,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) {
471471
if err == io.EOF {
472472
break
473473
}
474-
if maxNum >= 5 || emptyNum > 10 {
474+
if maxNum >= 60 || emptyNum > 60 {
475475
break
476476
}
477477
}
@@ -501,7 +501,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) {
501501
if err == io.EOF {
502502
break
503503
}
504-
if maxNum >= 7 || emptyNum > 10 {
504+
if maxNum >= 60 || emptyNum > 60 {
505505
break
506506
}
507507
}
@@ -527,7 +527,7 @@ func multiReaderSyncMetaMutilineTest(t *testing.T) {
527527
if err == io.EOF {
528528
break
529529
}
530-
if maxNum >= 10 || emptyNum > 10 {
530+
if maxNum >= 60 || emptyNum > 60 {
531531
break
532532
}
533533
}
@@ -619,7 +619,7 @@ func multiReaderNewestTest(t *testing.T) {
619619
if err == io.EOF {
620620
break
621621
}
622-
if maxNum >= 3 || emptyNum > 10 {
622+
if maxNum >= 60 || emptyNum > 60 {
623623
break
624624
}
625625
}
@@ -689,7 +689,7 @@ func multiReaderNewestOffsetTest(t *testing.T) {
689689
} else {
690690
emptyNum++
691691
}
692-
if emptyNum > 5 {
692+
if emptyNum > 60 {
693693
break
694694
}
695695
}
@@ -717,7 +717,7 @@ func multiReaderNewestOffsetTest(t *testing.T) {
717717
if err == io.EOF {
718718
break
719719
}
720-
if maxNum >= 6 || emptyNum > 10 {
720+
if maxNum >= 60 || emptyNum > 60 {
721721
break
722722
}
723723
}
@@ -791,7 +791,7 @@ func multiReaderSameInodeTest(t *testing.T) {
791791
} else {
792792
emptyNum++
793793
}
794-
if maxNum >= 10 || emptyNum > 5 {
794+
if maxNum >= 60 || emptyNum > 60 {
795795
break
796796
}
797797
}
@@ -818,7 +818,7 @@ func multiReaderSameInodeTest(t *testing.T) {
818818
if err == io.EOF {
819819
break
820820
}
821-
if maxNum >= 13 || emptyNum > 10 {
821+
if maxNum >= 60 || emptyNum > 60 {
822822
break
823823
}
824824
}
@@ -841,7 +841,7 @@ func multiReaderSameInodeTest(t *testing.T) {
841841
if err == io.EOF {
842842
break
843843
}
844-
if maxNum >= 19 || emptyNum > 10 {
844+
if maxNum >= 60 || emptyNum > 60 {
845845
break
846846
}
847847
}
@@ -924,7 +924,7 @@ func readerExpireDeleteTest(t *testing.T) {
924924
if err == io.EOF {
925925
break
926926
}
927-
if maxNum >= 18 || emptyNum > 10 {
927+
if maxNum >= 100 || emptyNum > 100 {
928928
break
929929
}
930930
}
@@ -1072,7 +1072,7 @@ func readerExpireDeleteTarTest(t *testing.T) {
10721072
if err == io.EOF {
10731073
break
10741074
}
1075-
if maxNum >= 18 || emptyNum > 10 {
1075+
if maxNum >= 60 || emptyNum > 60 {
10761076
break
10771077
}
10781078
}
@@ -1155,7 +1155,7 @@ func TestMultiReaderReset(t *testing.T) {
11551155
if err == io.EOF {
11561156
break
11571157
}
1158-
if maxNum >= 8 || emptyNum > 10 {
1158+
if maxNum >= 60 || emptyNum > 60 {
11591159
break
11601160
}
11611161
}
@@ -1193,7 +1193,7 @@ func TestMultiReaderReset(t *testing.T) {
11931193
if err == io.EOF {
11941194
break
11951195
}
1196-
if maxNum >= 8 || emptyNum > 10 {
1196+
if maxNum >= 60 || emptyNum > 60 {
11971197
break
11981198
}
11991199
}
@@ -1254,7 +1254,7 @@ func TestReaderErrBegin(t *testing.T) {
12541254
if err == io.EOF {
12551255
break
12561256
}
1257-
if maxNum >= 8 {
1257+
if maxNum >= 60 {
12581258
break
12591259
}
12601260
}

0 commit comments

Comments
 (0)