From ff4ae12e1f5e6c9394463fd8396cfaefacfa49c7 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Mon, 19 Jul 2021 16:14:29 -0400 Subject: [PATCH 01/13] stuff --- megatron/__init__.py | 20 ++++---- megatron/fused_kernels/__init__.py | 2 +- megatron/initialize.py | 63 ++++++++++++++----------- megatron/tokenizer/gpt2_tokenization.py | 5 +- megatron/training.py | 7 ++- megatron/utils.py | 7 ++- requirements.txt | 2 + setup.py | 2 + 8 files changed, 67 insertions(+), 41 deletions(-) diff --git a/megatron/__init__.py b/megatron/__init__.py index 09858d3b7c9..f670e652aac 100644 --- a/megatron/__init__.py +++ b/megatron/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import torch +import os from .package_info import ( __description__, @@ -25,15 +26,16 @@ __version__, ) -from .global_vars import get_args -from .global_vars import get_current_global_batch_size -from .global_vars import get_num_microbatches -from .global_vars import update_num_microbatches -from .global_vars import get_tokenizer -from .global_vars import get_tensorboard_writer -from .global_vars import get_adlr_autoresume -from .global_vars import get_timers -from .initialize import initialize_megatron +if "MEGATRON_SETUP" not in os.environ: + from .global_vars import get_args + from .global_vars import get_current_global_batch_size + from .global_vars import get_num_microbatches + from .global_vars import update_num_microbatches + from .global_vars import get_tokenizer + from .global_vars import get_tensorboard_writer + from .global_vars import get_adlr_autoresume + from .global_vars import get_timers + from .initialize import initialize_megatron def print_rank_0(message): """If distributed is initialized, print only on rank 0.""" diff --git a/megatron/fused_kernels/__init__.py b/megatron/fused_kernels/__init__.py index f5b67fcae77..2810a6bf7fb 100644 --- a/megatron/fused_kernels/__init__.py +++ b/megatron/fused_kernels/__init__.py @@ -24,7 +24,7 @@ # leading to recompilation of fused kernels. Set it to empty string # to avoid recompilation and assign arch flags explicity in # extra_cuda_cflags below -os.environ["TORCH_CUDA_ARCH_LIST"] = "" +#os.environ["TORCH_CUDA_ARCH_LIST"] = "" def load(args): diff --git a/megatron/initialize.py b/megatron/initialize.py index 11c996e0c6a..9fe64d7e22f 100644 --- a/megatron/initialize.py +++ b/megatron/initialize.py @@ -30,7 +30,8 @@ from megatron.global_vars import set_global_variables from megatron.mpu import (set_tensor_model_parallel_rank, set_tensor_model_parallel_world_size) - +import logging +logger = logging.getLogger(__name__) def initialize_megatron(extra_args_provider=None, args_defaults={}, ignore_unknown_args=False, allow_no_cuda=False): @@ -60,7 +61,7 @@ def finish_mpu_init(): # Random seeds for reproducibility. if args.rank == 0: - print('> setting random seeds to {} ...'.format(args.seed)) + logger.info('> setting random seeds to {} ...'.format(args.seed)) _set_random_seed(args.seed) args = get_args() @@ -100,11 +101,11 @@ def _compile_dependencies(): # TODO: move this to ninja if torch.distributed.get_rank() == 0: start_time = time.time() - print('> compiling dataset index builder ...') + logger.info('> compiling dataset index builder ...') from megatron.data.dataset_utils import compile_helper compile_helper() - print('>>> done with dataset index builder. Compilation time: {:.3f} ' - 'seconds'.format(time.time() - start_time), flush=True) + logger.info('>>> done with dataset index builder. 
Compilation time: {:.3f} ' + 'seconds'.format(time.time() - start_time)) # ================== # Load fused kernels @@ -123,29 +124,37 @@ def _compile_dependencies(): if not ((args.fp16 or args.bf16) and custom_kernel_constraint and args.masked_softmax_fusion): - if args.rank == 0: - print('WARNING: constraints for invoking optimized' - ' fused softmax kernel are not met. We default' - ' back to unfused kernel invocations.', flush=True) - + logger.warning('constraints for invoking optimized' + ' fused softmax kernel are not met. We default' + ' back to unfused kernel invocations.') + + start_time = time.time() + logger.info(f'> compiling and loading fused kernels ...') + fused_kernels.load(args) + logger.info("Compile done, waiting for others to compile") + torch.distributed.barrier() # Always build on rank zero first. - if torch.distributed.get_rank() == 0: - start_time = time.time() - print('> compiling and loading fused kernels ...', flush=True) - fused_kernels.load(args) - torch.distributed.barrier() - else: - torch.distributed.barrier() - fused_kernels.load(args) + # if torch.distributed.get_rank() == 0: + # start_time = time.time() + # logger.info(f'> compiling and loading fused kernels ... (rank = {torch.distributed.get_rank()})') + # fused_kernels.load(args) + # logger.info("Compile done, waiting on barrier") + # torch.distributed.barrier() + # logger.info("Barrier done, waiting for others to compile") + # else: + # logger.info("Waiting for rank 0 to compile") + # torch.distributed.barrier() + # logger.info(f'> compiling and loading fused kernels ... (rank = {torch.distributed.get_rank()})') + # fused_kernels.load(args) + # logger.info("Compile done, waiting on barrier") # Simple barrier to make sure all ranks have passed the # compilation phase successfully before moving on to the # rest of the program. We think this might ensure that # the lock is released. - torch.distributed.barrier() - if torch.distributed.get_rank() == 0: - print('>>> done with compiling and loading fused kernels. ' - 'Compilation time: {:.3f} seconds'.format( - time.time() - start_time), flush=True) + #torch.distributed.barrier() + logger.info('>>> done with compiling and loading fused kernels. ' + 'Compilation time: {:.3f} seconds'.format( + time.time() - start_time)) @@ -157,15 +166,15 @@ def _initialize_distributed(): if torch.distributed.is_initialized(): if args.rank == 0: - print('torch distributed is already initialized, ' - 'skipping initialization ...', flush=True) + logger.info('torch distributed is already initialized, ' + 'skipping initialization ...') args.rank = torch.distributed.get_rank() args.world_size = torch.distributed.get_world_size() else: if args.rank == 0: - print('> initializing torch distributed ...', flush=True) + logger.info('> initializing torch distributed ...') # Manually set the device ids. if device_count > 0: device = args.rank % device_count @@ -189,7 +198,7 @@ def _initialize_distributed(): # data-parallel communicators. 
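
For reference, the rank-ordering idiom that the commented-out block above implements can be written as a small standalone helper. This is an illustrative sketch, not code from the patch; `build` stands in for `fused_kernels.load(args)` and an already-initialized torch.distributed process group is assumed.

import torch.distributed as dist

def build_on_rank_zero_first(build):
    # Rank 0 compiles the extension and warms the shared build cache;
    # the other ranks wait, then load from that cache.
    if dist.get_rank() == 0:
        build()
        dist.barrier()
    else:
        dist.barrier()
        build()
    # Final barrier: nobody proceeds until every rank has loaded the kernels.
    dist.barrier()

The patch instead lets every rank compile concurrently and keeps a single barrier afterwards, trading redundant compilation work for fewer synchronization points.
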
if device_count > 0: if mpu.model_parallel_is_initialized(): - print('model parallel is already initialized') + logger.info('model parallel is already initialized') else: mpu.initialize_model_parallel(args.tensor_model_parallel_size, args.pipeline_model_parallel_size, diff --git a/megatron/tokenizer/gpt2_tokenization.py b/megatron/tokenizer/gpt2_tokenization.py index 3f37e449089..fb6b44de379 100644 --- a/megatron/tokenizer/gpt2_tokenization.py +++ b/megatron/tokenizer/gpt2_tokenization.py @@ -22,7 +22,10 @@ import json import logging import os -import regex as re +try: + import regex as re +except ImportError: + pass from io import open try: diff --git a/megatron/training.py b/megatron/training.py index 72a430ecc6b..62ed60c1238 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -51,7 +51,9 @@ from megatron.schedules import forward_backward_pipelining_without_interleaving from megatron.schedules import forward_backward_pipelining_with_interleaving from megatron.utils import report_memory +import logging +logger = logging.getLogger(__name__) def print_datetime(string): @@ -91,6 +93,7 @@ def pretrain(train_valid_test_dataset_provider, """ # Initalize and get arguments, timers, and Tensorboard writer. + logger.info("Initializing megatron") initialize_megatron(extra_args_provider=extra_args_provider, args_defaults=args_defaults) @@ -102,7 +105,7 @@ def pretrain(train_valid_test_dataset_provider, torch.distributed.all_reduce(start_time_tensor, op=torch.distributed.ReduceOp.MIN) _TRAIN_START_TIME = start_time_tensor.item() - print_rank_0('time to initialize megatron (seconds): {:.3f}'.format( + logger.info('time to initialize megatron (seconds): {:.3f}'.format( time.time() - _TRAIN_START_TIME)) print_datetime('after megatron is initialized') @@ -829,8 +832,10 @@ def build_train_valid_test_data_iterators( # Need to broadcast num_tokens and num_type_tokens. flags = torch.cuda.LongTensor( [int(do_train), int(do_valid), int(do_test)]) + logger.info("Broadcasting data iterator") else: flags = torch.cuda.LongTensor([0, 0, 0]) + logger.info("Waiting for data iterator") # Broadcast num tokens. 
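
The broadcast on the next line sends three integer flags (train/valid/test availability) from the build rank to the rest of its group. For context, a minimal standalone version of the same idiom looks like this; it assumes a plain world group with rank 0 as the source, whereas the actual call below passes Megatron's own source rank and group.

import torch
import torch.distributed as dist

def broadcast_split_flags(do_train, do_valid, do_test):
    # Rank 0 decides which splits exist; every other rank receives the decision.
    device = torch.device("cuda", torch.cuda.current_device())
    if dist.get_rank() == 0:
        flags = torch.tensor([int(do_train), int(do_valid), int(do_test)],
                             dtype=torch.long, device=device)
    else:
        flags = torch.zeros(3, dtype=torch.long, device=device)
    dist.broadcast(flags, src=0)
    return [bool(f) for f in flags.tolist()]
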
torch.distributed.broadcast(flags, diff --git a/megatron/utils.py b/megatron/utils.py index 6289c405ac7..2714a52c593 100644 --- a/megatron/utils.py +++ b/megatron/utils.py @@ -20,8 +20,11 @@ import torch from torch.nn.parallel import DistributedDataParallel as torchDDP -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + pass from megatron import get_args from megatron import print_rank_0 diff --git a/requirements.txt b/requirements.txt index 1f7389c3e8f..ec661c9cc75 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ torch six regex numpy +ninja +nltk \ No newline at end of file diff --git a/setup.py b/setup.py index 668a6b6ccea..9cd8161b787 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,8 @@ if sys.version_info < (3,): raise Exception("Python 2 is not supported by Megatron.") +os.environ["MEGATRON_SETUP"]="TRUE" + from megatron.package_info import ( __description__, __contact_names__, From 7a590318a81536a2c6272722b2368090848b3801 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Mon, 23 Aug 2021 21:46:17 -0400 Subject: [PATCH 02/13] stuff --- megatron/data/bert_dataset.py | 2 +- megatron/data/indexed_dataset.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/megatron/data/bert_dataset.py b/megatron/data/bert_dataset.py index 916a3be0657..b2c9a7893d1 100644 --- a/megatron/data/bert_dataset.py +++ b/megatron/data/bert_dataset.py @@ -118,7 +118,7 @@ def build_training_sample(sample, if binary_head: # We assume that we have at least two sentences in the sample assert len(sample) > 1 - assert target_seq_length <= max_seq_length + assert target_seq_length <= max_seq_length-2 # Divide sample into two segments (A and B). 
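
Patch 01 above also threads an environment flag between setup.py and megatron/__init__.py so that installation can import package metadata without pulling in runtime dependencies. A condensed sketch of that guard follows; the paths and the list of skipped imports are abbreviated, not copied from the patch.

# --- setup.py (excerpt, simplified) ---
import os
os.environ["MEGATRON_SETUP"] = "TRUE"   # mark that we are only building/installing
# from megatron.package_info import __version__   # safe: metadata only

# --- megatron/__init__.py (excerpt, simplified) ---
if "MEGATRON_SETUP" not in os.environ:
    # heavy, runtime-only imports (global_vars, initialize_megatron, ...)
    pass
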
if binary_head: diff --git a/megatron/data/indexed_dataset.py b/megatron/data/indexed_dataset.py index 1251066232e..3bae8f1a6fb 100644 --- a/megatron/data/indexed_dataset.py +++ b/megatron/data/indexed_dataset.py @@ -18,7 +18,7 @@ import numpy as np import torch -from megatron import print_rank_0 +#from megatron import print_rank_0 def __best_fitting_dtype(vocab_size=None): @@ -401,21 +401,21 @@ def __init__(self, path, skip_warmup=False): offset = stream.tell() if not skip_warmup: - print_rank_0(" warming up index mmap file...") + #print_rank_0(" warming up index mmap file...") _warmup_mmap_file(path) self._bin_buffer_mmap = np.memmap(path, mode='r', order='C') self._bin_buffer = memoryview(self._bin_buffer_mmap) - print_rank_0(" reading sizes...") + #print_rank_0(" reading sizes...") self._sizes = np.frombuffer( self._bin_buffer, dtype=np.int32, count=self._len, offset=offset) - print_rank_0(" reading pointers...") + #print_rank_0(" reading pointers...") self._pointers = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes) - print_rank_0(" reading document index...") + #print_rank_0(" reading document index...") self._doc_idx = np.frombuffer(self._bin_buffer, dtype=np.int64, count=self._doc_count, offset=offset + self._sizes.nbytes + self._pointers.nbytes) @@ -462,11 +462,11 @@ def _do_init(self, path, skip_warmup): self._index = self.Index(index_file_path(self._path), skip_warmup) if not skip_warmup: - print_rank_0(" warming up data mmap file...") + #print_rank_0(" warming up data mmap file...") _warmup_mmap_file(data_file_path(self._path)) - print_rank_0(" creating numpy buffer of mmap...") + #print_rank_0(" creating numpy buffer of mmap...") self._bin_buffer_mmap = np.memmap(data_file_path(self._path), mode='r', order='C') - print_rank_0(" creating memory view of numpy buffer...") + #print_rank_0(" creating memory view of numpy buffer...") self._bin_buffer = memoryview(self._bin_buffer_mmap) def __del__(self): From 8176d9cb2d67e537cb9bb9f5261eb404b6df184b Mon Sep 17 00:00:00 2001 From: Amine Elhattami Date: Tue, 19 Oct 2021 16:20:57 -0400 Subject: [PATCH 03/13] Fixed Blender dataset and added verbose message --- megatron/data/dataset_utils.py | 2 +- megatron/data/helpers.cpp | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 3052e9fdd31..fbc33852959 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -446,7 +446,7 @@ def build_train_valid_test_datasets(data_prefix, data_impl, splits_string, prefixes[i], data_impl, splits_string, datasets_train_valid_test_num_samples[i], max_seq_length, masked_lm_prob, short_seq_prob, - seed, skip_warmup, binary_head, dataset_type=dataset_type) + seed, skip_warmup, binary_head,max_seq_length_dec, dataset_type=dataset_type) if train_ds: train_datasets.append(train_ds) if valid_ds: diff --git a/megatron/data/helpers.cpp b/megatron/data/helpers.cpp index e45926a9769..512beaff3fa 100644 --- a/megatron/data/helpers.cpp +++ b/megatron/data/helpers.cpp @@ -26,6 +26,8 @@ #include #include #include +#include +#include namespace py = pybind11; using namespace std; @@ -283,12 +285,20 @@ py::array build_mapping_impl(const py::array_t& docs_, // For each epoch: for (int32_t epoch=0; epoch= max_num_samples) { - if (verbose && (!second)) { - cout << " reached " << max_num_samples << " samples after " - << epoch << " epochs ..." 
<< endl << std::flush; - } + if (verbose && (!second)) { + cout << " reached " << max_num_samples << " samples after " + << epoch << " epochs ..." << endl << std::flush; + } break; } + + if (verbose && map_index % (max_num_samples / 100) == 0) { + auto t = std::time(nullptr); + auto tm = *std::localtime(&t); + cout << " reached " << map_index << "/" << max_num_samples << " samples after " + << epoch << " epochs ..." << std::put_time(&tm, "%d-%m-%Y %H-%M-%S") << endl << std::flush; + } + // For each document: for (int32_t doc=0; doc<(docs.shape(0) - 1); ++doc) { @@ -541,6 +551,7 @@ py::array build_blocks_mapping_impl(const py::array_t& docs_, // assign every block a unique id int32_t block_id = 0; + if (map_index >= max_num_samples) { if (verbose && (!second)) { cout << " reached " << max_num_samples << " samples after " From cd282e36d590710bc9c3054bbebd05490e001f3e Mon Sep 17 00:00:00 2001 From: Amine E <35821254+Am1n3e@users.noreply.github.com> Date: Tue, 23 Nov 2021 14:38:45 -0500 Subject: [PATCH 04/13] Saved dataset idx files to output folder (#1) --- megatron/data/biencoder_dataset_utils.py | 5 ++++- megatron/data/dataset_utils.py | 5 ++++- megatron/data/gpt_dataset.py | 8 +++++++- megatron/data/realm_dataset_utils.py | 5 ++++- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index f7b3b961b8c..feaeb3b90c1 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -1,5 +1,6 @@ import os import time +from pathlib import Path import numpy as np import torch @@ -134,7 +135,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = data_prefix + indexmap_filename = Path(data_prefix).name indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -146,6 +147,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' + indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + # Build the indexed mapping if not exist. if mpu.get_data_parallel_rank() == 0 and \ not os.path.isfile(indexmap_filename): diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index fbc33852959..51388e75c92 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -22,6 +22,7 @@ import os import time import collections +from pathlib import Path import numpy as np import torch @@ -650,7 +651,7 @@ def get_samples_mapping(indexed_dataset, max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = data_prefix + indexmap_filename = Path(data_prefix).name indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -661,6 +662,8 @@ def get_samples_mapping(indexed_dataset, indexmap_filename += '_{}s'.format(seed) indexmap_filename += '.npy' + indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + # Build the indexed mapping if not exist. 
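
The helpers.cpp hunk above adds a timestamped progress line roughly every 1% of max_num_samples while the sample mapping is built. The same logic in Python, shown only to make the intent explicit; the max(..., 1) lower bound is an addition in this sketch, since an interval of zero (max_num_samples below 100 under integer division) would make the modulus undefined and may be worth double-checking in the C++ version.

from datetime import datetime

def report_mapping_progress(map_index, max_num_samples, epoch):
    # Print a progress line about once per percent of the target sample count.
    interval = max(max_num_samples // 100, 1)   # lower bound added in this sketch
    if map_index % interval == 0:
        stamp = datetime.now().strftime("%d-%m-%Y %H-%M-%S")
        print(f" reached {map_index}/{max_num_samples} samples after "
              f"{epoch} epochs ... {stamp}", flush=True)
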
if torch.distributed.get_rank() == 0 and \ not os.path.isfile(indexmap_filename): diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index e6c64e975d4..993f74d8b99 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -17,6 +17,7 @@ import os import time +from pathlib import Path import numpy as np import torch @@ -202,7 +203,7 @@ def _build_index_mappings(name, data_prefix, documents, sizes, np_rng = np.random.RandomState(seed=seed) # Filename of the index mappings. - _filename = data_prefix + _filename = Path(data_prefix).name _filename += '_{}_indexmap'.format(name) _filename += '_{}ns'.format(num_samples) _filename += '_{}sl'.format(seq_length) @@ -211,6 +212,11 @@ def _build_index_mappings(name, data_prefix, documents, sizes, sample_idx_filename = _filename + '_sample_idx.npy' shuffle_idx_filename = _filename + '_shuffle_idx.npy' + output_folder = Path(get_args().save) + doc_idx_filename = output_folder.joinpath(doc_idx_filename).resolve() + sample_idx_filename = output_folder.joinpath(sample_idx_filename).resolve() + shuffle_idx_filename = output_folder.joinpath(shuffle_idx_filename).resolve() + # Build the indexed mapping if not exist. if torch.distributed.get_rank() == 0: if (not os.path.isfile(doc_idx_filename)) or \ diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index aecf5549a73..8b8ed038f29 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -1,5 +1,6 @@ import os import time +from pathlib import Path import numpy as np import torch @@ -124,7 +125,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = data_prefix + indexmap_filename = Path(data_prefix).name indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -136,6 +137,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' + indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + # Build the indexed mapping if not exist. 
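
Patch 04 applies the same relocation in all four dataset helpers: only the basename of the data prefix is kept, and the cached .npy index is written under the --save output folder. A simplified sketch of the naming scheme; the real filename also encodes epochs, sample counts and the short-sequence probability.

from pathlib import Path

def index_map_path(data_prefix, save_dir, name, seq_length, seed):
    # Keep only the basename of the (possibly read-only) data prefix and place
    # the cached index under the writable output directory.
    filename = f"{Path(data_prefix).name}_{name}_indexmap_{seq_length}msl_{seed}s.npy"
    return Path(save_dir).joinpath(filename).resolve()

# index_map_path("/data/owt/openwebtext_text_document", "/checkpoints/run1", "train", 512, 1234)
# -> /checkpoints/run1/openwebtext_text_document_train_indexmap_512msl_1234s.npy

A side effect of the move is that the index cache is now per output folder rather than shared next to the data, so different runs rebuild it independently.
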
if mpu.get_data_parallel_rank() == 0 and \ not os.path.isfile(indexmap_filename): From f03393ab093c4dbdef58d4244494ffd2bcadd4ac Mon Sep 17 00:00:00 2001 From: Amine E <35821254+Am1n3e@users.noreply.github.com> Date: Wed, 24 Nov 2021 17:16:44 -0500 Subject: [PATCH 05/13] Added torch distributed barrier (#2) --- megatron/data/biencoder_dataset_utils.py | 9 ++------- megatron/data/dataset_utils.py | 12 +++--------- megatron/data/gpt_dataset.py | 11 ++--------- megatron/data/realm_dataset_utils.py | 9 ++------- 4 files changed, 9 insertions(+), 32 deletions(-) diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index feaeb3b90c1..b1b61cd87b7 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -187,13 +187,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( - group=mpu.get_data_parallel_group()) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 51388e75c92..81acb6cde64 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -699,15 +699,9 @@ def get_samples_mapping(indexed_dataset, print_rank_0(' > elasped time to build and save samples mapping ' '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) - assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index 993f74d8b99..ca16f38efbd 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -299,15 +299,8 @@ def _build_index_mappings(name, data_prefix, documents, sizes, print_rank_0(' > elasped time to build and save shuffle-idx mapping' ' (seconds): {:4f}'.format(time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - torch.distributed.all_reduce(counts, group=mpu.get_pipeline_model_parallel_group()) - assert counts[0].item() == ( - torch.distributed.get_world_size() // - torch.distributed.get_world_size(group=mpu.get_tensor_model_parallel_group())) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) # Load mappings. 
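
Patch 05 replaces the all-reduce "pseudo-barrier" above with a real barrier. The device_ids argument is what makes this safe under model parallelism: it tells NCCL which local GPU to run the barrier on instead of assuming that the device index equals the global rank. Minimal sketch, assuming the launcher (torchrun / torch.distributed.launch) exports LOCAL_RANK.

import os
import torch.distributed as dist

def wait_for_rank_zero_files():
    # NCCL barrier pinned to this process's own GPU, so it also works when the
    # device index differs from the global rank (multi-node model parallelism).
    dist.barrier(device_ids=[int(os.environ["LOCAL_RANK"])])
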
start_time = time.time() diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index 8b8ed038f29..dd33fcd2886 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -177,13 +177,8 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo '(seconds): {:4f}'.format( time.time() - start_time)) - # This should be a barrier but nccl barrier assumes - # device_index=rank which is not the case for model - # parallel case - counts = torch.cuda.LongTensor([1]) - torch.distributed.all_reduce(counts, group=mpu.get_data_parallel_group()) - assert counts[0].item() == torch.distributed.get_world_size( - group=mpu.get_data_parallel_group()) + # Wait until rank 0 generate the index file. + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( From 560f1a4708d2429a50135d0fbc1b35531f7a5c0f Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 14 Dec 2021 10:25:37 -0500 Subject: [PATCH 06/13] Measure scales (#3) --- megatron/__init__.py | 18 ++--- megatron/arguments.py | 59 +++++++------- megatron/data/biencoder_dataset_utils.py | 7 ++ megatron/data/dataset_utils.py | 7 ++ megatron/data/gpt_dataset.py | 8 ++ megatron/data/realm_dataset_utils.py | 7 ++ .../fused_kernels/layer_norm_cuda_kernel.cu | 2 +- megatron/metrics.py | 73 +++++++++++++++++ megatron/model/bert_model.py | 25 ++++-- megatron/model/fused_layer_norm.py | 11 ++- megatron/model/language_model.py | 48 ++++++++--- megatron/model/transformer.py | 81 ++++++++++++++----- megatron/model/utils.py | 21 ++++- megatron/mpu/layers.py | 14 +++- megatron/optimizer/__init__.py | 10 ++- megatron/optimizer/clip_grads.py | 8 +- megatron/optimizer/optimizer.py | 20 ++++- megatron/training.py | 6 +- 18 files changed, 328 insertions(+), 97 deletions(-) create mode 100644 megatron/metrics.py diff --git a/megatron/__init__.py b/megatron/__init__.py index f670e652aac..b3a03290088 100644 --- a/megatron/__init__.py +++ b/megatron/__init__.py @@ -12,9 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import logging + import torch import os +logger = logging.getLogger(__name__) + from .package_info import ( __description__, __contact_names__, @@ -38,21 +42,11 @@ from .initialize import initialize_megatron def print_rank_0(message): - """If distributed is initialized, print only on rank 0.""" - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == 0: - print(message, flush=True) - else: - print(message, flush=True) + logger.info(str(message)) def is_last_rank(): return torch.distributed.get_rank() == ( torch.distributed.get_world_size() - 1) def print_rank_last(message): - """If distributed is initialized, print only on last rank.""" - if torch.distributed.is_initialized(): - if is_last_rank(): - print(message, flush=True) - else: - print(message, flush=True) + logger.info(str(message)) diff --git a/megatron/arguments.py b/megatron/arguments.py index b8c230f5793..fed8440fbd1 100644 --- a/megatron/arguments.py +++ b/megatron/arguments.py @@ -16,10 +16,13 @@ """Megatron arguments.""" import argparse +import logging import os import torch +logger = logging.getLogger(__name__) + def parse_args(extra_args_provider=None, defaults={}, ignore_unknown_args=False): """Parse all arguments.""" @@ -73,13 +76,12 @@ def parse_args(extra_args_provider=None, defaults={}, 'size ({})'.format(args.world_size, args.tensor_model_parallel_size, args.pipeline_model_parallel_size) args.data_parallel_size = args.world_size // model_parallel_size - if args.rank == 0: - print('using world size: {}, data-parallel-size: {}, ' - 'tensor-model-parallel size: {}, ' - 'pipeline-model-parallel size: {} '.format( - args.world_size, args.data_parallel_size, - args.tensor_model_parallel_size, - args.pipeline_model_parallel_size), flush=True) + logger.info('using world size: {}, data-parallel-size: {}, ' + 'tensor-model-parallel size: {}, ' + 'pipeline-model-parallel size: {} '.format( + args.world_size, args.data_parallel_size, + args.tensor_model_parallel_size, + args.pipeline_model_parallel_size)) # Deprecated arguments assert args.batch_size is None, '--batch-size argument is no longer ' \ @@ -98,11 +100,9 @@ def parse_args(extra_args_provider=None, defaults={}, # arguments that are passed to the program. We check this by # ensuring the arg is set to None. if getattr(args, key) is not None: - if args.rank == 0: - print('WARNING: overriding default arguments for {key}:{v} \ - with {key}:{v2}'.format(key=key, v=defaults[key], - v2=getattr(args, key)), - flush=True) + logger.warning('Overriding default arguments for {key}:{v} \ + with {key}:{v2}'.format(key=key, v=defaults[key], + v2=getattr(args, key))) else: setattr(args, key, defaults[key]) @@ -111,9 +111,8 @@ def parse_args(extra_args_provider=None, defaults={}, assert args.micro_batch_size > 0 if args.global_batch_size is None: args.global_batch_size = args.micro_batch_size * args.data_parallel_size - if args.rank == 0: - print('setting global batch size to {}'.format( - args.global_batch_size), flush=True) + logger.info('setting global batch size to {}'.format( + args.global_batch_size)) assert args.global_batch_size > 0 if args.num_layers_per_virtual_pipeline_stage is not None: assert args.pipeline_model_parallel_size > 2, \ @@ -140,13 +139,10 @@ def parse_args(extra_args_provider=None, defaults={}, # be done in fp32. 
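
Patch 06 swaps the rank-guarded print(..., flush=True) calls in this file (and in megatron/__init__.py above) for module-level loggers. Note that print_rank_0 now logs on every rank; whether only one copy reaches the console is left to the logging configuration, which the patch does not show. The setup below is a hypothetical companion that keeps per-rank output distinguishable, assuming the launcher sets RANK.

import logging
import os

def configure_logging(level=logging.INFO):
    # Hypothetical setup (not in the patch): prefix records with the rank so
    # interleaved output from many processes stays readable.
    rank = os.environ.get("RANK", "0")
    logging.basicConfig(
        level=level,
        format=f"[rank {rank}] %(asctime)s %(name)s %(levelname)s: %(message)s",
    )

logger = logging.getLogger(__name__)
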
if not args.accumulate_allreduce_grads_in_fp32: args.accumulate_allreduce_grads_in_fp32 = True - if args.rank == 0: - print('accumulate and all-reduce gradients in fp32 for ' - 'bfloat16 data type.', flush=True) + logger.info('accumulate and all-reduce gradients in fp32 for ' + 'bfloat16 data type.') - if args.rank == 0: - print('using {} for parameters ...'.format(args.params_dtype), - flush=True) + logger.info('using {} for parameters ...'.format(args.params_dtype)) # If we do accumulation and all-reduces in fp32, we need to have # local DDP and we should set the use-contiguous-buffers-in-ddp. @@ -239,17 +235,14 @@ def parse_args(extra_args_provider=None, defaults={}, def _print_args(args): """Print arguments.""" - if args.rank == 0: - print('------------------------ arguments ------------------------', - flush=True) - str_list = [] - for arg in vars(args): - dots = '.' * (48 - len(arg)) - str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg))) - for arg in sorted(str_list, key=lambda x: x.lower()): - print(arg, flush=True) - print('-------------------- end of arguments ---------------------', - flush=True) + logger.info('------------------------ arguments ------------------------') + str_list = [] + for arg in vars(args): + dots = '.' * (48 - len(arg)) + str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg))) + for arg in sorted(str_list, key=lambda x: x.lower()): + logger.info(arg) + logger.info('-------------------- end of arguments ---------------------') def _check_arg_is_not_none(args, arg): @@ -304,6 +297,8 @@ def _add_logging_args(parser): group.add_argument('--log-params-norm', action='store_true', help='If set, calculate and log parameters norm.') + group.add_argument('--log-scales', action='store_true', + help='Log the scales of parameters, gradients and activations.') group.add_argument('--log-num-zeros-in-grad', action='store_true', help='If set, calculate and log the number of zeros in gradient.') group.add_argument('--tensorboard-log-interval', type=int, default=1, diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index b1b61cd87b7..dee12e1b120 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -189,6 +189,13 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo # Wait until rank 0 generate the index file. torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if indexmap_filename.is_file(): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 81acb6cde64..fa8cd2eb867 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -702,6 +702,13 @@ def get_samples_mapping(indexed_dataset, # Wait until rank 0 generate the index file. torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if indexmap_filename.is_file(): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset. 
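
The two-minute polling loop added above compensates for shared filesystems where a file written by rank 0 is not immediately visible on other nodes even after the barrier. The same pattern as a reusable helper, shown as an illustrative sketch; the patch inlines it at each call site and simply proceeds on timeout rather than reporting failure.

import time
from pathlib import Path

def wait_for_file(path: Path, timeout_s: float = 120.0, poll_s: float = 1.0) -> bool:
    # Poll until the file appears or the timeout expires; print a heartbeat
    # every ten polls so stalled runs are visible in the logs.
    deadline = time.monotonic() + timeout_s
    polls = 0
    while time.monotonic() < deadline:
        if path.is_file():
            return True
        if polls % 10 == 0:
            print(" Waiting for index file...", flush=True)
        polls += 1
        time.sleep(poll_s)
    return path.is_file()
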
print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index ca16f38efbd..815cc985e2c 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -302,6 +302,14 @@ def _build_index_mappings(name, data_prefix, documents, sizes, # Wait until rank 0 generate the index file. torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if doc_idx_filename.is_file() and sample_idx_filename.is_file() and shuffle_idx_filename.is_file(): + break + if i%10==0: + print_rank_0(" Waiting for index files...") + time.sleep(1.0) + # Load mappings. start_time = time.time() print_rank_0(' > loading doc-idx mapping from {}'.format( diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index dd33fcd2886..05ed12d8cdb 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -179,6 +179,13 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo # Wait until rank 0 generate the index file. torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + # It can take some time for the file to be visible on other nodes. + for i in range(120): + if indexmap_filename.is_file(): + break + if i%10==0: + print_rank_0(" Waiting for index file...") + time.sleep(1.0) # Load indexed dataset. print_rank_0(' > loading indexed mapping from {}'.format( diff --git a/megatron/fused_kernels/layer_norm_cuda_kernel.cu b/megatron/fused_kernels/layer_norm_cuda_kernel.cu index ce42584aa33..a892c069f53 100644 --- a/megatron/fused_kernels/layer_norm_cuda_kernel.cu +++ b/megatron/fused_kernels/layer_norm_cuda_kernel.cu @@ -21,7 +21,7 @@ #include "ATen/ATen.h" #include "ATen/AccumulateType.h" #include "ATen/cuda/CUDAContext.h" -#include +#include "ATen/cuda/DeviceUtils.cuh" #include #include diff --git a/megatron/metrics.py b/megatron/metrics.py new file mode 100644 index 00000000000..8883a9dbb58 --- /dev/null +++ b/megatron/metrics.py @@ -0,0 +1,73 @@ +import logging +import math + +import torch +from megatron.global_vars import get_args + +logger = logging.getLogger(__name__) + +_iteration=0 +_metrics={} +_LOGGING_WIDTH=50 + +def next_iteration(iteration:int): + global _iteration, _metrics + _metrics={} + _iteration=iteration + + +def record_scale(name:str,x:torch.Tensor,grad=True, bias=None): + global _metrics + if get_log_scales(): + _metrics[f"{name}.scale" if grad else name]=get_scale(x if bias is None else x+bias) + if grad and x.requires_grad: + x.register_hook(lambda g: record_scale(f"{name}.grad",g,False)) + + +def get_scale(x): + return x.detach().float().pow(2).mean().pow(0.5) + + +def get_log_scales(): + args=get_args() + return args.log_scales and (_iteration+1) % args.log_interval == 0 + + +def log_metrics(): + metrics = {} + for key, value in _metrics.items(): + metrics_ = metrics + keys = key.split(".") + for prefix in keys[:-1]: + if prefix not in metrics_: + metrics_[prefix] = {} + metrics_ = metrics_[prefix] + metrics_[keys[-1]] = _format_value(value) + _log_dicts(metrics) + + +def _log_dicts(metrics, indent=0): + for key, value in metrics.items(): + key_ = key.rjust(len(key) + indent) + + # Merge keys when there is only one entry. 
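
Stepping back from the formatting helper above: the heart of the new megatron/metrics.py is get_scale, the root-mean-square of a tensor, and record_scale, which also registers a backward hook so the gradient's RMS is captured when backward() runs. A self-contained toy illustrating both, with simplified names; it is a sketch of the mechanism, not the module itself.

import torch

scales = {}

def rms(x):
    # Same definition as get_scale above: root-mean-square of the elements.
    return x.detach().float().pow(2).mean().pow(0.5)

def record(name, x):
    scales[f"{name}.scale"] = rms(x)
    if x.requires_grad:
        # The hook fires during backward() and stores the gradient's RMS.
        x.register_hook(lambda g: scales.__setitem__(f"{name}.grad", rms(g)))

h = torch.randn(4, 8, requires_grad=True) @ torch.randn(8, 8)
record("activation", h)
h.sum().backward()
print(scales)   # {'activation.scale': ..., 'activation.grad': ...}
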
+ while isinstance(value, dict) and len(value) == 1: + for value_key, value_ in value.items(): + key_ = ".".join([key_, value_key]) + value = value_ + if isinstance(value, dict): + logger.info(key_ + ":") + _log_dicts(value, indent + 2) + else: + sep = _LOGGING_WIDTH - len(value) - len(key_) - 2 + logger.info(f"{key_.ljust(len(key_)+sep,'.')} {value}") + + +def _format_value(value, precision=5,max_leading_zeros=3): + decimals = 0 if value == 0 or not math.isfinite(value) else precision - math.floor(math.log10(abs(value))) + + if 0 <= decimals <= precision + max_leading_zeros: + value = f"{value:.{decimals}f}" + else: + value = f"{value:.{precision}e}" + return value \ No newline at end of file diff --git a/megatron/model/bert_model.py b/megatron/model/bert_model.py index 3ff5039d5fe..a649885760b 100644 --- a/megatron/model/bert_model.py +++ b/megatron/model/bert_model.py @@ -15,10 +15,12 @@ """BERT model.""" +import logging import torch from megatron import get_args from megatron import mpu +from megatron.metrics import record_scale from megatron.model.enums import AttnMaskType from megatron.model.language_model import parallel_lm_logits from megatron.model.language_model import get_language_model @@ -67,18 +69,20 @@ class BertLMHead(MegatronModule): """ def __init__(self, mpu_vocab_size, hidden_size, init_method, - layernorm_epsilon, parallel_output): + layernorm_epsilon, parallel_output, name_=""): super(BertLMHead, self).__init__() + self.name_=name_ args = get_args() self.bias = torch.nn.Parameter(torch.zeros(mpu_vocab_size)) + self.bias.name_=f"{self.name_}.logits.linear_bias" mpu.set_tensor_model_parallel_attributes(self.bias, True, 0, 1) self.parallel_output = parallel_output - self.dense = get_linear_layer(hidden_size, hidden_size, init_method) - self.layernorm = LayerNorm(hidden_size, eps=layernorm_epsilon) + self.dense = get_linear_layer(hidden_size, hidden_size, init_method, name_=f"{self.name_}.dense") + self.layernorm = LayerNorm(hidden_size, eps=layernorm_epsilon, name_=f"{self.name_}.layer_norm") self.gelu = torch.nn.functional.gelu if args.openai_gelu: self.gelu = openai_gelu @@ -86,13 +90,16 @@ def __init__(self, mpu_vocab_size, hidden_size, init_method, self.gelu = erf_gelu def forward(self, hidden_states, word_embeddings_weight): + record_scale(f"{self.name_}.hidden",hidden_states) hidden_states = self.dense(hidden_states) hidden_states = self.gelu(hidden_states) + record_scale(f"{self.name_}.gelu",hidden_states) hidden_states = self.layernorm(hidden_states) output = parallel_lm_logits(hidden_states, word_embeddings_weight, self.parallel_output, bias=self.bias) + record_scale(f"{self.name_}.logits",output) return output @@ -129,9 +136,11 @@ def __init__(self, add_binary_head=True, parallel_output=True, pre_process=True, - post_process=True): + post_process=True, + name_="bert"): super(BertModel, self).__init__() args = get_args() + self.name_=name_ self.fp16_lm_cross_entropy = args.fp16_lm_cross_entropy self.add_binary_head = add_binary_head @@ -150,18 +159,20 @@ def __init__(self, init_method=init_method, scaled_init_method=scaled_init_method, pre_process=self.pre_process, - post_process=self.post_process) + post_process=self.post_process, + name_=self.name_) self.initialize_word_embeddings(init_method_normal) if self.post_process: self.lm_head = BertLMHead( self.word_embeddings_weight().size(0), - args.hidden_size, init_method, args.layernorm_epsilon, parallel_output) + args.hidden_size, init_method, args.layernorm_epsilon, parallel_output, + 
name_=f"{self.name_}.output_layer.lm_head") self._lm_head_key = 'lm_head' self.binary_head = None if self.add_binary_head: self.binary_head = get_linear_layer(args.hidden_size, 2, - init_method) + init_method, name_=f"{self.name_}.output_layer.sop_head.binary_head") self._binary_head_key = 'binary_head' def set_input_tensor(self, input_tensor): diff --git a/megatron/model/fused_layer_norm.py b/megatron/model/fused_layer_norm.py index 78645c23613..8218c65a5e5 100644 --- a/megatron/model/fused_layer_norm.py +++ b/megatron/model/fused_layer_norm.py @@ -23,6 +23,8 @@ from torch.nn import init import importlib +from megatron.metrics import record_scale + global fused_mix_prec_layer_norm_cuda fused_mix_prec_layer_norm_cuda = None @@ -61,8 +63,9 @@ def backward(ctx, grad_output): class MixedFusedLayerNorm(torch.nn.Module): - def __init__(self, normalized_shape, eps=1e-5): + def __init__(self, normalized_shape, eps=1e-5, name_=""): super(MixedFusedLayerNorm, self).__init__() + self.name_=name_ global fused_mix_prec_layer_norm_cuda fused_mix_prec_layer_norm_cuda = importlib.import_module( @@ -73,7 +76,9 @@ def __init__(self, normalized_shape, eps=1e-5): self.normalized_shape = torch.Size(normalized_shape) self.eps = eps self.weight = Parameter(torch.Tensor(*normalized_shape)) + self.weight.name_=f"{self.name_}.layer_norm_weight" self.bias = Parameter(torch.Tensor(*normalized_shape)) + self.bias.name_=f"{self.name_}.layer_norm_bias" self.reset_parameters() @@ -85,6 +90,8 @@ def reset_parameters(self): def forward(self, input): - return FusedLayerNormAffineFunction.apply( + output = FusedLayerNormAffineFunction.apply( input, self.weight, self.bias, self.normalized_shape,self.eps) + record_scale(self.name_, output) + return output diff --git a/megatron/model/language_model.py b/megatron/model/language_model.py index 06330d81395..3bf9a9712cf 100644 --- a/megatron/model/language_model.py +++ b/megatron/model/language_model.py @@ -21,6 +21,7 @@ from megatron import get_args from megatron import mpu from .module import MegatronModule +from megatron.metrics import record_scale from megatron.model.enums import LayerType, AttnMaskType from megatron.model.transformer import ParallelTransformer from megatron.model.utils import get_linear_layer @@ -47,7 +48,7 @@ def get_language_model(num_tokentypes, add_pooler, encoder_attn_mask_type, init_method=None, scaled_init_method=None, add_decoder=False, decoder_attn_mask_type=AttnMaskType.causal, - pre_process=True, post_process=True): + pre_process=True, post_process=True, name_=""): """Build language model and return along with the key to save.""" args = get_args() @@ -68,7 +69,8 @@ def get_language_model(num_tokentypes, add_pooler, decoder_attn_mask_type=decoder_attn_mask_type, add_pooler=add_pooler, pre_process=pre_process, - post_process=post_process + post_process=post_process, + name_=name_ ) # key used for checkpoints. language_model_key = 'language_model' @@ -88,16 +90,20 @@ class Pooler(MegatronModule): bias is set to zero. """ - def __init__(self, hidden_size, init_method): + def __init__(self, hidden_size, init_method, name_=""): super(Pooler, self).__init__() - self.dense = get_linear_layer(hidden_size, hidden_size, init_method) + self.name_=name_ + self.dense = get_linear_layer(hidden_size, hidden_size, init_method, name_=f"{self.name_}.dense") def forward(self, hidden_states, sequence_index=0): # hidden_states: [b, s, h] # sequence_index: index of the token to pool. 
+ record_scale(f"{self.name_}.input",hidden_states) pooled = hidden_states[:, sequence_index, :] + record_scale(f"{self.name_}.pooled",pooled) pooled = self.dense(pooled) pooled = torch.tanh(pooled) + record_scale(f"{self.name_}.tanh",pooled) return pooled @@ -121,7 +127,8 @@ def __init__(self, max_sequence_length, embedding_dropout_prob, init_method, - num_tokentypes=0): + num_tokentypes=0, + name_=""): super(Embedding, self).__init__() self.hidden_size = hidden_size @@ -129,17 +136,22 @@ def __init__(self, self.num_tokentypes = num_tokentypes args = get_args() + self.name_=name_ # Word embeddings (parallel). self.word_embeddings = mpu.VocabParallelEmbedding( vocab_size, self.hidden_size, init_method=self.init_method) self._word_embeddings_key = 'word_embeddings' + self.word_embeddings.name_=f"{self.name_}.word_embeddings" + self.word_embeddings.weight.name_=f"{self.word_embeddings.name_}.embedding_weight" # Position embedding (serial). self.position_embeddings = torch.nn.Embedding( max_sequence_length, self.hidden_size) self._position_embeddings_key = 'position_embeddings' + self.position_embeddings.name_=f"{self.name_}.position_embeddings" + self.position_embeddings.weight.name_=f"{self.position_embeddings.name_}.embedding_weight" # Initialize the position embeddings. self.init_method(self.position_embeddings.weight) @@ -151,6 +163,8 @@ def __init__(self, if self.num_tokentypes > 0: self.tokentype_embeddings = torch.nn.Embedding(self.num_tokentypes, self.hidden_size) + self.tokentype_embeddings.name_=f"{self.name_}.tokentype_embeddings" + self.tokentype_embeddings.weight.name_=f"{self.tokentype_embeddings.name_}.embedding_weight" # Initialize the token-type embeddings. self.init_method(self.tokentype_embeddings.weight) else: @@ -178,17 +192,24 @@ def add_tokentype_embeddings(self, num_tokentypes): def forward(self, input_ids, position_ids, tokentype_ids=None): # Embeddings. + args=get_args() words_embeddings = self.word_embeddings(input_ids) + record_scale(self.word_embeddings.name_,words_embeddings) position_embeddings = self.position_embeddings(position_ids) + record_scale(self.position_embeddings.name_,position_embeddings) embeddings = words_embeddings + position_embeddings if tokentype_ids is not None: assert self.tokentype_embeddings is not None - embeddings = embeddings + self.tokentype_embeddings(tokentype_ids) + tokentype_embeddings=self.tokentype_embeddings(tokentype_ids) + record_scale(self.tokentype_embeddings.name_,tokentype_embeddings) + embeddings = embeddings + tokentype_embeddings else: assert self.tokentype_embeddings is None + record_scale(f"{self.name_}.embeddings",embeddings) # Dropout. embeddings = self.embedding_dropout(embeddings) + record_scale(f"{self.name_}.dropout",embeddings) return embeddings @@ -277,9 +298,11 @@ def __init__(self, decoder_attn_mask_type=AttnMaskType.causal, add_pooler=False, pre_process=True, - post_process=True): + post_process=True, + name_=""): super(TransformerLanguageModel, self).__init__() args = get_args() + self.name_ = name_ self.pre_process = pre_process self.post_process = post_process @@ -298,7 +321,8 @@ def __init__(self, args.max_position_embeddings, args.hidden_dropout, self.init_method, - self.num_tokentypes) + self.num_tokentypes, + name_=f"{self.name_}.input_layer.embedding") self._embedding_key = 'embedding' # Transformer. 
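
Throughout these model files the patch threads a name_ string down the module hierarchy and stamps it onto every Parameter, so record_scale and the optimizer can report statistics under readable dotted keys (for example bert.input_layer.embedding.word_embeddings.embedding_weight). A toy version of the pattern; the class and names below are made up for illustration and are not taken from the patch.

import torch

class TaggedLinear(torch.nn.Linear):
    def __init__(self, d_in, d_out, name_=""):
        super().__init__(d_in, d_out)
        # Tensors accept arbitrary Python attributes, so the dotted path can
        # ride along with each parameter all the way into the optimizer.
        self.name_ = name_
        self.weight.name_ = f"{name_}.linear_weight"
        self.bias.name_ = f"{name_}.linear_bias"

layer = TaggedLinear(8, 8, name_="bert.output_layer.lm_head.dense")
for p in layer.parameters():
    print(getattr(p, "name_", "unknown"), tuple(p.shape))
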
@@ -307,7 +331,8 @@ def __init__(self, output_layer_init_method, self_attn_mask_type=self.encoder_attn_mask_type, pre_process=self.pre_process, - post_process=self.post_process + post_process=self.post_process, + name_=self.name_, ) self._encoder_key = 'encoder' @@ -319,13 +344,14 @@ def __init__(self, self.init_method, output_layer_init_method, layer_type=LayerType.decoder, - self_attn_mask_type=self.decoder_attn_mask_type) + self_attn_mask_type=self.decoder_attn_mask_type, + name_=f"{self.name_}.decoder") self._decoder_key = 'decoder' if self.post_process: # Pooler. if self.add_pooler: - self.pooler = Pooler(self.hidden_size, self.init_method) + self.pooler = Pooler(self.hidden_size, self.init_method, name_=f"{self.name_}.output_layer.sop_head") self._pooler_key = 'pooler' def set_input_tensor(self, input_tensor): diff --git a/megatron/model/transformer.py b/megatron/model/transformer.py index ac9d2021892..5a2c91306f6 100644 --- a/megatron/model/transformer.py +++ b/megatron/model/transformer.py @@ -20,6 +20,7 @@ from megatron import get_args from megatron import mpu +from megatron.metrics import record_scale from .module import MegatronModule from megatron.model.enums import AttnMaskType, LayerType, AttnType from megatron.model import LayerNorm @@ -57,9 +58,10 @@ class ParallelMLP(MegatronModule): applied. """ - def __init__(self, init_method, output_layer_init_method): + def __init__(self, init_method, output_layer_init_method, name_=""): super(ParallelMLP, self).__init__() args = get_args() + self.name_=name_ # Project to 4h. self.dense_h_to_4h = mpu.ColumnParallelLinear( @@ -67,7 +69,8 @@ def __init__(self, init_method, output_layer_init_method): args.ffn_hidden_size, gather_output=False, init_method=init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{name_}.dense_0") self.bias_gelu_fusion = args.bias_gelu_fusion self.activation_func = F.gelu @@ -82,7 +85,8 @@ def __init__(self, init_method, output_layer_init_method): args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{name_}.dense_1") def forward(self, hidden_states): @@ -97,6 +101,7 @@ def forward(self, hidden_states): intermediate_parallel = \ self.activation_func(intermediate_parallel + bias_parallel) + record_scale(f"{self.name_}.gelu", intermediate_parallel) # [s, b, h] output, output_bias = self.dense_4h_to_h(intermediate_parallel) return output, output_bias @@ -112,9 +117,11 @@ class ParallelAttention(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=AttnMaskType.padding): + attn_mask_type=AttnMaskType.padding, + name_=""): super(ParallelAttention, self).__init__() args = get_args() + self.name_=name_ self.fp16 = args.fp16 self.bf16 = args.bf16 @@ -143,20 +150,23 @@ def __init__(self, init_method, args.hidden_size, 3 * projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.query_key_value") else: assert attention_type == AttnType.cross_attn self.query = mpu.ColumnParallelLinear( args.hidden_size, projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.query") self.key_value = mpu.ColumnParallelLinear( args.hidden_size, 2 * projection_size, gather_output=False, - init_method=init_method) + init_method=init_method, + name_=f"{self.name_}.key_value") coeff = None self.norm_factor = 
math.sqrt(self.hidden_size_per_attention_head) @@ -183,7 +193,8 @@ def __init__(self, init_method, args.hidden_size, input_is_parallel=True, init_method=output_layer_init_method, - skip_bias_add=True) + skip_bias_add=True, + name_=f"{self.name_}.dense") def forward(self, hidden_states, attention_mask, layer_past=None, get_key_value=False, encoder_output=None): @@ -229,6 +240,10 @@ def forward(self, hidden_states, attention_mask, layer_past=None, self.hidden_size_per_attention_head) query_layer = query_layer.view(*new_tensor_shape) + record_scale(f"{self.name_}.query_layer", query_layer) + record_scale(f"{self.name_}.key_layer", key_layer) + record_scale(f"{self.name_}.value_layer", value_layer) + # ================================== # Adjust key and value for inference # ================================== @@ -277,6 +292,7 @@ def forward(self, hidden_states, attention_mask, layer_past=None, # change view to [b, np, sq, sk] attention_scores = matmul_result.view(*output_size) + record_scale(f"{self.name_}.attention_scores", attention_scores) # ================================================== # Update attention mask for inference. [b, np, sq, sk] # ================================================== @@ -301,6 +317,7 @@ def forward(self, hidden_states, attention_mask, layer_past=None, # attention scores and attention mask [b, np, sq, sk] attention_probs = self.scale_mask_softmax(attention_scores, attention_mask) + record_scale(f"{self.name_}.attention_probs", attention_probs) # This is actually dropping out entire tokens to attend to, which might # seem a bit unusual, but is taken from the original Transformer paper. @@ -342,6 +359,8 @@ def forward(self, hidden_states, attention_mask, layer_past=None, (self.hidden_size_per_partition,) context_layer = context_layer.view(*new_context_layer_shape) + record_scale(f"{self.name_}.context_layer", context_layer) + # ================= # Output. [sq, b, h] # ================= @@ -388,10 +407,12 @@ class ParallelTransformerLayer(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_number, layer_type=LayerType.encoder, - self_attn_mask_type=AttnMaskType.padding): + self_attn_mask_type=AttnMaskType.padding, + name_=""): args = get_args() super(ParallelTransformerLayer, self).__init__() + self.name_=name_ self.layer_number = layer_number self.layer_type = layer_type @@ -404,7 +425,9 @@ def __init__(self, init_method, output_layer_init_method, # Layernorm on the input data. self.input_layernorm = LayerNorm( args.hidden_size, - eps=args.layernorm_epsilon) + eps=args.layernorm_epsilon, + name_=f"{self.name_}.input_layer_norm", + ) # Self attention. 
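
The ParallelAttention changes above record the RMS of each intermediate: the query/key/value projections, the raw attention scores, the softmax probabilities and the context layer. A toy single-head version showing where those six measurement points sit; the real code uses baddbmm, the fused masked softmax and tensor-parallel projections, none of which are reproduced here.

import math
import torch

def rms(x):
    return x.detach().float().pow(2).mean().sqrt().item()

def attention_with_scales(q, k, v):
    scores = q @ k.transpose(-2, -1) / math.sqrt(q.size(-1))
    probs = torch.softmax(scores, dim=-1)
    context = probs @ v
    for name, t in (("query_layer", q), ("key_layer", k), ("value_layer", v),
                    ("attention_scores", scores), ("attention_probs", probs),
                    ("context_layer", context)):
        print(f"{name:>16}: rms={rms(t):.4f}")
    return context

attention_with_scales(*(torch.randn(2, 16, 64) for _ in range(3)))
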
self.self_attention = ParallelAttention( @@ -412,29 +435,35 @@ def __init__(self, init_method, output_layer_init_method, output_layer_init_method, layer_number, attention_type=AttnType.self_attn, - attn_mask_type=self_attn_mask_type) + attn_mask_type=self_attn_mask_type, + name_=f"{self.name_}.self_attention") self.hidden_dropout = args.hidden_dropout self.bias_dropout_fusion = args.bias_dropout_fusion # Layernorm on the attention output self.post_attention_layernorm = LayerNorm( args.hidden_size, - eps=args.layernorm_epsilon) + eps=args.layernorm_epsilon, + name_=f"{self.name_}.post_attention_layer_norm", + ) if self.layer_type == LayerType.decoder: self.inter_attention = ParallelAttention( init_method, output_layer_init_method, layer_number, - attention_type=AttnType.cross_attn) + attention_type=AttnType.cross_attn, + name_=f"{self.name_}.inter_attention") # Layernorm on the attention output. self.post_inter_attention_layernorm = LayerNorm( args.hidden_size, - eps=args.layernorm_epsilon) + eps=args.layernorm_epsilon, + name_=f"{self.name_}.post_inter_attention_layer_norm", + ) # MLP self.mlp = ParallelMLP(init_method, - output_layer_init_method) + output_layer_init_method, name_=f"{self.name_}.mlp") def forward(self, hidden_states, attention_mask, encoder_output=None, enc_dec_attn_mask=None, @@ -450,6 +479,8 @@ def forward(self, hidden_states, attention_mask, layer_past=layer_past, get_key_value=get_key_value) + record_scale(f"{self.name_}.attention", attention_output, bias=attention_bias) + if get_key_value: attention_output, presents = attention_output @@ -458,6 +489,7 @@ def forward(self, hidden_states, attention_mask, residual = layernorm_output else: residual = hidden_states + record_scale(f"{self.name_}.attention_residual_input", residual) # jit scripting for a nn.module (with dropout) is not # trigerring the fusion kernel. For now, we use two @@ -471,6 +503,7 @@ def forward(self, hidden_states, attention_mask, else: bias_dropout_add_func = get_bias_dropout_add(self.training) + # re-enable torch grad to enable fused optimization. with torch.enable_grad(): layernorm_input = bias_dropout_add_func( @@ -479,6 +512,8 @@ def forward(self, hidden_states, attention_mask, residual, self.hidden_dropout) + record_scale(f"{self.name_}.attention_residual", layernorm_input) + # Layer norm post the self attention. layernorm_output = self.post_attention_layernorm(layernorm_input) @@ -487,11 +522,13 @@ def forward(self, hidden_states, attention_mask, self.inter_attention(layernorm_output, enc_dec_attn_mask, encoder_output=encoder_output) + record_scale(f"{self.name_}.inter_attention", attention_output, bias=attention_bias) # residual connection if self.apply_residual_connection_post_layernorm: residual = layernorm_output else: residual = layernorm_input + record_scale(f"{self.name_}.inter_attention_residual_input", residual) # re-enable torch grad to enable fused optimization. with torch.enable_grad(): @@ -500,6 +537,7 @@ def forward(self, hidden_states, attention_mask, attention_bias.expand_as(residual), residual, self.hidden_dropout) + record_scale(f"{self.name_}.inter_attention_residual", layernorm_input) # Layer norm post the decoder attention layernorm_output = self.post_inter_attention_layernorm(layernorm_input) @@ -512,6 +550,7 @@ def forward(self, hidden_states, attention_mask, residual = layernorm_output else: residual = layernorm_input + record_scale(f"{self.name_}.mlp_residual_input", residual) # re-enable torch grad to enable fused optimization. 
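
Several of the record_scale calls above pass bias= separately. That mirrors the skip_bias_add layers, which return (output, bias) so the bias can be folded into the fused bias-dropout-add that follows; for logging, the RMS is taken on the sum, i.e. on what the residual branch actually receives. A minimal sketch of that path, with placeholder shapes.

import torch

_scales = {}

def record_scale(name, x, bias=None):
    # When the layer defers its bias (skip_bias_add), measure output + bias,
    # which is the tensor the downstream dropout-add and residual see.
    value = x if bias is None else x + bias
    _scales[f"{name}.scale"] = value.detach().float().pow(2).mean().sqrt()

record_scale("self_attention.dense", torch.randn(16, 32), bias=torch.randn(32))
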
with torch.enable_grad(): @@ -521,6 +560,8 @@ def forward(self, hidden_states, attention_mask, residual, self.hidden_dropout) + record_scale(f"{self.name_}.mlp_residual", layernorm_input) + if get_key_value: output = [output, presents] @@ -533,9 +574,11 @@ class ParallelTransformer(MegatronModule): def __init__(self, init_method, output_layer_init_method, layer_type=LayerType.encoder, self_attn_mask_type=AttnMaskType.padding, - pre_process=True, post_process=True): + pre_process=True, post_process=True, + name_=""): super(ParallelTransformer, self).__init__() args = get_args() + self.name_=name_ self.bf16 = args.bf16 self.fp32_residual_connection = args.fp32_residual_connection @@ -559,7 +602,8 @@ def build_layer(layer_number): output_layer_init_method, layer_number, layer_type=layer_type, - self_attn_mask_type=self_attn_mask_type) + self_attn_mask_type=self_attn_mask_type, + name_=f"{self.name_}.layer_{layer_number-1}.transformer_layer") if args.virtual_pipeline_model_parallel_size is not None: assert args.num_layers % args.virtual_pipeline_model_parallel_size == 0, \ 'num_layers_per_stage must be divisible by ' \ @@ -589,7 +633,8 @@ def build_layer(layer_number): # Final layer norm before output. self.final_layernorm = LayerNorm( args.hidden_size, - eps=args.layernorm_epsilon) + eps=args.layernorm_epsilon, + name_=f"{self.name_}.output_layer.final_layer_norm") def _get_layer(self, layer_number): return self.layers[layer_number] diff --git a/megatron/model/utils.py b/megatron/model/utils.py index 465e8aa4ff6..d87616c6d98 100644 --- a/megatron/model/utils.py +++ b/megatron/model/utils.py @@ -18,8 +18,7 @@ import math import torch - -from megatron import get_args +from megatron.metrics import record_scale def init_method_normal(sigma): """Init method based on N(0, sigma).""" @@ -31,7 +30,7 @@ def init_(tensor): def scaled_init_method_normal(sigma, num_layers): """Init method based on N(0, sigma/sqrt(2*num_layers).""" - std = sigma / math.sqrt(2.0 * num_layers) + std = sigma / math.sqrt(2.0 * max(num_layers,1)) def init_(tensor): return torch.nn.init.normal_(tensor, mean=0.0, std=std) @@ -44,12 +43,26 @@ def attention_mask_func(attention_scores, attention_mask): return attention_scores -def get_linear_layer(rows, columns, init_method): +def get_linear_layer(rows, columns, init_method, name_=""): """Simple linear layer with weight initialization.""" layer = torch.nn.Linear(rows, columns) init_method(layer.weight) with torch.no_grad(): layer.bias.zero_() + layer.name_=name_ + layer.weight.name_=f"{name_}.linear_weight" + layer.bias.name_=f"{name_}.linear_bias" + + + old_forward=layer.forward + + def forward(input): + output=old_forward(input) + record_scale(layer.name_,output) + return output + + layer.forward=forward + return layer @torch.jit.script diff --git a/megatron/mpu/layers.py b/megatron/mpu/layers.py index 8dd69f72cb8..9bf58d2b8fa 100644 --- a/megatron/mpu/layers.py +++ b/megatron/mpu/layers.py @@ -35,7 +35,7 @@ from .utils import divide from .utils import split_tensor_along_last_dim from .utils import VocabUtility -from megatron import get_args +from megatron.metrics import get_args, get_log_scales, record_scale _MODEL_PARALLEL_ATTRIBUTE_DEFAULTS = {'tensor_model_parallel': False, @@ -225,8 +225,9 @@ class ColumnParallelLinear(torch.nn.Module): def __init__(self, input_size, output_size, bias=True, gather_output=True, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False): + skip_bias_add=False,name_=""): super(ColumnParallelLinear, 
self).__init__() + self.name_=name_ # Keep input parameters self.input_size = input_size @@ -256,6 +257,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, device=torch.cuda.current_device(), dtype=args.params_dtype)) _initialize_affine_weight_gpu(self.weight, init_method, partition_dim=0, stride=stride) + self.weight.name_=f"{self.name_}.linear_weight" if bias: if args.use_cpu_initialization: @@ -270,6 +272,7 @@ def __init__(self, input_size, output_size, bias=True, gather_output=True, # Always initialize bias to zero. with torch.no_grad(): self.bias.zero_() + self.bias.name_ = f"{self.name_}.linear_bias" else: self.register_parameter('bias', None) @@ -288,6 +291,7 @@ def forward(self, input_): else: output = output_parallel output_bias = self.bias if self.skip_bias_add else None + record_scale(self.name_, output, bias=output_bias) return output, output_bias @@ -325,8 +329,9 @@ def __init__(self, input_size, output_size, bias=True, input_is_parallel=False, init_method=init.xavier_normal_, stride=1, keep_master_weight_for_test=False, - skip_bias_add=False): + skip_bias_add=False,name_=""): super(RowParallelLinear, self).__init__() + self.name_=name_ # Keep input parameters self.input_size = input_size @@ -356,6 +361,7 @@ def __init__(self, input_size, output_size, bias=True, device=torch.cuda.current_device(), dtype=args.params_dtype)) _initialize_affine_weight_gpu(self.weight, init_method, partition_dim=1, stride=stride) + self.weight.name_ = f"{self.name_}.linear_weight" if bias: if args.use_cpu_initialization: self.bias = Parameter(torch.empty(self.output_size, @@ -367,6 +373,7 @@ def __init__(self, input_size, output_size, bias=True, # Always initialize bias to zero. with torch.no_grad(): self.bias.zero_() + self.bias.name_ = f"{self.name_}.linear_bias" else: self.register_parameter('bias', None) @@ -388,5 +395,6 @@ def forward(self, input_): else: output = output_ output_bias = self.bias + record_scale(self.name_, output, bias=output_bias) return output, output_bias diff --git a/megatron/optimizer/__init__.py b/megatron/optimizer/__init__.py index 823a51f4492..7298930daae 100644 --- a/megatron/optimizer/__init__.py +++ b/megatron/optimizer/__init__.py @@ -13,8 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from apex.optimizers import FusedAdam as Adam -from apex.optimizers import FusedSGD as SGD +import warnings + +try: + from apex.optimizers import FusedAdam as Adam + from apex.optimizers import FusedSGD as SGD +except ImportError: + warnings.warn("Apex not found") from megatron import get_args from megatron.model import LayerNorm @@ -52,6 +57,7 @@ def get_megatron_optimizer(model): # Base optimizer. 
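The try/except guard above only warns when apex is missing, so a later reference to Adam or SGD would still raise a NameError; one possible fallback, not what this patch does, is to bind torch's native optimizers instead:

# Possible alternative to the warn-only import guard; not part of this patch.
import warnings

try:
    from apex.optimizers import FusedAdam as Adam
    from apex.optimizers import FusedSGD as SGD
except ImportError:
    warnings.warn("Apex not found, falling back to torch.optim")
    from torch.optim import Adam, SGD  # unfused, but keeps the names defined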
param_groups = _get_params_for_weight_decay_optimization(model) + print("weight_decay", args.weight_decay) if args.optimizer == 'adam': optimizer = Adam(param_groups, lr=args.lr, diff --git a/megatron/optimizer/clip_grads.py b/megatron/optimizer/clip_grads.py index 036a1d4c4cf..30e1b820ea0 100644 --- a/megatron/optimizer/clip_grads.py +++ b/megatron/optimizer/clip_grads.py @@ -17,9 +17,13 @@ import torch from torch._six import inf +import warnings -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + warnings.warn("Apex not found") from megatron import mpu from megatron.model.module import param_is_not_shared diff --git a/megatron/optimizer/optimizer.py b/megatron/optimizer/optimizer.py index 77baddd62ad..175a44b4c8d 100644 --- a/megatron/optimizer/optimizer.py +++ b/megatron/optimizer/optimizer.py @@ -17,15 +17,20 @@ from abc import ABC from abc import abstractmethod +import warnings import torch -from apex.multi_tensor_apply import multi_tensor_applier -import amp_C +try: + from apex.multi_tensor_apply import multi_tensor_applier + import amp_C +except ImportError: + warnings.warn("Apex not found") from megatron import get_timers from megatron import mpu from megatron import print_rank_0 +from megatron.metrics import record_scale,get_log_scales from .clip_grads import clip_grad_norm_fp32, count_zeros_fp32 @@ -136,6 +141,13 @@ def state_dict(self): def load_state_dict(self, state_dict): pass + def _record_scales(self): + if get_log_scales(): + for group in self.optimizer.param_groups: + for p in group['params']: + name_=getattr(p, "name_", "unknown") + record_scale(f"optimizer.{name_}.scale", p, False) + record_scale(f"optimizer.{name_}.grad", p.grad, False) # Promote state so it can be retrieved or set via # "optimizer_instance.state" @@ -245,6 +257,8 @@ def __init__(self, optimizer, clip_grad, log_num_zeros_in_grad, float16_params_this_group.append(param) # Create a copy main_param = param.detach().clone().float() + if hasattr(param, "name_"): + main_param.name_=param.name_ # Copy tensor model parallel attributes. mpu.copy_tensor_model_parallel_attributes(main_param, param) @@ -406,6 +420,7 @@ def step(self): num_zeros_in_grad = self.count_zeros() if \ self.log_num_zeros_in_grad else None + self._record_scales() # Step the optimizer. self.optimizer.step() @@ -504,6 +519,7 @@ def step(self): num_zeros_in_grad = self.count_zeros() if \ self.log_num_zeros_in_grad else None + self._record_scales() # Update parameters. 
self.optimizer.step() diff --git a/megatron/training.py b/megatron/training.py index 62ed60c1238..61031e29bea 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -25,7 +25,7 @@ import torch from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP -from megatron import get_args +from megatron.metrics import get_args, get_log_scales, next_iteration, log_metrics from megatron import get_timers from megatron import get_tensorboard_writer from megatron import get_current_global_batch_size @@ -535,6 +535,9 @@ def add_to_logging(name): timers.write(timers_to_log, writer, iteration, normalizer=total_iterations) + if get_log_scales(): + log_metrics() + if iteration % args.log_interval == 0: elapsed_time = timers('interval-time').elapsed() elapsed_time_per_iteration = elapsed_time / total_iterations @@ -617,6 +620,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler, print_datetime('before the start of training step') report_memory_flag = True while iteration < args.train_iters: + next_iteration(iteration) update_num_microbatches(args.consumed_train_samples) loss_dict, skipped_iter, grad_norm, num_zeros_in_grad = \ train_step(forward_step_func, From e1c46ae39c0536718278f4b6255b83f0080bde42 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Tue, 18 Jan 2022 19:09:48 -0500 Subject: [PATCH 07/13] Update fused kernels --- .../fused_kernels/layer_norm_cuda_kernel.cu | 3 + .../fused_kernels/scaled_masked_softmax.cpp | 22 +- .../fused_kernels/scaled_masked_softmax.h | 19 +- .../scaled_masked_softmax_cuda.cu | 5 + .../scaled_upper_triang_masked_softmax.h | 6 +- megatron/fused_kernels/tests/__init__.py | 0 .../fused_kernels/tests/test_fused_kernels.py | 300 ++++++++++++++++++ megatron/model/fused_softmax.py | 118 ++++--- 8 files changed, 422 insertions(+), 51 deletions(-) create mode 100644 megatron/fused_kernels/tests/__init__.py create mode 100644 megatron/fused_kernels/tests/test_fused_kernels.py diff --git a/megatron/fused_kernels/layer_norm_cuda_kernel.cu b/megatron/fused_kernels/layer_norm_cuda_kernel.cu index a892c069f53..91d53319150 100644 --- a/megatron/fused_kernels/layer_norm_cuda_kernel.cu +++ b/megatron/fused_kernels/layer_norm_cuda_kernel.cu @@ -329,6 +329,7 @@ void cuApplyLayerNorm( mean[i1] = mu; invvar[i1] = c_invvar; } + __syncthreads(); } } @@ -644,6 +645,8 @@ void cuComputeGradInput( k_grad_input[l] = static_cast(f_grad_input); } } + // prevent race where buf is written again before reads are done + __syncthreads(); } } diff --git a/megatron/fused_kernels/scaled_masked_softmax.cpp b/megatron/fused_kernels/scaled_masked_softmax.cpp index d5334710cf9..1852aee6fda 100644 --- a/megatron/fused_kernels/scaled_masked_softmax.cpp +++ b/megatron/fused_kernels/scaled_masked_softmax.cpp @@ -32,6 +32,12 @@ torch::Tensor bwd_cuda( torch::Tensor const& softmax_results, float scale_factor); +int get_batch_per_block_cuda( + int query_seq_len, + int key_seq_len, + int batches, + int attn_heads); + torch::Tensor fwd( torch::Tensor const& input, torch::Tensor const& mask, @@ -63,6 +69,14 @@ torch::Tensor bwd( return bwd_cuda(output_grads, softmax_results, scale_factor); } +int get_batch_per_block( + int query_seq_len, + int key_seq_len, + int batches, + int attn_heads) { + return get_batch_per_block_cuda(query_seq_len, key_seq_len, batches, attn_heads); +} + } // end namespace scaled_masked_softmax } // end namespace fused_softmax } // end namespace multihead_attn @@ -71,7 +85,13 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { m.def("forward", 
&multihead_attn::fused_softmax::scaled_masked_softmax::fwd, "Self Multihead Attention scaled, time masked softmax -- Forward."); - m.def("backward", + + m.def("backward", &multihead_attn::fused_softmax::scaled_masked_softmax::bwd, "Self Multihead Attention scaled, time masked softmax -- Backward."); + + m.def("get_batch_per_block", + &multihead_attn::fused_softmax::scaled_masked_softmax::get_batch_per_block, + "Return Batch per block size." + ); } diff --git a/megatron/fused_kernels/scaled_masked_softmax.h b/megatron/fused_kernels/scaled_masked_softmax.h index 78e97e4ec60..45e8dcea27e 100644 --- a/megatron/fused_kernels/scaled_masked_softmax.h +++ b/megatron/fused_kernels/scaled_masked_softmax.h @@ -111,7 +111,7 @@ __global__ void scaled_masked_softmax_warp_forward( constexpr int WARP_SIZE = (next_power_of_two < C10_WARP_SIZE) ? next_power_of_two : C10_WARP_SIZE; constexpr int WARP_ITERATIONS = next_power_of_two / WARP_SIZE; constexpr int WARP_BATCH = (next_power_of_two <= 128) ? 2 : 1; - constexpr int ELEMENTS_PER_LDG_STG = 4; + constexpr int ELEMENTS_PER_LDG_STG = (WARP_ITERATIONS < 4) ? 1 : 4; // blockDim/threadIdx = (WARP_SIZE, WARPS_PER_BLOCK, ) // gridDim/blockIdx = (seq_len, attn_heads, batches) @@ -230,7 +230,7 @@ __global__ void scaled_masked_softmax_warp_backward( constexpr int WARP_SIZE = (next_power_of_two < C10_WARP_SIZE) ? next_power_of_two : C10_WARP_SIZE; constexpr int WARP_ITERATIONS = next_power_of_two / WARP_SIZE; constexpr int WARP_BATCH = (next_power_of_two <= 128) ? 2 : 1; - constexpr int ELEMENTS_PER_LDG_STG = 4; + constexpr int ELEMENTS_PER_LDG_STG = (WARP_ITERATIONS < 4) ? 1 : 4; // blockDim/threadIdx = (WARP_SIZE, WARPS_PER_BLOCK, ) // gridDim/blockIdx = (seq_len, attn_heads, batches) @@ -310,9 +310,22 @@ __global__ void scaled_masked_softmax_warp_backward( } } } - } // end of anonymous namespace +int get_batch_per_block(int query_seq_len, int key_seq_len, int batches, int attn_heads){ + int log2_elements = log2_ceil(key_seq_len); + const int next_power_of_two = 1 << log2_elements; + + int warp_size = (next_power_of_two < C10_WARP_SIZE) ? next_power_of_two : C10_WARP_SIZE; + int batches_per_warp = (next_power_of_two <= 128) ? 
2 : 1; + + constexpr int threads_per_block = 128; + int warps_per_block = (threads_per_block / warp_size); + int batches_per_block = warps_per_block * batches_per_warp; + + return batches_per_block; +} + template void dispatch_scaled_masked_softmax_forward( output_t *dst, diff --git a/megatron/fused_kernels/scaled_masked_softmax_cuda.cu b/megatron/fused_kernels/scaled_masked_softmax_cuda.cu index 7e8317c4f48..902d36dd0f8 100644 --- a/megatron/fused_kernels/scaled_masked_softmax_cuda.cu +++ b/megatron/fused_kernels/scaled_masked_softmax_cuda.cu @@ -28,6 +28,11 @@ namespace multihead_attn { namespace fused_softmax { namespace scaled_masked_softmax { +int get_batch_per_block_cuda(int query_seq_len, int key_seq_len, int batches, int attn_heads){ + return get_batch_per_block(query_seq_len, key_seq_len, batches, attn_heads); +} + + torch::Tensor fwd_cuda( torch::Tensor const& input, torch::Tensor const& mask, diff --git a/megatron/fused_kernels/scaled_upper_triang_masked_softmax.h b/megatron/fused_kernels/scaled_upper_triang_masked_softmax.h index addca0a0a3b..6df83fc1037 100644 --- a/megatron/fused_kernels/scaled_upper_triang_masked_softmax.h +++ b/megatron/fused_kernels/scaled_upper_triang_masked_softmax.h @@ -125,7 +125,7 @@ __global__ void scaled_upper_triang_masked_softmax_warp_forward( constexpr int WARP_SIZE = (next_power_of_two < C10_WARP_SIZE) ? next_power_of_two : C10_WARP_SIZE; constexpr int WARP_ITERATIONS = next_power_of_two / WARP_SIZE; constexpr int WARP_BATCH = (next_power_of_two <= 128) ? 2 : 1; - constexpr int ELEMENTS_PER_LDG_STG = 4; + constexpr int ELEMENTS_PER_LDG_STG = (WARP_ITERATIONS < 4) ? 1 : 4; int first_batch = (blockDim.y * blockIdx.y + threadIdx.y) * gridDim.x * WARP_BATCH + blockIdx.x; int local_seq = blockIdx.x + 1; @@ -245,7 +245,7 @@ __global__ void scaled_upper_triang_masked_softmax_warp_backward( constexpr int WARP_SIZE = (next_power_of_two < C10_WARP_SIZE) ? next_power_of_two : C10_WARP_SIZE; constexpr int WARP_ITERATIONS = next_power_of_two / WARP_SIZE; constexpr int WARP_BATCH = (next_power_of_two <= 128) ? 2 : 1; - constexpr int ELEMENTS_PER_LDG_STG = 4; + constexpr int ELEMENTS_PER_LDG_STG = (WARP_ITERATIONS < 4) ? 
1 : 4; int first_batch = (blockDim.y * blockIdx.y + threadIdx.y) * gridDim.x * WARP_BATCH + blockIdx.x; int local_seq = blockIdx.x + 1; @@ -361,6 +361,7 @@ void dispatch_scaled_upper_triang_masked_softmax_forward( int warps_per_block = (threads_per_block / warp_size); int batches_per_block = warps_per_block * batches_per_warp; TORCH_INTERNAL_ASSERT(attn_batches % batches_per_block == 0); + int blocks_per_seq = attn_batches / batches_per_block; dim3 blocks(seq_len, blocks_per_seq, 1); dim3 threads(warp_size, warps_per_block, 1); @@ -451,6 +452,7 @@ void dispatch_scaled_upper_triang_masked_softmax_backward( int warps_per_block = (threads_per_block / warp_size); int batches_per_block = warps_per_block * batches_per_warp; TORCH_INTERNAL_ASSERT(attn_batches % batches_per_block == 0); + int blocks_per_seq = attn_batches / batches_per_block; dim3 blocks(seq_len, blocks_per_seq, 1); dim3 threads(warp_size, warps_per_block, 1); diff --git a/megatron/fused_kernels/tests/__init__.py b/megatron/fused_kernels/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/megatron/fused_kernels/tests/test_fused_kernels.py b/megatron/fused_kernels/tests/test_fused_kernels.py new file mode 100644 index 00000000000..f8d5027a1f0 --- /dev/null +++ b/megatron/fused_kernels/tests/test_fused_kernels.py @@ -0,0 +1,300 @@ +import math + +import torch +from torch.nn import LayerNorm + +from megatron.model.enums import AttnMaskType +from megatron.model.fused_layer_norm import MixedFusedLayerNorm +from megatron.model.fused_softmax import FusedScaleMaskSoftmax +from megatron.model.utils import attention_mask_func + + +def test_load_fused_kernels(): + try: + import fused_mix_prec_layer_norm_cuda + import scaled_masked_softmax_cuda + import scaled_upper_triang_masked_softmax_cuda + import torch + + print("[Success] load_fused_kernels") + except ImportError as e: + print("[Fail] load_fused_kernels") + raise e + + +def test_fused_softmax(): + bert = BertModel.from_pretrained("bert-base-cased").cuda().half() + tokenizer = BertTokenizer.from_pretrained("bert-base-cased") + test_text = ( + "Hello. How are you? I am fine thank you and you? yes Good. 
" + "hi hi hi hi hi hi hi hi hi hi hi hi hi" # 32 + ) + + tokens = tokenizer( + [test_text] * 4, + return_tensors="pt", + ) + + embedding_output = bert.embeddings( + input_ids=tokens["input_ids"].cuda(), + position_ids=None, + token_type_ids=tokens["token_type_ids"].cuda(), + inputs_embeds=None, + past_key_values_length=0, + ) + + # (bsz, 1, 1, seq_len) + mask = bert.get_extended_attention_mask( + attention_mask=tokens["attention_mask"].cuda(), + input_shape=tokens["input_ids"].shape, + device=bert.device, + ) + # (bsz, 1, seq_len, seq_len) + mask = mask.repeat(1, 1, mask.size()[-1], 1) + + attention = bert.encoder.layer[0].attention.self + key_layer = attention.transpose_for_scores(attention.key(embedding_output)) + query_layer = attention.transpose_for_scores(attention.query(embedding_output)) + + attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2)) + attention_scores /= math.sqrt(key_layer.size()[-1]) + + fused_softmax = ( + FusedScaleMaskSoftmax( + input_in_fp16=True, + input_in_bf16=False, + mask_func=attention_mask_func, + scale=None, + softmax_in_fp32=False, + attn_mask_type=AttnMaskType.padding, + scaled_masked_softmax_fusion=True, + ) + .cuda() + .half() + ) + + fused_softmax_output = fused_softmax( + attention_scores, + (mask != 0), + ) + + torch_softmax = ( + FusedScaleMaskSoftmax( + input_in_fp16=True, + input_in_bf16=False, + mask_func=attention_mask_func, + scale=None, + softmax_in_fp32=False, + attn_mask_type=AttnMaskType.padding, + scaled_masked_softmax_fusion=False, + ) + .cuda() + .half() + ) + + torch_softmax_output = torch_softmax( + attention_scores, + (mask != 0), + ) + + test_result = (fused_softmax_output - torch_softmax_output).abs() + + while test_result.dim() != 1: + test_result = test_result.mean(dim=-1) + + diff = test_result.mean(dim=-1) + + if diff <= 1e-3: + print( + f"\n[Success] test_fused_softmax" + f"\n > mean_difference={diff}" + f"\n > fused_values={fused_softmax_output[-1][-1][-1][:5].tolist()}" + f"\n > torch_values={torch_softmax_output[-1][-1][-1][:5].tolist()}" + ) + else: + print( + f"\n[Fail] test_fused_softmax" + f"\n > mean_difference={diff}, " + f"\n > fused_values={fused_softmax_output[-1][-1][-1][:5].tolist()}, " + f"\n > torch_values={torch_softmax_output[-1][-1][-1][:5].tolist()}" + ) + + +def test_fused_upper_triangle_mask_softmax(): + gpt = GPT2Model.from_pretrained("gpt2").cuda().half() + tokenizer = GPT2Tokenizer.from_pretrained("gpt2") + test_text = ( + "Hello. How are you? I am fine thank you and you? yes Good. 
" + "hi hi hi hi hi hi hi" # 24 + ) + + tokens = tokenizer( + [test_text] * 4, + return_tensors="pt", + ) + + attention_mask = tokens["attention_mask"].cuda() + attention_mask = attention_mask.view(attention_mask.size(0), -1) + attention_mask = attention_mask[:, None, None, :] + attention_mask = (1.0 - attention_mask) * -10000.0 + attention_mask = attention_mask.repeat(1, 1, attention_mask.size()[-1], 1) + attn = gpt.h[0] + + hidden_states = gpt.wte(tokens["input_ids"].cuda()) + q, k, v = attn.attn.c_attn(hidden_states).split(768, dim=-1) + q = attn.attn._split_heads(q, attn.attn.num_heads, attn.attn.head_dim) + k = attn.attn._split_heads(k, attn.attn.num_heads, attn.attn.head_dim) + attn_weights = torch.matmul(q, k.transpose(-1, -2)) + + sq, sk = q.size(-2), k.size(-2) + causal_mask = attn.attn.bias[:, :, sk - sq : sk, :sk].bool() + total_mask = ~(causal_mask & (attention_mask == 0)) + """ + tensor([[[[False, True, True, ..., True, True, True], + [False, False, True, ..., True, True, True], + [False, False, False, ..., True, True, True], + ..., + [False, False, False, ..., False, True, True], + [False, False, False, ..., False, False, True], + [False, False, False, ..., False, False, False]]] + """ + + fused_softmax = ( + FusedScaleMaskSoftmax( + input_in_fp16=True, + input_in_bf16=False, + mask_func=attention_mask_func, + scale=None, + softmax_in_fp32=False, + attn_mask_type=AttnMaskType.causal, + scaled_masked_softmax_fusion=True, + ) + .cuda() + .half() + ) + + fused_softmax_output = fused_softmax( + attn_weights, + total_mask, + ) + + torch_softmax = ( + FusedScaleMaskSoftmax( + input_in_fp16=True, + input_in_bf16=False, + mask_func=attention_mask_func, + scale=None, + softmax_in_fp32=False, + attn_mask_type=AttnMaskType.causal, + scaled_masked_softmax_fusion=False, + ) + .cuda() + .half() + ) + + torch_softmax_output = torch_softmax( + attn_weights, + total_mask, + ) + + test_result = (fused_softmax_output - torch_softmax_output).abs() + + while test_result.dim() != 1: + test_result = test_result.mean(dim=-1) + + diff = test_result.mean(dim=-1) + + if diff <= 1e-3: + print( + f"\n[Success] test_fused_upper_triangle_mask_softmax" + f"\n > mean_difference={diff}" + f"\n > fused_values={fused_softmax_output[-1][-1][-1][:5].tolist()}" + f"\n > torch_values={torch_softmax_output[-1][-1][-1][:5].tolist()}" + ) + else: + print( + f"\n[Fail] test_fused_upper_triangle_mask_softmax" + f"\n > mean_difference={diff}, " + f"\n > fused_values={fused_softmax_output[-1][-1][-1][:5].tolist()}, " + f"\n > torch_values={torch_softmax_output[-1][-1][-1][:5].tolist()}" + ) + + +def test_layer_norm(): + bert = BertModel.from_pretrained("bert-base-cased").cuda().half() + tokenizer = BertTokenizer.from_pretrained("bert-base-cased") + test_text = ( + "Hello. How are you? I am fine thank you and you? yes Good. 
" + "hi hi hi hi hi hi hi hi hi hi hi hi hi" # 32 + ) + + tokens = tokenizer( + [test_text] * 4, + return_tensors="pt", + ) + + # [bsz, seq_len, d_model] + embedding_output = ( + bert.embeddings( + input_ids=tokens["input_ids"].cuda(), + position_ids=None, + token_type_ids=tokens["token_type_ids"].cuda(), + inputs_embeds=None, + past_key_values_length=0, + ) + .cuda() + .half() + ) + + fused_layernorm_layer = ( + MixedFusedLayerNorm(normalized_shape=embedding_output.size(-1)).cuda().half() + ) + + torch_layernorm_layer = ( + LayerNorm(normalized_shape=embedding_output.size(-1)).cuda().half() + ) + + fused_output = fused_layernorm_layer(embedding_output) + torch_output = torch_layernorm_layer(embedding_output) + test_result = (fused_output - torch_output).abs() + + while test_result.dim() != 1: + test_result = test_result.mean(dim=-1) + + diff = test_result.mean(dim=-1) + + if diff <= 1e-3: + print( + f"\n[Success] test_layer_norm" + f"\n > mean_difference={diff}" + f"\n > fused_values={fused_output[-1][-1][:5].tolist()}" + f"\n > torch_values={torch_output[-1][-1][:5].tolist()}" + ) + else: + print( + f"\n[Fail] test_layer_norm" + f"\n > mean_difference={diff}, " + f"\n > fused_values={fused_output[-1][-1][:5].tolist()}, " + f"\n > torch_values={torch_output[-1][-1][:5].tolist()}" + ) + + +if __name__ == "__main__": + try: + from transformers import BertTokenizer, GPT2Tokenizer + from transformers.models.bert.modeling_bert import BertModel + from transformers.models.gpt2.modeling_gpt2 import GPT2Model + import transformers + + transformers.logging.set_verbosity( + transformers.logging.FATAL, + ) + + except: + print("\n[Fail] Please install `transformers` package to test fused kernels\n") + exit(-1) + + test_load_fused_kernels() + test_fused_softmax() + test_fused_upper_triangle_mask_softmax() + test_layer_norm() diff --git a/megatron/model/fused_softmax.py b/megatron/model/fused_softmax.py index 097b29ef4c6..7b047dfa614 100644 --- a/megatron/model/fused_softmax.py +++ b/megatron/model/fused_softmax.py @@ -13,7 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. + import torch +import torch.nn as nn from megatron.model.enums import AttnMaskType @@ -30,10 +32,10 @@ def forward(ctx, inputs, scale): import scaled_upper_triang_masked_softmax_cuda scale_t = torch.tensor([scale]) - softmax_results = scaled_upper_triang_masked_softmax_cuda.forward( inputs, scale_t[0] ) + ctx.save_for_backward(softmax_results, scale_t) return softmax_results @@ -42,10 +44,10 @@ def backward(ctx, output_grads): import scaled_upper_triang_masked_softmax_cuda softmax_results, scale_t = ctx.saved_tensors - input_grads = scaled_upper_triang_masked_softmax_cuda.backward( output_grads, softmax_results, scale_t[0] ) + return input_grads, None @@ -63,9 +65,7 @@ def forward(ctx, inputs, mask, scale): scale_t = torch.tensor([scale]) - softmax_results = scaled_masked_softmax_cuda.forward( - inputs, mask, scale_t[0] - ) + softmax_results = scaled_masked_softmax_cuda.forward(inputs, mask, scale_t[0]) ctx.save_for_backward(softmax_results, scale_t) return softmax_results @@ -81,16 +81,18 @@ def backward(ctx, output_grads): return input_grads, None, None -class FusedScaleMaskSoftmax(torch.nn.Module): +class FusedScaleMaskSoftmax(nn.Module): """ fused operation: scaling + mask + softmax + Arguments: input_in_fp16: flag to indicate if input in fp16 data format. + input_in_bf16: flag to indicate if input in bf16 data format. 
attn_mask_type: attention mask type (pad or causal) + scaled_masked_softmax_fusion: flag to indicate user want to use softmax fusion mask_func: mask function to be applied. softmax_in_fp32: if true, softmax in performed at fp32 precision. scale: scaling factor used in input tensor scaling. - """ def __init__( @@ -106,8 +108,9 @@ def __init__( super(FusedScaleMaskSoftmax, self).__init__() self.input_in_fp16 = input_in_fp16 self.input_in_bf16 = input_in_bf16 - assert not (self.input_in_fp16 and self.input_in_bf16),\ - 'both fp16 and bf16 flags cannot be active at the same time.' + assert not ( + self.input_in_fp16 and self.input_in_bf16 + ), "both fp16 and bf16 flags cannot be active at the same time." self.input_in_float16 = self.input_in_fp16 or self.input_in_bf16 self.attn_mask_type = attn_mask_type self.scaled_masked_softmax_fusion = scaled_masked_softmax_fusion @@ -118,47 +121,72 @@ def __init__( assert ( self.scale is None or softmax_in_fp32 ), "softmax should be in fp32 when scaled" - + def forward(self, input, mask): # [b, np, sq, sk] assert input.dim() == 4 - data_size = input.size() - query_seq_len = data_size[-2] - key_seq_len = data_size[-1] - attn_batch_size = data_size[0] * data_size[1] - - # constraints on various tensor dimensions to enable warp based - # optimization and upper triangular optimization (for causal mask) - custom_kernel_constraint = key_seq_len > 16 and key_seq_len <= 2048 and \ - query_seq_len % 4 == 0 and attn_batch_size % 4 == 0 - - # invoke custom kernel - if self.input_in_float16 and mask is not None and \ - custom_kernel_constraint and self.scaled_masked_softmax_fusion: - scale = self.scale if self.scale is not None else 1.0 - - if self.attn_mask_type == AttnMaskType.causal: - assert query_seq_len == key_seq_len, \ - "causal mask is only for self attention" - input = input.view(-1, query_seq_len, key_seq_len) - probs = ScaledUpperTriangMaskedSoftmax.apply(input, scale) - probs = probs.view(*data_size) - else: - assert self.attn_mask_type == AttnMaskType.padding - probs = ScaledMaskedSoftmax.apply(input, mask, scale) + + if self.is_kernel_available(mask, *input.size()): + return self.forward_fused_softmax(input, mask) else: - if self.input_in_float16 and self.softmax_in_fp32: - input = input.float() + return self.forward_torch_softmax(input, mask) + + def is_kernel_available(self, mask, b, np, sq, sk): + attn_batches = b * np + + if ( + self.scaled_masked_softmax_fusion # user want to fuse + and self.input_in_float16 # input must be fp16 + and mask is not None # mask tensor must not be None + and 16 < sk <= 2048 # sk must be 16 ~ 2048 + and sq % 4 == 0 # sq must be divisor of 4 + and attn_batches % 4 == 0 # np * b must be divisor of 4 + ): + if 0 <= sk <= 2048: + batch_per_block = self.get_batch_per_block(sq, sk, b, np) + + if self.attn_mask_type == AttnMaskType.causal: + if attn_batches % batch_per_block == 0: + return True + else: + if sq % batch_per_block == 0: + return True + return False - if self.scale is not None: - input = input * self.scale - mask_output = self.mask_func(input, mask) if mask is not None else input - probs = torch.nn.Softmax(dim=-1)(mask_output) + def forward_fused_softmax(self, input, mask): + b, np, sq, sk = input.size() + scale = self.scale if self.scale is not None else 1.0 - if self.input_in_float16 and self.softmax_in_fp32: - if self.input_in_fp16: - probs = probs.half() - else: - probs = probs.bfloat16() + if self.attn_mask_type == AttnMaskType.causal: + assert sq == sk, "causal mask is only for self attention" + + # input 
is 3D tensor (attn_batches, sq, sk) + input = input.view(-1, sq, sk) + probs = ScaledUpperTriangMaskedSoftmax.apply(input, scale) + return probs.view(b, np, sq, sk) + else: + # input is 4D tensor (b, np, sq, sk) + return ScaledMaskedSoftmax.apply(input, mask, scale) + + def forward_torch_softmax(self, input, mask): + if self.input_in_float16 and self.softmax_in_fp32: + input = input.float() + + if self.scale is not None: + input = input * self.scale + mask_output = self.mask_func(input, mask) if mask is not None else input + probs = torch.nn.Softmax(dim=-1)(mask_output) + + if self.input_in_float16 and self.softmax_in_fp32: + if self.input_in_fp16: + probs = probs.half() + else: + probs = probs.bfloat16() return probs + + @staticmethod + def get_batch_per_block(sq, sk, b, np): + import scaled_masked_softmax_cuda + + return scaled_masked_softmax_cuda.get_batch_per_block(sq, sk, b, np) From 36f24a7808deb3945d76a651920385bf21d2df51 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Thu, 17 Feb 2022 17:30:00 -0500 Subject: [PATCH 08/13] fixes --- megatron/arguments.py | 2 +- megatron/data/helpers.cpp | 19 ++++--------------- megatron/initialize.py | 10 +++++----- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/megatron/arguments.py b/megatron/arguments.py index 693224c93fe..10da8fa65dd 100644 --- a/megatron/arguments.py +++ b/megatron/arguments.py @@ -82,7 +82,7 @@ def parse_args(extra_args_provider=None, defaults={}, 'pipeline-model-parallel size: {} '.format( args.world_size, args.data_parallel_size, args.tensor_model_parallel_size, - args.pipeline_model_parallel_size), flush=True) + args.pipeline_model_parallel_size)) if args.pipeline_model_parallel_size > 1: if args.pipeline_model_parallel_split_rank is not None: assert args.pipeline_model_parallel_split_rank < \ diff --git a/megatron/data/helpers.cpp b/megatron/data/helpers.cpp index 512beaff3fa..e45926a9769 100644 --- a/megatron/data/helpers.cpp +++ b/megatron/data/helpers.cpp @@ -26,8 +26,6 @@ #include #include #include -#include -#include namespace py = pybind11; using namespace std; @@ -285,20 +283,12 @@ py::array build_mapping_impl(const py::array_t& docs_, // For each epoch: for (int32_t epoch=0; epoch= max_num_samples) { - if (verbose && (!second)) { - cout << " reached " << max_num_samples << " samples after " - << epoch << " epochs ..." << endl << std::flush; - } + if (verbose && (!second)) { + cout << " reached " << max_num_samples << " samples after " + << epoch << " epochs ..." << endl << std::flush; + } break; } - - if (verbose && map_index % (max_num_samples / 100) == 0) { - auto t = std::time(nullptr); - auto tm = *std::localtime(&t); - cout << " reached " << map_index << "/" << max_num_samples << " samples after " - << epoch << " epochs ..." 
<< std::put_time(&tm, "%d-%m-%Y %H-%M-%S") << endl << std::flush; - } - // For each document: for (int32_t doc=0; doc<(docs.shape(0) - 1); ++doc) { @@ -551,7 +541,6 @@ py::array build_blocks_mapping_impl(const py::array_t& docs_, // assign every block a unique id int32_t block_id = 0; - if (map_index >= max_num_samples) { if (verbose && (!second)) { cout << " reached " << max_num_samples << " samples after " diff --git a/megatron/initialize.py b/megatron/initialize.py index d80e7b5e5ce..03ea26c5f11 100644 --- a/megatron/initialize.py +++ b/megatron/initialize.py @@ -185,11 +185,11 @@ def _initialize_distributed(): else: args.local_rank = device torch.cuda.set_device(device) - # Call the init process - torch.distributed.init_process_group( - backend=args.distributed_backend, - world_size=args.world_size, rank=args.rank, - timeout=timedelta(minutes=10)) + # Call the init process + torch.distributed.init_process_group( + backend=args.distributed_backend, + world_size=args.world_size, rank=args.rank, + timeout=timedelta(minutes=10)) # Set the tensor model-parallel, pipeline model-parallel, and # data-parallel communicators. From 40c2c3e9d7be8fd03746c7fd32b6c96c17852623 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Wed, 23 Feb 2022 05:07:27 -0500 Subject: [PATCH 09/13] Barrier fixes and time and memory logging --- megatron/data/biencoder_dataset_utils.py | 3 ++- megatron/data/dataset_utils.py | 4 +++- megatron/data/gpt_dataset.py | 3 ++- megatron/data/realm_dataset_utils.py | 3 ++- megatron/global_vars.py | 8 ++----- megatron/training.py | 29 ++++++++++++++++++++---- 6 files changed, 36 insertions(+), 14 deletions(-) diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index dee12e1b120..213fc78a7d7 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -4,6 +4,7 @@ import numpy as np import torch +import torch.distributed from megatron import get_args, get_tokenizer, mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, \ @@ -188,7 +189,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo time.time() - start_time)) # Wait until rank 0 generate the index file. - torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) # It can take some time for the file to be visible on other nodes. for i in range(120): if indexmap_filename.is_file(): diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 45ca5e901a2..b795cc50c98 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -26,6 +26,7 @@ import numpy as np import torch +import torch.distributed from megatron import ( get_args, @@ -701,7 +702,8 @@ def get_samples_mapping(indexed_dataset, time.time() - start_time)) # Wait until rank 0 generate the index file. - torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + print_rank_0(f"Barrier device {int(os.environ['LOCAL_RANK'])}") + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) # It can take some time for the file to be visible on other nodes. 
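Each of these data-path hunks follows the same pattern: scope the barrier to the data-parallel group, pinned to the local device, then poll until the file written by the builder rank becomes visible on this node. A compact sketch of that pattern; the helper name is illustrative, the 120-second polling window follows the surrounding code:

# Illustrative summary of the wait pattern used in these hunks.
import os
import time
import torch.distributed

def wait_for_indexmap(indexmap_filename, group):
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.distributed.barrier(device_ids=[local_rank], group=group)
    # It can take some time for the file to become visible on other nodes.
    for _ in range(120):
        if indexmap_filename.is_file():
            return True
        time.sleep(1)
    return False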
for i in range(120): if indexmap_filename.is_file(): diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index 815cc985e2c..be682142913 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -21,6 +21,7 @@ import numpy as np import torch +import torch.distributed from megatron import mpu, print_rank_0 from megatron.data.blendable_dataset import BlendableDataset @@ -300,7 +301,7 @@ def _build_index_mappings(name, data_prefix, documents, sizes, ' (seconds): {:4f}'.format(time.time() - start_time)) # Wait until rank 0 generate the index file. - torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) # It can take some time for the file to be visible on other nodes. for i in range(120): diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index 05ed12d8cdb..1efdf210d40 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -4,6 +4,7 @@ import numpy as np import torch +import torch.distributed from megatron import mpu, print_rank_0 from megatron.data.dataset_utils import create_masked_lm_predictions, pad_and_convert_to_numpy @@ -178,7 +179,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo time.time() - start_time)) # Wait until rank 0 generate the index file. - torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])]) + torch.distributed.barrier(device_ids=[int(os.environ['LOCAL_RANK'])], group=mpu.get_data_parallel_group()) # It can take some time for the file to be visible on other nodes. for i in range(120): if indexmap_filename.is_file(): diff --git a/megatron/global_vars.py b/megatron/global_vars.py index 59f5960adc0..7977db864d0 100644 --- a/megatron/global_vars.py +++ b/megatron/global_vars.py @@ -267,9 +267,5 @@ def log(self, names, normalizer=1.0, reset=True): elapsed_time = self.timers[name].elapsed( reset=reset) * 1000.0 / normalizer string += ' | {}: {:.2f}'.format(name, elapsed_time) - if torch.distributed.is_initialized(): - if torch.distributed.get_rank() == ( - torch.distributed.get_world_size() - 1): - print(string, flush=True) - else: - print(string, flush=True) + from megatron import print_rank_last + print_rank_last(string) diff --git a/megatron/training.py b/megatron/training.py index 4cf6e4c0e25..46cab8841d3 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -477,11 +477,16 @@ def train_step(forward_step_func, data_iterator, return loss_reduced, skipped_iter, grad_norm, num_zeros_in_grad return {}, skipped_iter, grad_norm, num_zeros_in_grad +last_time=None +first_time=None +num_iters=None def training_log(loss_dict, total_loss_dict, learning_rate, iteration, loss_scale, report_memory_flag, skipped_iter, grad_norm, params_norm, num_zeros_in_grad): """Log training information such as losses, timing, ....""" + + global last_time, first_time, num_iters args = get_args() timers = get_timers() writer = get_tensorboard_writer() @@ -639,14 +644,30 @@ def add_to_logging(name): total_loss_dict[skipped_iters_key]) log_string += ' number of nan iterations: {:3d} |'.format( total_loss_dict[nan_iters_key]) + current_time=time.perf_counter() + # Skip the slower first batch to be more accurate + if first_time is None: + first_time=current_time + num_iters=0 + else: + num_iters+=1 + log_string += f' batch time: {1000*(current_time-last_time):.2f} ms |' + log_string += f' avg time: 
{1000*(current_time-first_time)/num_iters:.2f} ms |' + last_time=current_time + + log_string += f' memory: {torch.cuda.memory_allocated():,} |' + log_string += f' max memory: {torch.cuda.max_memory_allocated():,} |' + log_string += f' reserved: {torch.cuda.memory_reserved():,} |' + log_string += f' max reserved: {torch.cuda.memory_reserved():,} |' + torch.cuda.reset_peak_memory_stats() total_loss_dict[advanced_iters_key] = 0 total_loss_dict[skipped_iters_key] = 0 total_loss_dict[nan_iters_key] = 0 print_rank_last(log_string) - if report_memory_flag and learning_rate > 0.: - # Report memory after optimizer state has been initialized. - report_memory('(after {} iterations)'.format(iteration)) - report_memory_flag = False + #if report_memory_flag and learning_rate > 0.: + # # Report memory after optimizer state has been initialized. + # report_memory('(after {} iterations)'.format(iteration)) + # report_memory_flag = False timers.log(timers_to_log, normalizer=args.log_interval) return report_memory_flag From 9e0c548ec0ec90a0db020b7732fda578db3844d0 Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Fri, 18 Mar 2022 20:47:31 -0400 Subject: [PATCH 10/13] Indexmap path and name args --- megatron/arguments.py | 4 ++++ megatron/data/biencoder_dataset_utils.py | 8 ++++++-- megatron/data/dataset_utils.py | 8 ++++++-- megatron/data/realm_dataset_utils.py | 8 ++++++-- megatron/training.py | 8 +++++--- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/megatron/arguments.py b/megatron/arguments.py index 10da8fa65dd..39d9dd21bef 100644 --- a/megatron/arguments.py +++ b/megatron/arguments.py @@ -343,6 +343,8 @@ def _add_network_size_args(parser): def _add_logging_args(parser): group = parser.add_argument_group(title='logging') + group.add_argument('--name', type=str, default=None, + help='A name for the experiment.') group.add_argument('--log-params-norm', action='store_true', help='If set, calculate and log parameters norm.') group.add_argument('--log-scales', action='store_true', @@ -703,6 +705,8 @@ def _add_data_args(parser): '1) a single data path, 2) multiple datasets in the' 'form: dataset1-weight dataset1-path dataset2-weight ' 'dataset2-path ...') + group.add_argument('--indexmap-path', type=str, default=None, + help='Path for intermediate data files') group.add_argument('--split', type=str, default='969, 30, 1', help='Comma-separated list of proportions for training,' ' validation, and test split. 
For example the split ' diff --git a/megatron/data/biencoder_dataset_utils.py b/megatron/data/biencoder_dataset_utils.py index 213fc78a7d7..7e4956c40f9 100644 --- a/megatron/data/biencoder_dataset_utils.py +++ b/megatron/data/biencoder_dataset_utils.py @@ -136,7 +136,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = Path(data_prefix).name + indexmap_filename = data_prefix indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -148,7 +148,11 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' - indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name # Build the indexed mapping if not exist. if mpu.get_data_parallel_rank() == 0 and \ diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index b795cc50c98..381fe3fe111 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -652,7 +652,7 @@ def get_samples_mapping(indexed_dataset, max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = Path(data_prefix).name + indexmap_filename = data_prefix indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -663,7 +663,11 @@ def get_samples_mapping(indexed_dataset, indexmap_filename += '_{}s'.format(seed) indexmap_filename += '.npy' - indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name # Build the indexed mapping if not exist. if torch.distributed.get_rank() == 0 and \ diff --git a/megatron/data/realm_dataset_utils.py b/megatron/data/realm_dataset_utils.py index 1efdf210d40..e313c42f531 100644 --- a/megatron/data/realm_dataset_utils.py +++ b/megatron/data/realm_dataset_utils.py @@ -126,7 +126,7 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo max_num_samples = np.iinfo(np.int64).max - 1 # Filename of the index mapping - indexmap_filename = Path(data_prefix).name + indexmap_filename = data_prefix indexmap_filename += '_{}_indexmap'.format(name) if num_epochs != (np.iinfo(np.int32).max - 1): indexmap_filename += '_{}ep'.format(num_epochs) @@ -138,7 +138,11 @@ def get_block_samples_mapping(block_dataset, title_dataset, data_prefix, num_epo indexmap_filename += '_1sentok' indexmap_filename += '.npy' - indexmap_filename = Path(get_args().save).joinpath(indexmap_filename).resolve() + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + indexmap_filename = indexmap_path/Path(indexmap_filename).name # Build the indexed mapping if not exist. 
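The same --indexmap-path redirection block is repeated in biencoder_dataset_utils.py, dataset_utils.py and realm_dataset_utils.py (and again in gpt_dataset.py below); a shared helper along these lines would capture it — hypothetical, not part of the series:

# Hypothetical consolidation of the duplicated --indexmap-path logic;
# not part of this patch series.
from pathlib import Path

def resolve_indexmap_filename(indexmap_filename, indexmap_path=None):
    # With --indexmap-path set, place index files there (creating the
    # directory if needed); otherwise keep the data-prefix-based location.
    if indexmap_path is None:
        return indexmap_filename
    out_dir = Path(indexmap_path).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)
    return out_dir / Path(indexmap_filename).name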
if mpu.get_data_parallel_rank() == 0 and \ diff --git a/megatron/training.py b/megatron/training.py index 46cab8841d3..2b697b1a78d 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -617,7 +617,10 @@ def add_to_logging(name): if args.log_timers_to_tensorboard: writer.add_scalar('iteration-time', elapsed_time_per_iteration, iteration) - log_string = ' iteration {:8d}/{:8d} |'.format( + log_string = '' + if args.name is not None: + log_string += ' {} |'.format(args.name) + log_string += ' iteration {:8d}/{:8d} |'.format( iteration, args.train_iters) log_string += ' consumed samples: {:12d} |'.format( args.consumed_train_samples) @@ -657,8 +660,7 @@ def add_to_logging(name): log_string += f' memory: {torch.cuda.memory_allocated():,} |' log_string += f' max memory: {torch.cuda.max_memory_allocated():,} |' - log_string += f' reserved: {torch.cuda.memory_reserved():,} |' - log_string += f' max reserved: {torch.cuda.memory_reserved():,} |' + log_string += f' max reserved: {torch.cuda.max_memory_reserved():,}' torch.cuda.reset_peak_memory_stats() total_loss_dict[advanced_iters_key] = 0 total_loss_dict[skipped_iters_key] = 0 From 80604931d8f9361cc0ae09c288e930992153826e Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Fri, 25 Mar 2022 21:04:09 -0400 Subject: [PATCH 11/13] fixes --- megatron/data/dataset_utils.py | 2 +- megatron/data/gpt_dataset.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/megatron/data/dataset_utils.py b/megatron/data/dataset_utils.py index 381fe3fe111..a7571abcb1b 100644 --- a/megatron/data/dataset_utils.py +++ b/megatron/data/dataset_utils.py @@ -665,7 +665,7 @@ def get_samples_mapping(indexed_dataset, args=get_args() if args.indexmap_path is not None: - indexmap_path=Path(get_args().indexmap_path).resolve() + indexmap_path=Path(args.indexmap_path).resolve() indexmap_path.mkdir(parents=True, exist_ok=True) indexmap_filename = indexmap_path/Path(indexmap_filename).name diff --git a/megatron/data/gpt_dataset.py b/megatron/data/gpt_dataset.py index be682142913..fa1efb6ace2 100644 --- a/megatron/data/gpt_dataset.py +++ b/megatron/data/gpt_dataset.py @@ -23,7 +23,7 @@ import torch import torch.distributed -from megatron import mpu, print_rank_0 +from megatron import mpu, print_rank_0, get_args from megatron.data.blendable_dataset import BlendableDataset from megatron.data.dataset_utils import get_datasets_weights_and_num_samples from megatron.data.dataset_utils import get_train_valid_test_split_ @@ -204,7 +204,7 @@ def _build_index_mappings(name, data_prefix, documents, sizes, np_rng = np.random.RandomState(seed=seed) # Filename of the index mappings. 
- _filename = Path(data_prefix).name + _filename = data_prefix _filename += '_{}_indexmap'.format(name) _filename += '_{}ns'.format(num_samples) _filename += '_{}sl'.format(seq_length) @@ -213,10 +213,13 @@ def _build_index_mappings(name, data_prefix, documents, sizes, sample_idx_filename = _filename + '_sample_idx.npy' shuffle_idx_filename = _filename + '_shuffle_idx.npy' - output_folder = Path(get_args().save) - doc_idx_filename = output_folder.joinpath(doc_idx_filename).resolve() - sample_idx_filename = output_folder.joinpath(sample_idx_filename).resolve() - shuffle_idx_filename = output_folder.joinpath(shuffle_idx_filename).resolve() + args=get_args() + if args.indexmap_path is not None: + indexmap_path=Path(args.indexmap_path).resolve() + indexmap_path.mkdir(parents=True, exist_ok=True) + doc_idx_filename = indexmap_path/Path(doc_idx_filename).name + sample_idx_filename = indexmap_path/Path(sample_idx_filename).name + shuffle_idx_filename = indexmap_path/Path(shuffle_idx_filename).name # Build the indexed mapping if not exist. if torch.distributed.get_rank() == 0: From bb2b76376e254a09c78f31ad4a63ec2478fa477b Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Fri, 8 Jul 2022 12:03:57 -0400 Subject: [PATCH 12/13] Fix average time --- megatron/training.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megatron/training.py b/megatron/training.py index 2b697b1a78d..7a63f5a454c 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -655,7 +655,7 @@ def add_to_logging(name): else: num_iters+=1 log_string += f' batch time: {1000*(current_time-last_time):.2f} ms |' - log_string += f' avg time: {1000*(current_time-first_time)/num_iters:.2f} ms |' + log_string += f' avg time: {1000*(current_time-first_time)/num_iters/args.log_interval:.2f} ms |' last_time=current_time log_string += f' memory: {torch.cuda.memory_allocated():,} |' From d9b39812131e51be26498a0feb25c80aa875f53c Mon Sep 17 00:00:00 2001 From: Joel Lamy-Poirier Date: Mon, 22 Aug 2022 12:56:22 -0400 Subject: [PATCH 13/13] Fix time --- megatron/training.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megatron/training.py b/megatron/training.py index 7a63f5a454c..978741aa547 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -654,7 +654,7 @@ def add_to_logging(name): num_iters=0 else: num_iters+=1 - log_string += f' batch time: {1000*(current_time-last_time):.2f} ms |' + log_string += f' batch time: {1000*(current_time-last_time)/args.log_interval:.2f} ms |' log_string += f' avg time: {1000*(current_time-first_time)/num_iters/args.log_interval:.2f} ms |' last_time=current_time