How to build an effective knowledge base for AI models.

A detailed guide on how to build a knowledge base for AI, from data collection and vector databases to optimizing retrieval and scalability.

AI is only powerful when the underlying knowledge base is strong enough. A well-built knowledge base not only helps the model respond more accurately but also significantly improves response speed—two weaknesses that many current AIs still face. According to a recent study, many large AI chatbots still answer nearly half of the user queries incorrectly.

Therefore, building a knowledge base is no longer an 'auxiliary' component, but almost a decisive factor in determining the quality of the entire AI system.

1. Start with the right data, not just a lot of data.

One of the most common mistakes when building a knowledge base is assuming that the more data, the smarter the AI ​​will be. In reality, this easily leads to a "garbage in, garbage out" situation — poor quality data will produce poor quality results.

The important thing is not the quantity, but the relevance of the data to the system's goals. A good knowledge base usually focuses only on the content that the AI ​​actually needs to provide correct answers.

For example, if you're building a customer support chatbot, the system might only need company policy documents, troubleshooting procedures, or product user manuals. This prevents the AI ​​from 'inventing' additional information beyond its permitted scope.

Note that there is currently a trend of using AI-generated data to build knowledge bases for other AI systems. This method speeds up development significantly, but it also carries risks because the content may contain errors, redundant information, or overly lengthy explanations. Therefore, all AI-generated data should be thoroughly reviewed before being fed into the system.

2. Cleaning and splitting the data is an extremely important step.

After data collection, the next step is content cleaning. This process typically involves removing duplicate data, eliminating outdated information, and standardizing terminology and formatting to ensure consistency across the entire knowledge base.

The data will then be divided into smaller 'chunks'. Each chunk should contain only one clear idea or topic to make it easier for the AI ​​to search and retrieve information.

You should divide chunks based on actual user questions rather than traditional document structures. For example, instead of dividing it into 'Account Management Chapter', you could break it down into sections like:
'How do I change my password?' or 'What is the password policy?'.

This approach allows AI to respond much more closely to the real needs of users.

3. Metadata and vectorization help AI 'understand' data faster.

After the data is fragmented, each chunk is typically assigned metadata such as data source, topic, update date, or access permissions. Metadata helps the system filter and find the right content faster instead of having to scan the entire knowledge base.

Next, the text will be converted into a vector using an embedding model such as OpenAI v3-Large or BGE-M3. This is a crucial step because AI processes vectors much faster than raw text.

A complete chunk will typically include:

  1. Vector embedding
  2. Original content
  3. Metadata included

This is also the foundation of most current RAG systems.

4. Choose the right database vector and optimize retrieval.

After vectorization, the data is typically stored in vector databases such as Pinecone, Milvus, or Weaviate. These systems are specifically designed for semantically retrieving vectors. You can upload vector data by writing a simple Python code snippet.

 import math import time import json from dataclasses import dataclass, field from typing import Any import numpy as np # Vector Normalization + Metadata def normalize_l2(vector: list[float]) -> list[float]: """ Return an L2-normalized copy of `vec`. Many vector stores use dot-product similarity. If you normalize vectors to unit length, dot-product becomes equivalent to cosine similarity. """ arr = np.array(vector, dtype=np.float32) norm = np.linalg.norm(arr) if norm == 0: return vector return (arr / norm).tolist() def prepare_record( doc_id: str, embedding: list[float], text: str, source: str, extra_metadata: dict[str, Any] | None = None, ) -> dict: """ Prepare a single record for vector DB upsert. Metadata serves two purposes: - Filtering: narrow down search to a subset """ metadata = { "source": source, "text_preview": text[:500], "char_count": len(text), } if extra_metadata: metadata.update(extra_metadata) return { "id": doc_id, "values": normalize_l2(embedding), "metadata": metadata, } # Vector Quantization # Scalar Quantization / SQ def scalar_quantization(input_vec) -> dict: """ This funtion demonstrates how to compress float32 input_vec to uint8 """ input_arr = np.array(input_vec, dtype=np.float32) min, max = input_arr.min(), input_arr.max() range = (max - min) if range == 0: quantized = np.zeros_like(arr, dtype=np.uint8) else: quantized = ((input_arr - min) / range * 255).astype(np.uint8) return { "quantized": quantized.tolist(), "min": float(min), "max": float(max), } def scalar_dequantization(record: dict) -> list[float]: """ You can Reconstruct the original vector by approximate float32 vector from uint8. """ arr = np.array(record["quantized"], dtype=np.float32) return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist() # Product Quantization / PQ def train_product_quantizer( vectors, num_subvectors: int = 8, num_centroids: int = 256, max_iterations: int = 20) -> list: """ This function demonstrates split vector into subvectors, cluster each independently """ from sklearn.cluster import KMeans dim = vectors.shape[1] assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors" sub_dim = dim // num_subvectors codebooks = [] for i in range(num_subvectors): sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim] kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1) kmeans.fit(sub_vectors) codebooks.append(kmeans.cluster_centers_) return codebooks def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]: """ Encode a single vector into PQ codes (one uint8 per subvector) """ num_subvectors = len(codebooks) sub_dim = len(vector) // num_subvectors codes = [] for i, codebook in enumerate(codebooks): sub_vec = vector[i * sub_dim : (i + 1) * sub_dim] distances = np.linalg.norm(codebook - sub_vec, axis=1) codes.append(int(np.argmin(distances))) return codes def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray: """ Reconstruct approximate vector from PQ codes """ return np.concatenate( [codebook[code] for code, codebook in zip(codes, codebooks)] )

Many developers often focus solely on 'making it work' while neglecting to optimize retrieval. In reality, users not only want AI to answer correctly but also to respond almost instantly.

To retrieve data from a vector database, you can use orchestration frameworks such as LlamaIndex and LangChain.

LlamaIndex can traverse vector databases faster and find the exact segment of data containing content relevant to the user's query.

LangChain then extracts data from that segment and transforms it according to the user's query. For example, it can summarize the text or write an email from that data.

""" Hybrid Retrieval: Take benefits from both keyword search and vector similarity Where each approach shines: - Keywords: looks for exact matches, but will miss searches with synonym - Embeddings: has advantage of capturing the meaning, but there is possibility of missing exact keyword Hybrid is a combination of both to get the best of each. """ import math from collections import defaultdict from dataclasses import dataclass import numpy as np @dataclass class Document: id: str text: str embedding: list[float] class BestMatching25Index: def __init__(self, k1: float = 1.5, b: float = 0.75): # Here k1 is the term frequency saturation limit # and b is length of normalization self.k1 = k1 self.b = b self.doc_lengths: dict[str, int] = {} self.avg_doc_length: float = 0 self.doc_freqs: dict[str, int] = {} self.term_freqs: dict[str, dict[str, int]] = {} self.corpus_size: int = 0 def _tokenize(self, text: str) -> list[str]: return text.lower().split() def index(self, documents: list[Document]) -> None: self.corpus_size = len(documents) for doc in documents: tokens = self._tokenize(doc.text) self.doc_lengths[doc.id] = len(tokens) self.term_freqs[doc.id] = {} seen_terms: set[str] = set() for token in tokens: self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1 if token not in seen_terms: self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1 seen_terms.add(token) self.avg_doc_length = sum(self.doc_lengths.values()) / self.corpus_size def score(self, query: str, doc_id: str) -> float: query_terms = self._tokenize(query) doc_len = self.doc_lengths[doc_id] score = 0.0 for term in query_terms: if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}): continue tf = self.term_freqs[doc_id][term] df = self.doc_freqs[term] idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1) tf_norm = (tf * (self.k1 + 1)) / ( tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length) ) score += idf * tf_norm return score def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]: scores = [ (doc_id, self.score(query, doc_id)) for doc_id in self.doc_lengths ] scores.sort(key=lambda x: x[1], reverse=True) return scores[:top_k] class VectorIndex: """This class implements the smart search using the hybrid search. The index function normalize and stores the document search implements a cosine similarity search hybrid_search_weighted merges BM25 index and vector index using weighted average Reciprocal_rank_fusion Combines the results in an efficient way """ def __init__(self): self.documents: dict[str, np.ndarray] = {} def index(self, documents: list[Document]) -> None: for doc in documents: arr = np.array(doc.embedding, dtype=np.float32) norm = np.linalg.norm(arr) self.documents[doc.id] = arr / norm if norm > 0 else arr def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]: q = np.array(query_embedding, dtype=np.float32) q = q / np.linalg.norm(q) scores = [ (doc_id, float(np.dot(q, emb))) for doc_id, emb in self.documents.items() ] scores.sort(key=lambda x: x[1], reverse=True) return scores[:top_k] def hybrid_search_weighted( query: str, query_embedding: list[float], bm25_index: BestMatching25Index, vector_index: VectorIndex, alpha: float = 0.5, top_k: int = 10, ) -> list[dict]: """Combine keyword and vector scores with a tunable weight. alpha = 1.0 → pure vector search alpha = 0.0 → pure keyword search alpha = 0.5 → equal weight (good starting point) """ keyword_results = bm25_index.search(query, top_k=top_k * 2) vector_results = vector_index.search(query_embedding, top_k=top_k * 2) # Normalize (min-max) each score list to [0, 1] def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]: if not results: return {} scores = [s for _, s in results] min_s, max_s = min(scores), max(scores) rng = max_s - min_s if rng == 0: return {doc_id: 1.0 for doc_id, _ in results} return {doc_id: (s - min_s) / rng for doc_id, s in results} keyword_scores = normalize_scores(keyword_results) vector_scores = normalize_scores(vector_results) # Merge all_doc_ids = set(keyword_scores) | set(vector_scores) combined = [] for doc_id in all_doc_ids: ks = keyword_scores.get(doc_id, 0.0) vs = vector_scores.get(doc_id, 0.0) combined.append({ "id": doc_id, "score": alpha * vs + (1 - alpha) * ks, "keyword_score": ks, "vector_score": vs, }) combined.sort(key=lambda x: x["score"], reverse=True) return combined[:top_k] def reciprocal_rank_fusion( *ranked_lists: list[tuple[str, float]], k: int = 60, top_n: int = 10, ) -> list[dict]: """ Merge multiple ranked lists, uses RRF (Reciprocal Rank Fusion) RRF score = sum over all lists of: 1 / (k + rank) Why RRF over weighted combination? - No score normalization needed (works on ranks, not raw scores) - No alpha tuning needed - Robust across different score distributions - Used by Elasticsearch, Pinecone, Weaviate under the hood """ rrf_scores: dict[str, float] = defaultdict(float) doc_details: dict[str, dict] = {} for list_idx, ranked_list in enumerate(ranked_lists): for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1): rrf_scores[doc_id] += 1.0 / (k + rank) if doc_id not in doc_details: doc_details[doc_id] = {} doc_details[doc_id][f"list_{list_idx}_rank"] = rank doc_details[doc_id][f"list_{list_idx}_score"] = raw_score results = [] for doc_id, rrf_score in rrf_scores.items(): results.append({ "id": doc_id, "rrf_score": round(rrf_score, 6), **doc_details[doc_id], }) results.sort(key=lambda x: x["rrf_score"], reverse=True) return results[:top_n] def hybrid_search_rrf( query: str, query_embedding: list[float], bm25_index: BestMatching25Index, vector_index: VectorIndex, top_k: int = 10, ) -> list[dict]: keyword_results = bm25_index.search(query, top_k=top_k * 2) vector_results = vector_index.search(query_embedding, top_k=top_k * 2) return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)

One of the most effective retrieval methods currently available is hybrid retrieval—a combination of keyword search and semantic vector search. Keyword search is strong for exact queries like 'password policy', while embedding search excels at understanding the meaning and context of the question.

When both are combined, the system becomes both much more accurate and flexible. Frameworks like LlamaIndex and LangChain are now popular choices for building retrieval pipelines in this way.

5. The knowledge base must be continuously updated.

A good knowledge base isn't something you build and then leave unfinished.

Over time, data can become outdated, policies can change, or embedding models can be updated. Without regular refreshes, AI will begin to provide inaccurate responses.

There's a concept called selective forgetting—actively deleting or updating data that is no longer relevant. Tools like DeepEval or TruLens can help monitor retrieval quality and identify which chunks are causing incorrect answers.

 """ Knowledge Base Quality Monitoring Knowledge base health with the help of automated checks: 1. Retrieval quality — is it finding the right documents? 2. Freshness detection — Are documents stale or embeddings drifting? 3. Unified pipeline — Scheduled monitoring with alerts """ import time import json import logging from datetime import datetime, timedelta from dataclasses import dataclass, field from typing import Any, Callable import numpy as np logging.basicConfig(level=logging.INFO) logger = logging.getLogger("kb_monitor") def setup_deepeval_metrics(): """Define retrieval quality metrics using DeepEval. DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score whether retrieved context actually helps answer the question. """ from deepeval.metrics import ( AnswerRelevancyMetric, FaithfulnessMetric, ContextualPrecisionMetric, ContextualRecallMetric, ) from deepeval.test_case import LLMTestCase metrics = { # Does the answer address the question? "relevancy": AnswerRelevancyMetric(threshold=0.7), # Is the answer grounded in the retrieved context (no hallucination)? "faithfulness": FaithfulnessMetric(threshold=0.7), # Are the top-ranked retrieved docs actually relevant? "context_precision": ContextualPrecisionMetric(threshold=0.7), # Did we retrieve all the docs needed to answer? "context_recall": ContextualRecallMetric(threshold=0.7), } return metrics, LLMTestCase def evaluate_retrieval_quality( rag_pipeline: Callable, test_cases: list[dict], ) -> list[dict]: """Run a set of test queries through your RAG pipeline and score them. Each test case should have: - query: the user question - expected_answer: ground truth answer (for recall/relevancy) """ from deepeval import evaluate from deepeval.test_case import LLMTestCase from deepeval.metrics import ( AnswerRelevancyMetric, FaithfulnessMetric, ContextualPrecisionMetric, ContextualRecallMetric, ) results = [] for tc in test_cases: # Run your actual RAG pipeline response = rag_pipeline(tc["query"]) test_case = LLMTestCase( input=tc["query"], actual_output=response["answer"], expected_output=tc["expected_answer"], retrieval_context=response["retrieved_contexts"], ) metrics = [ AnswerRelevancyMetric(threshold=0.7), FaithfulnessMetric(threshold=0.7), ContextualPrecisionMetric(threshold=0.7), ContextualRecallMetric(threshold=0.7), ] for metric in metrics: metric.measure(test_case) results.append({ "query": tc["query"], "scores": {m.__class__.__name__: m.score for m in metrics}, "passed": all(m.is_successful() for m in metrics), }) return results def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"): """Wrap your RAG pipeline with TruLens for continuous feedback logging. TruLens records every query + response + retrieved context, then runs feedback functions asynchronously to score each interaction. """ from trulens.core import TruSession, Feedback, Select from trulens.providers.openai import OpenAI as TruLensOpenAI from trulens.apps.custom import TruCustomApp, instrument session = TruSession() # Feedback provider (uses an LLM to judge quality) provider = TruLensOpenAI() feedbacks = [ # Is the response relevant to the query? Feedback(provider.relevance) .on_input() .on_output(), # Is the response grounded in retrieved context? Feedback(provider.groundedness_measure_with_cot_reasons) .on(Select.RecordCalls.retrieve.rets) .on_output(), # Is the retrieved context relevant to the query? Feedback(provider.context_relevance) .on_input() .on(Select.RecordCalls.retrieve.rets), ] # Wrap your pipeline — every call is now logged and scored @instrument class InstrumentedRAG: def __init__(self, pipeline): self._pipeline = pipeline @instrument def retrieve(self, query: str) -> list[str]: result = self._pipeline(query) return result["retrieved_contexts"] @instrument def query(self, query: str) -> str: result = self._pipeline(query) return result["answer"] instrumented = InstrumentedRAG(rag_pipeline) tru_app = TruCustomApp( instrumented, app_name=app_name, feedbacks=feedbacks, ) return tru_app, session def get_trulens_dashboard_url(session) -> str: """Launch the TruLens dashboard to visualize quality over time.""" session.run_dashboard(port=8501) return "http://localhost:8501" @dataclass class DocumentFreshness: doc_id: str last_updated: datetime last_embedded: datetime source_hash: str # hash of source content at embedding time class FreshnessMonitor: """Detect stale documents and embedding drift.""" def __init__(self, staleness_threshold_days: int = 30): self.threshold = timedelta(days=staleness_threshold_days) self.freshness_records: dict[str, DocumentFreshness] = {} def register(self, doc_id: str, source_hash: str) -> None: now = datetime.utcnow() self.freshness_records[doc_id] = DocumentFreshness( doc_id=doc_id, last_updated=now, last_embedded=now, source_hash=source_hash, ) def check_staleness(self) -> dict: """Find documents that haven't been re-embedded recently.""" now = datetime.utcnow() stale, fresh = [], [] for doc_id, record in self.freshness_records.items(): age = now - record.last_embedded if age > self.threshold: stale.append({"id": doc_id, "days_stale": age.days}) else: fresh.append(doc_id) return { "total": len(self.freshness_records), "fresh": len(fresh), "stale": len(stale), "stale_documents": stale, } def check_content_drift( self, doc_id: str, current_source_hash: str ) -> bool: """Check if source content changed since last embedding.""" record = self.freshness_records.get(doc_id) if not record: return True # unknown doc, treat as drifted return record.source_hash != current_source_hash def detect_embedding_drift( old_embeddings: dict[str, list[float]], new_embeddings: dict[str, list[float]], drift_threshold: float = 0.1, ) -> dict: """Compare old vs new embeddings for the same documents. If your embedding model gets updated (or you switch models), existing vectors may no longer be compatible. This detects that. """ drifted = [] common_ids = set(old_embeddings) & set(new_embeddings) for doc_id in common_ids: old = np.array(old_embeddings[doc_id]) new = np.array(new_embeddings[doc_id]) # cosine distance: 0 = identical, 2 = opposite cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new)) cos_dist = 1 - cos_sim if cos_dist > drift_threshold: drifted.append({ "id": doc_id, "cosine_distance": round(float(cos_dist), 4), }) return { "documents_compared": len(common_ids), "drifted": len(drifted), "drift_threshold": drift_threshold, "drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True), }

6. The three biggest problems when building a knowledge base

The most common problem is poor data quality. This is also the reason why AI hallucinates. A famous example is Air Canada's chatbot, which once fabricated a non-existent refund policy.

Another issue is slow retrieval. Many AI systems respond correctly but are too laggy because developers haven't optimized the index or vector storage. The author recommends using HNSW or IVF indexing instead of flat indexing to speed up retrieval.

Furthermore, scalability is also a major challenge. Many teams initially choose a monolithic architecture for rapid deployment, but when the number of queries increases sharply, the CPU and RAM become overloaded. According to the author, horizontal sharding is a more suitable approach for scaling the knowledge base in the long term.

7. A knowledge base is not a place to 'dump data'.

Finally, it's important to note that a knowledge base isn't a place to just throw all your data in and expect AI to figure everything out on its own. It's an asset that needs continuous curation and optimization.

You should start with small tasks, such as focusing on only the 10 most common questions first. Only after the AI ​​answers consistently and accurately should you expand the system. The difference between an AI that 'guesses' and an AI that 'truly knows' lies in this deliberate process of curating the data.

You've just finished reading the article "How to build an effective knowledge base for AI models." edited by the TipsMake team. You can save how-to-build-an-effective-knowledge-base-for-ai-models.pdf to your computer here to read later or print it out. We hope this article has provided you with many useful tech tips and tricks. You can search for similar articles on tips and guides. Thank you for reading and for following us regularly.

« PREV How to edit images using AI in Messenger
NEXT » What is AI Slop and why is the internet flooded with AI-generated junk content?