151 lines
5.1 KiB
Python
151 lines
5.1 KiB
Python
import os
|
|
from threading import Thread
|
|
|
|
import torch
|
|
from sentence_transformers import SentenceTransformer
|
|
import chromadb
|
|
import json
|
|
from chromadb.utils import embedding_functions
|
|
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
|
|
from fastapi import FastAPI
|
|
|
|
# 2. 벡터 DB 설정
|
|
persist_directory = "./chroma_db"
|
|
chroma_client = chromadb.PersistentClient(path=persist_directory)
|
|
|
|
# 컬렉션 생성
|
|
collection = chroma_client.get_or_create_collection(
|
|
name="manuals",
|
|
)
|
|
|
|
_model = None
|
|
_tokenizer = None
|
|
|
|
def get_qwen_model() :
|
|
global _model, _tokenizer
|
|
if _model is None:
|
|
model_name = "Qwen/Qwen3-0.6B"
|
|
_tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
|
_model = AutoModelForCausalLM.from_pretrained(
|
|
model_name,
|
|
torch_dtype=torch.float32, # CPU 안정성
|
|
device_map="auto",
|
|
trust_remote_code=True
|
|
)
|
|
return _model, _tokenizer
|
|
|
|
# 3. 질의 처리
|
|
def query_and_summarize(job: str, query: str, top_k: int = 3):
|
|
|
|
# 관련 문서 검색
|
|
results = collection.query(
|
|
query_texts=[query],
|
|
n_results=5,
|
|
where={"dept": job}
|
|
)
|
|
# results = collection.query(query_texts=[query], n_results=top_k)
|
|
cosine_similarities = [1 - d for d in results['distances'][0]]
|
|
print("유사도:", cosine_similarities)
|
|
# 출력 예: [0.610, 0.473, 0.154, 0.142]
|
|
|
|
context_with_score = ""
|
|
for i, (doc, dist) in enumerate(zip(results['documents'][0], results['distances'][0])):
|
|
sim = 1 - dist
|
|
context_with_score += f"[문서 {i+1} | 유사도: {sim:.3f}]\n{doc}\n\n"
|
|
|
|
print(context_with_score)
|
|
print("\n\n\n\n\n")
|
|
top_doc = results['documents'][0][0]
|
|
|
|
# ✅ 명시적으로 모델과 토크나이저 로드
|
|
model, tokenizer = get_qwen_model()
|
|
|
|
messages = [
|
|
{"role": "system", "content": "당신은 회사 재무/회계 업무 전문 어시스턴트입니다. 문서 내용은 그대로 사용자에게 보여 줘야 하며 이를 기반으로 부가설명을 정확하고 상세하게 답변하세요."},
|
|
{"role": "user", "content": f"다음 문서를 참고하세요:\n{top_doc}\n\n질문: {query}"}
|
|
]
|
|
|
|
text = tokenizer.apply_chat_template(
|
|
messages,
|
|
tokenize=False,
|
|
add_generation_prompt=True,
|
|
enable_thinking=False # Switches between thinking and non-thinking modes. Default is True.
|
|
)
|
|
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
|
|
|
|
# conduct text completion
|
|
generated_ids = model.generate(
|
|
**model_inputs,
|
|
max_new_tokens=500
|
|
)
|
|
output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
|
|
|
|
# parsing thinking content
|
|
try:
|
|
# rindex finding 151668 (</think>)
|
|
index = len(output_ids) - output_ids[::-1].index(151668)
|
|
except ValueError:
|
|
index = 0
|
|
|
|
thinking_content = tokenizer.decode(output_ids[:index], skip_special_tokens=True).strip("\n")
|
|
end_think_id = tokenizer.convert_tokens_to_ids("</think>")
|
|
if end_think_id in output_ids:
|
|
idx = len(output_ids) - output_ids[::-1].index(end_think_id)
|
|
else:
|
|
idx = 0
|
|
content = tokenizer.decode(output_ids[idx:], skip_special_tokens=True).strip()
|
|
|
|
print(top_doc)
|
|
print("\n\n\n\n\n")
|
|
print("thinking content:", thinking_content)
|
|
print("\n\n\n\n\n")
|
|
return content
|
|
|
|
def query_and_summarize_stream(job: str, query: str):
|
|
# 1. 문서 검색 (기존과 동일)
|
|
results = collection.query(query_texts=[query], n_results=1, where={"dept": job})
|
|
top_doc = results['documents'][0][0]
|
|
|
|
model, tokenizer = get_qwen_model()
|
|
|
|
# 2. 메시지 구성
|
|
messages = [
|
|
{"role": "system", "content": "당신은 회사 재무/회계 업무 전문 어시스턴트입니다."},
|
|
{"role": "user", "content": f"다음 문서를 참고하세요:\n{top_doc}\n\n질문: {query}"}
|
|
]
|
|
|
|
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
|
|
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
|
|
|
|
# 3. 스트리머 설정
|
|
# skip_prompt=True를 해야 입력한 질문이 다시 나오지 않습니다.
|
|
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
|
|
|
|
# 4. 별도 스레드에서 생성 실행 (비동기 처리를 위함)
|
|
generation_kwargs = dict(model_inputs, streamer=streamer, max_new_tokens=500)
|
|
thread = Thread(target=model.generate, kwargs=generation_kwargs)
|
|
thread.start()
|
|
|
|
# 5. 제너레이터 함수 정의
|
|
def generate():
|
|
for new_text in streamer:
|
|
if new_text:
|
|
# 클라이언트가 JSON으로 받길 원한다면 형식을 맞춰줍니다.
|
|
yield json.dumps({"kind": "text", "text": new_text}) + "\n"
|
|
|
|
return generate
|
|
|
|
app = FastAPI()
|
|
|
|
@app.get("/")
|
|
def question(query: str) :
|
|
user_query = query
|
|
answer = query_and_summarize_stream(job="FI", query=user_query)
|
|
return {"answer": answer}
|
|
|
|
# 예시 사용
|
|
if __name__ == "__main__":
|
|
# init(job="FI")
|
|
# FI : 재무 HR : 인사
|
|
print('1')
|
|
# 실행방법 uvicorn manual:app --reload |