Add metrics hooks to the listener (#72)

* Add metrics hook

* extend current tests to cover recorder

* address nil passed in option, fix typo

* mocks
This commit is contained in:
Nikola Jokic
2026-02-17 10:12:57 +01:00
committed by GitHub
parent ebedf0bbbf
commit f9f801fb38
3 changed files with 303 additions and 64 deletions
+65 -10
View File
@@ -52,22 +52,57 @@ type Client interface {
Session() scaleset.RunnerScaleSetSession
}
type Option func(*Listener)
// MetricsRecorder defines the hook methods that will be called by the listener at
// various points in the message handling process. This allows for custom
// metrics to be collected without coupling the listener to a specific metrics
// implementation. The methods in this interface will be called with relevant
// information about the message handling process, such as the number of jobs
// started/completed, the desired runner count, and any errors that occur.
// Implementers can use this information to track the performance and behavior
// of the listener and the scaleset service.
type MetricsRecorder interface {
RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic)
RecordJobStarted(msg *scaleset.JobStarted)
RecordJobCompleted(msg *scaleset.JobCompleted)
RecordDesiredRunners(count int)
}
type discardMetricsRecorder struct{}
func (d *discardMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {}
func (d *discardMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {}
func (d *discardMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {}
func (d *discardMetricsRecorder) RecordDesiredRunners(count int) {}
// Listener listens for messages from the scaleset service and handles them. It automatically handles session
// creation/deletion/refreshing and message polling and acking.
type Listener struct {
// The main client responsible for communicating with the scaleset service
client Client
client Client
metricsRecorder MetricsRecorder
// Configuration for the listener
scaleSetID int
maxRunners atomic.Uint32
scaleSetID int
maxRunners atomic.Uint32
latestStatistics *scaleset.RunnerScaleSetStatistic
// configuration for the listener
logger *slog.Logger
}
type Option func(*Listener)
// WithMetricsRecorder sets the MetricsRecorder for the listener. If not set, a no-op recorder will be used.
// If the nil is passed, the MetricsRecorder will not be updated and the existing one will be used (which is a no-op by default).
func WithMetricsRecorder(recorder MetricsRecorder) Option {
return func(l *Listener) {
if recorder == nil {
return
}
l.metricsRecorder = recorder
}
}
// SetMaxRunners sets the capacity of the scaleset. It is concurrently
// safe to update the max runners during listener.Run.
func (l *Listener) SetMaxRunners(count int) {
@@ -85,12 +120,17 @@ func New(client Client, config Config, options ...Option) (*Listener, error) {
}
listener := &Listener{
client: client,
scaleSetID: config.ScaleSetID,
logger: config.Logger,
client: client,
metricsRecorder: &discardMetricsRecorder{},
scaleSetID: config.ScaleSetID,
logger: config.Logger,
}
listener.SetMaxRunners(config.MaxRunners)
for _, option := range options {
option(listener)
}
return listener, nil
}
@@ -114,13 +154,17 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
return fmt.Errorf("session statistics is nil")
}
l.handleStatistics(ctx, initialSession.Statistics)
l.logger.Info(
"Handling initial session statistics",
slog.Int("totalAssignedJobs", initialSession.Statistics.TotalAssignedJobs),
)
if _, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs); err != nil {
desiredCount, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("handling initial message failed: %w", err)
}
l.metricsRecorder.RecordDesiredRunners(desiredCount)
}
var lastMessageID int
@@ -142,7 +186,7 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
}
if msg == nil {
_, err := scaler.HandleDesiredRunnerCount(ctx, 0)
_, err := scaler.HandleDesiredRunnerCount(ctx, l.latestStatistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("handling nil message failed: %w", err)
}
@@ -160,24 +204,35 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
}
func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scaleset.RunnerScaleSetMessage) error {
l.handleStatistics(ctx, msg.Statistics)
if err := l.client.DeleteMessage(ctx, msg.MessageID); err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}
for _, jobStarted := range msg.JobStartedMessages {
l.metricsRecorder.RecordJobStarted(jobStarted)
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
return fmt.Errorf("failed to handle job started: %w", err)
}
}
for _, jobCompleted := range msg.JobCompletedMessages {
l.metricsRecorder.RecordJobCompleted(jobCompleted)
if err := handler.HandleJobCompleted(ctx, jobCompleted); err != nil {
return fmt.Errorf("failed to handle job completed: %w", err)
}
}
if _, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs); err != nil {
desiredCount, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("failed to handle desired runner count: %w", err)
}
l.metricsRecorder.RecordDesiredRunners(desiredCount)
return nil
}
func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
l.latestStatistics = msg
l.metricsRecorder.RecordStatistics(msg)
}
+51 -54
View File
@@ -75,39 +75,37 @@ func TestListener_Run(t *testing.T) {
client := NewMockClient(t)
uuid := uuid.New()
initialStatistics := &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 2,
}
session := scaleset.RunnerScaleSetSession{
SessionID: uuid,
OwnerName: "example",
RunnerScaleSet: &scaleset.RunnerScaleSet{},
MessageQueueURL: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &scaleset.RunnerScaleSetStatistic{},
Statistics: initialStatistics,
}
client.On("Session").Return(session).Once()
l, err := New(client, config)
metricsRecorder := NewMockMetricsRecorder(t)
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Run(func(mock.Arguments) { cancel() }).
Once()
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
require.Nil(t, err)
var called bool
handler := NewMockScaler(t)
handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
mock.Anything,
).
Return(0, nil).
Run(
func(mock.Arguments) {
called = true
cancel()
},
).
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()
err = l.Run(ctx, handler)
assert.ErrorIs(t, err, context.Canceled)
assert.True(t, called)
})
t.Run("cancel context after get message", func(t *testing.T) {
@@ -120,61 +118,60 @@ func TestListener_Run(t *testing.T) {
MaxRunners: 10,
}
client := NewMockClient(t)
uuid := uuid.New()
initialStatistics := &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 2,
}
session := scaleset.RunnerScaleSetSession{
SessionID: uuid,
OwnerName: "example",
RunnerScaleSet: &scaleset.RunnerScaleSet{},
MessageQueueURL: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &scaleset.RunnerScaleSetStatistic{},
Statistics: initialStatistics,
}
msg := &scaleset.RunnerScaleSetMessage{
MessageID: 1,
Statistics: &scaleset.RunnerScaleSetStatistic{},
MessageID: 1,
Statistics: &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 3,
},
}
client.On("Session").Return(session).Once()
client.On(
"GetMessage",
ctx,
mock.Anything,
10,
).
Return(msg, nil).
Run(
func(mock.Arguments) {
cancel()
},
).
Once()
// Ensure delete message is called without cancel
client.On(
"DeleteMessage",
context.WithoutCancel(ctx),
mock.Anything,
).Return(nil).Once()
metricsRecorder := NewMockMetricsRecorder(t)
client := NewMockClient(t)
handler := NewMockScaler(t)
handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
0,
).
Return(0, nil).
client.On("Session").Return(session).Once()
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()
handler.On("HandleDesiredRunnerCount", mock.Anything, initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()
handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
mock.Anything,
).
Return(0, nil).
client.On("GetMessage", ctx, mock.Anything, 10).
Return(msg, nil).
Run(func(mock.Arguments) { cancel() }).
Once()
l, err := New(client, config)
metricsRecorder.On("RecordStatistics", msg.Statistics).Once()
// Ensure delete message is called without cancel
client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything).
Return(nil).
Once()
metricsRecorder.On("RecordDesiredRunners", msg.Statistics.TotalAssignedJobs).
Return(msg.Statistics.TotalAssignedJobs, nil).
Once()
handler.On("HandleDesiredRunnerCount", mock.Anything, msg.Statistics.TotalAssignedJobs).
Return(msg.Statistics.TotalAssignedJobs, nil).
Once()
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
require.Nil(t, err)
err = l.Run(ctx, handler)
+187
View File
@@ -213,6 +213,193 @@ func (_c *MockClient_Session_Call) RunAndReturn(run func() scaleset.RunnerScaleS
return _c
}
// NewMockMetricsRecorder creates a new instance of MockMetricsRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockMetricsRecorder(t interface {
mock.TestingT
Cleanup(func())
}) *MockMetricsRecorder {
mock := &MockMetricsRecorder{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
// MockMetricsRecorder is an autogenerated mock type for the MetricsRecorder type
type MockMetricsRecorder struct {
mock.Mock
}
type MockMetricsRecorder_Expecter struct {
mock *mock.Mock
}
func (_m *MockMetricsRecorder) EXPECT() *MockMetricsRecorder_Expecter {
return &MockMetricsRecorder_Expecter{mock: &_m.Mock}
}
// RecordDesiredRunners provides a mock function for the type MockMetricsRecorder
func (_mock *MockMetricsRecorder) RecordDesiredRunners(count int) {
_mock.Called(count)
return
}
// MockMetricsRecorder_RecordDesiredRunners_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDesiredRunners'
type MockMetricsRecorder_RecordDesiredRunners_Call struct {
*mock.Call
}
// RecordDesiredRunners is a helper method to define mock.On call
// - count int
func (_e *MockMetricsRecorder_Expecter) RecordDesiredRunners(count interface{}) *MockMetricsRecorder_RecordDesiredRunners_Call {
return &MockMetricsRecorder_RecordDesiredRunners_Call{Call: _e.mock.On("RecordDesiredRunners", count)}
}
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Run(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 int
if args[0] != nil {
arg0 = args[0].(int)
}
run(
arg0,
)
})
return _c
}
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Return() *MockMetricsRecorder_RecordDesiredRunners_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) RunAndReturn(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
_c.Run(run)
return _c
}
// RecordJobCompleted provides a mock function for the type MockMetricsRecorder
func (_mock *MockMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {
_mock.Called(msg)
return
}
// MockMetricsRecorder_RecordJobCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobCompleted'
type MockMetricsRecorder_RecordJobCompleted_Call struct {
*mock.Call
}
// RecordJobCompleted is a helper method to define mock.On call
// - msg *scaleset.JobCompleted
func (_e *MockMetricsRecorder_Expecter) RecordJobCompleted(msg interface{}) *MockMetricsRecorder_RecordJobCompleted_Call {
return &MockMetricsRecorder_RecordJobCompleted_Call{Call: _e.mock.On("RecordJobCompleted", msg)}
}
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Run(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 *scaleset.JobCompleted
if args[0] != nil {
arg0 = args[0].(*scaleset.JobCompleted)
}
run(
arg0,
)
})
return _c
}
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Return() *MockMetricsRecorder_RecordJobCompleted_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) RunAndReturn(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
_c.Run(run)
return _c
}
// RecordJobStarted provides a mock function for the type MockMetricsRecorder
func (_mock *MockMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {
_mock.Called(msg)
return
}
// MockMetricsRecorder_RecordJobStarted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobStarted'
type MockMetricsRecorder_RecordJobStarted_Call struct {
*mock.Call
}
// RecordJobStarted is a helper method to define mock.On call
// - msg *scaleset.JobStarted
func (_e *MockMetricsRecorder_Expecter) RecordJobStarted(msg interface{}) *MockMetricsRecorder_RecordJobStarted_Call {
return &MockMetricsRecorder_RecordJobStarted_Call{Call: _e.mock.On("RecordJobStarted", msg)}
}
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Run(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 *scaleset.JobStarted
if args[0] != nil {
arg0 = args[0].(*scaleset.JobStarted)
}
run(
arg0,
)
})
return _c
}
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Return() *MockMetricsRecorder_RecordJobStarted_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetricsRecorder_RecordJobStarted_Call) RunAndReturn(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
_c.Run(run)
return _c
}
// RecordStatistics provides a mock function for the type MockMetricsRecorder
func (_mock *MockMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {
_mock.Called(statistics)
return
}
// MockMetricsRecorder_RecordStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordStatistics'
type MockMetricsRecorder_RecordStatistics_Call struct {
*mock.Call
}
// RecordStatistics is a helper method to define mock.On call
// - statistics *scaleset.RunnerScaleSetStatistic
func (_e *MockMetricsRecorder_Expecter) RecordStatistics(statistics interface{}) *MockMetricsRecorder_RecordStatistics_Call {
return &MockMetricsRecorder_RecordStatistics_Call{Call: _e.mock.On("RecordStatistics", statistics)}
}
func (_c *MockMetricsRecorder_RecordStatistics_Call) Run(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 *scaleset.RunnerScaleSetStatistic
if args[0] != nil {
arg0 = args[0].(*scaleset.RunnerScaleSetStatistic)
}
run(
arg0,
)
})
return _c
}
func (_c *MockMetricsRecorder_RecordStatistics_Call) Return() *MockMetricsRecorder_RecordStatistics_Call {
_c.Call.Return()
return _c
}
func (_c *MockMetricsRecorder_RecordStatistics_Call) RunAndReturn(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
_c.Run(run)
return _c
}
// NewMockScaler creates a new instance of MockScaler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockScaler(t interface {