Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions apps/batching.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
---
title: "Batching"
description: "Execute multiple invocations in parallel with concurrency control"
---

Batch invocations allow you to execute the same action multiple times with different payloads in parallel. This is useful for processing large datasets, running bulk operations, or executing multiple tasks concurrently.

## Creating a batch job

Create a batch job by providing an array of payloads to execute:

<CodeGroup>
```typescript Typescript/Javascript
import Kernel from '@onkernel/sdk';

const kernel = new Kernel();

const batchJob = await kernel.invocations.batch.create({
app_name: 'my-app',
action_name: 'analyze',
version: '1.0.0',
payloads: [
JSON.stringify({ url: "https://example.com/page1" }),
JSON.stringify({ url: "https://example.com/page2" }),
JSON.stringify({ url: "https://example.com/page3" }),
],
max_concurrency: 5, // Optional: limit concurrent executions
});

console.log(batchJob.batch_job_id);
console.log(batchJob.total_count);
```

```python Python
from kernel import Kernel
import json

kernel = Kernel()

batch_job = kernel.invocations.batch.create(
app_name="my-app",
action_name="analyze",
version="1.0.0",
payloads=[
json.dumps({"url": "https://example.com/page1"}),
json.dumps({"url": "https://example.com/page2"}),
json.dumps({"url": "https://example.com/page3"}),
],
max_concurrency=5, # Optional: limit concurrent executions
)

print(batch_job.batch_job_id)
print(batch_job.total_count)
```
</CodeGroup>

### Parameters

- `app_name` (required): The name of your deployed app.
- `action_name` (required): The action to invoke.
- `version` (optional): The app version to use. Defaults to `"latest"`.
- `payloads` (required): Array of JSON strings, each representing a payload for one invocation.
- `max_concurrency` (optional): Maximum number of concurrent invocations. If not specified, defaults to your organization's max_concurrent_invocations limit (5 for Developer, 50 for Start-Up, 1000 for Enterprise).

<Info>
Each payload must be a valid JSON string and follows the same 64 KB size limit as regular invocations.
</Info>

## Listing batch jobs

Retrieve all batch jobs for your organization with optional filtering:

<CodeGroup>
```typescript Typescript/Javascript
const batchJobs = await kernel.batchJobs.list({
app_name: 'my-app',
status: 'running',
limit: 50,
offset: 0,
});

for (const job of batchJobs) {
console.log(`${job.id}: ${job.succeeded_count}/${job.total_count} succeeded`);
}
```

```python Python
batch_jobs = kernel.batch_jobs.list(
app_name="my-app",
status="running",
limit=50,
offset=0,
)

for job in batch_jobs:
print(f"{job.id}: {job.succeeded_count}/{job.total_count} succeeded")
```
</CodeGroup>

### Filter parameters

- `app_name` (optional): Filter by app name.
- `action_name` (optional): Filter by action name.
- `status` (optional): Filter by status (`queued`, `running`, `succeeded`, `failed`, `partially_failed`).
- `limit` (optional): Number of results to return (default: 50).
- `offset` (optional): Number of results to skip for pagination.

## Getting batch job details

Retrieve detailed information about a specific batch job, including all individual invocations:

<CodeGroup>
```typescript Typescript/Javascript
const batchJob = await kernel.batchJobs.retrieve('batch_job_id');

console.log(`Status: ${batchJob.status}`);
console.log(`Progress: ${batchJob.succeeded_count}/${batchJob.total_count}`);

// Access individual invocations
for (const invocation of batchJob.invocations) {
console.log(`${invocation.id}: ${invocation.status}`);
}
```

```python Python
batch_job = kernel.batch_jobs.retrieve("batch_job_id")

print(f"Status: {batch_job.status}")
print(f"Progress: {batch_job.succeeded_count}/{batch_job.total_count}")

# Access individual invocations
for invocation in batch_job.invocations:
print(f"{invocation.id}: {invocation.status}")
```
</CodeGroup>

## Streaming batch job progress

Monitor batch job progress in real-time using Server-Sent Events (SSE):

<CodeGroup>
```typescript Typescript/Javascript
const stream = await kernel.batchJobs.events.retrieve('batch_job_id');

for await (const event of stream) {
if (event.event === 'batch_progress') {
console.log(`Progress: ${event.succeeded_count}/${event.total_count}`);
} else if (event.event === 'batch_state') {
console.log(`Status changed to: ${event.batch_job.status}`);
}
}
```

```python Python
stream = kernel.batch_jobs.events.retrieve("batch_job_id")

for event in stream:
if event.event == "batch_progress":
print(f"Progress: {event.succeeded_count}/{event.total_count}")
elif event.event == "batch_state":
print(f"Status changed to: {event.batch_job.status}")
```
</CodeGroup>

### Event types

- `batch_state`: Sent when the batch job status changes (initial state and terminal states).
- `batch_progress`: Sent when invocation counts change (succeeded or failed count updates).

## Batch job statuses

Batch jobs progress through the following statuses:

- `queued`: Batch job created, waiting to start.
- `running`: Invocations are being executed.
- `succeeded`: All invocations completed successfully.
- `failed`: Batch job encountered a critical error.
- `partially_failed`: Some invocations succeeded, others failed.

## Best practices

### Concurrency control

Use `max_concurrency` to control resource usage and avoid overwhelming downstream services:

```typescript
const batchJob = await kernel.invocations.batch.create({
app_name: 'my-app',
action_name: 'scrape',
payloads: urls.map(url => JSON.stringify({ url })),
max_concurrency: 10, // Process 10 URLs at a time
});
```

### Error handling

Individual invocation failures don't stop the batch job. Check the final status and review failed invocations:

```typescript
const batchJob = await kernel.batchJobs.retrieve('batch_job_id');

if (batchJob.status === 'partially_failed' || batchJob.status === 'failed') {
const failedInvocations = batchJob.invocations.filter(
inv => inv.status === 'failed'
);
console.log(`${failedInvocations.length} invocations failed`);
}
```

### Payload size limits

Each payload is limited to 64 KB. For larger inputs, store data externally and pass references:

```typescript
// Instead of large payloads
const payloads = largeDatasets.map(data => JSON.stringify(data)); // ❌ May exceed 64 KB

// Use references
const payloads = dataUrls.map(url => JSON.stringify({ data_url: url })); // ✅ Small payload
```
1 change: 1 addition & 0 deletions docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"apps/develop",
"apps/deploy",
"apps/invoke",
"apps/batching",
"apps/stop",
"apps/secrets",
"apps/status",
Expand Down