Skip to content

Commit 87370c5

Browse files
liukai2012shangmin-001
authored andcommitted
Merge pull request #1161 from asiocity/PDR-13603
[PDR-13603][bug] 修复 s3 接口无法对接其他平台问题
2 parents b131cbb + 243d4b1 commit 87370c5

File tree

5 files changed

+57
-28
lines changed

5 files changed

+57
-28
lines changed

mgr/dataflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ func Test_RawData_MultiLines(t *testing.T) {
679679
assert.Nil(t, err)
680680
assert.Equal(t, []string{"abc\n", "abc\n"}, actual)
681681

682-
readConfig[readerconf.KeyRawDataTimeout] = "3"
682+
readConfig[readerconf.KeyRawDataTimeout] = "40"
683683
os.RemoveAll(fileName)
684684
createRawDataFile(fileName, "abc\n")
685685
actual, err = RawData(readConfig)

mgr/runner_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,7 +1469,7 @@ func TestAddDatatags(t *testing.T) {
14691469
assert.NoError(t, err)
14701470
go rr.Run()
14711471

1472-
time.Sleep(2 * time.Second)
1472+
time.Sleep(60 * time.Second)
14731473
data, err := ioutil.ReadFile("./TestAddDatatags/filesend.json")
14741474
assert.Nil(t, err)
14751475
var res []Data
@@ -1539,7 +1539,7 @@ func TestRunWithExtra(t *testing.T) {
15391539
assert.NoError(t, err)
15401540
go rr.Run()
15411541

1542-
time.Sleep(2 * time.Second)
1542+
time.Sleep(60 * time.Second)
15431543
data, err := ioutil.ReadFile("./TestRunWithExtra/filesend.json")
15441544
assert.Nil(t, err)
15451545
var res []Data
@@ -1671,7 +1671,7 @@ func TestRunWithDataSourceFail(t *testing.T) {
16711671
assert.NotNil(t, rr)
16721672
go rr.Run()
16731673

1674-
time.Sleep(2 * time.Second)
1674+
time.Sleep(60 * time.Second)
16751675
data, err := ioutil.ReadFile("./TestRunWithDataSourceFail/filesend.json")
16761676
assert.Nil(t, err)
16771677
var res []Data
@@ -2076,21 +2076,21 @@ func TestTailxCleaner(t *testing.T) {
20762076
assert.NotNil(t, rr)
20772077
go rr.Run()
20782078

2079-
time.Sleep(2 * time.Second)
2079+
time.Sleep(60 * time.Second)
20802080

20812081
logPatha1 := filepath.Join(dira, "a.log.1")
20822082
assert.NoError(t, os.Rename(logPatha, logPatha1))
20832083

20842084
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666))
20852085

2086-
time.Sleep(5 * time.Second)
2086+
time.Sleep(60 * time.Second)
20872087

20882088
logPatha2 := filepath.Join(dira, "a.log.2")
20892089
assert.NoError(t, os.Rename(logPatha, logPatha2))
20902090

20912091
assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666))
20922092

2093-
time.Sleep(2 * time.Second)
2093+
time.Sleep(60 * time.Second)
20942094

20952095
assert.NotNil(t, rr.Cleaner())
20962096

@@ -2113,6 +2113,7 @@ DONE:
21132113
break
21142114
}
21152115
}
2116+
time.Sleep(50 * time.Second)
21162117
assert.Equal(t, 1, ret)
21172118
}
21182119

reader/bufreader/bufreader.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,17 @@ type LastSync struct {
5454

5555
// BufReader implements buffering for an FileReader object.
5656
type BufReader struct {
57-
stopped int32
58-
buf []byte
59-
delim []byte
60-
mutiLineCache *LineCache
61-
rd reader.FileReader // reader provided by the client
62-
r, w int // buf read and write positions
63-
err error
64-
lastByte int
65-
lastRuneSize int
66-
lastSync LastSync
57+
stopped int32
58+
buf []byte
59+
delim []byte
60+
mutiLineCache *LineCache
61+
waitForWholeLine bool //readWholeLine
62+
rd reader.FileReader // reader provided by the client
63+
r, w int // buf read and write positions
64+
err error
65+
lastByte int
66+
lastRuneSize int
67+
lastSync LastSync
6768

6869
runTime reader.RunTime
6970

@@ -88,7 +89,7 @@ type BufReader struct {
8889
const minReadBufferSize = 16
8990

9091
//最大连续读到空的尝试次数
91-
const maxConsecutiveEmptyReads = 10
92+
const maxConsecutiveEmptyReads = 40
9293

9394
// NewReaderSize returns a new Reader whose buffer has at least the specified
9495
// size. If the argument FileReader is already a Reader with large enough
@@ -189,6 +190,11 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) {
189190
return
190191
}
191192

193+
func (b *BufReader) SetWaitFlagForWholeLine() {
194+
b.waitForWholeLine = true
195+
return
196+
}
197+
192198
func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) {
193199
b.runTime, err = reader.ParseRunTimeWithMode(mode, v)
194200
return err
@@ -280,13 +286,25 @@ func (b *BufReader) fill() {
280286
}
281287

282288
b.w += n
289+
290+
if err == io.EOF && b.waitForWholeLine {
291+
if i == 1 { //when last attempts,return err info;
292+
b.err = err
293+
return
294+
}
295+
296+
time.Sleep(1 * time.Second)
297+
continue
298+
}
299+
283300
if err != nil {
284301
b.err = err
285302
return
286303
}
287304
if n > 0 {
288305
return
289306
}
307+
290308
}
291309
b.err = io.ErrNoProgress
292310
}
@@ -666,7 +684,9 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re
666684
return
667685
}
668686
maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0)
669-
return NewReaderSize(fr, meta, bufSize, maxLineLen)
687+
r, err := NewReaderSize(fr, meta, bufSize, maxLineLen)
688+
r.SetWaitFlagForWholeLine()
689+
return r, err
670690
}
671691

672692
func init() {

reader/cloudtrail/cloudtrail.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,12 @@ func newSyncManager(meta *reader.Meta, opts *syncOptions) (*syncManager, error)
248248

249249
func makeSyncSource(bucket, prefix string) string {
250250
if prefix == "" {
251-
return fmt.Sprintf("s3://%s", bucket)
251+
return fmt.Sprintf("%s", bucket)
252252
}
253253
if strings.HasPrefix(prefix, "/") {
254-
return fmt.Sprintf("s3://%s%s", bucket, prefix)
254+
return fmt.Sprintf("%s%s", bucket, prefix)
255255
}
256-
return fmt.Sprintf("s3://%s/%s", bucket, prefix)
256+
return fmt.Sprintf("%s/%s", bucket, prefix)
257257
}
258258

259259
func (mgr *syncManager) startSync() {
@@ -319,9 +319,6 @@ func newSyncRunner(ctx *syncContext, quitChan chan struct{}) *syncRunner {
319319
}
320320

321321
func (s *syncRunner) Sync() error {
322-
if !validSource(s.source) {
323-
return fmt.Errorf("invalid sync source %q", s.source)
324-
}
325322
if !validTarget(s.target) {
326323
return fmt.Errorf("invalid sync target %q", s.target)
327324
}
@@ -457,10 +454,20 @@ func (r *s3Url) keys() []string {
457454

458455
func lookupBucket(bucketName string, auth aws.Auth, region string, endpoint string) (*s3.Bucket, error) {
459456
log.Infof("looking for bucket %q in region %q", bucketName, region)
460-
rg := aws.Regions[region]
461-
if endpoint != "" {
462-
rg.S3Endpoint = endpoint
457+
458+
rg, ok := aws.Regions[region]
459+
if !ok {
460+
rg = aws.Region{
461+
Name: region,
462+
S3Endpoint: endpoint,
463+
}
464+
} else {
465+
// replace default endpoint
466+
if endpoint != "" {
467+
rg.S3Endpoint = endpoint
468+
}
463469
}
470+
464471
s3 := s3.New(auth, rg)
465472
bucket := s3.Bucket(bucketName)
466473
_, err := bucket.List("", "", "", 0)

reader/dirx/dirx_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,7 @@ func readerExpireDeleteTarTest(t *testing.T) {
10791079
}
10801080
}
10811081
t.Log("maxNum ", maxNum, "emptyNum", emptyNum)
1082+
time.Sleep(60 * time.Second)
10821083
assert.EqualValues(t, len(expectResults), len(actualResults))
10831084
for k, v := range expectResults {
10841085
actualV, ok := actualResults[k]

0 commit comments

Comments
 (0)