147 lines
5.7 KiB
Python
147 lines
5.7 KiB
Python
import os
|
|
|
|
import torch
|
|
from sentence_transformers import SentenceTransformer
|
|
import chromadb
|
|
from chromadb.utils import embedding_functions
|
|
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
|
|
from fastapi import FastAPI
|
|
|
|
# 2. 벡터 DB 설정
|
|
persist_directory = "./chroma_db"
|
|
chroma_client = chromadb.PersistentClient(path=persist_directory)
|
|
|
|
# ✅ Chroma 전용 임베딩 함수 사용 (오류 방지)
|
|
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
|
|
model_name="all-MiniLM-L6-v2"
|
|
)
|
|
|
|
# 컬렉션 생성
|
|
collection = chroma_client.get_or_create_collection(
|
|
name="manuals",
|
|
embedding_function=sentence_transformer_ef # ← 여기가 핵심!
|
|
)
|
|
|
|
_model = None
|
|
_tokenizer = None
|
|
|
|
def get_qwen_model() :
|
|
global _model, _tokenizer
|
|
if _model is None:
|
|
model_name = "Qwen/Qwen3-0.6B"
|
|
_tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
|
|
_model = AutoModelForCausalLM.from_pretrained(
|
|
model_name,
|
|
torch_dtype=torch.float32, # CPU 안정성
|
|
device_map="auto",
|
|
trust_remote_code=True
|
|
)
|
|
return _model, _tokenizer
|
|
|
|
def init(job: str) :
|
|
print(f'{job} init start')
|
|
# 1. 문서 준비 (실제로는 PDF/Word 등에서 추출)
|
|
manuals = [
|
|
"지출 결의서는 사용 목적, 금액, 일자, 증빙 서류를 반드시 첨부하여 전자 결재 시스템에 등록해야 합니다.",
|
|
"월말 마감은 매월 25일부터 시작되며, 모든 부서는 28일까지 비용 집행 내역을 최종 확정해야 합니다.",
|
|
"외화 송금은 반드시 외환관리부의 사전 승인을 받은 후 금융팀을 통해 진행되어야 하며, 계약서 사본을 첨부해야 합니다.",
|
|
"세금계산서는 발행일로부터 10일 이내에 ERP에 등록되지 않으면 비용 처리가 불가합니다.",
|
|
"장기자산(차량, 사무기기 등)은 매년 1월에 정기 감가상각 점검을 받아야 하며, 자산관리부서가 이를 주관합니다.",
|
|
"현금 보관은 원칙적으로 금지되며, 불가피한 경우는 금고 보관 후 당일 중 재무팀에 입금 처리해야 합니다.",
|
|
"연말 정산 대상 직원은 매년 12월 10일까지 개인 소득공제 자료를 인사 시스템에 제출해야 합니다.",
|
|
"예산 초과 지출은 사전에 재무부와 협의 후 예산 조정 승인을 받아야 하며, 미승인 시 결재가 거부됩니다.",
|
|
"재무 제표 초안은 분기 마감 후 5영업일 이내에 감사법인에 제출되어야 하며, 최종 승인은 CFO가 담당합니다.",
|
|
]
|
|
|
|
# 문서 ID 생성 및 추가
|
|
doc_ids = [f"DOC_{job}_{i}" for i in range(len(manuals))]
|
|
# collection.add(documents=manuals, ids=doc_ids)
|
|
collection.add(
|
|
documents=manuals,
|
|
ids=doc_ids,
|
|
metadatas=[{"source": "fi_manual_v1.pdf", "version":1.0, "dept": job} for _ in doc_ids]
|
|
)
|
|
|
|
print(f'{job} init end')
|
|
|
|
# 3. 질의 처리
|
|
def query_and_summarize(job: str, query: str, top_k: int = 3):
|
|
|
|
# 관련 문서 검색
|
|
results = collection.query(
|
|
query_texts=[query],
|
|
n_results=5,
|
|
where={"dept": job}
|
|
)
|
|
# results = collection.query(query_texts=[query], n_results=top_k)
|
|
cosine_similarities = [1 - d for d in results['distances'][0]]
|
|
print("유사도:", cosine_similarities)
|
|
# 출력 예: [0.610, 0.473, 0.154, 0.142]
|
|
|
|
context_with_score = ""
|
|
for i, (doc, dist) in enumerate(zip(results['documents'][0], results['distances'][0])):
|
|
sim = 1 - dist
|
|
context_with_score += f"[문서 {i+1} | 유사도: {sim:.3f}]\n{doc}\n\n"
|
|
|
|
print(context_with_score)
|
|
print("\n\n\n\n\n")
|
|
top_doc = results['documents'][0][0]
|
|
|
|
# ✅ 명시적으로 모델과 토크나이저 로드
|
|
model, tokenizer = get_qwen_model()
|
|
|
|
messages = [
|
|
{"role": "system", "content": "당신은 회사 재무/회계 업무 전문 어시스턴트입니다. 문서 내용은 그대로 사용자에게 보여 줘야 하며 이를 기반으로 부가설명을 정확하고 상세하게 답변하세요."},
|
|
{"role": "user", "content": f"다음 문서를 참고하세요:\n{top_doc}\n\n질문: {query}"}
|
|
]
|
|
|
|
text = tokenizer.apply_chat_template(
|
|
messages,
|
|
tokenize=False,
|
|
add_generation_prompt=True,
|
|
enable_thinking=False # Switches between thinking and non-thinking modes. Default is True.
|
|
)
|
|
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
|
|
|
|
# conduct text completion
|
|
generated_ids = model.generate(
|
|
**model_inputs,
|
|
max_new_tokens=500
|
|
)
|
|
output_ids = generated_ids[0][len(model_inputs.input_ids[0]):].tolist()
|
|
|
|
# parsing thinking content
|
|
try:
|
|
# rindex finding 151668 (</think>)
|
|
index = len(output_ids) - output_ids[::-1].index(151668)
|
|
except ValueError:
|
|
index = 0
|
|
|
|
thinking_content = tokenizer.decode(output_ids[:index], skip_special_tokens=True).strip("\n")
|
|
end_think_id = tokenizer.convert_tokens_to_ids("</think>")
|
|
if end_think_id in output_ids:
|
|
idx = len(output_ids) - output_ids[::-1].index(end_think_id)
|
|
else:
|
|
idx = 0
|
|
content = tokenizer.decode(output_ids[idx:], skip_special_tokens=True).strip()
|
|
|
|
print(top_doc)
|
|
print("\n\n\n\n\n")
|
|
print("thinking content:", thinking_content)
|
|
print("\n\n\n\n\n")
|
|
return content
|
|
|
|
app = FastAPI()
|
|
|
|
@app.get("/")
|
|
def question(query: str) :
|
|
user_query = query
|
|
answer = query_and_summarize(job="FI", query=user_query)
|
|
return {"answer": answer}
|
|
|
|
# 예시 사용
|
|
if __name__ == "__main__":
|
|
# init(job="FI")
|
|
# FI : 재무 HR : 인사
|
|
print('1')
|
|
# 실행방법 uvicorn manual:app --reload |