Skip to content

Commit 23630b1

Browse files
committed
[PDR-12988][fix(ci)]修复ci data race
1 parent b3c9931 commit 23630b1

File tree

7 files changed

+26
-17
lines changed

7 files changed

+26
-17
lines changed

mgr/dataflow.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,9 @@ const (
3737
DefaultRawDataBatchLen = 10
3838
RawDataMaxBatchLines = 100
3939
DefaultRawDataSize = 16 * 1024
40+
DefaultRawDataTimeout = 30
4041
)
4142

42-
var RawDataTimeOut = 30 * time.Second
43-
4443
// RawData 从 reader 模块中根据 type 获取多条字符串形式的样例日志
4544
func RawData(readerConfig conf.MapConf) ([]string, error) {
4645
defer func() {
@@ -53,6 +52,10 @@ func RawData(readerConfig conf.MapConf) ([]string, error) {
5352
}
5453

5554
runnerName, _ := readerConfig.GetString(GlobalKeyName)
55+
rawDataTimeOut, _ := readerConfig.GetIntOr(config.KeyRawDataTimeout, DefaultRawDataTimeout)
56+
if rawDataTimeOut < 10 || rawDataTimeOut > 300 {
57+
rawDataTimeOut = DefaultRawDataTimeout
58+
}
5659
configMetaPath := runnerName + "_" + Hash(strconv.FormatInt(time.Now().Unix(), 10))
5760
metaPath := filepath.Join(MetaTmp, configMetaPath)
5861
log.Debugf("Runner[%v] Using %s as default metaPath", runnerName, metaPath)
@@ -108,15 +111,15 @@ func RawData(readerConfig conf.MapConf) ([]string, error) {
108111
}
109112
if atomic.LoadInt32(&timeoutStatus) == 1 {
110113
if lastErr != nil {
111-
readChan <- dataResult{lastErr:lastErr}
114+
readChan <- dataResult{lastErr: lastErr}
112115
return
113116
}
114117
}
115118

116119
}()
117120

118121
var rawData []string
119-
timeout := time.NewTimer(RawDataTimeOut)
122+
timeout := time.NewTimer(time.Duration(rawDataTimeOut) * time.Second)
120123
defer timeout.Stop()
121124
select {
122125
case de := <-readChan:

mgr/dataflow_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,14 @@ func Test_RawDataWithReadData(t *testing.T) {
135135
if err != nil {
136136
t.Error(err)
137137
}
138-
139-
RawDataTimeOut = 30 * time.Second
138+
runnerConf.ReaderConfig[readerconf.KeyRawDataTimeout] = "30"
140139
rawData, err := RawData(runnerConf.ReaderConfig)
141140
if err != nil {
142141
t.Error(err)
143142
}
144143

145144
expected := []string{"{\n \"logkit\": \"logkit\"\n}"}
146145
assert.Equal(t, expected, rawData)
147-
RawDataTimeOut = 30 * time.Second
148146
}
149147

150148
func Test_RawData_DaemonReader(t *testing.T) {
@@ -681,11 +679,11 @@ func Test_RawData_MultiLines(t *testing.T) {
681679
assert.Nil(t, err)
682680
assert.Equal(t, []string{"abc\n", "abc\n"}, actual)
683681

684-
RawDataTimeOut = 3 * time.Second
682+
readConfig[readerconf.KeyRawDataTimeout] = "3"
685683
os.RemoveAll(fileName)
686684
createRawDataFile(fileName, "abc\n")
687685
actual, err = RawData(readConfig)
688-
RawDataTimeOut = 30 * time.Second
686+
readConfig[readerconf.KeyRawDataTimeout] = "30"
689687
assert.Nil(t, err)
690688
assert.Equal(t, []string{"abc\n"}, actual)
691689
}

mgr/metric_runner_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ func metricRunTest(p *testParam) {
200200
base := filepath.Base("")
201201
metaPath := "meta/" + runnerName + "_" + Hash(base)
202202
t.Log("metaPath: ", metaPath)
203-
for i := 0; !utils.IsExist(metaPath) && i < 6; i++ {
204-
time.Sleep(500 * time.Millisecond)
203+
for i := 0; !utils.IsExist(metaPath) && i < 20; i++ {
204+
time.Sleep(1 * time.Second)
205205
i++
206206
}
207207
assert.True(t, utils.IsExist(metaPath))

reader/config/models.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
KeyAuthUsername = "auth_username"
1111
KeyAuthPassword = "auth_password"
1212

13+
KeyRawDataTimeout = "raw_data_timeout"
1314
KeyLogPath = "log_path"
1415
KeyLogPathOld = "log_path_old"
1516
KeyMetaPath = "meta_path"

reader/tailx/tailx.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,19 @@ func (r *Reader) sendError(err error) {
606606
if err == nil {
607607
return
608608
}
609+
if r.isStopping() || r.hasStopped() {
610+
log.Debugf("Runner[%s] reader %q is not running, send error operation ignored", r.meta.RunnerName, r.Name())
611+
return
612+
}
609613
defer func() {
610614
if rec := recover(); rec != nil {
611615
log.Errorf("Reader %q was panicked and recovered from %v\nstack: %s", r.Name(), rec, debug.Stack())
612616
}
613617
}()
614-
r.errChan <- err
618+
select {
619+
case r.errChan <- err:
620+
case <- time.After(time.Second):
621+
}
615622
}
616623

617624
// checkExpiredFiles 函数关闭过期的文件,再更新

sender/fault_tolerant/fault_tolerant_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ import (
99
"time"
1010
"unicode/utf8"
1111

12-
"github.com/qiniu/pandora-go-sdk/base/reqerr"
13-
1412
"github.com/stretchr/testify/assert"
1513

1614
"github.com/qiniu/log"
15+
"github.com/qiniu/pandora-go-sdk/base/reqerr"
1716
"github.com/qiniu/pandora-go-sdk/pipeline"
1817

1918
"github.com/qiniu/logkit/conf"

utils/parse/mutate/keyvalue_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,10 @@ func Test_parseLine(t *testing.T) {
167167
},
168168
{
169169
line: "fo o=def = abc ",
170-
expectData: []string{
171-
"fo o",
172-
"def = abc",
170+
expectData: []models.Data{
171+
{
172+
"fo o": "def = abc",
173+
},
173174
},
174175
existErr: false,
175176
splitter: "=",

0 commit comments

Comments
 (0)