diff --git a/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet.py b/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet.py
index 1155bcb..181db95 100644
--- a/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet.py
+++ b/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet.py
@@ -19,7 +19,7 @@ import mindspore.nn as nn
 import mindspore as ms
 import mindspore.ops as ops
 
-
+ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")
 
 def weight_variable_0(shape):
     """weight_variable_0"""
diff --git a/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet50_distributed_training.py b/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet50_distributed_training.py
index b777cb8..883dc0b 100644
--- a/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet50_distributed_training.py
+++ b/Season1.step_into_chatgpt/5.Parallel/auto_parallel/resnet50_distributed_training.py
@@ -27,8 +27,8 @@ import mindspore as ms
 from resnet import resnet50
 
-ms.set_context(mode=ms.GRAPH_MODE, device_target="GPU")
-init("nccl")
+ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")
+init("hccl")
 
 def create_dataset(data_path, repeat_num=1, batch_size=32, rank_id=0, rank_size=1):  # pylint: disable=missing-docstring
     resize_height = 224
diff --git a/Season1.step_into_chatgpt/5.Parallel/operator_parallel/train.py b/Season1.step_into_chatgpt/5.Parallel/operator_parallel/train.py
index f9e9134..4b3dbe0 100644
--- a/Season1.step_into_chatgpt/5.Parallel/operator_parallel/train.py
+++ b/Season1.step_into_chatgpt/5.Parallel/operator_parallel/train.py
@@ -1,7 +1,5 @@
-"""Operator Parallel Example"""
-import sys
+import os  # used to read environment variables
 import numpy as np
-
 import mindspore as ms
 from mindspore.nn import Cell, Momentum
 from mindspore.ops import operations as ops
@@ -11,12 +9,12 @@ import mindspore.communication as D
 from mindspore.common.initializer import initializer
 
-args = sys.argv
-devices = int(args[1])
-
-if devices < 1 and devices > 8:
-    print('device_num error')
+# Read the device count from the DEVICE_NUM environment variable (defaults to 2 if unset)
+devices = int(os.getenv('DEVICE_NUM', 2))
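+# Note: DEVICE_NUM is assumed to be exported by the launcher (e.g. mpirun or a
+# rank-table startup script); MindSpore does not set it automatically.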
+# Validate the device count
+if devices < 1 or devices > 8:
+    print('Invalid number of devices. Exiting...')
     exit(0)
 
 step_per_epoch = 4
@@ -27,27 +25,26 @@ def generate():
         yield inputs
     return generate
 
-
 class Net(Cell):
-    """define net"""
+    """Define the network."""
    def __init__(self):
         super().__init__()
-        self.matmul = ops.MatMul().shard(((1, 2), (2, 1)))
-        self.weight = ms.Parameter(initializer("normal", [32, 16]), "w1")
-        self.relu = ops.ReLU().shard(((2, 1),))
+        self.matmul = ops.MatMul().shard(((1, 2), (2, 1)))  # shard the matmul
+        self.weight = ms.Parameter(initializer("normal", [32, 16]), "w1")  # initialize the weight
+        self.relu = ops.ReLU().shard(((2, 1),))  # shard the ReLU activation
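+        # Each tuple passed to shard() describes one operator input, with one
+        # element per tensor dimension: ((1, 2), (2, 1)) splits x along its
+        # columns and the weight along its rows across 2 devices, so each device
+        # multiplies a pair of [16, 16] slices and the framework inserts the
+        # communication needed to sum the partial results.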
 
     def construct(self, x):
-        out = self.matmul(x, self.weight)
-        out = self.relu(out)
+        out = self.matmul(x, self.weight)  # matrix multiplication
+        out = self.relu(out)  # activation
         return out
 
-
 if __name__ == "__main__":
-    ms.set_context(mode=ms.GRAPH_MODE)
+    ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")  # target Ascend NPUs
     D.init()
-    rank = D.get_rank()
-    ms.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=devices, full_batch=True)
+    rank = D.get_rank()  # rank of the current process
+    ms.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=devices, full_batch=True)  # configure the parallel context
+    # Generate the dataset
     np.random.seed(1)
     input_data = np.random.rand(16, 32).astype(np.float32)
     label_data = np.random.rand(16, 16).astype(np.float32)
@@ -55,10 +52,12 @@ def construct(self, x):
 
     net = Net()
 
+    # Callbacks
     callback = [train.LossMonitor(), train.ModelCheckpoint(directory="{}".format(rank))]
     dataset = ds.GeneratorDataset(fake_dataset, ["input", "label"])
     loss = SoftmaxCrossEntropyWithLogits()
+
+    # Optimizer
     learning_rate = 0.001
     momentum = 0.1
     epoch_size = 5
diff --git a/Season1.step_into_chatgpt/5.Parallel/optimizer_parallel/fusion_example.py b/Season1.step_into_chatgpt/5.Parallel/optimizer_parallel/fusion_example.py
index 96e92bc..c185ba7 100644
--- a/Season1.step_into_chatgpt/5.Parallel/optimizer_parallel/fusion_example.py
+++ b/Season1.step_into_chatgpt/5.Parallel/optimizer_parallel/fusion_example.py
@@ -1,38 +1,50 @@
-"""Parallel Optimizer Fusion Example"""
 from mindspore.communication import init
 from mindspore import nn
 import mindspore as ms
+
+# Initialize communication for distributed training
 init()
-ms.set_context(mode=ms.GRAPH_MODE)
-ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)
+# Set the MindSpore context to GRAPH_MODE for better performance
+ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")  # use Ascend NPUs
+
+# Enable the parallel optimizer and set the parallel mode
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL,
+                             enable_parallel_optimizer=True, device_num=2)
 
 class DenseLayer(nn.Cell):
-    """A base layer with two dense layer"""
+    """A base layer with two dense layers"""
     def __init__(self):
         super().__init__()
         self.input_mapping = nn.Dense(10, 10)
         self.output_mapping = nn.Dense(10, 10)
+
     def construct(self, x):
         x = self.input_mapping(x)
         return self.output_mapping(x)
 
 class Net(nn.Cell):
-    """An network with many dense layers"""
+    """A network with many dense layers"""
     def __init__(self):
         super().__init__()
         self.layer1 = DenseLayer()
         self.layer2 = DenseLayer()
         self.layer3 = DenseLayer()
+
+        # Set the communication fusion id for each layer
         self.layer1.set_comm_fusion(0)
         self.layer2.set_comm_fusion(1)
         self.layer3.set_comm_fusion(2)
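+        # Parameters that share a fusion id have their optimizer-parallel
+        # communication fused into a single operator; giving each layer its own
+        # id keeps the three layers' communication separate.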
+
     def construct(self, x):
         x = self.layer1(x)
         x = self.layer2(x)
         x = self.layer3(x)
         return x
 
+# Instantiate the model
 net = Net()
+
+# Print the communication fusion id of each trainable parameter
 for item in net.trainable_params():
     print(f"The parameter {item.name}'s fusion id is {item.comm_fusion}")
diff --git a/Season1.step_into_chatgpt/5.Parallel/parallel_slides.pptx b/Season1.step_into_chatgpt/5.Parallel/parallel_slides.pptx
new file mode 100644
index 0000000..cb1da6a
Binary files /dev/null and b/Season1.step_into_chatgpt/5.Parallel/parallel_slides.pptx differ
diff --git a/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet.py b/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet.py
index 1155bcb..8b2871c 100644
--- a/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet.py
+++ b/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet.py
@@ -19,7 +19,7 @@ import mindspore.nn as nn
 import mindspore as ms
 import mindspore.ops as ops
 
-
+from mindspore.communication import init
 
 def weight_variable_0(shape):
     """weight_variable_0"""
@@ -319,7 +319,12 @@ def construct(self, x):
         x = self.squeeze(x)
         x = self.fc(x)
         return x
 
+# Set the context for Ascend NPUs
+ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True, device_num=2)
+
+# Initialize communication for distributed training
+init()
 
 def resnet50(batch_size, num_classes):
     """create resnet50"""
diff --git a/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet50_distributed_training_pipeline.py b/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet50_distributed_training_pipeline.py
index d9a1bf1..9c7f56d 100644
--- a/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet50_distributed_training_pipeline.py
+++ b/Season1.step_into_chatgpt/5.Parallel/pipeline_parallel/resnet50_distributed_training_pipeline.py
@@ -28,7 +28,8 @@ from resnet import resnet50
 
 device_target = os.getenv('DEVICE_TARGET')
-ms.set_context(mode=ms.GRAPH_MODE, device_target=device_target)
+ms.set_context(mode=ms.GRAPH_MODE, device_target='Ascend')
+
 if device_target == "Ascend":
     device_id = int(os.getenv('DEVICE_ID'))
     ms.set_context(device_id=device_id)
diff --git a/Season1.step_into_chatgpt/5.Parallel/recompute/resnet.py b/Season1.step_into_chatgpt/5.Parallel/recompute/resnet.py
index dd9af90..0a4e76f 100644
--- a/Season1.step_into_chatgpt/5.Parallel/recompute/resnet.py
+++ b/Season1.step_into_chatgpt/5.Parallel/recompute/resnet.py
@@ -1,59 +1,36 @@
-# Copyright 2020 Huawei Technologies Co., Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============================================================================
-'''resnet
-This sample code is applicable to Ascend.
-'''
-import numpy as np
-import mindspore.nn as nn
 import mindspore as ms
+import mindspore.nn as nn
+import numpy as np
 import mindspore.ops as ops
 
+# Set the device target to Ascend
+ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")
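+# Note: this call runs at import time, so merely importing this module
+# reconfigures the global MindSpore context for the process.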
+
 def weight_variable_0(shape):
     """weight_variable_0"""
     zeros = np.zeros(shape).astype(np.float32)
     return ms.Tensor(zeros)
 
-
 def weight_variable_1(shape):
     """weight_variable_1"""
     ones = np.ones(shape).astype(np.float32)
     return ms.Tensor(ones)
 
-
+# Convolution and batch-norm builders
 def conv3x3(in_channels, out_channels, stride=1, padding=0):
     """3x3 convolution"""
     return nn.Conv2d(in_channels, out_channels,
                      kernel_size=3, stride=stride, padding=padding, weight_init='XavierUniform',
                      has_bias=False, pad_mode="same")
 
-
 def conv1x1(in_channels, out_channels, stride=1, padding=0):
     """1x1 convolution"""
     return nn.Conv2d(in_channels, out_channels,
                      kernel_size=1, stride=stride, padding=padding, weight_init='XavierUniform',
                      has_bias=False, pad_mode="same")
 
-
 def conv7x7(in_channels, out_channels, stride=1, padding=0):
-    """1x1 convolution"""
+    """7x7 convolution"""
     return nn.Conv2d(in_channels, out_channels,
                      kernel_size=7, stride=stride, padding=padding, weight_init='XavierUniform',
                      has_bias=False, pad_mode="same")
 
-
 def bn_with_initialize(out_channels):
     """bn_with_initialize"""
     shape = (out_channels)
@@ -64,34 +41,12 @@ def bn_with_initialize(out_channels):
                         beta_init=beta, moving_mean_init=mean, moving_var_init=var)
     return bn
 
-
-def bn_with_initialize_last(out_channels):
-    """bn_with_initialize_last"""
-    shape = (out_channels)
-    mean = weight_variable_0(shape)
-    var = weight_variable_1(shape)
-    beta = weight_variable_0(shape)
-    bn = nn.BatchNorm2d(out_channels, momentum=0.99, eps=0.00001, gamma_init='Uniform',
-                        beta_init=beta, moving_mean_init=mean, moving_var_init=var)
-    return bn
-
-
 def fc_with_initialize(input_channels, out_channels):
     """fc_with_initialize"""
     return nn.Dense(input_channels, out_channels, weight_init='XavierUniform', bias_init='Uniform')
 
-
 class ResidualBlock(nn.Cell):
     """ResidualBlock"""
     expansion = 4
 
-    def __init__(self,
-                 in_channels,
-                 out_channels,
-                 stride=1):
-        """init block"""
+    def __init__(self, in_channels, out_channels, stride=1):
         super(ResidualBlock, self).__init__()
-
         out_chls = out_channels // self.expansion
         self.conv1 = conv1x1(in_channels, out_chls, stride=stride, padding=0)
         self.bn1 = bn_with_initialize(out_chls)
@@ -100,13 +55,12 @@ def __init__(self,
         self.bn2 = bn_with_initialize(out_chls)
 
         self.conv3 = conv1x1(out_chls, out_channels, stride=1, padding=0)
-        self.bn3 = bn_with_initialize_last(out_channels)
+        self.bn3 = bn_with_initialize(out_channels)
 
         self.relu = ops.ReLU()
         self.add = ops.Add()
 
     def construct(self, x):
-        """construct"""
         identity = x
 
         out = self.conv1(x)
@@ -125,155 +79,15 @@ def construct(self, x):
 
         return out
 
-
 class ResidualBlockWithDown(nn.Cell):
     """ResidualBlockWithDown"""
     expansion = 4
 
-    def __init__(self,
-                 in_channels,
-                 out_channels,
-                 stride=1,
-                 down_sample=False):
-        """init block with down"""
+    def __init__(self, in_channels, out_channels, stride=1, down_sample=False):
         super(ResidualBlockWithDown, self).__init__()
-
         out_chls = out_channels // self.expansion
         self.conv1 = conv1x1(in_channels, out_chls, stride=stride, padding=0)
         self.bn1 = bn_with_initialize(out_chls)
 
         self.conv2 = conv3x3(out_chls, out_chls, stride=1, padding=0)
         self.bn2 = bn_with_initialize(out_chls)
 
         self.conv3 = conv1x1(out_chls, out_channels, stride=1, padding=0)
-        self.bn3 = bn_with_initialize_last(out_channels)
+        self.bn3 = bn_with_initialize(out_channels)
 
         self.relu = ops.ReLU()
         self.down_sample = down_sample
 
         self.conv_down_sample = conv1x1(in_channels, out_channels, stride=stride, padding=0)
         self.bn_down_sample = bn_with_initialize(out_channels)
         self.add = ops.Add()
 
     def construct(self, x):
-        """construct"""
         identity = x
 
         out = self.conv1(x)
         out = self.bn1(out)
         out = self.relu(out)
 
         out = self.conv2(out)
         out = self.bn2(out)
         out = self.relu(out)
 
         out = self.conv3(out)
         out = self.bn3(out)
 
         identity = self.conv_down_sample(identity)
         identity = self.bn_down_sample(identity)
 
         out = self.add(out, identity)
         out = self.relu(out)
 
         return out
 
-
 class MakeLayer0(nn.Cell):
     """MakeLayer0"""
 
     def __init__(self, block, in_channels, out_channels, stride):
-        """init"""
         super(MakeLayer0, self).__init__()
         self.a = ResidualBlockWithDown(in_channels, out_channels, stride=1, down_sample=True)
         self.b = block(out_channels, out_channels, stride=stride)
         self.c = block(out_channels, out_channels, stride=1)
 
     def construct(self, x):
-        """construct"""
         x = self.a(x)
         x = self.b(x)
         x = self.c(x)
 
         return x
 
-
 class MakeLayer1(nn.Cell):
     """MakeLayer1"""
 
     def __init__(self, block, in_channels, out_channels, stride):
-        """init"""
         super(MakeLayer1, self).__init__()
         self.a = ResidualBlockWithDown(in_channels, out_channels, stride=stride, down_sample=True)
         self.b = block(out_channels, out_channels, stride=1)
         self.c = block(out_channels, out_channels, stride=1)
         self.d = block(out_channels, out_channels, stride=1)
 
     def construct(self, x):
-        """construct"""
         x = self.a(x)
         x = self.b(x)
         x = self.c(x)
         x = self.d(x)
 
         return x
 
-
 class MakeLayer2(nn.Cell):
     """MakeLayer2"""
 
     def __init__(self, block, in_channels, out_channels, stride):
-        """init"""
         super(MakeLayer2, self).__init__()
         self.a = ResidualBlockWithDown(in_channels, out_channels, stride=stride, down_sample=True)
         self.b = block(out_channels, out_channels, stride=1)
         self.c = block(out_channels, out_channels, stride=1)
         self.d = block(out_channels, out_channels, stride=1)
         self.e = block(out_channels, out_channels, stride=1)
         self.f = block(out_channels, out_channels, stride=1)
 
     def construct(self, x):
-        """construct"""
         x = self.a(x)
         x = self.b(x)
         x = self.c(x)
         x = self.d(x)
         x = self.e(x)
         x = self.f(x)
 
         return x
 
-
 class MakeLayer3(nn.Cell):
     """MakeLayer3"""
 
     def __init__(self, block, in_channels, out_channels, stride):
-        """init"""
         super(MakeLayer3, self).__init__()
         self.a = ResidualBlockWithDown(in_channels, out_channels, stride=stride, down_sample=True)
         self.b = block(out_channels, out_channels, stride=1)
         self.c = block(out_channels, out_channels, stride=1)
 
     def construct(self, x):
-        """construct"""
         x = self.a(x)
         x = self.b(x)
         x = self.c(x)
 
         return x
 
-
 class ResNet(nn.Cell):
     """ResNet"""
 
     def __init__(self, block, num_classes=100, batch_size=32):
-        """init"""
         super(ResNet, self).__init__()
         self.batch_size = batch_size
         self.num_classes = num_classes
 
         self.conv1 = conv7x7(3, 64, stride=2, padding=0)
         self.bn1 = bn_with_initialize(64)
         self.relu = ops.ReLU()
         self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode="same")
@@ -283,17 +97,11 @@ def __init__(self, block, num_classes=100, batch_size=32):
         self.layer3 = MakeLayer2(block, in_channels=512, out_channels=1024, stride=2)
         self.layer4 = MakeLayer3(block, in_channels=1024, out_channels=2048, stride=2)
 
         self.layer1.recompute()
         self.layer2.recompute()
         self.layer3.recompute()
         self.layer4.recompute()
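+        # recompute() discards each layer's forward activations and re-runs the
+        # forward pass during backprop, trading extra compute for lower memory use.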
 
         self.pool = ops.ReduceMean(keep_dims=True)
         self.squeeze = ops.Squeeze(axis=(2, 3))
         self.fc = fc_with_initialize(512 * block.expansion, num_classes)
 
     def construct(self, x):
-        """construct"""
         x = self.conv1(x)
         x = self.bn1(x)
         x = self.relu(x)
@@ -309,7 +117,6 @@ def construct(self, x):
         x = self.fc(x)
         return x
 
-
 def resnet50(batch_size, num_classes):
     """create resnet50"""
     return ResNet(ResidualBlock, num_classes, batch_size)