Skip to main content
Version: v0.3.0

Python 가이드

D.Hub 파이프라인의 Python 코드 노드를 사용하면 데이터를 변환·분석·가공할 수 있습니다. Polars를 기본 데이터 처리 라이브러리로 사용하며, 과학 계산·지리 공간 패키지도 활용할 수 있습니다.

실행 계약

Python 코드 노드는 run 함수를 진입점으로 사용합니다. 입력은 Polars DataFrame, 반환값은 "output" 키를 가진 딕셔너리입니다.

import polars as pl

def run(input, options):
# input : Polars DataFrame (입력 데이터)
# options: dict (사용자 정의 옵션)
output = input.with_columns(
(pl.col("value") * 2).alias("doubled_value")
)
return {"output": output}

입력 (input)

  • 단일 입력은 첫 번째 매개변수 input(Polars DataFrame)으로 전달됩니다.
  • 입력이 여러 개인 노드는 각 입력 연결의 별칭(alias)을 매개변수 이름으로 받습니다(아래 여러 입력 받기 참고).

옵션 (options)

  • 파이프라인 설정에서 정의한 런타임 변수가 options 딕셔너리로 전달됩니다.
  • 실행마다 달라지는 날짜·필터 조건 등을 옵션으로 전달하면 코드를 재사용할 수 있습니다.

출력 (output)

  • 반드시 "output" 키를 포함하는 딕셔너리를 반환해야 하며, 값은 Polars DataFrame입니다.
반환 형식 주의

return output처럼 DataFrame을 직접 반환하면 오류가 발생합니다. 반드시 {"output": output} 형식으로 반환하세요.


사용 가능한 패키지

파이프라인 Python 환경에서 주로 사용하는 데이터 처리·공간 분석 라이브러리입니다.

패키지용도
polars고성능 DataFrame 처리 (기본 라이브러리)
pandas범용 DataFrame 처리
pyarrowApache Arrow 컬럼형 데이터 처리
numpy수치 계산
geopandas지리 공간 데이터 처리
h3H3 육각형 공간 인덱싱
pyproj좌표계 변환
rdflibRDF/온톨로지 데이터 처리
추가 패키지

기본 환경에 없는 패키지가 필요하면 파이프라인/코드 설정의 패키지 항목이나 실행 환경을 조정하세요. 자세한 정책은 관리자에게 문의하세요.


코드 예시

데이터 필터링

import polars as pl

def run(input, options):
return {"output": input.filter(
(pl.col("status") == "active") & (pl.col("age") >= 18)
)}

컬럼 변환 및 추가

import polars as pl

def run(input, options):
output = input.with_columns([
(pl.col("price") * pl.col("quantity")).alias("total"),
pl.col("date").str.to_date("%Y-%m-%d").alias("parsed_date"),
pl.col("category").str.to_uppercase().alias("category_upper"),
])
return {"output": output}

그룹별 집계

import polars as pl

def run(input, options):
output = input.group_by("region").agg([
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").mean().alias("avg_amount"),
pl.col("order_id").n_unique().alias("order_count"),
])
return {"output": output}

여러 입력 받기

입력 연결이 여러 개인 노드는 각 입력의 별칭을 매개변수 이름으로 받습니다(마지막 매개변수는 항상 options). 예를 들어 입력 별칭이 orders, customers인 노드는 다음과 같이 작성합니다.

import polars as pl

def run(orders, customers, options):
output = orders.join(customers, on="customer_id", how="left")
return {"output": output}

옵션 활용

import polars as pl

def run(input, options):
threshold = float(options.get("threshold", 100))
target_date = options.get("target_date", "2026-01-01")

output = input.filter(
(pl.col("value") > threshold) & (pl.col("measured_at") >= target_date)
)
return {"output": output}

GIS 데이터 처리

H3 인덱싱

위경도 좌표를 H3 육각형 인덱스로 변환해 공간 집계를 수행할 수 있습니다.

import polars as pl
import h3

def run(input, options):
resolution = int(options.get("h3_resolution", 7))

rows = input.to_dicts()
for row in rows:
row["h3_index"] = h3.latlng_to_cell(
row["latitude"], row["longitude"], resolution
)

output = pl.DataFrame(rows).group_by("h3_index").agg([
pl.col("value").mean().alias("avg_value"),
pl.len().alias("point_count"),
])
return {"output": output}

좌표계 변환

import polars as pl
from pyproj import Transformer

def run(input, options):
transformer = Transformer.from_crs("EPSG:5179", "EPSG:4326", always_xy=True)

rows = input.to_dicts()
for row in rows:
lon, lat = transformer.transform(row["x"], row["y"])
row["longitude"], row["latitude"] = lon, lat

return {"output": pl.DataFrame(rows)}

Pandas 호환

기존 Pandas 코드가 있다면 Polars와 상호 변환해 사용할 수 있습니다.

import polars as pl

def run(input, options):
pdf = input.to_pandas()
pdf["new_col"] = pdf["col_a"].apply(lambda x: x.strip().lower())
return {"output": pl.from_pandas(pdf)}
성능 고려

Pandas 변환은 메모리 복사가 발생합니다. 대용량 데이터에서는 Polars 네이티브 API를 직접 사용하는 것을 권장합니다.


디버깅 팁

  • print() 출력은 파이프라인 실행 로그에서 확인할 수 있습니다.
  • 중간 결과는 print(input.head(5)), 스키마는 print(input.schema)로 확인하세요.
  • 에러가 발생하면 트레이스백이 실행 로그에 표시됩니다.

관련 문서