Skip to content

Commit 53e5b2d

Browse files
authored
feat: agent jobs panel (#7390)
* feat(agent): agent jobs Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Multiple webhooks, simplify Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Do not use cron with seconds Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Create separate pages for details Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Detect if no models have MCP configuration, show wizard Signed-off-by: Ettore Di Giacinto <mudler@localai.io> * Make services test to run Signed-off-by: Ettore Di Giacinto <mudler@localai.io> --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent 4b5977f commit 53e5b2d

25 files changed

+4308
-19
lines changed

core/application/agent_jobs.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package application
2+
3+
import (
4+
"time"
5+
6+
"github.com/mudler/LocalAI/core/services"
7+
"github.com/rs/zerolog/log"
8+
)
9+
10+
// RestartAgentJobService restarts the agent job service with current ApplicationConfig settings
11+
func (a *Application) RestartAgentJobService() error {
12+
a.agentJobMutex.Lock()
13+
defer a.agentJobMutex.Unlock()
14+
15+
// Stop existing service if running
16+
if a.agentJobService != nil {
17+
if err := a.agentJobService.Stop(); err != nil {
18+
log.Warn().Err(err).Msg("Error stopping agent job service")
19+
}
20+
// Wait a bit for shutdown to complete
21+
time.Sleep(200 * time.Millisecond)
22+
}
23+
24+
// Create new service instance
25+
agentJobService := services.NewAgentJobService(
26+
a.ApplicationConfig(),
27+
a.ModelLoader(),
28+
a.ModelConfigLoader(),
29+
a.TemplatesEvaluator(),
30+
)
31+
32+
// Start the service
33+
err := agentJobService.Start(a.ApplicationConfig().Context)
34+
if err != nil {
35+
log.Error().Err(err).Msg("Failed to start agent job service")
36+
return err
37+
}
38+
39+
a.agentJobService = agentJobService
40+
log.Info().Msg("Agent job service restarted")
41+
return nil
42+
}
43+

core/application/application.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ type Application struct {
1717
startupConfig *config.ApplicationConfig // Stores original config from env vars (before file loading)
1818
templatesEvaluator *templates.Evaluator
1919
galleryService *services.GalleryService
20+
agentJobService *services.AgentJobService
2021
watchdogMutex sync.Mutex
2122
watchdogStop chan bool
2223
p2pMutex sync.Mutex
2324
p2pCtx context.Context
2425
p2pCancel context.CancelFunc
26+
agentJobMutex sync.Mutex
2527
}
2628

2729
func newApplication(appConfig *config.ApplicationConfig) *Application {
@@ -53,6 +55,10 @@ func (a *Application) GalleryService() *services.GalleryService {
5355
return a.galleryService
5456
}
5557

58+
func (a *Application) AgentJobService() *services.AgentJobService {
59+
return a.agentJobService
60+
}
61+
5662
// StartupConfig returns the original startup configuration (from env vars, before file loading)
5763
func (a *Application) StartupConfig() *config.ApplicationConfig {
5864
return a.startupConfig
@@ -67,5 +73,20 @@ func (a *Application) start() error {
6773

6874
a.galleryService = galleryService
6975

76+
// Initialize agent job service
77+
agentJobService := services.NewAgentJobService(
78+
a.ApplicationConfig(),
79+
a.ModelLoader(),
80+
a.ModelConfigLoader(),
81+
a.TemplatesEvaluator(),
82+
)
83+
84+
err = agentJobService.Start(a.ApplicationConfig().Context)
85+
if err != nil {
86+
return err
87+
}
88+
89+
a.agentJobService = agentJobService
90+
7091
return nil
7192
}

core/application/config_file_watcher.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler
4343
if err != nil {
4444
log.Error().Err(err).Str("file", "runtime_settings.json").Msg("unable to register config file handler")
4545
}
46+
// Note: agent_tasks.json and agent_jobs.json are handled by AgentJobService directly
47+
// The service watches and reloads these files internally
4648
return c
4749
}
4850

@@ -206,6 +208,7 @@ type runtimeSettings struct {
206208
AutoloadGalleries *bool `json:"autoload_galleries,omitempty"`
207209
AutoloadBackendGalleries *bool `json:"autoload_backend_galleries,omitempty"`
208210
ApiKeys *[]string `json:"api_keys,omitempty"`
211+
AgentJobRetentionDays *int `json:"agent_job_retention_days,omitempty"`
209212
}
210213

211214
func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHandler {
@@ -234,6 +237,7 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand
234237
envFederated := appConfig.Federated == startupAppConfig.Federated
235238
envAutoloadGalleries := appConfig.AutoloadGalleries == startupAppConfig.AutoloadGalleries
236239
envAutoloadBackendGalleries := appConfig.AutoloadBackendGalleries == startupAppConfig.AutoloadBackendGalleries
240+
envAgentJobRetentionDays := appConfig.AgentJobRetentionDays == startupAppConfig.AgentJobRetentionDays
237241

238242
if len(fileContent) > 0 {
239243
var settings runtimeSettings
@@ -328,6 +332,9 @@ func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHand
328332
// Replace all runtime keys with what's in runtime_settings.json
329333
appConfig.ApiKeys = append(envKeys, runtimeKeys...)
330334
}
335+
if settings.AgentJobRetentionDays != nil && !envAgentJobRetentionDays {
336+
appConfig.AgentJobRetentionDays = *settings.AgentJobRetentionDays
337+
}
331338

332339
// If watchdog is enabled via file but not via env, ensure WatchDog flag is set
333340
if !envWatchdogIdle && !envWatchdogBusy {

core/application/startup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
226226
WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"`
227227
SingleBackend *bool `json:"single_backend,omitempty"`
228228
ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"`
229+
AgentJobRetentionDays *int `json:"agent_job_retention_days,omitempty"`
229230
}
230231

231232
if err := json.Unmarshal(fileContent, &settings); err != nil {
@@ -289,6 +290,12 @@ func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) {
289290
options.ParallelBackendRequests = *settings.ParallelBackendRequests
290291
}
291292
}
293+
if settings.AgentJobRetentionDays != nil {
294+
// Only apply if current value is default (0), suggesting it wasn't set from env var
295+
if options.AgentJobRetentionDays == 0 {
296+
options.AgentJobRetentionDays = *settings.AgentJobRetentionDays
297+
}
298+
}
292299
if !options.WatchDogIdle && !options.WatchDogBusy {
293300
if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled {
294301
options.WatchDog = true

core/cli/run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type RunCMD struct {
7575
DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT,DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints" group:"api"`
7676
MachineTag string `env:"LOCALAI_MACHINE_TAG,MACHINE_TAG" help:"Add Machine-Tag header to each response which is useful to track the machine in the P2P network" group:"api"`
7777
LoadToMemory []string `env:"LOCALAI_LOAD_TO_MEMORY,LOAD_TO_MEMORY" help:"A list of models to load into memory at startup" group:"models"`
78+
AgentJobRetentionDays int `env:"LOCALAI_AGENT_JOB_RETENTION_DAYS,AGENT_JOB_RETENTION_DAYS" default:"30" help:"Number of days to keep agent job history (default: 30)" group:"api"`
7879

7980
Version bool
8081
}
@@ -129,6 +130,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
129130
config.WithLoadToMemory(r.LoadToMemory),
130131
config.WithMachineTag(r.MachineTag),
131132
config.WithAPIAddress(r.Address),
133+
config.WithAgentJobRetentionDays(r.AgentJobRetentionDays),
132134
config.WithTunnelCallback(func(tunnels []string) {
133135
tunnelEnvVar := strings.Join(tunnels, ",")
134136
// TODO: this is very specific to llama.cpp, we should have a more generic way to set the environment variable

core/config/application_config.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,18 @@ type ApplicationConfig struct {
7070
TunnelCallback func(tunnels []string)
7171

7272
DisableRuntimeSettings bool
73+
74+
AgentJobRetentionDays int // Default: 30 days
7375
}
7476

7577
type AppOption func(*ApplicationConfig)
7678

7779
func NewApplicationConfig(o ...AppOption) *ApplicationConfig {
7880
opt := &ApplicationConfig{
79-
Context: context.Background(),
80-
UploadLimitMB: 15,
81-
Debug: true,
81+
Context: context.Background(),
82+
UploadLimitMB: 15,
83+
Debug: true,
84+
AgentJobRetentionDays: 30, // Default: 30 days
8285
}
8386
for _, oo := range o {
8487
oo(opt)
@@ -333,6 +336,12 @@ func WithApiKeys(apiKeys []string) AppOption {
333336
}
334337
}
335338

339+
func WithAgentJobRetentionDays(days int) AppOption {
340+
return func(o *ApplicationConfig) {
341+
o.AgentJobRetentionDays = days
342+
}
343+
}
344+
336345
func WithEnforcedPredownloadScans(enforced bool) AppOption {
337346
return func(o *ApplicationConfig) {
338347
o.EnforcePredownloadScans = enforced

core/http/app.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ func API(application *application.Application) (*echo.Echo, error) {
205205
opcache = services.NewOpCache(application.GalleryService())
206206
}
207207

208-
routes.RegisterLocalAIRoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application.TemplatesEvaluator())
208+
routes.RegisterLocalAIRoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application.TemplatesEvaluator(), application)
209209
routes.RegisterOpenAIRoutes(e, requestExtractor, application)
210210
if !application.ApplicationConfig().DisableWebUI {
211211
routes.RegisterUIAPIRoutes(e, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application)

core/http/app_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,41 @@ func postRequestResponseJSON[B1 any, B2 any](url string, reqJson *B1, respJson *
210210
return json.Unmarshal(body, respJson)
211211
}
212212

213+
func putRequestJSON[B any](url string, bodyJson *B) error {
214+
payload, err := json.Marshal(bodyJson)
215+
if err != nil {
216+
return err
217+
}
218+
219+
GinkgoWriter.Printf("PUT %s: %s\n", url, string(payload))
220+
221+
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(payload))
222+
if err != nil {
223+
return err
224+
}
225+
226+
req.Header.Set("Content-Type", "application/json")
227+
req.Header.Set("Authorization", bearerKey)
228+
229+
client := &http.Client{}
230+
resp, err := client.Do(req)
231+
if err != nil {
232+
return err
233+
}
234+
defer resp.Body.Close()
235+
236+
body, err := io.ReadAll(resp.Body)
237+
if err != nil {
238+
return err
239+
}
240+
241+
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
242+
return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
243+
}
244+
245+
return nil
246+
}
247+
213248
func postInvalidRequest(url string) (error, int) {
214249

215250
req, err := http.NewRequest("POST", url, bytes.NewBufferString("invalid request"))
@@ -1194,6 +1229,138 @@ parameters:
11941229
Expect(findRespBody.Similarities[i]).To(BeNumerically("<=", 1))
11951230
}
11961231
})
1232+
1233+
Context("Agent Jobs", Label("agent-jobs"), func() {
1234+
It("creates and manages tasks", func() {
1235+
// Create a task
1236+
taskBody := map[string]interface{}{
1237+
"name": "Test Task",
1238+
"description": "Test Description",
1239+
"model": "testmodel.ggml",
1240+
"prompt": "Hello {{.name}}",
1241+
"enabled": true,
1242+
}
1243+
1244+
var createResp map[string]interface{}
1245+
err := postRequestResponseJSON("http://127.0.0.1:9090/api/agent/tasks", &taskBody, &createResp)
1246+
Expect(err).ToNot(HaveOccurred())
1247+
Expect(createResp["id"]).ToNot(BeEmpty())
1248+
taskID := createResp["id"].(string)
1249+
1250+
// Get the task
1251+
var task schema.Task
1252+
resp, err := http.Get("http://127.0.0.1:9090/api/agent/tasks/" + taskID)
1253+
Expect(err).ToNot(HaveOccurred())
1254+
Expect(resp.StatusCode).To(Equal(200))
1255+
body, _ := io.ReadAll(resp.Body)
1256+
json.Unmarshal(body, &task)
1257+
Expect(task.Name).To(Equal("Test Task"))
1258+
1259+
// List tasks
1260+
resp, err = http.Get("http://127.0.0.1:9090/api/agent/tasks")
1261+
Expect(err).ToNot(HaveOccurred())
1262+
Expect(resp.StatusCode).To(Equal(200))
1263+
var tasks []schema.Task
1264+
body, _ = io.ReadAll(resp.Body)
1265+
json.Unmarshal(body, &tasks)
1266+
Expect(len(tasks)).To(BeNumerically(">=", 1))
1267+
1268+
// Update task
1269+
taskBody["name"] = "Updated Task"
1270+
err = putRequestJSON("http://127.0.0.1:9090/api/agent/tasks/"+taskID, &taskBody)
1271+
Expect(err).ToNot(HaveOccurred())
1272+
1273+
// Verify update
1274+
resp, err = http.Get("http://127.0.0.1:9090/api/agent/tasks/" + taskID)
1275+
Expect(err).ToNot(HaveOccurred())
1276+
body, _ = io.ReadAll(resp.Body)
1277+
json.Unmarshal(body, &task)
1278+
Expect(task.Name).To(Equal("Updated Task"))
1279+
1280+
// Delete task
1281+
req, _ := http.NewRequest("DELETE", "http://127.0.0.1:9090/api/agent/tasks/"+taskID, nil)
1282+
req.Header.Set("Authorization", bearerKey)
1283+
resp, err = http.DefaultClient.Do(req)
1284+
Expect(err).ToNot(HaveOccurred())
1285+
Expect(resp.StatusCode).To(Equal(200))
1286+
})
1287+
1288+
It("executes and monitors jobs", func() {
1289+
// Create a task first
1290+
taskBody := map[string]interface{}{
1291+
"name": "Job Test Task",
1292+
"model": "testmodel.ggml",
1293+
"prompt": "Say hello",
1294+
"enabled": true,
1295+
}
1296+
1297+
var createResp map[string]interface{}
1298+
err := postRequestResponseJSON("http://127.0.0.1:9090/api/agent/tasks", &taskBody, &createResp)
1299+
Expect(err).ToNot(HaveOccurred())
1300+
taskID := createResp["id"].(string)
1301+
1302+
// Execute a job
1303+
jobBody := map[string]interface{}{
1304+
"task_id": taskID,
1305+
"parameters": map[string]string{},
1306+
}
1307+
1308+
var jobResp schema.JobExecutionResponse
1309+
err = postRequestResponseJSON("http://127.0.0.1:9090/api/agent/jobs/execute", &jobBody, &jobResp)
1310+
Expect(err).ToNot(HaveOccurred())
1311+
Expect(jobResp.JobID).ToNot(BeEmpty())
1312+
jobID := jobResp.JobID
1313+
1314+
// Get job status
1315+
var job schema.Job
1316+
resp, err := http.Get("http://127.0.0.1:9090/api/agent/jobs/" + jobID)
1317+
Expect(err).ToNot(HaveOccurred())
1318+
Expect(resp.StatusCode).To(Equal(200))
1319+
body, _ := io.ReadAll(resp.Body)
1320+
json.Unmarshal(body, &job)
1321+
Expect(job.ID).To(Equal(jobID))
1322+
Expect(job.TaskID).To(Equal(taskID))
1323+
1324+
// List jobs
1325+
resp, err = http.Get("http://127.0.0.1:9090/api/agent/jobs")
1326+
Expect(err).ToNot(HaveOccurred())
1327+
Expect(resp.StatusCode).To(Equal(200))
1328+
var jobs []schema.Job
1329+
body, _ = io.ReadAll(resp.Body)
1330+
json.Unmarshal(body, &jobs)
1331+
Expect(len(jobs)).To(BeNumerically(">=", 1))
1332+
1333+
// Cancel job (if still pending/running)
1334+
if job.Status == schema.JobStatusPending || job.Status == schema.JobStatusRunning {
1335+
req, _ := http.NewRequest("POST", "http://127.0.0.1:9090/api/agent/jobs/"+jobID+"/cancel", nil)
1336+
req.Header.Set("Authorization", bearerKey)
1337+
resp, err = http.DefaultClient.Do(req)
1338+
Expect(err).ToNot(HaveOccurred())
1339+
Expect(resp.StatusCode).To(Equal(200))
1340+
}
1341+
})
1342+
1343+
It("executes task by name", func() {
1344+
// Create a task with a specific name
1345+
taskBody := map[string]interface{}{
1346+
"name": "Named Task",
1347+
"model": "testmodel.ggml",
1348+
"prompt": "Hello",
1349+
"enabled": true,
1350+
}
1351+
1352+
var createResp map[string]interface{}
1353+
err := postRequestResponseJSON("http://127.0.0.1:9090/api/agent/tasks", &taskBody, &createResp)
1354+
Expect(err).ToNot(HaveOccurred())
1355+
1356+
// Execute by name
1357+
paramsBody := map[string]string{"param1": "value1"}
1358+
var jobResp schema.JobExecutionResponse
1359+
err = postRequestResponseJSON("http://127.0.0.1:9090/api/agent/tasks/Named Task/execute", &paramsBody, &jobResp)
1360+
Expect(err).ToNot(HaveOccurred())
1361+
Expect(jobResp.JobID).ToNot(BeEmpty())
1362+
})
1363+
})
11971364
})
11981365
})
11991366

0 commit comments

Comments
 (0)