[MS AI School] 1차 프로젝트 Record - 3

2025. 6. 16. 21:07MS AI School

저번 two-tower 모델 구현에 이어, 백엔드 구현을 어떤 식으로 하게 되었는지 작성해보려고 한다.

현재의 가장 큰 문제는 저번 포스트에서도 언급 했다시피, 모델은 데이터셋에 존재하는 데이터만을

기반으로 하여 추천을 하게 되어있다.

하지만 실제로 사용자는 새로운 행동 데이터가 추가가 될 것이고, 그에 따른 모델의 추천 결과도 달라져야 한다.

이를 도대체 어떻게 구현을 해야할까?

 

먼저 백엔드 구현은 빠른 API 구축이 가능한 FastAPI를 채택하여 사용하였다.

FastAPI는 한 번도 사용해본 적이 없었지만, 요즘 코딩 성능이 가장 좋다는 평까지 나오고 있는,

Gemini의 도움을 받아 조금씩 배워가며 코드를 작성할 수 있었다. 


 

처음에 나는 사용자의 새로운 행동 데이터가 생길때 마다 실시간으로 모델 재학습을 진행하는 방법을 생각했다.

 

당연히 말도 안되는 소리이다.

지금 현재 모델을 학습하는 시간도 거의 1시간 가까이 걸렸는데, 이 과정을 수많은 사용자들의

데이터가 생성될때마다 진행을 한다? 이건 사실상 불가능한 일이다.

 

여러가지 방법을 시도해본 후, 하나의 방법을 고안해내게 되었다.

사용자의 데이터를 단기 데이터 / 장기 데이터로 나눈 후, 사용자의 기존 데이터는 장기 데이터, 새롭게 추가된 데이터는 단기 데이터로 저장하여 두 개의 데이터를 모두 고려한 상품을 추천하게 구현해보았다.

@app.post("/events", status_code=202)
def track_event(event: Event):
    # 사용자의 행동 (클릭, 검색) 이벤트를 기록합니다.
    if not SERVER_IS_READY:
        raise HTTPException(status_code=503, detail="Server is not ready.")
    
    # 1. 모든 이벤트를 전체 히스토리에 기록 (이력 조회를 위함)
    USER_INTERACTION_HISTORY[event.session_id].append(
        {"event_type": event.event_type, "value": event.value}
    )

    # 2. 클릭/검색 이벤트를 단기 행동 기록에 추가
    if event.event_type == 'click' or event.event_type == 'search':
        # 벡터를 바로 업데이트하는 대신, 행동 자체를 기록
        USER_RECENT_ACTIONS[event.session_id].append(event.value) # <--- 여기에 최근 행동이 저장됨.
        print(f"Action '{event.value}' added to recent history for session: {event.session_id}")
        
    return {"message": "Event tracked"}

 

이 함수는 웹 서비스에서 사용자의 클릭, 검색과 같은 이벤트가 발생할 때 호출되어, 해당 이벤트를 USER_RECENT_ACTIONS라는 메모리 상의 큐에 저장한다. 이 큐에 저장된 정보가 나중에 단기 추천에 활용된다.

# --- 유틸리티 함수 ---
def find_internal_item_vector_by_title(title: str) -> Optional[torch.Tensor]:
    """외부 상품 제목과 가장 유사한 내부 아이템을 찾아 그 벡터를 반환합니다."""
    if not SERVER_IS_READY: return None
    try:
        title_tfidf = tfidf_vectorizer.transform([title])
        similarities = cosine_similarity(title_tfidf, tfidf_matrix).flatten()
        most_similar_item_idx = similarities.argmax()
        
        # 유사도가 매우 낮으면 (e.g., 0.1 미만) 관련 없는 아이템으로 간주하고 무시
        if similarities[most_similar_item_idx] < 0.1:
            print(f"No relevant internal item found for title: '{title}'")
            return None
            
        print(f"Found internal item '{all_titles[most_similar_item_idx]}' for external title '{title}'")
        return ALL_ITEM_VECTORS[most_similar_item_idx]
    except Exception as e:
        print(f"Error finding internal item by title: {e}")
        return None

def get_base_user_vector(session_id: str) -> Optional[torch.Tensor]:
    """모델에서 사용자의 기본(장기) 벡터를 가져옵니다."""
    user_idx = SESSION_TO_IDX.get(session_id)
    if user_idx is None: return None
    with torch.no_grad():
        return model.get_user_vector(torch.LongTensor([user_idx]).to(device))

def get_user_vector(session_id: str) -> Optional[torch.Tensor]:
    """
    세션의 단기 사용자 벡터를 계산합니다.
    최근 행동(최대 10개)의 평균 벡터를 반환합니다.
    최근 행동이 없으면 None을 반환합니다.
    """
    recent_actions = USER_RECENT_ACTIONS.get(session_id)
    if not recent_actions:
        return None # 최근 행동이 없음

    # 최근 행동들의 아이템 벡터를 수집
    recent_item_vectors = []
    print(f"Calculating short-term vector from actions: {list(recent_actions)}")
    for action_title in recent_actions:
        item_vector = find_internal_item_vector_by_title(action_title)
        if item_vector is not None:
            recent_item_vectors.append(item_vector)

    if not recent_item_vectors:
        return None # 유효한 아이템 벡터가 없음

    # 아이템 벡터들의 평균을 계산하여 순수 단기 벡터 생성
    short_term_vector = torch.mean(torch.stack(recent_item_vectors), dim=0)
    return short_term_vector

 

 

find_internal_item_vector_by_title(title: str)

사용자가 클릭하거나 검색한 상품 제목이 주어졌을 때, 시스템이 알고 있는 수많은 내부 상품들 중에서 가장 비슷한 상품을 찾아 그 상품의 벡터(임베딩)를 가져오는 함수이다.

 

*TF-IDF 와 코사인 유사도를 사용해서 제목 간의 유사도를 측정하고, 가장 유사한 내부 아이템의 미리 계산된 임베딩 벡터를 반환한다.

 

*TF-IDF  : "Term Frequency-Inverse Document Frequency"의 약자로, 특정 단어가 문서 집합(코퍼스) 내에서 얼마나 중요한지를 측정하는 통계적 가중치 방법

쉽게 말해, 어떤 단어가 특정 문서에서 자주 나오면서도 다른 문서에서는 잘 나오지 않을수록 그 단어가 해당 문서를 대표하는 중요한 단어라고 판단하는 방식이다.

 

get_base_user_vector(session_id: str)

이 함수는 특정 사용자의 고유한 취향(장기 선호도)을 나타내는 벡터를 가져온다.

이 벡터는 모델이 처음 학습될 때 사용자별로 고정적으로 생성된 값이며, 사용자의 전반적인 관심사를 대변한다고 볼 수 있다.

 

사용자 ID를 내부 인덱스로 변환한 다음, 사전 학습된 모델에서 해당 인덱스에 매핑된 사용자 임베딩 벡터를 직접 조회한다.

 

get_user_vector(session_id : str)

track_event 함수를 통해 기록된 사용자의 최근 10개 행동 (상품 제목)을 가져온다.

이 제목들 각각에 대해 위에서 설명한 find_internal_item_vector_by_title 함수를 써서 해당 상품의 임베딩 벡터를 찾는다.

이렇게 찾은 모든 벡터들의 평균을 내서 단기 사용자 벡터가 완성된다.

@app.get("/recommend/{session_id}", response_model=RecommendationResponse)
def get_recommendations(session_id: str, long_term_ratio: float = 0.7, num_recs: int = 30):
    if not SERVER_IS_READY: raise HTTPException(status_code=503, detail="Server not ready")

    long_term_vector = get_base_user_vector(session_id)
    if long_term_vector is None:
        raise HTTPException(status_code=404, detail=f"Session ID '{session_id}' not found.")

    short_term_vector = get_user_vector(session_id) # 최신 단기 벡터

    with torch.no_grad():
        # 장기 벡터는 항상 존재
        long_term_scores = torch.matmul(ALL_ITEM_VECTORS, long_term_vector.T).squeeze()

        if short_term_vector is not None:
            # 단기 벡터가 있으면 단기 점수도 계산
            short_term_scores = torch.matmul(ALL_ITEM_VECTORS, short_term_vector.T).squeeze()
            num_long_term_recs = int(num_recs * long_term_ratio)
            num_short_term_recs = num_recs - num_long_term_recs
        else:
            # 단기 벡터가 없으면 장기 추천만 제공
            print(f"No short-term vector for {session_id}, returning long-term recs only.")
            short_term_scores = None
            num_long_term_recs = num_recs
            num_short_term_recs = 0

        long_term_indices = torch.topk(long_term_scores, k=num_long_term_recs * 2).indices.tolist()
        
        if short_term_scores is not None:
            short_term_indices = torch.topk(short_term_scores, k=num_short_term_recs * 2).indices.tolist()
        else:
            short_term_indices = []

    # 2. 두 추천 목록을 조합
    combined_indices = []
    seen_indices = set()

    # 단기 추천(최신 관심사)을 먼저 일부 추가
    for idx in short_term_indices:
        if idx not in seen_indices:
            combined_indices.append(idx)
            seen_indices.add(idx)
        if len(seen_indices) >= num_short_term_recs:
            break
            
    # 장기 추천으로 나머지 채우기
    for idx in long_term_indices:
        if idx not in seen_indices:
            combined_indices.append(idx)
            seen_indices.add(idx)
        if len(combined_indices) >= num_recs:
            break

    # 3. 조합된 인덱스로부터 최종 추천 목록 생성 (내부 데이터 사용)
    final_recommendations = []
    for idx in combined_indices:
        title = mappings['item_titles'].get(idx)
        if title:
            final_recommendations.append(Product(
                title=title,
                link="#", # 내부 아이템은 링크가 없음
                price=mappings['item_prices'].get(idx),
                thumbnail=None,
                source="Internal DB"
            ))

    internal_titles = [rec.title for rec in final_recommendations]
    print(f"Blended internal recommendations for {session_id}: {internal_titles}")

    return RecommendationResponse(session_id=session_id, recommendations=final_recommendations)

 

  1. 사용자 취향 벡터 준비:
    •  long_term_vector를 통해 사용자의 장기기억 데이터를 가져오고, short_term_vector를 통해 사용자의 최근 행동(클릭, 검색)으로 파악된 단기적인 관심사를 동적으로 생성한다.
  2. 아이템 추천 점수 계산:
    • ALL_ITEM_VECTORS (모든 상품의 임베딩 벡터)와 장기 데이터 벡터의 유사도를 계산하여 장기 추천 점수를 매김.
    • 단기 데이터 벡터가 있다면, 이 벡터와 ALL_ITEM_VECTORS의 유사도를 계산하여 단기 추천 점수를 매김.
  3. 장기/단기 추천 조합 (블렌딩):
    • long_term_ratio 에 따라 최종 추천 목록에서 장기/단기 추천이 차지할 비율을 결정.
    • 단기 추천 점수가 높은 상품들을 우선적으로 목록에 채우고, 나머지 자리를 장기 추천 점수가 높은 상품들로 채운다.
    • 이때 중복된 상품은 제거하여 최종 추천 목록을 만든다.
    • 이 블렌딩 과정을 통해 사용자의 최신 관심사를 빠르게 반영하면서도, 기존의 선호도도 잃지 않도록 균형을 맞춘다.
  4. 최종 추천 목록 반환:
    • 조합된 상품 인덱스를 바탕으로 실제 상품 정보(제목, 가격 등)를 가져와 Product 객체 리스트 형태로 사용자에게 반환.