Skip to content

Commit

Permalink
Use clock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Jul 30, 2024
1 parent c71cdb3 commit dd0109f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewControllers(
) []controller.Controller {

cluster := state.NewCluster(clock, kubeClient)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
p := provisioning.NewProvisioner(clock, kubeClient, recorder, cloudProvider, cluster)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

Expand Down
17 changes: 10 additions & 7 deletions pkg/controllers/provisioning/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"

"sigs.k8s.io/karpenter/pkg/operator/options"
)
Expand All @@ -31,15 +32,17 @@ import (
// maximum batch duration.
type Batcher[T comparable] struct {
trigger chan struct{}
clk clock.Clock

mu sync.RWMutex
triggeredOnElems sets.Set[T]
}

// NewBatcher is a constructor for the Batcher
func NewBatcher[T comparable]() *Batcher[T] {
func NewBatcher[T comparable](clk clock.Clock) *Batcher[T] {
return &Batcher[T]{
trigger: make(chan struct{}, 1),
clk: clk,
triggeredOnElems: sets.New[T](),
}
}
Expand Down Expand Up @@ -76,23 +79,23 @@ func (b *Batcher[T]) Wait(ctx context.Context) bool {
select {
case <-b.trigger:
// start the batching window after the first item is received
case <-time.After(1 * time.Second):
case <-b.clk.After(1 * time.Second):
// If no pods, bail to the outer controller framework to refresh the context
return false
}
timeout := time.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := time.NewTimer(options.FromContext(ctx).BatchIdleDuration)
timeout := b.clk.NewTimer(options.FromContext(ctx).BatchMaxDuration)
idle := b.clk.NewTimer(options.FromContext(ctx).BatchIdleDuration)
for {
select {
case <-b.trigger:
// correct way to reset an active timer per docs
if !idle.Stop() {
<-idle.C
<-idle.C()
}
idle.Reset(options.FromContext(ctx).BatchIdleDuration)
case <-timeout.C:
case <-timeout.C():
return true
case <-idle.C:
case <-idle.C():
return true
}
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"k8s.io/utils/clock"

"github.com/awslabs/operatorpkg/singleton"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,11 +86,9 @@ type Provisioner struct {
cm *pretty.ChangeMonitor
}

func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
) *Provisioner {
func NewProvisioner(clk clock.Clock, kubeClient client.Client, recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster) *Provisioner {
p := &Provisioner{
batcher: NewBatcher[types.UID](),
batcher: NewBatcher[types.UID](clk),
cloudProvider: cloudProvider,
kubeClient: kubeClient,
volumeTopology: scheduler.NewVolumeTopology(kubeClient),
Expand Down

0 comments on commit dd0109f

Please sign in to comment.