Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 44d2ec9e5b | |||
| feb84c6d04 | |||
| 0488917cf3 | |||
| f9f801fb38 | |||
| ebedf0bbbf | |||
| efa922e8dd | |||
| ff25a89ba7 | |||
| 2f9b84ee5a | |||
| 63a0a32683 | |||
| e4a017ce06 |
@@ -65,4 +65,4 @@ jobs:
|
||||
go-version-file: "go.mod"
|
||||
cache: true
|
||||
- name: Run tests
|
||||
run: go test ./...
|
||||
run: go test ./... -race
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2025 GitHub
|
||||
Copyright GitHub, Inc.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# GitHub Actions Runner Scale Set Client (Private Preview)
|
||||
# GitHub Actions Runner Scale Set Client (Public Preview)
|
||||
|
||||
> Status: **Private Preview** – While the API is stable, interfaces and examples in this repository may change.
|
||||
> Status: **Public Preview** – While the API is stable, interfaces and examples in this repository may change.
|
||||
|
||||
This repository provides a standalone Go client for the GitHub Actions **Runner Scale Set** APIs. It is extracted from the `actions-runner-controller` project so that platform teams, integrators, and infrastructure providers can build **their own custom autoscaling solutions** for GitHub Actions runners.
|
||||
|
||||
@@ -12,7 +12,7 @@ You do *not* need to adopt the full controller (and Kubernetes) to take advantag
|
||||
|
||||
A runner scale set is a group of self-hosted runners that autoscales based on workflow demand. Here's how it works:
|
||||
|
||||
1. **Registration**: You create a scale set with a name, which also serves as the label workflows use to target it (e.g., `runs-on: my-scale-set`). Like regular self-hosted runners, scale sets can be registered at the repository, organization, or enterprise level.
|
||||
1. **Registration**: You create a scale set with a name, which also serves as the label workflows use to target it (e.g., `runs-on: my-scale-set`). Multiple labels can be assigned per scale set. Like regular self-hosted runners, scale sets can be registered at the repository, organization, or enterprise level.
|
||||
2. **Polling**: Your scale set client continuously polls the API, reporting its maximum capacity (how many runners it can produce).
|
||||
3. **Job matching**: GitHub matches jobs to your scale set based on the label and runner group policies, just like regular self-hosted runners.
|
||||
4. **Scaling signal**: The API responds with how many runners your scale set needs online (`statistics.TotalAssignedJobs`).
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -127,9 +126,9 @@ type ProxyFunc func(req *http.Request) (*url.URL, error)
|
||||
type SystemInfo struct {
|
||||
// System is the name of the scale set implementation
|
||||
System string `json:"system"`
|
||||
// Version is the version of the controller
|
||||
// Version is the version of the client
|
||||
Version string `json:"version"`
|
||||
// CommitSHA is the git commit SHA of the controller
|
||||
// CommitSHA is the git commit SHA of the client
|
||||
CommitSHA string `json:"commit_sha"`
|
||||
// ScaleSetID is the ID of the scale set
|
||||
ScaleSetID int `json:"scale_set_id"`
|
||||
@@ -223,6 +222,7 @@ type userAgent struct {
|
||||
SystemInfo
|
||||
BuildVersion string `json:"build_version"`
|
||||
BuildCommitSHA string `json:"build_commit_sha"`
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
|
||||
func (c *Client) newGitHubAPIRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) {
|
||||
@@ -546,7 +546,6 @@ func parseRunnerScaleSetMessageResponse(respBody io.Reader) (*RunnerScaleSetMess
|
||||
// It exposes client options that could be overwritten, providing ability to specify different retry policies or TLS settings, proxy, etc.
|
||||
func (c *Client) MessageSessionClient(ctx context.Context, runnerScaleSetID int, owner string, options ...HTTPOption) (*MessageSessionClient, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Copy original options
|
||||
httpClientOption := c.httpClientOption
|
||||
@@ -567,6 +566,8 @@ func (c *Client) MessageSessionClient(ctx context.Context, runnerScaleSetID int,
|
||||
scaleSetID: runnerScaleSetID,
|
||||
session: nil,
|
||||
}
|
||||
// Unlock the client to allow createMessageSession to call public methods that require locking
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := client.createMessageSession(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to create message session: %w", err)
|
||||
@@ -836,8 +837,6 @@ func (c *Client) getActionsServiceAdminConnection(ctx context.Context, rt *regis
|
||||
return nil, fmt.Errorf("failed to get actions service admin connection: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("got connection", *adminConnection.ActionsServiceURL, c.userAgent)
|
||||
|
||||
return adminConnection, nil
|
||||
}
|
||||
|
||||
|
||||
+56
-13
@@ -75,12 +75,20 @@ func sendRequest(c *http.Client, req *http.Request) (*http.Response, error) {
|
||||
}
|
||||
|
||||
type httpClientOption struct {
|
||||
logger *slog.Logger
|
||||
retryMax int
|
||||
retryWaitMax time.Duration
|
||||
logger *slog.Logger
|
||||
|
||||
// Options for built-in retryable HTTP client.
|
||||
// Ignored if a custom retryable HTTP client is provided via WithRetryableHTTPClint.
|
||||
retryMax int
|
||||
retryWaitMax time.Duration
|
||||
|
||||
// fields added to the transport if specified
|
||||
rootCAs *x509.CertPool
|
||||
tlsInsecureSkipVerify bool
|
||||
proxyFunc ProxyFunc
|
||||
timeout time.Duration
|
||||
|
||||
retryableHTTPClient *retryablehttp.Client
|
||||
}
|
||||
|
||||
func (o *httpClientOption) defaults() {
|
||||
@@ -93,14 +101,28 @@ func (o *httpClientOption) defaults() {
|
||||
if o.retryWaitMax == 0 {
|
||||
o.retryWaitMax = 30 * time.Second
|
||||
}
|
||||
if o.timeout == 0 {
|
||||
o.timeout = 5 * time.Minute
|
||||
}
|
||||
}
|
||||
|
||||
func (o *httpClientOption) newRetryableHTTPClient() (*retryablehttp.Client, error) {
|
||||
retryClient := retryablehttp.NewClient()
|
||||
retryClient.Logger = o.logger
|
||||
retryClient.RetryMax = o.retryMax
|
||||
retryClient.RetryWaitMax = o.retryWaitMax
|
||||
retryClient.HTTPClient.Timeout = 5 * time.Minute // timeout must be > 1m to accomodate long polling
|
||||
var retryClient *retryablehttp.Client
|
||||
if o.retryableHTTPClient != nil {
|
||||
retryClient = o.retryableHTTPClient
|
||||
} else {
|
||||
retryClient = retryablehttp.NewClient()
|
||||
retryClient.RetryMax = o.retryMax
|
||||
retryClient.RetryWaitMax = o.retryWaitMax
|
||||
}
|
||||
|
||||
if retryClient.HTTPClient.Timeout == 0 {
|
||||
retryClient.HTTPClient.Timeout = o.timeout
|
||||
}
|
||||
|
||||
if retryClient.Logger == nil {
|
||||
retryClient.Logger = o.logger
|
||||
}
|
||||
|
||||
transport, ok := retryClient.HTTPClient.Transport.(*http.Transport)
|
||||
if !ok {
|
||||
@@ -120,7 +142,9 @@ func (o *httpClientOption) newRetryableHTTPClient() (*retryablehttp.Client, erro
|
||||
transport.TLSClientConfig.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
transport.Proxy = o.proxyFunc
|
||||
if o.proxyFunc != nil {
|
||||
transport.Proxy = o.proxyFunc
|
||||
}
|
||||
|
||||
retryClient.HTTPClient.Transport = transport
|
||||
|
||||
@@ -137,18 +161,30 @@ func (c *commonClient) setUserAgent() {
|
||||
SystemInfo: c.systemInfo,
|
||||
BuildVersion: buildInfo.version,
|
||||
BuildCommitSHA: buildInfo.commitSHA,
|
||||
Kind: "scaleset",
|
||||
})
|
||||
c.userAgent = string(b)
|
||||
slog.Info("user agent", "useragent", c.userAgent)
|
||||
}
|
||||
|
||||
// HTTPOption defines a functional option for configuring the Client.
|
||||
type HTTPOption func(*httpClientOption)
|
||||
|
||||
// WithLogger sets a custom logger for the Client.
|
||||
func WithLogger(logger slog.Logger) HTTPOption {
|
||||
// WithRetryableHTTPClint allows users to provide a custom retryable HTTP client.
|
||||
// If not set, a default client will be used with the specified retry and timeout settings.
|
||||
func WithRetryableHTTPClint(client *retryablehttp.Client) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
c.logger = &logger
|
||||
c.retryableHTTPClient = client
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger sets a custom logger for the Client.
|
||||
// If nil is passed, a discard logger will be used.
|
||||
func WithLogger(logger *slog.Logger) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
if logger == nil {
|
||||
logger = slog.New(slog.DiscardHandler)
|
||||
}
|
||||
c.logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,3 +222,10 @@ func WithProxy(proxyFunc ProxyFunc) HTTPOption {
|
||||
c.proxyFunc = proxyFunc
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout sets a timeout for the Client.
|
||||
func WithTimeout(duration time.Duration) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
c.timeout = duration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,10 @@ import (
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/actions/scaleset/internal/testserver"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/http/httpproxy"
|
||||
@@ -124,6 +126,7 @@ func TestUserAgent(t *testing.T) {
|
||||
SystemInfo: testSystemInfo,
|
||||
BuildCommitSHA: sha,
|
||||
BuildVersion: version,
|
||||
Kind: "scaleset",
|
||||
}
|
||||
b, err := json.Marshal(wantInfo)
|
||||
require.NoError(t, err, "failed to marshal expected user agent")
|
||||
@@ -144,6 +147,7 @@ func TestUserAgent(t *testing.T) {
|
||||
SystemInfo: userAgentInfo,
|
||||
BuildCommitSHA: sha,
|
||||
BuildVersion: version,
|
||||
Kind: "scaleset",
|
||||
}
|
||||
b, err = json.Marshal(wantInfo)
|
||||
require.NoError(t, err, "failed to marshal expected user agent after SetSystemInfo")
|
||||
@@ -151,3 +155,88 @@ func TestUserAgent(t *testing.T) {
|
||||
|
||||
assert.Equal(t, want, got)
|
||||
}
|
||||
|
||||
// TestWithRetryableHTTPClient verifies that a custom retryable HTTP client
|
||||
// provided via WithRetryableHTTPClient is actually used instead of the built-in one
|
||||
func TestWithRetryableHTTPClient(t *testing.T) {
|
||||
t.Run("uses custom retryable client instead of built-in", func(t *testing.T) {
|
||||
attemptCount := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attemptCount++
|
||||
if attemptCount == 1 {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"result": "success"}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// Create a custom retryable HTTP client with specific retry configuration
|
||||
customRetryClient := retryablehttp.NewClient()
|
||||
customRetryClient.RetryMax = 3
|
||||
customRetryClient.RetryWaitMax = 10 * time.Millisecond
|
||||
|
||||
// Create options with the custom retryable client
|
||||
opts := defaultHTTPClientOption()
|
||||
WithRetryableHTTPClint(customRetryClient)(&opts)
|
||||
|
||||
// Verify that the custom client is set in options
|
||||
assert.NotNil(t, opts.retryableHTTPClient)
|
||||
assert.Equal(t, customRetryClient, opts.retryableHTTPClient)
|
||||
|
||||
// Create the common client with custom retryable client
|
||||
client := newCommonClient(testSystemInfo, opts)
|
||||
|
||||
// Make a request that will trigger a retry
|
||||
req, err := http.NewRequest("GET", server.URL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should succeed after retry
|
||||
assert.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
assert.Equal(t, 2, attemptCount)
|
||||
|
||||
// Verify that the client used is the custom one by checking newRetryableHTTPClient
|
||||
retrievedRetryClient, err := client.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, customRetryClient, retrievedRetryClient, "should return the custom retryable client")
|
||||
})
|
||||
|
||||
t.Run("respects custom client's retry configuration over built-in defaults", func(t *testing.T) {
|
||||
attemptCount := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attemptCount++
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// Create custom client with limited retries
|
||||
customRetryClient := retryablehttp.NewClient()
|
||||
customRetryClient.RetryMax = 1 // Only 1 retry (2 total attempts)
|
||||
customRetryClient.RetryWaitMax = 5 * time.Millisecond
|
||||
|
||||
opts := defaultHTTPClientOption()
|
||||
WithRetryableHTTPClint(customRetryClient)(&opts)
|
||||
|
||||
client := newCommonClient(testSystemInfo, opts)
|
||||
|
||||
req, err := http.NewRequest("GET", server.URL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.do(req)
|
||||
// When all retries are exhausted with a retryable error, the client gives up
|
||||
// and an error is returned
|
||||
if err != nil {
|
||||
// Expected: request failed after exhausting retries
|
||||
assert.Contains(t, err.Error(), "giving up after 2 attempt(s)")
|
||||
} else {
|
||||
// Or the final response is returned
|
||||
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
|
||||
}
|
||||
// Should have tried 1 initial + 1 retry = 2 times total
|
||||
assert.Equal(t, 2, attemptCount)
|
||||
})
|
||||
}
|
||||
|
||||
+65
-10
@@ -52,22 +52,57 @@ type Client interface {
|
||||
Session() scaleset.RunnerScaleSetSession
|
||||
}
|
||||
|
||||
type Option func(*Listener)
|
||||
// MetricsRecorder defines the hook methods that will be called by the listener at
|
||||
// various points in the message handling process. This allows for custom
|
||||
// metrics to be collected without coupling the listener to a specific metrics
|
||||
// implementation. The methods in this interface will be called with relevant
|
||||
// information about the message handling process, such as the number of jobs
|
||||
// started/completed, the desired runner count, and any errors that occur.
|
||||
// Implementers can use this information to track the performance and behavior
|
||||
// of the listener and the scaleset service.
|
||||
type MetricsRecorder interface {
|
||||
RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic)
|
||||
RecordJobStarted(msg *scaleset.JobStarted)
|
||||
RecordJobCompleted(msg *scaleset.JobCompleted)
|
||||
RecordDesiredRunners(count int)
|
||||
}
|
||||
|
||||
type discardMetricsRecorder struct{}
|
||||
|
||||
func (d *discardMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {}
|
||||
func (d *discardMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {}
|
||||
func (d *discardMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {}
|
||||
func (d *discardMetricsRecorder) RecordDesiredRunners(count int) {}
|
||||
|
||||
// Listener listens for messages from the scaleset service and handles them. It automatically handles session
|
||||
// creation/deletion/refreshing and message polling and acking.
|
||||
type Listener struct {
|
||||
// The main client responsible for communicating with the scaleset service
|
||||
client Client
|
||||
client Client
|
||||
metricsRecorder MetricsRecorder
|
||||
|
||||
// Configuration for the listener
|
||||
scaleSetID int
|
||||
maxRunners atomic.Uint32
|
||||
scaleSetID int
|
||||
maxRunners atomic.Uint32
|
||||
latestStatistics *scaleset.RunnerScaleSetStatistic
|
||||
|
||||
// configuration for the listener
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
type Option func(*Listener)
|
||||
|
||||
// WithMetricsRecorder sets the MetricsRecorder for the listener. If not set, a no-op recorder will be used.
|
||||
// If the nil is passed, the MetricsRecorder will not be updated and the existing one will be used (which is a no-op by default).
|
||||
func WithMetricsRecorder(recorder MetricsRecorder) Option {
|
||||
return func(l *Listener) {
|
||||
if recorder == nil {
|
||||
return
|
||||
}
|
||||
l.metricsRecorder = recorder
|
||||
}
|
||||
}
|
||||
|
||||
// SetMaxRunners sets the capacity of the scaleset. It is concurrently
|
||||
// safe to update the max runners during listener.Run.
|
||||
func (l *Listener) SetMaxRunners(count int) {
|
||||
@@ -85,12 +120,17 @@ func New(client Client, config Config, options ...Option) (*Listener, error) {
|
||||
}
|
||||
|
||||
listener := &Listener{
|
||||
client: client,
|
||||
scaleSetID: config.ScaleSetID,
|
||||
logger: config.Logger,
|
||||
client: client,
|
||||
metricsRecorder: &discardMetricsRecorder{},
|
||||
scaleSetID: config.ScaleSetID,
|
||||
logger: config.Logger,
|
||||
}
|
||||
listener.SetMaxRunners(config.MaxRunners)
|
||||
|
||||
for _, option := range options {
|
||||
option(listener)
|
||||
}
|
||||
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
@@ -114,13 +154,17 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
return fmt.Errorf("session statistics is nil")
|
||||
}
|
||||
|
||||
l.handleStatistics(ctx, initialSession.Statistics)
|
||||
|
||||
l.logger.Info(
|
||||
"Handling initial session statistics",
|
||||
slog.Int("totalAssignedJobs", initialSession.Statistics.TotalAssignedJobs),
|
||||
)
|
||||
if _, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs); err != nil {
|
||||
desiredCount, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("handling initial message failed: %w", err)
|
||||
}
|
||||
l.metricsRecorder.RecordDesiredRunners(desiredCount)
|
||||
}
|
||||
|
||||
var lastMessageID int
|
||||
@@ -142,7 +186,7 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
}
|
||||
|
||||
if msg == nil {
|
||||
_, err := scaler.HandleDesiredRunnerCount(ctx, 0)
|
||||
_, err := scaler.HandleDesiredRunnerCount(ctx, l.latestStatistics.TotalAssignedJobs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("handling nil message failed: %w", err)
|
||||
}
|
||||
@@ -160,24 +204,35 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
}
|
||||
|
||||
func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scaleset.RunnerScaleSetMessage) error {
|
||||
l.handleStatistics(ctx, msg.Statistics)
|
||||
|
||||
if err := l.client.DeleteMessage(ctx, msg.MessageID); err != nil {
|
||||
return fmt.Errorf("failed to delete message: %w", err)
|
||||
}
|
||||
|
||||
for _, jobStarted := range msg.JobStartedMessages {
|
||||
l.metricsRecorder.RecordJobStarted(jobStarted)
|
||||
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
|
||||
return fmt.Errorf("failed to handle job started: %w", err)
|
||||
}
|
||||
}
|
||||
for _, jobCompleted := range msg.JobCompletedMessages {
|
||||
l.metricsRecorder.RecordJobCompleted(jobCompleted)
|
||||
if err := handler.HandleJobCompleted(ctx, jobCompleted); err != nil {
|
||||
return fmt.Errorf("failed to handle job completed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs); err != nil {
|
||||
desiredCount, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to handle desired runner count: %w", err)
|
||||
}
|
||||
l.metricsRecorder.RecordDesiredRunners(desiredCount)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
|
||||
l.latestStatistics = msg
|
||||
l.metricsRecorder.RecordStatistics(msg)
|
||||
}
|
||||
|
||||
+51
-54
@@ -75,39 +75,37 @@ func TestListener_Run(t *testing.T) {
|
||||
client := NewMockClient(t)
|
||||
|
||||
uuid := uuid.New()
|
||||
initialStatistics := &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 2,
|
||||
}
|
||||
session := scaleset.RunnerScaleSetSession{
|
||||
SessionID: uuid,
|
||||
OwnerName: "example",
|
||||
RunnerScaleSet: &scaleset.RunnerScaleSet{},
|
||||
MessageQueueURL: "https://example.com",
|
||||
MessageQueueAccessToken: "1234567890",
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
Statistics: initialStatistics,
|
||||
}
|
||||
|
||||
client.On("Session").Return(session).Once()
|
||||
|
||||
l, err := New(client, config)
|
||||
metricsRecorder := NewMockMetricsRecorder(t)
|
||||
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
|
||||
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Run(func(mock.Arguments) { cancel() }).
|
||||
Once()
|
||||
|
||||
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
|
||||
require.Nil(t, err)
|
||||
|
||||
var called bool
|
||||
handler := NewMockScaler(t)
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).
|
||||
Return(0, nil).
|
||||
Run(
|
||||
func(mock.Arguments) {
|
||||
called = true
|
||||
cancel()
|
||||
},
|
||||
).
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
err = l.Run(ctx, handler)
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
assert.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("cancel context after get message", func(t *testing.T) {
|
||||
@@ -120,61 +118,60 @@ func TestListener_Run(t *testing.T) {
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := NewMockClient(t)
|
||||
uuid := uuid.New()
|
||||
initialStatistics := &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 2,
|
||||
}
|
||||
|
||||
session := scaleset.RunnerScaleSetSession{
|
||||
SessionID: uuid,
|
||||
OwnerName: "example",
|
||||
RunnerScaleSet: &scaleset.RunnerScaleSet{},
|
||||
MessageQueueURL: "https://example.com",
|
||||
MessageQueueAccessToken: "1234567890",
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
Statistics: initialStatistics,
|
||||
}
|
||||
|
||||
msg := &scaleset.RunnerScaleSetMessage{
|
||||
MessageID: 1,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
MessageID: 1,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 3,
|
||||
},
|
||||
}
|
||||
client.On("Session").Return(session).Once()
|
||||
client.On(
|
||||
"GetMessage",
|
||||
ctx,
|
||||
mock.Anything,
|
||||
10,
|
||||
).
|
||||
Return(msg, nil).
|
||||
Run(
|
||||
func(mock.Arguments) {
|
||||
cancel()
|
||||
},
|
||||
).
|
||||
Once()
|
||||
|
||||
// Ensure delete message is called without cancel
|
||||
client.On(
|
||||
"DeleteMessage",
|
||||
context.WithoutCancel(ctx),
|
||||
mock.Anything,
|
||||
).Return(nil).Once()
|
||||
|
||||
metricsRecorder := NewMockMetricsRecorder(t)
|
||||
client := NewMockClient(t)
|
||||
handler := NewMockScaler(t)
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
0,
|
||||
).
|
||||
Return(0, nil).
|
||||
|
||||
client.On("Session").Return(session).Once()
|
||||
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
|
||||
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).
|
||||
Return(0, nil).
|
||||
client.On("GetMessage", ctx, mock.Anything, 10).
|
||||
Return(msg, nil).
|
||||
Run(func(mock.Arguments) { cancel() }).
|
||||
Once()
|
||||
|
||||
l, err := New(client, config)
|
||||
metricsRecorder.On("RecordStatistics", msg.Statistics).Once()
|
||||
// Ensure delete message is called without cancel
|
||||
client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything).
|
||||
Return(nil).
|
||||
Once()
|
||||
|
||||
metricsRecorder.On("RecordDesiredRunners", msg.Statistics.TotalAssignedJobs).
|
||||
Return(msg.Statistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, msg.Statistics.TotalAssignedJobs).
|
||||
Return(msg.Statistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
|
||||
require.Nil(t, err)
|
||||
|
||||
err = l.Run(ctx, handler)
|
||||
|
||||
@@ -213,6 +213,193 @@ func (_c *MockClient_Session_Call) RunAndReturn(run func() scaleset.RunnerScaleS
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockMetricsRecorder creates a new instance of MockMetricsRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockMetricsRecorder(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockMetricsRecorder {
|
||||
mock := &MockMetricsRecorder{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
// MockMetricsRecorder is an autogenerated mock type for the MetricsRecorder type
|
||||
type MockMetricsRecorder struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type MockMetricsRecorder_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *MockMetricsRecorder) EXPECT() *MockMetricsRecorder_Expecter {
|
||||
return &MockMetricsRecorder_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// RecordDesiredRunners provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordDesiredRunners(count int) {
|
||||
_mock.Called(count)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordDesiredRunners_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDesiredRunners'
|
||||
type MockMetricsRecorder_RecordDesiredRunners_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordDesiredRunners is a helper method to define mock.On call
|
||||
// - count int
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordDesiredRunners(count interface{}) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
return &MockMetricsRecorder_RecordDesiredRunners_Call{Call: _e.mock.On("RecordDesiredRunners", count)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Run(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 int
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(int)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Return() *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) RunAndReturn(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordJobCompleted provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {
|
||||
_mock.Called(msg)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordJobCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobCompleted'
|
||||
type MockMetricsRecorder_RecordJobCompleted_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordJobCompleted is a helper method to define mock.On call
|
||||
// - msg *scaleset.JobCompleted
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordJobCompleted(msg interface{}) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
return &MockMetricsRecorder_RecordJobCompleted_Call{Call: _e.mock.On("RecordJobCompleted", msg)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Run(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.JobCompleted
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.JobCompleted)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Return() *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) RunAndReturn(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordJobStarted provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {
|
||||
_mock.Called(msg)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordJobStarted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobStarted'
|
||||
type MockMetricsRecorder_RecordJobStarted_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordJobStarted is a helper method to define mock.On call
|
||||
// - msg *scaleset.JobStarted
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordJobStarted(msg interface{}) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
return &MockMetricsRecorder_RecordJobStarted_Call{Call: _e.mock.On("RecordJobStarted", msg)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Run(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.JobStarted
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.JobStarted)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Return() *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) RunAndReturn(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordStatistics provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {
|
||||
_mock.Called(statistics)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordStatistics'
|
||||
type MockMetricsRecorder_RecordStatistics_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordStatistics is a helper method to define mock.On call
|
||||
// - statistics *scaleset.RunnerScaleSetStatistic
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordStatistics(statistics interface{}) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
return &MockMetricsRecorder_RecordStatistics_Call{Call: _e.mock.On("RecordStatistics", statistics)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) Run(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.RunnerScaleSetStatistic
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.RunnerScaleSetStatistic)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) Return() *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) RunAndReturn(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockScaler creates a new instance of MockScaler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockScaler(t interface {
|
||||
|
||||
@@ -235,6 +235,9 @@ func (c *MessageSessionClient) Session() RunnerScaleSetSession {
|
||||
}
|
||||
|
||||
func (c *MessageSessionClient) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error {
|
||||
c.innerClient.mu.Lock()
|
||||
defer c.innerClient.mu.Unlock()
|
||||
|
||||
req, err := c.innerClient.newActionsServiceRequest(ctx, method, path, requestData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new actions service request: %w", err)
|
||||
|
||||
Reference in New Issue
Block a user