diff --git a/.gitignore b/.gitignore index fd790a322..d0d9e3352 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ debug.test /output/ coverage.out .idea/ +*.DS_Store +*.iml diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index 64ec6d163..2728ba833 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -30,6 +30,11 @@ import ( "k8s.io/node-problem-detector/pkg/util/tomb" ) +const ( + // retryDelay is the pause between consecutive failed attempts to recreate the kmsg parser; the first attempt is made without waiting. + retryDelay = 5 * time.Second +) + type kernelLogWatcher struct { cfg types.WatcherConfig startTime time.Time @@ -101,8 +106,21 @@ func (k *kernelLogWatcher) watchLoop() { return case msg, ok := <-kmsgs: if !ok { - klog.Error("Kmsg channel closed") - return + klog.Error("Kmsg channel closed, attempting to restart kmsg parser") + + // Close the old parser before replacing it; a Close failure is only logged and does not prevent the restart. + if err := k.kmsgParser.Close(); err != nil { + klog.Errorf("Failed to close kmsg parser: %v", err) + } + + // Try to restart; retryCreateParser keeps retrying until a parser is created or stopping is signaled. + var restarted bool + kmsgs, restarted = k.retryCreateParser() + if !restarted { + // Stopping was signaled while retrying, so leave the watch loop. + return + } + continue } klog.V(5).Infof("got kernel message: %+v", msg) if msg.Message == "" { @@ -122,3 +140,26 @@ func (k *kernelLogWatcher) watchLoop() { } } } + +// retryCreateParser attempts to create a new kmsg parser. +// It tries immediately first, then waits retryDelay between subsequent failures. +// It returns the new message channel and true on success (the new parser is also stored in k.kmsgParser), or nil and false if stopping was signaled. 
+func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { + for { + parser, err := kmsgparser.NewParser() + if err == nil { + k.kmsgParser = parser + klog.Infof("Successfully restarted kmsg parser") + return parser.Parse(), true + } + + klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) + + select { + case <-k.tomb.Stopping(): + klog.Infof("Stop watching kernel log during restart attempt") + return nil, false + case <-time.After(retryDelay): + } + } +} diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go index fac08aea1..b29a0431f 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go @@ -17,6 +17,7 @@ limitations under the License. package kmsg import ( + "sync" "testing" "time" @@ -27,23 +28,44 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/util" + "k8s.io/node-problem-detector/pkg/util/tomb" ) type mockKmsgParser struct { - kmsgs []kmsgparser.Message + kmsgs []kmsgparser.Message + closeAfterSend bool + closeCalled bool + mu sync.Mutex } func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {} -func (m *mockKmsgParser) Close() error { return nil } + +func (m *mockKmsgParser) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + m.closeCalled = true + return nil +} + +func (m *mockKmsgParser) WasCloseCalled() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.closeCalled +} + func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message { c := make(chan kmsgparser.Message) go func() { for _, msg := range m.kmsgs { c <- msg } + if m.closeAfterSend { + close(c) + } }() return c } + func (m *mockKmsgParser) SeekEnd() error { return nil } func TestWatch(t *testing.T) { @@ -169,3 +191,134 @@ func 
TestWatch(t *testing.T) { } } } + +func TestWatcherStopsGracefullyOnTombStop(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + closeAfterSend: false, // Keep the mock channel open; the watcher is stopped through Stop() below instead. + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should receive the single message supplied by the mock parser + select { + case log := <-logCh: + assert.Equal(t, "test message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Stop the watcher + w.Stop() + + // Log channel should be closed after stop + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close after Stop()") + } + + // Verify the watcher called Close() on the parser during shutdown + assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") +} + +func TestWatcherProcessesEmptyMessages(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "", Timestamp: now}, + {Message: "valid message", Timestamp: now.Add(time.Second)}, + {Message: "", Timestamp: now.Add(2 * time.Second)}, + }, + closeAfterSend: false, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should only receive the non-empty message; the empty message sent before it must not reach logCh + select { + case log := <-logCh: + assert.Equal(t, "valid message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Stop the watcher and verify channel closes + w.Stop() + + select { + case _, ok := <-logCh: + 
assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } +} + +func TestWatcherTrimsMessageWhitespace(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: " message with spaces ", Timestamp: now}, + {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)}, + {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)}, + }, + closeAfterSend: false, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + expectedMessages := []string{"message with spaces", "tabbed message", "newlines"} + + for _, expected := range expectedMessages { + select { + case log := <-logCh: + assert.Equal(t, expected, log.Message) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for message: %s", expected) + } + } + + // Stop the watcher and verify channel closes + w.Stop() + + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } +}