Python 가이드
D.Hub 파이프라인의 Python 코드 노드를 사용하면 데이터를 변환·분석·가공할 수 있습니다. Polars를 기본 데이터 처리 라이브러리로 사용하며, 과학 계산·지리 공간 패키지도 활용할 수 있습니다.
실행 계약
Python 코드 노드는 run 함수를 진입점으로 사용합니다. 입력은 Polars DataFrame, 반환값은 "output" 키를 가진 딕셔너리입니다.
import polars as pl
def run(input, options):
# input : Polars DataFrame (입력 데이터)
# options: dict (사용자 정의 옵션)
output = input.with_columns(
(pl.col("value") * 2).alias("doubled_value")
)
return {"output": output}
입력 (input)
- 단일 입력은 첫 번째 매개변수
input(Polars DataFrame)으로 전달됩니다. - 입력이 여러 개인 노드는 각 입력 연결의 별칭(alias)을 매개변수 이름으로 받습니다(아래 여러 입력 받기 참고).
옵션 (options)
- 파이프라인 설정에서 정의한 런타임 변수가
options딕셔너리로 전달됩니다. - 실행마다 달라지는 날짜·필터 조건 등을 옵션으로 전달하면 코드를 재사용할 수 있습니다.
출력 (output)
- 반드시
"output"키를 포함하는 딕셔너리를 반환해야 하며, 값은 Polars DataFrame입니다.
반환 형식 주의
return output처럼 DataFrame을 직접 반환하면 오류가 발생합니다. 반드시 {"output": output} 형식으로 반환하세요.
사용 가능한 패키지
파이프라인 Python 환경에서 주로 사용하는 데이터 처리·공간 분석 라이브러리입니다.
| 패키지 | 용도 |
|---|---|
polars | 고성능 DataFrame 처리 (기본 라이브러리) |
pandas | 범용 DataFrame 처리 |
pyarrow | Apache Arrow 컬럼형 데이터 처리 |
numpy | 수치 계산 |
geopandas | 지리 공간 데이터 처리 |
h3 | H3 육각형 공간 인덱싱 |
pyproj | 좌표계 변환 |
rdflib | RDF/온톨로지 데이터 처리 |
추가 패키지
기본 환경에 없는 패키지가 필요하면 파이프라인/코드 설정의 패키지 항목이나 실행 환경을 조정하세요. 자세한 정책은 관리자에게 문의하세요.
코드 예시
데이터 필터링
import polars as pl
def run(input, options):
return {"output": input.filter(
(pl.col("status") == "active") & (pl.col("age") >= 18)
)}
컬럼 변환 및 추가
import polars as pl
def run(input, options):
output = input.with_columns([
(pl.col("price") * pl.col("quantity")).alias("total"),
pl.col("date").str.to_date("%Y-%m-%d").alias("parsed_date"),
pl.col("category").str.to_uppercase().alias("category_upper"),
])
return {"output": output}
그룹별 집계
import polars as pl
def run(input, options):
output = input.group_by("region").agg([
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").mean().alias("avg_amount"),
pl.col("order_id").n_unique().alias("order_count"),
])
return {"output": output}
여러 입력 받기
입력 연결이 여러 개인 노드는 각 입력의 별칭을 매개변수 이름으로 받습니다(마지막 매개변수는 항상 options). 예를 들어 입력 별칭이 orders, customers인 노드는 다음과 같이 작성합니다.
import polars as pl
def run(orders, customers, options):
output = orders.join(customers, on="customer_id", how="left")
return {"output": output}
옵션 활용
import polars as pl
def run(input, options):
threshold = float(options.get("threshold", 100))
target_date = options.get("target_date", "2026-01-01")
output = input.filter(
(pl.col("value") > threshold) & (pl.col("measured_at") >= target_date)
)
return {"output": output}
GIS 데이터 처리
H3 인덱싱
위경도 좌표를 H3 육각형 인덱스로 변환해 공간 집계를 수행할 수 있습니다.
import polars as pl
import h3
def run(input, options):
resolution = int(options.get("h3_resolution", 7))
rows = input.to_dicts()
for row in rows:
row["h3_index"] = h3.latlng_to_cell(
row["latitude"], row["longitude"], resolution
)
output = pl.DataFrame(rows).group_by("h3_index").agg([
pl.col("value").mean().alias("avg_value"),
pl.len().alias("point_count"),
])
return {"output": output}
좌표계 변환
import polars as pl
from pyproj import Transformer
def run(input, options):
transformer = Transformer.from_crs("EPSG:5179", "EPSG:4326", always_xy=True)
rows = input.to_dicts()
for row in rows:
lon, lat = transformer.transform(row["x"], row["y"])
row["longitude"], row["latitude"] = lon, lat
return {"output": pl.DataFrame(rows)}
Pandas 호환
기존 Pandas 코드가 있다면 Polars와 상호 변환해 사용할 수 있습니다.
import polars as pl
def run(input, options):
pdf = input.to_pandas()
pdf["new_col"] = pdf["col_a"].apply(lambda x: x.strip().lower())
return {"output": pl.from_pandas(pdf)}
성능 고려
Pandas 변환은 메모리 복사가 발생합니다. 대용량 데이터에서는 Polars 네이티브 API를 직접 사용하는 것을 권장합니다.
디버깅 팁
print()출력은 파이프라인 실행 로그에서 확인할 수 있습니다.- 중간 결과는
print(input.head(5)), 스키마는print(input.schema)로 확인하세요. - 에러가 발생하면 트레이스백이 실행 로그에 표시됩니다.
관련 문서
- SQL 가이드 — SQL 코드 노드(
input테이블) - 파이프라인 노드 추가 — 코드 노드 연결과 입출력