diff --git a/examples/arm/aot_arm_compiler.py b/examples/arm/aot_arm_compiler.py index 57fbbdb6826..ba2fba539dc 100644 --- a/examples/arm/aot_arm_compiler.py +++ b/examples/arm/aot_arm_compiler.py @@ -13,6 +13,7 @@ import os import sys +from enum import Enum from pathlib import Path # Add Executorch root to path so this script can be run from anywhere @@ -25,6 +26,9 @@ import torch from examples.devtools.scripts.export_bundled_program import save_bundled_program + +from examples.models import MODEL_NAME_TO_MODEL +from examples.models.model_factory import EagerModelFactory from executorch.backends.arm.common.arm_compile_spec import ArmCompileSpec from executorch.backends.arm.ethosu import EthosUCompileSpec from executorch.backends.arm.quantizer import ( @@ -61,8 +65,13 @@ # Quantize model if required using the standard export quantizaion flow. from torchao.quantization.pt2e.quantize_pt2e import convert_pt2e, prepare_pt2e -from ..models import MODEL_NAME_TO_MODEL -from ..models.model_factory import EagerModelFactory +# Maximum number of samples to use for calibration when quantizing. +CALIBRATION_MAX_SAMPLES = 1000 + + +class QuantMode(str, Enum): + INT8 = "INT8" + A16W8 = "A16W8" def _load_example_inputs(model_input: str | None) -> Any: # nosec B614 @@ -226,12 +235,103 @@ def get_model_and_inputs_from_name( ) +def as_input_tuple(sample: object) -> Tuple[torch.Tensor, ...]: + if isinstance(sample, tuple): + return sample + if isinstance(sample, list): + return tuple(sample) + if isinstance(sample, torch.Tensor): + return (sample,) + if isinstance(sample, dict): + if "pixel_values" in sample: + return (sample["pixel_values"],) + raise ValueError("Calibration sample dict must contain 'pixel_values' key.") + raise ValueError( + "Calibration sample must be a Tensor, tuple, list, or dict with " + "'pixel_values'." + ) + + +def load_calibration_sample( + path: str, example_inputs: Tuple[torch.Tensor, ...] +) -> Tuple[torch.Tensor, ...]: + suffix = Path(path).suffix.lower() + if suffix in {".pt", ".pth"}: + sample = torch.load(path, weights_only=False) # nosec B614 trusted inputs + return as_input_tuple(sample) + raise ValueError(f"Unsupported calibration file type: {path}") + + +def load_calibration_samples( + calibration_data: str | None, + example_inputs: Tuple[torch.Tensor, ...], +) -> Optional[List[Tuple[torch.Tensor, ...]]]: + if calibration_data is None: + return None + + path = Path(calibration_data) + if path.is_file(): + return [load_calibration_sample(str(path), example_inputs)] + + if not path.is_dir(): + raise ValueError( + f"Calibration data path '{calibration_data}' is not a file or directory." + ) + + supported_suffixes = {".pt", ".pth"} + candidates = sorted( + str(p) + for p in path.rglob("*") + if p.is_file() and p.suffix.lower() in supported_suffixes + ) + if not candidates: + raise ValueError( + f"No supported calibration files found in directory '{calibration_data}'." + ) + + samples: List[Tuple[torch.Tensor, ...]] = [] + for candidate in candidates[:CALIBRATION_MAX_SAMPLES]: + samples.append(load_calibration_sample(candidate, example_inputs)) + + return samples + + +def _validate_calibration_sample( + calibration_sample: Tuple[torch.Tensor, ...], + example_inputs: Tuple[torch.Tensor, ...], +) -> None: + expected_len = len(example_inputs) + + if len(calibration_sample) != expected_len: + raise ValueError( + "Calibration sample has %d inputs, expected %d." + % (len(calibration_sample), expected_len) + ) + for input_idx, (expected, actual) in enumerate( + zip(example_inputs, calibration_sample) + ): + if isinstance(expected, torch.Tensor) and isinstance(actual, torch.Tensor): + if expected.shape != actual.shape: + raise ValueError( + "Calibration sample input %d shape %s does not match " + "expected shape %s." + % (input_idx, list(actual.shape), list(expected.shape)) + ) + elif type(expected) is not type(actual): + raise ValueError( + "Calibration sample input %d type %s does not match " + "expected type %s." + % (input_idx, type(actual).__name__, type(expected).__name__) + ) + + def quantize( model: GraphModule, model_name: str, compile_specs: ArmCompileSpec, example_inputs: Tuple[torch.Tensor], - is_int16x8: bool = False, + quant_mode: QuantMode = QuantMode.INT8, + calibration_samples: Optional[List[Tuple[torch.Tensor, ...]]] = None, ) -> GraphModule: """This is the official recommended flow for quantization in pytorch 2.0 export. @@ -242,24 +342,28 @@ def quantize( quantizer = create_quantizer(compile_specs) - if is_int16x8: - if compile_specs.tosa_spec.support_extension("int16"): - operator_config = get_symmetric_a16w8_quantization_config( - is_per_channel=True - ) - else: - raise ValueError( - f"Context TOSA spec {compile_specs.tosa_spec} doesn't support int16" - ) - else: - operator_config = get_symmetric_quantization_config(is_per_channel=True) + match quant_mode: + case QuantMode.INT8: + operator_config = get_symmetric_quantization_config(is_per_channel=True) + case QuantMode.A16W8: + if compile_specs.tosa_spec.support_extension("int16"): + operator_config = get_symmetric_a16w8_quantization_config( + is_per_channel=True + ) + else: + raise ValueError( + f"Context TOSA spec {compile_specs.tosa_spec} doesn't support int16" + ) quantizer.set_global(operator_config) m = prepare_pt2e(model, quantizer) - # Calibrate model using example inputs - # TODO: Add support for using a calibration dataset - m(*example_inputs) + if calibration_samples is None: + calibration_samples = [example_inputs] + + for sample in calibration_samples: + _validate_calibration_sample(sample, example_inputs) + m(*sample) m = convert_pt2e(m) logging.debug(f"Quantized model: {m}") @@ -354,7 +458,7 @@ def forward(self, x): ] -def get_compile_spec(args) -> ArmCompileSpec: +def _get_compile_spec(args) -> ArmCompileSpec: compile_spec = None if args.target.startswith("TOSA"): tosa_spec = TosaSpecification.create_from_string(args.target) @@ -407,7 +511,7 @@ def dump_delegation_info(edge, intermediate_files_folder: Optional[str] = None): file.write(delegation_info_string) -def get_args(): +def _get_args(): parser = argparse.ArgumentParser() parser.add_argument( "-m", @@ -477,6 +581,17 @@ def get_args(): default=False, help="Produce a quantized model", ) + parser.add_argument( + "--calibration_data", + required=False, + default=None, + help=( + "Optional calibration data file or directory. If a directory is " + "provided, up to 1000 samples are used for calibration. " + "Supported files: .pt/.pth. If not provided," + "quantized models are calibrated on their example inputs." + ), + ) parser.add_argument( "-s", "--so_library", @@ -550,6 +665,9 @@ def get_args(): logging_level = logging.DEBUG if args.debug else logging.WARNING logging.basicConfig(level=logging_level, format=LOGGING_FORMAT, force=True) + if args.calibration_data is not None and not args.quantize: + raise RuntimeError("--calibration_data requires --quantize to be enabled.") + # if we have custom ops, register them before processing the model if args.so_library is not None: logging.info(f"Loading custom ops from {args.so_library}") @@ -570,7 +688,7 @@ def get_args(): return args -def save_bpte_program(exec_prog, original_model: torch.nn.Module, output_name: str): +def _save_bpte_program(exec_prog, original_model: torch.nn.Module, output_name: str): # Construct MethodTestSuite for Each Method # Generate Test Suites @@ -649,42 +767,50 @@ def save_bpte_program(exec_prog, original_model: torch.nn.Module, output_name: s def quantize_model( - args, model: GraphModule, example_inputs: Tuple[torch.Tensor], compile_spec, + model_name: str, + strict_export: bool, + quant_mode: QuantMode, + calibration_samples: Optional[List[Tuple[torch.Tensor, ...]]], ) -> Tuple[GraphModule, ExportedProgram]: - - is_int16x8 = True if args.target == "TOSA-1.0+INT+int16" else False model_quant = quantize( model, - args.model_name, + model_name, compile_spec, example_inputs, - is_int16x8, + quant_mode, + calibration_samples, ) # Wrap quantized model back into an exported_program exported_program = torch.export.export( - model_quant, example_inputs, strict=args.strict_export + model_quant, example_inputs, strict=strict_export ) return model_quant, exported_program -def to_edge_TOSA_delegate( +def _to_edge_TOSA_delegate( exported_program: ExportedProgram, - args, + compile_spec, model: GraphModule, + quant_mode: Optional[QuantMode], example_inputs: Tuple[torch.Tensor], + model_name: str, + strict_export: bool, + calibration_samples: Optional[List[Tuple[torch.Tensor, ...]]], ): - # As we can target multiple output encodings, one must - # be specified. - compile_spec = get_compile_spec(args) - model_quant = None - if args.quantize: + if quant_mode is not None: model_quant, exported_program = quantize_model( - args, model, example_inputs, compile_spec + model, + example_inputs, + compile_spec, + model_name, + strict_export, + quant_mode, + calibration_samples, ) partitioner = create_partitioner(compile_spec) @@ -705,11 +831,12 @@ def to_edge_TOSA_delegate( return model_quant, edge -def to_edge_cortex_m( +def _to_edge_cortex_m( exported_program: ExportedProgram, args, model: GraphModule, example_inputs: Tuple[torch.Tensor], + calibration_samples: Optional[List[Tuple[torch.Tensor, ...]]], ): """Cortex-M/CMSIS-NN compilation path with no delegation.""" logging.info("Using Cortex-M/CMSIS-NN compilation path (no delegation)") @@ -738,9 +865,11 @@ def _to_channels_last(x): quantizer = CortexMQuantizer() prepared = prepare_pt2e(model, quantizer) - # Calibrate model using example inputs - # TODO: Add support for using a calibration dataset - prepared(*example_inputs) + if calibration_samples is None: + calibration_samples = [example_inputs] + + for sample in calibration_samples: + prepared(*tuple(_to_channels_last(x) for x in sample)) model_quant = convert_pt2e(prepared) @@ -768,19 +897,28 @@ def _to_channels_last(x): return model_quant if args.quantize else None, edge -def to_edge_no_delegate( +def _to_edge_no_delegate( exported_program: ExportedProgram, - args, + compile_spec, model: GraphModule, + quant_mode: Optional[QuantMode], example_inputs: Tuple[torch.Tensor], + model_name: str, + strict_export: bool, + calibration_samples: Optional[List[Tuple[torch.Tensor, ...]]], ): model_quant = None - if args.quantize: + if quant_mode is not None: # As we can target multiple output encodings, one must # be specified. - compile_spec = get_compile_spec(args) model, exported_program = quantize_model( - args, model, example_inputs, compile_spec + model, + example_inputs, + compile_spec, + model_name, + strict_export, + quant_mode, + calibration_samples, ) model_quant = model @@ -800,12 +938,15 @@ def to_edge_no_delegate( if __name__ == "__main__": # noqa: C901 - args = get_args() + args = _get_args() # Pick model from one of the supported lists original_model, example_inputs = get_model_and_inputs_from_name( args.model_name, args.model_input ) + calibration_samples = load_calibration_samples( + args.calibration_data, example_inputs + ) model = original_model.eval() # export under the assumption we quantize, the exported form also works @@ -837,6 +978,11 @@ def to_edge_no_delegate( # Quantize if required model_quant = None + if args.quantize: + quant_mode = QuantMode.A16W8 if "int16" in args.target else QuantMode.INT8 + else: + quant_mode = None + if args.target == "cortex-m55+int8": # Cortex-M path: CMSIS-NN portable kernels, no delegation if args.delegate: @@ -845,16 +991,36 @@ def to_edge_no_delegate( "(this target does not use delegated ops)." ) args.delegate = False - model_quant, edge = to_edge_cortex_m( - exported_program, args, model, example_inputs + model_quant, edge = _to_edge_cortex_m( + exported_program, + args, + model, + example_inputs, + calibration_samples, ) elif args.delegate: - model_quant, edge = to_edge_TOSA_delegate( - exported_program, args, model, example_inputs + # As we can target multiple output encodings, one must + # be specified. + model_quant, edge = _to_edge_TOSA_delegate( + exported_program, + _get_compile_spec(args), + model, + quant_mode, + example_inputs, + args.model_name, + args.strict_export, + calibration_samples, ) else: - model_quant, edge = to_edge_no_delegate( - exported_program, args, model, example_inputs + model_quant, edge = _to_edge_no_delegate( + exported_program, + _get_compile_spec(args), + model, + quant_mode, + example_inputs, + args.model_name, + args.strict_export, + calibration_samples, ) dump_delegation_info(edge, args.intermediates) @@ -897,8 +1063,12 @@ def to_edge_no_delegate( f"When not using --bundleio a .bpte file should not be use as --output {args.output}" ) output_file_name = args.output + output_dir = os.path.dirname(output_file_name) + if output_dir: + os.makedirs(output_dir, exist_ok=True) else: # --output is a folder + os.makedirs(args.output, exist_ok=True) output_file_name = os.path.join(args.output, output_file_name) if args.bundleio or args.etrecord: @@ -910,7 +1080,7 @@ def to_edge_no_delegate( if args.bundleio: # Realize the quantization impact on numerics when generating reference output reference_model = original_model if not model_quant else model_quant - save_bpte_program(exec_prog, reference_model, output_file_name) + _save_bpte_program(exec_prog, reference_model, output_file_name) print(f"Bundle PTE file saved as {output_file_name}") else: save_pte_program(exec_prog, output_file_name)