diff --git a/.gitignore b/.gitignore
index f9082380e..b2372b5dc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -162,3 +162,5 @@ pufferlib/ocean/impulse_wars/*-release/
 pufferlib/ocean/impulse_wars/debug-*/
 pufferlib/ocean/impulse_wars/release-*/
 pufferlib/ocean/impulse_wars/benchmark/
+*.a
+*.o
diff --git a/pufferlib/extensions/bindings.cpp b/pufferlib/extensions/bindings.cpp
index 0c99cd245..76a9e5ece 100644
--- a/pufferlib/extensions/bindings.cpp
+++ b/pufferlib/extensions/bindings.cpp
@@ -156,7 +156,8 @@ TORCH_LIBRARY(_C, m) {
     m.def("fc_max(Tensor x, Tensor W, Tensor b) -> Tensor");
 }
 
-PYBIND11_MODULE(_C, m) {
+__attribute__((visibility("default")))
+extern void register_pufferlib_bindings(pybind11::module_& m) {
     m.def("log_environments", &log_environments);
     m.def("rollouts", &rollouts);
     m.def("train", &train);
diff --git a/pufferlib/extensions/cuda/cuda_kernels.cpp b/pufferlib/extensions/cuda/cuda_kernels.cpp
new file mode 100644
index 000000000..14ace7787
--- /dev/null
+++ b/pufferlib/extensions/cuda/cuda_kernels.cpp
@@ -0,0 +1,2 @@
+// Compiles the torch/cuda kernels used in pufferlib into _C.so, which binding.so statically links against
+#include "kernels.h"
\ No newline at end of file
diff --git a/pufferlib/extensions/env_glue.cpp b/pufferlib/extensions/env_glue.cpp
new file mode 100644
index 000000000..dbb0a725d
--- /dev/null
+++ b/pufferlib/extensions/env_glue.cpp
@@ -0,0 +1,13 @@
+#include <pybind11/pybind11.h>
+
+#include <cstdio>
+
+void register_pufferlib_bindings(pybind11::module_& m);
+
+PYBIND11_MODULE(binding, m) {
+    register_pufferlib_bindings(m);
+}
+
+extern "C" void test_function() {
+    printf("Test function called!\n");
+}
\ No newline at end of file
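Aside (illustration, not part of the diff): bindings.cpp now exports register_pufferlib_bindings with default visibility instead of defining PYBIND11_MODULE itself, and env_glue.cpp repeats the declaration before calling it from each env's binding module. A shared header would keep the two declarations in sync; a minimal sketch, with the file name pufferlib_bindings.h being hypothetical:

// pufferlib/extensions/pufferlib_bindings.h (hypothetical; not part of this patch)
#pragma once
#include <pybind11/pybind11.h>

// Defined in bindings.cpp (compiled into _C.so); each env's env_glue.cpp calls it from
// PYBIND11_MODULE(binding, m) so the trainer entry points ride along with the env module.
__attribute__((visibility("default")))
void register_pufferlib_bindings(pybind11::module_& m);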
diff --git a/pufferlib/extensions/pufferlib.cpp b/pufferlib/extensions/pufferlib.cpp
index c8d07b326..890eb04a6 100644
--- a/pufferlib/extensions/pufferlib.cpp
+++ b/pufferlib/extensions/pufferlib.cpp
@@ -23,13 +23,24 @@
 #include "muon.h"
 #include "env_binding.h"
-#include "modules.h"
+
+typedef torch::Tensor Tensor;
+
+// CUDA kernel wrappers
+#include "modules.cpp"
+
+// get dtype based on bf16 flag
+inline torch::ScalarType get_dtype(bool bf16) {
+    return bf16 ? torch::kBFloat16 : torch::kFloat32;
+}
 
 namespace pufferlib {
 
-#include "models.cpp"
+// Advantage computation is in advantage.cpp
 #include "advantage.cpp"
-#include "ocean.cpp"
+
+// Model classes are in models.cpp
+#include "models.cpp"
 
 torch::Dtype to_torch_dtype(int dtype) {
     if (dtype == FLOAT) {
@@ -55,7 +66,7 @@ typedef struct {
     Tensor terminals;
 } EnvBuf;
 
-tuple
+std::tuple
 create_environments(int num_buffers, int total_agents, const std::string& env_name, Dict* vec_kwargs, Dict* env_kwargs, EnvBuf& env) {
     StaticVec* vec = create_static_vec(total_agents, num_buffers, vec_kwargs, env_kwargs);
     printf("DEBUG create_environments: vec->size=%d, vec->total_agents=%d\n",
@@ -65,9 +76,9 @@ create_environments(int num_buffers, int total_agents, const std::string& env_na
     int num_atns = get_num_atns();
 
     env.obs = torch::from_blob(vec->gpu_observations, {total_agents, obs_size}, torch::dtype(to_torch_dtype(get_obs_type())).device(torch::kCUDA));
-    env.actions = torch::from_blob(vec->gpu_actions, {total_agents, num_atns}, cuda_f64);
-    env.rewards = torch::from_blob(vec->gpu_rewards, {total_agents}, cuda_f32);
-    env.terminals = torch::from_blob(vec->gpu_terminals, {total_agents}, cuda_f32);
+    env.actions = torch::from_blob(vec->gpu_actions, {total_agents, num_atns}, torch::dtype(torch::kFloat64).device(torch::kCUDA));
+    env.rewards = torch::from_blob(vec->gpu_rewards, {total_agents}, torch::dtype(torch::kFloat32).device(torch::kCUDA));
+    env.terminals = torch::from_blob(vec->gpu_terminals, {total_agents}, torch::dtype(torch::kFloat32).device(torch::kCUDA));
 
     // Create act_sizes tensor on CUDA (needed for sample_logits kernel)
     Tensor act_sizes = torch::from_blob(get_act_sizes(), {num_atns}, torch::dtype(torch::kInt32)).to(torch::kCUDA);
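// (aside, not part of the patch) torch::from_blob wraps the StaticVec's GPU buffers without
// copying or taking ownership: env.obs/actions/rewards/terminals are zero-copy views, the
// explicit torch::dtype(...).device(torch::kCUDA) options above replace the old cuda_f64/cuda_f32
// shorthands, and the underlying vec buffers must stay alive for as long as these tensors do.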
@@ -282,207 +293,189 @@ extern "C" void net_callback_wrapper(void* ctx, int buf, int t) {
     rollouts.terminals.select(0, t).narrow(0, buf*block_size, block_size).copy_(
         pufferl->env.terminals.narrow(0, buf*block_size, block_size), true);
 
-    pufferl->env.actions.narrow(0, buf*block_size, block_size).copy_(actions_out, true);
-
-    if (capturing) {
-        pufferl->fused_rollout_cudagraphs[t][buf].capture_end();
-        cap_stream.synchronize();
-        cudaDeviceSynchronize();
-        at::cuda::setCurrentCUDAStream(saved_stream);
-    }
-    }
-    profile_end(hypers.profile);
+    // Copy actions to env for next step
+    pufferl->env.actions.narrow(0, buf*block_size, block_size).copy_(actions_out, true);
 }
 
-void rollouts_impl(PuffeRL& pufferl) {
-    torch::NoGradGuard no_grad;
-    HypersT& hypers = pufferl.hypers;
-
-    int horizon = hypers.horizon;
-    int num_buffers = hypers.num_buffers;
-    // TODO: You removed state zeros and reward clamping
-
-    for (int i = 0; i < num_buffers*horizon; ++i) {
-        int buf = i % num_buffers;
-        int h = i / num_buffers;
+void train_forward_call(TrainGraph& graph, PolicyMinGRU* policy_bf16, PolicyMinGRU* policy_fp32,
+        torch::optim::Muon* muon, HypersT& hypers, Tensor& adv_mean, Tensor& adv_std, Tensor& act_sizes_cpu, bool kernels) {
+    auto [logits, newvalue] = policy_bf16->forward_train(graph.mb_obs, graph.mb_state);
+
+    Tensor loss;
+    if (kernels) {
+        auto [mb_adv_var, mb_adv_mean] = torch::var_mean(graph.mb_advantages); // single kernel launch
+        loss = fused_ppo_loss_optimized(
+            logits,
+            newvalue,
+            graph.mb_actions,
+            graph.mb_logprobs,
+            graph.mb_advantages,
+            graph.mb_prio,
+            graph.mb_values,
+            graph.mb_returns,
+            mb_adv_mean,
+            mb_adv_var, // variance, not std - kernel does sqrtf to avoid second kernel launch here
+            graph.mb_ratio,
+            graph.mb_newvalue.view({graph.mb_ratio.size(0), graph.mb_ratio.size(1)}),
+            hypers.clip_coef,
+            hypers.vf_clip_coef,
+            hypers.vf_coef,
+            hypers.ent_coef
+        )[0];
+    } else {
+        int num_action_heads = graph.mb_actions.size(-1);
+        int batch = hypers.minibatch_size;
+        int minibatch_segments = batch / hypers.horizon;
+
+        // Split logits by action head sizes and compute log probs for each head
+        Tensor flat_logits = logits.reshape({batch, -1});
+        flat_logits = torch::nan_to_num(flat_logits, 1e-8, 1e-8, 1e-8);
+        auto split_logits = torch::split(flat_logits, c10::IntArrayRef(act_sizes_cpu.data_ptr<int64_t>(), num_action_heads), 1);
+
+        std::vector<Tensor> logprobs_vec;
+        std::vector<Tensor> entropies_vec;
+
+        for (int h = 0; h < num_action_heads; h++) {
+            Tensor head_logits = split_logits[h];
+            Tensor log_probs = torch::log_softmax(head_logits, 1);
+            Tensor probs = log_probs.exp();
+            Tensor head_actions = graph.mb_actions.select(-1, h).reshape({batch}).to(torch::kInt64);
+            Tensor logprob = log_probs.gather(1, head_actions.unsqueeze(1));
+            logprobs_vec.push_back(logprob);
+            entropies_vec.push_back(-(probs * log_probs).sum(1, true));
+        }
 
-        net_callback_wrapper(&pufferl, buf, h);
+        // Stack and reduce - no per-iteration allocations
+        Tensor newlogprob = torch::cat(logprobs_vec, 1).sum(1).reshape({minibatch_segments, hypers.horizon});
+        Tensor entropy = torch::cat(entropies_vec, 1).sum(1).mean();
+
+        // Compute ratio
+        Tensor logratio = newlogprob - graph.mb_logprobs;
+        Tensor ratio_new = logratio.exp();
+        graph.mb_ratio.copy_(ratio_new, false);
+        graph.mb_newvalue.copy_(newvalue, false);
+
+        // Normalize advantages: (adv - mean) / std, then weight
+        Tensor adv_normalized = graph.mb_advantages;
+        adv_normalized = graph.mb_prio * (adv_normalized - adv_normalized.mean()) / (adv_normalized.std() + 1e-8);
+
+        // Policy loss
+        Tensor pg_loss1 = -adv_normalized * ratio_new;
+        Tensor pg_loss2 = -adv_normalized * torch::clamp(ratio_new, 1.0 - hypers.clip_coef, 1.0 + hypers.clip_coef);
+        Tensor pg_loss = torch::max(pg_loss1, pg_loss2).mean();
+
+        // Value loss
+        newvalue = newvalue.view(graph.mb_returns.sizes());
+        Tensor v_clipped = graph.mb_values + torch::clamp(newvalue - graph.mb_values,
+            -hypers.vf_clip_coef, hypers.vf_clip_coef);
+        Tensor v_loss_unclipped = (newvalue - graph.mb_returns).pow(2);
+        Tensor v_loss_clipped = (v_clipped - graph.mb_returns).pow(2);
+        Tensor v_loss = 0.5 * torch::max(v_loss_unclipped, v_loss_clipped).mean();
+
+        // Total loss
+        loss = pg_loss + hypers.vf_coef*v_loss - hypers.ent_coef*entropy;
+    }
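// (aside, not part of the patch) The else branch above is the standard PPO update spelled out
// with libtorch ops. With r = exp(newlogprob - mb_logprobs), eps = clip_coef, and the advantage
// normalized and weighted as A = mb_prio * (adv - mean(adv)) / (std(adv) + 1e-8):
//     pg_loss = mean( max( -A * r, -A * clip(r, 1 - eps, 1 + eps) ) )      // == -min(A*r, A*clip(r))
//     v_loss  = 0.5 * mean( max( (V - R)^2, (V_old + clip(V - V_old, -vf_clip, vf_clip) - R)^2 ) )
//     loss    = pg_loss + vf_coef * v_loss - ent_coef * entropy
// The kernels branch presumably produces the same loss in one fused launch, which is why it is
// handed the advantage variance rather than the standard deviation.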
-        // TODO: There should be a lighter way to sync. You need to make sure the torch data streams
-        // are ready because puffer vec uses different streams. Setting to non-blocking is not enough.
-        cudaDeviceSynchronize();
+    // computes gradients on bf16 weights (or fp32 if not using bf16)
+    loss.backward();
+
+    // copy gradients from bf16 to fp32, then optimizer step on fp32 master weights
+    if (hypers.bf16) {
+        copy_gradients_to_fp32(policy_bf16, policy_fp32);
+    }
+    clip_grad_norm_(policy_fp32->parameters(), hypers.max_grad_norm);
+    muon->step();
+    muon->zero_grad();
+    if (hypers.bf16) {
+        policy_bf16->zero_grad(); // also need to clear bf16 gradients
+        // sync updated fp32 weights back to bf16 for next forward pass
+        sync_policy_weights(policy_bf16, policy_fp32);
+    }
 }
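Aside (illustration, not part of the diff): copy_gradients_to_fp32 and sync_policy_weights are not defined in this patch. A minimal sketch of the bf16 master-weight round trip they suggest, assuming both policies expose parameters() in the same order (torch::nn::Module stands in for the actual policy type):

#include <torch/torch.h>

// Upcast bf16 gradients onto the fp32 master parameters so the optimizer steps in fp32.
void copy_gradients_to_fp32_sketch(torch::nn::Module& bf16, torch::nn::Module& fp32) {
    auto p16 = bf16.parameters();
    auto p32 = fp32.parameters();
    for (size_t i = 0; i < p16.size(); ++i) {
        if (!p16[i].grad().defined()) continue;
        p32[i].mutable_grad() = p16[i].grad().to(torch::kFloat32);
    }
}

// Downcast the updated fp32 master weights back into the bf16 working copy for the next forward.
void sync_policy_weights_sketch(torch::nn::Module& bf16, torch::nn::Module& fp32) {
    torch::NoGradGuard no_grad;
    auto p16 = bf16.parameters();
    auto p32 = fp32.parameters();
    for (size_t i = 0; i < p16.size(); ++i) {
        p16[i].copy_(p32[i].to(torch::kBFloat16));
    }
}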
-void train_impl(PuffeRL& pufferl) {
-    // Update to HypersT& p
-    HypersT& hypers = pufferl.hypers;
-
-    // Buffers are stored as {horizon, segments, ...} for contiguous rollout writes
-    // Transpose to {segments, horizon, ...} for train logic
-    // Need .contiguous() because compute_puff_advantage_cuda uses raw data pointers
-    RolloutBuf rollouts;
-    rollouts.observations = pufferl.rollouts.observations.permute({1, 0, 2}).contiguous();
-    rollouts.actions = pufferl.rollouts.actions.transpose(0, 1).contiguous();
-    rollouts.logprobs = pufferl.rollouts.logprobs.transpose(0, 1).contiguous();
-    rollouts.rewards = pufferl.rollouts.rewards.transpose(0, 1).contiguous();
-    rollouts.rewards.clamp_(-1.0, 1.0); // Clamp rewards here instead of in eval to save a kernel call per step
-    rollouts.terminals = pufferl.rollouts.terminals.transpose(0, 1).contiguous();
-    rollouts.ratio = pufferl.rollouts.ratio.transpose(0, 1).contiguous();
-    rollouts.values = pufferl.rollouts.values.transpose(0, 1).contiguous();
-
-    // Inline any of these only used once
-    int minibatch_size = hypers.minibatch_size;
-    int batch_size = hypers.total_agents * hypers.horizon;
-    int minibatch_segments = minibatch_size / hypers.horizon;
-    float prio_beta0 = hypers.prio_beta0;
-    float prio_alpha = hypers.prio_alpha;
-    bool anneal_lr = hypers.anneal_lr;
-    int current_epoch = pufferl.epoch;
-
-    // Accumulators
-    torch::Device device = rollouts.values.device();
-    torch::TensorOptions scalar_opts = torch::TensorOptions().dtype(torch::kFloat32).device(device);
-    Tensor pg_sum = torch::zeros({}, scalar_opts);
-    Tensor v_sum = torch::zeros({}, scalar_opts);
-    Tensor ent_sum = torch::zeros({}, scalar_opts);
-    Tensor total_sum = torch::zeros({}, scalar_opts);
-    Tensor old_approx_kl_sum = torch::zeros({}, scalar_opts);
-    Tensor approx_kl_sum = torch::zeros({}, scalar_opts);
-    Tensor clipfrac_sum = torch::zeros({}, scalar_opts);
-    Tensor importance_sum = torch::zeros({}, scalar_opts);
-
-    Policy* policy_bf16 = pufferl.policy_bf16;
-    // Policy* policy_fp32 = pufferl.policy_fp32;
-    torch::optim::Muon* muon = pufferl.muon;
-
-    int total_epochs = hypers.total_timesteps / batch_size;
-
-    if (anneal_lr) {
-        float lr_min = hypers.min_lr_ratio * hypers.lr;
-        float lr = cosine_annealing(hypers.lr, lr_min, current_epoch, total_epochs);
-        muon->lr.fill_(lr);
+// Capture with shared memory pool
+void capture_graph(at::cuda::CUDAGraph* graph, std::function<void()> func,
+        at::cuda::MempoolId_t pool) {
+    /* Checklist for avoiding diabolical capture bugs:
+     * 1. Don't start separate streams before tracing (i.e. env gpu buffers)
+     * 2. Make sure input/output buffer pointers don't change
+     * 3. Make sure to restore the original stream after tracing
+     * 4. All custom kernels need to use the default torch stream
+     * 5. Make sure you are using the torch stream fns, not the c10 ones.
+     * 6. Scalars get captured by value. They cannot change between calls.
+     */
+    at::cuda::CUDAStream current_stream = at::cuda::getCurrentCUDAStream();
+
+    at::cuda::CUDAStream warmup_stream = at::cuda::getStreamFromPool();
+    at::cuda::setCurrentCUDAStream(warmup_stream);
+    for (int i = 0; i < 10; ++i) {
+        func();
+    }
+    warmup_stream.synchronize();
 
-    // Annealed priority exponent - TODO: graphed?
-    float anneal_beta = prio_beta0 + (1.0f - prio_beta0) * prio_alpha * (float)current_epoch/(float)total_epochs;
-
-    // Zero out ratio at start of epoch (matches Python: self.ratio[:] = 1)
-    rollouts.ratio.fill_(1.0);
-
-    Tensor advantages = torch::zeros_like(rollouts.values, torch::kFloat32); // fp32 precision
+    auto cap_stream = at::cuda::getStreamFromPool();
+    at::cuda::setCurrentCUDAStream(cap_stream);
+    graph->capture_begin(pool);
+    func();
+    graph->capture_end();
+    cap_stream.synchronize();
 
-    compute_advantage(rollouts, advantages, hypers);
-    Tensor mb_state = torch::zeros(
-        {hypers.num_layers, minibatch_segments, 1, (int64_t)hypers.hidden_size},
-        torch::dtype(PRECISION_DTYPE).device(rollouts.values.device())
-    );
-
-    int total_minibatches = hypers.replay_ratio * batch_size / hypers.minibatch_size;
-
-    TrainGraph& graph = pufferl.train_buf;
-
-    for (int mb = 0; mb < total_minibatches; ++mb) {
-        advantages.fill_(0.0);
+    cudaDeviceSynchronize();
 
-        profile_begin("compute_advantage", hypers.profile);
-        compute_advantage(rollouts, advantages, hypers);
-        profile_end(hypers.profile);
+    at::cuda::setCurrentCUDAStream(current_stream);
+}
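Aside (illustration, not part of the diff): the capture site for pufferl.train_cudagraph is not shown in this hunk. Assuming it lives in create_pufferl_impl, a call to the helper above would look roughly like this, mirroring the arguments that train_impl later passes to train_forward_call and sharing one memory pool via at::cuda::graph_pool_handle():

// Hypothetical capture of the train step; replay() then re-issues the captured kernels
// against the same TrainGraph buffers that train_select_and_copy fills each minibatch.
at::cuda::MempoolId_t pool = at::cuda::graph_pool_handle();
capture_graph(&pufferl->train_cudagraph, [&]() {
    train_forward_call(pufferl->train_buf, pufferl->policy_bf16, pufferl->policy_fp32,
        pufferl->muon, pufferl->hypers, pufferl->adv_mean, pufferl->adv_std,
        pufferl->act_sizes_cpu, pufferl->hypers.kernels);
}, pool);
// Later, inside the minibatch loop:
pufferl->train_cudagraph.replay();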
-        // Inlined compute_prio
-        profile_begin("compute_prio", hypers.profile);
-        Tensor adv = advantages.abs().sum(1);
-        Tensor prio_weights = adv.pow(prio_alpha).nan_to_num_(0.0, 0.0, 0.0);
-        Tensor prio_probs = (prio_weights + 1e-6)/(prio_weights.sum() + 1e-6);
-        Tensor idx = at::multinomial(prio_probs, minibatch_segments, true);
-        Tensor mb_prio = torch::pow(hypers.total_agents*prio_probs.index_select(0, idx).unsqueeze(1), -anneal_beta);
-        profile_end(hypers.profile);
 
-        // Inlined train_select_and_copy
-        profile_begin("train_select_and_copy", hypers.profile);
-        Tensor mb_obs = rollouts.observations.index_select(0, idx);
-        Tensor mb_actions = rollouts.actions.index_select(0, idx);
-        Tensor mb_logprobs = rollouts.logprobs.index_select(0, idx);
-        Tensor mb_values = rollouts.values.index_select(0, idx);
-        Tensor mb_advantages = advantages.index_select(0, idx);
-        Tensor mb_returns = mb_advantages + mb_values;
-
-        mb_state.zero_();
-        graph.mb_obs.copy_(mb_obs, false);
-        graph.mb_state.copy_(mb_state, false);
-        graph.mb_actions.copy_(mb_actions, false);
-        graph.mb_logprobs.copy_(mb_logprobs, false);
-        graph.mb_advantages.copy_(mb_advantages, false);
-        graph.mb_prio.copy_(mb_prio, false);
-        graph.mb_values.copy_(mb_values, false);
-        graph.mb_returns.copy_(mb_returns, false);
-        profile_end(hypers.profile);
+// ============================================================================
+// Rollout and train section functions
+// ============================================================================
 
-        profile_begin("train_forward_graph", hypers.profile);
-        if (pufferl.train_captured) {
-            pufferl.train_cudagraph.replay();
-        } else {
-            bool capturing = pufferl.train_warmup == hypers.cudagraphs;
-            auto saved_stream = at::cuda::getCurrentCUDAStream();
-            auto cap_stream = capturing ? at::cuda::getStreamFromPool() : saved_stream;
-            if (capturing) {
-                at::cuda::setCurrentCUDAStream(cap_stream);
-                pufferl.train_cudagraph.capture_begin(pufferl.train_pool_id);
-            }
+inline void profile_begin(const char* tag, bool enable) {
+    if (enable) { cudaDeviceSynchronize(); nvtxRangePushA(tag); }
+}
 
-            auto [logits, newvalue] = pufferl.policy_bf16->forward_train(graph.mb_obs, graph.mb_state);
+inline void profile_end(bool enable) {
+    if (enable) { cudaDeviceSynchronize(); nvtxRangePop(); }
+}
 
-            Tensor loss = compute_train_loss(logits, newvalue,
-                graph.mb_actions, graph.mb_logprobs, graph.mb_advantages, graph.mb_prio,
-                graph.mb_values, graph.mb_returns,
-                graph.mb_ratio, graph.mb_newvalue.view({graph.mb_ratio.size(0), graph.mb_ratio.size(1)}),
-                pufferl.act_sizes, pufferl.act_sizes_cpu,
-                hypers.minibatch_size, hypers.horizon,
-                hypers.clip_coef, hypers.vf_clip_coef, hypers.vf_coef, hypers.ent_coef,
-                pufferl.is_continuous, hypers.kernels);
+void env_recv(PuffeRL& pufferl, int buf) {
+    // Not used in static/OMP path
+}
 
-            loss.backward();
+void env_send(PuffeRL& pufferl, int buf) {
+    // Not used in static/OMP path
+}
 
-            if (USE_BF16) {
-                copy_gradients_to_fp32(pufferl.policy_bf16, pufferl.policy_fp32);
-            }
-            clip_grad_norm_(pufferl.policy_fp32->parameters(), hypers.max_grad_norm);
-            pufferl.muon->step();
-            pufferl.muon->zero_grad();
-            if (USE_BF16) {
-                pufferl.policy_bf16->zero_grad();
-                sync_policy_weights(pufferl.policy_bf16, pufferl.policy_fp32);
-            }
+void compute_advantage(RolloutBuf& rollouts, Tensor& advantages, HypersT& hypers) {
+    compute_puff_advantage_cuda(rollouts.values, rollouts.rewards, rollouts.terminals,
+        rollouts.ratio, advantages, hypers.gamma, hypers.gae_lambda,
+        hypers.vtrace_rho_clip, hypers.vtrace_c_clip);
+}
 
-            if (capturing) {
-                pufferl.train_cudagraph.capture_end();
-                cap_stream.synchronize();
-                cudaDeviceSynchronize();
-                at::cuda::setCurrentCUDAStream(saved_stream);
-                pufferl.train_captured = true;
-            }
-            pufferl.train_warmup++;
-        }
+// Thread initialization callback - sets CUDA stream once per thread
+extern "C" void thread_init_wrapper(void* ctx, int buf) {
+    PuffeRL* pufferl = (PuffeRL*)ctx;
+    at::cuda::setCurrentCUDAStream(pufferl->torch_streams[buf]);
+}
 
-        profile_end(hypers.profile);
+// Callback for OMP threadmanager - runs policy forward for one (buf, t) step
+extern "C" void net_callback_wrapper(void* ctx, int buf, int t) {
+    torch::NoGradGuard no_grad;
+    PuffeRL* pufferl = (PuffeRL*)ctx;
+    HypersT& hypers = pufferl->hypers;
+    profile_begin("fused_rollout", hypers.profile);
+    if (hypers.cudagraphs) {
+        // Fused cudagraph: input copy + forward + output copy in one shot
+        pufferl->fused_rollout_cudagraphs[t][buf].replay();
+    } else {
+        fused_rollout_step(*pufferl, t, buf);
     }
 
-        // Update global ratio and values in-place (matches Python)
-        pufferl.rollouts.ratio.index_copy_(1, idx, graph.mb_ratio.detach().squeeze(-1).to(PRECISION_DTYPE).transpose(0, 1));
-        pufferl.rollouts.values.index_copy_(1, idx, graph.mb_newvalue.detach().squeeze(-1).to(PRECISION_DTYPE).transpose(0, 1));
-    }
-    pufferl.epoch += 1;
-
-    // Compute explained variance at end of epoch
-    /*
-    auto y_true = advantages.flatten() + values.flatten();
-    auto y_pred = values.flatten();
-    auto var_y = y_true.var();
-    */
-    //double explained_var = (var_y.abs() < 1e-8) ? NAN : (1 - (y_true - y_pred).var() / var_y).item();
-    cudaStreamSynchronize(at::cuda::getCurrentCUDAStream());
+    profile_end(hypers.profile);
 }
 
 std::unique_ptr<PuffeRL> create_pufferl_impl(HypersT& hypers, const std::string& env_name, Dict* vec_kwargs, Dict* env_kwargs) {
+    BEGIN_LIBTORCH_CATCH
     auto pufferl = std::make_unique<PuffeRL>();
     pufferl->hypers = hypers;
     pufferl->nccl_comm = nullptr;
@@ -711,45 +704,72 @@ std::unique_ptr<PuffeRL> create_pufferl_impl(HypersT& hypers, const s
     static_vec_reset(vec);
 
     return pufferl;
+    END_LIBTORCH_CATCH
 }
 
-void close_impl(PuffeRL& pufferl) {
-    cudaDeviceSynchronize();
-    for (size_t i = 0; i < pufferl.buffer_states.size(); i++) {
-        char buf[64];
-        snprintf(buf, sizeof(buf), "buffer_states[%zu]", i);
-    }
-    // Policy params total
-    size_t policy_bytes = 0;
-    for (const auto& p : pufferl.policy_fp32->parameters()) {
-        policy_bytes += p.numel() * p.element_size();
-    }
-    if (USE_BF16) {
-        size_t bf16_bytes = 0;
-        for (const auto& p : pufferl.policy_bf16->parameters()) {
-            bf16_bytes += p.numel() * p.element_size();
-        }
-    }
+std::tuple<Tensor, Tensor> compute_prio(Tensor& advantages,
+        int minibatch_segments, int segments,
+        float prio_alpha, float anneal_beta) {
+    Tensor adv = advantages.abs().sum(1);
+    Tensor prio_weights = adv.pow(prio_alpha).nan_to_num_(0.0, 0.0, 0.0);
+    Tensor prio_probs = (prio_weights + 1e-6)/(prio_weights.sum() + 1e-6);
+    Tensor idx = at::multinomial(prio_probs, minibatch_segments, true);
+    Tensor mb_prio = torch::pow(segments*prio_probs.index_select(0, idx).unsqueeze(1), -anneal_beta);
+    return {idx, mb_prio};
+}
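// (aside, not part of the patch) compute_prio above implements prioritized sampling over rollout
// segments. With alpha = prio_alpha, N = segments, and A_i the per-segment sum of |advantage|:
//     P(i) = (A_i^alpha + 1e-6) / (sum_j A_j^alpha + 1e-6)
//     w_i  = (N * P(idx_i))^(-anneal_beta)        // importance weight returned as mb_prio
// idx is drawn with replacement via at::multinomial, and the 1e-6 terms keep P(i) well defined
// when every advantage is zero.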
 
-    // Reset CUDA graphs first (they hold references to tensor memory)
-    pufferl.train_cudagraph.reset();
-    pufferl.fused_rollout_cudagraphs.clear();
+void train_select_and_copy(TrainGraph& graph, RolloutBuf& rollouts,
+        Tensor& advantages, Tensor& idx, Tensor& mb_state, Tensor& mb_prio) {
+    Tensor mb_obs = rollouts.observations.index_select(0, idx);
+    Tensor mb_actions = rollouts.actions.index_select(0, idx);
+    Tensor mb_logprobs = rollouts.logprobs.index_select(0, idx);
+    Tensor mb_values = rollouts.values.index_select(0, idx);
+    Tensor mb_advantages = advantages.index_select(0, idx);
+    Tensor mb_returns = mb_advantages + mb_values;
+
+    mb_state.zero_();
+    graph.mb_obs.copy_(mb_obs, false);
+    graph.mb_state.copy_(mb_state, false);
+    graph.mb_actions.copy_(mb_actions, false);
+    graph.mb_logprobs.copy_(mb_logprobs, false);
+    graph.mb_advantages.copy_(mb_advantages, false);
+    graph.mb_prio.copy_(mb_prio, false);
+    graph.mb_values.copy_(mb_values, false);
+    graph.mb_returns.copy_(mb_returns, false);
+}
 
-    // Clear optimizer buffers explicitly (policy params are views into weight_buffer)
-    pufferl.muon->weight_buffer = Tensor();
-    pufferl.muon->momentum_buffer = Tensor();
-    pufferl.muon->lr = Tensor();
-    // Clear the param_groups to release parameter references
-    pufferl.muon->param_groups().clear();
-    delete pufferl.muon;
-    pufferl.muon = nullptr;
+void rollouts_impl(PuffeRL& pufferl) {
+    torch::NoGradGuard no_grad;
+    HypersT& hypers = pufferl.hypers;
 
-    if (USE_BF16) {
-        delete pufferl.policy_bf16;
+    int horizon = hypers.horizon;
+    int num_buffers = hypers.num_buffers;
+    // TODO: You removed state zeros and reward clamping
+
+    for (int i = 0; i < num_buffers*horizon; ++i) {
+        int buf = i % num_buffers;
+        int h = i / num_buffers;
+
+        profile_begin("env_recv", hypers.profile);
+        env_recv(pufferl, buf);
+        profile_end(hypers.profile);
+
+        net_callback_wrapper(&pufferl, buf, h);
+
+        // TODO: There should be a lighter way to sync. You need to make sure the torch data streams
+        // are ready because puffer vec uses different streams. Setting to non-blocking is not enough.
+        cudaDeviceSynchronize();
+
+        profile_begin("env_send", hypers.profile);
+        env_send(pufferl, buf);
+        profile_end(hypers.profile);
     }
-    delete pufferl.policy_fp32;
-    pufferl.policy_bf16 = nullptr;
-    pufferl.policy_fp32 = nullptr;
+}
+
+
+void train_impl(PuffeRL& pufferl) {
+    // Update to HypersT& p
+    HypersT& hypers = pufferl.hypers;
 
     // Clear buffer states (releases CUDA tensors)
     pufferl.buffer_states.clear();
@@ -781,29 +801,57 @@ void close_impl(PuffeRL& pufferl) {
     pufferl.act_sizes_cpu = Tensor();
     pufferl.rng_offset = Tensor();
 
-    // Clear env tensors (from_blob wrappers - don't own memory but hold refs)
-    pufferl.env.obs = Tensor();
-    pufferl.env.actions = Tensor();
-    pufferl.env.rewards = Tensor();
-    pufferl.env.terminals = Tensor();
+    // Temporary: random indices and uniform weights
+    /*
+    auto idx = torch::randint(0, segments, {minibatch_segments}, torch::dtype(torch::kInt64).device(device));
+    auto mb_prio = torch::ones({minibatch_segments, 1}, torch::dtype(torch::kFloat32).device(device));
+    */
 
-    // Clear torch streams
-    pufferl.torch_streams.clear();
+    int total_minibatches = hypers.replay_ratio * batch_size / hypers.minibatch_size;
 
-    // Close environment vectorization (frees env GPU buffers)
-    static_vec_close(pufferl.vec);
-    pufferl.vec = nullptr;
+    for (int mb = 0; mb < total_minibatches; ++mb) {
+        advantages.fill_(0.0);
 
-    // Cleanup NCCL
-    if (pufferl.nccl_comm != nullptr) {
-        ncclCommDestroy(pufferl.nccl_comm);
-        pufferl.nccl_comm = nullptr;
-    }
+        profile_begin("compute_advantage", hypers.profile);
+        compute_advantage(rollouts, advantages, hypers);
+        profile_end(hypers.profile);
 
-    // Force CUDA to release cached memory first
-    c10::cuda::CUDACachingAllocator::emptyCache();
-    cudaDeviceSynchronize();
+        profile_begin("compute_prio", hypers.profile);
+        auto [idx, mb_prio] = compute_prio(advantages, minibatch_segments, hypers.total_agents,
+            prio_alpha, anneal_beta);
+        profile_end(hypers.profile);
+
+        profile_begin("train_select_and_copy", hypers.profile);
+        TrainGraph& graph = pufferl.train_buf;
+        train_select_and_copy(graph, rollouts, advantages, idx, mb_state, mb_prio);
+        profile_end(hypers.profile);
+
+        profile_begin("train_forward_graph", hypers.profile);
+        if (hypers.cudagraphs) {
+            pufferl.train_cudagraph.replay();
+        } else {
+            train_forward_call(graph, pufferl.policy_bf16, pufferl.policy_fp32, pufferl.muon,
+                hypers, pufferl.adv_mean, pufferl.adv_std, pufferl.act_sizes_cpu, hypers.kernels);
+        }
+        profile_end(hypers.profile);
+
+        // Update global ratio and values in-place (matches Python)
+        // Buffers are {horizon, segments}, so index_copy_ along dim 1 (segments)
+        // Source is {minibatch_segments, horizon}, need to transpose to {horizon, minibatch_segments}
+        pufferl.rollouts.ratio.index_copy_(1, idx, graph.mb_ratio.detach().squeeze(-1).to(dtype).transpose(0, 1));
+        pufferl.rollouts.values.index_copy_(1, idx, graph.mb_newvalue.detach().squeeze(-1).to(dtype).transpose(0, 1));
+
+    }
+    pufferl.epoch += 1;
+
+    // Compute explained variance at end of epoch
+    /*
+    auto y_true = advantages.flatten() + values.flatten();
+    auto y_pred = values.flatten();
+    auto var_y = y_true.var();
+    */
+    //double explained_var = (var_y.abs() < 1e-8) ? NAN : (1 - (y_true - y_pred).var() / var_y).item();
+    cudaStreamSynchronize(at::cuda::getCurrentCUDAStream());
 }
 // nsys capture control (--capture-range=cudaProfilerApi). Different from profile_begin/end which are nvtx ranges.
diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h
index 64efe1cf1..958759c7a 100644
--- a/pufferlib/ocean/env_binding.h
+++ b/pufferlib/ocean/env_binding.h
@@ -669,6 +669,7 @@ static PyMethodDef methods[] = {
     {NULL, NULL, 0, NULL}
 };
 
+#ifndef PUFFER_NATIVECPP_PYBINDINGS
 // Module definition
 static PyModuleDef module = {
     PyModuleDef_HEAD_INIT,
@@ -682,3 +683,5 @@ PyMODINIT_FUNC PyInit_binding(void) {
     import_array();
     return PyModule_Create(&module);
 }
+
+#endif
\ No newline at end of file
diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py
index b714354d7..9b5a869bf 100644
--- a/pufferlib/pufferl.py
+++ b/pufferlib/pufferl.py
@@ -37,8 +37,9 @@ import pufferlib.pytorch
 
 try:
     from pufferlib import _C
+    from pufferlib import fake_tensors
 except ImportError:
-    raise ImportError('Failed to import PufferLib C++ backend. If you have non-default PyTorch, try installing with --no-build-isolation')
+    raise ImportError('Failed to import C/CUDA advantage kernel. If you have non-default PyTorch, try installing with --no-build-isolation')
 
 import rich
 import rich.traceback
@@ -565,7 +566,7 @@ def _train_rank(env_name, args=None, logger=None, verbose=True, early_stop_fn=No
     pufferl = PuffeRL(train_config, vec_config, env_config, policy_config, logger, verbose)
 
     if train_config['profile']:
-        _C.profiler_start()
+        binding.profiler_start()
 
     # Sweep needs data for early stopped runs, so send data when steps > 100M
     logging_threshold = min(0.20*train_config['total_timesteps'], 100_000_000)
@@ -589,16 +590,15 @@ def _train_rank(env_name, args=None, logger=None, verbose=True, early_stop_fn=No
         if pufferl.global_step > logging_threshold:
             all_logs.append(logs)
 
-        if should_stop_early:
-            if train_config['profile']:
-                _C.profiler_stop()
-            model_path = pufferl.close()
-            pufferl.logger.log_cost(pufferl.uptime)
-            pufferl.logger.close(model_path, early_stop=True)
-            return pufferl, all_logs
+        if should_stop_early is not None and should_stop_early(logs):
+            if train_config['profile']:
+                _C.profiler_stop()
+            model_path = pufferl.close()
+            pufferl.logger.close(model_path)
+            return all_logs
 
     if train_config['profile']:
-        _C.profiler_stop()
+        binding.profiler_stop()
 
     pufferl.print_dashboard()
 
@@ -684,6 +684,41 @@ def train(env_name, args=None, logger=None, verbose=True, early_stop_fn=None):
     pufferl.logger.close(model_path, early_stop=False)
     return all_logs
 
+def sps(env_name, args=None, vecenv=None, policy=None, logger=None, verbose=True, should_stop_early=None):
+    args = args or load_config(env_name)
+    train_config = dict(**args['train'])#, env=env_name)
+    train_config['env_name'] = args['env_name']
+    train_config['vec_kwargs'] = args['vec']
+    train_config['env_kwargs'] = args['env']
+    train_config['total_agents'] = args['vec']['total_agents']
+    train_config['num_buffers'] = args['vec']['num_buffers']
+    pufferl = PuffeRL(train_config, logger, verbose)
+    # Warmup
+    for _ in range(3):
+        _C.batched_forward(
+            pufferl.pufferl_cpp,
+            pufferl.observations,
+            pufferl.total_minibatches,
+            pufferl.minibatch_segments,
+        )
+
+    N = 100
+    torch.cuda.synchronize()
+    start = time.time()
+    for _ in range(N):
+        _C.batched_forward(
+            pufferl.pufferl_cpp,
+            pufferl.observations,
+            pufferl.total_minibatches,
+            pufferl.minibatch_segments,
+        )
+    torch.cuda.synchronize()
+    end = time.time()
+    dt = end - start
+    sps = pufferl.config['batch_size']*N/dt
+    print(f'SPS: {sps/1e6:.1f}M')
+
+
 def eval(env_name, args=None, vecenv=None, policy=None):
     args = args or load_config(env_name)
     backend = args['vec']['backend']
@@ -1147,6 +1182,7 @@ def main():
     mode = sys.argv.pop(1)
     env_name = sys.argv.pop(1)
 
+
     if mode == 'train':
         train(env_name=env_name)
     elif mode == 'eval':
diff --git a/setup.py b/setup.py
index 0701bc914..0efcefdf0 100644
--- a/setup.py
+++ b/setup.py
@@ -12,6 +12,10 @@ import platform
 import shutil
 import pybind11
 
+import torch
+import subprocess
+import sysconfig
+import torch.utils.cpp_extension as cpp_ext
 from setuptools.command.build_ext import build_ext
 from torch.utils import cpp_extension
 
@@ -37,6 +41,9 @@ NO_OCEAN = os.getenv("NO_OCEAN", "0") == "1"
 NO_TRAIN = os.getenv("NO_TRAIN", "0") == "1"
 
+if DEBUG:
+    print("*****Building in DEBUG mode*******")
+
 # Build raylib for your platform
 RAYLIB_URL = 'https://github.com/raysan5/raylib/releases/download/5.5/'
 RAYLIB_NAME = 'raylib-5.5_macos' if platform.system() == "Darwin" else 'raylib-5.5_linux_amd64'
@@ -82,6 +89,7 @@ def download_box2d(platform):
 extra_compile_args = [
     '-DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION',
     '-DPLATFORM_DESKTOP',
+    '-DPUFFER_NATIVECPP_PYBINDINGS=1',
 ]
 extra_link_args = [
     '-fwrapv',
@@ -101,6 +109,12 @@ def download_box2d(platform):
     extra_compile_args += [
         '-O0',
         '-g',
+        '-flto=auto',
+        '-fno-semantic-interposition',
+        '-fvisibility=hidden',
+        '-DPUFFER_DEBUG=1',
+        '-DDEBUG=1',
+        #'-fsanitize=address,undefined,bounds,pointer-overflow,leak',
         #'-fno-omit-frame-pointer',
     ]
@@ -202,14 +216,49 @@ def run(self):
         extra_objects=[RAYLIB_A],
     )
 
+def _find_built_pufferlib_native(required: bool = True):
+    ext_suffix = ".so"
+
+    inplace = os.path.join("pufferlib", "native" + ext_suffix)
+    if os.path.isfile(inplace):
+        return inplace
+
+    cwd = os.getcwd()
+    candidates = glob.glob(os.path.join(cwd, "build", "**", "pufferlib", "_C*.so"), recursive=True)
+    candidates += glob.glob(os.path.join(cwd, "pufferlib", "_C*.so"), recursive=True)
+    candidates = [p for p in candidates if os.path.isfile(p)]
+    if candidates:
+        candidates.sort(key=os.path.getmtime, reverse=True)
+        return candidates[0]
+
+    if required:
+        raise ValueError(f"Could not find built pufferlib.native extension under {cwd}.")
+    return None
+
+native_lib = _find_built_pufferlib_native(required=False)
+if native_lib:
+    print(f"Adding native library {native_lib} to C/C++ extensions")
+    extension_kwargs['extra_objects'].append(native_lib)
+
+# Check if CUDA compiler is available. You need cuda dev, not just runtime.
+cuda_home = os.environ.get('CUDA_HOME') or os.environ.get('CUDA_PATH') or torch.utils.cpp_extension.CUDA_HOME or '/usr/local/cuda'
+nvtx_lib_dir = os.path.join(cuda_home, 'lib64')  # Common on Linux; fall back to 'lib' if needed
+nvtx_lib = 'nvToolsExt'
+
 # Find C extensions
 c_extensions = []
 if not NO_OCEAN:
+    cpp_sources = [
+        "pufferlib/extensions/env_glue.cpp",
+    ]
     c_extension_paths = glob.glob('pufferlib/ocean/**/binding.c', recursive=True)
+    extension_kwargs['include_dirs'] += [pybind11.get_include(), torch.utils.cpp_extension.include_paths()[0], "pufferlib/extensions/"]
+
     c_extensions = [
-        Extension(
-            path.rstrip('.c').replace('/', '.'),
-            sources=[path],
+        CppExtension(
+            path.rstrip('.c').replace('/', '.'),
+            sources=[path] + cpp_sources,
+            language ='c++',
             **extension_kwargs,
         )
         for path in c_extension_paths if 'matsci' not in path
@@ -247,9 +296,6 @@ def finalize_options(self):
         super().finalize_options()
 
     def run(self):
-        import subprocess
-        import sysconfig
-        import torch.utils.cpp_extension as cpp_ext
         src = 'profile_kernels.cu'
         out = 'profile_kernels'
@@ -322,11 +368,15 @@ def run(self):
         # -g?
         clang_cmd = [
-            'clang', '-c', '-O2', '-DNDEBUG',
+            'clang', '-c',
+            ('-O0' if DEBUG else '-O2'),
+            ('-DDEBUG' if DEBUG else '-DNDEBUG'),
             '-I.', '-Ipufferlib/extensions', f'-Ipufferlib/ocean/{env_name}',
             f'-I./{RAYLIB_NAME}/include', '-I/usr/local/cuda/include',
             '-DPLATFORM_DESKTOP',
-            '-fno-semantic-interposition', '-fvisibility=hidden',
+            *(['-DPUFFER_DEBUG=1'] if DEBUG else []),
+            '-fno-semantic-interposition',
+            ('-fvisibility=default' if DEBUG else '-fvisibility=hidden'),
             '-fPIC', '-fopenmp', env_binding_src, '-o', static_obj
         ]
@@ -367,11 +417,6 @@ def run(self):
 
 cmdclass[f"build_{env_name}_so"] = create_env_build_class(c_ext.name)
 
-# Check if CUDA compiler is available. You need cuda dev, not just runtime.
-import torch
-cuda_home = os.environ.get('CUDA_HOME') or os.environ.get('CUDA_PATH') or torch.utils.cpp_extension.CUDA_HOME or '/usr/local/cuda'
-nvtx_lib_dir = os.path.join(cuda_home, 'lib64')  # Common on Linux; fall back to 'lib' if needed
-nvtx_lib = 'nvToolsExt'
 torch_extensions = []
 if not NO_TRAIN:
     torch_sources = [
@@ -454,3 +499,9 @@ def run(self):
     cmdclass=cmdclass,
     include_dirs=[numpy.get_include(), RAYLIB_NAME + '/include'],
 )
+
+
+# export CC=gcc-12
+# export CXX=g++-12
+# export LDSHARED="g++-12 -shared"
+# export CUDAHOSTCXX=g++-12
\ No newline at end of file