diff --git a/cuda_core/cuda/core/_graph.py b/cuda_core/cuda/core/_graph.py index b6e266a9a8..fdc929079b 100644 --- a/cuda_core/cuda/core/_graph.py +++ b/cuda_core/cuda/core/_graph.py @@ -746,6 +746,18 @@ def close(self): """Destroy the graph.""" self._mnff.close() + @property + def handle(self) -> driver.CUgraphExec: + """Return the underlying ``CUgraphExec`` object. + + .. caution:: + + This handle is a Python object. To get the memory address of the underlying C + handle, call ``int()`` on the returned object. + + """ + return self._mnff.graph + def update(self, builder: GraphBuilder): """Update the graph using new build configuration from the builder. diff --git a/cuda_core/tests/graph/test_advanced.py b/cuda_core/tests/graph/test_advanced.py new file mode 100644 index 0000000000..68b6dbd0d2 --- /dev/null +++ b/cuda_core/tests/graph/test_advanced.py @@ -0,0 +1,186 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Advanced graph feature tests (child graphs, update, stream lifetime).""" + +import numpy as np +import pytest +from cuda.core import Device, LaunchConfig, LegacyPinnedMemoryResource, launch +from helpers.graph_kernels import compile_common_kernels, compile_conditional_kernels + + +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_child_graph(init_cuda): + mod = compile_common_kernels() + add_one = mod.get_kernel("add_one") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(8) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + arr[1] = 0 + + # Capture the child graph + gb_child = Device().create_graph_builder().begin_building() + launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_child.end_building() + + # Capture the parent graph + gb_parent = Device().create_graph_builder().begin_building() + launch(gb_parent, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + + ## Add child + try: + gb_parent.add_child(gb_child) + except NotImplementedError as e: + with pytest.raises( + NotImplementedError, + match="^Launching child graphs is not implemented for versions older than CUDA 12", + ): + raise e + gb_parent.end_building() + b.close() + pytest.skip("Launching child graphs is not implemented for versions older than CUDA 12") + + launch(gb_parent, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + graph = gb_parent.end_building().complete() + + # Parent updates first value, child updates second value + assert arr[0] == 0 + assert arr[1] == 0 + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 2 + assert arr[1] == 3 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_update(init_cuda): + mod = compile_conditional_kernels(int) + add_one = mod.get_kernel("add_one") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(12) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + arr[1] = 0 + arr[2] = 0 + + def 
build_graph(condition_value): + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Add Node A (sets condition) + handle = gb.create_conditional_handle(default_value=condition_value) + + # Add Node B (while condition) + try: + gb_case = list(gb.switch(handle, 3)) + except Exception as e: + with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): + raise e + gb.end_building() + raise e + + ## Case 0 + gb_case[0].begin_building() + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_case[0].end_building() + + ## Case 1 + gb_case[1].begin_building() + launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_case[1].end_building() + + ## Case 2 + gb_case[2].begin_building() + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + gb_case[2].end_building() + + return gb.end_building() + + try: + graph_variants = [build_graph(0), build_graph(1), build_graph(2)] + except Exception as e: + with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): + raise e + b.close() + pytest.skip("Driver does not support conditional switch") + + # Launch the first graph + assert arr[0] == 0 + assert arr[1] == 0 + assert arr[2] == 0 + graph = graph_variants[0].complete() + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 3 + assert arr[1] == 0 + assert arr[2] == 0 + + # Update with second variant and launch again + graph.update(graph_variants[1]) + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 3 + assert arr[1] == 3 + assert arr[2] == 0 + + # Update with third variant and launch again + graph.update(graph_variants[2]) + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 3 + assert arr[1] == 3 + assert arr[2] == 3 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +def test_graph_stream_lifetime(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + + # Create simple graph from device + gb = Device().create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + graph = gb.end_building().complete() + + # Destroy simple graph and builder + gb.close() + graph.close() + + # Create simple graph from stream + stream = Device().create_stream() + gb = stream.create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + graph = gb.end_building().complete() + + # Destroy simple graph and builder + gb.close() + graph.close() + + # Verify the stream can still launch work + launch(stream, LaunchConfig(grid=1, block=1), empty_kernel) + stream.sync() + + # Destroy the stream + stream.close() diff --git a/cuda_core/tests/graph/test_basic.py b/cuda_core/tests/graph/test_basic.py new file mode 100644 index 0000000000..a1447c6b58 --- /dev/null +++ b/cuda_core/tests/graph/test_basic.py @@ -0,0 +1,164 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA 
CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Basic graph construction and topology tests.""" + +import numpy as np +import pytest +from cuda.core import Device, GraphBuilder, LaunchConfig, LegacyPinnedMemoryResource, launch +from helpers.graph_kernels import compile_common_kernels + + +def test_graph_is_building(init_cuda): + gb = Device().create_graph_builder() + assert gb.is_building is False + gb.begin_building() + assert gb.is_building is True + gb.end_building() + assert gb.is_building is False + + +def test_graph_straight(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + launch_stream = Device().create_stream() + + # Simple linear topology + gb = Device().create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + graph = gb.end_building().complete() + + # Sanity upload and launch + graph.upload(launch_stream) + graph.launch(launch_stream) + launch_stream.sync() + + +def test_graph_fork_join(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + launch_stream = Device().create_stream() + + # Simple diamond topology + gb = Device().create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + + with pytest.raises(ValueError, match="^Invalid split count: expecting >= 2, got 1"): + gb.split(1) + + left, right = gb.split(2) + launch(left, LaunchConfig(grid=1, block=1), empty_kernel) + launch(left, LaunchConfig(grid=1, block=1), empty_kernel) + launch(right, LaunchConfig(grid=1, block=1), empty_kernel) + launch(right, LaunchConfig(grid=1, block=1), empty_kernel) + + with pytest.raises(ValueError, match="^Must join with at least two graph builders"): + GraphBuilder.join(left) + + gb = GraphBuilder.join(left, right) + + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + graph = gb.end_building().complete() + + # Sanity upload and launch + graph.upload(launch_stream) + graph.launch(launch_stream) + launch_stream.sync() + + +def test_graph_is_join_required(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + + # Starting builder is always primary + gb = Device().create_graph_builder() + assert gb.is_join_required is False + gb.begin_building() + + # Create root node + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + + # First returned builder is always the original + first_split_builders = gb.split(3) + assert first_split_builders[0] is gb + + # Only the original builder need not join + assert first_split_builders[0].is_join_required is False + for builder in first_split_builders[1:]: + assert builder.is_join_required is True + + # Launch kernel on each split + for builder in first_split_builders: + launch(builder, LaunchConfig(grid=1, block=1), empty_kernel) + + # Splitting on new builder will all require joining + second_split_builders = first_split_builders[-1] + first_split_builders = first_split_builders[0:-1] + second_split_builders = second_split_builders.split(3) + for builder in second_split_builders: + assert builder.is_join_required is True + + # Launch kernel on each second split + for builder in second_split_builders: + launch(builder, LaunchConfig(grid=1, block=1), empty_kernel) + + # Joined builder requires joining if all builder need to join + gb = 
GraphBuilder.join(*second_split_builders) + assert gb.is_join_required is True + gb = GraphBuilder.join(gb, *first_split_builders) + assert gb.is_join_required is False + + # Create final node + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building().complete() + + +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_repeat_capture(init_cuda): + mod = compile_common_kernels() + add_one = mod.get_kernel("add_one") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(4) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + + # Launch the graph once + gb = launch_stream.create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + graph = gb.end_building().complete() + + # Run the graph once + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 1 + + # Continue capturing to extend the graph + with pytest.raises(RuntimeError, match="^Cannot resume building after building has ended."): + gb.begin_building() + + # Graph can be re-launched + graph.launch(launch_stream) + graph.launch(launch_stream) + graph.launch(launch_stream) + launch_stream.sync() + assert arr[0] == 4 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +def test_graph_capture_errors(init_cuda): + gb = Device().create_graph_builder() + with pytest.raises(RuntimeError, match="^Graph has not finished building."): + gb.complete() + + gb.begin_building() + with pytest.raises(RuntimeError, match="^Graph has not finished building."): + gb.complete() + gb.end_building().complete() diff --git a/cuda_core/tests/test_graph_mem.py b/cuda_core/tests/graph/test_capture_alloc.py similarity index 99% rename from cuda_core/tests/test_graph_mem.py rename to cuda_core/tests/graph/test_capture_alloc.py index 5159fd2b2b..06bb6445e1 100644 --- a/cuda_core/tests/test_graph_mem.py +++ b/cuda_core/tests/graph/test_capture_alloc.py @@ -2,6 +2,8 @@ # # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE +"""Graph memory resource tests.""" + import pytest from cuda.core import ( Device, diff --git a/cuda_core/tests/graph/test_conditional.py b/cuda_core/tests/graph/test_conditional.py new file mode 100644 index 0000000000..25975b5e60 --- /dev/null +++ b/cuda_core/tests/graph/test_conditional.py @@ -0,0 +1,289 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Conditional graph node tests (if, if-else, switch, while).""" + +import ctypes + +import numpy as np +import pytest +from cuda.core import Device, GraphBuilder, LaunchConfig, LegacyPinnedMemoryResource, launch +from helpers.graph_kernels import compile_conditional_kernels + + +@pytest.mark.parametrize( + "condition_value", [True, False, ctypes.c_bool(True), ctypes.c_bool(False), np.bool_(True), np.bool_(False), 1, 0] +) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_conditional_if(init_cuda, condition_value): + mod = compile_conditional_kernels(type(condition_value)) + add_one = mod.get_kernel("add_one") + set_handle = mod.get_kernel("set_handle") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(8) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + arr[1] = 0 + + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Add Node A (sets condition) + try: + handle = gb.create_conditional_handle() + except RuntimeError as e: + with pytest.raises(RuntimeError, match="^Driver version"): + raise e + gb.end_building() + b.close() + pytest.skip("Driver does not support conditional handle") + launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) + + # Add Node B (if condition) + gb_if = gb.if_cond(handle).begin_building() + launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_if_0, gb_if_1 = gb_if.split(2) + launch(gb_if_0, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_if_1, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_if = GraphBuilder.join(gb_if_0, gb_if_1) + launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_if.end_building() + + # Add Node C (...) 
+ # Note: We use the original graph to continue building past the cond node + launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + + graph = gb.end_building().complete() + + # Left path increments first value, right path increments second value + assert arr[0] == 0 + assert arr[1] == 0 + graph.launch(launch_stream) + launch_stream.sync() + if condition_value: + assert arr[0] == 4 + assert arr[1] == 1 + else: + assert arr[0] == 1 + assert arr[1] == 0 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +@pytest.mark.parametrize( + "condition_value", [True, False, ctypes.c_bool(True), ctypes.c_bool(False), np.bool_(True), np.bool_(False), 1, 0] +) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_conditional_if_else(init_cuda, condition_value): + mod = compile_conditional_kernels(type(condition_value)) + add_one = mod.get_kernel("add_one") + set_handle = mod.get_kernel("set_handle") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(8) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + arr[1] = 0 + + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Add Node A (sets condition) + handle = gb.create_conditional_handle() + launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) + + # Add Node B (if condition) + try: + gb_if, gb_else = gb.if_else(handle) + except RuntimeError as e: + with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): + raise e + gb.end_building() + b.close() + pytest.skip("Driver does not support conditional if-else") + + ## IF nodes + gb_if = gb_if.begin_building() + launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_if_0, gb_if_1 = gb_if.split(2) + launch(gb_if_0, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_if_1, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_if = GraphBuilder.join(gb_if_0, gb_if_1) + launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_if.end_building() + + ## ELSE nodes + gb_else = gb_else.begin_building() + launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_else.end_building() + + # Add Node C (...) 
+ # Note: We use the original graph to continue building past the cond node + launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + + graph = gb.end_building().complete() + + # True condition increments both values, while False increments only second value + assert arr[0] == 0 + assert arr[1] == 0 + graph.launch(launch_stream) + launch_stream.sync() + if condition_value: + assert arr[0] == 4 + assert arr[1] == 1 + else: + assert arr[0] == 1 + assert arr[1] == 3 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +@pytest.mark.parametrize("condition_value", [0, 1, 2, 3]) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_conditional_switch(init_cuda, condition_value): + mod = compile_conditional_kernels(type(condition_value)) + add_one = mod.get_kernel("add_one") + set_handle = mod.get_kernel("set_handle") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(12) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + arr[1] = 0 + arr[2] = 0 + + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Add Node A (sets condition) + handle = gb.create_conditional_handle() + launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) + + # Add Node B (while condition) + try: + gb_case = list(gb.switch(handle, 3)) + except RuntimeError as e: + with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): + raise e + gb.end_building() + b.close() + pytest.skip("Driver does not support conditional switch") + + ## Case 0 + gb_case[0].begin_building() + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + gb_case[0].end_building() + + ## Case 1 + gb_case[1].begin_building() + launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + gb_case_1_left, gb_case_1_right = gb_case[1].split(2) + launch(gb_case_1_left, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) + launch(gb_case_1_right, LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + gb_case[1] = GraphBuilder.join(gb_case_1_left, gb_case_1_right) + gb_case[1].end_building() + + ## Case 2 + gb_case[2].begin_building() + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) + gb_case[2].end_building() + + # Add Node C (...) 
+ # Note: We use the original graph to continue building past the cond node + launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + + graph = gb.end_building().complete() + + # Each case focuses on their own index + assert arr[0] == 0 + assert arr[1] == 0 + assert arr[2] == 0 + graph.launch(launch_stream) + launch_stream.sync() + if condition_value == 0: + assert arr[0] == 4 + assert arr[1] == 0 + assert arr[2] == 0 + elif condition_value == 1: + assert arr[0] == 1 + assert arr[1] == 2 + assert arr[2] == 1 + elif condition_value == 2: + assert arr[0] == 1 + assert arr[1] == 0 + assert arr[2] == 3 + elif condition_value == 3: + # No branch is taken if case index is out of range + assert arr[0] == 1 + assert arr[1] == 0 + assert arr[2] == 0 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() + + +@pytest.mark.parametrize("condition_value", [True, False, 1, 0]) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_graph_conditional_while(init_cuda, condition_value): + mod = compile_conditional_kernels(type(condition_value)) + add_one = mod.get_kernel("add_one") + loop_kernel = mod.get_kernel("loop_kernel") + empty_kernel = mod.get_kernel("empty_kernel") + + # Allocate memory + launch_stream = Device().create_stream() + mr = LegacyPinnedMemoryResource() + b = mr.allocate(4) + arr = np.from_dlpack(b).view(np.int32) + arr[0] = 0 + + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Node A is skipped because we can instead use a non-zero default value + handle = gb.create_conditional_handle(default_value=condition_value) + + # Add Node B (while condition) + gb_while = gb.while_loop(handle) + gb_while.begin_building() + launch(gb_while, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_while, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) + launch(gb_while, LaunchConfig(grid=1, block=1), loop_kernel, handle) + gb_while.end_building() + + # Add Node C (...) + # Note: We use the original gb to continue building past the cond node + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + + graph = gb.end_building().complete() + + # Default value is used to start the loop + assert arr[0] == 0 + graph.launch(launch_stream) + launch_stream.sync() + if condition_value: + assert arr[0] == 20 + else: + assert arr[0] == 0 + + # Close the memory resource now because the garbage collected might + # de-allocate it during the next graph builder process + b.close() diff --git a/cuda_core/tests/graph/test_device_launch.py b/cuda_core/tests/graph/test_device_launch.py new file mode 100644 index 0000000000..a1a7059c99 --- /dev/null +++ b/cuda_core/tests/graph/test_device_launch.py @@ -0,0 +1,206 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Device-side graph launch tests. + +Device-side graph launch allows a kernel running on the GPU to launch a CUDA graph. 
+This feature requires: +- CUDA 12.0+ +- Hopper architecture (sm_90+) +- The kernel calling cudaGraphLaunch() must itself be launched from within a graph +""" + +import os +import sys + +import numpy as np +import pytest +from cuda.core import ( + Device, + GraphCompleteOptions, + LaunchConfig, + LegacyPinnedMemoryResource, + Linker, + LinkerOptions, + ObjectCode, + Program, + ProgramOptions, + launch, +) + + +def _find_cudadevrt_library(): + """Find the CUDA device runtime static library using CUDA_HOME. + + See https://github.com/NVIDIA/cuda-python/issues/716 for future improvements + to make this discovery more robust via cuda.pathfinder. + + Returns: + Path to libcudadevrt.a (Linux) or cudadevrt.lib (Windows), or None if not found. + """ + cuda_home = os.environ.get("CUDA_HOME") or os.environ.get("CUDA_PATH") + if not cuda_home: + return None + + if sys.platform == "win32": + path = os.path.join(cuda_home, "lib", "x64", "cudadevrt.lib") + else: + # Try lib64 first (common on Linux), fall back to lib + path = os.path.join(cuda_home, "lib64", "libcudadevrt.a") + if not os.path.isfile(path): + path = os.path.join(cuda_home, "lib", "libcudadevrt.a") + + return path if os.path.isfile(path) else None + + +def _get_device_arch(): + """Get the current device's architecture string.""" + return "".join(f"{i}" for i in Device().compute_capability) + + +def _compile_work_kernel(): + """Compile a simple kernel that increments a value.""" + code = """ + extern "C" __global__ void increment(int* value) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + atomicAdd(value, 1); + } + } + """ + arch = _get_device_arch() + opts = ProgramOptions(std="c++17", arch=f"sm_{arch}") + return Program(code, "c++", options=opts).compile("cubin").get_kernel("increment") + + +def _compile_device_launcher_kernel(): + """Compile a kernel that launches a graph from device code. + + This kernel uses cudaGraphLaunch() to launch a graph from device code. + It requires linking with libcudadevrt.a (the CUDA device runtime library). + + Raises pytest.skip if libcudadevrt.a cannot be found. + """ + cudadevrt_path = _find_cudadevrt_library() + if cudadevrt_path is None: + pytest.skip("cudadevrt library not found (set CUDA_HOME or CUDA_PATH)") + + code = """ + extern "C" __global__ void launch_graph_from_device(cudaGraphExec_t graph) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + cudaGraphLaunch(graph, cudaStreamGraphTailLaunch); + } + } + """ + arch = _get_device_arch() + opts = ProgramOptions(std="c++17", arch=f"sm_{arch}", relocatable_device_code=True) + ptx = Program(code, "c++", options=opts).compile("ptx") + + # Link with device runtime library + cudadevrt = ObjectCode.from_library(cudadevrt_path) + + linker = Linker(ptx, cudadevrt, options=LinkerOptions(arch=f"sm_{arch}")) + return linker.link("cubin").get_kernel("launch_graph_from_device") + + +@pytest.mark.skipif( + Device().compute_capability.major < 9, + reason="Device-side graph launch requires Hopper (sm_90+) architecture", +) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_device_launch_basic(init_cuda): + """Test basic device-side graph launch functionality. + + This test verifies that a graph can be launched from device code by: + 1. Creating an inner graph (with device_launch=True) that increments a value + 2. Creating an outer graph that contains a kernel calling cudaGraphLaunch() + 3. 
Launching the outer graph and verifying the inner graph executed + """ + dev = Device() + dev.set_current() + stream = dev.create_stream() + + # Compile kernels + work_kernel = _compile_work_kernel() + launcher_kernel = _compile_device_launcher_kernel() + + # Allocate and initialize memory + mr = LegacyPinnedMemoryResource() + buf = mr.allocate(4, stream=stream) + arr = np.from_dlpack(buf).view(np.int32) + arr[0] = 0 + stream.sync() + + # Create the inner graph (the graph to be launched from device) + gb_inner = dev.create_graph_builder().begin_building() + launch(gb_inner, LaunchConfig(grid=1, block=1), work_kernel, arr.ctypes.data) + inner_graph = gb_inner.end_building().complete( + options=GraphCompleteOptions(device_launch=True, upload_stream=stream) + ) + stream.sync() + + # Create the outer graph (launches inner graph from device) + inner_graph_handle = int(inner_graph.handle) + gb_outer = dev.create_graph_builder().begin_building() + launch(gb_outer, LaunchConfig(grid=1, block=1), launcher_kernel, inner_graph_handle) + outer_graph = gb_outer.end_building().complete() + + # Launch outer graph (which triggers device-side launch of inner graph) + outer_graph.launch(stream) + stream.sync() + + # Verify result + assert arr[0] == 1, f"Expected 1, got {arr[0]}" + + buf.close() + + +@pytest.mark.skipif( + Device().compute_capability.major < 9, + reason="Device-side graph launch requires Hopper (sm_90+) architecture", +) +@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") +def test_device_launch_multiple(init_cuda): + """Test that device-side graph launch can be executed multiple times. + + This test verifies that both the outer and inner graphs can be reused + for multiple launches. + """ + dev = Device() + dev.set_current() + stream = dev.create_stream() + + # Compile kernels + work_kernel = _compile_work_kernel() + launcher_kernel = _compile_device_launcher_kernel() + + # Allocate and initialize memory + mr = LegacyPinnedMemoryResource() + buf = mr.allocate(4, stream=stream) + arr = np.from_dlpack(buf).view(np.int32) + arr[0] = 0 + stream.sync() + + # Create the inner graph + gb_inner = dev.create_graph_builder().begin_building() + launch(gb_inner, LaunchConfig(grid=1, block=1), work_kernel, arr.ctypes.data) + inner_graph = gb_inner.end_building().complete( + options=GraphCompleteOptions(device_launch=True, upload_stream=stream) + ) + stream.sync() + + # Create the outer graph + inner_graph_handle = int(inner_graph.handle) + gb_outer = dev.create_graph_builder().begin_building() + launch(gb_outer, LaunchConfig(grid=1, block=1), launcher_kernel, inner_graph_handle) + outer_graph = gb_outer.end_building().complete() + + # Launch multiple times + num_launches = 5 + for _ in range(num_launches): + outer_graph.launch(stream) + stream.sync() + + # Verify result + assert arr[0] == num_launches, f"Expected {num_launches}, got {arr[0]}" + + buf.close() diff --git a/cuda_core/tests/graph/test_options.py b/cuda_core/tests/graph/test_options.py new file mode 100644 index 0000000000..3e3f9de1f0 --- /dev/null +++ b/cuda_core/tests/graph/test_options.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Graph options and build mode tests.""" + +import pytest +from cuda.core import Device, GraphBuilder, GraphCompleteOptions, GraphDebugPrintOptions, LaunchConfig, launch +from helpers.graph_kernels import compile_common_kernels, compile_conditional_kernels + + +def test_graph_dot_print_options(init_cuda, tmp_path): + mod = compile_conditional_kernels(bool) + set_handle = mod.get_kernel("set_handle") + empty_kernel = mod.get_kernel("empty_kernel") + + # Begin capture + gb = Device().create_graph_builder().begin_building() + + # Add Node A (sets condition) + handle = gb.create_conditional_handle() + launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, False) + + # Add Node B (if condition) + gb_if = gb.if_cond(handle).begin_building() + launch(gb_if, LaunchConfig(grid=1, block=1), empty_kernel) + gb_if_0, gb_if_1 = gb_if.split(2) + launch(gb_if_0, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb_if_1, LaunchConfig(grid=1, block=1), empty_kernel) + gb_if = GraphBuilder.join(gb_if_0, gb_if_1) + launch(gb_if, LaunchConfig(grid=1, block=1), empty_kernel) + gb_if.end_building() + + # Add Node C (...) + # Note: We use the original graph to continue building past the cond node + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building() + + # Print using all options + path = bytes(str(tmp_path / "vlad.dot"), "utf-8") + options = GraphDebugPrintOptions(**{field: True for field in GraphDebugPrintOptions.__dataclass_fields__}) + gb.debug_dot_print(path, options) + + +def test_graph_complete_options(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + launch_stream = Device().create_stream() + + # Simple linear topology + gb = Device().create_graph_builder().begin_building() + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building() + + options = GraphCompleteOptions(auto_free_on_launch=True) + gb.complete(options).close() + options = GraphCompleteOptions(upload_stream=launch_stream) + gb.complete(options).close() + options = GraphCompleteOptions(device_launch=True) + gb.complete(options).close() + options = GraphCompleteOptions(use_node_priority=True) + gb.complete(options).close() + + +def test_graph_build_mode(init_cuda): + mod = compile_common_kernels() + empty_kernel = mod.get_kernel("empty_kernel") + + # Simple linear topology + gb = Device().create_graph_builder().begin_building(mode="global") + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building() + + gb = Device().create_graph_builder().begin_building(mode="thread_local") + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building() + + gb = Device().create_graph_builder().begin_building(mode="relaxed") + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) + gb.end_building() + + with pytest.raises(ValueError, match="^Unsupported build mode:"): + gb = Device().create_graph_builder().begin_building(mode=None) diff --git a/cuda_core/tests/helpers/graph_kernels.py 
b/cuda_core/tests/helpers/graph_kernels.py new file mode 100644 index 0000000000..c38f0bafde --- /dev/null +++ b/cuda_core/tests/helpers/graph_kernels.py @@ -0,0 +1,81 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + +"""Shared kernel compilation helpers for graph tests.""" + +import ctypes + +import numpy as np +import pytest + +try: + from cuda.bindings import nvrtc +except ImportError: + from cuda import nvrtc + +from cuda.core import Device, Program, ProgramOptions +from cuda.core._utils.cuda_utils import NVRTCError, handle_return + + +def compile_common_kernels(): + """Compile basic kernels for graph tests. + + Returns a module with: + - empty_kernel: does nothing + - add_one: increments an int pointer by 1 + """ + code = """ + __global__ void empty_kernel() {} + __global__ void add_one(int *a) { *a += 1; } + """ + arch = "".join(f"{i}" for i in Device().compute_capability) + program_options = ProgramOptions(std="c++17", arch=f"sm_{arch}") + prog = Program(code, code_type="c++", options=program_options) + mod = prog.compile("cubin", name_expressions=("empty_kernel", "add_one")) + return mod + + +def compile_conditional_kernels(cond_type): + """Compile kernels for conditional graph tests. + + Args: + cond_type: The type of the condition value (bool, np.bool_, ctypes.c_bool, or int) + + Returns a module with: + - empty_kernel: does nothing + - add_one: increments an int pointer by 1 + - set_handle: sets a conditional handle value + - loop_kernel: decrements a counter and updates a conditional handle + """ + if cond_type in (bool, np.bool_, ctypes.c_bool): + cond_type_str = "bool" + elif cond_type is int: + cond_type_str = "unsigned int" + else: + raise ValueError("Unsupported cond_type") + + code = """ + extern "C" __device__ __cudart_builtin__ void CUDARTAPI cudaGraphSetConditional(cudaGraphConditionalHandle handle, + unsigned int value); + __global__ void empty_kernel() {} + __global__ void add_one(int *a) { *a += 1; } + __global__ void set_handle(cudaGraphConditionalHandle handle, $cond_type_str value) { + cudaGraphSetConditional(handle, value); + } + __global__ void loop_kernel(cudaGraphConditionalHandle handle) + { + static int count = 10; + cudaGraphSetConditional(handle, --count ? 1 : 0); + } + """.replace("$cond_type_str", cond_type_str) + arch = "".join(f"{i}" for i in Device().compute_capability) + program_options = ProgramOptions(std="c++17", arch=f"sm_{arch}") + prog = Program(code, code_type="c++", options=program_options) + try: + mod = prog.compile("cubin", name_expressions=("empty_kernel", "add_one", "set_handle", "loop_kernel")) + except NVRTCError as e: + with pytest.raises(NVRTCError, match='error: identifier "cudaGraphConditionalHandle" is undefined'): + raise e + nvrtcVersion = handle_return(nvrtc.nvrtcVersion()) + pytest.skip(f"NVRTC version {nvrtcVersion} does not support conditionals") + return mod diff --git a/cuda_core/tests/test_graph.py b/cuda_core/tests/test_graph.py deleted file mode 100644 index aaad9304f4..0000000000 --- a/cuda_core/tests/test_graph.py +++ /dev/null @@ -1,764 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# -# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE - -import ctypes - -import numpy as np -import pytest - -try: - from cuda.bindings import nvrtc -except ImportError: - from cuda import nvrtc -from cuda.core import ( - Device, - GraphBuilder, - GraphCompleteOptions, - GraphDebugPrintOptions, - LaunchConfig, - LegacyPinnedMemoryResource, - Program, - ProgramOptions, - launch, -) -from cuda.core._utils.cuda_utils import NVRTCError, handle_return - - -def _common_kernels(): - code = """ - __global__ void empty_kernel() {} - __global__ void add_one(int *a) { *a += 1; } - """ - arch = "".join(f"{i}" for i in Device().compute_capability) - program_options = ProgramOptions(std="c++17", arch=f"sm_{arch}") - prog = Program(code, code_type="c++", options=program_options) - mod = prog.compile("cubin", name_expressions=("empty_kernel", "add_one")) - return mod - - -def _common_kernels_conditional(cond_type): - if cond_type in (bool, np.bool_, ctypes.c_bool): - cond_type_str = "bool" - elif cond_type is int: - cond_type_str = "unsigned int" - else: - raise ValueError("Unsupported cond_type") - - code = """ - extern "C" __device__ __cudart_builtin__ void CUDARTAPI cudaGraphSetConditional(cudaGraphConditionalHandle handle, - unsigned int value); - __global__ void empty_kernel() {} - __global__ void add_one(int *a) { *a += 1; } - __global__ void set_handle(cudaGraphConditionalHandle handle, $cond_type_str value) { - cudaGraphSetConditional(handle, value); - } - __global__ void loop_kernel(cudaGraphConditionalHandle handle) - { - static int count = 10; - cudaGraphSetConditional(handle, --count ? 1 : 0); - } - """.replace("$cond_type_str", cond_type_str) - arch = "".join(f"{i}" for i in Device().compute_capability) - program_options = ProgramOptions(std="c++17", arch=f"sm_{arch}") - prog = Program(code, code_type="c++", options=program_options) - try: - mod = prog.compile("cubin", name_expressions=("empty_kernel", "add_one", "set_handle", "loop_kernel")) - except NVRTCError as e: - with pytest.raises(NVRTCError, match='error: identifier "cudaGraphConditionalHandle" is undefined'): - raise e - nvrtcVersion = handle_return(nvrtc.nvrtcVersion()) - pytest.skip(f"NVRTC version {nvrtcVersion} does not support conditionals") - return mod - - -def test_graph_is_building(init_cuda): - gb = Device().create_graph_builder() - assert gb.is_building is False - gb.begin_building() - assert gb.is_building is True - gb.end_building() - assert gb.is_building is False - - -def test_graph_straight(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - launch_stream = Device().create_stream() - - # Simple linear topology - gb = Device().create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - graph = gb.end_building().complete() - - # Sanity upload and launch - graph.upload(launch_stream) - graph.launch(launch_stream) - launch_stream.sync() - - -def test_graph_fork_join(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - launch_stream = Device().create_stream() - - # Simple diamond topology - gb = Device().create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - - with pytest.raises(ValueError, match="^Invalid split count: expecting >= 2, got 1"): - gb.split(1) - - left, right = gb.split(2) - launch(left, LaunchConfig(grid=1, block=1), 
empty_kernel) - launch(left, LaunchConfig(grid=1, block=1), empty_kernel) - launch(right, LaunchConfig(grid=1, block=1), empty_kernel) - launch(right, LaunchConfig(grid=1, block=1), empty_kernel) - - with pytest.raises(ValueError, match="^Must join with at least two graph builders"): - GraphBuilder.join(left) - - gb = GraphBuilder.join(left, right) - - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - graph = gb.end_building().complete() - - # Sanity upload and launch - graph.upload(launch_stream) - graph.launch(launch_stream) - launch_stream.sync() - - -def test_graph_is_join_required(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - - # Starting builder is always primary - gb = Device().create_graph_builder() - assert gb.is_join_required is False - gb.begin_building() - - # Create root node - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - - # First returned builder is always the original - first_split_builders = gb.split(3) - assert first_split_builders[0] is gb - - # Only the original builder need not join - assert first_split_builders[0].is_join_required is False - for builder in first_split_builders[1:]: - assert builder.is_join_required is True - - # Launch kernel on each split - for builder in first_split_builders: - launch(builder, LaunchConfig(grid=1, block=1), empty_kernel) - - # Splitting on new builder will all require joining - second_split_builders = first_split_builders[-1] - first_split_builders = first_split_builders[0:-1] - second_split_builders = second_split_builders.split(3) - for builder in second_split_builders: - assert builder.is_join_required is True - - # Launch kernel on each second split - for builder in second_split_builders: - launch(builder, LaunchConfig(grid=1, block=1), empty_kernel) - - # Joined builder requires joining if all builder need to join - gb = GraphBuilder.join(*second_split_builders) - assert gb.is_join_required is True - gb = GraphBuilder.join(gb, *first_split_builders) - assert gb.is_join_required is False - - # Create final node - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building().complete() - - -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_repeat_capture(init_cuda): - mod = _common_kernels() - add_one = mod.get_kernel("add_one") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(4) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - - # Launch the graph once - gb = launch_stream.create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - graph = gb.end_building().complete() - - # Run the graph once - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 1 - - # Continue capturing to extend the graph - with pytest.raises(RuntimeError, match="^Cannot resume building after building has ended."): - gb.begin_building() - - # Graph can be re-launched - graph.launch(launch_stream) - graph.launch(launch_stream) - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 4 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -def test_graph_capture_errors(init_cuda): - gb = Device().create_graph_builder() - with pytest.raises(RuntimeError, match="^Graph has not finished building."): - gb.complete() - - gb.begin_building() - with 
pytest.raises(RuntimeError, match="^Graph has not finished building."): - gb.complete() - gb.end_building().complete() - - -@pytest.mark.parametrize( - "condition_value", [True, False, ctypes.c_bool(True), ctypes.c_bool(False), np.bool_(True), np.bool_(False), 1, 0] -) -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_conditional_if(init_cuda, condition_value): - mod = _common_kernels_conditional(type(condition_value)) - add_one = mod.get_kernel("add_one") - set_handle = mod.get_kernel("set_handle") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(8) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - arr[1] = 0 - - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Add Node A (sets condition) - try: - handle = gb.create_conditional_handle() - except RuntimeError as e: - with pytest.raises(RuntimeError, match="^Driver version"): - raise e - gb.end_building() - b.close() - pytest.skip("Driver does not support conditional handle") - launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) - - # Add Node B (if condition) - gb_if = gb.if_cond(handle).begin_building() - launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_if_0, gb_if_1 = gb_if.split(2) - launch(gb_if_0, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_if_1, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_if = GraphBuilder.join(gb_if_0, gb_if_1) - launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_if.end_building() - - # Add Node C (...) - # Note: We use the original graph to continue building past the cond node - launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - - graph = gb.end_building().complete() - - # Left path increments first value, right path increments second value - assert arr[0] == 0 - assert arr[1] == 0 - graph.launch(launch_stream) - launch_stream.sync() - if condition_value: - assert arr[0] == 4 - assert arr[1] == 1 - else: - assert arr[0] == 1 - assert arr[1] == 0 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -@pytest.mark.parametrize( - "condition_value", [True, False, ctypes.c_bool(True), ctypes.c_bool(False), np.bool_(True), np.bool_(False), 1, 0] -) -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_conditional_if_else(init_cuda, condition_value): - mod = _common_kernels_conditional(type(condition_value)) - add_one = mod.get_kernel("add_one") - set_handle = mod.get_kernel("set_handle") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(8) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - arr[1] = 0 - - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Add Node A (sets condition) - handle = gb.create_conditional_handle() - launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) - - # Add Node B (if condition) - try: - gb_if, gb_else = gb.if_else(handle) - except RuntimeError as e: - with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): - raise e - gb.end_building() - b.close() - pytest.skip("Driver does not support conditional if-else") - - ## IF nodes - gb_if = gb_if.begin_building() - 
launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_if_0, gb_if_1 = gb_if.split(2) - launch(gb_if_0, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_if_1, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_if = GraphBuilder.join(gb_if_0, gb_if_1) - launch(gb_if, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_if.end_building() - - ## ELSE nodes - gb_else = gb_else.begin_building() - launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_else, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_else.end_building() - - # Add Node C (...) - # Note: We use the original graph to continue building past the cond node - launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - - graph = gb.end_building().complete() - - # True condition increments both values, while False increments only second value - assert arr[0] == 0 - assert arr[1] == 0 - graph.launch(launch_stream) - launch_stream.sync() - if condition_value: - assert arr[0] == 4 - assert arr[1] == 1 - else: - assert arr[0] == 1 - assert arr[1] == 3 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -@pytest.mark.parametrize("condition_value", [0, 1, 2, 3]) -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_conditional_switch(init_cuda, condition_value): - mod = _common_kernels_conditional(type(condition_value)) - add_one = mod.get_kernel("add_one") - set_handle = mod.get_kernel("set_handle") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(12) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - arr[1] = 0 - arr[2] = 0 - - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Add Node A (sets condition) - handle = gb.create_conditional_handle() - launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, condition_value) - - # Add Node B (while condition) - try: - gb_case = list(gb.switch(handle, 3)) - except RuntimeError as e: - with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): - raise e - gb.end_building() - b.close() - pytest.skip("Driver does not support conditional switch") - - ## Case 0 - gb_case[0].begin_building() - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_case[0].end_building() - - ## Case 1 - gb_case[1].begin_building() - launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_case_1_left, gb_case_1_right = gb_case[1].split(2) - launch(gb_case_1_left, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_case_1_right, LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - gb_case[1] = GraphBuilder.join(gb_case_1_left, gb_case_1_right) - gb_case[1].end_building() - - ## Case 2 - gb_case[2].begin_building() - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - gb_case[2].end_building() - - # Add Node C 
(...) - # Note: We use the original graph to continue building past the cond node - launch(gb, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - - graph = gb.end_building().complete() - - # Each case focuses on their own index - assert arr[0] == 0 - assert arr[1] == 0 - assert arr[2] == 0 - graph.launch(launch_stream) - launch_stream.sync() - if condition_value == 0: - assert arr[0] == 4 - assert arr[1] == 0 - assert arr[2] == 0 - elif condition_value == 1: - assert arr[0] == 1 - assert arr[1] == 2 - assert arr[2] == 1 - elif condition_value == 2: - assert arr[0] == 1 - assert arr[1] == 0 - assert arr[2] == 3 - elif condition_value == 3: - # No branch is taken if case index is out of range - assert arr[0] == 1 - assert arr[1] == 0 - assert arr[2] == 0 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -@pytest.mark.parametrize("condition_value", [True, False, 1, 0]) -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_conditional_while(init_cuda, condition_value): - mod = _common_kernels_conditional(type(condition_value)) - add_one = mod.get_kernel("add_one") - loop_kernel = mod.get_kernel("loop_kernel") - empty_kernel = mod.get_kernel("empty_kernel") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(4) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Node A is skipped because we can instead use a non-zero default value - handle = gb.create_conditional_handle(default_value=condition_value) - - # Add Node B (while condition) - gb_while = gb.while_loop(handle) - gb_while.begin_building() - launch(gb_while, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_while, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_while, LaunchConfig(grid=1, block=1), loop_kernel, handle) - gb_while.end_building() - - # Add Node C (...) 
- # Note: We use the original gb to continue building past the cond node - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - - graph = gb.end_building().complete() - - # Default value is used to start the loop - assert arr[0] == 0 - graph.launch(launch_stream) - launch_stream.sync() - if condition_value: - assert arr[0] == 20 - else: - assert arr[0] == 0 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_child_graph(init_cuda): - mod = _common_kernels() - add_one = mod.get_kernel("add_one") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(8) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - arr[1] = 0 - - # Capture the child graph - gb_child = Device().create_graph_builder().begin_building() - launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_child, LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_child.end_building() - - # Capture the parent graph - gb_parent = Device().create_graph_builder().begin_building() - launch(gb_parent, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - - ## Add child - try: - gb_parent.add_child(gb_child) - except NotImplementedError as e: - with pytest.raises( - NotImplementedError, - match="^Launching child graphs is not implemented for versions older than CUDA 12", - ): - raise e - gb_parent.end_building() - b.close() - pytest.skip("Launching child graphs is not implemented for versions older than CUDA 12") - - launch(gb_parent, LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - graph = gb_parent.end_building().complete() - - # Parent updates first value, child updates second value - assert arr[0] == 0 - assert arr[1] == 0 - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 2 - assert arr[1] == 3 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") -def test_graph_update(init_cuda): - mod = _common_kernels_conditional(int) - add_one = mod.get_kernel("add_one") - - # Allocate memory - launch_stream = Device().create_stream() - mr = LegacyPinnedMemoryResource() - b = mr.allocate(12) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - arr[1] = 0 - arr[2] = 0 - - def build_graph(condition_value): - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Add Node A (sets condition) - handle = gb.create_conditional_handle(default_value=condition_value) - - # Add Node B (while condition) - try: - gb_case = list(gb.switch(handle, 3)) - except Exception as e: - with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): - raise e - gb.end_building() - raise e - - ## Case 0 - gb_case[0].begin_building() - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - launch(gb_case[0], LaunchConfig(grid=1, block=1), add_one, arr.ctypes.data) - gb_case[0].end_building() - - ## Case 1 - gb_case[1].begin_building() - launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, 
arr[1:].ctypes.data) - launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - launch(gb_case[1], LaunchConfig(grid=1, block=1), add_one, arr[1:].ctypes.data) - gb_case[1].end_building() - - ## Case 2 - gb_case[2].begin_building() - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - launch(gb_case[2], LaunchConfig(grid=1, block=1), add_one, arr[2:].ctypes.data) - gb_case[2].end_building() - - return gb.end_building() - - try: - graph_variants = [build_graph(0), build_graph(1), build_graph(2)] - except Exception as e: - with pytest.raises(RuntimeError, match="^(Driver|Binding) version"): - raise e - b.close() - pytest.skip("Driver does not support conditional switch") - - # Launch the first graph - assert arr[0] == 0 - assert arr[1] == 0 - assert arr[2] == 0 - graph = graph_variants[0].complete() - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 3 - assert arr[1] == 0 - assert arr[2] == 0 - - # Update with second variant and launch again - graph.update(graph_variants[1]) - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 3 - assert arr[1] == 3 - assert arr[2] == 0 - - # Update with third variant and launch again - graph.update(graph_variants[2]) - graph.launch(launch_stream) - launch_stream.sync() - assert arr[0] == 3 - assert arr[1] == 3 - assert arr[2] == 3 - - # Close the memory resource now because the garbage collected might - # de-allocate it during the next graph builder process - b.close() - - -def test_graph_stream_lifetime(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - - # Create simple graph from device - gb = Device().create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - graph = gb.end_building().complete() - - # Destroy simple graph and builder - gb.close() - graph.close() - - # Create simple graph from stream - stream = Device().create_stream() - gb = stream.create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - graph = gb.end_building().complete() - - # Destroy simple graph and builder - gb.close() - graph.close() - - # Verify the stream can still launch work - launch(stream, LaunchConfig(grid=1, block=1), empty_kernel) - stream.sync() - - # Destroy the stream - stream.close() - - -def test_graph_dot_print_options(init_cuda, tmp_path): - mod = _common_kernels_conditional(bool) - set_handle = mod.get_kernel("set_handle") - empty_kernel = mod.get_kernel("empty_kernel") - - # Begin capture - gb = Device().create_graph_builder().begin_building() - - # Add Node A (sets condition) - handle = gb.create_conditional_handle() - launch(gb, LaunchConfig(grid=1, block=1), set_handle, handle, False) - - # Add Node B (if condition) - gb_if = gb.if_cond(handle).begin_building() - launch(gb_if, LaunchConfig(grid=1, block=1), empty_kernel) - gb_if_0, gb_if_1 = gb_if.split(2) - launch(gb_if_0, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb_if_1, LaunchConfig(grid=1, block=1), empty_kernel) - gb_if = GraphBuilder.join(gb_if_0, gb_if_1) - launch(gb_if, LaunchConfig(grid=1, block=1), empty_kernel) - gb_if.end_building() - - # Add Node C (...) 
- # Note: We use the original graph to continue building past the cond node - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building() - - # Print using all options - path = bytes(str(tmp_path / "vlad.dot"), "utf-8") - options = GraphDebugPrintOptions(**{field: True for field in GraphDebugPrintOptions.__dataclass_fields__}) - gb.debug_dot_print(path, options) - - -def test_graph_complete_options(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - launch_stream = Device().create_stream() - - # Simple linear topology - gb = Device().create_graph_builder().begin_building() - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building() - - options = GraphCompleteOptions(auto_free_on_launch=True) - gb.complete(options).close() - options = GraphCompleteOptions(upload_stream=launch_stream) - gb.complete(options).close() - options = GraphCompleteOptions(device_launch=True) - gb.complete(options).close() - options = GraphCompleteOptions(use_node_priority=True) - gb.complete(options).close() - - -def test_graph_build_mode(init_cuda): - mod = _common_kernels() - empty_kernel = mod.get_kernel("empty_kernel") - - # Simple linear topology - gb = Device().create_graph_builder().begin_building(mode="global") - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building() - - gb = Device().create_graph_builder().begin_building(mode="thread_local") - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building() - - gb = Device().create_graph_builder().begin_building(mode="relaxed") - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - launch(gb, LaunchConfig(grid=1, block=1), empty_kernel) - gb.end_building() - - with pytest.raises(ValueError, match="^Unsupported build mode:"): - gb = Device().create_graph_builder().begin_building(mode=None)
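For context on how the pieces above fit together, here is a minimal sketch of the device-side launch flow that ``tests/graph/test_device_launch.py`` exercises and that the new ``Graph.handle`` property enables. It is illustrative only, not part of the diff: it assumes ``work_kernel`` and ``launcher_kernel`` have already been compiled as in the test helpers (the launcher kernel calls ``cudaGraphLaunch(graph, cudaStreamGraphTailLaunch)`` and is linked against cudadevrt with relocatable device code); everything else uses only calls that appear in the diff.

# Illustrative sketch -- not part of the diff. Assumes `work_kernel` and
# `launcher_kernel` were compiled as in _compile_work_kernel() and
# _compile_device_launcher_kernel() above.
import numpy as np
from cuda.core import Device, GraphCompleteOptions, LaunchConfig, LegacyPinnedMemoryResource, launch

dev = Device()
dev.set_current()
stream = dev.create_stream()

# Pinned host buffer the work kernel will increment.
mr = LegacyPinnedMemoryResource()
buf = mr.allocate(4, stream=stream)
arr = np.from_dlpack(buf).view(np.int32)
arr[0] = 0
stream.sync()

# Inner graph: completed with device_launch=True so the GPU is allowed to launch it.
gb_inner = dev.create_graph_builder().begin_building()
launch(gb_inner, LaunchConfig(grid=1, block=1), work_kernel, arr.ctypes.data)
inner_graph = gb_inner.end_building().complete(
    options=GraphCompleteOptions(device_launch=True, upload_stream=stream)
)
stream.sync()

# Outer graph: the launcher kernel receives the executable-graph handle as a plain
# integer, obtained from the new Graph.handle property via int().
gb_outer = dev.create_graph_builder().begin_building()
launch(gb_outer, LaunchConfig(grid=1, block=1), launcher_kernel, int(inner_graph.handle))
outer_graph = gb_outer.end_building().complete()

outer_graph.launch(stream)  # kernel inside the outer graph tail-launches inner_graph
stream.sync()
assert arr[0] == 1
buf.close()

The ``int()`` conversion is the usage the new docstring points at: ``Graph.handle`` returns the ``CUgraphExec`` Python object, and calling ``int()`` on it yields the raw handle value that can be passed to a kernel as an argument.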