Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2"
"sigs.k8s.io/cluster-inventory-api/pkg/access"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
Expand All @@ -52,6 +53,7 @@ var (
healthAddr string
profilerAddress string
concurrentReconciles int
providerFile string
)

// Add RBAC for the authorized diagnostics endpoint.
Expand Down Expand Up @@ -101,10 +103,20 @@ func main() {
libsveltosv1beta1.Component("ClusterInventoryController"), ctrl.Log.WithName("log-setter"),
ctrl.GetConfigOrDie())

var accessCfg *access.Config
if providerFile != "" {
accessCfg, err = access.NewFromFile(providerFile)
if err != nil {
setupLog.Error(err, "unable to load provider file", "path", providerFile)
os.Exit(1)
}
}

if err = (&controller.ClusterProfileReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ConcurrentReconciles: concurrentReconciles,
AccessConfig: accessCfg,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterProfile")
os.Exit(1)
Expand Down Expand Up @@ -155,6 +167,10 @@ func initFlags(fs *pflag.FlagSet) {
fs.DurationVar(&syncPeriod, "sync-period", defaultSyncPeriod*time.Minute,
fmt.Sprintf("Minimum interval at which watched resources are reconciled. Default: %d minutes",
defaultSyncPeriod))

fs.StringVar(&providerFile, "clusterprofile-provider-file", "",
"Path to the JSON provider configuration file enabling exec-plugin access providers. "+
"When empty only the kubeconfig-secretreader provider is supported.")
}

func setupChecks(mgr ctrl.Manager) {
Expand Down
35 changes: 27 additions & 8 deletions controllers/clusterprofile_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
clusterinventoryv1alpha1 "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
"sigs.k8s.io/cluster-inventory-api/pkg/access"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -37,13 +39,18 @@ const (
// controller can delete the SveltosCluster and kubeconfig Secret before
// the ClusterProfile is removed from the API server.
clusterProfileFinalizer = "clusterinventory.projectsveltos.io/finalizer"

// tokenRefreshRatio controls how early we requeue before the token expires.
// Requeuing at 80% of the remaining lifetime gives a comfortable safety margin.
tokenRefreshRatio = 0.8
)

// ClusterProfileReconciler reconciles a multicluster.x-k8s.io ClusterProfile.
type ClusterProfileReconciler struct {
client.Client
Scheme *runtime.Scheme
ConcurrentReconciles int
AccessConfig *access.Config
}

//+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=clusterprofiles,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -71,39 +78,51 @@ func (r *ClusterProfileReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, r.reconcileDelete(ctx, cp, logger)
}

return ctrl.Result{}, r.reconcileNormal(ctx, cp, logger)
return r.reconcileNormal(ctx, cp, logger)
}

func (r *ClusterProfileReconciler) reconcileNormal(ctx context.Context,
cp *clusterinventoryv1alpha1.ClusterProfile, logger logr.Logger) error {
cp *clusterinventoryv1alpha1.ClusterProfile, logger logr.Logger) (ctrl.Result, error) {

logger.V(logs.LogDebug).Info("reconcileNormal")

if !controllerutil.ContainsFinalizer(cp, clusterProfileFinalizer) {
controllerutil.AddFinalizer(cp, clusterProfileFinalizer)
if err := r.Update(ctx, cp); err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to add finalizer: %v", err))
return err
return ctrl.Result{}, err
}
}

kubeconfig, err := getKubeconfig(ctx, r.Client, cp, logger)
kubeconfig, expiry, err := getKubeconfig(ctx, r.Client, r.AccessConfig, cp, logger)
if err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to get kubeconfig: %v", err))
return err
return ctrl.Result{}, err
}

if err := reconcileKubeconfigSecret(ctx, r.Client, cp, kubeconfig, logger); err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to reconcile kubeconfig secret: %v", err))
return err
return ctrl.Result{}, err
}

if err := reconcileSveltosCluster(ctx, r.Client, cp, logger); err != nil {
logger.V(logs.LogInfo).Info(fmt.Sprintf("failed to reconcile SveltosCluster: %v", err))
return err
return ctrl.Result{}, err
}

return nil
if expiry != nil {
remaining := time.Until(*expiry)
const minRequeue = time.Minute
requeueAfter := time.Duration(float64(remaining) * tokenRefreshRatio)
if requeueAfter < minRequeue {
requeueAfter = minRequeue
}
logger.V(logs.LogDebug).Info("scheduling token refresh",
"expiry", expiry, "requeueAfter", requeueAfter)
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

return ctrl.Result{}, nil
}

func (r *ClusterProfileReconciler) reconcileDelete(ctx context.Context,
Expand Down
189 changes: 176 additions & 13 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -30,8 +33,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
clientauthenticationv1 "k8s.io/client-go/pkg/apis/clientauthentication/v1"
clientcmdv1 "k8s.io/client-go/tools/clientcmd/api/v1"
clusterinventoryv1alpha1 "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
"sigs.k8s.io/cluster-inventory-api/pkg/access"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

libsveltosv1beta1 "github.com/projectsveltos/libsveltos/api/v1beta1"
logs "github.com/projectsveltos/libsveltos/lib/logsettings"
Expand All @@ -57,6 +64,12 @@ const (
// created by this controller so they can be identified for cleanup.
managedByLabel = "clusterinventory.projectsveltos.io/managed-by"
managedByValue = "clusterinventory-controller"

// tokenKubeconfigCluster, tokenKubeconfigUser, and tokenKubeconfigContext are
// the fixed names used in the minimal kubeconfig built by BuildTokenKubeconfig.
tokenKubeconfigCluster = "cluster"
tokenKubeconfigUser = "user"
tokenKubeconfigContext = "context"
)

// secretReaderConfig is the JSON payload embedded in the ClusterProfile's
Expand Down Expand Up @@ -91,28 +104,40 @@ func kubeconfigSecretName(cpName string) string {
return cpName + "-sveltos-kubeconfig"
}

// getKubeconfig returns the raw kubeconfig bytes for the cluster described by cp.
// getKubeconfig returns the raw kubeconfig bytes for the cluster described by cp,
// along with an optional token expiry time (nil if the kubeconfig does not expire).
//
// Currently only the "kubeconfig-secretreader" access provider is supported:
// the provider's exec extension must reference a pre-existing Secret that
// contains a complete kubeconfig.
// The kubeconfig-secretreader provider is always supported: it reads a full
// kubeconfig from a pre-existing Secret.
//
// To support exec-plugin-based providers (e.g. GKE Fleet, EKS), add additional
// branches here and implement a corresponding helper (e.g. getKubeconfigFromGKE).
// Only this function and the new helper need to change.
// When accessCfg is non-nil, any provider listed in the config file that
// matches an AccessProvider in the ClusterProfile is also supported. The
// corresponding exec plugin is invoked directly in this process; the resulting
// bearer token is written into a plain kubeconfig so that sveltoscluster-manager
// needs no exec binary. The token expiry is returned so the caller can requeue
// before the token expires.
func getKubeconfig(ctx context.Context, c client.Client,
cp *clusterinventoryv1alpha1.ClusterProfile, logger logr.Logger) ([]byte, error) {
accessCfg *access.Config,
cp *clusterinventoryv1alpha1.ClusterProfile, logger logr.Logger) ([]byte, *time.Time, error) {

if provider := findAccessProvider(cp, kubeconfigSecretReaderProviderName); provider != nil {
logger.V(logs.LogDebug).Info("using kubeconfig-secretreader access provider")
return getKubeconfigFromSecretReader(ctx, c, cp.Namespace, provider)
data, err := getKubeconfigFromSecretReader(ctx, c, cp.Namespace, provider)
return data, nil, err
}

// TODO: add more provider branches here when exec-plugin support is implemented.
if accessCfg != nil {
for _, p := range accessCfg.Providers {
if findAccessProvider(cp, p.Name) != nil {
logger.V(logs.LogDebug).Info("using exec-plugin access provider", "provider", p.Name)
return getKubeconfigFromExecPlugin(ctx, accessCfg, cp, logger)
}
}
}

return nil, fmt.Errorf("no supported access provider in ClusterProfile %s/%s"+
" (only %q is supported currently)",
cp.Namespace, cp.Name, kubeconfigSecretReaderProviderName)
return nil, nil, fmt.Errorf("no supported access provider in ClusterProfile %s/%s"+
" (kubeconfig-secretreader not present; set --clusterprofile-provider-file to enable exec-plugin providers)",
cp.Namespace, cp.Name)
}

// findAccessProvider returns the first AccessProvider matching name.
Expand Down Expand Up @@ -281,3 +306,141 @@ func deleteKubeconfigSecret(ctx context.Context, c client.Client,
logger.V(logs.LogDebug).Info(fmt.Sprintf("deleting kubeconfig secret %s/%s", cp.Namespace, secretName))
return c.Delete(ctx, secret)
}

// getKubeconfigFromExecPlugin handles exec-plugin-based access providers.
// It uses pkg/access to build an exec config (merging any cluster-specific
// CLI args and env vars from the ClusterProfile extensions per KEP-5339),
// then invokes the binary directly to obtain a short-lived bearer token.
// The token is embedded in a plain kubeconfig so that sveltoscluster-manager
// needs no exec binary in its own pod.
func getKubeconfigFromExecPlugin(ctx context.Context,
accessCfg *access.Config,
cp *clusterinventoryv1alpha1.ClusterProfile, logger logr.Logger) ([]byte, *time.Time, error) {

// BuildConfigFromCP handles provider lookup and KEP-5339 arg/env merging.
restCfg, err := accessCfg.BuildConfigFromCP(cp)
if err != nil {
return nil, nil, fmt.Errorf("building rest.Config from ClusterProfile: %w", err)
}
if restCfg.ExecProvider == nil {
return nil, nil, fmt.Errorf("access provider produced no exec configuration")
}

// Find the matching AccessProvider to get the cluster server address and CA.
var ap *clusterinventoryv1alpha1.AccessProvider
for _, p := range accessCfg.Providers {
if found := findAccessProvider(cp, p.Name); found != nil {
ap = found
break
}
}
if ap == nil {
return nil, nil, fmt.Errorf("no matching access provider found in ClusterProfile %s/%s",
cp.Namespace, cp.Name)
}

var envVars []string
for _, e := range restCfg.ExecProvider.Env {
envVars = append(envVars, e.Name+"="+e.Value)
}

logger.V(logs.LogDebug).Info("invoking exec plugin", "command", restCfg.ExecProvider.Command)
token, expiry, err := invokeExecPlugin(ctx, restCfg.ExecProvider.Command,
restCfg.ExecProvider.Args, envVars, ap)
if err != nil {
return nil, nil, err
}

kubeconfig, err := BuildTokenKubeconfig(ap.Cluster.Server, ap.Cluster.CertificateAuthorityData, token)
if err != nil {
return nil, nil, fmt.Errorf("building token kubeconfig: %w", err)
}

return kubeconfig, expiry, nil
}

// invokeExecPlugin runs an exec credential plugin and returns the bearer token
// and its optional expiry time. It sets KUBERNETES_EXEC_INFO so that plugins
// that request cluster info (ProvideClusterInfo: true) receive the correct
// server and CA data.
func invokeExecPlugin(ctx context.Context,
command string, args []string, envVars []string,
ap *clusterinventoryv1alpha1.AccessProvider) (string, *time.Time, error) {

// Build KUBERNETES_EXEC_INFO: an ExecCredential carrying the cluster
// connection details so the plugin knows which cluster it is authenticating to.
execInfo := &clientauthenticationv1.ExecCredential{
TypeMeta: metav1.TypeMeta{
APIVersion: "client.authentication.k8s.io/v1",
Kind: "ExecCredential",
},
Spec: clientauthenticationv1.ExecCredentialSpec{
Cluster: &clientauthenticationv1.Cluster{
Server: ap.Cluster.Server,
CertificateAuthorityData: ap.Cluster.CertificateAuthorityData,
},
},
}
// Pass through the exec extension data as cluster config if present.
for _, ext := range ap.Cluster.Extensions {
if ext.Name == execExtensionKey {
execInfo.Spec.Cluster.Config = runtime.RawExtension{Raw: ext.Extension.Raw}
break
}
}
execInfoJSON, err := json.Marshal(execInfo)
if err != nil {
return "", nil, fmt.Errorf("marshaling KUBERNETES_EXEC_INFO: %w", err)
}

cmd := exec.CommandContext(ctx, command, args...)
cmd.Env = append(os.Environ(), "KUBERNETES_EXEC_INFO="+string(execInfoJSON))
cmd.Env = append(cmd.Env, envVars...)

out, err := cmd.Output()
if err != nil {
return "", nil, fmt.Errorf("exec plugin %q: %w", command, err)
}

var result clientauthenticationv1.ExecCredential
if err := json.Unmarshal(out, &result); err != nil {
return "", nil, fmt.Errorf("parsing exec plugin output: %w", err)
}
if result.Status == nil || result.Status.Token == "" {
return "", nil, fmt.Errorf("exec plugin %q returned no token", command)
}

var expiry *time.Time
if result.Status.ExpirationTimestamp != nil {
t := result.Status.ExpirationTimestamp.Time
expiry = &t
}
return result.Status.Token, expiry, nil
}

// BuildTokenKubeconfig constructs a minimal kubeconfig that authenticates
// with a static bearer token. The resulting YAML can be stored in a Secret
// and used by sveltoscluster-manager without any exec binary.
func BuildTokenKubeconfig(server string, caData []byte, token string) ([]byte, error) {
kc := clientcmdv1.Config{
APIVersion: "v1",
Kind: "Config",
Clusters: []clientcmdv1.NamedCluster{{
Name: tokenKubeconfigCluster,
Cluster: clientcmdv1.Cluster{
Server: server,
CertificateAuthorityData: caData,
},
}},
AuthInfos: []clientcmdv1.NamedAuthInfo{{
Name: tokenKubeconfigUser,
AuthInfo: clientcmdv1.AuthInfo{Token: token},
}},
Contexts: []clientcmdv1.NamedContext{{
Name: tokenKubeconfigContext,
Context: clientcmdv1.Context{Cluster: tokenKubeconfigCluster, AuthInfo: tokenKubeconfigUser},
}},
CurrentContext: tokenKubeconfigContext,
}
return yaml.Marshal(kc)
}
Loading