Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 33cbb7a1d9 | |||
| 9cd2ceb86b | |||
| aeda0a5af4 | |||
| c841c96f1c | |||
| 145b7e382f | |||
| 0015641f99 | |||
| 1208e1216e | |||
| ebe67dba45 | |||
| 5cb859334b | |||
| 3fe477d93e | |||
| f3ab19d2b8 | |||
| 40342b1e2a | |||
| d9d8d097da | |||
| e346584c2d | |||
| be0376b5b5 | |||
| 7e0eae99a6 | |||
| 01adae723f | |||
| e37307f46f | |||
| 23bfd4848d | |||
| a2a8af97b6 | |||
| 60ce378d14 | |||
| dde6b4631c | |||
| fe39658434 | |||
| cec89c5b7b | |||
| 7442885c99 | |||
| 6d1c317b90 | |||
| 45800be002 | |||
| 16a9c915cb | |||
| c2f0a53402 | |||
| 5f370484a7 | |||
| 8104f571eb | |||
| 4cc97e220a | |||
| 8155f87a58 | |||
| 83fb2e2837 | |||
| 1f22d2441c | |||
| e9e5ca3282 | |||
| 51aa04a4b2 | |||
| 39ffa2e560 | |||
| b1dcbc44e6 | |||
| 287d3410da | |||
| c549549283 | |||
| a2aa9f9ea9 | |||
| dd4c9f2481 | |||
| 8b43e9e5c9 |
@@ -1,18 +0,0 @@
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "gomod" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
groups:
|
||||
gomod:
|
||||
patterns:
|
||||
- "*"
|
||||
- package-ecosystem: github-actions
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
groups:
|
||||
actions:
|
||||
patterns:
|
||||
- "*"
|
||||
@@ -19,7 +19,7 @@ jobs:
|
||||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
mocks:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
@@ -45,13 +45,13 @@ jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
cache: false
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20
|
||||
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9
|
||||
with:
|
||||
only-new-issues: true
|
||||
version: v2.5.0
|
||||
@@ -59,10 +59,10 @@ jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
cache: true
|
||||
- name: Run tests
|
||||
run: go test ./... -race
|
||||
run: go test ./...
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
MIT License
|
||||
|
||||
Copyright GitHub, Inc.
|
||||
Copyright (c) 2025 GitHub
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# GitHub Actions Runner Scale Set Client (Public Preview)
|
||||
# GitHub Actions Runner Scale Set Client (Private Preview)
|
||||
|
||||
> Status: **Public Preview** – While the API is stable, interfaces and examples in this repository may change.
|
||||
> Status: **Private Preview** – While the API is stable, interfaces and examples in this repository may change.
|
||||
|
||||
This repository provides a standalone Go client for the GitHub Actions **Runner Scale Set** APIs. It is extracted from the `actions-runner-controller` project so that platform teams, integrators, and infrastructure providers can build **their own custom autoscaling solutions** for GitHub Actions runners.
|
||||
|
||||
@@ -12,7 +12,7 @@ You do *not* need to adopt the full controller (and Kubernetes) to take advantag
|
||||
|
||||
A runner scale set is a group of self-hosted runners that autoscales based on workflow demand. Here's how it works:
|
||||
|
||||
1. **Registration**: You create a scale set with a name, which also serves as the label workflows use to target it (e.g., `runs-on: my-scale-set`). Multiple labels can be assigned per scale set. Like regular self-hosted runners, scale sets can be registered at the repository, organization, or enterprise level.
|
||||
1. **Registration**: You create a scale set with a name, which also serves as the label workflows use to target it (e.g., `runs-on: my-scale-set`). Like regular self-hosted runners, scale sets can be registered at the repository, organization, or enterprise level.
|
||||
2. **Polling**: Your scale set client continuously polls the API, reporting its maximum capacity (how many runners it can produce).
|
||||
3. **Job matching**: GitHub matches jobs to your scale set based on the label and runner group policies, just like regular self-hosted runners.
|
||||
4. **Scaling signal**: The API responds with how many runners your scale set needs online (`statistics.TotalAssignedJobs`).
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"maps"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -126,9 +127,9 @@ type ProxyFunc func(req *http.Request) (*url.URL, error)
|
||||
type SystemInfo struct {
|
||||
// System is the name of the scale set implementation
|
||||
System string `json:"system"`
|
||||
// Version is the version of the client
|
||||
// Version is the version of the controller
|
||||
Version string `json:"version"`
|
||||
// CommitSHA is the git commit SHA of the client
|
||||
// CommitSHA is the git commit SHA of the controller
|
||||
CommitSHA string `json:"commit_sha"`
|
||||
// ScaleSetID is the ID of the scale set
|
||||
ScaleSetID int `json:"scale_set_id"`
|
||||
@@ -222,7 +223,6 @@ type userAgent struct {
|
||||
SystemInfo
|
||||
BuildVersion string `json:"build_version"`
|
||||
BuildCommitSHA string `json:"build_commit_sha"`
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
|
||||
func (c *Client) newGitHubAPIRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) {
|
||||
@@ -510,14 +510,6 @@ func parseRunnerScaleSetMessageResponse(respBody io.Reader) (*RunnerScaleSetMess
|
||||
}
|
||||
|
||||
switch messageType.MessageType {
|
||||
case MessageTypeJobAvailable:
|
||||
var jobAvailable JobAvailable
|
||||
if err := json.Unmarshal(msg, &jobAvailable); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode job available: %w", err)
|
||||
}
|
||||
|
||||
message.JobAvailableMessages = append(message.JobAvailableMessages, &jobAvailable)
|
||||
|
||||
case MessageTypeJobAssigned:
|
||||
var jobAssigned JobAssigned
|
||||
if err := json.Unmarshal(msg, &jobAssigned); err != nil {
|
||||
@@ -554,6 +546,7 @@ func parseRunnerScaleSetMessageResponse(respBody io.Reader) (*RunnerScaleSetMess
|
||||
// It exposes client options that could be overwritten, providing ability to specify different retry policies or TLS settings, proxy, etc.
|
||||
func (c *Client) MessageSessionClient(ctx context.Context, runnerScaleSetID int, owner string, options ...HTTPOption) (*MessageSessionClient, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Copy original options
|
||||
httpClientOption := c.httpClientOption
|
||||
@@ -574,8 +567,6 @@ func (c *Client) MessageSessionClient(ctx context.Context, runnerScaleSetID int,
|
||||
scaleSetID: runnerScaleSetID,
|
||||
session: nil,
|
||||
}
|
||||
// Unlock the client to allow createMessageSession to call public methods that require locking
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := client.createMessageSession(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to create message session: %w", err)
|
||||
@@ -845,6 +836,8 @@ func (c *Client) getActionsServiceAdminConnection(ctx context.Context, rt *regis
|
||||
return nil, fmt.Errorf("failed to get actions service admin connection: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("got connection", *adminConnection.ActionsServiceURL, c.userAgent)
|
||||
|
||||
return adminConnection, nil
|
||||
}
|
||||
|
||||
|
||||
+11
-52
@@ -75,20 +75,12 @@ func sendRequest(c *http.Client, req *http.Request) (*http.Response, error) {
|
||||
}
|
||||
|
||||
type httpClientOption struct {
|
||||
logger *slog.Logger
|
||||
|
||||
// Options for built-in retryable HTTP client.
|
||||
// Ignored if a custom retryable HTTP client is provided via WithRetryableHTTPClint.
|
||||
retryMax int
|
||||
retryWaitMax time.Duration
|
||||
|
||||
// fields added to the transport if specified
|
||||
logger *slog.Logger
|
||||
retryMax int
|
||||
retryWaitMax time.Duration
|
||||
rootCAs *x509.CertPool
|
||||
tlsInsecureSkipVerify bool
|
||||
proxyFunc ProxyFunc
|
||||
timeout time.Duration
|
||||
|
||||
retryableHTTPClient *retryablehttp.Client
|
||||
}
|
||||
|
||||
func (o *httpClientOption) defaults() {
|
||||
@@ -101,26 +93,14 @@ func (o *httpClientOption) defaults() {
|
||||
if o.retryWaitMax == 0 {
|
||||
o.retryWaitMax = 30 * time.Second
|
||||
}
|
||||
if o.timeout == 0 {
|
||||
o.timeout = 5 * time.Minute
|
||||
}
|
||||
}
|
||||
|
||||
func (o *httpClientOption) newRetryableHTTPClient() (*retryablehttp.Client, error) {
|
||||
var retryClient *retryablehttp.Client
|
||||
if o.retryableHTTPClient != nil {
|
||||
retryClient = o.retryableHTTPClient
|
||||
} else {
|
||||
retryClient = retryablehttp.NewClient()
|
||||
retryClient.RetryMax = o.retryMax
|
||||
retryClient.RetryWaitMax = o.retryWaitMax
|
||||
}
|
||||
|
||||
if retryClient.HTTPClient.Timeout == 0 {
|
||||
retryClient.HTTPClient.Timeout = o.timeout
|
||||
}
|
||||
|
||||
retryClient := retryablehttp.NewClient()
|
||||
retryClient.Logger = o.logger
|
||||
retryClient.RetryMax = o.retryMax
|
||||
retryClient.RetryWaitMax = o.retryWaitMax
|
||||
retryClient.HTTPClient.Timeout = 5 * time.Minute // timeout must be > 1m to accomodate long polling
|
||||
|
||||
transport, ok := retryClient.HTTPClient.Transport.(*http.Transport)
|
||||
if !ok {
|
||||
@@ -140,9 +120,7 @@ func (o *httpClientOption) newRetryableHTTPClient() (*retryablehttp.Client, erro
|
||||
transport.TLSClientConfig.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
if o.proxyFunc != nil {
|
||||
transport.Proxy = o.proxyFunc
|
||||
}
|
||||
transport.Proxy = o.proxyFunc
|
||||
|
||||
retryClient.HTTPClient.Transport = transport
|
||||
|
||||
@@ -159,30 +137,18 @@ func (c *commonClient) setUserAgent() {
|
||||
SystemInfo: c.systemInfo,
|
||||
BuildVersion: buildInfo.version,
|
||||
BuildCommitSHA: buildInfo.commitSHA,
|
||||
Kind: "scaleset",
|
||||
})
|
||||
c.userAgent = string(b)
|
||||
slog.Info("user agent", "useragent", c.userAgent)
|
||||
}
|
||||
|
||||
// HTTPOption defines a functional option for configuring the Client.
|
||||
type HTTPOption func(*httpClientOption)
|
||||
|
||||
// WithRetryableHTTPClint allows users to provide a custom retryable HTTP client.
|
||||
// If not set, a default client will be used with the specified retry and timeout settings.
|
||||
func WithRetryableHTTPClint(client *retryablehttp.Client) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
c.retryableHTTPClient = client
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger sets a custom logger for the Client.
|
||||
// If nil is passed, a discard logger will be used.
|
||||
func WithLogger(logger *slog.Logger) HTTPOption {
|
||||
func WithLogger(logger slog.Logger) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
if logger == nil {
|
||||
logger = slog.New(slog.DiscardHandler)
|
||||
}
|
||||
c.logger = logger
|
||||
c.logger = &logger
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,10 +186,3 @@ func WithProxy(proxyFunc ProxyFunc) HTTPOption {
|
||||
c.proxyFunc = proxyFunc
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout sets a timeout for the Client.
|
||||
func WithTimeout(duration time.Duration) HTTPOption {
|
||||
return func(c *httpClientOption) {
|
||||
c.timeout = duration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,15 +3,12 @@ package scaleset
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/actions/scaleset/internal/testserver"
|
||||
"github.com/hashicorp/go-retryablehttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/http/httpproxy"
|
||||
@@ -127,7 +124,6 @@ func TestUserAgent(t *testing.T) {
|
||||
SystemInfo: testSystemInfo,
|
||||
BuildCommitSHA: sha,
|
||||
BuildVersion: version,
|
||||
Kind: "scaleset",
|
||||
}
|
||||
b, err := json.Marshal(wantInfo)
|
||||
require.NoError(t, err, "failed to marshal expected user agent")
|
||||
@@ -148,7 +144,6 @@ func TestUserAgent(t *testing.T) {
|
||||
SystemInfo: userAgentInfo,
|
||||
BuildCommitSHA: sha,
|
||||
BuildVersion: version,
|
||||
Kind: "scaleset",
|
||||
}
|
||||
b, err = json.Marshal(wantInfo)
|
||||
require.NoError(t, err, "failed to marshal expected user agent after SetSystemInfo")
|
||||
@@ -156,141 +151,3 @@ func TestUserAgent(t *testing.T) {
|
||||
|
||||
assert.Equal(t, want, got)
|
||||
}
|
||||
|
||||
func TestWithLogger(t *testing.T) {
|
||||
newJSONHandler := func() slog.Handler {
|
||||
return slog.NewJSONHandler(
|
||||
io.Discard,
|
||||
&slog.HandlerOptions{
|
||||
AddSource: true,
|
||||
Level: slog.LevelError,
|
||||
},
|
||||
)
|
||||
}
|
||||
t.Run("WithLogger(nil) sets a discard logger on raw httpClientOption", func(t *testing.T) {
|
||||
opts := httpClientOption{}
|
||||
WithLogger(nil)(&opts)
|
||||
require.NotNil(t, opts.logger, "WithLogger(nil) should set a discard logger, not leave it nil")
|
||||
assert.Equal(t, slog.DiscardHandler, opts.logger.Handler(), "WithLogger(nil) should set a discard logger handler")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(customLogger) assigns the provided logger", func(t *testing.T) {
|
||||
handler := newJSONHandler()
|
||||
customLogger := slog.New(handler)
|
||||
opts := httpClientOption{}
|
||||
WithLogger(customLogger)(&opts)
|
||||
require.Equal(t, customLogger, opts.logger, "WithLogger should assign the provided logger")
|
||||
assert.Equal(t, handler, opts.logger.Handler(), "WithLogger should set the provided logger handler")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(nil) propagates discard logger to retryable HTTP client", func(t *testing.T) {
|
||||
opts := httpClientOption{}
|
||||
WithLogger(nil)(&opts)
|
||||
client, err := opts.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, client.Logger, "retryable client should have logger set from WithLogger(nil)")
|
||||
|
||||
logger, ok := client.Logger.(*slog.Logger)
|
||||
require.True(t, ok, "retryable client logger should be a *slog.Logger")
|
||||
assert.Same(t, opts.logger, logger, "retryable client logger should be the same logger set by WithLogger(nil)")
|
||||
assert.Equal(t, slog.DiscardHandler, logger.Handler(), "retryable client logger should be a discard logger from WithLogger(nil)")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(customLogger) propagates custom logger to retryable HTTP client", func(t *testing.T) {
|
||||
handler := newJSONHandler()
|
||||
customLogger := slog.New(handler)
|
||||
opts := httpClientOption{}
|
||||
WithLogger(customLogger)(&opts)
|
||||
client, err := opts.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, client.Logger, "retryable client should have logger set")
|
||||
logger, ok := client.Logger.(*slog.Logger)
|
||||
require.True(t, ok, "retryable client logger should be a *slog.Logger")
|
||||
assert.Equal(t, handler, logger.Handler(), "retryable client logger should be the custom logger from WithLogger")
|
||||
})
|
||||
}
|
||||
|
||||
// TestWithRetryableHTTPClient verifies that a custom retryable HTTP client
|
||||
// provided via WithRetryableHTTPClient is actually used instead of the built-in one
|
||||
func TestWithRetryableHTTPClient(t *testing.T) {
|
||||
t.Run("uses custom retryable client instead of built-in", func(t *testing.T) {
|
||||
attemptCount := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attemptCount++
|
||||
if attemptCount == 1 {
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte(`{"result": "success"}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// Create a custom retryable HTTP client with specific retry configuration
|
||||
customRetryClient := retryablehttp.NewClient()
|
||||
customRetryClient.RetryMax = 3
|
||||
customRetryClient.RetryWaitMax = 10 * time.Millisecond
|
||||
|
||||
// Create options with the custom retryable client
|
||||
opts := defaultHTTPClientOption()
|
||||
WithRetryableHTTPClint(customRetryClient)(&opts)
|
||||
|
||||
// Verify that the custom client is set in options
|
||||
assert.NotNil(t, opts.retryableHTTPClient)
|
||||
assert.Equal(t, customRetryClient, opts.retryableHTTPClient)
|
||||
|
||||
// Create the common client with custom retryable client
|
||||
client := newCommonClient(testSystemInfo, opts)
|
||||
|
||||
// Make a request that will trigger a retry
|
||||
req, err := http.NewRequest("GET", server.URL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Should succeed after retry
|
||||
assert.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
assert.Equal(t, 2, attemptCount)
|
||||
|
||||
// Verify that the client used is the custom one by checking newRetryableHTTPClient
|
||||
retrievedRetryClient, err := client.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, customRetryClient, retrievedRetryClient, "should return the custom retryable client")
|
||||
})
|
||||
|
||||
t.Run("respects custom client's retry configuration over built-in defaults", func(t *testing.T) {
|
||||
attemptCount := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
attemptCount++
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
// Create custom client with limited retries
|
||||
customRetryClient := retryablehttp.NewClient()
|
||||
customRetryClient.RetryMax = 1 // Only 1 retry (2 total attempts)
|
||||
customRetryClient.RetryWaitMax = 5 * time.Millisecond
|
||||
|
||||
opts := defaultHTTPClientOption()
|
||||
WithRetryableHTTPClint(customRetryClient)(&opts)
|
||||
|
||||
client := newCommonClient(testSystemInfo, opts)
|
||||
|
||||
req, err := http.NewRequest("GET", server.URL, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.do(req)
|
||||
// When all retries are exhausted with a retryable error, the client gives up
|
||||
// and an error is returned
|
||||
if err != nil {
|
||||
// Expected: request failed after exhausting retries
|
||||
assert.Contains(t, err.Error(), "giving up after 2 attempt(s)")
|
||||
} else {
|
||||
// Or the final response is returned
|
||||
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
|
||||
}
|
||||
// Should have tried 1 initial + 1 retry = 2 times total
|
||||
assert.Equal(t, 2, attemptCount)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -8,15 +8,14 @@ require (
|
||||
github.com/google/go-github/v79 v79.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/go-retryablehttp v0.7.8
|
||||
github.com/spf13/cobra v1.10.2
|
||||
github.com/spf13/cobra v1.10.1
|
||||
github.com/stretchr/testify v1.11.1
|
||||
golang.org/x/net v0.52.0
|
||||
golang.org/x/net v0.47.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||
github.com/brunoga/deep v1.2.4 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/containerd/errdefs v1.0.0 // indirect
|
||||
github.com/containerd/errdefs/pkg v0.3.0 // indirect
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
@@ -27,7 +26,7 @@ require (
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
@@ -63,25 +62,23 @@ require (
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
|
||||
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
||||
go.opentelemetry.io/otel v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
|
||||
golang.org/x/mod v0.33.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.42.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
|
||||
golang.org/x/term v0.41.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
golang.org/x/mod v0.30.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
golang.org/x/tools v0.42.0 // indirect
|
||||
google.golang.org/grpc v1.79.3 // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
golang.org/x/tools v0.39.0 // indirect
|
||||
google.golang.org/grpc v1.72.2 // indirect
|
||||
google.golang.org/protobuf v1.36.9 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
gotest.tools/v3 v3.5.2 // indirect
|
||||
)
|
||||
|
||||
@@ -6,8 +6,6 @@ github.com/brunoga/deep v1.2.4 h1:Aj9E9oUbE+ccbyh35VC/NHlzzjfIVU69BXu2mt2LmL8=
|
||||
github.com/brunoga/deep v1.2.4/go.mod h1:GDV6dnXqn80ezsLSZ5Wlv1PdKAWAO4L5PnKYtv2dgaI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
|
||||
github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
|
||||
github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=
|
||||
@@ -36,8 +34,8 @@ github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
|
||||
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
|
||||
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
|
||||
@@ -120,16 +118,16 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
|
||||
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
|
||||
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
|
||||
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
|
||||
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
|
||||
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
@@ -147,59 +145,58 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
|
||||
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
|
||||
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
|
||||
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
|
||||
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
|
||||
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
|
||||
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
|
||||
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
|
||||
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
|
||||
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
|
||||
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
|
||||
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
|
||||
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
||||
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0=
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548=
|
||||
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
|
||||
golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 h1:E2/AqCUMZGgd73TQkxUMcMla25GB9i/5HOdLr+uH7Vo=
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54/go.mod h1:hKdjCMrbv9skySur+Nek8Hd0uJ0GuxJIoIX2payrIdQ=
|
||||
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
|
||||
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
|
||||
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
|
||||
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
|
||||
google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
|
||||
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
|
||||
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
+10
-89
@@ -49,61 +49,25 @@ func (c *Config) Validate() error {
|
||||
type Client interface {
|
||||
GetMessage(ctx context.Context, lastMessageID, maxCapacity int) (*scaleset.RunnerScaleSetMessage, error)
|
||||
DeleteMessage(ctx context.Context, messageID int) error
|
||||
AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error)
|
||||
Session() scaleset.RunnerScaleSetSession
|
||||
}
|
||||
|
||||
// MetricsRecorder defines the hook methods that will be called by the listener at
|
||||
// various points in the message handling process. This allows for custom
|
||||
// metrics to be collected without coupling the listener to a specific metrics
|
||||
// implementation. The methods in this interface will be called with relevant
|
||||
// information about the message handling process, such as the number of jobs
|
||||
// started/completed, the desired runner count, and any errors that occur.
|
||||
// Implementers can use this information to track the performance and behavior
|
||||
// of the listener and the scaleset service.
|
||||
type MetricsRecorder interface {
|
||||
RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic)
|
||||
RecordJobStarted(msg *scaleset.JobStarted)
|
||||
RecordJobCompleted(msg *scaleset.JobCompleted)
|
||||
RecordDesiredRunners(count int)
|
||||
}
|
||||
|
||||
type discardMetricsRecorder struct{}
|
||||
|
||||
func (d *discardMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {}
|
||||
func (d *discardMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {}
|
||||
func (d *discardMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {}
|
||||
func (d *discardMetricsRecorder) RecordDesiredRunners(count int) {}
|
||||
type Option func(*Listener)
|
||||
|
||||
// Listener listens for messages from the scaleset service and handles them. It automatically handles session
|
||||
// creation/deletion/refreshing and message polling and acking.
|
||||
type Listener struct {
|
||||
// The main client responsible for communicating with the scaleset service
|
||||
client Client
|
||||
metricsRecorder MetricsRecorder
|
||||
client Client
|
||||
|
||||
// Configuration for the listener
|
||||
scaleSetID int
|
||||
maxRunners atomic.Uint32
|
||||
latestStatistics *scaleset.RunnerScaleSetStatistic
|
||||
scaleSetID int
|
||||
maxRunners atomic.Uint32
|
||||
|
||||
// configuration for the listener
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
type Option func(*Listener)
|
||||
|
||||
// WithMetricsRecorder sets the MetricsRecorder for the listener. If not set, a no-op recorder will be used.
|
||||
// If the nil is passed, the MetricsRecorder will not be updated and the existing one will be used (which is a no-op by default).
|
||||
func WithMetricsRecorder(recorder MetricsRecorder) Option {
|
||||
return func(l *Listener) {
|
||||
if recorder == nil {
|
||||
return
|
||||
}
|
||||
l.metricsRecorder = recorder
|
||||
}
|
||||
}
|
||||
|
||||
// SetMaxRunners sets the capacity of the scaleset. It is concurrently
|
||||
// safe to update the max runners during listener.Run.
|
||||
func (l *Listener) SetMaxRunners(count int) {
|
||||
@@ -121,17 +85,12 @@ func New(client Client, config Config, options ...Option) (*Listener, error) {
|
||||
}
|
||||
|
||||
listener := &Listener{
|
||||
client: client,
|
||||
metricsRecorder: &discardMetricsRecorder{},
|
||||
scaleSetID: config.ScaleSetID,
|
||||
logger: config.Logger,
|
||||
client: client,
|
||||
scaleSetID: config.ScaleSetID,
|
||||
logger: config.Logger,
|
||||
}
|
||||
listener.SetMaxRunners(config.MaxRunners)
|
||||
|
||||
for _, option := range options {
|
||||
option(listener)
|
||||
}
|
||||
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
@@ -155,17 +114,13 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
return fmt.Errorf("session statistics is nil")
|
||||
}
|
||||
|
||||
l.handleStatistics(ctx, initialSession.Statistics)
|
||||
|
||||
l.logger.Info(
|
||||
"Handling initial session statistics",
|
||||
slog.Int("totalAssignedJobs", initialSession.Statistics.TotalAssignedJobs),
|
||||
)
|
||||
desiredCount, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs)
|
||||
if err != nil {
|
||||
if _, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs); err != nil {
|
||||
return fmt.Errorf("handling initial message failed: %w", err)
|
||||
}
|
||||
l.metricsRecorder.RecordDesiredRunners(desiredCount)
|
||||
}
|
||||
|
||||
var lastMessageID int
|
||||
@@ -187,7 +142,7 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
}
|
||||
|
||||
if msg == nil {
|
||||
_, err := scaler.HandleDesiredRunnerCount(ctx, l.latestStatistics.TotalAssignedJobs)
|
||||
_, err := scaler.HandleDesiredRunnerCount(ctx, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("handling nil message failed: %w", err)
|
||||
}
|
||||
@@ -205,58 +160,24 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
|
||||
}
|
||||
|
||||
func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scaleset.RunnerScaleSetMessage) error {
|
||||
l.handleStatistics(ctx, msg.Statistics)
|
||||
|
||||
if err := l.client.DeleteMessage(ctx, msg.MessageID); err != nil {
|
||||
return fmt.Errorf("failed to delete message: %w", err)
|
||||
}
|
||||
|
||||
if len(msg.JobAvailableMessages) > 0 {
|
||||
if err := l.acquireAvailableJobs(ctx, msg.JobAvailableMessages); err != nil {
|
||||
return fmt.Errorf("failed to acquire available jobs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, jobStarted := range msg.JobStartedMessages {
|
||||
l.metricsRecorder.RecordJobStarted(jobStarted)
|
||||
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
|
||||
return fmt.Errorf("failed to handle job started: %w", err)
|
||||
}
|
||||
}
|
||||
for _, jobCompleted := range msg.JobCompletedMessages {
|
||||
l.metricsRecorder.RecordJobCompleted(jobCompleted)
|
||||
if err := handler.HandleJobCompleted(ctx, jobCompleted); err != nil {
|
||||
return fmt.Errorf("failed to handle job completed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
desiredCount, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs)
|
||||
if err != nil {
|
||||
if _, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs); err != nil {
|
||||
return fmt.Errorf("failed to handle desired runner count: %w", err)
|
||||
}
|
||||
l.metricsRecorder.RecordDesiredRunners(desiredCount)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*scaleset.JobAvailable) error {
|
||||
ids := make([]int64, 0, len(jobsAvailable))
|
||||
for _, job := range jobsAvailable {
|
||||
ids = append(ids, job.RunnerRequestID)
|
||||
}
|
||||
|
||||
l.logger.Info("Acquiring jobs", slog.Int("count", len(ids)))
|
||||
|
||||
acquired, err := l.client.AcquireJobs(ctx, ids)
|
||||
if err != nil {
|
||||
return fmt.Errorf("acquiring jobs: %w", err)
|
||||
}
|
||||
|
||||
l.logger.Info("Jobs acquired", slog.Int("count", len(acquired)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
|
||||
l.latestStatistics = msg
|
||||
l.metricsRecorder.RecordStatistics(msg)
|
||||
}
|
||||
|
||||
+51
-48
@@ -75,37 +75,39 @@ func TestListener_Run(t *testing.T) {
|
||||
client := NewMockClient(t)
|
||||
|
||||
uuid := uuid.New()
|
||||
initialStatistics := &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 2,
|
||||
}
|
||||
session := scaleset.RunnerScaleSetSession{
|
||||
SessionID: uuid,
|
||||
OwnerName: "example",
|
||||
RunnerScaleSet: &scaleset.RunnerScaleSet{},
|
||||
MessageQueueURL: "https://example.com",
|
||||
MessageQueueAccessToken: "1234567890",
|
||||
Statistics: initialStatistics,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
}
|
||||
|
||||
client.On("Session").Return(session).Once()
|
||||
|
||||
metricsRecorder := NewMockMetricsRecorder(t)
|
||||
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
|
||||
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Run(func(mock.Arguments) { cancel() }).
|
||||
Once()
|
||||
|
||||
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
|
||||
l, err := New(client, config)
|
||||
require.Nil(t, err)
|
||||
|
||||
var called bool
|
||||
handler := NewMockScaler(t)
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).
|
||||
Return(0, nil).
|
||||
Run(
|
||||
func(mock.Arguments) {
|
||||
called = true
|
||||
cancel()
|
||||
},
|
||||
).
|
||||
Once()
|
||||
|
||||
err = l.Run(ctx, handler)
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
assert.True(t, called)
|
||||
})
|
||||
|
||||
t.Run("cancel context after get message", func(t *testing.T) {
|
||||
@@ -118,60 +120,61 @@ func TestListener_Run(t *testing.T) {
|
||||
MaxRunners: 10,
|
||||
}
|
||||
|
||||
client := NewMockClient(t)
|
||||
uuid := uuid.New()
|
||||
initialStatistics := &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 2,
|
||||
}
|
||||
|
||||
session := scaleset.RunnerScaleSetSession{
|
||||
SessionID: uuid,
|
||||
OwnerName: "example",
|
||||
RunnerScaleSet: &scaleset.RunnerScaleSet{},
|
||||
MessageQueueURL: "https://example.com",
|
||||
MessageQueueAccessToken: "1234567890",
|
||||
Statistics: initialStatistics,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
}
|
||||
|
||||
msg := &scaleset.RunnerScaleSetMessage{
|
||||
MessageID: 1,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{
|
||||
TotalAssignedJobs: 3,
|
||||
},
|
||||
MessageID: 1,
|
||||
Statistics: &scaleset.RunnerScaleSetStatistic{},
|
||||
}
|
||||
|
||||
metricsRecorder := NewMockMetricsRecorder(t)
|
||||
client := NewMockClient(t)
|
||||
handler := NewMockScaler(t)
|
||||
|
||||
client.On("Session").Return(session).Once()
|
||||
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
|
||||
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, initialStatistics.TotalAssignedJobs).
|
||||
Return(initialStatistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
client.On("GetMessage", ctx, mock.Anything, 10).
|
||||
client.On(
|
||||
"GetMessage",
|
||||
ctx,
|
||||
mock.Anything,
|
||||
10,
|
||||
).
|
||||
Return(msg, nil).
|
||||
Run(func(mock.Arguments) { cancel() }).
|
||||
Run(
|
||||
func(mock.Arguments) {
|
||||
cancel()
|
||||
},
|
||||
).
|
||||
Once()
|
||||
|
||||
metricsRecorder.On("RecordStatistics", msg.Statistics).Once()
|
||||
// Ensure delete message is called without cancel
|
||||
client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything).
|
||||
Return(nil).
|
||||
client.On(
|
||||
"DeleteMessage",
|
||||
context.WithoutCancel(ctx),
|
||||
mock.Anything,
|
||||
).Return(nil).Once()
|
||||
|
||||
handler := NewMockScaler(t)
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
0,
|
||||
).
|
||||
Return(0, nil).
|
||||
Once()
|
||||
|
||||
metricsRecorder.On("RecordDesiredRunners", msg.Statistics.TotalAssignedJobs).
|
||||
Return(msg.Statistics.TotalAssignedJobs, nil).
|
||||
handler.On(
|
||||
"HandleDesiredRunnerCount",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).
|
||||
Return(0, nil).
|
||||
Once()
|
||||
|
||||
handler.On("HandleDesiredRunnerCount", mock.Anything, msg.Statistics.TotalAssignedJobs).
|
||||
Return(msg.Statistics.TotalAssignedJobs, nil).
|
||||
Once()
|
||||
|
||||
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
|
||||
l, err := New(client, config)
|
||||
require.Nil(t, err)
|
||||
|
||||
err = l.Run(ctx, handler)
|
||||
|
||||
@@ -38,74 +38,6 @@ func (_m *MockClient) EXPECT() *MockClient_Expecter {
|
||||
return &MockClient_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// AcquireJobs provides a mock function for the type MockClient
|
||||
func (_mock *MockClient) AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
ret := _mock.Called(ctx, requestIDs)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for AcquireJobs")
|
||||
}
|
||||
|
||||
var r0 []int64
|
||||
var r1 error
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context, []int64) ([]int64, error)); ok {
|
||||
return returnFunc(ctx, requestIDs)
|
||||
}
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context, []int64) []int64); ok {
|
||||
r0 = returnFunc(ctx, requestIDs)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]int64)
|
||||
}
|
||||
}
|
||||
if returnFunc, ok := ret.Get(1).(func(context.Context, []int64) error); ok {
|
||||
r1 = returnFunc(ctx, requestIDs)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockClient_AcquireJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AcquireJobs'
|
||||
type MockClient_AcquireJobs_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// AcquireJobs is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - requestIDs []int64
|
||||
func (_e *MockClient_Expecter) AcquireJobs(ctx interface{}, requestIDs interface{}) *MockClient_AcquireJobs_Call {
|
||||
return &MockClient_AcquireJobs_Call{Call: _e.mock.On("AcquireJobs", ctx, requestIDs)}
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) Run(run func(ctx context.Context, requestIDs []int64)) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 context.Context
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(context.Context)
|
||||
}
|
||||
var arg1 []int64
|
||||
if args[1] != nil {
|
||||
arg1 = args[1].([]int64)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
arg1,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) Return(int64s []int64, err error) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Return(int64s, err)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) RunAndReturn(run func(ctx context.Context, requestIDs []int64) ([]int64, error)) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DeleteMessage provides a mock function for the type MockClient
|
||||
func (_mock *MockClient) DeleteMessage(ctx context.Context, messageID int) error {
|
||||
ret := _mock.Called(ctx, messageID)
|
||||
@@ -281,193 +213,6 @@ func (_c *MockClient_Session_Call) RunAndReturn(run func() scaleset.RunnerScaleS
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockMetricsRecorder creates a new instance of MockMetricsRecorder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockMetricsRecorder(t interface {
|
||||
mock.TestingT
|
||||
Cleanup(func())
|
||||
}) *MockMetricsRecorder {
|
||||
mock := &MockMetricsRecorder{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
||||
|
||||
// MockMetricsRecorder is an autogenerated mock type for the MetricsRecorder type
|
||||
type MockMetricsRecorder struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type MockMetricsRecorder_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *MockMetricsRecorder) EXPECT() *MockMetricsRecorder_Expecter {
|
||||
return &MockMetricsRecorder_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// RecordDesiredRunners provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordDesiredRunners(count int) {
|
||||
_mock.Called(count)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordDesiredRunners_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordDesiredRunners'
|
||||
type MockMetricsRecorder_RecordDesiredRunners_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordDesiredRunners is a helper method to define mock.On call
|
||||
// - count int
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordDesiredRunners(count interface{}) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
return &MockMetricsRecorder_RecordDesiredRunners_Call{Call: _e.mock.On("RecordDesiredRunners", count)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Run(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 int
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(int)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) Return() *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordDesiredRunners_Call) RunAndReturn(run func(count int)) *MockMetricsRecorder_RecordDesiredRunners_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordJobCompleted provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {
|
||||
_mock.Called(msg)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordJobCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobCompleted'
|
||||
type MockMetricsRecorder_RecordJobCompleted_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordJobCompleted is a helper method to define mock.On call
|
||||
// - msg *scaleset.JobCompleted
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordJobCompleted(msg interface{}) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
return &MockMetricsRecorder_RecordJobCompleted_Call{Call: _e.mock.On("RecordJobCompleted", msg)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Run(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.JobCompleted
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.JobCompleted)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) Return() *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobCompleted_Call) RunAndReturn(run func(msg *scaleset.JobCompleted)) *MockMetricsRecorder_RecordJobCompleted_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordJobStarted provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {
|
||||
_mock.Called(msg)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordJobStarted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordJobStarted'
|
||||
type MockMetricsRecorder_RecordJobStarted_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordJobStarted is a helper method to define mock.On call
|
||||
// - msg *scaleset.JobStarted
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordJobStarted(msg interface{}) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
return &MockMetricsRecorder_RecordJobStarted_Call{Call: _e.mock.On("RecordJobStarted", msg)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Run(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.JobStarted
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.JobStarted)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) Return() *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordJobStarted_Call) RunAndReturn(run func(msg *scaleset.JobStarted)) *MockMetricsRecorder_RecordJobStarted_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// RecordStatistics provides a mock function for the type MockMetricsRecorder
|
||||
func (_mock *MockMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {
|
||||
_mock.Called(statistics)
|
||||
return
|
||||
}
|
||||
|
||||
// MockMetricsRecorder_RecordStatistics_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RecordStatistics'
|
||||
type MockMetricsRecorder_RecordStatistics_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// RecordStatistics is a helper method to define mock.On call
|
||||
// - statistics *scaleset.RunnerScaleSetStatistic
|
||||
func (_e *MockMetricsRecorder_Expecter) RecordStatistics(statistics interface{}) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
return &MockMetricsRecorder_RecordStatistics_Call{Call: _e.mock.On("RecordStatistics", statistics)}
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) Run(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 *scaleset.RunnerScaleSetStatistic
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(*scaleset.RunnerScaleSetStatistic)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) Return() *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockMetricsRecorder_RecordStatistics_Call) RunAndReturn(run func(statistics *scaleset.RunnerScaleSetStatistic)) *MockMetricsRecorder_RecordStatistics_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// NewMockScaler creates a new instance of MockScaler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
// The first argument is typically a *testing.T value.
|
||||
func NewMockScaler(t interface {
|
||||
|
||||
@@ -234,71 +234,7 @@ func (c *MessageSessionClient) Session() RunnerScaleSetSession {
|
||||
return *c.session
|
||||
}
|
||||
|
||||
// AcquireJobs acquires the given job request IDs from the runner scale set.
|
||||
// If the current session token is expired, it refreshes the session and tries one more time.
|
||||
func (c *MessageSessionClient) AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ids, err := c.acquireJobs(ctx, requestIDs)
|
||||
if err == nil {
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, MessageQueueTokenExpiredError) {
|
||||
return nil, fmt.Errorf("failed to acquire jobs: %w", err)
|
||||
}
|
||||
|
||||
if err := c.refreshMessageSession(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to refresh message session: %w", err)
|
||||
}
|
||||
|
||||
return c.acquireJobs(ctx, requestIDs)
|
||||
}
|
||||
|
||||
func (c *MessageSessionClient) acquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
body, err := json.Marshal(requestIDs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request ids: %w", err)
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("/%s/%d/acquirejobs", scaleSetEndpoint, c.scaleSetID)
|
||||
|
||||
c.innerClient.mu.Lock()
|
||||
req, err := c.innerClient.newActionsServiceRequest(ctx, http.MethodPost, path, bytes.NewBuffer(body))
|
||||
c.innerClient.mu.Unlock()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create acquire jobs request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.session.MessageQueueAccessToken))
|
||||
|
||||
resp, err := c.commonClient.do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to issue acquire jobs request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, newRequestResponseError(req, resp, MessageQueueTokenExpiredError)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, newRequestResponseError(req, resp, fmt.Errorf("unexpected status code %s", resp.Status))
|
||||
}
|
||||
|
||||
var result acquireJobsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, newRequestResponseError(req, resp, fmt.Errorf("failed to decode acquire jobs response: %w", err))
|
||||
}
|
||||
|
||||
return result.Value, nil
|
||||
}
|
||||
|
||||
func (c *MessageSessionClient) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error {
|
||||
c.innerClient.mu.Lock()
|
||||
defer c.innerClient.mu.Unlock()
|
||||
|
||||
req, err := c.innerClient.newActionsServiceRequest(ctx, method, path, requestData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new actions service request: %w", err)
|
||||
|
||||
@@ -649,231 +649,3 @@ func TestDeleteMessage(t *testing.T) {
|
||||
assert.Contains(t, err.Error(), "unexpected status code")
|
||||
})
|
||||
}
|
||||
|
||||
func TestAcquireJobs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
auth := actionsAuth{
|
||||
token: "token",
|
||||
}
|
||||
|
||||
t.Run("Acquire jobs successfully", func(t *testing.T) {
|
||||
requestIDs := []int64{1, 2}
|
||||
want := []int64{1, 2}
|
||||
response := []byte(`{"count":2,"value":[1,2]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
assert.Equal(t, http.MethodPost, r.Method)
|
||||
assert.Contains(t, r.URL.Path, "acquirejobs")
|
||||
assert.True(t, strings.HasPrefix(r.Header.Get("Authorization"), "Bearer"), "expected Bearer authorization header")
|
||||
|
||||
var gotIDs []int64
|
||||
err := json.NewDecoder(r.Body).Decode(&gotIDs)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, requestIDs, gotIDs)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(response)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, requestIDs)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, want, got)
|
||||
})
|
||||
|
||||
t.Run("Message token expired", func(t *testing.T) {
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
// create session
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
// refresh
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1})
|
||||
assert.Nil(t, got)
|
||||
assert.ErrorIs(t, err, MessageQueueTokenExpiredError, "expected error to be MessageQueueTokenExpiredError but got: %v", err)
|
||||
})
|
||||
|
||||
t.Run("Message token refreshed", func(t *testing.T) {
|
||||
want := []int64{1, 2}
|
||||
afterRefreshResponse := []byte(`{"count":2,"value":[1,2]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
type state int
|
||||
const (
|
||||
createSession state = iota
|
||||
firstAcquire
|
||||
refreshToken
|
||||
secondAcquire
|
||||
)
|
||||
currentState := createSession
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
if currentState == firstAcquire {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
currentState = refreshToken
|
||||
return
|
||||
}
|
||||
require.Equal(t, secondAcquire, currentState)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(afterRefreshResponse)
|
||||
return
|
||||
}
|
||||
// create session
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
require.Equal(t, createSession, currentState)
|
||||
handleSessionRequest(w, r)
|
||||
currentState = firstAcquire
|
||||
return
|
||||
}
|
||||
// refresh
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
require.Equal(t, refreshToken, currentState)
|
||||
handleSessionRequest(w, r)
|
||||
currentState = secondAcquire
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1, 2})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, want, got)
|
||||
})
|
||||
|
||||
t.Run("Server error", func(t *testing.T) {
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
retryMax := 1
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
WithRetryMax(retryMax),
|
||||
WithRetryWaitMax(1*time.Nanosecond),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(
|
||||
ctx,
|
||||
1,
|
||||
"my-org",
|
||||
WithRetryMax(retryMax),
|
||||
WithRetryWaitMax(1*time.Nanosecond),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1})
|
||||
assert.Nil(t, got)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Empty request IDs", func(t *testing.T) {
|
||||
response := []byte(`{"count":0,"value":[]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
assert.Equal(t, http.MethodPost, r.Method)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(response)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{})
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, got)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,17 +12,11 @@ type MessageType string
|
||||
|
||||
// message types
|
||||
const (
|
||||
MessageTypeJobAvailable MessageType = "JobAvailable"
|
||||
MessageTypeJobAssigned MessageType = "JobAssigned"
|
||||
MessageTypeJobStarted MessageType = "JobStarted"
|
||||
MessageTypeJobCompleted MessageType = "JobCompleted"
|
||||
)
|
||||
|
||||
type JobAvailable struct {
|
||||
AcquireJobURL string `json:"acquireJobUrl"`
|
||||
JobMessageBase
|
||||
}
|
||||
|
||||
type JobAssigned struct {
|
||||
JobMessageBase
|
||||
}
|
||||
@@ -105,7 +99,6 @@ type runnerScaleSetMessageResponse struct {
|
||||
type RunnerScaleSetMessage struct {
|
||||
MessageID int
|
||||
Statistics *RunnerScaleSetStatistic
|
||||
JobAvailableMessages []*JobAvailable
|
||||
JobAssignedMessages []*JobAssigned
|
||||
JobStartedMessages []*JobStarted
|
||||
JobCompletedMessages []*JobCompleted
|
||||
@@ -116,11 +109,6 @@ type runnerScaleSetsResponse struct {
|
||||
RunnerScaleSets []RunnerScaleSet `json:"value"`
|
||||
}
|
||||
|
||||
type acquireJobsResponse struct {
|
||||
Count int `json:"count"`
|
||||
Value []int64 `json:"value"`
|
||||
}
|
||||
|
||||
type RunnerScaleSetSession struct {
|
||||
SessionID uuid.UUID `json:"sessionId,omitempty"`
|
||||
OwnerName string `json:"ownerName,omitempty"`
|
||||
|
||||
Reference in New Issue
Block a user