|
|
|
|
|
import os |
|
from typing import Optional |
|
import dataclasses |
|
from pathlib import Path |
|
import hashlib |
|
import numpy as np |
|
from google import genai |
|
from google.genai import types |
|
from dotenv import load_dotenv |
|
from lightrag.utils import EmbeddingFunc, Tokenizer |
|
from lightrag import LightRAG, QueryParam |
|
from sentence_transformers import SentenceTransformer |
|
from lightrag.kg.shared_storage import initialize_pipeline_status |
|
import sentencepiece as spm |
|
import requests |
|
|
|
import asyncio |
|
import nest_asyncio |
|
|
|
|
|
nest_asyncio.apply() |
|
|
|
load_dotenv() |
|
gemini_api_key = os.getenv("GEMINI_API_KEY") |
|
|
|
WORKING_DIR = "./dickens" |
|
|
|
if os.path.exists(WORKING_DIR): |
|
import shutil |
|
|
|
shutil.rmtree(WORKING_DIR) |
|
|
|
os.mkdir(WORKING_DIR) |
|
|
|
|
|
class GemmaTokenizer(Tokenizer): |
|
|
|
|
|
@dataclasses.dataclass(frozen=True) |
|
class _TokenizerConfig: |
|
tokenizer_model_url: str |
|
tokenizer_model_hash: str |
|
|
|
_TOKENIZERS = { |
|
"google/gemma2": _TokenizerConfig( |
|
tokenizer_model_url="https://raw.githubusercontent.com/google/gemma_pytorch/33b652c465537c6158f9a472ea5700e5e770ad3f/tokenizer/tokenizer.model", |
|
tokenizer_model_hash="61a7b147390c64585d6c3543dd6fc636906c9af3865a5548f27f31aee1d4c8e2", |
|
), |
|
"google/gemma3": _TokenizerConfig( |
|
tokenizer_model_url="https://raw.githubusercontent.com/google/gemma_pytorch/cb7c0152a369e43908e769eb09e1ce6043afe084/tokenizer/gemma3_cleaned_262144_v2.spiece.model", |
|
tokenizer_model_hash="1299c11d7cf632ef3b4e11937501358ada021bbdf7c47638d13c0ee982f2e79c", |
|
), |
|
} |
|
|
|
def __init__( |
|
self, model_name: str = "gemini-2.0-flash", tokenizer_dir: Optional[str] = None |
|
): |
|
|
|
if "1.5" in model_name or "1.0" in model_name: |
|
|
|
|
|
tokenizer_name = "google/gemma2" |
|
else: |
|
|
|
tokenizer_name = "google/gemma3" |
|
|
|
file_url = self._TOKENIZERS[tokenizer_name].tokenizer_model_url |
|
tokenizer_model_name = file_url.rsplit("/", 1)[1] |
|
expected_hash = self._TOKENIZERS[tokenizer_name].tokenizer_model_hash |
|
|
|
tokenizer_dir = Path(tokenizer_dir) |
|
if tokenizer_dir.is_dir(): |
|
file_path = tokenizer_dir / tokenizer_model_name |
|
model_data = self._maybe_load_from_cache( |
|
file_path=file_path, expected_hash=expected_hash |
|
) |
|
else: |
|
model_data = None |
|
if not model_data: |
|
model_data = self._load_from_url( |
|
file_url=file_url, expected_hash=expected_hash |
|
) |
|
self.save_tokenizer_to_cache(cache_path=file_path, model_data=model_data) |
|
|
|
tokenizer = spm.SentencePieceProcessor() |
|
tokenizer.LoadFromSerializedProto(model_data) |
|
super().__init__(model_name=model_name, tokenizer=tokenizer) |
|
|
|
def _is_valid_model(self, model_data: bytes, expected_hash: str) -> bool: |
|
"""Returns true if the content is valid by checking the hash.""" |
|
return hashlib.sha256(model_data).hexdigest() == expected_hash |
|
|
|
def _maybe_load_from_cache(self, file_path: Path, expected_hash: str) -> bytes: |
|
"""Loads the model data from the cache path.""" |
|
if not file_path.is_file(): |
|
return |
|
with open(file_path, "rb") as f: |
|
content = f.read() |
|
if self._is_valid_model(model_data=content, expected_hash=expected_hash): |
|
return content |
|
|
|
|
|
self._maybe_remove_file(file_path) |
|
|
|
def _load_from_url(self, file_url: str, expected_hash: str) -> bytes: |
|
"""Loads model bytes from the given file url.""" |
|
resp = requests.get(file_url) |
|
resp.raise_for_status() |
|
content = resp.content |
|
|
|
if not self._is_valid_model(model_data=content, expected_hash=expected_hash): |
|
actual_hash = hashlib.sha256(content).hexdigest() |
|
raise ValueError( |
|
f"Downloaded model file is corrupted." |
|
f" Expected hash {expected_hash}. Got file hash {actual_hash}." |
|
) |
|
return content |
|
|
|
@staticmethod |
|
def save_tokenizer_to_cache(cache_path: Path, model_data: bytes) -> None: |
|
"""Saves the model data to the cache path.""" |
|
try: |
|
if not cache_path.is_file(): |
|
cache_dir = cache_path.parent |
|
cache_dir.mkdir(parents=True, exist_ok=True) |
|
with open(cache_path, "wb") as f: |
|
f.write(model_data) |
|
except OSError: |
|
|
|
pass |
|
|
|
@staticmethod |
|
def _maybe_remove_file(file_path: Path) -> None: |
|
"""Removes the file if exists.""" |
|
if not file_path.is_file(): |
|
return |
|
try: |
|
file_path.unlink() |
|
except OSError: |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def llm_model_func( |
|
prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs |
|
) -> str: |
|
|
|
client = genai.Client(api_key=gemini_api_key) |
|
|
|
|
|
if history_messages is None: |
|
history_messages = [] |
|
|
|
combined_prompt = "" |
|
if system_prompt: |
|
combined_prompt += f"{system_prompt}\n" |
|
|
|
for msg in history_messages: |
|
|
|
combined_prompt += f"{msg['role']}: {msg['content']}\n" |
|
|
|
|
|
combined_prompt += f"user: {prompt}" |
|
|
|
|
|
response = client.models.generate_content( |
|
model="gemini-1.5-flash", |
|
contents=[combined_prompt], |
|
config=types.GenerateContentConfig(max_output_tokens=500, temperature=0.1), |
|
) |
|
|
|
|
|
return response.text |
|
|
|
|
|
async def embedding_func(texts: list[str]) -> np.ndarray: |
|
model = SentenceTransformer("all-MiniLM-L6-v2") |
|
embeddings = model.encode(texts, convert_to_numpy=True) |
|
return embeddings |
|
|
|
|
|
async def initialize_rag(): |
|
rag = LightRAG( |
|
working_dir=WORKING_DIR, |
|
|
|
tokenizer=GemmaTokenizer( |
|
tokenizer_dir=(Path(WORKING_DIR) / "vertexai_tokenizer_model"), |
|
model_name="gemini-2.0-flash", |
|
), |
|
llm_model_func=llm_model_func, |
|
embedding_func=EmbeddingFunc( |
|
embedding_dim=384, |
|
max_token_size=8192, |
|
func=embedding_func, |
|
), |
|
) |
|
|
|
await rag.initialize_storages() |
|
await initialize_pipeline_status() |
|
|
|
return rag |
|
|
|
|
|
def main(): |
|
|
|
rag = asyncio.run(initialize_rag()) |
|
file_path = "story.txt" |
|
with open(file_path, "r") as file: |
|
text = file.read() |
|
|
|
rag.insert(text) |
|
|
|
response = rag.query( |
|
query="What is the main theme of the story?", |
|
param=QueryParam(mode="hybrid", top_k=5, response_type="single line"), |
|
) |
|
|
|
print(response) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|