From 40690337479461af7ddfb63fb29909830ffbe565 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Wed, 17 Sep 2025 17:53:14 +0000 Subject: [PATCH 01/10] create simple worker pool and add AsyncGetJobInfo Signed-off-by: You-Cheng Lin (Owen) --- .../ray_job_submission_service_server.go | 2 +- .../config/v1alpha1/configuration_types.go | 6 ++- .../controllers/ray/rayjob_controller.go | 33 +++++++++--- .../controllers/ray/rayservice_controller.go | 2 +- ray-operator/controllers/ray/suite_test.go | 3 +- .../dashboardclient/dashboard_httpclient.go | 24 ++++++++- .../ray/utils/dashboardclient/worker_pool.go | 54 +++++++++++++++++++ .../ray/utils/fake_serve_httpclient.go | 6 ++- ray-operator/controllers/ray/utils/util.go | 9 ++-- ray-operator/rayjob-submitter/cmd/main.go | 2 +- ray-operator/test/sampleyaml/support.go | 2 +- 11 files changed, 122 insertions(+), 21 deletions(-) create mode 100644 ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go diff --git a/apiserver/pkg/server/ray_job_submission_service_server.go b/apiserver/pkg/server/ray_job_submission_service_server.go index 4fdead1e50f..8344c7f886d 100644 --- a/apiserver/pkg/server/ray_job_submission_service_server.go +++ b/apiserver/pkg/server/ray_job_submission_service_server.go @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct { // Create RayJobSubmissionServiceServer func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer { zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel) - return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)} + return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, 
nil, nil)} } // Submit Ray job diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 6acaedeb260..427e9f2ef8f 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -1,6 +1,8 @@ package v1alpha1 import ( + "sync" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -85,8 +87,8 @@ type Configuration struct { EnableMetrics bool `json:"enableMetrics,omitempty"` } -func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy) +func (config Configuration) GetDashboardClient(mgr manager.Manager, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { + return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, taskQueue, jobInfoMap) } func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index cae16cd0bc8..b8f3c0f9d31 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/go-logr/logr" @@ -29,6 +30,7 @@ import ( "github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes 
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" "github.com/ray-project/kuberay/ray-operator/pkg/features" ) @@ -40,11 +42,12 @@ const ( // RayJobReconciler reconciles a RayJob object type RayJobReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - + Scheme *runtime.Scheme + Recorder record.EventRecorder + JobInfoMap *sync.Map dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) options RayJobReconcilerOptions + workerPool *dashboardclient.WorkerPool } type RayJobReconcilerOptions struct { @@ -53,13 +56,18 @@ type RayJobReconcilerOptions struct { // NewRayJobReconciler returns a new reconcile.Reconciler func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler { - dashboardClientFunc := provider.GetDashboardClient(mgr) + taskQueue := make(chan func(), 1000) + JobInfoMap := &sync.Map{} + workerPool := dashboardclient.NewWorkerPool(taskQueue) + dashboardClientFunc := provider.GetDashboardClient(mgr, taskQueue, JobInfoMap) return &RayJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("rayjob-controller"), + JobInfoMap: JobInfoMap, dashboardClientFunc: dashboardClientFunc, options: options, + workerPool: workerPool, } } @@ -263,9 +271,12 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) if err != nil { return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } - - jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId) - if err != nil { + var jobInfo *utiltypes.RayJobInfo + if loadedJobInfo, ok := r.JobInfoMap.Load(rayJobInstance.Status.JobId); ok { + logger.Info("Found jobInfo in map", "JobId", rayJobInstance.Status.JobId, "jobInfo", loadedJobInfo) + jobInfo = loadedJobInfo.(*utiltypes.RayJobInfo) + logger.Info("Casted jobInfo", "JobId", 
rayJobInstance.Status.JobId, "jobInfo", jobInfo) + } else { // If the Ray job was not found, GetJobInfo returns a BadRequest error. if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) { logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId) @@ -275,10 +286,16 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) } return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil } - logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId) + logger.Info("Job info not found in map", "JobId", rayJobInstance.Status.JobId) + rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } + rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) + if jobInfo == nil { + logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId) + return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err + } // If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job // to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob // as "Complete" or "Failed" to avoid unnecessary reconciliation. 
diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 7e51c018fbf..802f5b19b95 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -60,7 +60,7 @@ type RayServiceReconciler struct { // NewRayServiceReconciler returns a new reconcile.Reconciler func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayServiceReconciler { - dashboardClientFunc := provider.GetDashboardClient(mgr) + dashboardClientFunc := provider.GetDashboardClient(mgr, nil, nil) httpProxyClientFunc := provider.GetHttpProxyClient(mgr) return &RayServiceReconciler{ Client: mgr.GetClient(), diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index 85c913e7bd6..be24401c9d8 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -18,6 +18,7 @@ package ray import ( "os" "path/filepath" + "sync" "testing" . 
"github.com/onsi/ginkgo/v2" @@ -52,7 +53,7 @@ var ( type TestClientProvider struct{} -func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ chan func(), _ *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(_ *rayv1.RayCluster, _ string) (dashboardclient.RayDashboardClientInterface, error) { return fakeRayDashboardClient, nil } diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index ddd45aad08b..3de6be8f031 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -7,6 +7,7 @@ import ( "io" "net/http" "strings" + "sync" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/json" @@ -25,12 +26,13 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string) + InitClient(client *http.Client, dashboardURL string, taskQueue chan func(), jobInfoMap *sync.Map) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) + AsyncGetJobInfo(ctx context.Context, jobId string) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) @@ -41,12 +43,16 @@ type RayDashboardClientInterface interface 
{ type RayDashboardClient struct { client *http.Client + taskQueue chan func() + jobInfoMap *sync.Map dashboardURL string } -func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) { +func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, taskQueue chan func(), jobInfoMap *sync.Map) { r.client = client r.dashboardURL = dashboardURL + r.taskQueue = taskQueue + r.jobInfoMap = jobInfoMap } // UpdateDeployments update the deployments in the Ray cluster. @@ -161,6 +167,19 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti return &jobInfo, nil } +func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) { + r.taskQueue <- func() { + jobInfo, err := r.GetJobInfo(ctx, jobId) + if err != nil { + fmt.Printf("AsyncGetJobInfo: error: %v\n", err) + } + fmt.Printf("AsyncGetJobInfo: jobInfo: %v\n", jobInfo) + if jobInfo != nil { + r.jobInfoMap.Store(jobId, jobInfo) + } + } +} + func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil) if err != nil { @@ -211,6 +230,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype } req.Header.Set("Content-Type", "application/json") + resp, err := r.client.Do(req) if err != nil { return diff --git a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go new file mode 100644 index 00000000000..6c9fd2f4f80 --- /dev/null +++ b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go @@ -0,0 +1,54 @@ +package dashboardclient + +import ( + "sync" +) + +type WorkerPool struct { + taskQueue chan func() + stop chan struct{} + wg sync.WaitGroup + workers int +} + +func NewWorkerPool(taskQueue chan func()) *WorkerPool { + wp := &WorkerPool{ + taskQueue: taskQueue, + workers: 10, + stop: make(chan 
struct{}), + } + + // Start workers immediately + wp.Start() + return wp +} + +// Start launches worker goroutines to consume from queue +func (wp *WorkerPool) Start() { + for i := 0; i < wp.workers; i++ { + wp.wg.Add(1) + go wp.worker() + } +} + +// worker consumes and executes tasks from the queue +func (wp *WorkerPool) worker() { + defer wp.wg.Done() + + for { + select { + case <-wp.stop: + return + case task := <-wp.taskQueue: + if task != nil { + task() // Execute the job + } + } + } +} + +// Stop shuts down all workers +func (wp *WorkerPool) Stop() { + close(wp.stop) + wp.wg.Wait() +} diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 21a3fdb91be..19334860072 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync" "sync/atomic" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" @@ -19,7 +20,7 @@ type FakeRayDashboardClient struct { var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil) -func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) { +func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ chan func(), _ *sync.Map) { } func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error { @@ -46,6 +47,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) ( return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil } +func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) { +} + func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { if mock := r.GetJobInfoMock.Load(); mock != nil { info, err := (*mock)(ctx, "job_id") diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 
cf6b9066323..81277b3d4f7 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "strings" + "sync" "time" "unicode" @@ -641,7 +642,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) } type ClientProvider interface { - GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) + GetDashboardClient(mgr manager.Manager, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface } @@ -758,7 +759,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -777,13 +778,15 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun // configured to communicate with the Kubernetes API server. 
mgr.GetHTTPClient(), fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), + taskQueue, + jobInfoMap, ) return dashboardClient, nil } dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, - }, "http://"+url) + }, "http://"+url, taskQueue, jobInfoMap) return dashboardClient, nil } } diff --git a/ray-operator/rayjob-submitter/cmd/main.go b/ray-operator/rayjob-submitter/cmd/main.go index dd4f68eb420..ae58b62d169 100644 --- a/ray-operator/rayjob-submitter/cmd/main.go +++ b/ray-operator/rayjob-submitter/cmd/main.go @@ -64,7 +64,7 @@ func main() { } rayDashboardClient := &dashboardclient.RayDashboardClient{} address = rayjobsubmitter.JobSubmissionURL(address) - rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address) + rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil) submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req) if err != nil { if strings.Contains(err.Error(), "Please use a different submission_id") { diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index 42ffd19e0a2..f715583f722 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg g.Expect(err).ToNot(HaveOccurred()) url := fmt.Sprintf("127.0.0.1:%d", localPort) - rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false) + rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil, nil) rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url) g.Expect(err).ToNot(HaveOccurred()) serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx()) From 45635be4d55e1046f7dd62f9bb9b5e0775c4bd62 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Wed, 17 Sep 2025 17:56:22 +0000 Subject: [PATCH 02/10] remove log 
Signed-off-by: You-Cheng Lin (Owen) --- ray-operator/controllers/ray/rayjob_controller.go | 2 -- .../ray/utils/dashboardclient/dashboard_httpclient.go | 1 - 2 files changed, 3 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index b8f3c0f9d31..44812ec0770 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -273,9 +273,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) } var jobInfo *utiltypes.RayJobInfo if loadedJobInfo, ok := r.JobInfoMap.Load(rayJobInstance.Status.JobId); ok { - logger.Info("Found jobInfo in map", "JobId", rayJobInstance.Status.JobId, "jobInfo", loadedJobInfo) jobInfo = loadedJobInfo.(*utiltypes.RayJobInfo) - logger.Info("Casted jobInfo", "JobId", rayJobInstance.Status.JobId, "jobInfo", jobInfo) } else { // If the Ray job was not found, GetJobInfo returns a BadRequest error. if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) { diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index 3de6be8f031..0f4d763928b 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -173,7 +173,6 @@ func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) if err != nil { fmt.Printf("AsyncGetJobInfo: error: %v\n", err) } - fmt.Printf("AsyncGetJobInfo: jobInfo: %v\n", jobInfo) if jobInfo != nil { r.jobInfoMap.Store(jobId, jobInfo) } From 72142c0d851840595df28e1487d91c07ce3e1ebd Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Thu, 2 Oct 2025 14:39:11 +0000 Subject: [PATCH 03/10] use another map to track channel content Signed-off-by: You-Cheng Lin (Owen) --- 
.../config/v1alpha1/configuration_types.go | 8 +++---- .../controllers/ray/rayjob_controller.go | 19 +++++++--------- .../controllers/ray/rayservice_controller.go | 2 +- ray-operator/controllers/ray/suite_test.go | 5 +++-- .../dashboardclient/dashboard_httpclient.go | 22 ++++++++++++------- .../ray/utils/dashboardclient/worker_pool.go | 18 +++++++++------ .../ray/utils/fake_serve_httpclient.go | 5 +++-- ray-operator/controllers/ray/utils/util.go | 13 ++++++----- ray-operator/test/sampleyaml/support.go | 2 +- 9 files changed, 53 insertions(+), 41 deletions(-) diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 427e9f2ef8f..51d72aa4346 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -1,8 +1,7 @@ package v1alpha1 import ( - "sync" - + cmap "github.com/orcaman/concurrent-map/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -10,6 +9,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) //+kubebuilder:object:root=true @@ -87,8 +87,8 @@ type Configuration struct { EnableMetrics bool `json:"enableMetrics,omitempty"` } -func (config Configuration) GetDashboardClient(mgr manager.Manager, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, taskQueue, jobInfoMap) +func (config Configuration) GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster 
*rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { + return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, jobInfoMap) } func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 44812ec0770..2f97de8a9c3 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -6,10 +6,10 @@ import ( "os" "strconv" "strings" - "sync" "time" "github.com/go-logr/logr" + cmap "github.com/orcaman/concurrent-map/v2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -44,10 +44,9 @@ type RayJobReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder - JobInfoMap *sync.Map + JobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo] dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) options RayJobReconcilerOptions - workerPool *dashboardclient.WorkerPool } type RayJobReconcilerOptions struct { @@ -56,18 +55,15 @@ type RayJobReconcilerOptions struct { // NewRayJobReconciler returns a new reconcile.Reconciler func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler { - taskQueue := make(chan func(), 1000) - JobInfoMap := &sync.Map{} - workerPool := dashboardclient.NewWorkerPool(taskQueue) - dashboardClientFunc := provider.GetDashboardClient(mgr, taskQueue, JobInfoMap) + JobInfoMap := cmap.New[*utiltypes.RayJobInfo]() + dashboardClientFunc := provider.GetDashboardClient(mgr, &JobInfoMap) return &RayJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("rayjob-controller"), - 
JobInfoMap: JobInfoMap, + JobInfoMap: &JobInfoMap, dashboardClientFunc: dashboardClientFunc, options: options, - workerPool: workerPool, } } @@ -272,8 +268,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } var jobInfo *utiltypes.RayJobInfo - if loadedJobInfo, ok := r.JobInfoMap.Load(rayJobInstance.Status.JobId); ok { - jobInfo = loadedJobInfo.(*utiltypes.RayJobInfo) + if loadedJobInfo, ok := r.JobInfoMap.Get(rayJobInstance.Status.JobId); ok { + logger.Info("Job info found in map", "JobId", rayJobInstance.Status.JobId, "JobInfo", loadedJobInfo) + jobInfo = loadedJobInfo } else { // If the Ray job was not found, GetJobInfo returns a BadRequest error. if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) { diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 802f5b19b95..8a6fe88c53b 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -60,7 +60,7 @@ type RayServiceReconciler struct { // NewRayServiceReconciler returns a new reconcile.Reconciler func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayServiceReconciler { - dashboardClientFunc := provider.GetDashboardClient(mgr, nil, nil) + dashboardClientFunc := provider.GetDashboardClient(mgr, nil) httpProxyClientFunc := provider.GetHttpProxyClient(mgr) return &RayServiceReconciler{ Client: mgr.GetClient(), diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index be24401c9d8..96c2b1a12fd 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -18,11 +18,11 @@ package ray import ( "os" "path/filepath" - "sync" "testing" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + cmap "github.com/orcaman/concurrent-map/v2" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -37,6 +37,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to @@ -53,7 +54,7 @@ var ( type TestClientProvider struct{} -func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ chan func(), _ *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(_ *rayv1.RayCluster, _ string) (dashboardclient.RayDashboardClientInterface, error) { return fakeRayDashboardClient, nil } diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index 0f4d763928b..97ad5adb446 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -7,8 +7,8 @@ import ( "io" "net/http" "strings" - "sync" + cmap "github.com/orcaman/concurrent-map/v2" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/yaml" @@ -26,7 +26,7 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string, taskQueue chan func(), jobInfoMap *sync.Map) + InitClient(client *http.Client, dashboardURL string, 
workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) @@ -43,15 +43,15 @@ type RayDashboardClientInterface interface { type RayDashboardClient struct { client *http.Client - taskQueue chan func() - jobInfoMap *sync.Map + workerPool *WorkerPool + jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo] dashboardURL string } -func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, taskQueue chan func(), jobInfoMap *sync.Map) { +func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) { r.client = client r.dashboardURL = dashboardURL - r.taskQueue = taskQueue + r.workerPool = workerPool r.jobInfoMap = jobInfoMap } @@ -168,13 +168,19 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti } func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) { - r.taskQueue <- func() { + if _, ok := r.workerPool.channelContent.Get(jobId); ok { + return + } + r.workerPool.channelContent.Set(jobId, struct{}{}) + r.workerPool.taskQueue <- func() { jobInfo, err := r.GetJobInfo(ctx, jobId) + r.workerPool.channelContent.Remove(jobId) if err != nil { fmt.Printf("AsyncGetJobInfo: error: %v\n", err) + return } if jobInfo != nil { - r.jobInfoMap.Store(jobId, jobInfo) + r.jobInfoMap.Set(jobId, jobInfo) } } } diff --git a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go index 6c9fd2f4f80..558dd3bf88a 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go @@ -2,20 +2,24 @@ package dashboardclient import ( "sync" + + cmap 
"github.com/orcaman/concurrent-map/v2" ) type WorkerPool struct { - taskQueue chan func() - stop chan struct{} - wg sync.WaitGroup - workers int + channelContent cmap.ConcurrentMap[string, struct{}] + taskQueue chan func() + stop chan struct{} + wg sync.WaitGroup + workers int } func NewWorkerPool(taskQueue chan func()) *WorkerPool { wp := &WorkerPool{ - taskQueue: taskQueue, - workers: 10, - stop: make(chan struct{}), + taskQueue: taskQueue, + workers: 10, + stop: make(chan struct{}), + channelContent: cmap.New[struct{}](), } // Start workers immediately diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 19334860072..dce925678f5 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -4,9 +4,10 @@ import ( "context" "fmt" "net/http" - "sync" "sync/atomic" + cmap "github.com/orcaman/concurrent-map/v2" + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" @@ -20,7 +21,7 @@ type FakeRayDashboardClient struct { var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil) -func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ chan func(), _ *sync.Map) { +func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) { } func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error { diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 81277b3d4f7..86bca26f128 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -11,10 +11,10 @@ import ( "reflect" "strconv" 
"strings" - "sync" "time" "unicode" + cmap "github.com/orcaman/concurrent-map/v2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -28,6 +28,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) const ( @@ -642,7 +643,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) } type ClientProvider interface { - GetDashboardClient(mgr manager.Manager, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) + GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface } @@ -759,7 +760,9 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, taskQueue chan func(), jobInfoMap *sync.Map) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { + taskQueue := make(chan func()) + workerPool := dashboardclient.NewWorkerPool(taskQueue) return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -778,7 +781,7 @@ 
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, tas // configured to communicate with the Kubernetes API server. mgr.GetHTTPClient(), fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), - taskQueue, + workerPool, jobInfoMap, ) return dashboardClient, nil @@ -786,7 +789,7 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, tas dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, - }, "http://"+url, taskQueue, jobInfoMap) + }, "http://"+url, workerPool, jobInfoMap) return dashboardClient, nil } } diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index f715583f722..1a6393a2761 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg g.Expect(err).ToNot(HaveOccurred()) url := fmt.Sprintf("127.0.0.1:%d", localPort) - rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil, nil) + rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil) rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url) g.Expect(err).ToNot(HaveOccurred()) serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx()) From 727d2c2d15c8c08f38b5a39416fc85e0570c7ee8 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 14:38:53 +0000 Subject: [PATCH 04/10] update go mod Signed-off-by: You-Cheng Lin (Owen) --- apiserver/pkg/server/ray_job_submission_service_server.go | 2 +- go.mod | 1 + go.sum | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/apiserver/pkg/server/ray_job_submission_service_server.go b/apiserver/pkg/server/ray_job_submission_service_server.go index 8344c7f886d..1b026858a3f 100644 --- a/apiserver/pkg/server/ray_job_submission_service_server.go +++ 
b/apiserver/pkg/server/ray_job_submission_service_server.go @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct { // Create RayJobSubmissionServiceServer func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer { zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel) - return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil, nil)} + return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil)} } // Submit Ray job diff --git a/go.mod b/go.mod index 472e6d593df..0db4cf6a5eb 100644 --- a/go.mod +++ b/go.mod @@ -82,6 +82,7 @@ require ( github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/go.sum b/go.sum index dddab9f7e86..7549b7d26c1 100644 --- a/go.sum +++ b/go.sum @@ -165,6 +165,8 @@ github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOT github.com/onsi/gomega v1.37.0 h1:CdEG8g0S133B4OswTDC/5XPSzE1OeP29QOioj2PID2Y= github.com/onsi/gomega v1.37.0/go.mod h1:8D9+Txp43QWKhM24yyOBEdpkzN8FvJyAwecBgsU4KU0= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= 
+github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= From d9a1a884e0095485607799034dcb935c3f6a10ac Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 15:18:57 +0000 Subject: [PATCH 05/10] remove JobInfoMap from controller Signed-off-by: You-Cheng Lin (Owen) --- ray-operator/apis/config/v1alpha1/configuration_types.go | 5 +++-- ray-operator/controllers/ray/rayjob_controller.go | 8 ++------ ray-operator/controllers/ray/rayservice_controller.go | 2 +- ray-operator/controllers/ray/suite_test.go | 4 +--- .../ray/utils/dashboardclient/dashboard_httpclient.go | 8 ++++++++ .../controllers/ray/utils/fake_serve_httpclient.go | 4 ++++ ray-operator/controllers/ray/utils/util.go | 2 +- 7 files changed, 20 insertions(+), 13 deletions(-) diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 51d72aa4346..37e562fcd5e 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -87,8 +87,9 @@ type Configuration struct { EnableMetrics bool `json:"enableMetrics,omitempty"` } -func (config Configuration) GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, jobInfoMap) +func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { + jobInfoMap := 
cmap.New[*utiltypes.RayJobInfo]() + return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap) } func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 68cb2789e8c..cf75f1bbd00 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -9,7 +9,6 @@ import ( "time" "github.com/go-logr/logr" - cmap "github.com/orcaman/concurrent-map/v2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -45,7 +44,6 @@ type RayJobReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder - JobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo] dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) options RayJobReconcilerOptions } @@ -57,13 +55,11 @@ type RayJobReconcilerOptions struct { // NewRayJobReconciler returns a new reconcile.Reconciler func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler { - JobInfoMap := cmap.New[*utiltypes.RayJobInfo]() - dashboardClientFunc := provider.GetDashboardClient(mgr, &JobInfoMap) + dashboardClientFunc := provider.GetDashboardClient(mgr) return &RayJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("rayjob-controller"), - JobInfoMap: &JobInfoMap, dashboardClientFunc: dashboardClientFunc, options: options, } @@ -280,7 +276,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } var jobInfo *utiltypes.RayJobInfo - if loadedJobInfo, ok := 
r.JobInfoMap.Get(rayJobInstance.Status.JobId); ok { + if loadedJobInfo := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId); loadedJobInfo != nil { logger.Info("Job info found in map", "JobId", rayJobInstance.Status.JobId, "JobInfo", loadedJobInfo) jobInfo = loadedJobInfo } else { diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index a28dcd52aa3..7a1a50a36f6 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -60,7 +60,7 @@ type RayServiceReconciler struct { // NewRayServiceReconciler returns a new reconcile.Reconciler func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayServiceReconciler { - dashboardClientFunc := provider.GetDashboardClient(mgr, nil) + dashboardClientFunc := provider.GetDashboardClient(mgr) httpProxyClientFunc := provider.GetHttpProxyClient(mgr) return &RayServiceReconciler{ Client: mgr.GetClient(), diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index 96c2b1a12fd..85c913e7bd6 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -22,7 +22,6 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - cmap "github.com/orcaman/concurrent-map/v2" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -37,7 +36,6 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient" - utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" ) // These tests use Ginkgo (BDD-style Go testing framework). 
Refer to @@ -54,7 +52,7 @@ var ( type TestClientProvider struct{} -func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(_ *rayv1.RayCluster, _ string) (dashboardclient.RayDashboardClientInterface, error) { return fakeRayDashboardClient, nil } diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index c439f989d73..33520666219 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -41,6 +41,7 @@ type RayDashboardClientInterface interface { GetJobLog(ctx context.Context, jobName string) (*string, error) StopJob(ctx context.Context, jobName string) error DeleteJob(ctx context.Context, jobName string) error + GetJobInfoFromCache(jobId string) *utiltypes.RayJobInfo } type RayDashboardClient struct { @@ -358,6 +359,13 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro return nil } +func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobInfo { + if jobInfo, ok := r.jobInfoMap.Get(jobId); ok { + return jobInfo + } + return nil +} + func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*utiltypes.RayJobRequest, error) { req := &utiltypes.RayJobRequest{ Entrypoint: rayJob.Spec.Entrypoint, diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index dce925678f5..38220cfd1e7 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ 
b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -82,3 +82,7 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error { return nil } + +func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobInfo { + return nil +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 2ccda78025f..c0c669670a9 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -643,7 +643,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) } type ClientProvider interface { - GetDashboardClient(mgr manager.Manager, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) + GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface } From 0f1f766d115e7b4a3437e477302e103ff7dbf400 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 15:28:36 +0000 Subject: [PATCH 06/10] defer the channelContent removal Signed-off-by: You-Cheng Lin (Owen) --- .../ray/utils/dashboardclient/dashboard_httpclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index 33520666219..dfadfe7f012 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -184,8 +184,8 @@ func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId 
string) } r.workerPool.channelContent.Set(jobId, struct{}{}) r.workerPool.taskQueue <- func() { + defer r.workerPool.channelContent.Remove(jobId) jobInfo, err := r.GetJobInfo(ctx, jobId) - r.workerPool.channelContent.Remove(jobId) if err != nil { fmt.Printf("AsyncGetJobInfo: error: %v\n", err) return From da0e45ec3b0c716ba8d6dfbb6b3efe0bb899776d Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Fri, 10 Oct 2025 17:06:47 +0000 Subject: [PATCH 07/10] add error in cache Signed-off-by: You-Cheng Lin (Owen) --- .../apis/config/v1alpha1/configuration_types.go | 2 +- ray-operator/controllers/ray/rayjob_controller.go | 10 +++++++--- .../utils/dashboardclient/dashboard_httpclient.go | 15 ++++++++------- .../ray/utils/fake_serve_httpclient.go | 4 ++-- .../ray/utils/types/dashboard_httpclient.go | 5 +++++ ray-operator/controllers/ray/utils/util.go | 2 +- 6 files changed, 24 insertions(+), 14 deletions(-) diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 37e562fcd5e..d6d25ee3b8d 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -88,7 +88,7 @@ type Configuration struct { } func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - jobInfoMap := cmap.New[*utiltypes.RayJobInfo]() + jobInfoMap := cmap.New[*utiltypes.RayJobCache]() return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap) } diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index cf75f1bbd00..33ddbfe0e8e 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -276,9 +276,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) return 
ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } var jobInfo *utiltypes.RayJobInfo - if loadedJobInfo := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId); loadedJobInfo != nil { - logger.Info("Job info found in map", "JobId", rayJobInstance.Status.JobId, "JobInfo", loadedJobInfo) - jobInfo = loadedJobInfo + if loadedJobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId); loadedJobCache != nil { + if loadedJobCache.Err != nil { + rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) + logger.Error(loadedJobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", loadedJobCache.Err) + return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, loadedJobCache.Err + } + jobInfo = loadedJobCache.JobInfo } else { // If the Ray job was not found, GetJobInfo returns a BadRequest error. if errors.IsBadRequest(err) { diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index dfadfe7f012..34fd1d4c518 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -28,7 +28,7 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) + InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) @@ -41,17 +41,17 @@ type RayDashboardClientInterface interface { GetJobLog(ctx context.Context, jobName string) (*string, error) StopJob(ctx context.Context, jobName string) error DeleteJob(ctx 
context.Context, jobName string) error - GetJobInfoFromCache(jobId string) *utiltypes.RayJobInfo + GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache } type RayDashboardClient struct { client *http.Client workerPool *WorkerPool - jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo] + jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache] dashboardURL string } -func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) { +func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) { r.client = client r.dashboardURL = dashboardURL r.workerPool = workerPool @@ -187,11 +187,12 @@ func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) defer r.workerPool.channelContent.Remove(jobId) jobInfo, err := r.GetJobInfo(ctx, jobId) if err != nil { - fmt.Printf("AsyncGetJobInfo: error: %v\n", err) + err = fmt.Errorf("failed to get job info: %w", err) + r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: nil, Err: err}) return } if jobInfo != nil { - r.jobInfoMap.Set(jobId, jobInfo) + r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo, Err: nil}) } } } @@ -359,7 +360,7 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro return nil } -func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobInfo { +func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache { if jobInfo, ok := r.jobInfoMap.Get(jobId); ok { return jobInfo } diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 38220cfd1e7..04e9fd03895 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go 
@@ -21,7 +21,7 @@ type FakeRayDashboardClient struct { var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil) -func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) { +func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) { } func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error { @@ -83,6 +83,6 @@ func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error { return nil } -func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobInfo { +func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobCache { return nil } diff --git a/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go index a246ea399f3..54bc373a29d 100644 --- a/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/types/dashboard_httpclient.go @@ -46,3 +46,8 @@ type RayJobStopResponse struct { type RayJobLogsResponse struct { Logs string `json:"logs,omitempty"` } + +type RayJobCache struct { + JobInfo *RayJobInfo + Err error +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index c0c669670a9..31684fdeff0 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -760,7 +760,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobInfo]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func 
GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { taskQueue := make(chan func()) workerPool := dashboardclient.NewWorkerPool(taskQueue) return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { From afcb9e146852bd0c09604e1d0e730043ce015bf7 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Mon, 13 Oct 2025 13:09:34 +0000 Subject: [PATCH 08/10] move workerPoolChannelContent into dashboard client Signed-off-by: You-Cheng Lin (Owen) --- .../dashboardclient/dashboard_httpclient.go | 20 ++++++++++--------- .../ray/utils/dashboardclient/worker_pool.go | 18 +++++++---------- .../ray/utils/fake_serve_httpclient.go | 2 +- ray-operator/controllers/ray/utils/util.go | 4 +++- ray-operator/rayjob-submitter/cmd/main.go | 2 +- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index 34fd1d4c518..565f76a4d04 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -28,7 +28,7 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) + InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) @@ -45,17 +45,19 @@ type 
RayDashboardClientInterface interface { } type RayDashboardClient struct { - client *http.Client - workerPool *WorkerPool - jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache] - dashboardURL string + client *http.Client + workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}] + workerPool *WorkerPool + jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache] + dashboardURL string } -func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) { +func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) { r.client = client r.dashboardURL = dashboardURL r.workerPool = workerPool r.jobInfoMap = jobInfoMap + r.workerPoolChannelContent = workerPoolChannelContent } // UpdateDeployments update the deployments in the Ray cluster. 
@@ -179,12 +181,12 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti } func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) { - if _, ok := r.workerPool.channelContent.Get(jobId); ok { + if _, ok := r.workerPoolChannelContent.Get(jobId); ok { return } - r.workerPool.channelContent.Set(jobId, struct{}{}) + r.workerPoolChannelContent.Set(jobId, struct{}{}) r.workerPool.taskQueue <- func() { - defer r.workerPool.channelContent.Remove(jobId) + defer r.workerPoolChannelContent.Remove(jobId) jobInfo, err := r.GetJobInfo(ctx, jobId) if err != nil { err = fmt.Errorf("failed to get job info: %w", err) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go index 558dd3bf88a..6c9fd2f4f80 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go @@ -2,24 +2,20 @@ package dashboardclient import ( "sync" - - cmap "github.com/orcaman/concurrent-map/v2" ) type WorkerPool struct { - channelContent cmap.ConcurrentMap[string, struct{}] - taskQueue chan func() - stop chan struct{} - wg sync.WaitGroup - workers int + taskQueue chan func() + stop chan struct{} + wg sync.WaitGroup + workers int } func NewWorkerPool(taskQueue chan func()) *WorkerPool { wp := &WorkerPool{ - taskQueue: taskQueue, - workers: 10, - stop: make(chan struct{}), - channelContent: cmap.New[struct{}](), + taskQueue: taskQueue, + workers: 10, + stop: make(chan struct{}), } // Start workers immediately diff --git a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go index 04e9fd03895..005a5eed561 100644 --- a/ray-operator/controllers/ray/utils/fake_serve_httpclient.go +++ b/ray-operator/controllers/ray/utils/fake_serve_httpclient.go @@ -21,7 +21,7 @@ type FakeRayDashboardClient struct { var _ 
dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil) -func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) { +func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], _ *cmap.ConcurrentMap[string, struct{}]) { } func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error { diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 31684fdeff0..070c3c9ad1c 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -763,6 +763,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { taskQueue := make(chan func()) workerPool := dashboardclient.NewWorkerPool(taskQueue) + workerPoolChannelContent := cmap.New[struct{}]() return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -783,13 +784,14 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, job fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), workerPool, jobInfoMap, + &workerPoolChannelContent, ) return dashboardClient, nil } dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, - }, "http://"+url, workerPool, jobInfoMap) + }, "http://"+url, workerPool, jobInfoMap, &workerPoolChannelContent) return dashboardClient, nil } } diff --git 
a/ray-operator/rayjob-submitter/cmd/main.go b/ray-operator/rayjob-submitter/cmd/main.go index ae58b62d169..d11cb02f44e 100644 --- a/ray-operator/rayjob-submitter/cmd/main.go +++ b/ray-operator/rayjob-submitter/cmd/main.go @@ -64,7 +64,7 @@ func main() { } rayDashboardClient := &dashboardclient.RayDashboardClient{} address = rayjobsubmitter.JobSubmissionURL(address) - rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil) + rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil, nil) submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req) if err != nil { if strings.Contains(err.Error(), "Please use a different submission_id") { From 82b3acf9b085bf02ee9ecdd7cb12146a91690fb4 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Mon, 13 Oct 2025 13:14:15 +0000 Subject: [PATCH 09/10] make Start worker pool private Signed-off-by: You-Cheng Lin (Owen) --- .../ray/utils/dashboardclient/worker_pool.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go index 6c9fd2f4f80..6b0c0bf6f2e 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/worker_pool.go @@ -6,7 +6,7 @@ import ( type WorkerPool struct { taskQueue chan func() - stop chan struct{} + stopChan chan struct{} wg sync.WaitGroup workers int } @@ -15,16 +15,16 @@ func NewWorkerPool(taskQueue chan func()) *WorkerPool { wp := &WorkerPool{ taskQueue: taskQueue, workers: 10, - stop: make(chan struct{}), + stopChan: make(chan struct{}), } // Start workers immediately - wp.Start() + wp.start() return wp } // Start launches worker goroutines to consume from queue -func (wp *WorkerPool) Start() { +func (wp *WorkerPool) start() { for i := 0; i < wp.workers; i++ { wp.wg.Add(1) go 
wp.worker() @@ -37,7 +37,7 @@ func (wp *WorkerPool) worker() { for { select { - case <-wp.stop: + case <-wp.stopChan: return case task := <-wp.taskQueue: if task != nil { @@ -46,9 +46,3 @@ func (wp *WorkerPool) worker() { } } } - -// Stop shuts down all workers -func (wp *WorkerPool) Stop() { - close(wp.stop) - wp.wg.Wait() -} From 69be5fbc2b25865bb1289b43c62ab6c8ae83cd32 Mon Sep 17 00:00:00 2001 From: "You-Cheng Lin (Owen)" Date: Thu, 16 Oct 2025 09:17:59 +0000 Subject: [PATCH 10/10] update Signed-off-by: You-Cheng Lin (Owen) --- .../controllers/ray/rayjob_controller.go | 23 +++++++++++-------- .../test/e2erayjob/rayjob_retry_test.go | 2 +- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 33ddbfe0e8e..6e4b12977cc 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -276,15 +276,23 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } var jobInfo *utiltypes.RayJobInfo - if loadedJobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId); loadedJobCache != nil { - if loadedJobCache.Err != nil { + jobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId) + if jobCache != nil { + if jobCache.Err != nil { + if errors.IsBadRequest(jobCache.Err) && isSubmitterFinished { + rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed + rayJobInstance.Status.Reason = rayv1.AppFailed + rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster." 
+ break + } + logger.Error(jobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", jobCache.Err) rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) - logger.Error(loadedJobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", loadedJobCache.Err) - return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, loadedJobCache.Err + return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, jobCache.Err } - jobInfo = loadedJobCache.JobInfo + jobInfo = jobCache.JobInfo } else { - // If the Ray job was not found, GetJobInfo returns a BadRequest error. + // Cache miss: try a direct fetch to disambiguate not-found vs. transient + jobInfo, err = rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId) if errors.IsBadRequest(err) { if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode { logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId) @@ -301,9 +309,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) break } } - logger.Info("Job info not found in map", "JobId", rayJobInstance.Status.JobId) - rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) - return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId) diff --git a/ray-operator/test/e2erayjob/rayjob_retry_test.go b/ray-operator/test/e2erayjob/rayjob_retry_test.go index 3cbb70ab48a..ce1e565a23c 100644 --- a/ray-operator/test/e2erayjob/rayjob_retry_test.go +++ b/ray-operator/test/e2erayjob/rayjob_retry_test.go @@ -107,7 +107,7 @@ func TestRayJobRetry(t *testing.T) { LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name) // Ensure JobDeploymentStatus transit to Failed - g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium). 
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong). Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed))) // Ensure JobStatus is empty g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).