diff --git a/custom_ops/gpu_ops/append_attn/template_config.json b/custom_ops/gpu_ops/append_attn/template_config.json index b1932536859..c68a0b06cd6 100644 --- a/custom_ops/gpu_ops/append_attn/template_config.json +++ b/custom_ops/gpu_ops/append_attn/template_config.json @@ -17,7 +17,7 @@ "IsDynamicC8" ], "dispatch_params": { - "GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24], + "GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24], "HEAD_DIM": [128], "BLOCK_SIZE": [64], "CAUSAL": [0, 1], @@ -54,7 +54,7 @@ "ENABLE_PREFILL" ], "dispatch_params": { - "GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24], + "GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24], "HEAD_DIM": [128], "BLOCK_SIZE": [64], "CAUSAL": [0, 1], @@ -89,7 +89,7 @@ "ENABLE_PREFILL" ], "dispatch_params": { - "GROUP_SIZE": [1, 2, 4, 5, 6, 7, 8, 12, 14, 16, 24], + "GROUP_SIZE": [1, 2, 3, 4, 5, 6, 7, 8, 12, 14, 16, 24], "HEAD_DIM": [64,128,192], "BLOCK_SIZE": [64], "CAUSAL": [0, 1], diff --git a/fastdeploy/cache_manager/v1/cache_controller.py b/fastdeploy/cache_manager/v1/cache_controller.py index 52df6fa1186..34767fb98d8 100644 --- a/fastdeploy/cache_manager/v1/cache_controller.py +++ b/fastdeploy/cache_manager/v1/cache_controller.py @@ -18,6 +18,7 @@ import os import threading import time +from collections.abc import Sequence from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any, Dict, List, Optional @@ -104,6 +105,7 @@ def __init__(self, config: "FDConfig", local_rank: int, device_id: int): # Attention backend self.attn_backend = None + self.attn_backends = None @property def write_policy(self) -> Optional[str]: @@ -285,7 +287,9 @@ def initialize_kv_cache( Returns: cache_kvs_list: Flat list of allocated tensors in layer/role order. """ - self.attn_backend = attn_backend + attn_backends = self._normalize_attn_backends(attn_backend) + self.attn_backend = attn_backends[0] + self.attn_backends = attn_backends kv_cache_quant_type = self._get_kv_cache_quant_type() cache_dtype = "uint8" if kv_cache_quant_type is not None else self.model_config.dtype @@ -295,18 +299,19 @@ def initialize_kv_cache( f"backend={type(self.attn_backend).__name__}, kv_cache_quant_type={kv_cache_quant_type}" ) - caches = self.attn_backend.create_kv_cache( - num_layers=self._num_layers, - num_blocks=num_gpu_blocks, - cache_dtype=cache_dtype, - kv_cache_quant_type=kv_cache_quant_type, - ) - cache_kvs_list: List[Any] = [] - for (role, layer_idx), tensor in caches.items(): - name = self._format_cache_name(role, layer_idx) - self.cache_kvs_map[name] = tensor - cache_kvs_list.append(tensor) + for layer_idx, layer_attn_backend in enumerate(attn_backends): + caches = layer_attn_backend.create_kv_cache( + num_layers=1, + num_blocks=num_gpu_blocks, + cache_dtype=cache_dtype, + kv_cache_quant_type=kv_cache_quant_type, + layer_offset=layer_idx, + ) + for (role, cache_layer_idx), tensor in caches.items(): + name = self._format_cache_name(role, cache_layer_idx) + self.cache_kvs_map[name] = tensor + cache_kvs_list.append(tensor) paddle.device.cuda.empty_cache() logger.info("kv cache is initialized!") @@ -315,7 +320,7 @@ def initialize_kv_cache( self._transfer_manager.set_cache_kvs_map(self.cache_kvs_map) # Initialize host cache - self.initialize_host_cache(self.attn_backend) + self.initialize_host_cache(self.attn_backends) return cache_kvs_list @@ -498,6 +503,15 @@ def _bind_to_closest_numa_node(self) -> bool: logger.warning(f"[CacheController] NUMA binding failed: {e}") return False + def _normalize_attn_backends(self, attn_backend: Any) -> Sequence[Any]: + if isinstance(attn_backend, Sequence) and not isinstance(attn_backend, (str, bytes)): + if len(attn_backend) != self._num_layers: + raise ValueError( + f"attn_backend length {len(attn_backend)} does not match num_layers {self._num_layers}" + ) + return attn_backend + return [attn_backend] * self._num_layers + def initialize_host_cache( self, attn_backend: Any, @@ -533,35 +547,39 @@ def initialize_host_cache( cache_dtype = self.cache_config.cache_dtype cache_item_bytes = self.cache_config.get_cache_bytes(cache_dtype) num_layers = self._num_layers + self.config.speculative_config.num_extra_cache_layer + attn_backends = self._normalize_attn_backends(attn_backend) logger.info( f"[CacheController] Initializing swap space (Host cache) for {num_layers} layers " - f"(num_host_blocks={num_host_blocks}, backend={type(attn_backend).__name__}, " + f"(num_host_blocks={num_host_blocks}, backend={type(attn_backends[0]).__name__}, " f"kv_cache_quant_type={kv_cache_quant_type})." ) - try: - host_caches = attn_backend.create_host_kv_cache( - num_layers=num_layers, - num_blocks=num_host_blocks, - cache_item_bytes=cache_item_bytes, - kv_cache_quant_type=kv_cache_quant_type, - ) - except NotImplementedError as e: - logger.warning( - f"[CacheController] Host kv cache offload not supported by " - f"{type(attn_backend).__name__}: {e}. Skipping swap space setup." - ) - return + for layer_idx in range(num_layers): + layer_attn_backend = attn_backends[layer_idx] if layer_idx < len(attn_backends) else attn_backends[-1] + try: + host_caches = layer_attn_backend.create_host_kv_cache( + num_layers=1, + num_blocks=num_host_blocks, + cache_item_bytes=cache_item_bytes, + kv_cache_quant_type=kv_cache_quant_type, + layer_offset=layer_idx, + ) + except NotImplementedError as e: + logger.warning( + f"[CacheController] Host kv cache offload not supported by " + f"{type(layer_attn_backend).__name__}: {e}. Skipping swap space setup." + ) + return - for (role, layer_idx), ptr in host_caches.items(): - name = self._format_cache_name(role, layer_idx) - self.host_cache_kvs_map[name] = ptr + for (role, cache_layer_idx), ptr in host_caches.items(): + name = self._format_cache_name(role, cache_layer_idx) + self.host_cache_kvs_map[name] = ptr logger.info(f"[CacheController] Swap space (Host cache) is ready for {num_layers} layers!") # Preserve the shape/num_blocks bookkeeping that downstream code may read. - key_cache_shape, value_cache_shape = attn_backend.get_kv_cache_shape( + key_cache_shape, value_cache_shape = attn_backends[0].get_kv_cache_shape( max_num_blocks=num_host_blocks, kv_cache_quant_type=kv_cache_quant_type ) self._host_key_cache_shape = [num_host_blocks] + list(key_cache_shape[1:]) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 1b080680e6f..afc25a8766c 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -259,6 +259,8 @@ def __init__( if not hasattr(self, "head_dim"): self.head_dim = self.hidden_size // self.num_attention_heads + if not hasattr(self, "v_head_dim"): + self.v_head_dim = self.head_dim if hasattr(self, "vision_config"): self.vision_config = PretrainedConfig.from_dict(self.vision_config) diff --git a/fastdeploy/model_executor/forward_meta.py b/fastdeploy/model_executor/forward_meta.py index e32fb1209e1..00bda9e5ab6 100644 --- a/fastdeploy/model_executor/forward_meta.py +++ b/fastdeploy/model_executor/forward_meta.py @@ -79,6 +79,7 @@ class ForwardMeta: # Attention backend object attn_backend: AttentionBackend = None + attn_backends: Optional[list[AttentionBackend]] = None # Forward mode used during attention forward_mode: ForwardMode = ForwardMode.MIXED # Attention mask diff --git a/fastdeploy/model_executor/layers/attention/append_attn_backend.py b/fastdeploy/model_executor/layers/attention/append_attn_backend.py index cf24751a6c5..74238e6fc85 100644 --- a/fastdeploy/model_executor/layers/attention/append_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/append_attn_backend.py @@ -176,6 +176,8 @@ def __init__( self.num_heads: int = num_heads self.group_size: int = self.num_heads // self.kv_num_heads self.head_dim: int = fd_config.model_config.head_dim + self.v_head_dim: int = getattr(fd_config.model_config, "v_head_dim", self.head_dim) + self.external_norm_rope: bool = True if self.v_head_dim != self.head_dim else False self.num_layers: int = fd_config.model_config.num_hidden_layers # head wise sliding window attention @@ -290,7 +292,9 @@ def get_kv_cache_shape( key_cache_shape = [max_num_blocks, self.kv_num_heads, self.block_size, self.head_dim] if kv_cache_quant_type is not None and kv_cache_quant_type == "int4_zp": key_cache_shape[-1] = self.head_dim // 2 - value_cache_shape = key_cache_shape + value_cache_shape = [max_num_blocks, self.kv_num_heads, self.block_size, self.v_head_dim] + if kv_cache_quant_type is not None and kv_cache_quant_type == "int4_zp": + value_cache_shape[-1] = self.v_head_dim // 2 return key_cache_shape, value_cache_shape def forward_mixed( @@ -308,33 +312,6 @@ def forward_mixed( forward_mixed """ - cache_k = forward_meta.caches[2 * layer.layer_id] - cache_v = forward_meta.caches[2 * layer.layer_id + 1] - - from fastdeploy.model_executor.ops.triton_ops import do_rope, write_cache - - if getattr(layer, "only_do_attn", False): - do_rope( - qkv, - forward_meta.rotary_embs[0], - forward_meta.rotary_embs[1], - forward_meta.cu_seqlens_q, - forward_meta.seq_lens_decoder, - forward_meta.batch_id_per_token, - cache_k, - cache_v, - ) - - write_cache( - qkv, - cache_k, - cache_v, - forward_meta.cu_seqlens_q, - forward_meta.seq_lens_decoder, - forward_meta.batch_id_per_token, - forward_meta.block_tables, - ) - metadata = self.attention_metadata # - PaddleFormers fallback: rope_already_applied=True -> use identity RoPE (cos=1, sin=0) @@ -376,7 +353,8 @@ def forward_mixed( cache_k_scales = getattr(layer, "cache_k_scale", None) cache_v_scales = getattr(layer, "cache_v_scale", None) - if layer.layer_id == 0: + self.num_key_value_heads_list = getattr(self.fd_config.model_config, "num_key_value_heads_list", None) + if layer.layer_id == 0 or self.num_key_value_heads_list is not None: get_block_shape_and_split_kv_block( forward_meta.seq_lens_encoder, forward_meta.seq_lens_decoder, @@ -399,6 +377,45 @@ def forward_mixed( self.block_size, ) + from fastdeploy.model_executor.ops.triton_ops import ( + do_rope, + qk_rmsnorm_fused, + write_cache, + ) + + if self.external_norm_rope: + if q_norm_weight is not None and k_norm_weight is not None: + qk_rmsnorm_fused( + qkv, + q_norm_weight, + k_norm_weight, + getattr(layer, "rms_norm_eps", 1e-6), + layer.num_heads * layer.head_dim, + layer.kv_num_heads * layer.head_dim, + cache_k.shape[-1], + cache_v.shape[-1], + ) + do_rope( + qkv, + forward_meta.rotary_embs[0], + forward_meta.rotary_embs[1], + forward_meta.cu_seqlens_q, + forward_meta.seq_lens_decoder, + forward_meta.batch_id_per_token, + cache_k, + cache_v, + ) + + write_cache( + qkv, + cache_k, + cache_v, + forward_meta.cu_seqlens_q, + forward_meta.seq_lens_decoder, + forward_meta.batch_id_per_token, + forward_meta.block_tables, + ) + if self.use_output: quant_max_bound = getattr(layer, "quant_max_bound", 0.0) cache_quant_type = getattr(layer, "cache_quant_type_str", "none") @@ -547,7 +564,7 @@ def forward_mixed( sliding_window, self.sink_size, self.head_wise_full_hidden if self.head_wise_swa_ratio > 0 else 0, - getattr(layer, "only_do_attn", False), + self.external_norm_rope, # if True is means only_do_attn ) return res @@ -569,7 +586,7 @@ def forward_unitest( cache_k = forward_meta.caches[2 * layer.layer_id] cache_v = forward_meta.caches[2 * layer.layer_id + 1] - head_dim_q = 192 + head_dim_q = 128 head_dim_v = 128 forward_meta.caches[2 * layer.layer_id] = paddle.randn(cache_k.shape[:3] + [head_dim_q]) diff --git a/fastdeploy/model_executor/layers/attention/attention.py b/fastdeploy/model_executor/layers/attention/attention.py index 96897317684..b4fb6b52302 100644 --- a/fastdeploy/model_executor/layers/attention/attention.py +++ b/fastdeploy/model_executor/layers/attention/attention.py @@ -89,11 +89,16 @@ def __init__( fd_config.model_config.num_attention_heads // fd_config.parallel_config.tensor_parallel_size ) self.head_dim: int = fd_config.model_config.head_dim + self.layer_id: int = layer_id + num_key_value_heads = getattr(fd_config.model_config, "num_key_value_heads_list", None) + if num_key_value_heads is None: + num_key_value_heads = fd_config.model_config.num_key_value_heads + else: + num_key_value_heads = num_key_value_heads[self.layer_id] self.kv_num_heads: int = max( 1, - fd_config.model_config.num_key_value_heads // fd_config.parallel_config.tensor_parallel_size, + int(num_key_value_heads) // fd_config.parallel_config.tensor_parallel_size, ) - self.layer_id: int = layer_id self.v_head_dim: int = v_head_dim if v_head_dim > 0 else self.head_dim self.rope_type: str = rope_type self.qk_head_dim: int = self.head_dim @@ -277,7 +282,11 @@ def forward( if forward_meta.layer_done_counter is not None: forward_meta.layer_done_counter.wait_for_layer(self.layer_id) - return forward_meta.attn_backend.forward( + attn_backend = forward_meta.attn_backend + if forward_meta.attn_backends is not None: + attn_backend = forward_meta.attn_backends[self.layer_id] + + return attn_backend.forward( q, k, v, diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py index 4f4985173b7..43549909837 100644 --- a/fastdeploy/model_executor/layers/linear.py +++ b/fastdeploy/model_executor/layers/linear.py @@ -666,17 +666,18 @@ def __init__( self.kv_num_heads = fd_config.model_config.num_key_value_heads if kv_num_heads is None else kv_num_heads self.hidden_size = fd_config.model_config.hidden_size if hidden_size is None else hidden_size self.head_dim = fd_config.model_config.head_dim if head_dim is None else head_dim + self.v_head_dim = getattr(fd_config.model_config, "v_head_dim", fd_config.model_config.head_dim) self.tp_size = fd_config.parallel_config.tensor_parallel_size self.local_rank = fd_config.parallel_config.tensor_parallel_rank self.num_heads_per_rank = divide(self.num_heads, self.tp_size) - if self.kv_num_heads < self.tp_size and self.tp_size % self.kv_num_heads == 0: + if self.kv_num_heads < self.tp_size: self.kv_num_heads_per_rank = 1 - self.num_kv_head_replicas = divide(self.tp_size, self.kv_num_heads) - output_size = (self.num_heads + 2 * self.tp_size) * self.head_dim + self.num_kv_head_replicas = self.tp_size // self.kv_num_heads + output_size = (self.num_heads + self.tp_size) * self.head_dim + self.tp_size * self.v_head_dim else: self.kv_num_heads_per_rank = divide(self.kv_num_heads, self.tp_size) self.num_kv_head_replicas = 1 - output_size = (self.num_heads + 2 * self.kv_num_heads) * self.head_dim + output_size = (self.num_heads + self.kv_num_heads) * self.head_dim + self.kv_num_heads * self.v_head_dim input_size = self.hidden_size super().__init__( fd_config=fd_config, @@ -689,18 +690,22 @@ def __init__( ) def _get_shard_size_mapping(self, loaded_shard_id: str, head_dim: int): + v_head_dim = getattr(self, "v_head_dim", head_dim) shard_size_mapping = { "q": self.num_heads_per_rank * head_dim, "k": self.kv_num_heads_per_rank * head_dim, - "v": self.kv_num_heads_per_rank * head_dim, + "v": self.kv_num_heads_per_rank * v_head_dim, } return shard_size_mapping.get(loaded_shard_id) + def _get_kv_shard_id(self): + if self.kv_num_heads < self.tp_size: + return self.local_rank * self.kv_num_heads // self.tp_size + return self.local_rank // self.num_kv_head_replicas + def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): output_dim = getattr(param, "output_dim", None) assert output_dim is not None - dim = -1 if output_dim else 0 - head_dim = param.shape[dim] // (self.num_heads_per_rank + 2 * self.kv_num_heads_per_rank) weight_need_transpose = getattr(param, "weight_need_transpose", False) if loaded_shard_id is None: if weight_need_transpose: @@ -711,9 +716,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N # Loaded weight is already fused on disk shard_offsets = [ # (shard_id, shard_offset, shard_size) - ("q", 0, self.num_heads * head_dim), - ("k", self.num_heads * head_dim, self.kv_num_heads * head_dim), - ("v", (self.num_heads + self.kv_num_heads) * head_dim, self.kv_num_heads * head_dim), + ("q", 0, self.num_heads * self.head_dim), + ("k", self.num_heads * self.head_dim, self.kv_num_heads * self.head_dim), + ("v", (self.num_heads + self.kv_num_heads) * self.head_dim, self.kv_num_heads * self.v_head_dim), ] for shard_id, shard_offset, shard_size in shard_offsets: loaded_weight_shard = slice_fn( @@ -728,8 +733,8 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = loaded_weight.transpose([1, 0]) # Tensor parallelism splits the weight along the output_dim if self.tp_size > 1 and not self.fd_config.load_config.is_pre_sharded: - block_size = self._get_shard_size_mapping(loaded_shard_id, head_dim) - shard_id = self.local_rank if loaded_shard_id == "q" else self.local_rank // self.num_kv_head_replicas + block_size = self._get_shard_size_mapping(loaded_shard_id, self.head_dim) + shard_id = self.local_rank if loaded_shard_id == "q" else self._get_kv_shard_id() shard_offset = shard_id * block_size shard_size = block_size loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_offset + shard_size) @@ -738,16 +743,15 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N param.initialize() if loaded_shard_id == "q": - param_shard_offset = 0 - param_shard_size = self.num_heads_per_rank * head_dim + param_shard_size = self.num_heads_per_rank * self.head_dim elif loaded_shard_id == "k": - param_shard_offset = self.num_heads_per_rank * head_dim - param_shard_size = self.kv_num_heads_per_rank * head_dim + param_shard_offset = self.num_heads_per_rank * self.head_dim + param_shard_size = self.kv_num_heads_per_rank * self.head_dim else: # loaded_shard_id == "v" - param_shard_offset = (self.num_heads_per_rank + self.kv_num_heads_per_rank) * head_dim - param_shard_size = self.kv_num_heads_per_rank * head_dim + param_shard_offset = (self.num_heads_per_rank + self.kv_num_heads_per_rank) * self.head_dim + param_shard_size = self.kv_num_heads_per_rank * self.v_head_dim if hasattr(param, "tensor_track"): param.tensor_track.mark(start=param_shard_offset, end=param_shard_offset + param_shard_size) @@ -783,7 +787,8 @@ def load_weight(self, state_dict: dict): weight_tensor = paddle.concat([q_tensor, k_tensor, v_tensor], axis=-1).transpose([1, 0]) weight_tensor = weight_tensor.reshape( [ - (self.num_heads_per_rank + 2 * self.kv_num_heads_per_rank) * (self.head_dim), + (self.num_heads_per_rank + self.kv_num_heads_per_rank) * self.head_dim + + self.kv_num_heads_per_rank * self.v_head_dim, self.hidden_size, ] ) @@ -1171,6 +1176,7 @@ def __init__( self.kv_num_heads = fd_config.model_config.num_key_value_heads if kv_num_heads is None else kv_num_heads self.hidden_size = fd_config.model_config.hidden_size if hidden_size is None else hidden_size self.head_dim = fd_config.model_config.head_dim if head_dim is None else head_dim + self.v_head_dim = getattr(fd_config.model_config, "v_head_dim", fd_config.model_config.head_dim) self.tp_size = fd_config.parallel_config.tensor_parallel_size self.local_rank = fd_config.parallel_config.tensor_parallel_rank self.num_heads_per_rank = divide(self.num_heads, self.tp_size) @@ -1178,11 +1184,16 @@ def __init__( if self.kv_num_heads < self.tp_size and self.tp_size % self.kv_num_heads == 0: self.kv_num_heads_per_rank = 1 self.num_kv_head_replicas = divide(self.tp_size, self.kv_num_heads) - output_size = (2 * self.num_heads + 2 * self.tp_size) * self.head_dim + output_size = (self.num_heads + self.tp_size) * self.head_dim + ( + self.num_heads + self.tp_size + ) * self.v_head_dim else: self.kv_num_heads_per_rank = divide(self.kv_num_heads, self.tp_size) self.num_kv_head_replicas = 1 - output_size = (2 * self.num_heads + 2 * self.kv_num_heads) * self.head_dim + # qkvg layout: [q (num_heads*head_dim) | k (kv_heads*head_dim) | v (kv_heads*v_head_dim) | gate (num_heads*v_head_dim)] + output_size = (self.num_heads + self.kv_num_heads) * self.head_dim + ( + self.num_heads + self.kv_num_heads + ) * self.v_head_dim input_size = self.hidden_size super().__init__( fd_config=fd_config, @@ -1198,28 +1209,33 @@ def _get_shard_size_mapping(self, loaded_shard_id: str, head_dim: int): shard_size_mapping = { "q": self.num_heads_per_rank * head_dim, "k": self.kv_num_heads_per_rank * head_dim, - "v": self.kv_num_heads_per_rank * head_dim, + "v": self.kv_num_heads_per_rank * self.v_head_dim, } return shard_size_mapping.get(loaded_shard_id) def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): assert loaded_shard_id in [ "qkv", + "q", + "k", + "v", "gate", - ], f"loaded_shard_id must be one of ['qkv', 'gate'], but got {loaded_shard_id}" - - if loaded_shard_id == "qkv": - self.qkv_weight_loader(param, loaded_weight, None) - else: + ], f"loaded_shard_id must be one of ['qkv', 'q', 'k', 'v', 'gate'], but got {loaded_shard_id}" + if loaded_shard_id == "gate": self.gate_weight_loader(param, loaded_weight) + elif loaded_shard_id in ("qkv", "q", "k", "v"): + # qkv: 传入的是 fused q+k+v 一次性拆分 + # q/k/v: 单独传入某一头,由 qkv_weight_loader 直接放置到对应 offset + sub_id = None if loaded_shard_id == "qkv" else loaded_shard_id + self.qkv_weight_loader(param, loaded_weight, sub_id) + else: + raise ValueError( + f"loaded_shard_id must be one of ['qkv','q','k','v','gate'], " f"but got {loaded_shard_id}" + ) def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id): output_dim = getattr(param, "output_dim", None) assert output_dim is not None - dim = -1 if output_dim else 0 - - # q_head + gate_head + kv_head - head_dim = param.shape[dim] // (2 * self.num_heads_per_rank + 2 * self.kv_num_heads_per_rank) weight_need_transpose = getattr(param, "weight_need_transpose", False) if loaded_shard_id is None: if weight_need_transpose: @@ -1230,9 +1246,9 @@ def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id): # Loaded weight is already fused on disk shard_offsets = [ # (shard_id, shard_offset, shard_size) - ("q", 0, self.num_heads * head_dim), - ("k", self.num_heads * head_dim, self.kv_num_heads * head_dim), - ("v", (self.num_heads + self.kv_num_heads) * head_dim, self.kv_num_heads * head_dim), + ("q", 0, self.num_heads * self.head_dim), + ("k", self.num_heads * self.head_dim, self.kv_num_heads * self.head_dim), + ("v", (self.num_heads + self.kv_num_heads) * self.head_dim, self.kv_num_heads * self.v_head_dim), ] for shard_id, shard_offset, shard_size in shard_offsets: loaded_weight_shard = slice_fn( @@ -1247,8 +1263,8 @@ def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id): loaded_weight = loaded_weight.transpose([1, 0]) # Tensor parallelism splits the weight along the output_dim if self.tp_size > 1 and output_dim is not None: - block_size = self._get_shard_size_mapping(loaded_shard_id, head_dim) - shard_id = self.local_rank if loaded_shard_id == "q" else self.local_rank // self.num_kv_head_replicas + block_size = self._get_shard_size_mapping(loaded_shard_id, self.head_dim) + shard_id = self.local_rank if loaded_shard_id == "q" else self._get_kv_shard_id() shard_offset = shard_id * block_size shard_size = block_size loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_offset + shard_size) @@ -1258,14 +1274,14 @@ def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id): if loaded_shard_id == "q": param_shard_offset = 0 - param_shard_size = self.num_heads_per_rank * head_dim + param_shard_size = self.num_heads_per_rank * self.head_dim elif loaded_shard_id == "k": - param_shard_offset = self.num_heads_per_rank * head_dim - param_shard_size = self.kv_num_heads_per_rank * head_dim + param_shard_offset = self.num_heads_per_rank * self.head_dim + param_shard_size = self.kv_num_heads_per_rank * self.head_dim else: # loaded_shard_id == "v" - param_shard_offset = (self.num_heads_per_rank + self.kv_num_heads_per_rank) * head_dim - param_shard_size = self.kv_num_heads_per_rank * head_dim + param_shard_offset = (self.num_heads_per_rank + self.kv_num_heads_per_rank) * self.head_dim + param_shard_size = self.kv_num_heads_per_rank * self.v_head_dim if hasattr(param, "tensor_track"): param.tensor_track.mark(start=param_shard_offset, end=param_shard_offset + param_shard_size) @@ -1276,9 +1292,6 @@ def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id): def gate_weight_loader(self, param, loaded_weight): output_dim = getattr(param, "output_dim", None) assert output_dim is not None - dim = -1 if output_dim else 0 - # q_head + gate_head + kv_head - head_dim = param.shape[dim] // (2 * self.num_heads_per_rank + 2 * self.kv_num_heads_per_rank) weight_need_transpose = getattr(param, "weight_need_transpose", False) if weight_need_transpose: @@ -1287,7 +1300,7 @@ def gate_weight_loader(self, param, loaded_weight): # Tensor parallelism splits the weight along the output_dim if self.tp_size > 1 and output_dim is not None: - block_size = self.num_heads_per_rank * head_dim + block_size = self.num_heads_per_rank * self.v_head_dim shard_offset = self.local_rank * block_size shard_size = block_size loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_offset + shard_size) @@ -1295,8 +1308,10 @@ def gate_weight_loader(self, param, loaded_weight): if not param._is_initialized(): param.initialize() - param_shard_offset = (self.num_heads_per_rank + 2 * self.kv_num_heads_per_rank) * head_dim - param_shard_size = self.num_heads_per_rank * head_dim + param_shard_offset = ( + self.num_heads_per_rank + self.kv_num_heads_per_rank + ) * self.head_dim + self.kv_num_heads_per_rank * self.v_head_dim + param_shard_size = self.num_heads_per_rank * self.v_head_dim if hasattr(param, "tensor_track"): param.tensor_track.mark(start=param_shard_offset, end=param_shard_offset + param_shard_size) diff --git a/fastdeploy/model_executor/models/paddleformers/base.py b/fastdeploy/model_executor/models/paddleformers/base.py index 5eb981d5300..becd6e1f092 100644 --- a/fastdeploy/model_executor/models/paddleformers/base.py +++ b/fastdeploy/model_executor/models/paddleformers/base.py @@ -513,6 +513,7 @@ def _sync_config_from_text_config(self) -> None: "rope_theta", "rope_scaling", "head_dim", + "v_head_dim", "rms_norm_eps", "rope_local_base_freq", # Gemma3 specific "query_pre_attn_scalar", # Gemma3 specific diff --git a/fastdeploy/model_executor/ops/triton_ops/do_rope.py b/fastdeploy/model_executor/ops/triton_ops/do_rope.py index 681afd1d49f..d868d727440 100644 --- a/fastdeploy/model_executor/ops/triton_ops/do_rope.py +++ b/fastdeploy/model_executor/ops/triton_ops/do_rope.py @@ -73,9 +73,9 @@ def do_rope( head_dim_k = cache_k.shape[-1] num_kv_heads = cache_k.shape[1] - head_dim_v = cache_k.shape[-1] + head_dim_v = cache_v.shape[-1] qkv_size = qkv_out.shape[-1] - num_q_heads = (qkv_size - head_dim_v * num_kv_heads) // head_dim_k - num_kv_heads + num_q_heads = (qkv_size - (head_dim_k + head_dim_v) * num_kv_heads) // head_dim_k M = qkv_out.shape[0] grid = (M,) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index b787ee54505..004fee8dedd 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -242,10 +242,7 @@ def __init__( ) # Initialize attention Backend - # NOTE(gonshaotian): Currently, all attention layers share one attention backend instance. - # In the future, we will expand it as a list. self.attn_backends: list[AttentionBackend] = [] - # self.attn_metadatas: list[AttentionMetadata] = [] self._initialize_attn_backend() # Forward meta store the global meta information of the forward @@ -1468,6 +1465,7 @@ def initialize_forward_meta(self, is_dummy_or_profile_run=False): ids_remove_padding=self.share_inputs["ids_remove_padding"], rotary_embs=self.share_inputs["rope_emb"], attn_backend=self.attn_backends[0], + attn_backends=self.attn_backends, decoder_batch_ids=self.share_inputs["decoder_batch_ids"], decoder_tile_ids_per_batch=self.share_inputs["decoder_tile_ids_per_batch"], decoder_num_blocks_cpu=self.share_inputs["decoder_num_blocks_cpu"], @@ -1560,7 +1558,7 @@ def initialize_kv_cache(self, profile: bool = False) -> None: """ if self.enable_cache_manager_v1: self.share_inputs["caches"] = self.cache_controller.initialize_kv_cache( - attn_backend=self.attn_backends[0], + attn_backend=self.attn_backends, num_gpu_blocks=self.num_gpu_blocks, ) self.cache_kvs_map = self.cache_controller.get_kv_caches() @@ -1597,17 +1595,18 @@ def initialize_kv_cache(self, profile: bool = False) -> None: kv_cache_quant_type = "uint8" cache_type = "uint8" - # NOTE(changwenbin) Get dsa cache shape. - key_cache_shape, value_cache_shape, indexer_cache_shape = self.attn_backends[0].get_kv_cache_shape( - max_num_blocks=max_block_num, kv_cache_quant_type=kv_cache_quant_type - ) - else: - key_cache_shape, value_cache_shape = self.attn_backends[0].get_kv_cache_shape( + key_cache_shapes = [] + value_cache_shapes = [] + indexer_cache_shapes = [] + for attn_backend in self.attn_backends: + kv_cache_shape = attn_backend.get_kv_cache_shape( max_num_blocks=max_block_num, kv_cache_quant_type=kv_cache_quant_type ) - indexer_cache_shape = [] + key_cache_shapes.append(kv_cache_shape[0]) + value_cache_shapes.append(kv_cache_shape[1]) + indexer_cache_shapes.append(kv_cache_shape[2] if self.dsa_cache else []) if kv_cache_quant_type == "block_wise_fp8": - kv_cache_scale_shape = [key_cache_shape[0], key_cache_shape[1], key_cache_shape[2]] + kv_cache_scale_shapes = [[shape[0], shape[1], shape[2]] for shape in key_cache_shapes] local_rank = self.local_rank % self.parallel_config.tensor_parallel_size # Check if gpu runner needs to create kv cache @@ -1632,6 +1631,11 @@ def initialize_kv_cache(self, profile: bool = False) -> None: cache_kvs_list = [] for i in range(self.model_config.num_hidden_layers): + key_cache_shape = key_cache_shapes[i] + value_cache_shape = value_cache_shapes[i] + indexer_cache_shape = indexer_cache_shapes[i] + kv_cache_scale_shape = kv_cache_scale_shapes[i] if kv_cache_quant_type == "block_wise_fp8" else None + # init key cache key_cache_name = f"key_caches_{i}_rank{local_rank}.device{self.device_id}" key_cache_scales_name = f"key_cache_scales_{i}_rank{local_rank}.device{self.device_id}" @@ -1727,10 +1731,8 @@ def _initialize_attn_backend(self) -> None: ), f"attn_backends should be empty before initialization, got {len(self.attn_backends)} backends" num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size - self.model_config.kv_num_heads = max( - 1, - int(self.model_config.num_key_value_heads) // self.parallel_config.tensor_parallel_size, - ) + kv_num_heads_per_layer = self._get_kv_num_heads_per_layer() + self.model_config.kv_num_heads = kv_num_heads_per_layer[0] head_dim = self.model_config.head_dim encoder_block_shape_q = 64 @@ -1747,7 +1749,7 @@ def _initialize_attn_backend(self) -> None: decoder_block_shape_q=decoder_block_shape_q, decoder_step_token_num=self.speculative_config.num_speculative_tokens + 1, num_heads=num_heads, - kv_num_heads=self.model_config.kv_num_heads, + kv_num_heads=max(kv_num_heads_per_layer), block_size=self.fd_config.cache_config.block_size, head_dim=head_dim, dtype=self.model_config.dtype, @@ -1766,16 +1768,37 @@ def _initialize_attn_backend(self) -> None: # Get the attention backend attn_cls = get_attention_backend() - attn_backend = attn_cls( - self.fd_config, - kv_num_heads=self.model_config.kv_num_heads, - num_heads=num_heads, - head_dim=head_dim, - encoder_block_shape_q=encoder_block_shape_q, - decoder_block_shape_q=decoder_block_shape_q, - ) + for kv_num_heads in kv_num_heads_per_layer: + attn_backend = attn_cls( + self.fd_config, + kv_num_heads=kv_num_heads, + num_heads=num_heads, + head_dim=head_dim, + encoder_block_shape_q=encoder_block_shape_q, + decoder_block_shape_q=decoder_block_shape_q, + ) + self.attn_backends.append(attn_backend) + + def _get_kv_num_heads_per_layer(self) -> list[int]: + num_hidden_layers = self.model_config.num_hidden_layers + num_key_value_heads = getattr(self.model_config, "num_key_value_heads_list", None) + if num_key_value_heads is None: + kv_num_heads = max( + 1, + int(self.model_config.num_key_value_heads) // self.parallel_config.tensor_parallel_size, + ) + return [kv_num_heads] * num_hidden_layers - self.attn_backends.append(attn_backend) + if len(num_key_value_heads) != num_hidden_layers: + raise ValueError( + f"num_key_value_heads_list length {len(num_key_value_heads)} " + f"does not match num_hidden_layers {num_hidden_layers}" + ) + + return [ + max(1, int(layer_num_key_value_heads) // self.parallel_config.tensor_parallel_size) + for layer_num_key_value_heads in num_key_value_heads + ] def _dummy_pooler_run_task( self, @@ -3003,7 +3026,9 @@ def cal_theortical_kvcache(self): else: # default byte_of_dtype = 2 - hidden_dim = self.model_config.head_dim * self.model_config.kv_num_heads + kv_num_heads_per_layer = self._get_kv_num_heads_per_layer() + v_head_dim = getattr(self.model_config, "v_head_dim", self.model_config.head_dim) + kv_hidden_dim = (self.model_config.head_dim + v_head_dim) * sum(kv_num_heads_per_layer) # NOTE(liuzichang): Implement multi-layer MTP architecture in the future num_layers = ( self.model_config.num_hidden_layers + self.speculative_config.num_gpu_block_expand_ratio @@ -3035,7 +3060,13 @@ def cal_theortical_kvcache(self): * num_layers ) else: - required_memory = byte_of_dtype * 2 * (self.cache_config.block_size * hidden_dim) * num_layers # k + v + if self.spec_method == SpecMethod.MTP: + kv_hidden_dim += ( + (self.model_config.head_dim + v_head_dim) + * self.model_config.kv_num_heads + * self.speculative_config.num_gpu_block_expand_ratio + ) + required_memory = byte_of_dtype * self.cache_config.block_size * kv_hidden_dim # k + v return required_memory def clear_cache(self, profile=False): diff --git a/tests/layers/test_attention_layer.py b/tests/layers/test_attention_layer.py index 8cc635514d6..57145d8e85f 100644 --- a/tests/layers/test_attention_layer.py +++ b/tests/layers/test_attention_layer.py @@ -325,7 +325,7 @@ def test_append_attn_backend_decode_performance_with_prefill(self): state_dict = self.create_random_attention_state_dict(fd_config, prefix="test_layer") attention_layer[i].load_state_dict(state_dict) - attention_layer[i].attn.only_do_attn = True + attention_layer[i].attn.external_norm_rope = True cache_quant_type_str = getattr(attention_layer[0].attn, "cache_quant_type_str", "none") diff --git a/tests/model_executor/test_linear.py b/tests/model_executor/test_linear.py index 2bf08d3ddab..7833dcc18ff 100644 --- a/tests/model_executor/test_linear.py +++ b/tests/model_executor/test_linear.py @@ -287,6 +287,8 @@ def test_qkv_paths(): num_heads_per_rank=2, kv_num_heads_per_rank=1, num_kv_head_replicas=2, + head_dim=2, + v_head_dim=2, tp_size=2, local_rank=0, fd_config=cfg_tp2,