Batches API
파이프라인 배치 실행을 관리하는 API입니다.
개요
배치(Batch)는 파이프라인의 단일 실행 인스턴스를 나타냅니다. 배치를 생성하면 Prefect 워크플로우 엔진에서 파이프라인이 실행되며, 각 스텝의 진행 상황을 모니터링할 수 있습니다.
엔드포인트
POST /api/v1/batches/{pipeline_id}
파이프라인 배치를 실행합니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
Query Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_version | string | No | 특정 버전 실행 (기본: 최신) |
Response
200 OK
{
"item": "batch-xyz789"
}
| Field | Type | Description |
|---|---|---|
item | string | 생성된 배치 ID |
GET /api/v1/batches/{pipeline_id}
파이프라인의 최신 배치 상태를 조회합니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
Response
200 OK
{
"id": "batch-xyz789",
"name": "sales-pipeline",
"deployment": "run-pipeline/sales-pipeline",
"state": {
"state": "running",
"steps": {
"extract": {
"state": "success",
"offsets": {"partition_0": 1000}
},
"transform": {
"state": "running",
"offsets": {"partition_0": 500}
},
"load": {
"state": "pending",
"offsets": {}
}
}
}
}
Batch State Values
| State | Description |
|---|---|
stop | 중지됨 |
running | 실행 중 |
pending | 대기 중 |
scheduled | 스케줄됨 |
Step State Values
| State | Description |
|---|---|
none | 시작 전 |
pending | 대기 중 |
running | 실행 중 |
success | 성공 |
failed | 실패 |
GET /api/v1/batches/{pipeline_id}/{batch_id}
특정 배치의 상태를 조회합니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
batch_id | string | Yes | 배치 ID |
Response
GET /api/v1/batches/{pipeline_id}와 동일한 형식입니다.
GET /api/v1/batches/{pipeline_id}/
파이프라인의 모든 배치 목록을 조회합니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
limit | integer | 100 | 최대 조회 개수 |
token | string | - | 페이지네이션 토큰 |
Response
200 OK
{
"items": {
"batch-xyz789": "{...batch json...}",
"batch-abc123": "{...batch json...}"
},
"token": "next-page-token"
}
PUT /api/v1/batches/{pipeline_id}/{batch_id}/stop
실행 중인 배치를 중지합니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
batch_id | string | Yes | 배치 ID |
Response
200 OK
{
"message": "Batch stopped successfully"
}
DELETE /api/v1/batches/{pipeline_id}/{batch_id}
배치를 삭제합니다. 실행 중인 배치는 먼저 중지됩니다.
Path Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
pipeline_id | string | Yes | 파이프라인 ID |
batch_id | string | Yes | 배치 ID |
Response
200 OK
{
"message": "Batch deleted successfully"
}
사용 예시
cURL
# 배치 실행
curl -X POST https://api.dhub.io/api/v1/batches/pipeline-abc123 \
-H "Authorization: Bearer <access_token>"
# 최신 배치 상태 조회
curl https://api.dhub.io/api/v1/batches/pipeline-abc123 \
-H "Authorization: Bearer <access_token>"
# 배치 중지
curl -X PUT https://api.dhub.io/api/v1/batches/pipeline-abc123/batch-xyz789/stop \
-H "Authorization: Bearer <access_token>"
# 배치 삭제
curl -X DELETE https://api.dhub.io/api/v1/batches/pipeline-abc123/batch-xyz789 \
-H "Authorization: Bearer <access_token>"
배치 상태 폴링
async function pollBatchStatus(pipelineId, batchId, interval = 5000) {
while (true) {
const response = await fetch(`/api/v1/batches/${pipelineId}/${batchId}`, {
headers: { 'Authorization': `Bearer ${accessToken}` }
});
const batch = await response.json();
console.log(`Batch state: ${batch.state.state}`);
// Check step states
for (const [stepName, stepState] of Object.entries(batch.state.steps)) {
console.log(` ${stepName}: ${stepState.state}`);
}
// Check if completed
const allDone = Object.values(batch.state.steps)
.every(s => s.state === 'success' || s.state === 'failed');
if (allDone || batch.state.state === 'stop') {
console.log('Batch completed');
break;
}
await new Promise(resolve => setTimeout(resolve, interval));
}
}