Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 72647ae108 | |||
| 02ad99e7fe | |||
| 22bae1217e | |||
| a0708d5ea7 | |||
| 0325c63c37 | |||
| 48afc028b8 |
@@ -0,0 +1,18 @@
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "gomod" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
groups:
|
||||
gomod:
|
||||
patterns:
|
||||
- "*"
|
||||
- package-ecosystem: github-actions
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
groups:
|
||||
actions:
|
||||
patterns:
|
||||
- "*"
|
||||
@@ -19,7 +19,7 @@ jobs:
|
||||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
mocks:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
@@ -45,13 +45,13 @@ jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
cache: false
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9
|
||||
uses: golangci/golangci-lint-action@1e7e51e771db61008b38414a730f564565cf7c20
|
||||
with:
|
||||
only-new-issues: true
|
||||
version: v2.5.0
|
||||
@@ -59,7 +59,7 @@ jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version-file: "go.mod"
|
||||
|
||||
@@ -510,6 +510,14 @@ func parseRunnerScaleSetMessageResponse(respBody io.Reader) (*RunnerScaleSetMess
|
||||
}
|
||||
|
||||
switch messageType.MessageType {
|
||||
case MessageTypeJobAvailable:
|
||||
var jobAvailable JobAvailable
|
||||
if err := json.Unmarshal(msg, &jobAvailable); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode job available: %w", err)
|
||||
}
|
||||
|
||||
message.JobAvailableMessages = append(message.JobAvailableMessages, &jobAvailable)
|
||||
|
||||
case MessageTypeJobAssigned:
|
||||
var jobAssigned JobAssigned
|
||||
if err := json.Unmarshal(msg, &jobAssigned); err != nil {
|
||||
|
||||
+1
-3
@@ -120,9 +120,7 @@ func (o *httpClientOption) newRetryableHTTPClient() (*retryablehttp.Client, erro
|
||||
retryClient.HTTPClient.Timeout = o.timeout
|
||||
}
|
||||
|
||||
if retryClient.Logger == nil {
|
||||
retryClient.Logger = o.logger
|
||||
}
|
||||
retryClient.Logger = o.logger
|
||||
|
||||
transport, ok := retryClient.HTTPClient.Transport.(*http.Transport)
|
||||
if !ok {
|
||||
|
||||
@@ -3,6 +3,7 @@ package scaleset
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
@@ -156,6 +157,59 @@ func TestUserAgent(t *testing.T) {
|
||||
assert.Equal(t, want, got)
|
||||
}
|
||||
|
||||
func TestWithLogger(t *testing.T) {
|
||||
newJSONHandler := func() slog.Handler {
|
||||
return slog.NewJSONHandler(
|
||||
io.Discard,
|
||||
&slog.HandlerOptions{
|
||||
AddSource: true,
|
||||
Level: slog.LevelError,
|
||||
},
|
||||
)
|
||||
}
|
||||
t.Run("WithLogger(nil) sets a discard logger on raw httpClientOption", func(t *testing.T) {
|
||||
opts := httpClientOption{}
|
||||
WithLogger(nil)(&opts)
|
||||
require.NotNil(t, opts.logger, "WithLogger(nil) should set a discard logger, not leave it nil")
|
||||
assert.Equal(t, slog.DiscardHandler, opts.logger.Handler(), "WithLogger(nil) should set a discard logger handler")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(customLogger) assigns the provided logger", func(t *testing.T) {
|
||||
handler := newJSONHandler()
|
||||
customLogger := slog.New(handler)
|
||||
opts := httpClientOption{}
|
||||
WithLogger(customLogger)(&opts)
|
||||
require.Equal(t, customLogger, opts.logger, "WithLogger should assign the provided logger")
|
||||
assert.Equal(t, handler, opts.logger.Handler(), "WithLogger should set the provided logger handler")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(nil) propagates discard logger to retryable HTTP client", func(t *testing.T) {
|
||||
opts := httpClientOption{}
|
||||
WithLogger(nil)(&opts)
|
||||
client, err := opts.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, client.Logger, "retryable client should have logger set from WithLogger(nil)")
|
||||
|
||||
logger, ok := client.Logger.(*slog.Logger)
|
||||
require.True(t, ok, "retryable client logger should be a *slog.Logger")
|
||||
assert.Same(t, opts.logger, logger, "retryable client logger should be the same logger set by WithLogger(nil)")
|
||||
assert.Equal(t, slog.DiscardHandler, logger.Handler(), "retryable client logger should be a discard logger from WithLogger(nil)")
|
||||
})
|
||||
|
||||
t.Run("WithLogger(customLogger) propagates custom logger to retryable HTTP client", func(t *testing.T) {
|
||||
handler := newJSONHandler()
|
||||
customLogger := slog.New(handler)
|
||||
opts := httpClientOption{}
|
||||
WithLogger(customLogger)(&opts)
|
||||
client, err := opts.newRetryableHTTPClient()
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, client.Logger, "retryable client should have logger set")
|
||||
logger, ok := client.Logger.(*slog.Logger)
|
||||
require.True(t, ok, "retryable client logger should be a *slog.Logger")
|
||||
assert.Equal(t, handler, logger.Handler(), "retryable client logger should be the custom logger from WithLogger")
|
||||
})
|
||||
}
|
||||
|
||||
// TestWithRetryableHTTPClient verifies that a custom retryable HTTP client
|
||||
// provided via WithRetryableHTTPClient is actually used instead of the built-in one
|
||||
func TestWithRetryableHTTPClient(t *testing.T) {
|
||||
|
||||
@@ -8,14 +8,15 @@ require (
|
||||
github.com/google/go-github/v79 v79.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/go-retryablehttp v0.7.8
|
||||
github.com/spf13/cobra v1.10.1
|
||||
github.com/spf13/cobra v1.10.2
|
||||
github.com/stretchr/testify v1.11.1
|
||||
golang.org/x/net v0.47.0
|
||||
golang.org/x/net v0.52.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||
github.com/brunoga/deep v1.2.4 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/containerd/errdefs v1.0.0 // indirect
|
||||
github.com/containerd/errdefs/pkg v0.3.0 // indirect
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
@@ -26,7 +27,7 @@ require (
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
|
||||
github.com/google/go-querystring v1.1.0 // indirect
|
||||
@@ -62,23 +63,25 @@ require (
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
|
||||
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
||||
go.opentelemetry.io/otel v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.39.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
|
||||
golang.org/x/mod v0.30.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
golang.org/x/mod v0.33.0 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.42.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
|
||||
golang.org/x/term v0.41.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
golang.org/x/tools v0.39.0 // indirect
|
||||
google.golang.org/grpc v1.72.2 // indirect
|
||||
google.golang.org/protobuf v1.36.9 // indirect
|
||||
golang.org/x/tools v0.42.0 // indirect
|
||||
google.golang.org/grpc v1.79.3 // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
gotest.tools/v3 v3.5.2 // indirect
|
||||
)
|
||||
|
||||
@@ -6,6 +6,8 @@ github.com/brunoga/deep v1.2.4 h1:Aj9E9oUbE+ccbyh35VC/NHlzzjfIVU69BXu2mt2LmL8=
|
||||
github.com/brunoga/deep v1.2.4/go.mod h1:GDV6dnXqn80ezsLSZ5Wlv1PdKAWAO4L5PnKYtv2dgaI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
|
||||
github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
|
||||
github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=
|
||||
@@ -34,8 +36,8 @@ github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
|
||||
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
|
||||
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
|
||||
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
|
||||
@@ -118,16 +120,16 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
|
||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
|
||||
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
|
||||
github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
|
||||
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
|
||||
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
|
||||
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
|
||||
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
@@ -145,58 +147,59 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
|
||||
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
|
||||
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
|
||||
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
|
||||
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
|
||||
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
|
||||
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
|
||||
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
|
||||
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
|
||||
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
|
||||
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
||||
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
|
||||
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
|
||||
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 h1:E2/AqCUMZGgd73TQkxUMcMla25GB9i/5HOdLr+uH7Vo=
|
||||
golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54/go.mod h1:hKdjCMrbv9skySur+Nek8Hd0uJ0GuxJIoIX2payrIdQ=
|
||||
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
|
||||
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
|
||||
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 h1:bTLqdHv7xrGlFbvf5/TXNxy/iUwwdkjhqQTJDjW7aj0=
|
||||
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4/go.mod h1:g5NllXBEermZrmR51cJDQxmJUHUOfRAaNyWBM+R+548=
|
||||
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
|
||||
golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
|
||||
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
|
||||
google.golang.org/grpc v1.72.2 h1:TdbGzwb82ty4OusHWepvFWGLgIbNo1/SUynEN0ssqv8=
|
||||
google.golang.org/grpc v1.72.2/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
||||
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
|
||||
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
|
||||
google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
@@ -49,6 +49,7 @@ func (c *Config) Validate() error {
|
||||
type Client interface {
|
||||
GetMessage(ctx context.Context, lastMessageID, maxCapacity int) (*scaleset.RunnerScaleSetMessage, error)
|
||||
DeleteMessage(ctx context.Context, messageID int) error
|
||||
AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error)
|
||||
Session() scaleset.RunnerScaleSetSession
|
||||
}
|
||||
|
||||
@@ -210,6 +211,12 @@ func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scale
|
||||
return fmt.Errorf("failed to delete message: %w", err)
|
||||
}
|
||||
|
||||
if len(msg.JobAvailableMessages) > 0 {
|
||||
if err := l.acquireAvailableJobs(ctx, msg.JobAvailableMessages); err != nil {
|
||||
return fmt.Errorf("failed to acquire available jobs: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, jobStarted := range msg.JobStartedMessages {
|
||||
l.metricsRecorder.RecordJobStarted(jobStarted)
|
||||
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
|
||||
@@ -232,6 +239,23 @@ func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scale
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) acquireAvailableJobs(ctx context.Context, jobsAvailable []*scaleset.JobAvailable) error {
|
||||
ids := make([]int64, 0, len(jobsAvailable))
|
||||
for _, job := range jobsAvailable {
|
||||
ids = append(ids, job.RunnerRequestID)
|
||||
}
|
||||
|
||||
l.logger.Info("Acquiring jobs", slog.Int("count", len(ids)))
|
||||
|
||||
acquired, err := l.client.AcquireJobs(ctx, ids)
|
||||
if err != nil {
|
||||
return fmt.Errorf("acquiring jobs: %w", err)
|
||||
}
|
||||
|
||||
l.logger.Info("Jobs acquired", slog.Int("count", len(acquired)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
|
||||
l.latestStatistics = msg
|
||||
l.metricsRecorder.RecordStatistics(msg)
|
||||
|
||||
@@ -38,6 +38,74 @@ func (_m *MockClient) EXPECT() *MockClient_Expecter {
|
||||
return &MockClient_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// AcquireJobs provides a mock function for the type MockClient
|
||||
func (_mock *MockClient) AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
ret := _mock.Called(ctx, requestIDs)
|
||||
|
||||
if len(ret) == 0 {
|
||||
panic("no return value specified for AcquireJobs")
|
||||
}
|
||||
|
||||
var r0 []int64
|
||||
var r1 error
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context, []int64) ([]int64, error)); ok {
|
||||
return returnFunc(ctx, requestIDs)
|
||||
}
|
||||
if returnFunc, ok := ret.Get(0).(func(context.Context, []int64) []int64); ok {
|
||||
r0 = returnFunc(ctx, requestIDs)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]int64)
|
||||
}
|
||||
}
|
||||
if returnFunc, ok := ret.Get(1).(func(context.Context, []int64) error); ok {
|
||||
r1 = returnFunc(ctx, requestIDs)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// MockClient_AcquireJobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AcquireJobs'
|
||||
type MockClient_AcquireJobs_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// AcquireJobs is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - requestIDs []int64
|
||||
func (_e *MockClient_Expecter) AcquireJobs(ctx interface{}, requestIDs interface{}) *MockClient_AcquireJobs_Call {
|
||||
return &MockClient_AcquireJobs_Call{Call: _e.mock.On("AcquireJobs", ctx, requestIDs)}
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) Run(run func(ctx context.Context, requestIDs []int64)) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
var arg0 context.Context
|
||||
if args[0] != nil {
|
||||
arg0 = args[0].(context.Context)
|
||||
}
|
||||
var arg1 []int64
|
||||
if args[1] != nil {
|
||||
arg1 = args[1].([]int64)
|
||||
}
|
||||
run(
|
||||
arg0,
|
||||
arg1,
|
||||
)
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) Return(int64s []int64, err error) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Return(int64s, err)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockClient_AcquireJobs_Call) RunAndReturn(run func(ctx context.Context, requestIDs []int64) ([]int64, error)) *MockClient_AcquireJobs_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// DeleteMessage provides a mock function for the type MockClient
|
||||
func (_mock *MockClient) DeleteMessage(ctx context.Context, messageID int) error {
|
||||
ret := _mock.Called(ctx, messageID)
|
||||
|
||||
@@ -234,6 +234,67 @@ func (c *MessageSessionClient) Session() RunnerScaleSetSession {
|
||||
return *c.session
|
||||
}
|
||||
|
||||
// AcquireJobs acquires the given job request IDs from the runner scale set.
|
||||
// If the current session token is expired, it refreshes the session and tries one more time.
|
||||
func (c *MessageSessionClient) AcquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
ids, err := c.acquireJobs(ctx, requestIDs)
|
||||
if err == nil {
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, MessageQueueTokenExpiredError) {
|
||||
return nil, fmt.Errorf("failed to acquire jobs: %w", err)
|
||||
}
|
||||
|
||||
if err := c.refreshMessageSession(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to refresh message session: %w", err)
|
||||
}
|
||||
|
||||
return c.acquireJobs(ctx, requestIDs)
|
||||
}
|
||||
|
||||
func (c *MessageSessionClient) acquireJobs(ctx context.Context, requestIDs []int64) ([]int64, error) {
|
||||
body, err := json.Marshal(requestIDs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request ids: %w", err)
|
||||
}
|
||||
|
||||
path := fmt.Sprintf("/%s/%d/acquirejobs", scaleSetEndpoint, c.scaleSetID)
|
||||
|
||||
c.innerClient.mu.Lock()
|
||||
req, err := c.innerClient.newActionsServiceRequest(ctx, http.MethodPost, path, bytes.NewBuffer(body))
|
||||
c.innerClient.mu.Unlock()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create acquire jobs request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.session.MessageQueueAccessToken))
|
||||
|
||||
resp, err := c.commonClient.do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to issue acquire jobs request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusUnauthorized {
|
||||
return nil, newRequestResponseError(req, resp, MessageQueueTokenExpiredError)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, newRequestResponseError(req, resp, fmt.Errorf("unexpected status code %s", resp.Status))
|
||||
}
|
||||
|
||||
var result acquireJobsResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return nil, newRequestResponseError(req, resp, fmt.Errorf("failed to decode acquire jobs response: %w", err))
|
||||
}
|
||||
|
||||
return result.Value, nil
|
||||
}
|
||||
|
||||
func (c *MessageSessionClient) doSessionRequest(ctx context.Context, method, path string, requestData io.Reader, expectedResponseStatusCode int, responseUnmarshalTarget any) error {
|
||||
c.innerClient.mu.Lock()
|
||||
defer c.innerClient.mu.Unlock()
|
||||
|
||||
@@ -649,3 +649,231 @@ func TestDeleteMessage(t *testing.T) {
|
||||
assert.Contains(t, err.Error(), "unexpected status code")
|
||||
})
|
||||
}
|
||||
|
||||
func TestAcquireJobs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
auth := actionsAuth{
|
||||
token: "token",
|
||||
}
|
||||
|
||||
t.Run("Acquire jobs successfully", func(t *testing.T) {
|
||||
requestIDs := []int64{1, 2}
|
||||
want := []int64{1, 2}
|
||||
response := []byte(`{"count":2,"value":[1,2]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
assert.Equal(t, http.MethodPost, r.Method)
|
||||
assert.Contains(t, r.URL.Path, "acquirejobs")
|
||||
assert.True(t, strings.HasPrefix(r.Header.Get("Authorization"), "Bearer"), "expected Bearer authorization header")
|
||||
|
||||
var gotIDs []int64
|
||||
err := json.NewDecoder(r.Body).Decode(&gotIDs)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, requestIDs, gotIDs)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(response)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, requestIDs)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, want, got)
|
||||
})
|
||||
|
||||
t.Run("Message token expired", func(t *testing.T) {
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
// create session
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
// refresh
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1})
|
||||
assert.Nil(t, got)
|
||||
assert.ErrorIs(t, err, MessageQueueTokenExpiredError, "expected error to be MessageQueueTokenExpiredError but got: %v", err)
|
||||
})
|
||||
|
||||
t.Run("Message token refreshed", func(t *testing.T) {
|
||||
want := []int64{1, 2}
|
||||
afterRefreshResponse := []byte(`{"count":2,"value":[1,2]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
type state int
|
||||
const (
|
||||
createSession state = iota
|
||||
firstAcquire
|
||||
refreshToken
|
||||
secondAcquire
|
||||
)
|
||||
currentState := createSession
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
if currentState == firstAcquire {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
currentState = refreshToken
|
||||
return
|
||||
}
|
||||
require.Equal(t, secondAcquire, currentState)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(afterRefreshResponse)
|
||||
return
|
||||
}
|
||||
// create session
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
require.Equal(t, createSession, currentState)
|
||||
handleSessionRequest(w, r)
|
||||
currentState = firstAcquire
|
||||
return
|
||||
}
|
||||
// refresh
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
require.Equal(t, refreshToken, currentState)
|
||||
handleSessionRequest(w, r)
|
||||
currentState = secondAcquire
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1, 2})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, want, got)
|
||||
})
|
||||
|
||||
t.Run("Server error", func(t *testing.T) {
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
retryMax := 1
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
WithRetryMax(retryMax),
|
||||
WithRetryWaitMax(1*time.Nanosecond),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(
|
||||
ctx,
|
||||
1,
|
||||
"my-org",
|
||||
WithRetryMax(retryMax),
|
||||
WithRetryWaitMax(1*time.Nanosecond),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{1})
|
||||
assert.Nil(t, got)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("Empty request IDs", func(t *testing.T) {
|
||||
response := []byte(`{"count":0,"value":[]}`)
|
||||
|
||||
var handleSessionRequest http.HandlerFunc
|
||||
server := newActionsServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.Contains(r.URL.Path, "acquirejobs") {
|
||||
assert.Equal(t, http.MethodPost, r.Method)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(response)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, "sessions") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
if strings.Contains(r.URL.Path, "/sessions/") {
|
||||
handleSessionRequest(w, r)
|
||||
return
|
||||
}
|
||||
}))
|
||||
handleSessionRequest = newTestSessionRequestHandler(t, server.testRunnerScaleSetSession())
|
||||
|
||||
client, err := newClient(
|
||||
testSystemInfo,
|
||||
server.configURLForOrg("my-org"),
|
||||
auth,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
sessionClient, err := client.MessageSessionClient(ctx, 1, "my-org")
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := sessionClient.AcquireJobs(ctx, []int64{})
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, got)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,11 +12,17 @@ type MessageType string
|
||||
|
||||
// message types
|
||||
const (
|
||||
MessageTypeJobAvailable MessageType = "JobAvailable"
|
||||
MessageTypeJobAssigned MessageType = "JobAssigned"
|
||||
MessageTypeJobStarted MessageType = "JobStarted"
|
||||
MessageTypeJobCompleted MessageType = "JobCompleted"
|
||||
)
|
||||
|
||||
type JobAvailable struct {
|
||||
AcquireJobURL string `json:"acquireJobUrl"`
|
||||
JobMessageBase
|
||||
}
|
||||
|
||||
type JobAssigned struct {
|
||||
JobMessageBase
|
||||
}
|
||||
@@ -99,6 +105,7 @@ type runnerScaleSetMessageResponse struct {
|
||||
type RunnerScaleSetMessage struct {
|
||||
MessageID int
|
||||
Statistics *RunnerScaleSetStatistic
|
||||
JobAvailableMessages []*JobAvailable
|
||||
JobAssignedMessages []*JobAssigned
|
||||
JobStartedMessages []*JobStarted
|
||||
JobCompletedMessages []*JobCompleted
|
||||
@@ -109,6 +116,11 @@ type runnerScaleSetsResponse struct {
|
||||
RunnerScaleSets []RunnerScaleSet `json:"value"`
|
||||
}
|
||||
|
||||
type acquireJobsResponse struct {
|
||||
Count int `json:"count"`
|
||||
Value []int64 `json:"value"`
|
||||
}
|
||||
|
||||
type RunnerScaleSetSession struct {
|
||||
SessionID uuid.UUID `json:"sessionId,omitempty"`
|
||||
OwnerName string `json:"ownerName,omitempty"`
|
||||
|
||||
Reference in New Issue
Block a user