## Executor

```python
Executor(desc: str = 'Evaluating', show_progress: bool = True, keep_progress_bar: bool = True, jobs: List[Any] = list(), raise_exceptions: bool = False, batch_size: Optional[int] = None, run_config: Optional[RunConfig] = None, pbar: Optional[tqdm] = None, _jobs_processed: int = 0, _cancel_event: Event = Event())
```

Executor class for running asynchronous jobs with progress tracking and error handling.

Attributes:

| Name                    | Type        | Description                                       |
| ----------------------- | ----------- | ------------------------------------------------- |
| `desc`                  | `str`       | Description for the progress bar                  |
| `show_progress`         | `bool`      | Whether to show the progress bar                  |
| `keep_progress_bar`     | `bool`      | Whether to keep the progress bar after completion |
| `jobs`                  | `List[Any]` | List of jobs to execute                           |
| `raise_exceptions`      | `bool`      | Whether to raise exceptions or log them           |
| `batch_size`            | `int`       | Whether to batch (large) lists of tasks           |
| `run_config`            | `RunConfig` | Configuration for the run                         |
| `_nest_asyncio_applied` | `bool`      | Whether nest_asyncio has been applied             |
| `_cancel_event`         | `Event`     | Event to signal cancellation                      |

### cancel

```python
cancel() -> None
```

Cancel the execution of all jobs.

Source code in `src/ragas/executor.py`

```python
def cancel(self) -> None:
    """Cancel the execution of all jobs."""
    self._cancel_event.set()
```

### is_cancelled

```python
is_cancelled() -> bool
```

Check if the execution has been cancelled.

Source code in `src/ragas/executor.py`

```python
def is_cancelled(self) -> bool:
    """Check if the execution has been cancelled."""
    return self._cancel_event.is_set()
```

### submit

```python
submit(callable: Callable, *args, name: Optional[str] = None, **kwargs) -> None
```

Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.

Source code in `src/ragas/executor.py`

```python
def submit(
    self,
    callable: t.Callable,
    *args,
    name: t.Optional[str] = None,
    **kwargs,
) -> None:
    """
    Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index.
    """
    # Use _jobs_processed for consistent indexing across multiple runs
    callable_with_index = self.wrap_callable_with_index(
        callable, self._jobs_processed
    )
    self.jobs.append((callable_with_index, args, kwargs, name))
    self._jobs_processed += 1
```

### clear_jobs

```python
clear_jobs() -> None
```

Clear all submitted jobs and reset counter.

Source code in `src/ragas/executor.py`

```python
def clear_jobs(self) -> None:
    """Clear all submitted jobs and reset counter."""
    self.jobs.clear()
    self._jobs_processed = 0
```

### aresults

```python
aresults() -> List[Any]
```

Execute all submitted jobs and return their results asynchronously. The results are returned in the order of job submission.

This is the async entry point for executing async jobs when already in an async context.

Source code in `src/ragas/executor.py`

```python
async def aresults(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results asynchronously.
    The results are returned in the order of job submission.

    This is the async entry point for executing async jobs when already in an async context.
    """
    results = await self._process_jobs()
    sorted_results = sorted(results, key=lambda x: x[0])
    return [r[1] for r in sorted_results]
```

### results

```python
results() -> List[Any]
```

Execute all submitted jobs and return their results. The results are returned in the order of job submission.

This is the main sync entry point for executing async jobs.

Source code in `src/ragas/executor.py`

```python
def results(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results. The results are returned in the order of job submission.

    This is the main sync entry point for executing async jobs.
    """

    async def _async_wrapper():
        return await self.aresults()

    apply_nest_asyncio()
    return run(_async_wrapper)
```

## run_async_batch

```python
run_async_batch(desc: str, func: Callable, kwargs_list: List[Dict], batch_size: Optional[int] = None)
```

Provide functionality to run the same async function with different arguments in parallel.

Source code in `src/ragas/executor.py`

```python
def run_async_batch(
    desc: str,
    func: t.Callable,
    kwargs_list: t.List[t.Dict],
    batch_size: t.Optional[int] = None,
):
    """
    Provide functionality to run the same async function with different arguments in parallel.
    """
    run_config = RunConfig()
    executor = Executor(
        desc=desc,
        keep_progress_bar=False,
        raise_exceptions=True,
        run_config=run_config,
        batch_size=batch_size,
    )

    for kwargs in kwargs_list:
        executor.submit(func, **kwargs)

    return executor.results()
```
