import os from threading import Thread import torch from sentence_transformers import SentenceTransformer import chromadb from chromadb.utils import embedding_functions from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer import json from fastapi import FastAPI from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware # # === 경로 설정 (모두 로컬) === QWEN_MODEL_PATH = "./models/Qwen3-0.6B" EMBEDDING_MODEL_PATH = "./models/all-MiniLM-L6-v2" # 2. 벡터 DB 설정 persist_directory = "./chroma_db" chroma_client = chromadb.PersistentClient(path=persist_directory) collection = chroma_client.get_or_create_collection( name="manuals", ) _model = None _tokenizer = None def get_qwen_model(): global _model, _tokenizer if _model is not None: return _model, _tokenizer _tokenizer = AutoTokenizer.from_pretrained( QWEN_MODEL_PATH, trust_remote_code=True, local_files_only=True ) _model = AutoModelForCausalLM.from_pretrained( QWEN_MODEL_PATH, dtype=torch.bfloat16, device_map="auto", trust_remote_code=True, local_files_only=True ) # ✅ torch.compile() 적용 (PyTorch 2.0+) if hasattr(torch, 'compile'): try: print("🚀 torch.compile() 적용 중...") # mode="reduce-overhead": 추론 시 추천 # dynamic=False: 고정 입력 크기 시 더 빠름 (보통 RAG는 입력 길이가 유동적이므로 dynamic=True 권장) _model = torch.compile( _model, mode="reduce-overhead", # 또는 "max-autotune" (초기 컴파일 시간 ↑, 이후 속도 ↑↑) dynamic=True # 입력 길이가 매번 다르므로 동적 크기 허용 ) print("✅ torch.compile() 성공!") except Exception as e: print(f"⚠️ torch.compile() 실패, 원본 모델 사용: {e}") pass # 실패하면 그냥 원본 사용 return _model, _tokenizer def query_and_summarize(job: str, query: str, top_k: int = 3, min_similarity: float = 0.2): from datetime import datetime print(f'1 {datetime.now()}') # 관련 문서 검색 (top_k보다 여유 있게 가져옴) results = collection.query( query_texts=[query], n_results=top_k + 2, # 여유분 확보 where={"dept": job} ) print(f'{datetime.now()}') if not results['documents'][0]: return "관련 문서를 찾을 수 없습니다." print(f'2 {datetime.now()}') # 유사도 계산 및 필터링 filtered_docs = [] for doc, dist in zip(results['documents'][0], results['distances'][0]): similarity = 1 - dist if similarity >= min_similarity: filtered_docs.append((doc, similarity)) if len(filtered_docs) >= top_k: break print(f'3 {datetime.now()}') if not filtered_docs: return "유사도 기준을 만족하는 문서가 없습니다." # 컨텍스트 생성 (유사도 내림차순 정렬은 이미 Chroma가 보장) context_parts = [] for i, (doc, sim) in enumerate(filtered_docs): context_parts.append(f"[청크 {i+1} | 유사도: {sim:.3f}]\n{doc}") context = "\n\n".join(context_parts) print(f'4 {datetime.now()}') # 모델 로드 model, tokenizer = get_qwen_model() print(f'5 {datetime.now()}') # 프롬프트 구성 messages = [ { "role": "system", "content": ( "당신은 회사 재무/회계 업무 전문 어시스턴트입니다. " "사용자에게 제공된 여러 청크를 종합하여, 정확하고 상세하게 답변하세요. " "필요시 문서 내용을 직접 인용하거나 요약해도 됩니다. " "추측하지 말고, 문서에 근거한 정보만 사용하세요." ) }, { "role": "user", "content": f"다음 문서들을 참고하세요:\n\n{context}\n\n질문: {query}" } ] print(f'6 {datetime.now()}') # 토큰화 및 생성 text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False # 작은 모델에선 비활성화 권장 ) model_inputs = tokenizer([text], return_tensors="pt").to(model.device) print(f'7 {datetime.now()}') generated_ids = model.generate( **model_inputs, max_new_tokens=150, do_sample=True, # ✅ 샘플링 활성화 temperature=0.3, top_p=0.9, pad_token_id=tokenizer.eos_token_id ) print(f'8 {datetime.now()}') input_len = model_inputs.input_ids.shape[1] output_ids = generated_ids[0][input_len:] response = tokenizer.decode(output_ids, skip_special_tokens=True).strip() print(f'9 {datetime.now()}') return response def query_and_summarize_stream(job: str, query: str, top_k: int = 3, min_similarity: float = 0.2): from datetime import datetime print(f'1 {datetime.now()}') # 관련 문서 검색 (top_k보다 여유 있게 가져옴) results = collection.query( query_texts=[query], n_results=top_k + 2, # 여유분 확보 where={"dept": job} ) print(f'{datetime.now()}') if not results['documents'][0]: def generate_empty(): yield json.dumps({"kind": "text", "text": "관련 문서를 찾을 수 없습니다."}) + "\n" return generate_empty print(f'2 {datetime.now()}') # 유사도 계산 및 필터링 filtered_docs = [] for doc, dist in zip(results['documents'][0], results['distances'][0]): similarity = 1 - dist if similarity >= min_similarity: filtered_docs.append((doc, similarity)) if len(filtered_docs) >= top_k: break print(f'3 {datetime.now()}') if not filtered_docs: def generate_low_sim(): yield json.dumps({"kind": "text", "text": "유사도 기준을 만족하는 문서가 없습니다."}) + "\n" return generate_low_sim # 컨텍스트 생성 context_parts = [] for i, (doc, sim) in enumerate(filtered_docs): context_parts.append(f"[청크 {i+1} | 유사도: {sim:.3f}]\n{doc}") context = "\n\n".join(context_parts) print(f'4 {datetime.now()}') # 모델 로드 model, tokenizer = get_qwen_model() print(f'5 {datetime.now()}') # 프롬프트 구성 messages = [ { "role": "system", "content": ( "당신은 회사 재무/회계 업무 전문 어시스턴트입니다. " "사용자에게 제공된 여러 청크를 종합하여, 정확하고 상세하게 답변하세요. " "필요시 문서 내용을 직접 인용하거나 요약해도 됩니다. " "추측하지 말고, 문서에 근거한 정보만 사용하세요." ) }, { "role": "user", "content": f"다음 문서들을 참고하세요:\n\n{context}\n\n질문: {query}" } ] print(f'6 {datetime.now()}') # 토큰화 text = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False ) model_inputs = tokenizer([text], return_tensors="pt").to(model.device) print(f'7 {datetime.now()}') # 스트리머 설정 streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True) # 생성 인자 설정 generation_kwargs = dict( **model_inputs, streamer=streamer, max_new_tokens=150, do_sample=True, temperature=0.3, top_p=0.9, pad_token_id=tokenizer.eos_token_id ) # 별도 스레드에서 생성 실행 thread = Thread(target=model.generate, kwargs=generation_kwargs) thread.start() print(f'8 {datetime.now()}') # 제너레이터 함수 정의 def generate(): for new_text in streamer: if new_text: yield json.dumps({"kind": "text", "text": new_text}) + "\n" print(f'9 {datetime.now()}') return generate # FastAPI 앱 app = FastAPI() # CORS 설정 추가 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 모든 출처 허용 allow_credentials=True, allow_methods=["*"], # 모든 HTTP 메서드 허용 allow_headers=["*"], # 모든 헤더 허용 ) @app.get("/") def question(query: str): # answer = query_and_summarize(job="FI", query=query) # return {"answer": answer} generate = query_and_summarize_stream(job="FI", query=query) return StreamingResponse(generate(), media_type="application/x-ndjson") # 개발용 실행 (직접 실행 시) if __name__ == "__main__": query_and_summarize_stream(job="FI", query='외화 송금 방법?') import uvicorn # print("서버 시작: uvicorn manual_offline:app --reload") # 예시 질의 (주석 해제 시 직접 테스트 가능) # print(query_and_summarize("FI", "외화 송금 절차는 어떻게 되나요?"))