In this tutorial, we’ll demonstrate how RAG operations can be
implemented in Pixeltable. In particular, we’ll develop a RAG
application that summarizes a collection of PDF documents and uses
ChatGPT to answer questions about them.
In a traditional RAG workflow, such operations might be implemented as a
Python script that runs on a periodic schedule or in response to certain
events. In Pixeltable, they are implemented as persistent tables that
are updated automatically and incrementally as new data becomes
available.
We first set up our OpenAI API key:
import os
import getpass
if 'OPENAI_API_KEY' not in os.environ:
    os.environ['OPENAI_API_KEY'] = getpass.getpass('OpenAI API Key:')
Next, we install the packages we need for this tutorial and set up our
environment.
%pip install -q pixeltable sentence-transformers tiktoken openai openpyxl
Note: you may need to restart the kernel to use updated packages.
import numpy as np
import pixeltable as pxt
# Ensure a clean slate for the demo
pxt.drop_dir('rag_demo', force=True)
pxt.create_dir('rag_demo')
Connected to Pixeltable database at: postgresql+psycopg://postgres:@/pixeltable?host=/home/marcel/.pixeltable/pgdata
Created directory ‘rag_demo’.
<pixeltable.catalog.dir.Dir at 0x7eb34c1eceb0>
Next we’ll create a table containing the sample questions we want to
answer. The questions are stored in an Excel spreadsheet, along with a
set of “ground truth” answers to help evaluate our model pipeline. We
can use create_table() with the source parameter to load them. Note
that we can pass the URL of the spreadsheet directly.
base = 'https://github.com/pixeltable/pixeltable/raw/main/docs/resources/rag-demo/'
qa_url = base + 'Q-A-Rag.xlsx'
queries_t = pxt.create_table('rag_demo.queries', source=qa_url)
Created table ‘queries’.
Inserting rows into `queries`: 8 rows [00:00, 1853.94 rows/s]
Inserted 8 rows with 0 errors.
Outline
There are two major parts to our RAG application:
- Document Indexing: Load the documents, split them into chunks, and
index them using a vector embedding.
- Querying: For each question on our list, do a top-k lookup for the
most relevant chunks, use them to construct a ChatGPT prompt, and
send the enriched prompt to an LLM.
We’ll implement both parts in Pixeltable.
Document Indexing
All data in Pixeltable, including documents, resides in tables.
Tables are persistent containers that can serve as the store of record
for your data. Since we are starting from scratch, we will start with an
empty table rag_demo.documents with a single column, document.
documents_t = pxt.create_table(
    'rag_demo.documents',
    {'document': pxt.Document}
)
documents_t
Created table ‘documents’.
Next, we’ll insert our first few source documents into the new table.
We’ll leave the rest for later, in order to show how to update the
indexed document base incrementally.
document_urls = [
    base + 'Argus-Market-Digest-June-2024.pdf',
    base + 'Argus-Market-Watch-June-2024.pdf',
    base + 'Company-Research-Alphabet.pdf',
    base + 'Jefferson-Amazon.pdf',
    base + 'Mclean-Equity-Alphabet.pdf',
    base + 'Zacks-Nvidia-Report.pdf',
]
documents_t.insert({'document': url} for url in document_urls[:3])
documents_t.show()
Inserting rows into `documents`: 3 rows [00:00, 1925.76 rows/s]
Inserted 3 rows with 0 errors.
In RAG applications, we often decompose documents into smaller units, or
chunks, rather than treating each document as a single entity. In this
example, we’ll use Pixeltable’s built-in DocumentSplitter, but in
general the chunking methodology is highly customizable.
DocumentSplitter has a variety of options for controlling the chunking
behavior, and it’s also possible to replace it entirely with a
user-defined iterator (or an adapter for a third-party document
splitter).
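For instance, a configuration along the lines of the sketch below would split on paragraph boundaries (capped at a token limit) and carry document metadata such as the title and heading into each chunk. The separators and metadata values here are assumptions based on the DocumentSplitter reference, not part of this tutorial's pipeline; we'll plug a DocumentSplitter into a view in a moment.
from pixeltable.iterators import DocumentSplitter
# Illustrative alternative chunking configuration (assumed parameter values;
# see the DocumentSplitter reference for the options supported by your version):
alt_chunking = DocumentSplitter.create(
    document=documents_t.document,
    separators='paragraph,token_limit',  # split on paragraphs, capped by token count
    limit=300,                           # max tokens per chunk
    metadata='title,heading'             # include document metadata in each chunk row
)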
In Pixeltable, operations such as chunking can be automated by creating
views of the base documents table. A view is a virtual derived
table: rather than adding data directly to the view, we define it via a
computation over the base table. In this example, the view is defined by
iteration over the chunks of a DocumentSplitter.
from pixeltable.iterators import DocumentSplitter
chunks_t = pxt.create_view(
    'rag_demo.chunks',
    documents_t,
    iterator=DocumentSplitter.create(
        document=documents_t.document,
        separators='token_limit',
        limit=300
    )
)
Inserting rows into `chunks`: 41 rows [00:00, 21767.91 rows/s]
Our chunks view now has 3 columns:
- text is the chunk text produced by the DocumentSplitter.
- pos is a system-generated integer column, starting at 0, that provides a sequence number for each row.
- document is simply the document column from the base table documents. We won't need it here, but having access to the base table's columns (in effect a parent-child join) can be quite useful.
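For instance, because document is exposed through the view, we can see which source document each chunk came from with an ordinary query (purely illustrative; not part of the pipeline):
# Peek at a couple of chunks together with the document they came from
chunks_t.select(chunks_t.document, chunks_t.text).limit(2).collect()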
Notice that as soon as we created it, chunks was automatically
populated with data from the existing documents in our base table. We
can select the first 2 chunks from each document using common query
operations, in order to get a feel for what was extracted:
chunks_t.where(chunks_t.pos < 2).show()
Now let’s compute vector embeddings for the document chunks and store
them in a vector index. Pixeltable has built-in support for vector
indexing using a variety of embedding model families, and it’s easy for
users to add new ones via UDFs. In this demo, we’re going to use the E5
model from the Huggingface sentence_transformers library, which runs
locally.
The following command creates a vector index on the text column of the
chunks view, using the E5 embedding model. (For details on index
creation, see the Embedding and Vector Indices guide.) Note that simply
defining the index is enough to populate it with the existing data (and
to keep it updated when the underlying data changes, as we'll see later).
from pixeltable.functions.huggingface import sentence_transformer
chunks_t.add_embedding_index(
    'text',
    embedding=sentence_transformer.using(model_id='intfloat/e5-large-v2')
)
This completes the first part of our application, creating an indexed
document base. Next, we’ll use it to run some queries.
Querying
In order to express a top-k lookup against our index, we use
Pixeltable’s similarity operator in combination with the standard
order_by and limit operations. Before building this into our
application, let’s run a sample query to make sure it works.
query_text = "What is the expected EPS for Nvidia in Q1 2026?"
sim = chunks_t.text.similarity(query_text)
nvidia_eps_query = (
    chunks_t
    .order_by(sim, asc=False)
    .select(similarity=sim, text=chunks_t.text)
    .limit(5)
)
nvidia_eps_query.collect()
We perform this context retrieval for each row of our queries table by
adding it as a computed column. In this case, the operation is a top-k
similarity lookup against the data in the chunks table. To implement
this operation, we’ll use Pixeltable’s @query decorator to enhance the
capabilities of the chunks table.
# A @query is essentially a reusable, parameterized query that is attached to a table (or view),
# which is a modular way of getting data from that table.
@pxt.query
def top_k(query_text: str):
    sim = chunks_t.text.similarity(query_text)
    return (
        chunks_t.order_by(sim, asc=False)
        .select(chunks_t.text, sim=sim)
        .limit(5)
    )
# Now add a computed column to `queries_t`, calling the query
# `top_k` that we just defined.
queries_t.add_computed_column(
    question_context=top_k(queries_t.Question)
)
Added 8 column values with 0 errors.
8 rows updated, 8 values computed.
Our queries table now has an additional question_context column, which
contains the result of executing the query for each row, formatted as a
list of dictionaries:
queries_t.select(queries_t.question_context).head(1)
Asking the LLM
Now it’s time for the final step in our application: feeding the
document chunks and questions to an LLM for resolution. In this demo,
we’ll use OpenAI for this, but any other inference cloud or local model
could be used instead.
We start by defining a UDF that takes a top-k list of context chunks and
a question and turns them into a ChatGPT prompt.
# Define a UDF to create an LLM prompt given a top-k list of
# context chunks and a question.
@pxt.udf
def create_prompt(top_k_list: list[dict], question: str) -> str:
    concat_top_k = '\n\n'.join(
        elt['text'] for elt in reversed(top_k_list)
    )
    return f'''
PASSAGES:
{concat_top_k}
QUESTION:
{question}'''
We then add that again as a computed column to queries:
queries_t.add_computed_column(
    prompt=create_prompt(queries_t.question_context, queries_t.Question)
)
Added 8 column values with 0 errors.
8 rows updated, 16 values computed.
We now have a new string column containing the prompt:
queries_t.select(queries_t.prompt).head(1)
We now add another computed column to call OpenAI. For the
chat_completions() call, we need to construct two messages, containing
the instructions to the model and the prompt. For the latter, we can
simply reference the prompt column we just added.
from pixeltable.functions import openai
# Assemble the prompt and instructions into OpenAI's message format
messages = [
    {
        'role': 'system',
        'content': 'Please read the following passages and answer the question based on their contents.'
    },
    {
        'role': 'user',
        'content': queries_t.prompt
    }
]
# Add a computed column that calls OpenAI
# Add a computed column that calls OpenAI
queries_t.add_computed_column(
    response=openai.chat_completions(model='gpt-4o-mini', messages=messages)
)
Added 8 column values with 0 errors.
8 rows updated, 8 values computed.
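As noted above, another inference provider could be slotted in at this step in exactly the same way. As a hedged sketch, a locally running model served through Ollama might be called via Pixeltable's Ollama integration; the module, function, and model names below are assumptions (check the Pixeltable integrations documentation), and the response JSON layout differs from OpenAI's, so the extraction step that follows would change accordingly. We'll continue with the OpenAI response here.
# Hypothetical alternative: call a local model served by Ollama instead of OpenAI.
# Assumes pixeltable.functions.ollama.chat and a running Ollama server.
from pixeltable.functions import ollama
queries_t.add_computed_column(
    response_local=ollama.chat(messages=messages, model='llama3.2')
)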
Our queries table now contains a JSON-structured column response,
which holds the entire API response structure. At the moment, we’re only
interested in the response content, which we can extract easily into
another computed column:
queries_t.add_computed_column(
    answer=queries_t.response.choices[0].message.content
)
Added 8 column values with 0 errors.
8 rows updated, 8 values computed.
Our queries table now contains the Question and correct_answer columns
loaded from the spreadsheet, plus the computed question_context, prompt,
response, and answer columns. Let's take a look at what we got back:
queries_t.select(queries_t.Question, queries_t.correct_answer, queries_t.answer).show()
The application works, but, as expected, a few questions couldn’t be
answered due to the missing documents. As a final step, let’s add the
remaining documents to our document base, and run the queries again.
Incremental Updates
Pixeltable’s views and computed columns update automatically in response
to new data. We can see this when we add the remaining documents to our
documents table. Watch how the chunks view is updated to stay in
sync with documents:
documents_t.insert({'document': p} for p in document_urls[3:])
Inserting rows into `documents`: 3 rows [00:00, 1949.63 rows/s]
Inserting rows into `chunks`: 68 rows [00:00, 601.57 rows/s]
Inserted 71 rows with 0 errors.
71 rows inserted, 6 values computed.
(Note: although Pixeltable updates documents and chunks, it does
not automatically update the queries table. This is by design: we
don’t want all rows in queries to get automatically re-executed every
time a single new document is added to the document base. However,
newly-added rows will be run over the new, incrementally-updated index.)
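For example, if we were to insert a new question row now (a sketch; the Question and correct_answer columns come from the spreadsheet, and the values below are purely illustrative), its question_context, prompt, response, and answer columns would be computed on insert against the updated index:
# Hypothetical new row; its computed columns are filled in automatically,
# using the index as it exists at insertion time.
queries_t.insert([{
    'Question': 'Which company does the Zacks report cover?',
    'correct_answer': 'Nvidia'
}])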
To confirm that the chunks index got updated, we’ll re-run the chunks
retrieval query for the question
What is the expected EPS for Nvidia in Q1 2026?
Previously, our most similar chunk had a similarity score of ~0.81.
Let’s see what we get now:
nvidia_eps_query.collect()
Our most similar chunk now has a score of ~0.86 and pulls in more
relevant chunks from the newly-inserted documents.
Let’s recompute the response column of the queries table, which will
automatically recompute the answer column as well.
queries_t.recompute_columns('response')
Inserting rows into `queries`: 8 rows [00:00, 2128.01 rows/s]
8 rows updated, 16 values computed.
As a final step, let’s confirm that all the queries now have answers:
queries_t.select(
    queries_t.Question,
    queries_t.correct_answer,
    queries_t.answer
).show()