Online Evaluation of RAG with Langfuse

Langfuse lets you attach scores to your traces and spans. These scores can be used in multiple ways across Langfuse:

  1. Displayed on trace to provide a quick overview

  2. Segment all execution traces by scores to e.g. find all traces with a low-quality score

  3. Analytics: Detailed score reporting with drill downs into use cases and user segments

Ragas is an open-source tool that can help you run Model-Based Evaluation on your traces/spans, especially for RAG pipelines. Ragas can perform reference-free evaluations of various aspects of your RAG pipeline. Because it is reference-free you don’t need ground-truths when running the evaluations and can run it on production traces that you’ve collected with Langfuse.

The Environment

import os

# get keys for your project from https://cloud.langfuse.com
os.environ["LANGFUSE_PUBLIC_KEY"] = ""
os.environ["LANGFUSE_SECRET_KEY"] = ""

# your openai key
# os.environ["OPENAI_API_KEY"] = ""
%pip install datasets ragas llama_index python-dotenv --upgrade

The Data

For this example, we are going to use a dataset that has already been prepared by querying a RAG system and gathering its outputs. See below for instructions on how to fetch your production data from Langfuse.

The dataset contains the following columns:

  • question: list[str] - These are the questions your RAG pipeline will be evaluated on.

  • answer: list[str] - The answer generated from the RAG pipeline and given to the user.

  • contexts: list[list[str]] - The contexts which were passed into the LLM to answer the question.

  • ground_truths: list[list[str]] - The ground truth answer to the questions. However, this can be ignored for online evaluations since we will not have access to ground-truth data in our case.
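To make the column layout concrete, here is a minimal sketch of a two-row batch in that shape, built with plain Python containers. The sample strings and the helper name `check_batch` are illustrative assumptions; they are not part of Ragas or Langfuse.

```python
# Illustrative only: the sample rows and `check_batch` are made up for this sketch.
batch = {
    "question": ["Who issues an EIN?", "How do I endorse a cheque?"],
    "answer": ["The IRS issues it.", "Sign the back of the cheque."],
    "contexts": [
        ["An Employer ID Number (EIN) is issued by the IRS."],
        ["Have the payee sign the back.", "Then deposit it at the bank."],
    ],
}


def check_batch(batch):
    """Return the number of rows, or raise ValueError if the columns are misaligned."""
    n = len(batch["question"])
    for column in ("answer", "contexts"):
        if len(batch[column]) != n:
            raise ValueError(f"{column} has {len(batch[column])} rows, expected {n}")
    for i, ctx in enumerate(batch["contexts"]):
        # contexts is list[list[str]]: one list of chunk strings per question
        if not isinstance(ctx, list) or not all(isinstance(c, str) for c in ctx):
            raise ValueError(f"contexts[{i}] must be a list of strings")
    return n


print(check_batch(batch))
```

A check like this is cheap to run before handing a batch to an evaluator, since a flat list of strings in contexts is an easy mistake to make.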

from datasets import load_dataset

fiqa_eval = load_dataset("explodinggradients/fiqa", "ragas_eval")["baseline"]
fiqa_eval
Dataset({
    features: ['question', 'ground_truths', 'answer', 'contexts'],
    num_rows: 30
})

The Metrics

We are going to measure the following aspects of a RAG system. These metrics are from the Ragas library:

  1. faithfulness: This measures the factual consistency of the generated answer against the given context.

  2. answer_relevancy: This assesses how to-the-point and relevant the generated answer is to the given prompt.

  3. context_precision: This evaluates whether all of the ground-truth relevant items present in the contexts are ranked higher or not. Ideally, all the relevant chunks must appear at the top ranks. This metric is computed using the question and the contexts, with values ranging between 0 and 1, where higher scores indicate better precision.

  4. aspect_critique: This is designed to assess submissions based on predefined aspects such as harmlessness and correctness. Additionally, users have the flexibility to define their own aspects for evaluating submissions according to their specific criteria.

Refer to the Ragas documentation to learn more about these metrics and how they work.
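As a rough intuition for faithfulness, the sketch below assumes the score is the share of statements in the answer that the context supports. That reading, the helper name `faithfulness_ratio`, and the hand-written verdicts are assumptions for illustration; the real metric uses an LLM to extract and verify statements, so check the Ragas documentation for the exact definition.

```python
def faithfulness_ratio(verdicts):
    """Fraction of answer statements judged as supported by the context.

    `verdicts` holds one boolean per statement extracted from the answer.
    In Ragas an LLM produces these judgments; here they are hand-written.
    """
    if not verdicts:
        raise ValueError("need at least one statement")
    return sum(verdicts) / len(verdicts)


# Hypothetical answer with three statements, two of them backed by the context.
verdicts = [True, True, False]
print(faithfulness_ratio(verdicts))
```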

# import metrics
from ragas.metrics import faithfulness, answer_relevancy, context_precision
from ragas.metrics.critique import SUPPORTED_ASPECTS, harmfulness

# metrics you chose
metrics = [faithfulness, answer_relevancy, context_precision, harmfulness]

for m in metrics:
    print(m.name)
    # also init the metrics
    m.init_model()
faithfulness
answer_relevancy
context_precision
harmfulness

The Setup

You can use model-based evaluation with Ragas in two ways:

  1. Score each Trace: You run the evaluations for every trace. This gives you a much better idea of how each call to your RAG pipeline is performing, but it can be expensive.

  2. Score as Batch: You take a random sample of traces on a periodic basis and score them. This brings down cost and gives you a rough estimate of the performance of your app, but it can miss important samples.

In this cookbook, we’ll show you how to set up both.

Score with Trace

Let’s take a small example of a single trace and see how you can score it with Ragas. First, let’s load the data.

row = fiqa_eval[0]
row["question"], row["answer"]
('How to deposit a cheque issued to an associate in my business into my business account?',
 '\nThe best way to deposit a cheque issued to an associate in your business into your business account is to open a business account with the bank. You will need a state-issued "dba" certificate from the county clerk\'s office as well as an Employer ID Number (EIN) issued by the IRS. Once you have opened the business account, you can have the associate sign the back of the cheque and deposit it into the business account.')

Now let’s initialize the Langfuse client SDK to instrument your app.

from langfuse import Langfuse

langfuse = Langfuse()

Here we are defining a utility function to score your trace with the metrics you chose.

def score_with_ragas(query, chunks, answer):
    scores = {}
    for m in metrics:
        print(f"calculating {m.name}")
        scores[m.name] = m.score_single(
            {"question": query, "contexts": chunks, "answer": answer}
        )
    return scores

You can compute the scores with each request. Below is a dummy application that performs the following steps:

  1. get a question from the user

  2. fetch context from the database or vector store that can be used to answer the question from the user

  3. pass the question and the contexts to the LLM to generate the answer

All these steps are logged as spans in a single trace in Langfuse. You can read more about traces and spans in the Langfuse documentation.

from langfuse.model import (
    CreateTrace,
    CreateSpan,
    CreateGeneration,
    CreateEvent,
    CreateScore,
)

# start a new trace when you get a question
question = row["question"]
trace = langfuse.trace(CreateTrace(name="rag"))

# retrieve the relevant chunks
# chunks = get_similar_chunks(question)
contexts = row["contexts"]
# pass it as span
trace.span(
    CreateSpan(
        name="retrieval", input={"question": question}, output={"contexts": contexts}
    )
)

# use llm to generate a answer with the chunks
# answer = get_response_from_llm(question, chunks)
answer = row["answer"]
trace.span(
    CreateSpan(
        name="generation",
        input={"question": question, "contexts": contexts},
        output={"answer": answer},
    )
)

# compute scores for the question, context, answer tuple
ragas_scores = score_with_ragas(question, contexts, answer)
ragas_scores
calculating faithfulness
calculating answer_relevancy
calculating context_precision
calculating harmfulness
{'faithfulness': 0.6666666666666667,
 'answer_relevancy': 0.9763805367382368,
 'context_precision': 0.9999999999,
 'harmfulness': 0}

Once the scores are computed you can add them to the trace in Langfuse:

# send the scores
for m in metrics:
    trace.score(CreateScore(name=m.name, value=ragas_scores[m.name]))

Trace with RAGAS scores

Note that the scoring is blocking, so make sure that you send the generated answer to the user before waiting for the scores to be computed. Alternatively, you can run score_with_ragas() in a separate thread and pass in the trace_id to log the scores.
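A minimal sketch of the threaded variant, using only the standard library. The names `fake_score`, `score_and_log`, `handle_request` and the in-memory `logged` list are hypothetical stand-ins: in a real app the scorer would be score_with_ragas() and the sink would be a Langfuse score call keyed by the trace id.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_score(query, chunks, answer):
    # Stand-in for score_with_ragas(); the real function calls an LLM and is slow.
    return {"faithfulness": 1.0}


logged = []  # stand-in for a score sink such as Langfuse


def score_and_log(trace_id, query, chunks, answer):
    # Compute the scores, then attach each one to the trace by its id.
    scores = fake_score(query, chunks, answer)
    for name, value in scores.items():
        logged.append({"trace_id": trace_id, "name": name, "value": value})
    return scores


executor = ThreadPoolExecutor(max_workers=2)


def handle_request(trace_id, query, chunks, answer):
    # Hand scoring to the pool and return the answer without waiting for it.
    future = executor.submit(score_and_log, trace_id, query, chunks, answer)
    return answer, future


answer, future = handle_request("trace-1", "a question", ["a chunk"], "an answer")
future.result()  # only needed in this demo so the scores exist before printing
print(logged)
executor.shutdown(wait=True)
```

Passing the trace id rather than the trace object keeps the worker independent of the request's lifetime, which is why the text suggests it.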

Or you can consider

Scoring as batch

Scoring each production trace can be time-consuming and costly depending on your application architecture and traffic. In that case, it’s better to start off with a batch scoring method. Decide on the timespan you want the batch process to cover and the number of traces you want to sample from that time slice. Then create a dataset and call ragas.evaluate to analyze the result.

You can run this periodically to keep track of how the scores are changing across timeslices and figure out if there are any discrepancies.
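The time-slice sampling described above can be sketched with plain Python. The trace dictionaries, their `timestamp` field, and the helper `sample_time_slice` are assumptions made for this illustration; real Langfuse trace objects expose their own attributes.

```python
import random
from datetime import datetime, timedelta


def sample_time_slice(traces, start, end, k, seed=None):
    """Pick up to k traces whose timestamp falls in [start, end)."""
    in_window = [t for t in traces if start <= t["timestamp"] < end]
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    # min() avoids the ValueError random.sample raises when k exceeds the population
    return rng.sample(in_window, min(k, len(in_window)))


# 48 hypothetical traces, one per hour, starting at midnight.
base = datetime(2024, 1, 1)
traces = [
    {"id": f"trace-{i}", "timestamp": base + timedelta(hours=i)} for i in range(48)
]

day_one = sample_time_slice(traces, base, base + timedelta(days=1), k=3, seed=0)
print([t["id"] for t in day_one])
```

Running the same function over consecutive windows gives one sample per time slice, which is what you need to compare scores across slices.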

To create demo data in Langfuse, let’s first create ~10 traces with the fiqa dataset.

from langfuse.model import (
    CreateTrace,
    CreateSpan,
    CreateGeneration,
    CreateEvent,
    CreateScore,
)

# fiqa traces
for interaction in fiqa_eval.select(range(10, 20)):
    # take question, contexts and answer from the current row
    question = interaction["question"]
    contexts = interaction["contexts"]
    answer = interaction["answer"]

    trace = langfuse.trace(CreateTrace(name="rag"))
    trace.span(
        CreateSpan(
            name="retrieval",
            input={"question": question},
            output={"contexts": contexts},
        )
    )
    trace.span(
        CreateSpan(
            name="generation",
            input={"question": question, "contexts": contexts},
            output={"answer": answer},
        )
    )

# wait until the Langfuse SDK has processed all events before retrieving them in the next step
langfuse.flush()

Now that the dataset is uploaded to Langfuse, you can retrieve it as needed with this handy function.

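def get_traces(name=None, limit=None, user_id=None):
    """Page through the trace list until `limit` traces are collected (all of them if limit is None)."""
    all_data = []
    page = 1

    while True:
        response = langfuse.client.trace.list(name=name, page=page, user_id=user_id)
        if not response.data:
            break
        page += 1
        all_data.extend(response.data)
        # stop early once enough traces are collected; limit=None means fetch everything
        if limit is not None and len(all_data) >= limit:
            break

    return all_data[:limit]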
from random import sample

NUM_TRACES_TO_SAMPLE = 3
traces = get_traces(name="rag", limit=5)
traces_sample = sample(traces, NUM_TRACES_TO_SAMPLE)

len(traces_sample)
3

Now let’s make a batch and score it. Ragas uses a Hugging Face Dataset object to build the dataset and run the evaluation. If you run this on your own production data, use the right keys to extract the question, contexts and answer from the trace.
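When adapting the extraction to your own traces, it can help to skip traces that lack one of the expected observations, so that values from a previous trace are never carried over. The sketch below uses plain dictionaries in place of Langfuse observation objects; the helper `extract_row` is hypothetical, and the observation names "retrieval" and "generation" follow the demo traces in this cookbook.

```python
def extract_row(observations):
    """Build one evaluation row from a trace's observations, or None if incomplete."""
    row = {}
    for o in observations:
        if o["name"] == "retrieval":
            row["question"] = o["input"]["question"]
            row["contexts"] = o["output"]["contexts"]
        elif o["name"] == "generation":
            row["answer"] = o["output"]["answer"]
    required = ("question", "contexts", "answer")
    # Only return rows that have every field the evaluation needs.
    return row if all(key in row for key in required) else None


# Hypothetical observations for one complete trace and one incomplete trace.
complete = [
    {"name": "retrieval", "input": {"question": "q1"}, "output": {"contexts": ["c1"]}},
    {"name": "generation", "input": {}, "output": {"answer": "a1"}},
]
missing_generation = complete[:1]

print(extract_row(complete))
print(extract_row(missing_generation))
```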

# score on a sample
from random import sample

evaluation_batch = {
    "question": [],
    "contexts": [],
    "answer": [],
    "trace_id": [],
}

for t in traces_sample:
    observations = [langfuse.client.observations.get(o) for o in t.observations]
    for o in observations:
        if o.name == "retrieval":
            question = o.input["question"]
            contexts = o.output["contexts"]
        if o.name == "generation":
            answer = o.output["answer"]
    evaluation_batch["question"].append(question)
    evaluation_batch["contexts"].append(contexts)
    evaluation_batch["answer"].append(answer)
    evaluation_batch["trace_id"].append(t.id)
# run ragas evaluate
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy

ds = Dataset.from_dict(evaluation_batch)
r = evaluate(ds, metrics=[faithfulness, answer_relevancy])
evaluating with [faithfulness]
100%|███████████████████████████████████████████████████████████████████████████████████| 1/1 [00:23<00:00, 23.91s/it]
evaluating with [answer_relevancy]
100%|███████████████████████████████████████████████████████████████████████████████████| 1/1 [00:05<00:00,  5.04s/it]

And that is it! You can see the scores over a time period.

r
{'faithfulness': 0.8889, 'answer_relevancy': 0.9771}

You can also push the scores back into Langfuse or use the exported pandas dataframe to run further analysis.

df = r.to_pandas()

# add the langfuse trace_id to the result dataframe
df["trace_id"] = ds["trace_id"]

df.head()
   question                                           contexts                                           answer                                             faithfulness  answer_relevancy  trace_id
0  How to deposit a cheque issued to an associate...  [Just have the associate sign the back and the...  \nThe best way to deposit a cheque issued to a...  1.000000      0.976908          670bc2b5-f25a-4ec4-9567-27b44b3d9fd8
1  How to deposit a cheque issued to an associate...  [Just have the associate sign the back and the...  \nThe best way to deposit a cheque issued to a...  1.000000      0.977494          cb00e976-10b5-4d85-a35c-b3e8dd3dc955
2  How to deposit a cheque issued to an associate...  [Just have the associate sign the back and the...  \nThe best way to deposit a cheque issued to a...  0.666667      0.976893          2a00707d-a5e5-49ed-8bc2-67f133136330
from langfuse.model import InitialScore

for _, row in df.iterrows():
    for metric_name in ["faithfulness", "answer_relevancy"]:
        langfuse.score(
            InitialScore(
                name=metric_name, value=row[metric_name], trace_id=row["trace_id"]
            )
        )
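In this run, the batch-level result printed earlier coincides with the mean of the per-trace scores in the dataframe. The quick check below uses only the numbers shown above; treating the batch score as a plain mean is an observation about this output, not a statement about how Ragas aggregates in general.

```python
from statistics import mean

# Per-trace scores copied from the dataframe output above.
per_trace = {
    "faithfulness": [1.000000, 1.000000, 0.666667],
    "answer_relevancy": [0.976908, 0.977494, 0.976893],
}

# Average each metric over the sampled traces and round like the printed result.
batch_means = {name: round(mean(values), 4) for name, values in per_trace.items()}
print(batch_means)
```

Storing one such dictionary per time slice is a simple way to follow how the scores move between batch runs.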

List of traces with RAGAS scores