diff --git a/listener/listener.go b/listener/listener.go index 907ab12..0db0776 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -52,22 +52,57 @@ type Client interface { Session() scaleset.RunnerScaleSetSession } -type Option func(*Listener) +// MetricsRecorder defines the hook methods that will be called by the listener at +// various points in the message handling process. This allows for custom +// metrics to be collected without coupling the listener to a specific metrics +// implementation. The methods in this interface will be called with relevant +// information about the message handling process, such as the number of jobs +// started/completed, the desired runner count, and any errors that occur. +// Implementers can use this information to track the performance and behavior +// of the listener and the scaleset service. +type MetricsRecorder interface { + RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) + RecordJobStarted(msg *scaleset.JobStarted) + RecordJobCompleted(msg *scaleset.JobCompleted) + RecordDesiredRunners(count int) +} + +type discardMetricsRecorder struct{} + +func (d *discardMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {} +func (d *discardMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {} +func (d *discardMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {} +func (d *discardMetricsRecorder) RecordDesiredRunners(count int) {} // Listener listens for messages from the scaleset service and handles them. It automatically handles session // creation/deletion/refreshing and message polling and acking. type Listener struct { // The main client responsible for communicating with the scaleset service - client Client + client Client + metricsRecorder MetricsRecorder // Configuration for the listener - scaleSetID int - maxRunners atomic.Uint32 + scaleSetID int + maxRunners atomic.Uint32 + latestStatistics *scaleset.RunnerScaleSetStatistic // configuration for the listener logger *slog.Logger } +type Option func(*Listener) + +// WithMetricsRecorder sets the MetricsRecorder for the listener. If not set, a no-op recorder will be used. +// If the nil is passed, the MetricsRecorder will not be updated and the existing one will be used (which is a no-op by default). +func WithMetricsRecorder(recorder MetricsRecorder) Option { + return func(l *Listener) { + if recorder == nil { + return + } + l.metricsRecorder = recorder + } +} + // SetMaxRunners sets the capacity of the scaleset. It is concurrently // safe to update the max runners during listener.Run. func (l *Listener) SetMaxRunners(count int) { @@ -85,12 +120,17 @@ func New(client Client, config Config, options ...Option) (*Listener, error) { } listener := &Listener{ - client: client, - scaleSetID: config.ScaleSetID, - logger: config.Logger, + client: client, + metricsRecorder: &discardMetricsRecorder{}, + scaleSetID: config.ScaleSetID, + logger: config.Logger, } listener.SetMaxRunners(config.MaxRunners) + for _, option := range options { + option(listener) + } + return listener, nil } @@ -114,13 +154,17 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error { return fmt.Errorf("session statistics is nil") } + l.handleStatistics(ctx, initialSession.Statistics) + l.logger.Info( "Handling initial session statistics", slog.Int("totalAssignedJobs", initialSession.Statistics.TotalAssignedJobs), ) - if _, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs); err != nil { + desiredCount, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs) + if err != nil { return fmt.Errorf("handling initial message failed: %w", err) } + l.metricsRecorder.RecordDesiredRunners(desiredCount) } var lastMessageID int @@ -142,7 +186,7 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error { } if msg == nil { - _, err := scaler.HandleDesiredRunnerCount(ctx, 0) + _, err := scaler.HandleDesiredRunnerCount(ctx, l.latestStatistics.TotalAssignedJobs) if err != nil { return fmt.Errorf("handling nil message failed: %w", err) } @@ -160,24 +204,35 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error { } func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scaleset.RunnerScaleSetMessage) error { + l.handleStatistics(ctx, msg.Statistics) + if err := l.client.DeleteMessage(ctx, msg.MessageID); err != nil { return fmt.Errorf("failed to delete message: %w", err) } for _, jobStarted := range msg.JobStartedMessages { + l.metricsRecorder.RecordJobStarted(jobStarted) if err := handler.HandleJobStarted(ctx, jobStarted); err != nil { return fmt.Errorf("failed to handle job started: %w", err) } } for _, jobCompleted := range msg.JobCompletedMessages { + l.metricsRecorder.RecordJobCompleted(jobCompleted) if err := handler.HandleJobCompleted(ctx, jobCompleted); err != nil { return fmt.Errorf("failed to handle job completed: %w", err) } } - if _, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs); err != nil { + desiredCount, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs) + if err != nil { return fmt.Errorf("failed to handle desired runner count: %w", err) } + l.metricsRecorder.RecordDesiredRunners(desiredCount) return nil } + +func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) { + l.latestStatistics = msg + l.metricsRecorder.RecordStatistics(msg) +} diff --git a/listener/listener_test.go b/listener/listener_test.go index bc1045e..776508a 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -75,39 +75,37 @@ func TestListener_Run(t *testing.T) { client := NewMockClient(t) uuid := uuid.New() + initialStatistics := &scaleset.RunnerScaleSetStatistic{ + TotalAssignedJobs: 2, + } session := scaleset.RunnerScaleSetSession{ SessionID: uuid, OwnerName: "example", RunnerScaleSet: &scaleset.RunnerScaleSet{}, MessageQueueURL: "https://example.com", MessageQueueAccessToken: "1234567890", - Statistics: &scaleset.RunnerScaleSetStatistic{}, + Statistics: initialStatistics, } client.On("Session").Return(session).Once() - l, err := New(client, config) + metricsRecorder := NewMockMetricsRecorder(t) + metricsRecorder.On("RecordStatistics", initialStatistics).Once() + metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs). + Return(initialStatistics.TotalAssignedJobs, nil). + Run(func(mock.Arguments) { cancel() }). + Once() + + l, err := New(client, config, WithMetricsRecorder(metricsRecorder)) require.Nil(t, err) - var called bool handler := NewMockScaler(t) - handler.On( - "HandleDesiredRunnerCount", - mock.Anything, - mock.Anything, - ). - Return(0, nil). - Run( - func(mock.Arguments) { - called = true - cancel() - }, - ). + handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything). + Return(initialStatistics.TotalAssignedJobs, nil). Once() err = l.Run(ctx, handler) assert.ErrorIs(t, err, context.Canceled) - assert.True(t, called) }) t.Run("cancel context after get message", func(t *testing.T) { @@ -120,61 +118,60 @@ func TestListener_Run(t *testing.T) { MaxRunners: 10, } - client := NewMockClient(t) uuid := uuid.New() + initialStatistics := &scaleset.RunnerScaleSetStatistic{ + TotalAssignedJobs: 2, + } + session := scaleset.RunnerScaleSetSession{ SessionID: uuid, OwnerName: "example", RunnerScaleSet: &scaleset.RunnerScaleSet{}, MessageQueueURL: "https://example.com", MessageQueueAccessToken: "1234567890", - Statistics: &scaleset.RunnerScaleSetStatistic{}, + Statistics: initialStatistics, } msg := &scaleset.RunnerScaleSetMessage{ - MessageID: 1, - Statistics: &scaleset.RunnerScaleSetStatistic{}, + MessageID: 1, + Statistics: &scaleset.RunnerScaleSetStatistic{ + TotalAssignedJobs: 3, + }, } - client.On("Session").Return(session).Once() - client.On( - "GetMessage", - ctx, - mock.Anything, - 10, - ). - Return(msg, nil). - Run( - func(mock.Arguments) { - cancel() - }, - ). - Once() - - // Ensure delete message is called without cancel - client.On( - "DeleteMessage", - context.WithoutCancel(ctx), - mock.Anything, - ).Return(nil).Once() + metricsRecorder := NewMockMetricsRecorder(t) + client := NewMockClient(t) handler := NewMockScaler(t) - handler.On( - "HandleDesiredRunnerCount", - mock.Anything, - 0, - ). - Return(0, nil). + + client.On("Session").Return(session).Once() + metricsRecorder.On("RecordStatistics", initialStatistics).Once() + metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs). + Return(initialStatistics.TotalAssignedJobs, nil). + Once() + handler.On("HandleDesiredRunnerCount", mock.Anything, initialStatistics.TotalAssignedJobs). + Return(initialStatistics.TotalAssignedJobs, nil). Once() - handler.On( - "HandleDesiredRunnerCount", - mock.Anything, - mock.Anything, - ). - Return(0, nil). + client.On("GetMessage", ctx, mock.Anything, 10). + Return(msg, nil). + Run(func(mock.Arguments) { cancel() }). Once() - l, err := New(client, config) + metricsRecorder.On("RecordStatistics", msg.Statistics).Once() + // Ensure delete message is called without cancel + client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything). + Return(nil). + Once() + + metricsRecorder.On("RecordDesiredRunners", msg.Statistics.TotalAssignedJobs). + Return(msg.Statistics.TotalAssignedJobs, nil). + Once() + + handler.On("HandleDesiredRunnerCount", mock.Anything, msg.Statistics.TotalAssignedJobs). + Return(msg.Statistics.TotalAssignedJobs, nil). + Once() + + l, err := New(client, config, WithMetricsRecorder(metricsRecorder)) require.Nil(t, err) err = l.Run(ctx, handler) diff --git a/listener/mocks_test.go b/listener/mocks_test.go index d46c47d..f045a06 100644 --- a/listener/mocks_test.go +++ b/listener/mocks_test.go @@ -213,6 +213,193 @@ func (_c *MockClient_Session_Call) RunAndReturn(run func() scaleset.RunnerScaleS return _c } +// NewMockMetricsRecorder creates a new instance of MockMetricsRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockMetricsRecorder(t interface { + mock.TestingT + Cleanup(func()) +}) *MockMetricsRecorder { + mock := &MockMetricsRecorder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockMetricsRecorder is an autogenerated mock type for the MetricsRecorder type +type MockMetricsRecorder struct { + mock.Mock +} + +type MockMetricsRecorder_Expecter struct { + mock *mock.Mock +} + +func (_m *MockMetricsRecorder) EXPECT() *MockMetricsRecorder_Expecter { + return &MockMetricsRecorder_Expecter{mock: &_m.Mock} +} + +// RecordDesiredRunners provides a mock function for the type MockMetricsRecorder +func (_mock *MockMetricsRecorder) RecordDesiredRunners(count int) { + _mock.Called(count) + return +} + +// MockMetricsRecorder_RecordDesiredRunners_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDesiredRunners' +type MockMetricsRecorder_RecordDesiredRunners_Call struct { + *mock.Call +} + +// RecordDesiredRunners is a helper method to define mock.On call +// - count int +func (_e *MockMetricsRecorder_Expecter) RecordDesiredRunners(count interface{}) *MockMetricsRecorder_RecordDesiredRunners_Call { + return &MockMetricsRecorder_RecordDesiredRunners_Call{Call: _e.mock.On("RecordDesiredRunners", count)} +} + +func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Run(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 int + if args[0] != nil { + arg0 = args[0].(int) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Return() *MockMetricsRecorder_RecordDesiredRunners_Call { + _c.Call.Return() + return _c +} + +func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) RunAndReturn(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call { + _c.Run(run) + return _c +} + +// RecordJobCompleted provides a mock function for the type MockMetricsRecorder +func (_mock *MockMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) { + _mock.Called(msg) + return +} + +// MockMetricsRecorder_RecordJobCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobCompleted' +type MockMetricsRecorder_RecordJobCompleted_Call struct { + *mock.Call +} + +// RecordJobCompleted is a helper method to define mock.On call +// - msg *scaleset.JobCompleted +func (_e *MockMetricsRecorder_Expecter) RecordJobCompleted(msg interface{}) *MockMetricsRecorder_RecordJobCompleted_Call { + return &MockMetricsRecorder_RecordJobCompleted_Call{Call: _e.mock.On("RecordJobCompleted", msg)} +} + +func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Run(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *scaleset.JobCompleted + if args[0] != nil { + arg0 = args[0].(*scaleset.JobCompleted) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Return() *MockMetricsRecorder_RecordJobCompleted_Call { + _c.Call.Return() + return _c +} + +func (_c *MockMetricsRecorder_RecordJobCompleted_Call) RunAndReturn(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call { + _c.Run(run) + return _c +} + +// RecordJobStarted provides a mock function for the type MockMetricsRecorder +func (_mock *MockMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) { + _mock.Called(msg) + return +} + +// MockMetricsRecorder_RecordJobStarted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobStarted' +type MockMetricsRecorder_RecordJobStarted_Call struct { + *mock.Call +} + +// RecordJobStarted is a helper method to define mock.On call +// - msg *scaleset.JobStarted +func (_e *MockMetricsRecorder_Expecter) RecordJobStarted(msg interface{}) *MockMetricsRecorder_RecordJobStarted_Call { + return &MockMetricsRecorder_RecordJobStarted_Call{Call: _e.mock.On("RecordJobStarted", msg)} +} + +func (_c *MockMetricsRecorder_RecordJobStarted_Call) Run(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *scaleset.JobStarted + if args[0] != nil { + arg0 = args[0].(*scaleset.JobStarted) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockMetricsRecorder_RecordJobStarted_Call) Return() *MockMetricsRecorder_RecordJobStarted_Call { + _c.Call.Return() + return _c +} + +func (_c *MockMetricsRecorder_RecordJobStarted_Call) RunAndReturn(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call { + _c.Run(run) + return _c +} + +// RecordStatistics provides a mock function for the type MockMetricsRecorder +func (_mock *MockMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) { + _mock.Called(statistics) + return +} + +// MockMetricsRecorder_RecordStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordStatistics' +type MockMetricsRecorder_RecordStatistics_Call struct { + *mock.Call +} + +// RecordStatistics is a helper method to define mock.On call +// - statistics *scaleset.RunnerScaleSetStatistic +func (_e *MockMetricsRecorder_Expecter) RecordStatistics(statistics interface{}) *MockMetricsRecorder_RecordStatistics_Call { + return &MockMetricsRecorder_RecordStatistics_Call{Call: _e.mock.On("RecordStatistics", statistics)} +} + +func (_c *MockMetricsRecorder_RecordStatistics_Call) Run(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 *scaleset.RunnerScaleSetStatistic + if args[0] != nil { + arg0 = args[0].(*scaleset.RunnerScaleSetStatistic) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockMetricsRecorder_RecordStatistics_Call) Return() *MockMetricsRecorder_RecordStatistics_Call { + _c.Call.Return() + return _c +} + +func (_c *MockMetricsRecorder_RecordStatistics_Call) RunAndReturn(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call { + _c.Run(run) + return _c +} + // NewMockScaler creates a new instance of MockScaler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockScaler(t interface {