Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ debug.test
/output/
coverage.out
.idea/
*.DS_Store
*.iml
45 changes: 43 additions & 2 deletions pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
"k8s.io/node-problem-detector/pkg/util/tomb"
)

const (
// retryDelay is the time to wait before attempting to restart the kmsg parser.
retryDelay = 5 * time.Second
)

type kernelLogWatcher struct {
cfg types.WatcherConfig
startTime time.Time
Expand Down Expand Up @@ -101,8 +106,21 @@ func (k *kernelLogWatcher) watchLoop() {
return
case msg, ok := <-kmsgs:
if !ok {
klog.Error("Kmsg channel closed")
return
klog.Error("Kmsg channel closed, attempting to restart kmsg parser")

// Close the old parser
if err := k.kmsgParser.Close(); err != nil {
klog.Errorf("Failed to close kmsg parser: %v", err)
}

// Try to restart
var restarted bool
kmsgs, restarted = k.retryCreateParser()
if !restarted {
// Stopping was signaled
return
}
continue
}
klog.V(5).Infof("got kernel message: %+v", msg)
if msg.Message == "" {
Expand All @@ -122,3 +140,26 @@ func (k *kernelLogWatcher) watchLoop() {
}
}
}

// retryCreateParser attempts to create a new kmsg parser.
// It tries immediately first, then waits retryDelay between subsequent failures.
// It returns the new message channel and true on success, or nil and false if stopping was signaled.
func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) {
for {
parser, err := kmsgparser.NewParser()
if err == nil {
k.kmsgParser = parser
klog.Infof("Successfully restarted kmsg parser")
return parser.Parse(), true
}

klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err)

select {
case <-k.tomb.Stopping():
klog.Infof("Stop watching kernel log during restart attempt")
return nil, false
case <-time.After(retryDelay):
}
}
}
157 changes: 155 additions & 2 deletions pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package kmsg

import (
"sync"
"testing"
"time"

Expand All @@ -27,23 +28,44 @@ import (
"k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
"k8s.io/node-problem-detector/pkg/util"
"k8s.io/node-problem-detector/pkg/util/tomb"
)

type mockKmsgParser struct {
kmsgs []kmsgparser.Message
kmsgs []kmsgparser.Message
closeAfterSend bool
closeCalled bool
mu sync.Mutex
}

func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {}
func (m *mockKmsgParser) Close() error { return nil }

func (m *mockKmsgParser) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
m.closeCalled = true
return nil
}

func (m *mockKmsgParser) WasCloseCalled() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.closeCalled
}

func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message {
c := make(chan kmsgparser.Message)
go func() {
for _, msg := range m.kmsgs {
c <- msg
}
if m.closeAfterSend {
close(c)
}
}()
return c
}

func (m *mockKmsgParser) SeekEnd() error { return nil }

func TestWatch(t *testing.T) {
Expand Down Expand Up @@ -169,3 +191,134 @@ func TestWatch(t *testing.T) {
}
}
}

func TestWatcherStopsGracefullyOnTombStop(t *testing.T) {
now := time.Now()

mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: "test message", Timestamp: now},
},
closeAfterSend: false, // Don't close, let tomb stop it
}

w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}

logCh, err := w.Watch()
assert.NoError(t, err)

// Should receive the message
select {
case log := <-logCh:
assert.Equal(t, "test message", log.Message)
case <-time.After(time.Second):
t.Fatal("timeout waiting for log message")
}

// Stop the watcher
w.Stop()

// Log channel should be closed after stop
select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close after Stop()")
}

// Verify parser was closed
assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called")
}

func TestWatcherProcessesEmptyMessages(t *testing.T) {
now := time.Now()

mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: "", Timestamp: now},
{Message: "valid message", Timestamp: now.Add(time.Second)},
{Message: "", Timestamp: now.Add(2 * time.Second)},
},
closeAfterSend: false,
}

w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}

logCh, err := w.Watch()
assert.NoError(t, err)

// Should only receive the non-empty message
select {
case log := <-logCh:
assert.Equal(t, "valid message", log.Message)
case <-time.After(time.Second):
t.Fatal("timeout waiting for log message")
}

// Stop the watcher and verify channel closes
w.Stop()

select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close")
}
}

func TestWatcherTrimsMessageWhitespace(t *testing.T) {
now := time.Now()

mock := &mockKmsgParser{
kmsgs: []kmsgparser.Message{
{Message: " message with spaces ", Timestamp: now},
{Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)},
{Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)},
},
closeAfterSend: false,
}

w := &kernelLogWatcher{
cfg: types.WatcherConfig{},
startTime: now.Add(-time.Second),
tomb: tomb.NewTomb(),
logCh: make(chan *logtypes.Log, 100),
kmsgParser: mock,
}

logCh, err := w.Watch()
assert.NoError(t, err)

expectedMessages := []string{"message with spaces", "tabbed message", "newlines"}

for _, expected := range expectedMessages {
select {
case log := <-logCh:
assert.Equal(t, expected, log.Message)
case <-time.After(time.Second):
t.Fatalf("timeout waiting for message: %s", expected)
}
}

// Stop the watcher and verify channel closes
w.Stop()

select {
case _, ok := <-logCh:
assert.False(t, ok, "log channel should be closed after Stop()")
case <-time.After(time.Second):
t.Fatal("timeout waiting for log channel to close")
}
}
Loading