현대의 데이터 과학 프로젝트에서 데이터는 단일 소스에서 오는 경우가 드뭅니다. 다양한 데이터베이스, API, 파일 시스템, 클라우드 서비스에서 수집된 데이터를 하나의 일관성 있는 데이터셋으로 통합하는 것은 데이터 전처리의 핵심 과정입니다. 이번 포스트에서는 데이터 통합의 모든 측면을 체계적으로 다뤄보겠습니다.
1. 데이터 통합의 개념과 중요성
데이터 통합이란?
데이터 통합(Data Integration)은 여러 이질적인 데이터 소스로부터 데이터를 수집, 결합, 변환하여 통일된 형태의 데이터셋을 만드는 과정입니다. 단순한 데이터 결합을 넘어서 의미적 일관성, 품질 보장, 중복 제거 등을 포함하는 포괄적인 작업입니다.
데이터 통합의 필요성
비즈니스 관점:
- 360도 고객 뷰 구축
- 실시간 의사결정 지원
- 데이터 사일로(Data Silo) 해소
- 비즈니스 인텔리전스 향상
기술적 관점:
- 데이터 중복 제거
- 일관성 있는 데이터 스키마 구축
- 데이터 품질 개선
- 분석 효율성 증대
데이터 통합의 주요 도전과제
- 스키마 이질성: 서로 다른 데이터 구조와 형식
- 의미적 이질성: 같은 개념의 다른 표현
- 데이터 품질: 누락, 중복, 불일치 데이터
- 확장성: 대용량 데이터 처리
- 실시간성: 스트리밍 데이터 통합
2. 데이터 통합 아키텍처 패턴
2.1 ETL (Extract, Transform, Load)
전통적인 배치 중심의 데이터 통합 방식입니다.
단계별 상세:
Extract (추출):
# 다양한 소스에서 데이터 추출 예시
import pandas as pd
import sqlite3
from sqlalchemy import create_engine
# 데이터베이스에서 추출
def extract_from_database():
engine = create_engine('postgresql://user:pass@localhost/db')
query = "SELECT * FROM customers WHERE created_date >= '2024-01-01'"
return pd.read_sql(query, engine)
# CSV 파일에서 추출
def extract_from_csv():
return pd.read_csv('sales_data.csv')
# API에서 추출
def extract_from_api():
import requests
response = requests.get('https://api.example.com/orders')
return pd.DataFrame(response.json())
Transform (변환):
def transform_customer_data(df):
# 데이터 정제 및 변환
df['phone'] = df['phone'].str.replace(r'[^\d]', '', regex=True)
df['email'] = df['email'].str.lower().str.strip()
df['created_date'] = pd.to_datetime(df['created_date'])
# 파생 변수 생성
df['customer_age_group'] = pd.cut(df['age'],
bins=[0, 25, 35, 50, 100],
labels=['Young', 'Adult', 'Middle', 'Senior'])
return df
Load (적재):
def load_to_warehouse(df, table_name):
engine = create_engine('postgresql://user:pass@datawarehouse/db')
df.to_sql(table_name, engine, if_exists='append', index=False)
ETL의 장단점:
- 장점: 안정성, 데이터 품질 보장, 복잡한 변환 가능
- 단점: 지연 시간, 배치 처리의 한계, 실시간성 부족
2.2 ELT (Extract, Load, Transform)
클라우드 환경에서 인기를 얻고 있는 방식입니다.
특징:
- 원본 데이터를 먼저 데이터 웨어하우스에 적재
- 웨어하우스 내에서 변환 수행
- 클라우드 컴퓨팅 성능 활용
-- ELT 변환 예시 (BigQuery)
CREATE OR REPLACE TABLE `project.dataset.customer_unified` AS
SELECT
COALESCE(crm.customer_id, web.user_id) as unified_customer_id,
COALESCE(crm.email, web.email) as email,
crm.phone,
web.last_login,
crm.created_date
FROM `project.dataset.crm_data` crm
FULL OUTER JOIN `project.dataset.web_data` web
ON crm.email = web.email
2.3 실시간 스트리밍 통합
Apache Kafka 기반 스트리밍:
from kafka import KafkaConsumer, KafkaProducer
import json
def stream_integration():
consumer = KafkaConsumer(
'orders', 'customers', 'products',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
order_buffer = {}
for message in consumer:
if message.topic == 'orders':
order_data = message.value
# 고객 및 제품 정보와 실시간 조인
enriched_order = enrich_order_data(order_data, order_buffer)
producer.send('enriched_orders', enriched_order)
2.4 데이터 가상화 (Data Virtualization)
물리적 통합 없이 논리적 통합을 제공하는 방식입니다.
특징:
- 실시간 쿼리 처리
- 데이터 이동 최소화
- 동적 스키마 매핑
3. 스키마 통합 기술
3.1 스키마 매칭 (Schema Matching)
서로 다른 스키마 간의 대응 관계를 찾는 기술입니다.
자동 매칭 기법:
문자열 유사도 기반:
from difflib import SequenceMatcher
import pandas as pd
def calculate_string_similarity(str1, str2):
return SequenceMatcher(None, str1.lower(), str2.lower()).ratio()
def auto_schema_matching(schema1_columns, schema2_columns, threshold=0.7):
matches = []
for col1 in schema1_columns:
best_match = None
best_score = 0
for col2 in schema2_columns:
score = calculate_string_similarity(col1, col2)
if score > best_score and score >= threshold:
best_score = score
best_match = col2
if best_match:
matches.append((col1, best_match, best_score))
return matches
의미적 매칭:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
def semantic_schema_matching(schema1_columns, schema2_columns):
model = SentenceTransformer('all-MiniLM-L6-v2')
# 컬럼명을 벡터로 변환
embeddings1 = model.encode(schema1_columns)
embeddings2 = model.encode(schema2_columns)
# 코사인 유사도 계산
similarities = cosine_similarity(embeddings1, embeddings2)
matches = []
for i, col1 in enumerate(schema1_columns):
best_match_idx = np.argmax(similarities[i])
best_score = similarities[i][best_match_idx]
if best_score > 0.8: # 임계값
matches.append((col1, schema2_columns[best_match_idx], best_score))
return matches
3.2 스키마 매핑 (Schema Mapping)
매칭된 스키마를 실제로 통합하는 과정입니다.
규칙 기반 매핑:
class SchemaMapper:
def __init__(self):
self.mapping_rules = {
'customer_id': ['cust_id', 'customer_key', 'client_id'],
'full_name': ['name', 'customer_name', 'client_name'],
'email_address': ['email', 'e_mail', 'email_addr'],
'phone_number': ['phone', 'tel', 'telephone', 'mobile']
}
def apply_mapping(self, df, source_schema):
mapped_df = df.copy()
for target_col, source_variants in self.mapping_rules.items():
for source_col in source_variants:
if source_col in df.columns:
mapped_df[target_col] = mapped_df[source_col]
mapped_df.drop(source_col, axis=1, inplace=True)
break
return mapped_df
3.3 스키마 진화 (Schema Evolution)
시간에 따른 스키마 변화를 관리하는 기술입니다.
버전 관리 시스템:
class SchemaVersionManager:
def __init__(self):
self.schema_versions = {}
self.migration_rules = {}
def register_schema_version(self, version, schema):
self.schema_versions[version] = schema
def add_migration_rule(self, from_version, to_version, migration_func):
self.migration_rules[(from_version, to_version)] = migration_func
def migrate_data(self, df, from_version, to_version):
if (from_version, to_version) in self.migration_rules:
migration_func = self.migration_rules[(from_version, to_version)]
return migration_func(df)
else:
# 자동 마이그레이션 시도
return self.auto_migrate(df, from_version, to_version)
4. 엔티티 해결 (Entity Resolution)
4.1 중복 탐지 (Duplicate Detection)
정확한 매칭:
def exact_duplicate_detection(df, key_columns):
"""정확한 중복 탐지"""
duplicates = df.duplicated(subset=key_columns, keep=False)
return df[duplicates]
def fuzzy_duplicate_detection(df, columns, threshold=0.8):
"""퍼지 매칭을 통한 중복 탐지"""
from fuzzywuzzy import fuzz
potential_duplicates = []
for i in range(len(df)):
for j in range(i+1, len(df)):
similarity_score = 0
for col in columns:
str1 = str(df.iloc[i][col])
str2 = str(df.iloc[j][col])
similarity_score += fuzz.ratio(str1, str2) / 100
avg_similarity = similarity_score / len(columns)
if avg_similarity >= threshold:
potential_duplicates.append((i, j, avg_similarity))
return potential_duplicates
4.2 레코드 연결 (Record Linkage)
확률적 매칭:
import recordlinkage
from recordlinkage.preprocessing import clean
def probabilistic_record_linkage(df1, df2):
# 데이터 정제
df1['name_clean'] = clean(df1['name'])
df2['name_clean'] = clean(df2['name'])
# 인덱서 생성 (블로킹)
indexer = recordlinkage.Index()
indexer.block('first_name')
candidate_pairs = indexer.index(df1, df2)
# 비교 객체 생성
compare = recordlinkage.Compare()
compare.exact('gender', 'gender')
compare.string('name_clean', 'name_clean', method='jarowinkler')
compare.exact('city', 'city')
compare.numeric('age', 'age')
# 특성 벡터 계산
features = compare.compute(candidate_pairs, df1, df2)
# 분류기를 통한 매칭 결정
from sklearn.ensemble import RandomForestClassifier
classifier = RandomForestClassifier()
# ... 훈련 데이터로 학습 ...
matches = classifier.predict(features)
return candidate_pairs[matches == 1]
4.3 엔티티 해결 파이프라인
class EntityResolutionPipeline:
def __init__(self):
self.blocking_keys = []
self.comparison_fields = []
self.threshold = 0.8
def add_blocking_key(self, key):
self.blocking_keys.append(key)
def add_comparison_field(self, field, method='exact'):
self.comparison_fields.append((field, method))
def resolve_entities(self, datasets):
"""다중 데이터셋에 대한 엔티티 해결"""
resolved_entities = {}
entity_id = 0
for dataset_name, df in datasets.items():
for _, record in df.iterrows():
matched_entity = self.find_matching_entity(
record, resolved_entities
)
if matched_entity:
resolved_entities[matched_entity]['records'].append({
'dataset': dataset_name,
'record': record
})
else:
resolved_entities[entity_id] = {
'master_record': self.create_master_record(record),
'records': [{
'dataset': dataset_name,
'record': record
}]
}
entity_id += 1
return resolved_entities
5. 데이터 융합 (Data Fusion)
5.1 값 충돌 해결
서로 다른 소스에서 같은 엔티티에 대해 다른 값이 있을 때의 해결 방법입니다.
투표 기반 방식:
def voting_based_fusion(conflicting_values, weights=None):
"""가중 투표를 통한 값 융합"""
from collections import Counter
if weights is None:
weights = [1] * len(conflicting_values)
weighted_votes = {}
for value, weight in zip(conflicting_values, weights):
if value in weighted_votes:
weighted_votes[value] += weight
else:
weighted_votes[value] = weight
return max(weighted_votes, key=weighted_votes.get)
def confidence_based_fusion(conflicting_data):
"""신뢰도 기반 값 융합"""
# 데이터 소스별 신뢰도 스코어
source_confidence = {
'crm_system': 0.9,
'web_analytics': 0.7,
'social_media': 0.5,
'survey_data': 0.8
}
best_value = None
best_confidence = 0
for data_point in conflicting_data:
source = data_point['source']
value = data_point['value']
confidence = source_confidence.get(source, 0.1)
if confidence > best_confidence:
best_confidence = confidence
best_value = value
return best_value
5.2 마스터 데이터 생성
class MasterDataGenerator:
def __init__(self):
self.fusion_rules = {}
def add_fusion_rule(self, field, strategy, **kwargs):
self.fusion_rules[field] = {
'strategy': strategy,
'params': kwargs
}
def generate_master_record(self, duplicate_records):
master_record = {}
for field in duplicate_records[0].keys():
if field in self.fusion_rules:
rule = self.fusion_rules[field]
strategy = rule['strategy']
values = [record[field] for record in duplicate_records
if pd.notna(record[field])]
if strategy == 'most_frequent':
master_record[field] = self.most_frequent_value(values)
elif strategy == 'most_recent':
master_record[field] = self.most_recent_value(
duplicate_records, field, rule['params']['date_field']
)
elif strategy == 'highest_quality':
master_record[field] = self.highest_quality_value(
values, rule['params']['quality_function']
)
elif strategy == 'longest':
master_record[field] = max(values, key=len)
else:
# 기본 전략: 첫 번째 non-null 값
for record in duplicate_records:
if pd.notna(record[field]):
master_record[field] = record[field]
break
return master_record
6. 데이터 변환 기술
6.1 구조적 변환
정규화/비정규화:
def normalize_data(df, normal_form='3NF'):
"""데이터 정규화"""
if normal_form == '1NF':
return first_normal_form(df)
elif normal_form == '2NF':
return second_normal_form(df)
elif normal_form == '3NF':
return third_normal_form(df)
def denormalize_for_analytics(normalized_tables):
"""분석을 위한 비정규화"""
# 스타 스키마 생성
fact_table = normalized_tables['facts']
for dim_name, dim_table in normalized_tables['dimensions'].items():
fact_table = fact_table.merge(
dim_table,
on=f'{dim_name}_key',
how='left',
suffixes=('', f'_{dim_name}')
)
return fact_table
6.2 데이터 타입 통합
class DataTypeUnifier:
def __init__(self):
self.type_mapping = {
'date_formats': ['%Y-%m-%d', '%d/%m/%Y', '%m-%d-%Y'],
'boolean_values': {
True: ['true', 'yes', '1', 'y', 'on'],
False: ['false', 'no', '0', 'n', 'off']
}
}
def unify_dates(self, df, date_columns):
for col in date_columns:
unified_dates = []
for date_str in df[col]:
parsed_date = None
for format_str in self.type_mapping['date_formats']:
try:
parsed_date = pd.to_datetime(date_str, format=format_str)
break
except:
continue
unified_dates.append(parsed_date)
df[col] = unified_dates
return df
def unify_booleans(self, df, boolean_columns):
for col in boolean_columns:
df[col] = df[col].astype(str).str.lower()
for bool_val, representations in self.type_mapping['boolean_values'].items():
df[col] = df[col].replace(representations, bool_val)
return df
6.3 단위 변환
class UnitConverter:
def __init__(self):
self.conversion_factors = {
'length': {
'meter': 1.0,
'kilometer': 1000.0,
'centimeter': 0.01,
'inch': 0.0254,
'foot': 0.3048
},
'weight': {
'kilogram': 1.0,
'gram': 0.001,
'pound': 0.453592,
'ounce': 0.0283495
},
'currency': {
'usd': 1.0,
'eur': 1.1, # 실제로는 API에서 실시간 환율 가져와야 함
'krw': 0.00075
}
}
def convert_units(self, df, conversions):
"""
conversions: {
'height': {'from_unit': 'inch', 'to_unit': 'meter', 'category': 'length'},
'price': {'from_unit': 'eur', 'to_unit': 'usd', 'category': 'currency'}
}
"""
for column, conversion_info in conversions.items():
if column in df.columns:
category = conversion_info['category']
from_unit = conversion_info['from_unit']
to_unit = conversion_info['to_unit']
from_factor = self.conversion_factors[category][from_unit]
to_factor = self.conversion_factors[category][to_unit]
conversion_ratio = from_factor / to_factor
df[column] = df[column] * conversion_ratio
return df
7. 데이터 품질 관리
7.1 품질 지표 측정
class DataQualityAssessment:
def __init__(self):
self.quality_metrics = {}
def assess_completeness(self, df):
"""완전성 평가"""
total_cells = df.shape[0] * df.shape[1]
missing_cells = df.isnull().sum().sum()
completeness = (total_cells - missing_cells) / total_cells
return completeness
def assess_consistency(self, df, consistency_rules):
"""일관성 평가"""
violations = 0
total_rules = len(consistency_rules)
for rule in consistency_rules:
if rule['type'] == 'range':
column = rule['column']
min_val, max_val = rule['range']
violations += len(df[(df[column] < min_val) | (df[column] > max_val)])
elif rule['type'] == 'format':
column = rule['column']
pattern = rule['pattern']
violations += len(df[~df[column].str.match(pattern, na=False)])
consistency = 1 - (violations / (len(df) * total_rules))
return max(0, consistency)
def assess_accuracy(self, df, reference_data):
"""정확성 평가 (참조 데이터와 비교)"""
if reference_data is None:
return None
# 참조 데이터와 매칭하여 정확성 측정
merged = df.merge(reference_data, on='id', suffixes=('', '_ref'))
accuracy_scores = []
for col in df.columns:
if col != 'id' and f'{col}_ref' in merged.columns:
matches = (merged[col] == merged[f'{col}_ref']).sum()
accuracy = matches / len(merged)
accuracy_scores.append(accuracy)
return sum(accuracy_scores) / len(accuracy_scores) if accuracy_scores else 0
7.2 품질 개선 자동화
class AutomaticQualityImprovement:
def __init__(self):
self.improvement_strategies = {}
def auto_fix_missing_values(self, df, strategies=None):
"""결측값 자동 처리"""
if strategies is None:
strategies = {
'numeric': 'median',
'categorical': 'mode',
'datetime': 'forward_fill'
}
for column in df.columns:
if df[column].isnull().any():
if df[column].dtype in ['int64', 'float64']:
if strategies['numeric'] == 'median':
df[column].fillna(df[column].median(), inplace=True)
elif strategies['numeric'] == 'mean':
df[column].fillna(df[column].mean(), inplace=True)
elif df[column].dtype == 'object':
if strategies['categorical'] == 'mode':
mode_value = df[column].mode()[0] if not df[column].mode().empty else 'Unknown'
df[column].fillna(mode_value, inplace=True)
elif df[column].dtype == 'datetime64[ns]':
if strategies['datetime'] == 'forward_fill':
df[column].fillna(method='ffill', inplace=True)
return df
def auto_standardize_formats(self, df):
"""형식 자동 표준화"""
for column in df.columns:
if df[column].dtype == 'object':
# 이메일 형식 표준화
if 'email' in column.lower():
df[column] = df[column].str.lower().str.strip()
# 전화번호 형식 표준화
elif 'phone' in column.lower():
df[column] = df[column].str.replace(r'[^\d]', '', regex=True)
# 텍스트 정규화
else:
df[column] = df[column].str.strip()
return df
8. 실시간 데이터 통합
8.1 스트리밍 데이터 처리
Apache Kafka + Apache Flink:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def setup_streaming_integration():
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
# Kafka 소스 테이블 생성
table_env.execute_sql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
table_env.execute_sql("""
CREATE TABLE customers (
customer_id STRING,
name STRING,
email STRING,
segment STRING
) WITH (
'connector' = 'kafka',
'topic' = 'customers',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 실시간 조인 및 집계
result = table_env.sql_query("""
SELECT
o.order_id,
c.name,
c.segment,
o.quantity,
o.order_time
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
""")
return result
8.2 변경 데이터 캡처 (CDC)
class ChangeDataCapture:
def __init__(self, source_db, target_system):
self.source_db = source_db
self.target_system = target_system
self.last_sync_time = None
def capture_changes(self, table_name):
"""데이터베이스 변경사항 캡처"""
if self.last_sync_time:
query = f"""
SELECT *, 'UPDATE' as operation_type
FROM {table_name}
WHERE last_modified > '{self.last_sync_time}'
UNION ALL
SELECT *, 'DELETE' as operation_type
FROM {table_name}_deleted_log
WHERE deleted_at > '{self.last_sync_time}'
"""
else:
query = f"SELECT *, 'INSERT' as operation_type FROM {table_name}"
changes = pd.read_sql(query, self.source_db)
return changes
def apply_changes(self, changes):
"""타겟 시스템에 변경사항 적용"""
for _, change in changes.iterrows():
...
...
...
답글 남기기