Python 가이드
D.Hub 파이프라인의 Python 코드 노드를 사용하면 데이터를 변환, 분석, 가공할 수 있습니다. Polars를 기본 데이터 처리 라이브러리로 사용하며, 다양한 과학 계산 및 지리 공간 패키지를 지원합니다.
기본 구조
Python 코드 노드는 transform 함수를 진입점으로 사용합니다.
import polars as pl
def transform(inputs, options):
df = inputs['input_df']
result_df = df.with_columns(
(pl.col("value") * 2).alias("doubled_value")
)
return {"output": result_df}
입력 (Input)
- 이전 노드에서 전달된 데이터는
inputs딕셔너리로 전달됩니다. - 키 이름은 파이프라인 에디터에서 설정한 입력 별칭(Input Alias)을 따릅니다.
- 값은
Polars DataFrame형태입니다.
옵션 (Options)
- 파이프라인 설정에서 정의한 런타임 변수가
options딕셔너리로 전달됩니다. - 실행마다 달라지는 날짜, 필터 조건 등을 옵션으로 전달하면 코드를 재사용할 수 있습니다.
출력 (Output)
- 반드시
"output"키를 포함하는 딕셔너리를 반환해야 합니다. "output"값은Polars DataFrame이어야 합니다.
반환 형식 주의
return result_df처럼 DataFrame을 직접 반환하면 오류가 발생합니다. 반드시 {"output": result_df} 형식으로 반환하세요.
사용 가능한 패키지
D.Hub 파이프라인 환경에는 다음 패키지들이 기본 설치되어 있습니다.
| 패키지 | 용도 |
|---|---|
polars | 고성능 DataFrame 처리 (기본 라이브러리) |
pandas | 범용 DataFrame 처리 |
pyarrow | Apache Arrow 컬럼형 데이터 처리 |
numpy | 수치 계산 |
geopandas | 지리 공간 데이터 처리 |
h3 | H3 육각형 공간 인덱싱 |
pyproj | 좌표계 변환 |
rdflib | RDF/온톨로지 데이터 처리 |
추가 패키지
기본 제공 패키지 외에 추가 패키지가 필요하면, 파이프라인 설정에서 AI 실행 환경을 선택하거나 관리자에게 문의하세요.
코드 예시
데이터 필터링
import polars as pl
def transform(inputs, options):
df = inputs['raw_data']
filtered = df.filter(
(pl.col("status") == "active") &
(pl.col("age") >= 18)
)
return {"output": filtered}
컬럼 변환 및 추가
import polars as pl
def transform(inputs, options):
df = inputs['sales']
result = df.with_columns([
(pl.col("price") * pl.col("quantity")).alias("total"),
pl.col("date").str.to_date("%Y-%m-%d").alias("parsed_date"),
pl.col("category").str.to_uppercase().alias("category_upper"),
])
return {"output": result}
그룹별 집계
import polars as pl
def transform(inputs, options):
df = inputs['transactions']
summary = df.group_by("region").agg([
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").mean().alias("avg_amount"),
pl.col("order_id").n_unique().alias("order_count"),
])
return {"output": summary}
DataFrame 조인
import polars as pl
def transform(inputs, options):
orders = inputs['orders']
customers = inputs['customers']
joined = orders.join(
customers,
on="customer_id",
how="left",
)
return {"output": joined}
옵션 활용
import polars as pl
def transform(inputs, options):
df = inputs['sensor_data']
threshold = float(options.get("threshold", 100))
target_date = options.get("target_date", "2026-01-01")
result = df.filter(
(pl.col("value") > threshold) &
(pl.col("measured_at") >= target_date)
)
return {"output": result}
GIS 데이터 처리
H3 인덱싱
위경도 좌표를 H3 육각형 인덱스로 변환하여 공간 집계를 수행할 수 있습니다.
import polars as pl
import h3
def transform(inputs, options):
df = inputs['location_data']
resolution = int(options.get("h3_resolution", 7))
rows = df.to_dicts()
for row in rows:
row["h3_index"] = h3.latlng_to_cell(
row["latitude"], row["longitude"], resolution
)
result = pl.DataFrame(rows)
aggregated = result.group_by("h3_index").agg([
pl.col("value").mean().alias("avg_value"),
pl.len().alias("point_count"),
])
return {"output": aggregated}
좌표계 변환
import polars as pl
from pyproj import Transformer
def transform(inputs, options):
df = inputs['korea_data']
transformer = Transformer.from_crs("EPSG:5179", "EPSG:4326", always_xy=True)
rows = df.to_dicts()
for row in rows:
lon, lat = transformer.transform(row["x"], row["y"])
row["longitude"] = lon
row["latitude"] = lat
return {"output": pl.DataFrame(rows)}
Pandas 호환
기존 Pandas 코드가 있다면 Polars와 상호 변환하여 사용할 수 있습니다.
import polars as pl
import pandas as pd
def transform(inputs, options):
df = inputs['data']
pdf = df.to_pandas()
pdf["new_col"] = pdf["col_a"].apply(lambda x: x.strip().lower())
result = pl.from_pandas(pdf)
return {"output": result}
성능 고려
Pandas 변환은 메모리 복사가 발생합니다. 대용량 데이터에서는 Polars 네이티브 API를 직접 사용하는 것을 권장합니다.
디버깅 팁
print()출력은 파이프라인 실행 로그에서 확인할 수 있습니다.- 중간 결과를 확인하려면
print(df.head(5))를 사용하세요. - 스키마를 확인하려면
print(df.schema)를 사용하세요. - 에러 발생 시 트레이스백이 실행 로그에 표시됩니다.