Retrieval Augmented Generation (RAG)
RAG (Retrieval Augmented Generation)
nAI'vi utilise le RAG pour répondre aux questions en s'appuyant sur des chunks et exemples Q&A stockés en PostgreSQL. Les index FAISS sont persistés en base par rôle et rechargés en mémoire à la demande.
Modèles
| Usage | Modèle | Variable d'env |
|---|---|---|
| Embeddings | mistral-embed | RAG_EMBEDDING_MODEL_ID |
| Génération (chat) | mistral-medium-latest | RAG_QUERY_MODEL_ID |
Paramètres RAG
Tous les paramètres sont configurables via variables d'environnement (voir config/rag_config.py):
| Paramètre | Variable d'env | Default | Range | Description |
|---|---|---|---|---|
k | RAG_K_CHUNKS | 5 | 1–20 | Nombre de chunks injectés dans le prompt |
n | RAG_N_EXAMPLES | 4 | 0–10 | Nombre d'exemples Q&A few-shot |
temperature | RAG_TEMP | 0.2 | — | Température LLM pour la génération |
batch_size | RAG_BATCH_SIZE | 64 | — | Taille des lots pour le calcul d'embeddings |
few_shots_enabled | FEW_SHOTS_ENABLED | true | — | Activer/désactiver les exemples few-shot |
Tables utilisées
| Table | Usage |
|---|---|
chunks | Chunks category = 'dynamic' → inclus dans le RAG |
qa_pair | category = 'static' → few-shot examples; category = 'dynamic' → inclus dans le RAG |
indexes | Index FAISS sérialisés (bytea), un par rôle |
index_chunks | Mapping index ↔ chunk_id (chunks + qa_pairs) |
roles | Rôles applicatifs — détermine quel index charger |
role_tags | Tags associés à un rôle — filtre les chunks inclus dans l'index |
item_tags | Tags attachés aux chunks/qa_pairs |
Index FAISS per-role
Chaque rôle applicatif dispose d'un index FAISS dédié. Les chunks inclus dans cet index sont ceux dont les tags sont accessibles pour le rôle (via la hiérarchie role_tags).
Génération d'un index (regenerate_index)
apps/chatbot/src/chatbot/rag/indexing.py — regenerate_index(client, db_conn):
- Récupère tous les rôles depuis la table
roles. - Pour chaque rôle:
a. Identifie les tag_ids accessibles (hiérarchie
display_order). b. Récupère les chunks/qa_pairs avec ces tags (+ embeddings si existants). c. Calcule les embeddings manquants par lots (BATCH_SIZE=64) via Mistralmistral-embed. d. Sauvegarde les nouveaux embeddings en DB (save_chunk_embedding_to_db). e. Construit un indexIndexFlatL2FAISS. f. Sérialise l'index (pickle) et le stocke dans la tableindexes(marquéis_active=true). g. Enregistre leschunk_iddansindex_chunks. - Retourne
{ "success": True, "message": "..." }.
Déclenché par: POST /regenerate-index sur le service Index API, appelé via la procédure ORPC regenerateIndex (admin).
Chargement d'un index (init_rag)
apps/chatbot/src/chatbot/rag/services.py — init_rag(client, db_conn, role_id):
- Vérifie le cache mémoire
ROLE_INDEX_CACHE[role_id]. - Si cache présent: compare
db_index_idavec le dernier index actif en DB (get_latest_index_from_db).- Si plus récent: recharge depuis DB (invalidation du cache).
- Sinon: retourne le cache (cache hit).
- Si pas de cache (cold start): charge depuis DB, peuple le cache.
- Fallback sur l'index global legacy si aucun index per-role n'existe.
Fonctions clés
prepare_rag_context
apps/chatbot/src/chatbot/rag/services.py
async def prepare_rag_context(
client: Mistral,
db_conn: psycopg2.extensions.connection,
question: str,
k: int = 5,
n: int = 2,
user_id: Optional[str] = None
) -> tuple[str, str, list] | tuple[None, None, None]:Args:
client— client Mistral initialisédb_conn— connexion PostgreSQLquestion— question de l'utilisateurk— nombre de chunks à récupérer (default:DEFAULT_K_CHUNKS)n— nombre d'exemples few-shot (default:DEFAULT_N_EXAMPLES)user_id— identifiant utilisateur (pour résolution du rôle)
Returns: (context_text, few_shots_text, sources) ou (None, None, None) en cas d'erreur.
context_text— chunks concaténés séparés par\n---------------------\nfew_shots_text— exemples Q&A formatés"question : answer\n"sources— liste[{chunk_id, file_id, file_name, table_source}]
answer_question
apps/chatbot/src/chatbot/rag/services.py
async def answer_question(
client: Mistral,
db_conn: psycopg2.extensions.connection,
question: str,
model_id: str = QUERY_MODEL_ID,
k: int = 5,
n: int = 2,
conversation_history: Optional[List[Dict[str, str]]] = None,
user_id: Optional[str] = None
) -> AsyncGenerator[str, None]:Yields: tokens de la réponse Mistral en streaming, puis __SOURCES__:{json}.
run_query_mistral
async def run_query_mistral(
client: Mistral,
prompt: str,
model_id: str,
conversation_history: Optional[List[Dict[str, str]]] = None
) -> AsyncGenerator[str, None]:Construit la liste de messages (historique + prompt courant) et appelle client.chat.stream_async() avec temperature=DEFAULT_RAG_TEMP. Yields les delta de contenu.
create_prompt
def create_prompt(
question: str,
retrieved_chunks: List[str],
retrieved_examples: List[Dict[str, Any]]
) -> str:Formate le template BOT_SYSTEM_PROMPT (env var) avec:
{context}— chunks séparés par\n---------------------\n{few_shots_examples}— exemples Q&A formatés{question}— question de l'utilisateur
get_text_embedding
apps/chatbot/src/chatbot/rag/indexing.py
def get_text_embedding(
client: Mistral,
input_chunks: List[str]
) -> List[List[float]]:Appelle client.embeddings.create(model="mistral-embed", inputs=input_chunks). Gère automatiquement la limite de tokens via splitting récursif des batches. Retourne une liste d'embeddings (vecteurs float).
Flux complet per-question
question + user_id
→ get_user_role(db_conn, user_id) → role_id
→ init_rag(client, db_conn, role_id)
→ ROLE_INDEX_CACHE[role_id] ou chargement depuis DB
→ get_text_embedding(client, [question]) → q_vector (1024-dim)
→ index.search(q_vector, k_search=min(len(chunks), k))
→ indices filtrés (idx != -1, chunk non null)
→ retrieved_chunks[0..k-1], sources[0..k-1]
→ get_few_shots_examples(db_conn, limit=n) → exemples statiques aléatoires
→ create_prompt(question, chunks, examples) → prompt formaté
→ run_query_mistral(client, prompt, QUERY_MODEL_ID, history) → stream tokens
→ yield tokens
→ yield "__SOURCES__:{json}"
Kalli