Add lightweight TGS and MFU logging for training #9465
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces tracking and logging for Tokens per GPU per Second (TGS), throughput (TFLOP/s/GPU), and Model Flops Utilization (MFU) across Megatron and Hugging Face trainers, utilizing a user-specified DEVICE_TFLOPS environment variable. The code review feedback highlights several critical robustness issues where missing safety checks on potentially None or 0 values for world_size, seq_length, and logs could lead to TypeError or ZeroDivisionError crashes during training. Actionable code suggestions were provided to defensively handle these cases and prevent runtime failures.
| train_speed = elapsed / n_steps if n_steps > 0 else 0.0 | ||
| seq_length = getattr(args, 'seq_length', None) if args is not None else None | ||
| if train_speed > 0 and seq_length: | ||
| world_size = max(getattr(args, 'world_size', 1), 1) |
There was a problem hiding this comment.
If args.world_size is explicitly set to None, getattr(args, 'world_size', 1) will return None. Passing None to max() along with an integer raises a TypeError in Python 3 (e.g., TypeError: '>' not supported between instances of 'int' and 'NoneType'), which would crash the training process. Using a fallback to 1 before calling max avoids this issue.
| world_size = max(getattr(args, 'world_size', 1), 1) | |
| world_size = max(getattr(args, 'world_size', None) or 1, 1) |
| def on_init_end(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs): | ||
| tflops = get_env_args('DEVICE_TFLOPS', float, None) | ||
| if tflops is not None: | ||
| self.device_tflops = tflops * max(getattr(args, 'world_size', 1), 1) |
There was a problem hiding this comment.
If args.world_size is None, max(getattr(args, 'world_size', 1), 1) will raise a TypeError and crash the training. Using a safe fallback to 1 prevents this crash.
| self.device_tflops = tflops * max(getattr(args, 'world_size', 1), 1) | |
| self.device_tflops = tflops * max(getattr(args, 'world_size', None) or 1, 1) |
| if train_speed > 0: | ||
| logs['tgs'] = round( | ||
| args.global_batch_size * args.seq_length / train_speed / args.world_size, 3) | ||
| if self.device_peak_tflops: | ||
| throughput = self._get_throughput_tflops_per_gpu(train_speed) | ||
| if throughput is not None: | ||
| logs['throughput(TFLOP/s/GPU)'] = round(throughput, 3) | ||
| logs['MFU'] = round(throughput / self.device_peak_tflops, 6) |
There was a problem hiding this comment.
If args.seq_length is None or args.world_size is None or 0, this calculation will raise a TypeError or ZeroDivisionError, crashing the training run. We should defensively check that seq_length is present and that world_size is a valid non-zero integer before performing the division.
| if train_speed > 0: | |
| logs['tgs'] = round( | |
| args.global_batch_size * args.seq_length / train_speed / args.world_size, 3) | |
| if self.device_peak_tflops: | |
| throughput = self._get_throughput_tflops_per_gpu(train_speed) | |
| if throughput is not None: | |
| logs['throughput(TFLOP/s/GPU)'] = round(throughput, 3) | |
| logs['MFU'] = round(throughput / self.device_peak_tflops, 6) | |
| seq_length = getattr(args, 'seq_length', None) | |
| world_size = getattr(args, 'world_size', None) or 1 | |
| if train_speed > 0 and seq_length: | |
| logs['tgs'] = round( | |
| args.global_batch_size * seq_length / train_speed / world_size, 3) | |
| if self.device_peak_tflops: | |
| throughput = self._get_throughput_tflops_per_gpu(train_speed) | |
| if throughput is not None: | |
| logs['throughput(TFLOP/s/GPU)'] = round(throughput, 3) | |
| logs['MFU'] = round(throughput / self.device_peak_tflops, 6) |
| def _get_throughput_tflops_per_gpu(self, train_speed): | ||
| if train_speed <= 0: | ||
| return None | ||
| num_flops = self._num_floating_point_operations(self.args.global_batch_size) | ||
| if num_flops is None: | ||
| return None | ||
| return num_flops / (train_speed * 10**12 * self.args.world_size) |
There was a problem hiding this comment.
If self.args.world_size is None or 0, this will raise a TypeError or ZeroDivisionError. Safely retrieving world_size with a fallback to 1 prevents potential runtime crashes.
| def _get_throughput_tflops_per_gpu(self, train_speed): | |
| if train_speed <= 0: | |
| return None | |
| num_flops = self._num_floating_point_operations(self.args.global_batch_size) | |
| if num_flops is None: | |
| return None | |
| return num_flops / (train_speed * 10**12 * self.args.world_size) | |
| def _get_throughput_tflops_per_gpu(self, train_speed): | |
| if train_speed <= 0: | |
| return None | |
| world_size = getattr(self.args, 'world_size', None) or 1 | |
| num_flops = self._num_floating_point_operations(self.args.global_batch_size) | |
| if num_flops is None: | |
| return None | |
| return num_flops / (train_speed * 10**12 * world_size) |
| def _num_floating_point_operations(self, batch_size): | ||
| config = self.trainer.config | ||
| hidden_size = getattr(config, 'hidden_size', None) | ||
| num_layers = getattr(config, 'num_layers', None) | ||
| num_attention_heads = getattr(config, 'num_attention_heads', None) | ||
| ffn_hidden_size = getattr(config, 'ffn_hidden_size', None) | ||
| if None in {hidden_size, num_layers, num_attention_heads, ffn_hidden_size}: | ||
| return None | ||
|
|
||
| kv_channels = getattr(config, 'kv_channels', None) or hidden_size // num_attention_heads | ||
| num_query_groups = getattr(config, 'num_query_groups', None) or num_attention_heads | ||
| padded_vocab_size = getattr(config, 'padded_vocab_size', None) or getattr(config, 'vocab_size', None) | ||
| if padded_vocab_size is None: | ||
| return None | ||
|
|
||
| query_projection_size = kv_channels * num_attention_heads | ||
| query_projection_to_hidden_size_ratio = query_projection_size / hidden_size | ||
| num_experts_routed_to = 1 if getattr(config, 'num_moe_experts', None) is None else getattr( | ||
| config, 'moe_router_topk', 1) | ||
| gated_linear_multiplier = 1.5 if getattr(config, 'swiglu', False) else 1.0 | ||
|
|
||
| return ( | ||
| 12 * batch_size * self.args.seq_length * num_layers * hidden_size * hidden_size * ( | ||
| (1 + (num_query_groups / num_attention_heads) + (self.args.seq_length / hidden_size)) | ||
| * query_projection_to_hidden_size_ratio | ||
| + (ffn_hidden_size / hidden_size) * num_experts_routed_to * gated_linear_multiplier | ||
| + padded_vocab_size / (2 * num_layers * hidden_size))) |
There was a problem hiding this comment.
If self.args.seq_length is None, this calculation will raise a TypeError. Retrieving seq_length defensively and returning None early if it is not set prevents this issue.
def _num_floating_point_operations(self, batch_size):
seq_length = getattr(self.args, 'seq_length', None)
if seq_length is None:
return None
config = self.trainer.config
hidden_size = getattr(config, 'hidden_size', None)
num_layers = getattr(config, 'num_layers', None)
num_attention_heads = getattr(config, 'num_attention_heads', None)
ffn_hidden_size = getattr(config, 'ffn_hidden_size', None)
if None in {hidden_size, num_layers, num_attention_heads, ffn_hidden_size}:
return None
kv_channels = getattr(config, 'kv_channels', None) or hidden_size // num_attention_heads
num_query_groups = getattr(config, 'num_query_groups', None) or num_attention_heads
padded_vocab_size = getattr(config, 'padded_vocab_size', None) or getattr(config, 'vocab_size', None)
if padded_vocab_size is None:
return None
query_projection_size = kv_channels * num_attention_heads
query_projection_to_hidden_size_ratio = query_projection_size / hidden_size
num_experts_routed_to = 1 if getattr(config, 'num_moe_experts', None) is None else getattr(
config, 'moe_router_topk', 1)
gated_linear_multiplier = 1.5 if getattr(config, 'swiglu', False) else 1.0
return (
12 * batch_size * seq_length * num_layers * hidden_size * hidden_size * (
(1 + (num_query_groups / num_attention_heads) + (seq_length / hidden_size))
* query_projection_to_hidden_size_ratio
+ (ffn_hidden_size / hidden_size) * num_experts_routed_to * gated_linear_multiplier
+ padded_vocab_size / (2 * num_layers * hidden_size)))|
|
||
|
|
||
| def add_train_message(logs, state, start_time, start_step) -> None: | ||
| def add_train_message(logs, state, start_time, start_step, args=None) -> None: |
There was a problem hiding this comment.
If logs is None, attempting to set keys on it will raise a TypeError. Adding an early return when logs is None prevents potential runtime crashes.
| def add_train_message(logs, state, start_time, start_step, args=None) -> None: | |
| def add_train_message(logs, state, start_time, start_step, args=None) -> None: | |
| if logs is None: | |
| return |
| def on_log(self, args: TrainingArguments, state: TrainerState, control, logs=None, **kwargs): | ||
| total_flos = getattr(state, 'total_flos', 0) - self.start_flos | ||
| if self.elapsed > 0 and self.device_tflops and total_flos > 0: | ||
| logs['MFU'] = round(total_flos / self.elapsed / (self.device_tflops * 1e12), 6) | ||
| return super().on_log(args, state, control, logs, **kwargs) |
There was a problem hiding this comment.
If logs is None, attempting to assign logs['MFU'] will raise a TypeError. Checking that logs is not None before assigning makes the callback robust.
| def on_log(self, args: TrainingArguments, state: TrainerState, control, logs=None, **kwargs): | |
| total_flos = getattr(state, 'total_flos', 0) - self.start_flos | |
| if self.elapsed > 0 and self.device_tflops and total_flos > 0: | |
| logs['MFU'] = round(total_flos / self.elapsed / (self.device_tflops * 1e12), 6) | |
| return super().on_log(args, state, control, logs, **kwargs) | |
| def on_log(self, args: TrainingArguments, state: TrainerState, control, logs=None, **kwargs): | |
| total_flos = getattr(state, 'total_flos', 0) - self.start_flos | |
| if self.elapsed > 0 and self.device_tflops and total_flos > 0 and logs is not None: | |
| logs['MFU'] = round(total_flos / self.elapsed / (self.device_tflops * 1e12), 6) | |
| return super().on_log(args, state, control, logs, **kwargs) |
PR type
PR information
HF swift sft:
Megatron sft:
PR explanation:
Although `PerfMetricsLogCallback` already provides MFU logging, this change intentionally keeps the lightweight progress-log path separate. The default progress callback is responsible for writing the console progress output and `logging.jsonl`; user callbacks such as `perf_log` may run after the progress log has already been emitted, so adding MFU there does not reliably make the metric appear in the same training log entry.

This implementation only enables MFU when `DEVICE_TFLOPS` is explicitly provided, avoiding the extra device benchmark in `perf_log.py`. It also keeps the change limited to log formatting/callback code and does not patch the training step or FLOPs calculation logic.