Skip to content
1 change: 1 addition & 0 deletions slice/api/v1alpha1/slice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type SliceStatus struct {

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:scope=Cluster
// +kubebuilder:printcolumn:name="Type",type=string,JSONPath=`.spec.type`
// +kubebuilder:printcolumn:name="Topology",type=string,JSONPath=`.spec.topology`
// +kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.conditions[?(@.type=="Ready")].reason`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ spec:
listKind: SliceList
plural: slices
singular: slice
scope: Namespaced
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.type
Expand Down
32 changes: 29 additions & 3 deletions slice/internal/controller/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,50 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

"tpu-slice-controller/api/v1alpha1"
"tpu-slice-controller/internal/core"
"tpu-slice-controller/internal/util/slices"
)

const (
OwnerReferenceUID = "metadata.ownerReferences.uid"
OwnerReferenceUID = "metadata.ownerReferences.uid"
WorkloadNamespaceIndex = "workload.namespace"
WorkloadNameIndex = "workload.name"
)

func indexOwnerReferenceUID(obj client.Object) []string {
return slices.Map(obj.GetOwnerReferences(), func(o *metav1.OwnerReference) string { return string(o.UID) })
}

func indexSliceByWorkloadNamespace(obj client.Object) []string {
if slice, ok := obj.(*v1alpha1.Slice); ok {
if ns, found := slice.GetAnnotations()[core.OwnerWorkloadNamespaceAnnotation]; found {
return []string{ns}
}
}
return nil
}

func indexSliceByWorkloadName(obj client.Object) []string {
if slice, ok := obj.(*v1alpha1.Slice); ok {
if name, found := slice.GetAnnotations()[core.OwnerWorkloadNameAnnotation]; found {
return []string{name}
}
}
return nil
}

// SetupIndexer configures the indexer to index specific fields for kueue.Workload and v1alpha1.Slice resources.
func SetupIndexer(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &kueue.Workload{}, OwnerReferenceUID, indexOwnerReferenceUID); err != nil {
return fmt.Errorf("setting index on ownerReferences.uid for Workload: %w", err)
}
if err := indexer.IndexField(ctx, &v1alpha1.Slice{}, OwnerReferenceUID, indexOwnerReferenceUID); err != nil {
return fmt.Errorf("setting index on ownerReferences.uid for Slice: %w", err)
// Since Slice is now cluster-scoped, it cannot have a controller owner reference to a namespaced Workload.
// We use annotations for linking Slices to Workloads.
if err := indexer.IndexField(ctx, &v1alpha1.Slice{}, WorkloadNamespaceIndex, indexSliceByWorkloadNamespace); err != nil {
return fmt.Errorf("setting index on workload namespace for Slice: %w", err)
}
if err := indexer.IndexField(ctx, &v1alpha1.Slice{}, WorkloadNameIndex, indexSliceByWorkloadName); err != nil {
return fmt.Errorf("setting index on workload name for Slice: %w", err)
}
return nil
}
33 changes: 18 additions & 15 deletions slice/internal/controller/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,10 @@ func (r *WorkloadReconciler) cleanupSlices(ctx context.Context, wl *kueue.Worklo
func (r *WorkloadReconciler) findWorkloadSlices(ctx context.Context, wl *kueue.Workload) ([]v1alpha1.Slice, error) {
slices := &v1alpha1.SliceList{}
opts := []client.ListOption{
client.InNamespace(wl.Namespace),
client.MatchingFields{OwnerReferenceUID: string(wl.UID)},
client.MatchingFields{
WorkloadNamespaceIndex: wl.Namespace,
WorkloadNameIndex: wl.Name,
},
}
if err := r.client.List(ctx, slices, opts...); err != nil {
return nil, err
Expand Down Expand Up @@ -463,7 +465,7 @@ func (r *WorkloadReconciler) syncSlices(
continue
}

sliceName := core.SliceName(wl.Name, psa.Name)
sliceName := core.SliceName(wl.Namespace, wl.Name, psa.Name)

if _, exist := slicesByName[sliceName]; exist {
// Slice already exists, nothing to do.
Expand Down Expand Up @@ -511,18 +513,17 @@ func (r *WorkloadReconciler) createSlice(ctx context.Context, wl *kueue.Workload
slice := core.SliceWithMetadata(wl, psa.Name)
log := ctrl.LoggerFrom(ctx).WithValues("slice", klog.KObj(slice))
log.V(3).Info("Creating Slice")

if err := controllerutil.SetControllerReference(wl, slice, r.client.Scheme()); err != nil {
return nil, err
}
// Since Slice is a cluster-scoped object and Workload is namespaced,
// we cannot set a controller owner reference. The Workload's namespace and name
// are stored as annotations on the Slice for lookup.
parseTopologyAssignmentIntoPartitionIds(slice, psa.TopologyAssignment, nodes)

ps := podset.FindPodSetByName(wl.Spec.PodSets, psa.Name)
slice.Spec.Type = v1alpha1.Type(core.GetTPUAccelerator(ps.Template))
slice.Spec.Topology = core.GetTPUTopology(ps.Template)

if err := r.client.Create(ctx, slice); err != nil {
msg := fmt.Sprintf("Error creating Slice %q: %v", client.ObjectKeyFromObject(slice), err)
msg := fmt.Sprintf("Error creating Slice %q: %v", slice.Name, err)
log.Error(err, msg)
r.record.Event(wl, corev1.EventTypeWarning, FailedCreateSliceEventType, api.TruncateEventMessage(msg))
ac.State = kueue.CheckStatePending
Expand All @@ -547,7 +548,7 @@ func (r *WorkloadReconciler) updateWorkloadAdmissionCheckStatus(ctx context.Cont
func buildCreationEventMessage(slices []v1alpha1.Slice) string {
sliceNames := make([]string, len(slices))
for index, slice := range slices {
sliceNames[index] = fmt.Sprintf("%q", client.ObjectKeyFromObject(&slice))
sliceNames[index] = fmt.Sprintf("%q", slice.Name)
}
sort.Strings(sliceNames)
return fmt.Sprintf("The Slices %s have been created", strings.Join(sliceNames, ", "))
Expand Down Expand Up @@ -672,18 +673,20 @@ func (h *sliceHandler) handleEvent(ctx context.Context, obj client.Object, q wor

log := ctrl.LoggerFrom(ctx)

owner := metav1.GetControllerOf(slice)
if owner == nil {
log.V(3).Info("Owner not found")
workloadNamespace, nsFound := slice.Annotations[core.OwnerWorkloadNamespaceAnnotation]
workloadName, nameFound := slice.Annotations[core.OwnerWorkloadNameAnnotation]

if !nsFound || !nameFound {
log.V(3).Info("Slice is missing workload owner annotations, skipping event handling", "slice", klog.KObj(slice))
return
}

log.V(3).Info("Handle Slice event", "workload", klog.KRef(slice.Namespace, slice.Name))
log.V(3).Info("Handle Slice event", "workload", klog.KRef(workloadNamespace, workloadName))

req := reconcile.Request{
NamespacedName: types.NamespacedName{
Name: owner.Name,
Namespace: slice.Namespace,
Name: workloadName,
Namespace: workloadNamespace,
},
}

Expand Down
Loading
Loading