Annotator

llm_annotator.annotator

Annotator dataclass

Annotator(
    client: Client,
    batch_size: int = 256,
    num_proc: int | None = DEFAULT_CPU_COUNT,
    verbose: bool = False,
)

Sensible base class for LLM-based dataset annotation.

This class provides a framework for annotating datasets using large language models via a pluggable :class:`~llm_annotator.clients.base.Client`. It handles dataset loading, processing, and output generation, with support for batching and uploading to Hugging Face Hub.

The Annotator class has four public entry points:

* :meth:`prepare_data`. Apply prompt templates, sorting, and caching
    without running inference. Backs prepared artifacts up to Hugging Face
    Hub if ``prepared_hub_id`` is provided.
* :meth:`run_annotation`. Run inference only, consuming data returned by
    :meth:`prepare_data` or loaded from a local path or Hub repo.
* :meth:`annotate_dataset`. Convenience wrapper that calls
    :meth:`prepare_data` and then :meth:`run_annotation` in one call.
* :meth:`generate_dataset`. Generate a new dataset from scratch by calling
    :meth:`annotate_dataset` over a synthetic prompt dataset.

The staged :meth:`prepare_data` + :meth:`run_annotation` pattern is recommended for large-scale or cluster (SLURM) workflows. When ``prepared_hub_id`` is provided, prepared artifacts are stored on Hugging Face Hub and restored automatically on the next call, so a failed generation job can be restarted without repeating the preparation step.
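The staged pattern can be sketched as a small helper. This is a minimal illustration, not part of the library: `run_staged` is a hypothetical name, the annotator is passed in duck-typed so the sketch does not depend on a particular client, and only keyword arguments that appear in the signatures documented on this page are used.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


def run_staged(
    annotator: Any,
    output_dir: str | Path,
    prompt_template: str,
    *,
    dataset_name: str,
    prepared_hub_id: str | None = None,
    new_hub_id: str | None = None,
) -> Any:
    """Hypothetical helper: prepare once, then run inference separately."""
    # Stage 1: build (or restore) the prepared data. No inference runs here.
    prepared, _prepared_path, _hub_id = annotator.prepare_data(
        output_dir,
        prompt_template,
        dataset_name=dataset_name,
        prepared_hub_id=prepared_hub_id,
    )
    # Stage 2: inference only, consuming the dataset returned by stage 1.
    return annotator.run_annotation(
        output_dir,
        prompt_template,
        prepared_dataset=prepared,
        new_hub_id=new_hub_id,
    )
```

On a cluster, the two stages would typically live in separate jobs, with the second job pointing `run_annotation` at the prepared data via `prepared_data_path` or `prepared_hub_id` instead of an in-memory dataset.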

Parameters:

Name Type Description Default
client Client

An initialised :class:`~llm_annotator.clients.base.Client` instance that performs the actual generation.

required
batch_size int

Number of samples per inference batch.

256
num_proc int | None

Number of processes for dataset preprocessing.

DEFAULT_CPU_COUNT
verbose bool

Whether to print progress information.

False

Examples:

Basic usage with an OpenAI client:

from llm_annotator import Annotator, OpenAIClient
client = OpenAIClient(model="gpt-4o-mini")
with Annotator(client=client) as anno:
    ds = anno.annotate_dataset(
        output_dir="outputs/data",
        prompt_template="Process: {text}",
        dataset_name="my-dataset",
    )

Usage with vLLM offline client:

from llm_annotator import Annotator, VLLMOfflineClient
client = VLLMOfflineClient(
    model="meta-llama/Llama-3.2-3B-Instruct",
    max_model_len=4096,
)
try:
    ds = Annotator(client=client).annotate_dataset(
        output_dir="outputs/data",
        prompt_template="Process: {text}",
        dataset_name="my-dataset",
    )
finally:
    client.destroy()

__post_init__

__post_init__() -> None

Initialize a package-scoped logger for annotator runtime messages.

__enter__

__enter__() -> 'Annotator'

Enter the context manager, returning the annotator instance.

__exit__

__exit__(exc_type: Any, exc: Any, tb: Any) -> None

Exit the context manager and free all client resources.

destroy

destroy() -> None

Clean up all resources used by the underlying client.

prepare_data

prepare_data(
    output_dir: str | Path,
    prompt_template: str,
    *,
    dataset_name: str | None = None,
    dataset: Dataset | None = None,
    dataset_config: str | None = None,
    data_dir: str | None = None,
    dataset_split: str | None = None,
    max_num_samples: int | None = None,
    shuffle_seed: int | None = None,
    preprocess_fn: Callable | None = None,
    prompt_field_swapper: dict[str, str] | None = None,
    idx_column: str = "idx",
    task_prefix: str = "",
    sort_by_length: bool
    | Literal["shortest_first", "longest_first"] = False,
    system_message: str | None = None,
    prepared_hub_id: str | None = None,
    force_data_preparation: bool = False,
) -> tuple[Dataset, Path | None, str | None]

Prepare input data for annotation without running generation.

The method reuses local prepared data first, then optionally restores prepared data from Hugging Face Hub, and finally falls back to building the prepared dataset from source.

Parameters:

Name Type Description Default
output_dir str | Path

Directory where prepared artifacts are stored.

required
prompt_template str

Prompt template used to build chat messages.

required
dataset_name str | None

Name or path of the dataset to load.

None
dataset Dataset | None

Pre-loaded dataset to use instead of loading from name/path.

None
dataset_config str | None

Dataset configuration name (optional).

None
data_dir str | None

Data directory for local datasets (optional).

None
dataset_split str | None

Specific split to load (optional).

None
max_num_samples int | None

Maximum number of samples to prepare.

None
shuffle_seed int | None

Seed for dataset shuffling.

None
preprocess_fn Callable | None

Optional function to preprocess the dataset after loading and before applying the prompt template.

None
prompt_field_swapper dict[str, str] | None

Optional mapping to replace template fields.

None
idx_column str

Column name used as unique identifier. Must not exist in the input dataset.

'idx'
task_prefix str

Prefix for internal columns and artifact names.

''
sort_by_length bool | Literal['shortest_first', 'longest_first']

Whether to sort prompts by length.

False
system_message str | None

Optional system message for chat prompts.

None
prepared_hub_id str | None

Optional Hugging Face dataset ID for prepared-data backup and restore. When provided, the prepared data is stored in the "prepared_cache" configuration of that repo.

None
force_data_preparation bool

Whether to rebuild prepared data even when local or Hub artifacts already exist.

False

Returns:

Type Description
tuple[Dataset, Path | None, str | None]

Tuple of the prepared dataset, the local prepared-data path when available, and the prepared Hugging Face dataset ID when available.
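The lookup order described for `prepare_data` (local artifacts, then Hub, then a rebuild from source) can be sketched as follows. The function name and the three loader callables are hypothetical stand-ins. The assumption that `force_data_preparation` skips both caches follows the parameter description but is not confirmed against the implementation.

```python
from __future__ import annotations

from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def resolve_prepared(
    load_local: Callable[[], Optional[T]],
    load_from_hub: Callable[[], Optional[T]],
    build_from_source: Callable[[], T],
    *,
    force_data_preparation: bool = False,
) -> tuple[T, str]:
    """Return the prepared data together with the source it came from."""
    if not force_data_preparation:
        # 1. Prefer prepared data already on local disk.
        local = load_local()
        if local is not None:
            return local, "local"
        # 2. Otherwise try to restore it from the Hub.
        restored = load_from_hub()
        if restored is not None:
            return restored, "hub"
    # 3. Fall back to building from the source dataset.
    return build_from_source(), "built"
```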

run_annotation

run_annotation(
    output_dir: str | Path,
    prompt_template: str,
    *,
    prepared_dataset: Dataset | None = None,
    prepared_data_path: str | Path | None = None,
    prepared_hub_id: str | None = None,
    resume_from_hub_id: str | None = None,
    new_hub_id: str | None = None,
    overwrite: bool = False,
    dataset_split: str | None = None,
    dataset_config: str | None = None,
    keep_columns: str | Iterable[str] | bool | None = None,
    options: ProviderRuntimeOptions | None = None,
    output_schema: str | dict[str, Any] | None = None,
    idx_column: str = "idx",
    upload_every_n_samples: int | None = 0,
    max_samples_per_output_file: int = 0,
    task_prefix: str = "",
    validate_fn: Callable | None = None,
    postprocess_fn: Callable | None = None,
    num_retries_invalid: int = 5,
    system_message: str | None = None,
    keep_idx_column: bool = False,
) -> Dataset

Run model generation on already prepared annotation inputs.

Parameters:

Name Type Description Default
output_dir str | Path

Directory where annotation output is written.

required
prompt_template str

Prompt template used for warm-up metadata.

required
prepared_dataset Dataset | None

Pre-prepared dataset with messages column.

None
prepared_data_path str | Path | None

Local path to prepared data on disk.

None
prepared_hub_id str | None

Hugging Face dataset ID with prepared data.

None
resume_from_hub_id str | None

Optional Hugging Face dataset ID to restore generation checkpoints from.

None
new_hub_id str | None

Optional Hugging Face dataset ID for generation uploads.

None
overwrite bool

Whether to overwrite existing output directory.

False
dataset_split str | None

Dataset split used for skip filtering.

None
dataset_config str | None

Dataset config used for skip filtering.

None
keep_columns str | Iterable[str] | bool | None

Columns to keep in the output. Pass True to keep all columns.

None
options ProviderRuntimeOptions | None

Runtime options passed to the client.

None
output_schema str | dict[str, Any] | None

Convenience JSON schema input. When provided, it is injected into options.json_schema.

None
idx_column str

Column name used as unique identifier.

'idx'
upload_every_n_samples int | None

Upload to Hub every N samples.

0
max_samples_per_output_file int

Maximum samples per output file.

0
task_prefix str

Prefix for internal columns and file names.

''
validate_fn Callable | None

Optional custom validation function.

None
postprocess_fn Callable | None

Optional postprocessing function that takes in a sample and must return a dict.

None
num_retries_invalid int

Number of retries for invalid outputs.

5
system_message str | None

Optional system message for chat prompts.

None
keep_idx_column bool

Whether to keep idx column in final dataset.

False

Returns:

Type Description
Dataset

Final concatenated annotation dataset.

Raises:

Type Description
ValueError

If no prepared data source can be resolved.
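As an illustration of the `validate_fn` and `postprocess_fn` hooks, here is a minimal sketch of a callback pair. Several details are assumptions: the page only states that `postprocess_fn` takes a sample and returns a dict, so the `validate_fn` signature (sample in, bool out) and the `"response"` column name are hypothetical and should be checked against the library.

```python
from __future__ import annotations

import json
from typing import Any

# Hypothetical name of the column that holds the raw model output.
RESPONSE_COLUMN = "response"
ALLOWED_LABELS = {"positive", "negative"}


def validate_fn(sample: dict[str, Any]) -> bool:
    """Accept a sample only if its response is JSON with a known label."""
    try:
        parsed = json.loads(sample[RESPONSE_COLUMN])
    except (KeyError, TypeError, json.JSONDecodeError):
        return False
    return isinstance(parsed, dict) and parsed.get("label") in ALLOWED_LABELS


def postprocess_fn(sample: dict[str, Any]) -> dict[str, Any]:
    """Promote the parsed label to its own column; must return a dict."""
    parsed = json.loads(sample[RESPONSE_COLUMN])
    return {**sample, "label": parsed["label"]}
```

Samples rejected by the validator would presumably be retried up to `num_retries_invalid` times.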

annotate_dataset

annotate_dataset(
    output_dir: str | Path,
    prompt_template: str | None = None,
    *,
    full_prompt_template: str | None = None,
    dataset_name: str | None = None,
    dataset: Dataset | None = None,
    dataset_config: str | None = None,
    data_dir: str | None = None,
    dataset_split: str | None = None,
    max_num_samples: int | None = None,
    shuffle_seed: int | None = None,
    preprocess_fn: Callable | None = None,
    prompt_field_swapper: dict[str, str] | None = None,
    idx_column: str = "idx",
    task_prefix: str = "",
    sort_by_length: bool
    | Literal["shortest_first", "longest_first"] = False,
    system_message: str | None = None,
    prepared_hub_id: str | None = None,
    force_data_preparation: bool = False,
    new_hub_id: str | None = None,
    overwrite: bool = False,
    keep_columns: str | Iterable[str] | bool | None = None,
    options: ProviderRuntimeOptions | None = None,
    output_schema: str | dict[str, Any] | None = None,
    upload_every_n_samples: int | None = 0,
    max_samples_per_output_file: int = 0,
    validate_fn: Callable | None = None,
    postprocess_fn: Callable | None = None,
    num_retries_invalid: int = 5,
    keep_idx_column: bool = False,
) -> Dataset

Annotate an existing dataset in one call.

This is a convenience wrapper around :meth:`prepare_data` and :meth:`run_annotation` for callers that prefer a single entry point.

Parameters:

Name Type Description Default
output_dir str | Path

Directory where annotation output is written.

required
prompt_template str | None

Prompt template that references dataset fields. When omitted, full_prompt_template is used if provided.

None
full_prompt_template str | None

Backwards-compatible alias for prompt_template.

None
dataset_name str | None

Name or path of the dataset to load.

None
dataset Dataset | None

Pre-loaded dataset to annotate instead of loading one.

None
dataset_config str | None

Dataset configuration name.

None
data_dir str | None

Data directory for local datasets.

None
dataset_split str | None

Dataset split to load.

None
max_num_samples int | None

Maximum number of samples to annotate.

None
shuffle_seed int | None

Seed for dataset shuffling.

None
preprocess_fn Callable | None

Optional preprocessing callback.

None
prompt_field_swapper dict[str, str] | None

Optional mapping that renames prompt fields.

None
idx_column str

Column name used as the stable sample identifier.

'idx'
task_prefix str

Prefix for internal column names and output files.

''
sort_by_length bool | Literal['shortest_first', 'longest_first']

Whether to sort prompts by length.

False
system_message str | None

Optional system message for the chat prompt.

None
prepared_hub_id str | None

Optional Hub dataset ID for prepared-data cache.

None
force_data_preparation bool

Rebuild prepared data even if cached.

False
new_hub_id str | None

Optional Hub dataset ID for annotation outputs.

None
overwrite bool

Whether to overwrite the output directory.

False
keep_columns str | Iterable[str] | bool | None

Columns to keep in the final dataset.

None
options ProviderRuntimeOptions | None

Runtime options passed to the client.

None
output_schema str | dict[str, Any] | None

Optional JSON schema for structured output.

None
upload_every_n_samples int | None

Upload checkpoint cadence.

0
max_samples_per_output_file int

Maximum samples per output file.

0
validate_fn Callable | None

Optional validation callback.

None
postprocess_fn Callable | None

Optional postprocessing callback.

None
num_retries_invalid int

Number of retries for invalid outputs.

5
keep_idx_column bool

Whether to keep the index column in the result.

False

Returns:

Type Description
Dataset

The concatenated annotation dataset.

Raises:

Type Description
TypeError

If no prompt template is provided.
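Since `output_schema` accepts either a `dict` or a `str`, a schema could be prepared along these lines. The schema content is purely illustrative, and treating the string form as a serialised JSON document is an assumption. The page does not say how a string value is interpreted.

```python
import json

# Illustrative JSON Schema for a structured sentiment annotation.
label_schema = {
    "type": "object",
    "properties": {
        "label": {"type": "string", "enum": ["positive", "negative"]},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
    },
    "required": ["label"],
    "additionalProperties": False,
}

# Assumed string form: the same schema serialised as JSON text.
label_schema_json = json.dumps(label_schema)
```

Either value could then be passed as `output_schema=...` to `annotate_dataset`.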

generate_dataset

generate_dataset(
    output_dir: str | Path,
    prompts: str | Sequence[str],
    *,
    prompt_prefix: str | None = None,
    new_hub_id: str | None = None,
    overwrite: bool = False,
    options: ProviderRuntimeOptions | None = None,
    max_num_samples: int | None = None,
    output_schema: str | dict[str, Any] | None = None,
    idx_column: str = "idx",
    upload_every_n_samples: int | None = 0,
    max_samples_per_output_file: int = 0,
    task_prefix: str = "",
    validate_fn: Callable | None = None,
    postprocess_fn: Callable | None = None,
    num_retries_invalid: int = 5,
    keep_idx_column: bool = False,
) -> Dataset

Generate a new dataset from prompts.

Parameters:

Name Type Description Default
output_dir str | Path

Directory where annotation output is written.

required
prompts str | Sequence[str]

A single prompt or a sequence of prompts.

required
prompt_prefix str | None

Optional shared prefix used for prefix caching.

None
new_hub_id str | None

Optional Hub dataset ID for annotation outputs.

None
overwrite bool

Whether to overwrite the output directory.

False
options ProviderRuntimeOptions | None

Runtime options passed to the client.

None
max_num_samples int | None

Number of times to repeat a single prompt.

None
output_schema str | dict[str, Any] | None

Optional JSON schema for structured output.

None
idx_column str

Column name used as the stable sample identifier.

'idx'
upload_every_n_samples int | None

Upload checkpoint cadence.

0
max_samples_per_output_file int

Maximum samples per output file.

0
task_prefix str

Prefix for internal column names and output files.

''
validate_fn Callable | None

Optional validation callback.

None
postprocess_fn Callable | None

Optional postprocessing callback.

None
num_retries_invalid int

Number of retries for invalid outputs.

5
keep_idx_column bool

Whether to keep the index column in the result.

False

Returns:

Type Description
Dataset

The concatenated annotation dataset.

Raises:

Type Description
ValueError

If no prompts are provided.
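The interaction between `prompts` and `max_num_samples` can be sketched as below. This is a guess at the behaviour based only on the parameter descriptions: `expand_prompts` is a hypothetical helper, and how `max_num_samples` applies to a sequence of prompts is not documented here, so the sketch leaves sequences unchanged.

```python
from __future__ import annotations

from collections.abc import Sequence


def expand_prompts(
    prompts: str | Sequence[str],
    max_num_samples: int | None = None,
) -> list[str]:
    """Turn the `prompts` argument into a flat list of prompt strings."""
    if isinstance(prompts, str):
        if not prompts:
            raise ValueError("No prompts provided.")
        # A single prompt is repeated max_num_samples times (once if unset).
        return [prompts] * (max_num_samples or 1)
    prompt_list = list(prompts)
    if not prompt_list:
        raise ValueError("No prompts provided.")
    # Assumption: a sequence of prompts is used as given.
    return prompt_list
```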

get_pfout_name

get_pfout_name(
    *,
    pdout: Path | str,
    max_samples_per_output_file: int,
    processed_n_samples: int | None = None,
) -> Path

Generate the output file name based on configuration.

Creates appropriate file names for output files, handling both single-file and multi-file output modes.

Parameters:

Name Type Description Default
pdout Path | str

The output directory path.

required
max_samples_per_output_file int

Maximum samples per output file (0 for unlimited).

required
processed_n_samples int | None

The number of samples processed so far.

None

Returns:

Type Description
Path

Path object for the output file name.
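To make the single-file versus multi-file distinction concrete, here is a sketch of one possible naming scheme. The file names (`annotations.jsonl`, `annotations_00002.jsonl`) and the index calculation are invented for illustration and are not the names the library actually produces.

```python
from __future__ import annotations

from pathlib import Path


def sketch_output_path(
    pdout: Path | str,
    max_samples_per_output_file: int,
    processed_n_samples: int | None = None,
) -> Path:
    """Hypothetical naming scheme for single-file and sharded output."""
    pdout = Path(pdout)
    if max_samples_per_output_file <= 0:
        # 0 means unlimited: everything goes into one file.
        return pdout / "annotations.jsonl"
    # Otherwise pick the shard that the next sample would fall into.
    file_index = (processed_n_samples or 0) // max_samples_per_output_file
    return pdout / f"annotations_{file_index:05d}.jsonl"
```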

push_dir_to_hub

push_dir_to_hub(
    dir_path: Path | str,
    new_hub_id: str | None = None,
    *,
    task_prefix: str = "",
    revision: str | None = None,
    allow_patterns: list[str] | None = None,
    ignore_patterns: list[str] | None = None,
) -> None

Upload the output directory to Hugging Face Hub.

Creates a dataset repository and uploads all annotation files, excluding cached input data. Uses a separate branch for uploads.

Parameters:

Name Type Description Default
dir_path Path | str

Path to the directory containing annotation files.

required
new_hub_id str | None

Optional Hugging Face dataset ID to override the instance's new_hub_id.

None
task_prefix str

String prefix to use for branch naming.

''
revision str | None

Optional explicit branch name. Defaults to {task_prefix}jsonl_upload.

None
allow_patterns list[str] | None

Optional include patterns for upload.

None
ignore_patterns list[str] | None

Optional ignore patterns for upload.

None
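The branch-name default described for `revision` amounts to the following. `resolve_upload_revision` is a hypothetical helper that only restates the documented default of `{task_prefix}jsonl_upload`.

```python
from __future__ import annotations


def resolve_upload_revision(
    task_prefix: str = "",
    revision: str | None = None,
) -> str:
    """Return the branch used for uploads."""
    if revision is not None:
        # An explicit branch name always wins.
        return revision
    return f"{task_prefix}jsonl_upload"
```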

destroy_on_error

destroy_on_error(
    func: Callable[..., Any],
) -> Callable[..., Any]

Decorate an Annotator method to call :meth:`~Annotator.destroy` on any exception.

Catches BaseException (including KeyboardInterrupt and SystemExit) so resources are freed even on forced termination. The original exception is always re-raised after the cleanup attempt.

Parameters:

Name Type Description Default
func Callable[..., Any]

The instance method to wrap.

required

Returns:

Type Description
Callable[..., Any]

The wrapped callable with automatic cleanup on failure.
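A decorator with the behaviour described above could look roughly like this. It is a sketch under the stated description, not the library's source: the name `destroy_on_error_sketch` is hypothetical, and swallowing errors raised by `destroy()` itself is an assumption made so that the original exception is the one that propagates.

```python
import functools
from typing import Any, Callable


def destroy_on_error_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Call self.destroy() if the wrapped method raises, then re-raise."""

    @functools.wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        try:
            return func(self, *args, **kwargs)
        except BaseException:
            # BaseException also covers KeyboardInterrupt and SystemExit.
            try:
                self.destroy()
            except Exception:
                # Assumption: a failing cleanup must not mask the original error.
                pass
            raise  # re-raise the original exception

    return wrapper
```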