Skip to content

Executor

Executor dataclass

Executor(desc: str = 'Evaluating', show_progress: bool = True, keep_progress_bar: bool = True, jobs: List[Any] = list(), raise_exceptions: bool = False, batch_size: Optional[int] = None, run_config: Optional[RunConfig] = None, _nest_asyncio_applied: bool = False)

Executor class for running asynchronous jobs with progress tracking and error handling.

Attributes:

Name Type Description
desc str

Description for the progress bar

show_progress bool

Whether to show the progress bar

keep_progress_bar bool

Whether to keep the progress bar after completion

jobs List[Any]

List of jobs to execute

raise_exceptions bool

Whether to raise exceptions or log them

batch_size int

Whether to batch (large) lists of tasks

run_config RunConfig

Configuration for the run

_nest_asyncio_applied bool

Whether nest_asyncio has been applied

submit

submit(callable: Callable, *args, name: Optional[str] = None, **kwargs) -> None

Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.

Source code in src/ragas/executor.py
def submit(
    self,
    callable: t.Callable,
    *args,
    name: t.Optional[str] = None,
    **kwargs,
) -> None:
    """
    Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.
    """
    callable_with_index = self.wrap_callable_with_index(callable, len(self.jobs))
    self.jobs.append((callable_with_index, args, kwargs, name))

results

results() -> List[Any]

Execute all submitted jobs and return their results. The results are returned in the order of job submission.

Source code in src/ragas/executor.py
def results(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results. The results are returned in the order of job submission.
    """
    if is_event_loop_running():
        # an event loop is running so call nested_asyncio to fix this
        try:
            import nest_asyncio
        except ImportError as e:
            raise ImportError(
                "It seems like your running this in a jupyter-like environment. "
                "Please install nest_asyncio with `pip install nest_asyncio` to make it work."
            ) from e
        else:
            if not self._nest_asyncio_applied:
                nest_asyncio.apply()
                self._nest_asyncio_applied = True

    results = asyncio.run(self._process_jobs())
    sorted_results = sorted(results, key=lambda x: x[0])
    return [r[1] for r in sorted_results]

run_async_batch

run_async_batch(desc: str, func: Callable, kwargs_list: List[Dict], batch_size: Optional[int] = None)

Provide functionality to run the same async function with different arguments in parallel.

Source code in src/ragas/executor.py
def run_async_batch(
    desc: str,
    func: t.Callable,
    kwargs_list: t.List[t.Dict],
    batch_size: t.Optional[int] = None,
):
    """
    Provide functionality to run the same async function with different arguments in parallel.
    """
    run_config = RunConfig()
    executor = Executor(
        desc=desc,
        keep_progress_bar=False,
        raise_exceptions=True,
        run_config=run_config,
        batch_size=batch_size,
    )

    for kwargs in kwargs_list:
        executor.submit(func, **kwargs)

    return executor.results()

is_event_loop_running

is_event_loop_running() -> bool

Check if an event loop is currently running.

Source code in src/ragas/executor.py
def is_event_loop_running() -> bool:
    """
    Check if an event loop is currently running.
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return False
    else:
        return loop.is_running()