Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
continue
}

klog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
klog.V(4).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)

hardMode, highestAllowedTier := job.IsHardTopologyMode()
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (pmpt *Action) normalPreempt(
preemptee := victimsQueue.Pop().(*api.TaskInfo)
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
if err := stmt.Evict(preemptee, "preempt"); err != nil {
if err := stmt.Evict(preemptee, "preempt for task "+preemptor.Name); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
continue
Expand All @@ -373,7 +373,7 @@ func (pmpt *Action) normalPreempt(
}

metrics.RegisterPreemptionAttempts()
klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
klog.V(3).Infof("Try to preempt <%v> for Task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

// If preemptor's queue is not allocatable, it means preemptor cannot be allocated. So no need care about the node idle resource
Expand Down Expand Up @@ -501,7 +501,7 @@ func prepareCandidate(c *candidate, pod *v1.Pod, stmt *framework.Statement, ssn
for _, victim := range c.Victims() {
klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
victim.Namespace, victim.Name, pod.Namespace, pod.Name)
if err := stmt.Evict(victim, "preempt"); err != nil {
if err := stmt.Evict(victim, "preempt for task "+pod.Name); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
victim.Namespace, victim.Name, pod.Namespace, pod.Name, err)
return api.AsStatus(err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (ra *Action) Execute(ssn *framework.Session) {

queue := queues.Pop().(*api.QueueInfo)
if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
klog.V(3).Infof("Queue <%s> is overused <%v>, ignore it.", queue.Name, queue.Queue.Status.Allocated)
continue
}

Expand Down Expand Up @@ -177,7 +177,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
}

if len(reclaimees) == 0 {
klog.V(4).Infof("No reclaimees on Node <%s>.", n.Name)
klog.V(3).Infof("No reclaimees on Node <%s>.", n.Name)
continue
}

Expand All @@ -191,14 +191,14 @@ func (ra *Action) Execute(ssn *framework.Session) {
victimsQueue := ssn.BuildVictimsPriorityQueue(victims, task)

resreq := task.InitResreq.Clone()
reclaimed := api.EmptyResource()
reclaimed := n.FutureIdle()

// Reclaim victims for tasks.
for !victimsQueue.Empty() {
reclaimee := victimsQueue.Pop().(*api.TaskInfo)
klog.Errorf("Try to reclaim Task <%s/%s> for Tasks <%s/%s>",
reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name)
if err := ssn.Evict(reclaimee, "reclaim"); err != nil {
if err := ssn.Evict(reclaimee, "reclaim for task "+task.Name); err != nil {
klog.Errorf("Failed to reclaim Task <%s/%s> for Tasks <%s/%s>: %v",
reclaimee.Namespace, reclaimee.Name, task.Namespace, task.Name, err)
continue
Expand Down
12 changes: 11 additions & 1 deletion pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,19 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b

if defaultValue == Infinity {
for name := range rr.ScalarResources {
if IgnoreScalarResource(name) {
continue
}
if _, ok := r.ScalarResources[name]; !ok {
return false
}
}
}

for resourceName, leftValue := range r.ScalarResources {
if IgnoreScalarResource(resourceName) {
continue
}
rightValue, ok := rr.ScalarResources[resourceName]
if !ok && defaultValue == Infinity {
continue
Expand Down Expand Up @@ -461,7 +467,7 @@ func (r *Resource) LessEqualWithDimension(rr *Resource, req *Resource) bool {
}

for name, quant := range req.ScalarResources {
if IsIgnoredScalarResource(name) {
if IgnoreScalarResource(name) {
continue
}
rQuant := r.ScalarResources[name]
Expand Down Expand Up @@ -790,3 +796,7 @@ func ExceededPart(left, right *Resource) *Resource {
diff, _ := left.Diff(right, Zero)
return diff
}

func IgnoreScalarResource(name v1.ResourceName) bool {
return name == "attachable-volumes-csi-fsx.csi.aws.com" || name == "efa.poolsi.de/infiniband" || name == "vpc.amazonaws.com/efa" || ignoredScalarResources.Has(string(name))
}
4 changes: 2 additions & 2 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,9 +906,9 @@ func (sc *SchedulerCache) Bind(ctx context.Context, bindContexts []*BindContext)
tmp := time.Now()
errMsg := sc.Binder.Bind(sc.kubeClient, readyToBindTasks)
if len(errMsg) == 0 {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
klog.V(4).Infof("bind ok, latency %v", time.Since(tmp))
} else {
klog.V(3).Infof("There are %d tasks in total and %d binds failed, latency %v", len(readyToBindTasks), len(errMsg), time.Since(tmp))
klog.V(4).Infof("There are %d tasks in total and %d binds failed, latency %v", len(readyToBindTasks), len(errMsg), time.Since(tmp))
}

for _, bindContext := range bindContexts {
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package framework
import (
"context"

"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
Expand Down Expand Up @@ -204,6 +205,11 @@ func (ssn *Session) Reclaimable(reclaimer *api.TaskInfo, reclaimees []*api.TaskI
victims = nil
break
}
victimNames := []string{}
for _, victim := range victims {
victimNames = append(victimNames, victim.Name)
}
klog.V(3).Infof("Victims from plugin %s, victims=%+v reclaimer=%s", plugin.Name, victimNames, reclaimer.Name)
// first iteration - initialize victims list
if victims == nil {
victims = candidates
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/plugins/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
if allocated.LessEqual(attr.deserved, api.Infinity) || !attr.guarantee.LessEqual(exceptReclaimee, api.Zero) {
continue
}
klog.V(3).Infof("reclaimee %s/%s(%+v) becomes victim after comparison. allocated=%+v, deserved=%+v, exceptReclaimee=%+v", job.Queue, reclaimee.Name, reclaimee.Resreq, allocated, attr.deserved, exceptReclaimee)
allocated.Sub(reclaimee.Resreq)
victims = append(victims, reclaimee)
}
klog.V(4).Infof("Victims from capacity plugin, victims=%+v reclaimer=%s", victims, reclaimer)

return victims, util.Permit
})

Expand Down Expand Up @@ -419,7 +420,7 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
attr.realCapability = realCapability
}
cp.queueOpts[job.Queue] = attr
klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue)
klog.V(3).Infof("Added Queue <%s> attributes <%v>.", job.Queue, attr)
}

attr := cp.queueOpts[job.Queue]
Expand Down
Loading