Improvements to the Retrieval Agent
1. Creating indexes from pre-crawled sites
Depending on the size of a site, crawling may be a long-running process. In such cases our indexing process may time out, so we may want to run crawling actors independently (i.e., outside of our Index Graph).
To accommodate such cases, I added a new indexing configuration, apify_dataset_id, which can be used to load a pre-crawled dataset directly. Apify stores the outcome of a crawl as a dataset, which can be loaded directly into langchain_core.documents.Document objects via a dataset loader.
Apify dataset storage
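As a quick sanity check before indexing, a pre-crawled dataset can also be inspected with the Apify Python client. The sketch below is illustrative only: it assumes the dataset items carry url and text fields (matching the mapping function used later) and uses a placeholder dataset ID.
# sketch: inspecting a pre-crawled Apify dataset (dataset ID is a placeholder)
import os
from apify_client import ApifyClient

client = ApifyClient(os.environ["APIFY_API_TOKEN"])
items = client.dataset("YOUR_DATASET_ID").list_items().items
print(len(items))                                         # number of crawled pages
print(items[0]["url"], len(items[0].get("text") or ""))   # first page and its text length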
Additional indexing configuration
# same as before
@dataclass(kw_only=True)
class IndexConfiguration(CommonConfiguration):
    # new configuration for specifying a pre-crawled Apify dataset id
    apify_dataset_id: str = field(
        default="",
        metadata={
            "description": "Pre-crawled Apify dataset ID"
        },
    )
    # same as before
Modifications to index_graph for building an index directly from a pre-crawled dataset.
# same as before
from langchain_community.document_loaders import ApifyDatasetLoader
from langchain_core.documents import Document
# same as before

def apify_crawl(configuration: IndexConfiguration):
    # same as before
    # changes to accommodate loading from pre-crawled datasets
    dataset_id = configuration.apify_dataset_id
    if dataset_id:
        loader = ApifyDatasetLoader(
            dataset_id=dataset_id,
            dataset_mapping_function=lambda item: Document(
                page_content=item.get("text") or "", metadata={"url": item["url"]}
            ),
        )
    else:
        # same as before: use the apify actor to crawl
        ...
    return loader.load()
# same as before

async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    if not config:
        raise ValueError("Configuration required to run index_docs.")
    with retrieval.make_retriever(config) as retriever:
        configuration = IndexConfiguration.from_runnable_config(config)
        # either starter_urls or the apify_dataset_id of a pre-crawled dataset should
        # trigger crawling / loading the documents
        if not state.docs and (configuration.starter_urls or configuration.apify_dataset_id):
            print("starting crawl ...")
            crawled_docs = apify_crawl(configuration)
        # same as before
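With this in place, indexing can be triggered against a pre-crawled dataset by passing the new configuration at run time. A minimal sketch, assuming the compiled index graph is exported as graph from index_graph (an assumption, as is the placeholder dataset ID):
# sketch: running the index graph against a pre-crawled Apify dataset
import asyncio
from retrieval_graph.index_graph import graph  # assumed export name

config = {"configurable": {"apify_dataset_id": "YOUR_DATASET_ID"}}
asyncio.run(graph.ainvoke({"docs": []}, config=config))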
2. Splitting documents for fine-grained retrieval
So far I embedded each crawled document directly. Crawled documents may be arbitrarily large.
- This makes it uncertain how many top_k retrieval results I should consider for response generation. In fact, for one site the documents were so large that just three of them exceeded the token limit allowed on the generation node.
- For complex queries, where composing a high-quality answer may need information from multiple documents, limiting top_k may lead to poor results.
To address this, I split crawled documents into multiple smaller documents as follows:
# src/retrieval_graph/index_graph.py
# new import
from langchain_text_splitters import RecursiveCharacterTextSplitter
# same as before

async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    if not config:
        raise ValueError("Configuration required to run index_docs.")
    with retrieval.make_retriever(config) as retriever:
        configuration = IndexConfiguration.from_runnable_config(config)
        if not state.docs and (configuration.starter_urls or configuration.apify_dataset_id):
            print("starting crawl ...")
            crawled_docs = apify_crawl(configuration)
            # use a 1000-character overlapping-window splitter to create smaller document chunks to index
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
            state.docs = text_splitter.split_documents(crawled_docs)
        # same as before
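To illustrate the effect of the splitter in isolation, here is a small standalone sketch with a synthetic oversized document (sample data only):
# sketch: splitting one large document into overlapping ~1000-character chunks
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

big_doc = Document(page_content="word " * 2000, metadata={"url": "https://example.com"})
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents([big_doc])

print(len(chunks))                               # several chunks instead of one huge document
print(max(len(c.page_content) for c in chunks))  # each chunk stays within ~1000 characters
print(chunks[0].metadata)                        # the source url is preserved on every chunk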
We can now retrieve a larger set of top_k results. Since the maximum chunk size here is 1000 characters, we can deterministically configure top_k based on the token limit of the language model used in the response generation node. E.g., if the token limit is 100,000 tokens, we can potentially use top_k up to 500 (assuming an average token size of 5 characters, i.e., roughly 200 tokens per chunk). Note that I already support a configuration for specifying top_k.
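The back-of-the-envelope calculation behind that number looks like this (the 5-characters-per-token figure is a rough assumption; actual tokenization varies by model):
# sketch: estimating an upper bound on top_k from the generation model's token limit
chunk_size_chars = 1000      # maximum characters per chunk (set in the splitter above)
chars_per_token = 5          # rough assumption
token_limit = 100_000        # context budget of the generation model

tokens_per_chunk = chunk_size_chars / chars_per_token   # ~200 tokens per chunk
max_top_k = token_limit // tokens_per_chunk             # ~500 chunks
print(max_top_k)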
# src/retrieval_graph/retrieval.py
# ...
def make_milvus_retriever(
    configuration: IndexConfiguration, embedding_model: Embeddings, **kwargs
) -> Generator[VectorStoreRetriever, None, None]:
    # ...
    yield vstore.as_retriever(search_kwargs=configuration.search_kwargs)
# ...
Here, search_kwargs = {"k": 10} tells the retriever to return the top 10 results.
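For completeness, the configuration side of this might look something like the sketch below; the field name matches the code above, but the default shown here is only illustrative.
# sketch: a search_kwargs field on the index configuration (default is illustrative)
@dataclass(kw_only=True)
class IndexConfiguration(CommonConfiguration):
    # ...
    search_kwargs: dict = field(
        default_factory=lambda: {"k": 10},
        metadata={
            "description": "Keyword arguments passed to the retriever, e.g. the number of results to return (k)."
        },
    )
    # ...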
3. Rate limit errors while embedding
Embedding models put limits on the number of tokens they will process per unit time. Initially, I embedded the whole set of crawled (or split) documents in a single call; for large sites, this ran into rate limit errors. With the following enhancement I added a batch size configuration, split the documents into multiple batches, and added a time delay between batches to avoid rate limits.
# src/retrieval_graph/configuration.py
# ...
@dataclass(kw_only=True)
class IndexConfiguration(CommonConfiguration):
    # ...
    batch_size: int = field(
        default=400,
        metadata={
            "description": "Number of documents to index in a single batch."
        },
    )
    # ...
# src/retrieval_graph/index_graph.py
import time
# ...

# generator function to create batches
def create_batches(docs, batch_size):
    """Chunk documents into smaller batches."""
    for i in range(0, len(docs), batch_size):
        yield docs[i:i + batch_size]

async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    if not config:
        raise ValueError("Configuration required to run index_docs.")
    with retrieval.make_retriever(config) as retriever:
        configuration = IndexConfiguration.from_runnable_config(config)
        # ...
        stamped_docs = ensure_docs_have_user_id(state.docs, config)
        # embed in batches to avoid rate limit errors
        batch_size = configuration.batch_size
        for i, batch in enumerate(create_batches(stamped_docs, batch_size)):
            if configuration.retriever_provider == "milvus":
                retriever.add_documents(batch)
            else:
                await retriever.aadd_documents(batch)
            # sleep if there are more batches to embed
            if i < (len(stamped_docs) // batch_size):
                time.sleep(60)
    return {"docs": "delete"}
# ...
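A quick standalone check of the batching behaviour, using dummy items in place of documents:
# sketch: sanity-checking create_batches with dummy items
dummy_docs = list(range(950))
batches = list(create_batches(dummy_docs, 400))
print([len(b) for b in batches])   # [400, 400, 150]: the 60s pause happens after the first two batches only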
4. Pre-created indexes (for ease of demo)
While demonstrating the solution to clients, I noticed the following:
- Live crawling can take quite a while.
- Indexing pre-crawled datasets may also take some time because of the delays needed to avoid rate limit errors, and repeating it adds to embedding model usage costs.
To address these, I added:
- Configurations that (optionally) load an index from an alternate location.
- Some pre-created client indexes to the repository, so that the index files are available in the LangGraph Cloud deployment.
# src/retrieval_graph/configuration.py
# ...
@dataclass(kw_only=True)
class Configuration(CommonConfiguration):
    """The configuration for the agent."""

    # optional: index location
    alternate_milvus_uri: Optional[str] = field(
        default="",
        metadata={
            "description": "If you want to use one of the already available indexes, provide the file location here."
        },
    )
    # ...
# src/retrieval_graph/retrieval.py
# ...
@contextmanager
def make_milvus_retriever(
    configuration: IndexConfiguration, embedding_model: Embeddings, **kwargs
) -> Generator[VectorStoreRetriever, None, None]:
    # ...
    # use the alternate milvus uri if provided in the configuration
    milvus_uri = kwargs.get("alternate_milvus_uri") or os.environ.get("MILVUS_DB")
    vstore = Milvus(
        # ...
    )
    # ...
# ...
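In practice, this amounts to passing the location of a pre-created index in the run configuration. A minimal sketch, assuming the compiled retrieval graph is exported as graph and takes a messages-based input state (both assumptions; the index path and question are placeholders):
# sketch: pointing a demo run at a pre-created index (path and question are placeholders)
import asyncio
from retrieval_graph.graph import graph  # assumed export name

config = {"configurable": {"alternate_milvus_uri": "indexes/demo_site.db"}}
result = asyncio.run(graph.ainvoke({"messages": [("user", "What does the product do?")]}, config=config))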
During demonstrations, I now use pre-created indexes to save time and money.