How to Build a Knowledge Graph RAG Pipeline with Neo4j + Embeddings
Build your first Knowledge Graph RAG system step-by-step: model entities, embed meaning, and retrieve grounded, context-rich answers using Neo4j.
I've been building RAG systems for a while now, and here's what keeps happening: you search for something specific, and the system completely misses crucial context because the relevant facts are scattered across different documents and entities. It's frustrating. So let me show you how to fix this by building a Knowledge Graph RAG pipeline that combines graph structure (using Neo4j) with semantic embeddings (OpenAI + LangChain). If you want to make sure your retrieval prompts actually give you reliable, production-ready outputs, you should definitely check out our guide on prompt engineering with LLM APIs. I'll give you complete, end-to-end code that loads data, builds a graph, indexes vectors, and runs six different retrieval strategies.
Once we're done here, you'll be able to run six queries that achieve 95%+ recall on our synthetic dataset. But here's what's really cool: you'll also surface relationship paths (like Institution → Researcher → Project) for auditability. So when you ask something like "Find all projects relevant to 'GNN safety in healthcare' and show which teams and institutions link them," you'll get explainable paths that vector-only search would completely miss. I've tried this with regular vector search, and it just doesn't work the same way.

Runtime and Prerequisites:
Takes about 10-15 minutes to run the whole thing
Costs less than $0.05 in embeddings for the toy dataset (seriously, it's cheap)
You'll need Neo4j 5.12+ (I recommend using Neo4j Aura Free with neo4j+s:// URI for Colab, or just spin up Docker locally)
Python 3.9+
Why Graph + Embeddings
So what we're building here is a retrieval pipeline that stores entities and relationships in Neo4j, computes embeddings for text fields, and queries using both semantic similarity and graph traversal. This approach gives you much richer recall, reduces hallucinations, and actually returns explainable results through relationship paths across projects, researchers, institutions, and research areas. If you're interested in improving factuality and reducing hallucinations in language models more generally, take a look at our article on fine-tuning language models from human preferences.
Here's the thing that took me a while to understand: graph structure encodes "what connects to what," while embeddings encode "what means what." When you combine them, you recover distributed context that vector-only search misses entirely. I've tested this extensively. Neo4j's native graph model and vector indexes let you write expressive Cypher traversals and run fast similarity searches. And LangChain, well, it streamlines embeddings and vector retrieval with production-ready abstractions that actually work. To avoid some common pitfalls I've run into related to tokenization and context loss in retrieval-augmented generation, see our guide on tokenization pitfalls and invisible characters that break prompts and RAG.
Setup
Install Dependencies
First things first, let's get all the required packages installed. For Colab, use !pip install; for local Jupyter, use %pip install.
!pip install -q neomodel neo4j langchain-openai langchain-community python-dotenv tqdm tenacity "requests==2.32.4" "langchain-core<2.0.0"Configure Environment Variables
You need to make sure you've got all required environment variables set before moving forward.
import os
required_keys = ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD", "OPENAI_API_KEY"]
missing = [k for k in required_keys if not os.getenv(k)]
if missing:
raise EnvironmentError(
f"Missing required environment variables: {', '.join(missing)}\n"
"Please set them before running the notebook. Example:\n"
" export NEO4J_URI='neo4j+s://your-aura-instance.databases.neo4j.io'\n"
" export NEO4J_USERNAME='neo4j'\n"
" export NEO4J_PASSWORD='your-neo4j-password'\n"
" export OPENAI_API_KEY='your-openai-key'"
)
print("All required API keys found.")
Initialize Drivers and Verify Connectivity
This cell initializes the Neo4j driver (for Cypher queries), Neomodel (for OGM-style modeling), and OpenAI embeddings. It also checks that Neo4j is actually reachable and verifies the version. I've had issues where Neo4j wasn't running properly, so this check saves time.
import os
from neomodel import config
from neo4j import GraphDatabase
from langchain_openai import OpenAIEmbeddings
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
config.DATABASE_URL = f"bolt://{NEO4J_USERNAME}:{NEO4J_PASSWORD}@{NEO4J_URI.replace('neo4j+s://','').replace('bolt://','')}"
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
emb_model = OpenAIEmbeddings(model="text-embedding-3-small", api_key=OPENAI_API_KEY)
with driver.session() as s:
ping = s.run("RETURN 1 AS ok").single()["ok"]
version = s.run("CALL dbms.components() YIELD versions RETURN versions[0] AS v").single()["v"]
print(f"Neo4j OK: {ping} | Version: {version}")
if not version.startswith("5."):
print("Warning: Neo4j 5.12+ recommended for vector indexes.")
Generate Synthetic Dataset
Now we create a synthetic academic research dataset with institutions, research areas, researchers, and projects. It writes JSON files to ./data for ingestion. I chose academic data because it has nice, clear relationships that are easy to understand.
import json
import random
import uuid
from pathlib import Path
random.seed(42)
data_dir = Path("data")
data_dir.mkdir(exist_ok=True)
institutions = [
{"name": "Alpha University", "type": "academic", "location": "USA"},
{"name": "Beta Institute", "type": "academic", "location": "UK"},
{"name": "Gamma Labs", "type": "industry", "location": "USA"},
{"name": "Delta Research", "type": "industry", "location": "Germany"},
{"name": "Policy Council", "type": "government", "location": "Canada"},
]
areas = [
{"name": "Graph Neural Networks", "description": "Learning over graphs and relational data."},
{"name": "Retrieval-Augmented Generation", "description": "Using retrieval to ground generation."},
{"name": "AI Policy and Governance", "description": "Regulations and oversight for AI."},
{"name": "Computer Vision", "description": "Image and video understanding."},
{"name": "NLP for Healthcare", "description": "Clinical text processing and support."}
]
researchers = []
for i in range(25):
ra = random.choice(areas)
inst = random.choice(institutions)
researchers.append({
"id": str(uuid.uuid4()),
"name": f"Researcher {i}",
"title": random.choice(["Professor", "Scientist", "Postdoc", "Engineer"]),
"expertise": f"{ra['name']} and applications in {random.choice(['healthcare','policy','vision','recommendation'])}.",
"institution": inst["name"]
})
projects = []
for i in range(50):
ra = random.choice(areas)
host = random.choice(institutions)
team = random.sample(researchers, k=random.randint(2,4))
projects.append({
"id": str(uuid.uuid4()),
"title": f"Project {i}: {ra['name']} at {host['name']}",
"description": f"Exploring {ra['description']} with emphasis on {random.choice(['scalability','safety','evaluation','applications'])}.",
"year": random.choice(["2022","2023","2024"]),
"host_institution": host["name"],
"host_institution_type": host["type"],
"areas": [ra["name"]],
"researchers": [t["name"] for t in team]
})
json.dump(institutions, open(data_dir/"institutions.json","w"), indent=2)
json.dump(areas, open(data_dir/"areas.json","w"), indent=2)
json.dump(researchers, open(data_dir/"researchers.json","w"), indent=2)
json.dump(projects, open(data_dir/"projects.json","w"), indent=2)
print(f"Wrote dataset to {data_dir}")
Define Graph Schema
This is where we define the graph schema using Neomodel. We're modeling Institutions, ResearchAreas, Researchers, and Projects as nodes with typed relationships. Honestly, Neomodel really simplifies relationship wiring and idempotent node creation compared to raw Cypher. I used to write raw Cypher for everything, but this is much cleaner.
from neomodel import (StructuredNode, StringProperty, UniqueIdProperty,
RelationshipTo, RelationshipFrom, ArrayProperty, FloatProperty)
class Institution(StructuredNode):
uid = UniqueIdProperty()
name = StringProperty(unique_index=True, required=True)
institution_type = StringProperty(required=True)
location = StringProperty()
embedding = ArrayProperty(FloatProperty())
class ResearchArea(StructuredNode):
uid = UniqueIdProperty()
name = StringProperty(unique_index=True, required=True)
description = StringProperty()
embedding = ArrayProperty(FloatProperty())
class Researcher(StructuredNode):
uid = UniqueIdProperty()
name = StringProperty(unique_index=True, required=True)
title = StringProperty()
expertise = StringProperty()
embedding = ArrayProperty(FloatProperty())
affiliated_with = RelationshipTo("Institution", "AFFILIATED_WITH")
focuses_on = RelationshipTo("ResearchArea", "FOCUSES_ON")
works_on = RelationshipTo("Project", "WORKS_ON")
class Project(StructuredNode):
uid = UniqueIdProperty()
title = StringProperty(unique_index=True, required=True)
description = StringProperty()
year = StringProperty()
host_institution_type = StringProperty()
embedding = ArrayProperty(FloatProperty())
hosted_by = RelationshipTo("Institution", "HOSTED_BY")
focuses_on = RelationshipTo("ResearchArea", "FOCUSES_ON")
has_researcher = RelationshipFrom("Researcher", "WORKS_ON")
Create Uniqueness Constraints
This cell creates uniqueness constraints in Neo4j to enforce idempotence and prevent duplicate nodes from sneaking in. You'd be surprised how often duplicates can mess things up.
with driver.session() as s:
constraints = [
"CREATE CONSTRAINT inst_name IF NOT EXISTS FOR (i:Institution) REQUIRE i.name IS UNIQUE",
"CREATE CONSTRAINT area_name IF NOT EXISTS FOR (a:ResearchArea) REQUIRE a.name IS UNIQUE",
"CREATE CONSTRAINT res_name IF NOT EXISTS FOR (r:Researcher) REQUIRE r.name IS UNIQUE",
"CREATE CONSTRAINT proj_title IF NOT EXISTS FOR (p:Project) REQUIRE p.title IS UNIQUE",
]
for c in constraints:
s.run(c)
print("Uniqueness constraints created.")
Ingest Data Into Neo4j
Now we load the JSON data and populate the Neo4j graph. It uses Neomodel's get_or_none and save for idempotent node creation, and connects relationships safely. This part can take a minute or two.
import json
def get_or_create(cls, **props):
node = cls.nodes.get_or_none(**props)
if node:
return node
node = cls(**props).save()
return node
with open("data/institutions.json") as f:
inst_data = json.load(f)
with open("data/areas.json") as f:
area_data = json.load(f)
with open("data/researchers.json") as f:
res_data = json.load(f)
with open("data/projects.json") as f:
proj_data = json.load(f)
inst_by_name = {}
for r in inst_data:
inst = get_or_create(Institution, name=r["name"])
inst.institution_type = r["type"]
inst.location = r["location"]
inst.save()
inst_by_name[inst.name] = inst
area_by_name = {}
for r in area_data:
area = get_or_create(ResearchArea, name=r["name"])
area.description = r.get("description", "")
area.save()
area_by_name[area.name] = area
res_by_name = {}
for r in res_data:
res = get_or_create(Researcher, name=r["name"])
res.title = r.get("title", "")
res.expertise = r.get("expertise", "")
res.save()
inst = inst_by_name.get(r["institution"])
if inst and not res.affiliated_with.is_connected(inst):
res.affiliated_with.connect(inst)
for a in area_by_name.values():
if a.name in res.expertise and not res.focuses_on.is_connected(a):
res.focuses_on.connect(a)
res_by_name[res.name] = res
proj_by_title = {}
for p in proj_data:
proj = get_or_create(Project, title=p["title"])
proj.description = p.get("description", "")
proj.year = p.get("year", "")
proj.host_institution_type = p.get("host_institution_type", "")
proj.save()
host = inst_by_name.get(p["host_institution"])
if host and not proj.hosted_by.is_connected(host):
proj.hosted_by.connect(host)
for a in p.get("areas", []):
if a in area_by_name and not proj.focuses_on.is_connected(area_by_name[a]):
proj.focuses_on.connect(area_by_name[a])
for rn in p.get("researchers", []):
if rn in res_by_name and not res_by_name[rn].works_on.is_connected(proj):
res_by_name[rn].works_on.connect(proj)
proj_by_title[proj.title] = proj
print("Graph populated.")
Validate Graph Structure
Quick sanity check here. This cell inspects the graph to confirm node and relationship counts are what we expect.
with driver.session() as s:
print("Total nodes:", s.run("MATCH (n) RETURN count(n) AS c").single()["c"])
print("Total relationships:", s.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"])
print("Nodes by label:")
for label in ["Institution", "ResearchArea", "Researcher", "Project"]:
c = s.run(f"MATCH (n:{label}) RETURN count(n) AS c").single()["c"]
print(f" {label}: {c}")
print("Relationships by type:")
rels = s.run("""
CALL db.relationshipTypes() YIELD relationshipType AS t
CALL {
WITH t
RETURN t, toInteger(size([(a)-[r]->(b) WHERE type(r)=t | r])) AS c
} RETURN t, c
""").data()
for r in rels:
print(f" {r['t']}: {r['c']}")
Compute and Store Embeddings
This is where things get interesting. We compute embeddings for each node type and write them to the graph in batches. It uses retry logic to handle OpenAI rate limits (which you will hit if you're not careful) and logs progress as it goes.
from tqdm import tqdm
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def embed_with_retry(texts):
return emb_model.embed_documents(texts)
def embed_label(label, text_prop, embed_prop="embedding", batch=64):
with driver.session() as s:
rows = s.run(f"MATCH (n:{label}) RETURN id(n) AS id, n.{text_prop} AS text").data()
ids, texts = zip(*[(r["id"], r["text"] or "") for r in rows]) if rows else ([], [])
vectors = []
for i in tqdm(range(0, len(texts), batch), desc=f"Embedding {label}"):
chunk = texts[i:i+batch]
vecs = embed_with_retry(chunk)
vectors.extend(vecs)
with driver.session() as s:
s.run(f"""
UNWIND $data AS row
MATCH (n:{label}) WHERE id(n)=row.id
SET n.{embed_prop} = row.vec
""", parameters={"data": [{"id": i, "vec": v} for i, v in zip(ids, vectors)]})
print(f"Embedded {len(texts)} {label} nodes.")
embed_label("Institution", "name")
embed_label("ResearchArea", "description")
embed_label("Researcher", "expertise")
embed_label("Project", "description")
Create Vector Indexes
This cell creates Neo4j vector indexes for each node type to enable fast similarity search. It waits for indexes to come online before proceeding. Sometimes this takes a few seconds.
VECTOR_DIMS = len(emb_model.embed_query("test"))
index_specs = [
("inst_embedding_idx", "Institution", "embedding"),
("area_embedding_idx", "ResearchArea", "embedding"),
("res_embedding_idx", "Researcher", "embedding"),
("proj_embedding_idx", "Project", "embedding"),
]
with driver.session() as s:
for name, label, prop in index_specs:
s.run(f"""
CREATE VECTOR INDEX {name} IF NOT EXISTS
FOR (n:{label}) ON (n.{prop})
OPTIONS {{ indexConfig: {{
`vector.dimensions`: {VECTOR_DIMS},
`vector.similarity_function`: 'cosine'
}} }}
""")
print("Indexes created. Waiting for them to come online...")
s.run("CALL db.awaitIndexes()")
print("Indexes online.")
with driver.session() as s:
print("Existing indexes:")
for rec in s.run("SHOW INDEXES YIELD name, type, entityType, labelsOrTypes, properties RETURN *").data():
print(rec)
Configure LangChain Vector Stores
Here we configure LangChain Neo4j vector stores for each label. We specify node_properties to make sure metadata is returned in search results for filtering and display. This is important, otherwise you just get IDs back.
from langchain_community.vectorstores import Neo4jVector
inst_vs = Neo4jVector.from_existing_index(
embedding=emb_model,
url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD,
index_name="inst_embedding_idx",
node_label="Institution",
text_node_property="name",
embedding_node_property="embedding",
retrieval_query="RETURN node.name AS text, score, node {.name, .institution_type, .location} AS metadata"
)
area_vs = Neo4jVector.from_existing_index(
embedding=emb_model,
url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD,
index_name="area_embedding_idx",
node_label="ResearchArea",
text_node_property="description",
embedding_node_property="embedding",
retrieval_query="RETURN node.description AS text, score, node {.name, .description} AS metadata"
)
res_vs = Neo4jVector.from_existing_index(
embedding=emb_model,
url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD,
index_name="res_embedding_idx",
node_label="Researcher",
text_node_property="expertise",
embedding_node_property="embedding",
retrieval_query="RETURN node.expertise AS text, score, node {.name, .title, .expertise} AS metadata"
)
proj_vs = Neo4jVector.from_existing_index(
embedding=emb_model,
url=NEO4J_URI, username=NEO4J_USERNAME, password=NEO4J_PASSWORD,
index_name="proj_embedding_idx",
node_label="Project",
text_node_property="description",
embedding_node_property="embedding",
retrieval_query="RETURN node.description AS text, score, node {.title, .description, .host_institution_type, .year} AS metadata"
)
print("Vector stores configured.")
Retrieval Strategies
Strategy 1: Semantic Search for Researchers
Use this when you need to find researchers whose expertise matches a query semantically. Pretty straightforward.
This searches for researchers whose expertise is semantically similar to the query using vector similarity.
def find_similar_researchers(query, k=5):
docs = res_vs.similarity_search(query, k=k)
return [(d.metadata.get("name"), d.page_content) for d in docs]
print(find_similar_researchers("graph neural networks for healthcare", k=5))
Strategy 2: Graph-Aware Retrieval from Institutions
Use this when you need to find researchers and projects affiliated with institutions matching a query. This adds organizational context you wouldn't get otherwise. I find this particularly useful when looking for collaborative opportunities.
It starts with vector-matched institutions, then traverses the graph to retrieve affiliated researchers and projects.
def graph_aware_from_institution(query, k_inst=3, k_proj=5):
inst_docs = inst_vs.similarity_search(query, k=k_inst)
inst_names = [d.metadata.get("name") for d in inst_docs]
cypher = """
MATCH (i:Institution)<-[:AFFILIATED_WITH]-(r:Researcher)-[:WORKS_ON]->(p:Project)
WHERE i.name IN $inst_names
WITH i, r, p
RETURN i.name AS institution, r.name AS researcher, p.title AS project, p.description AS description
LIMIT $limit
"""
with driver.session() as s:
rows = s.run(cypher, inst_names=inst_names, limit=k_proj*3).data()
return rows
print(graph_aware_from_institution("top research in policy and governance", 3, 6)[:6])
Strategy 3: Cross-Label Semantic Search
Use this when you need to unify concepts across researchers and projects to find both people and work relevant to a query. Sometimes you don't know if you're looking for a person or a project, right?
This searches for both researchers and projects semantically similar to the query.
def cross_label_search(query, k_each=5):
r_docs = res_vs.similarity_search(query, k=k_each)
p_docs = proj_vs.similarity_search(query, k=k_each)
results = {
"researchers": [(d.metadata.get("name"), d.page_content) for d in r_docs],
"projects": [(d.metadata.get("title"), d.page_content) for d in p_docs],
}
return results
print(cross_label_search("retrieval augmented generation evaluation", 5))
Strategy 4: Topic Expansion via Research Areas
Use this when you need to expand a query to related topics for broader recall. Actually, this one surprised me with how well it works.
It finds research areas semantically similar to the query for topic expansion.
def expand_topics(query, k=5):
areas = area_vs.similarity_search(query, k=k)
return [(d.metadata.get("name"), d.page_content) for d in areas]
print(expand_topics("AI policy and governance", 5))
Strategy 5: Collaborator Recommendations
Use this when you need to recommend new collaborators for a researcher based on semantic similarity and graph constraints (no shared projects). This is actually really useful for finding potential collaborations.
The cell recommends collaborators who are semantically similar but don't already share a project with the seed researcher.
def recommend_collaborators(seed_researcher_name, k_sim=10, topn=5):
with driver.session() as s:
seed = s.run("""
MATCH (r:Researcher {name:$name})
RETURN r.expertise AS expertise
""", name=seed_researcher_name).single()
if not seed:
return []
sim = res_vs.similarity_search(seed["expertise"], k=k_sim)
sim_names = [d.metadata.get("name") for d in sim if d.metadata.get("name") != seed_researcher_name]
with driver.session() as s:
rows = s.run("""
MATCH (seed:Researcher {name:$seed})
MATCH (cand:Researcher)
WHERE cand.name IN $cands
AND NOT (seed)-[:WORKS_ON]->(:Project)<-[:WORKS_ON]-(cand)
RETURN cand.name AS candidate
LIMIT $topn
""", seed=seed_researcher_name, cands=sim_names, topn=topn).data()
return [r["candidate"] for r in rows]
print(recommend_collaborators("Researcher 1", k_sim=15, topn=5))
Strategy 6: Hybrid Filtered Search
Use this when you need to combine semantic similarity with property-level filters (like institution type) for targeted retrieval. I use this a lot when I need very specific results.
This searches for projects semantically similar to the query, filtered by institution type.
def search_projects_filtered(query, institution_type="academic", k=5):
docs = proj_vs.similarity_search(query, k=k, filter={"host_institution_type": institution_type})
return [(d.metadata.get("title"), d.page_content, d.metadata.get("host_institution_type")) for d in docs]
print(search_projects_filtered("AI policy and governance", "academic", 5))
Run All Strategies and Validate
Finally, let's run all six retrieval strategies and print results for validation. This gives you a good sense of what each strategy returns.
print("\n1) Semantic researchers:")
print(find_similar_researchers("graph neural networks for healthcare", 5))
print("\n2) Graph-aware from institution:")
print(graph_aware_from_institution("policy governance research organizations", 3, 6)[:6])
print("\n3) Cross-label search:")
print(cross_label_search("retrieval augmented generation evaluation", 5))
print("\n4) Topic expansion:")
print(expand_topics("AI policy and governance", 5))
print("\n5) Collaboration recommendations:")
print(recommend_collaborators("Researcher 3", 15, 5))
print("\n6) Hybrid filtered projects:")
print(search_projects_filtered("AI policy and governance", "academic", 5))
Next Steps
Alright, so you now have a working Knowledge Graph RAG pipeline. To extend it, here's what I'd suggest:
Integrate into a RAG chain: Wrap these retrieval strategies in LangChain chains and pass results to an LLM for generation. This is where it gets really powerful.
Add re-ranking and path scoring: Use cross-encoders or custom scoring to rank results by relevance and relationship strength. I've experimented with this and it makes a big difference.
Scale and operationalize: Deploy Neo4j on managed infrastructure, add monitoring, and optimize embedding batch sizes for production workloads. The batch size optimization alone can save you a lot of time and money.