2025. 6. 11. 22:53ㆍMS AI School
다른 팀원분은 GRU 모델을 사용하여 모델 학습 후 백엔드 진행을 맡았고,
나는 Two-Tower 모델을 구현을 해보기로 하였다.
각각의 모델로 구현 후, 코드 리뷰를 해보면서 기능을 테스트 해보기로 했다.
그래서 오늘은 내가 이틀 동안 구현해 본 추천 시스템을 위한 Two-Tower 모델 학습 파이프라인을 설명해보려고 한다.
데이터 전처리
# --- 1. 데이터 로드 및 전처리 ---
print("\n--- 1. Loading and Preprocessing Data ---")
parquet_files = [f"{data_folder_path}/{i:012d}.parquet" for i in range(4)]
print(f"Attempting to load {len(parquet_files)} parquet files...")
try:
# 필요한 컬럼만 불러오기
columns_to_read = ['session_id', 'item_id', 'name', 'price', 'c1_name', 'c2_name', 'brand_name']
dataset = load_dataset('parquet', data_files={'train': parquet_files}, split='train', columns=columns_to_read)
df = dataset.to_pandas()
print(f"Successfully loaded {len(df)} rows from {len(parquet_files)} Parquet files.")
# 결측치 처리
for col in ['c1_name', 'c2_name', 'brand_name', 'item_condition_name']:
df[col] = df[col].fillna('Unknown')
# 사용자/아이템 인덱스 생성
df['user_idx'] = df['session_id'].astype('category').cat.codes
df['item_idx'] = df['item_id'].astype('category').cat.codes
num_users = len(df['user_idx'].unique())
num_items = len(df['item_idx'].unique())
print(f"Unique Users: {num_users}, Unique Items: {num_items}")
except Exception as e:
print(f"An error occurred during data loading: {e}")
exit()
총 10개의 parquet 데이터를 다운 받아서 학습을 시키려고 했지만, GPU 성능의 이슈로 인해 학습 시간이 너무 오래 걸렸다.
그래서 타협을 본 후 총 4개의 파일만 불러와서 학습을 시키게 되었다.
학습에 사용 될 column은 'session_id', 'item_id', 'name', 'price', 'c1_name', 'c2_name', 'brand_name' 총 7개 이다.
session_id는 각 사용자를 구분하는 user_idx로 사용될 것이고, item_id, name은 각 상품을 뜻하게 된다.
Feature Engineering
# 2-1. 가격(Price) 피처 처리
print("Processing price feature...")
# 가격이 0 이하인 경우 이상치로 간주하고 1로 처리 후 로그 변환
df['price'] = df['price'].apply(lambda x: max(x, 1.0))
df['log_price'] = np.log1p(df['price'])
price_scaler = StandardScaler()
df['scaled_price'] = price_scaler.fit_transform(df[['log_price']])
# 아이템별 가격 정보 저장
item_prices = df.drop_duplicates('item_idx').set_index('item_idx')['scaled_price'].to_dict()
# 2-2. 텍스트(Text) 임베딩 생성
print("Creating enhanced item embeddings...")
text_model = SentenceTransformer('all-MiniLM-L6-v2', device=device)
# 아이템별 메타데이터 딕셔너리 생성
item_meta = df.drop_duplicates('item_idx').set_index('item_idx')
item_titles = item_meta['name'].to_dict()
item_idx_to_c1 = item_meta['c1_name'].to_dict()
item_idx_to_c2 = item_meta['c2_name'].to_dict()
item_idx_to_brand = item_meta['brand_name'].to_dict()
item_idx_to_condition = item_meta['item_condition_name'].to_dict()
# 모든 피처를 결합한 상세 설명 생성
item_descriptions = []
for i in range(num_items):
name = item_titles.get(i, 'N/A')
c1 = item_idx_to_c1.get(i, 'N/A')
c2 = item_idx_to_c2.get(i, 'N/A')
brand = item_idx_to_brand.get(i, 'N/A')
condition = item_idx_to_condition.get(i, 'N/A')
description = (f"Name: {name}. Category: {c1}, {c2}. "
f"Brand: {brand}. Condition: {condition}.")
item_descriptions.append(description)
# 텍스트 임베딩 생성
item_text_embeddings = text_model.encode(item_descriptions, show_progress_bar=True, convert_to_tensor=True, device=device)
text_embedding_dim = item_text_embeddings.shape[1]
print("Item embeddings generated.")
상품 가격에 대해서는 우선 0 이하의 값들(이상치)을 1로 보정 해주었고, right-skewed 분포를 모델이 학습하기 수월한 정규 분포에 가깝게 처리해주기 위해 로그 변환을 적용하였다.
이 모든 Feature들을 결합하여, 텍스트 임베딩을 해주었는데, 텍스트 임베딩 생성 모델에는 'all-MiniLM-L6-v2' 라는 SentenceTransformer 모델을 사용하였다. 이 모델은 Sentence Embaddings를 생성하는데에 특화된 모델이다.
문장이나 단락의 의미를 숫자 벡터로 변환하고, 이를 통해 컴퓨터가 텍스트의 의미를 이해하고 다양한 자연어 처리(NLP) 작업에 활용할 수 있다.
모델 및 데이터셋 정의
# 3-1. 모델 아키텍처 (가격 피처 결합)
class TwoTowerModel(nn.Module):
def __init__(self, num_users, precomputed_item_embeddings, text_embedding_dim, final_embedding_dim=64):
super(TwoTowerModel, self).__init__()
self.final_embedding_dim = final_embedding_dim
# User Tower
self.user_embedding = nn.Embedding(num_users, final_embedding_dim)
# Item Tower
self.item_text_embedding = nn.Embedding.from_pretrained(precomputed_item_embeddings, freeze=True)
# 텍스트 임베딩과 가격 피처(1차원)를 합친 후 최종 임베딩 차원으로 매핑하는 MLP
self.item_mlp = nn.Sequential(
nn.Linear(text_embedding_dim + 1, (text_embedding_dim + 1) // 2),
nn.ReLU(),
nn.Linear((text_embedding_dim + 1) // 2, final_embedding_dim)
)
def get_user_vector(self, user_ids):
return self.user_embedding(user_ids)
def get_item_vector(self, item_ids, item_prices):
text_vecs = self.item_text_embedding(item_ids)
# 가격 피처를 [batch_size, 1] 형태로 만들어 텍스트 임베딩과 결합
combined_vec = torch.cat([text_vecs, item_prices.unsqueeze(1)], dim=1)
return self.item_mlp(combined_vec)
< User Tower >
User Tower에는 사용자 ID를 임베딩 벡터로 변환하는 nn.Embedding 레이어를 포함하고 있다.
num_users는 임베딩을 할 고유한 사용자 ID의 개수이다. final_embedding_dim은 각 사용자 ID에 해당하는 임베딩 벡터의 차원수이다. 예를 들어서, num_users = 1000, final_embedding_dim = 128 이라면, 이 레이어는 1000 x 128 크기의 임베딩 행렬을 가지게 된다.
< Item Tower >
nn.Embedding.from_pretrained() : 미리 계산된(Sentence Transformer로 생성한 아이템 텍스트 임베딩)을 사용하여 임베딩 레이어를 초기화 한다.
precomputed_item_embeddings : item_text_embeddings 텐서가 여기에 전달된다. 이 텐서의 각 행이 특정 아이템의 텍스트 임베딩 벡터가 된다.
그리고 freeze=True 를 써주어, 임베딩 레이어의 가중치를 학습 과정에서 업데이트 하지 않게 하였다.
다층 퍼셉트론(MLP)로 텍스트 임베딩과 가격 피처를 결합하여 최종 아이템 임베딩을 생성해주었다.
get_item_vector 메서드에서 기존 1차원 배열 형태의 가격 feature를 torch.cat을 위해 텍스트 임베딩과 차원을 맞춰주었고, 각 아이템 임베딩 벡터의 차원 방향으로 결합해주었다. 최종적으로 final_embedding_dim 차원의 아이템 임베딩을 반환한다.
네거티브 샘플링을 포함한 데이터셋 클래스
class TripletDataset(Dataset):
def __init__(self, interactions_df, item_prices_dict, all_item_indices):
self.users = torch.LongTensor(interactions_df['user_idx'].values)
self.pos_items = torch.LongTensor(interactions_df['item_idx'].values)
self.item_prices = item_prices_dict
self.all_item_indices = all_item_indices
self.num_items = len(all_item_indices)
def __len__(self):
return len(self.users)
def __getitem__(self, idx):
user_id = self.users[idx]
pos_item_id = self.pos_items[idx]
# 네거티브 샘플링
while True:
neg_item_id = np.random.randint(0, self.num_items)
# 여기서는 단순 랜덤 샘플링을 사용 (개선 가능)
if neg_item_id != pos_item_id.item():
break
pos_item_price = torch.tensor(self.item_prices.get(pos_item_id.item(), 0.0), dtype=torch.float)
neg_item_price = torch.tensor(self.item_prices.get(neg_item_id, 0.0), dtype=torch.float)
return user_id, pos_item_id, neg_item_id, pos_item_price, neg_item_price
여기서는 nn.TripletMarginLoss와 같은 손실 함수에 필요한 트리플릿(Anchor, Positive, Negative) 형태로 데이터를 제공한다.
- Anchor (앵커): 사용자의 임베딩 벡터
- Positive (긍정 샘플): 사용자가 실제로 상호작용(예: 구매, 클릭)한 아이템의 임베딩 벡터
- Negative (부정 샘플): 사용자가 상호작용하지 않은 아이템 중 임의로 선택된 아이템의 임베딩 벡터
앵커와 긍정 샘플은 가깝게, 부정 샘플과는 멀게 만드는 것이다.
학습 및 검증
# 데이터 분할
df_interactions = df[['user_idx', 'item_idx']].drop_duplicates().reset_index(drop=True)
train_val_df, test_df = train_test_split(df_interactions, test_size=0.1, random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.15, random_state=42)
print(f"Train size: {len(train_df)}, Validation size: {len(val_df)}, Test size: {len(test_df)}")
all_items = np.arange(num_items)
train_dataset = TripletDataset(train_df, item_prices, all_items)
val_dataset = TripletDataset(val_df, item_prices, all_items)
train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=512, shuffle=False, num_workers=4, pin_memory=True)
# 4-2. 모델 학습 루프 (검증 및 조기 종료 포함)
final_embedding_dim = 128
model = TwoTowerModel(num_users, item_text_embeddings, text_embedding_dim, final_embedding_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# TripletLoss: anchor와 positive는 가깝게, anchor와 negative는 멀게 만듦
criterion = nn.TripletMarginLoss(margin=1.0)
num_epochs = 20
best_val_loss = float('inf')
patience = 3
patience_counter = 0
print("Starting model training...")
for epoch in range(num_epochs):
model.train()
total_train_loss = 0
for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Training]"):
user_ids, pos_item_ids, neg_item_ids, pos_prices, neg_prices = [b.to(device) for b in batch]
optimizer.zero_grad()
user_vec = model.get_user_vector(user_ids)
pos_item_vec = model.get_item_vector(pos_item_ids, pos_prices)
neg_item_vec = model.get_item_vector(neg_item_ids, neg_prices)
loss = criterion(user_vec, pos_item_vec, neg_item_vec)
loss.backward()
optimizer.step()
total_train_loss += loss.item()
avg_train_loss = total_train_loss / len(train_loader)
# 검증
model.eval()
total_val_loss = 0
with torch.no_grad():
for batch in tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Validation]"):
user_ids, pos_item_ids, neg_item_ids, pos_prices, neg_prices = [b.to(device) for b in batch]
user_vec = model.get_user_vector(user_ids)
pos_item_vec = model.get_item_vector(pos_item_ids, pos_prices)
neg_item_vec = model.get_item_vector(neg_item_ids, neg_prices)
loss = criterion(user_vec, pos_item_vec, neg_item_vec)
total_val_loss += loss.item()
avg_val_loss = total_val_loss / len(val_loader)
print(f"Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
# 조기 종료 및 모델 저장
if avg_val_loss < best_val_loss:
best_val_loss = avg_val_loss
torch.save(model.state_dict(), os.path.join(artifacts_folder, "best_model.pth"))
print(f"Validation loss improved. Saved best model to '{artifacts_folder}/best_model.pth'")
patience_counter = 0
else:
patience_counter += 1
print(f"Validation loss did not improve. Patience: {patience_counter}/{patience}")
if patience_counter >= patience:
print("Early stopping triggered.")
break
1. 상호작용 데이터 준비
User - Item 쌍이 한 번만 학습 데이터에 포함되도록 하기 위해서, drop_duplicates()를 사용하여 중복을 제거하였다.
2. 데이터셋 분할
전체 상호작용 데이터(df_interactions) 를 Train, Validation, Test 세트로 각각 76.5%, 13.5%, 10% 비율로 나눴다.
3. TripletDataset 인스턴스 생성
- all_items = np.arange(num_items) : 전체 아이템의 인덱스 배열을 생성 -> 부정 샘플링을 위한 준비
- train_dataset = TripletDataset(train_df, item_prices, all_items) : 학습용 상호작용 데이터(train_df), 아이템 가격 정보(item_prices), 전체 아이템 인덱스(all_items)를 사용하여 학습용 TripletDataset을 만든다.
- 검증용 TripletDataset도 위와 동일한 방법으로 만들어주었다.
4. DataLoader 생성
학습 데이터는 Epoch마다 섞어서 모델이 데이터의 순서에 의존하지 않고 일반화 능력을 향상시키도록 해주었다.
num_workers는 데이터를 로드하고 전처리하는 데 사용할 서브프로세스의 수를 지정한다.
모델이 한 batch에 대한 학습을 진행하는 동안, num_workers는 다음 배치를 미리 준비해 둔다.
pin_memory=True 옵션은 DataLoader가 데이터를 CPU 메모리에 미리 고정된 상태로 로드하여, GPU 데이터를 복사할 때 더 효율적으로 전송 되도록 돕는다고 한다.
5. 모델 학습 루프
학습 단계에서는 train_loader 에서 배치 데이터를 가져와 forward pass, 손실 계산, backward pass, 옵티마이저 스텝을 수행한다.
검증 단계에서는 각 Epoch이 끝날 때마다 model.eval() 로 설정하고, val_loader 를 사용하여 모델 성능을 검증한다.
검증 손실이 더 이상 개선되지 않으면 학습을 조기 중단하여 과적합을 방지하고, 가장 좋은 성능을 보였던 모델의 가중치를 저장하도록 했다.
Artifacts 저장
print("\n--- 5. Saving Artifacts ---")
# 1. 가장 성능이 좋았던 모델 가중치 이름 변경
if os.path.exists(os.path.join(artifacts_folder, "best_model.pth")):
os.rename(os.path.join(artifacts_folder, "best_model.pth"), os.path.join(artifacts_folder, "two_tower_model.pth"))
print("Saved best model as 'two_tower_model.pth'")
# 2. 아이템 텍스트 임베딩 저장
torch.save(item_text_embeddings, os.path.join(artifacts_folder, "item_text_embeddings.pt"))
# 3. 가격 스케일러 저장
with open(os.path.join(artifacts_folder, "price_scaler.pkl"), 'wb') as f:
pickle.dump(price_scaler, f)
# 4. 매핑 및 데이터 저장
mappings = {
'user_categories': df['session_id'].astype('category').cat.categories,
'item_categories': df['item_id'].astype('category').cat.categories,
'idx_to_item_original': {idx: original_id for idx, original_id in enumerate(df['item_id'].astype('category').cat.categories)},
'item_titles': item_titles,
'item_idx_to_c1': item_idx_to_c1,
'item_idx_to_c2': item_idx_to_c2,
'item_idx_to_brand': item_idx_to_brand,
'item_idx_to_condition': item_idx_to_condition,
'item_prices': item_prices,
'final_embedding_dim': final_embedding_dim,
'text_embedding_dim': text_embedding_dim
}
with open(os.path.join(artifacts_folder, "mappings.pkl"), 'wb') as f:
pickle.dump(mappings, f)
# 5. 테스트 데이터 저장
test_df.to_pickle(os.path.join(artifacts_folder, "test_df.pkl"))
print(f"Artifacts saved to '{artifacts_folder}' folder.")
print("\n--- Training Script Finished ---")
학습이 완료된 후, 추후 백엔드와 연동하여 활용할 수 있게 Artifacts를 저장해주었다.
- two_tower_model.pth : 최종 학습된 모델의 가중치
- item_text_embeddings.pt : 모든 아이템의 사전 계산된 텍스트 임베딩
- price_scaler.pkl : 가격 스케일링에 사용된 StandardScaler 객체 (새로운 아이템 가격을 처리할 때 사용)
- mappings.pkl : 사용자 및 아이템의 원본 ID와 인덱스 간의 매핑 정보, 아이템의 메타데이터(이름, 카테고리, 브랜드, 상태), 그리고 최종 임베딩 차원 및 텍스트 임베딩 차원 등의 필요한 정보들을 딕셔너리 형태로 저장
이렇게 까지 하고 "사용자의 실시간으로 업데이트 되는 행동 데이터들은 어떡하지?"라는 생각이 들었다.
그래서 처음엔 막연히 모델에게 그때 그때 재학습을 시켜서 구현을 해야하나 싶었지만, 유저 한 명, 한 명의 실시간으로 쌓여가는 수많은 행동데이터를 그때마다 모델에게 학습을 시킨다는 것은 거의 불가능한 일이었다.
그렇게 방법을 찾고 찾은 결과, 이와 관련된 코드는 백엔드로 구현을 하게 되었는데...
이후로 작업했던 백엔드와 간단한 테스트용 프론트 작업에 대해서는 다음 포스팅에서 다룰 예정이다.
'MS AI School' 카테고리의 다른 글
[MS AI School] 1차 프로젝트 Record - 최종 (2) | 2025.06.17 |
---|---|
[MS AI School] 1차 프로젝트 Record - 3 (0) | 2025.06.16 |
[MS AI School] 1차 프로젝트 Recorde - 1 (4) | 2025.06.05 |
MS AI School 7기 - 우수생 선정 되다 ! (1) | 2025.06.04 |
MS AI School 7기 한 달차 후기 (0) | 2025.05.13 |