Skip to content

Commit f3eebb9

Browse files
author
liukai2012
authored
Merge pull request #1175 from PapaPiya/fix_cleaner_panic
cleaner增加状态同步,防止写channel时导致panic
2 parents d464ae5 + f70ec20 commit f3eebb9

File tree

1 file changed

+29
-0
lines changed

1 file changed

+29
-0
lines changed

cleaner/cleaner.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package cleaner
22

33
import (
44
"path/filepath"
5+
"runtime/debug"
6+
"sync/atomic"
57
"time"
68

79
"github.com/bmatcuk/doublestar"
@@ -23,6 +25,7 @@ type Cleaner struct {
2325
cleanChan chan<- CleanSignal
2426
name string
2527
logdir string
28+
status int32
2629
}
2730

2831
type CleanSignal struct {
@@ -91,10 +94,19 @@ func NewCleaner(conf conf.MapConf, meta *reader.Meta, cleanChan chan<- CleanSign
9194
cleanChan: cleanChan,
9295
name: name,
9396
logdir: logdir,
97+
status: config.StatusInit,
9498
}, nil
9599
}
96100

97101
func (c *Cleaner) Run() {
102+
if !atomic.CompareAndSwapInt32(&c.status, config.StatusInit, config.StatusRunning) {
103+
if c.hasStopped() {
104+
log.Warnf("cleaner[%v] has stopped, run operation ignored", c.name)
105+
} else {
106+
log.Warnf("cleaner[%v] has already running, run operation ignored", c.name)
107+
}
108+
return
109+
}
98110
for {
99111
select {
100112
case <-c.exitChan:
@@ -110,9 +122,17 @@ func (c *Cleaner) Run() {
110122
}
111123

112124
func (c *Cleaner) Close() {
125+
if !atomic.CompareAndSwapInt32(&c.status, config.StatusRunning, config.StatusStopped) {
126+
log.Warnf("cleaner[%v] is not running, close operation ignored", c.name)
127+
return
128+
}
113129
c.exitChan <- struct{}{}
114130
}
115131

132+
func (c *Cleaner) hasStopped() bool {
133+
return atomic.LoadInt32(&c.status) == config.StatusStopped
134+
}
135+
116136
func (c *Cleaner) Name() string {
117137
return c.name
118138
}
@@ -160,6 +180,15 @@ func (c *Cleaner) checkBelong(path string) bool {
160180
}
161181

162182
func (c *Cleaner) Clean() (err error) {
183+
defer func() {
184+
if rec := recover(); rec != nil {
185+
log.Errorf("cleaner %q was panicked and recovered from %v\nstack: %s", c.Name(), rec, debug.Stack())
186+
}
187+
}()
188+
if c.hasStopped() {
189+
log.Warnf("cleaner[%v] reader %s has stopped, skip clean operation", c.name)
190+
return
191+
}
163192
var size int64 = 0
164193
var count int64 = 0
165194
beginClean := false

0 commit comments

Comments
 (0)