본문으로 건너뛰기
버전: v0.1.0

Python 가이드

D.Hub 파이프라인의 Python 코드 노드를 사용하면 데이터를 변환, 분석, 가공할 수 있습니다. Polars를 기본 데이터 처리 라이브러리로 사용하며, 다양한 과학 계산 및 지리 공간 패키지를 지원합니다.

기본 구조

Python 코드 노드는 transform 함수를 진입점으로 사용합니다.

import polars as pl

def transform(inputs, options):
df = inputs['input_df']

result_df = df.with_columns(
(pl.col("value") * 2).alias("doubled_value")
)

return {"output": result_df}

입력 (Input)

  • 이전 노드에서 전달된 데이터는 inputs 딕셔너리로 전달됩니다.
  • 키 이름은 파이프라인 에디터에서 설정한 입력 별칭(Input Alias)을 따릅니다.
  • 값은 Polars DataFrame 형태입니다.

옵션 (Options)

  • 파이프라인 설정에서 정의한 런타임 변수가 options 딕셔너리로 전달됩니다.
  • 실행마다 달라지는 날짜, 필터 조건 등을 옵션으로 전달하면 코드를 재사용할 수 있습니다.

출력 (Output)

  • 반드시 "output" 키를 포함하는 딕셔너리를 반환해야 합니다.
  • "output" 값은 Polars DataFrame이어야 합니다.
반환 형식 주의

return result_df처럼 DataFrame을 직접 반환하면 오류가 발생합니다. 반드시 {"output": result_df} 형식으로 반환하세요.


사용 가능한 패키지

D.Hub 파이프라인 환경에는 다음 패키지들이 기본 설치되어 있습니다.

패키지용도
polars고성능 DataFrame 처리 (기본 라이브러리)
pandas범용 DataFrame 처리
pyarrowApache Arrow 컬럼형 데이터 처리
numpy수치 계산
geopandas지리 공간 데이터 처리
h3H3 육각형 공간 인덱싱
pyproj좌표계 변환
rdflibRDF/온톨로지 데이터 처리
추가 패키지

기본 제공 패키지 외에 추가 패키지가 필요하면, 파이프라인 설정에서 AI 실행 환경을 선택하거나 관리자에게 문의하세요.


코드 예시

데이터 필터링

import polars as pl

def transform(inputs, options):
df = inputs['raw_data']

filtered = df.filter(
(pl.col("status") == "active") &
(pl.col("age") >= 18)
)

return {"output": filtered}

컬럼 변환 및 추가

import polars as pl

def transform(inputs, options):
df = inputs['sales']

result = df.with_columns([
(pl.col("price") * pl.col("quantity")).alias("total"),
pl.col("date").str.to_date("%Y-%m-%d").alias("parsed_date"),
pl.col("category").str.to_uppercase().alias("category_upper"),
])

return {"output": result}

그룹별 집계

import polars as pl

def transform(inputs, options):
df = inputs['transactions']

summary = df.group_by("region").agg([
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").mean().alias("avg_amount"),
pl.col("order_id").n_unique().alias("order_count"),
])

return {"output": summary}

DataFrame 조인

import polars as pl

def transform(inputs, options):
orders = inputs['orders']
customers = inputs['customers']

joined = orders.join(
customers,
on="customer_id",
how="left",
)

return {"output": joined}

옵션 활용

import polars as pl

def transform(inputs, options):
df = inputs['sensor_data']
threshold = float(options.get("threshold", 100))
target_date = options.get("target_date", "2026-01-01")

result = df.filter(
(pl.col("value") > threshold) &
(pl.col("measured_at") >= target_date)
)

return {"output": result}

GIS 데이터 처리

H3 인덱싱

위경도 좌표를 H3 육각형 인덱스로 변환하여 공간 집계를 수행할 수 있습니다.

import polars as pl
import h3

def transform(inputs, options):
df = inputs['location_data']
resolution = int(options.get("h3_resolution", 7))

rows = df.to_dicts()
for row in rows:
row["h3_index"] = h3.latlng_to_cell(
row["latitude"], row["longitude"], resolution
)

result = pl.DataFrame(rows)

aggregated = result.group_by("h3_index").agg([
pl.col("value").mean().alias("avg_value"),
pl.len().alias("point_count"),
])

return {"output": aggregated}

좌표계 변환

import polars as pl
from pyproj import Transformer

def transform(inputs, options):
df = inputs['korea_data']
transformer = Transformer.from_crs("EPSG:5179", "EPSG:4326", always_xy=True)

rows = df.to_dicts()
for row in rows:
lon, lat = transformer.transform(row["x"], row["y"])
row["longitude"] = lon
row["latitude"] = lat

return {"output": pl.DataFrame(rows)}

Pandas 호환

기존 Pandas 코드가 있다면 Polars와 상호 변환하여 사용할 수 있습니다.

import polars as pl
import pandas as pd

def transform(inputs, options):
df = inputs['data']

pdf = df.to_pandas()
pdf["new_col"] = pdf["col_a"].apply(lambda x: x.strip().lower())
result = pl.from_pandas(pdf)

return {"output": result}
성능 고려

Pandas 변환은 메모리 복사가 발생합니다. 대용량 데이터에서는 Polars 네이티브 API를 직접 사용하는 것을 권장합니다.


디버깅 팁

  • print() 출력은 파이프라인 실행 로그에서 확인할 수 있습니다.
  • 중간 결과를 확인하려면 print(df.head(5))를 사용하세요.
  • 스키마를 확인하려면 print(df.schema)를 사용하세요.
  • 에러 발생 시 트레이스백이 실행 로그에 표시됩니다.