Skip to content

Commit 61880ab

Browse files
committed
Add AsyncStop config option to facilitate graceful stop in async mode on Close()
Signed-off-by: JamesJJ <jj@fcg.fyi>
1 parent 3179d5c commit 61880ab

File tree

4 files changed

+27
-2
lines changed

4 files changed

+27
-2
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ Since the default is zero value, Write will not time out.
6969
Enable asynchronous I/O (connect and write) for sending events to Fluentd.
7070
The default is false.
7171

72+
### AsyncStop
73+
74+
Enables discarding bufferred events when Close() is called in Async mode. If Fluentd is continuously unavailable, Close() will block forever otherwise.
75+
The default is false.
76+
7277
### RequestAck
7378

7479
Sets whether to request acknowledgment from Fluentd to increase the reliability

fluent/fluent.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Config struct {
4949
MaxRetryWait int `json:"max_retry_wait"`
5050
TagPrefix string `json:"tag_prefix"`
5151
Async bool `json:"async"`
52+
AsyncStop bool `json:"async_stop"`
5253
// Deprecated: Use Async instead
5354
AsyncConnect bool `json:"async_connect"`
5455
MarshalAsJSON bool `json:"marshal_as_json"`
@@ -83,8 +84,9 @@ type msgToSend struct {
8384
type Fluent struct {
8485
Config
8586

86-
pending chan *msgToSend
87-
wg sync.WaitGroup
87+
stopRunning chan bool
88+
pending chan *msgToSend
89+
wg sync.WaitGroup
8890

8991
muconn sync.Mutex
9092
conn net.Conn
@@ -305,6 +307,10 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
305307
// Close closes the connection, waiting for pending logs to be sent
306308
func (f *Fluent) Close() (err error) {
307309
if f.Config.Async {
310+
if f.Config.AsyncStop {
311+
f.stopRunning <- true
312+
close(f.stopRunning)
313+
}
308314
close(f.pending)
309315
f.wg.Wait()
310316
}
@@ -347,13 +353,25 @@ func (f *Fluent) connect() (err error) {
347353
}
348354

349355
func (f *Fluent) run() {
356+
drainEvents := false
350357
for {
358+
select {
359+
case stopRunning, ok := <-f.stopRunning:
360+
if stopRunning || !ok {
361+
drainEvents = true
362+
}
363+
default:
364+
}
351365
select {
352366
case entry, ok := <-f.pending:
353367
if !ok {
354368
f.wg.Done()
355369
return
356370
}
371+
if drainEvents {
372+
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, discarding...\n", time.Now().Format(time.RFC3339))
373+
continue
374+
}
357375
err := f.write(entry)
358376
if err != nil {
359377
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))

fluent/fluent_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ func TestJsonConfig(t *testing.T) {
260260
MaxRetry: 3,
261261
TagPrefix: "fluent",
262262
Async: false,
263+
AsyncStop: false,
263264
MarshalAsJSON: true,
264265
}
265266

fluent/testdata/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"max_retry":3,
1111
"tag_prefix":"fluent",
1212
"async": false,
13+
"async_stop": false,
1314
"marshal_as_json": true
1415
}

0 commit comments

Comments
 (0)