Files
scaleset/examples/dockerscaleset/scaler.go
T
Francesco Renzi e4a017ce06 Initial commit for open source release 🚀
Co-authored-by: Francesco Renzi <rentziass@github.com>
Co-authored-by: Nikola Jokic <jokicnikola07@gmail.com>
2026-02-03 16:41:15 +01:00

196 lines
5.3 KiB
Go

package main
import (
"context"
"fmt"
"log/slog"
"sync"
"github.com/actions/scaleset"
"github.com/actions/scaleset/listener"
"github.com/docker/docker/api/types/container"
dockerclient "github.com/docker/docker/client"
"github.com/google/uuid"
)
type Scaler struct {
runners runnerState
runnerImage string
scaleSetID int
dockerClient *dockerclient.Client
scalesetClient *scaleset.Client
minRunners int
maxRunners int
logger *slog.Logger
}
func (a *Scaler) HandleDesiredRunnerCount(ctx context.Context, count int) (int, error) {
currentCount := a.runners.count()
targetRunnerCount := min(a.maxRunners, a.minRunners+count)
switch {
case targetRunnerCount == currentCount:
// No scaling needed
return currentCount, nil
case targetRunnerCount > currentCount:
// Scale up
scaleUp := targetRunnerCount - currentCount
a.logger.Info(
"Scaling up runners",
slog.Int("currentCount", currentCount),
slog.Int("desiredCount", targetRunnerCount),
slog.Int("scaleUp", scaleUp),
)
for range scaleUp {
if _, err := a.startRunner(ctx); err != nil {
return 0, fmt.Errorf("failed to start runner: %w", err)
}
}
return a.runners.count(), nil
default:
// No need to handle scale down events, since:
// 1. JobCompleted events will first remove runners
// 2. If the count is still below the current runner count, the JobCompleted event will be delivered in the next batch.
// 3. Removal after JobCompleted events is handled synchronously.
// 4. If the job is cancelled, the JobCompleted event will still be delivered.
}
return a.runners.count(), nil
}
func (a *Scaler) HandleJobStarted(ctx context.Context, jobInfo *scaleset.JobStarted) error {
a.logger.Info(
"Job started",
slog.Int64("runnerRequestId", jobInfo.RunnerRequestID),
slog.String("jobId", jobInfo.JobID),
)
a.runners.markBusy(jobInfo.RunnerName)
return nil
}
func (a *Scaler) HandleJobCompleted(ctx context.Context, jobInfo *scaleset.JobCompleted) error {
a.logger.Info("Job completed", slog.Int64("runnerRequestId", jobInfo.RunnerRequestID), slog.String("jobId", jobInfo.JobID))
containerID := a.runners.markDone(jobInfo.RunnerName)
if err := a.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
return fmt.Errorf("failed to remove runner container: %w", err)
}
return nil
}
func (a *Scaler) startRunner(ctx context.Context) (string, error) {
name := fmt.Sprintf("runner-%s", uuid.NewString()[:8])
jit, err := a.scalesetClient.GenerateJitRunnerConfig(
ctx,
&scaleset.RunnerScaleSetJitRunnerSetting{
Name: name,
},
a.scaleSetID,
)
if err != nil {
return "", fmt.Errorf("failed to generate JIT config: %w", err)
}
c, err := a.dockerClient.ContainerCreate(
ctx,
&container.Config{
Image: a.runnerImage,
User: "runner",
Cmd: []string{"/home/runner/run.sh"},
Env: []string{
fmt.Sprintf("ACTIONS_RUNNER_INPUT_JITCONFIG=%s", jit.EncodedJITConfig),
},
},
nil,
nil, nil,
name,
)
if err != nil {
return "", fmt.Errorf("failed to create runner container: %w", err)
}
if err := a.dockerClient.ContainerStart(ctx, c.ID, container.StartOptions{}); err != nil {
return "", fmt.Errorf("failed to start runner container: %w", err)
}
a.runners.addIdle(name, c.ID)
return name, nil
}
func (a *Scaler) shutdown(ctx context.Context) {
a.logger.Info("Shutting down runners")
a.runners.mu.Lock()
defer a.runners.mu.Unlock()
for name, containerID := range a.runners.idle {
a.logger.Info("Removing idle runner", slog.String("name", name), slog.String("containerID", containerID))
if err := a.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
a.logger.Error("Failed to remove idle runner container", slog.String("name", name), slog.String("containerID", containerID), slog.String("error", err.Error()))
}
}
clear(a.runners.idle)
for name, containerID := range a.runners.busy {
a.logger.Info("Removing busy runner", slog.String("name", name), slog.String("containerID", containerID))
if err := a.dockerClient.ContainerRemove(ctx, containerID, container.RemoveOptions{Force: true}); err != nil {
a.logger.Error("Failed to remove busy runner container", slog.String("name", name), slog.String("containerID", containerID), slog.String("error", err.Error()))
}
}
clear(a.runners.busy)
}
var _ listener.Scaler = (*Scaler)(nil)
type runnerState struct {
mu sync.Mutex
idle map[string]string
busy map[string]string
}
func (r *runnerState) count() int {
r.mu.Lock()
count := len(r.idle) + len(r.busy)
r.mu.Unlock()
return count
}
func (r *runnerState) markBusy(name string) {
r.mu.Lock()
defer r.mu.Unlock()
state, ok := r.idle[name]
if !ok {
panic("marking non-existent runner busy")
}
delete(r.idle, name)
r.busy[name] = state
}
func (r *runnerState) markDone(name string) string {
r.mu.Lock()
defer r.mu.Unlock()
return r.markDoneUnlocked(name)
}
func (r *runnerState) markDoneUnlocked(name string) string {
containerID, ok := r.busy[name]
if ok {
delete(r.busy, name)
return containerID
}
containerID, ok = r.idle[name]
if ok {
delete(r.idle, name)
return containerID
}
panic("marking non-existent runner done")
}
func (r *runnerState) addIdle(name, containerID string) {
r.mu.Lock()
r.idle[name] = containerID
r.mu.Unlock()
}