From 0fc8731adbdd8bcbf76c515c79974a0a2574099f Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 20:59:47 +0900 Subject: [PATCH 01/16] Implement queue generator for Cloudflare Workers --- uzumibi-on-cloudflare-spike/package.json | 3 + .../src/index.queue.js | 134 ++++++++++ .../wasm-app/Cargo.toml | 1 + .../wasm-app/src/lib.rs | 233 ++++++++++++++++++ uzumibi-on-cloudflare-spike/wrangler.jsonc | 9 +- 5 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 uzumibi-on-cloudflare-spike/src/index.queue.js diff --git a/uzumibi-on-cloudflare-spike/package.json b/uzumibi-on-cloudflare-spike/package.json index 09189fe..25eeaf2 100644 --- a/uzumibi-on-cloudflare-spike/package.json +++ b/uzumibi-on-cloudflare-spike/package.json @@ -8,6 +8,9 @@ "dev:vanilla": "npm run build:wasm:vanilla && wrangler dev --main src/index.vanilla.js", "build:wasm:vanilla": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --no-default-features && cp -v -f ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm src/", "build:wasm:asyncify": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features enable-external && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike.wasm", + "build:wasm:queue": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features queue && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike_queue.wasm", + "dev:queue": "npm run build:wasm:queue && wrangler dev --main src/index.queue.js", + "deploy:queue": "npm run build:wasm:queue && wrangler deploy --main src/index.queue.js", "start": "wrangler dev", "test": "vitest" }, diff --git a/uzumibi-on-cloudflare-spike/src/index.queue.js b/uzumibi-on-cloudflare-spike/src/index.queue.js new file mode 100644 index 0000000..85e445b --- /dev/null +++ b/uzumibi-on-cloudflare-spike/src/index.queue.js @@ -0,0 +1,134 @@ +import { instantiate } from "asyncify-wasm"; +import mod from "./uzumibi_on_cloudflare_spike_queue.wasm"; + +const wasmModule = mod; + +export default { + async queue(batch, env, ctx) { + const decoder = new TextDecoder(); + const encoder = new TextEncoder(); + + // Current message being processed (set per iteration) + let currentMessage = null; + + const importObject = { + env: { + debug_console_log: (ptr, size) => { + const memory = exports.memory; + const buffer = new Uint8Array(memory.buffer, ptr, size); + console.log(`[debug]: ${decoder.decode(buffer)}`); + return 0; + }, + + uzumibi_cf_message_ack: async (_idPtr, _idSize) => { + currentMessage.ack(); + return 0; + }, + + uzumibi_cf_message_retry: async (_idPtr, _idSize, delaySeconds) => { + currentMessage.retry({ delaySeconds }); + return 0; + }, + }, + }; + + const instance = await instantiate(wasmModule, importObject); + const exports = instance.exports; + + for (const message of batch.messages) { + currentMessage = message; + + const idBytes = encoder.encode(message.id); + const timestampBytes = encoder.encode( + message.timestamp.toISOString(), + ); + const bodyBytes = encoder.encode( + typeof message.body === "string" + ? message.body + : JSON.stringify(message.body), + ); + const attempts = message.attempts; + + // Pack message data: + // u16 LE id_size, id bytes, + // u16 LE timestamp_size, timestamp bytes, + // u32 LE body_size, body bytes, + // u32 LE attempts + const totalSize = + 2 + + idBytes.length + + 2 + + timestampBytes.length + + 4 + + bodyBytes.length + + 4; + + const msgResult = + await exports.uzumibi_initialize_message(totalSize); + const msgOffset = Number(msgResult & 0xffffffffn); + if (msgOffset === 0) { + const errOffset = Number( + (msgResult >> 32n) & 0xffffffffn, + ); + const buffer = new Uint8Array( + exports.memory.buffer, + errOffset, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to initialize message: ${errStr}`, + ); + } + + const msgBuffer = new Uint8Array( + exports.memory.buffer, + msgOffset, + totalSize, + ); + const dataView = new DataView( + exports.memory.buffer, + msgOffset, + ); + let pos = 0; + + // id + dataView.setUint16(pos, idBytes.length, true); + pos += 2; + msgBuffer.set(idBytes, pos); + pos += idBytes.length; + + // timestamp + dataView.setUint16(pos, timestampBytes.length, true); + pos += 2; + msgBuffer.set(timestampBytes, pos); + pos += timestampBytes.length; + + // body + dataView.setUint32(pos, bodyBytes.length, true); + pos += 4; + msgBuffer.set(bodyBytes, pos); + pos += bodyBytes.length; + + // attempts + dataView.setUint32(pos, attempts, true); + + const result = await exports.uzumibi_start_message(); + if (result !== 0) { + const buffer = new Uint8Array( + exports.memory.buffer, + result, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to process message: ${errStr}`, + ); + } + } + }, +}; diff --git a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml index 221a984..0ead5cd 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml +++ b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml @@ -19,3 +19,4 @@ mruby-compiler2-sys = ">= 0.3.0" [features] default = ["enable-external"] enable-external = [] +queue = [] diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index de76de9..c72253e 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -35,6 +35,16 @@ unsafe extern "C" { unsafe fn debug_console_log(ptr: *const u8, len: usize); } +#[cfg(feature = "queue")] +unsafe extern "C" { + unsafe fn uzumibi_cf_message_ack(message_id_ptr: *const u8, message_id_size: usize) -> i32; + unsafe fn uzumibi_cf_message_retry( + message_id_ptr: *const u8, + message_id_size: usize, + delay_seconds: i32, + ) -> i32; +} + #[cfg(feature = "enable-external")] unsafe extern "C" { unsafe fn uzumibi_cf_fetch( @@ -350,6 +360,81 @@ fn uzumibi_queue_class_send( Ok(RObject::boolean(true).to_refcount_assigned()) } +// ---- Queue consumer support (only when queue feature is active) ---- + +/// Message.ack! -> delegates to JS +#[cfg(feature = "queue")] +fn uzumibi_message_ack( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + // self is args[0] + let self_obj = &args[0]; + let id_obj = self_obj.get_ivar("@id").ok_or_else(|| { + mrubyedge::Error::RuntimeError("Message @id not found".to_string()) + })?; + let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; + let id: String = id.as_ref().try_into()?; + + unsafe { + let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); + if result != 0 { + return Err(mrubyedge::Error::RuntimeError( + format!("Failed to ack message: return code {}", result), + )); + } + } + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +/// Message.retry(delay_seconds: N) -> delegates to JS +#[cfg(feature = "queue")] +fn uzumibi_message_retry( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let self_obj = &args[0]; + let id_obj = self_obj.get_ivar("@id").ok_or_else(|| { + mrubyedge::Error::RuntimeError("Message @id not found".to_string()) + })?; + let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; + let id: String = id.as_ref().try_into()?; + + let delay_seconds: i32 = match vm.get_kwargs() { + Some(kwargs) => { + match kwargs.get("delay_seconds") { + Some(val) => { + let v: i64 = val.as_ref().try_into()?; + v as i32 + } + None => 0, + } + } + None => 0, + }; + + unsafe { + let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); + if result != 0 { + return Err(mrubyedge::Error::RuntimeError( + format!("Failed to retry message: return code {}", result), + )); + } + } + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +/// Consumer.on_receive(message) - abstract method, must be overridden +#[cfg(feature = "queue")] +fn uzumibi_consumer_on_receive( + _vm: &mut VM, + _args: &[Rc], +) -> Result, mrubyedge::Error> { + Err(mrubyedge::Error::RuntimeError( + "on_receive must be implemented by subclass of Uzumibi::Consumer".to_string(), + )) +} + // ---- VM initialization ---- fn init_vm() -> Result { @@ -398,6 +483,35 @@ fn init_vm() -> Result { ); } + #[cfg(feature = "queue")] + { + let uzumibi_module = vm.get_module_by_name("Uzumibi"); + + // Uzumibi::Consumer (base class for user-defined consumers) + let consumer_class = vm.define_class("Consumer", None, Some(uzumibi_module.clone())); + mrb_define_cmethod( + &mut vm, + consumer_class, + "on_receive", + Box::new(uzumibi_consumer_on_receive), + ); + + // Uzumibi::Message with ack! and retry methods + let message_class = vm.define_class("Message", None, Some(uzumibi_module)); + mrb_define_cmethod( + &mut vm, + message_class.clone(), + "ack!", + Box::new(uzumibi_message_ack), + ); + mrb_define_cmethod( + &mut vm, + message_class, + "retry", + Box::new(uzumibi_message_retry), + ); + } + vm.run() .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to init VM: {:?}", e)))?; @@ -467,3 +581,122 @@ unsafe extern "C" fn uzumibi_start_request() -> u64 { } } } + +// ---- Queue message handling (only when queue feature is active) ---- + +/// Allocate a buffer for the message data. +/// Returns a pointer to the buffer (lower 32 bits) or error (upper 32 bits). +#[cfg(feature = "queue")] +static mut MESSAGE_BUF: Option> = None; + +#[cfg(feature = "queue")] +fn do_uzumibi_initialize_message(size: i32) -> Result<*mut u8, mrubyedge::Error> { + let _ = assume_init_vm()?; + unsafe { + MESSAGE_BUF = Some(vec![0u8; size as usize]); + Ok(MESSAGE_BUF.as_mut().unwrap().as_mut_ptr()) + } +} + +/// Unpack message from buffer and call $CONSUMER.on_receive(message). +/// Message binary format: +/// u16 LE id_size, id bytes, +/// u16 LE timestamp_size, timestamp bytes, +/// u32 LE body_size, body bytes, +/// u32 LE attempts +/// +/// Returns 0 on success, or a pointer to an error string. +#[cfg(feature = "queue")] +fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { + debug_console_log_internal("uzumibi_start_message called"); + let vm = assume_init_vm()?; + + let buf = unsafe { + MESSAGE_BUF + .as_ref() + .ok_or_else(|| mrubyedge::Error::RuntimeError("Message buffer not initialized".to_string()))? + }; + + let mut offset = 0; + + // id (u16 LE size + bytes) + let id_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let id = String::from_utf8_lossy(&buf[offset..offset + id_size]).to_string(); + offset += id_size; + + // timestamp (u16 LE size + bytes) + let ts_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let timestamp = String::from_utf8_lossy(&buf[offset..offset + ts_size]).to_string(); + offset += ts_size; + + // body (u32 LE size + bytes) + let body_size = u32::from_le_bytes([ + buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3], + ]) as usize; + offset += 4; + let body = String::from_utf8_lossy(&buf[offset..offset + body_size]).to_string(); + offset += body_size; + + // attempts (u32 LE) + let attempts = u32::from_le_bytes([ + buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3], + ]) as i64; + + // Create Uzumibi::Message instance + let uzumibi = vm + .get_const_by_name("Uzumibi") + .ok_or_else(|| mrubyedge::Error::RuntimeError("Uzumibi module not found".to_string()))?; + let uzumibi_module = match &uzumibi.as_ref().value { + RValue::Module(m) => m.clone(), + _ => { + return Err(mrubyedge::Error::RuntimeError( + "Uzumibi must be a module".to_string(), + )); + } + }; + let message_class = uzumibi_module + .get_const_by_name("Message") + .ok_or_else(|| { + mrubyedge::Error::RuntimeError("Uzumibi::Message class not found".to_string()) + })?; + let message = mrb_funcall(vm, Some(message_class), "new", &[])?; + + message.set_ivar("@id", RObject::string(id).to_refcount_assigned()); + message.set_ivar("@timestamp", RObject::string(timestamp).to_refcount_assigned()); + message.set_ivar("@body", RObject::string(body).to_refcount_assigned()); + message.set_ivar("@attempts", RObject::integer(attempts).to_refcount_assigned()); + + // Call $CONSUMER.on_receive(message) + let consumer = vm + .globals + .get("$CONSUMER") + .ok_or_else(|| mrubyedge::Error::RuntimeError("$CONSUMER is not defined".to_string()))?; + mrb_funcall(vm, consumer.clone().into(), "on_receive", &[message])?; + + Ok(()) +} + +#[cfg(feature = "queue")] +#[unsafe(export_name = "uzumibi_initialize_message")] +unsafe extern "C" fn uzumibi_initialize_message(size: i32) -> u64 { + match do_uzumibi_initialize_message(size) { + Ok(ptr) => (ptr as u32) as u64, + Err(e) => { + let err_buf = set_error_to_buf(format!("Error in initialize_message: {}", e)); + ((err_buf as u32) as u64) << 32 + } + } +} + +#[cfg(feature = "queue")] +#[unsafe(export_name = "uzumibi_start_message")] +unsafe extern "C" fn uzumibi_start_message() -> u32 { + match do_uzumibi_start_message() { + Ok(()) => 0, + Err(e) => { + set_error_to_buf(format!("Error in start_message: {}", e)) as u32 + } + } +} diff --git a/uzumibi-on-cloudflare-spike/wrangler.jsonc b/uzumibi-on-cloudflare-spike/wrangler.jsonc index 0dab6d5..7dcd539 100644 --- a/uzumibi-on-cloudflare-spike/wrangler.jsonc +++ b/uzumibi-on-cloudflare-spike/wrangler.jsonc @@ -33,7 +33,7 @@ ] /** * Queues - * Used for Uzumibi::Queue.send + * Used for Uzumibi::Queue.send (producer) and Uzumibi::Consumer (consumer) * https://developers.cloudflare.com/queues/ */ // "queues": { @@ -42,6 +42,13 @@ // "binding": "MY_QUEUE", // "queue": "my-queue-name" // } + // ], + // "consumers": [ + // { + // "queue": "my-queue-name", + // "max_batch_size": 10, + // "max_batch_timeout": 5 + // } // ] // } } \ No newline at end of file From 15afa43e79f8b2be8652953365450c9e8b1128f0 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 21:12:02 +0900 Subject: [PATCH 02/16] Sample 2 --- uzumibi-on-cloudflare-spike/lib/app.rb | 8 ++++++++ uzumibi-on-cloudflare-spike/wrangler.jsonc | 17 ++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/uzumibi-on-cloudflare-spike/lib/app.rb b/uzumibi-on-cloudflare-spike/lib/app.rb index 1d55526..c75a54f 100644 --- a/uzumibi-on-cloudflare-spike/lib/app.rb +++ b/uzumibi-on-cloudflare-spike/lib/app.rb @@ -37,6 +37,14 @@ class App < Uzumibi::Router res end + get "/queue/send" do |req, res| + Uzumibi::Queue.send("UZUMIBI_QUEUE", "Hello from Uzumibi Queue!") + res.status_code = 200 + res.headers = { "Content-Type" => "text/plain" } + res.body = "Sent message to queue\n" + res + end + get "/healthz" do |req, res| res.status_code = 200 res.headers = { diff --git a/uzumibi-on-cloudflare-spike/wrangler.jsonc b/uzumibi-on-cloudflare-spike/wrangler.jsonc index 7dcd539..7954502 100644 --- a/uzumibi-on-cloudflare-spike/wrangler.jsonc +++ b/uzumibi-on-cloudflare-spike/wrangler.jsonc @@ -30,7 +30,7 @@ "UzumibiKVObject" ] } - ] + ], /** * Queues * Used for Uzumibi::Queue.send (producer) and Uzumibi::Consumer (consumer) @@ -51,4 +51,19 @@ // } // ] // } + "queues": { + "producers": [ + { + "binding": "UZUMIBI_QUEUE", + "queue": "uzumibi-queue" + } + ], + "consumers": [ + { + "queue": "my-queue-name", + "max_batch_size": 10, + "max_batch_timeout": 5 + } + ] + } } \ No newline at end of file From 211c0df292806e5e06af0e1f00ce7ef1803cd4f5 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 21:15:29 +0900 Subject: [PATCH 03/16] Compile consumer.rb --- uzumibi-on-cloudflare-spike/lib/consumer.rb | 14 ++++++++++++++ uzumibi-on-cloudflare-spike/wasm-app/build.rs | 13 +++++++++++++ uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs | 10 ++++++++++ 3 files changed, 37 insertions(+) create mode 100644 uzumibi-on-cloudflare-spike/lib/consumer.rb diff --git a/uzumibi-on-cloudflare-spike/lib/consumer.rb b/uzumibi-on-cloudflare-spike/lib/consumer.rb new file mode 100644 index 0000000..24238b4 --- /dev/null +++ b/uzumibi-on-cloudflare-spike/lib/consumer.rb @@ -0,0 +1,14 @@ +class Consumer < Uzumibi::Consumer + # @rbs message: Uzumibi::Message + def on_receive(message) + debug_console("[Consumer] Received message: id=#{message.id}, body=#{message.body}, attempts=#{message.attempts}") + + if message.id == "example" + message.ack! + else + message.retry(delay_seconds: 10) + end + end +end + +$CONSUMER = Consumer.new diff --git a/uzumibi-on-cloudflare-spike/wasm-app/build.rs b/uzumibi-on-cloudflare-spike/wasm-app/build.rs index a35f9a7..f94e82f 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/build.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/build.rs @@ -13,4 +13,17 @@ fn main() { ctx.compile_to_file(code, &mrb_path) .expect("failed to compile mruby script"); } + + #[cfg(feature = "queue")] + { + let consumer_mrb_path = Path::new(&out_dir).join("consumer.mrb"); + let consumer_code = include_str!("../lib/consumer.rb"); + println!("cargo:rerun-if-changed=../lib/consumer.rb"); + + unsafe { + let mut ctx = mruby_compiler2_sys::MRubyCompiler2Context::new(); + ctx.compile_to_file(consumer_code, &consumer_mrb_path) + .expect("failed to compile consumer mruby script"); + } + } } diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index c72253e..49050d6 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -15,6 +15,8 @@ use mrubyedge::{ }; static MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/app.mrb")); +#[cfg(feature = "queue")] +static CONSUMER_MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/consumer.mrb")); static mut MRUBY_VM: MaybeUninit = MaybeUninit::uninit(); static mut MRUBY_VM_LOADED: bool = false; @@ -515,6 +517,14 @@ fn init_vm() -> Result { vm.run() .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to init VM: {:?}", e)))?; + #[cfg(feature = "queue")] + { + let mut consumer_rite = rite::load(CONSUMER_MRB) + .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to load consumer mruby: {:?}", e)))?; + vm.eval_rite(&mut consumer_rite) + .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to run consumer: {:?}", e)))?; + } + Ok(vm) } From 95b3c19258287588d59ab2d636cea8e8cb13e133 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 21:20:53 +0900 Subject: [PATCH 04/16] Fix in queue methods --- .../wasm-app/Cargo.toml | 4 +- .../wasm-app/src/lib.rs | 99 +++++++++++-------- 2 files changed, 58 insertions(+), 45 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml index 0ead5cd..87f9649 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml +++ b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml @@ -17,6 +17,6 @@ uzumibi-art-router = ">= 0.3.1" mruby-compiler2-sys = ">= 0.3.0" [features] -default = ["enable-external"] +default = ["enable-external"] # , "queue"] enable-external = [] -queue = [] +queue = ["enable-external"] diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index 49050d6..7321a26 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -366,24 +366,25 @@ fn uzumibi_queue_class_send( /// Message.ack! -> delegates to JS #[cfg(feature = "queue")] -fn uzumibi_message_ack( - vm: &mut VM, - args: &[Rc], -) -> Result, mrubyedge::Error> { +fn uzumibi_message_ack(vm: &mut VM, args: &[Rc]) -> Result, mrubyedge::Error> { // self is args[0] let self_obj = &args[0]; - let id_obj = self_obj.get_ivar("@id").ok_or_else(|| { - mrubyedge::Error::RuntimeError("Message @id not found".to_string()) - })?; + let id_obj = self_obj.get_ivar("@id"); + if matches!(id_obj.as_ref().value, RValue::Nil) { + return Err(mrubyedge::Error::RuntimeError( + "Message object does not have @id".to_string(), + )); + } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; unsafe { let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); if result != 0 { - return Err(mrubyedge::Error::RuntimeError( - format!("Failed to ack message: return code {}", result), - )); + return Err(mrubyedge::Error::RuntimeError(format!( + "Failed to ack message: return code {}", + result + ))); } } Ok(RObject::boolean(true).to_refcount_assigned()) @@ -396,31 +397,33 @@ fn uzumibi_message_retry( args: &[Rc], ) -> Result, mrubyedge::Error> { let self_obj = &args[0]; - let id_obj = self_obj.get_ivar("@id").ok_or_else(|| { - mrubyedge::Error::RuntimeError("Message @id not found".to_string()) - })?; + let id_obj = self_obj.get_ivar("@id"); + if matches!(id_obj.as_ref().value, RValue::Nil) { + return Err(mrubyedge::Error::RuntimeError( + "Message object does not have @id".to_string(), + )); + } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; let delay_seconds: i32 = match vm.get_kwargs() { - Some(kwargs) => { - match kwargs.get("delay_seconds") { - Some(val) => { - let v: i64 = val.as_ref().try_into()?; - v as i32 - } - None => 0, + Some(kwargs) => match kwargs.get("delay_seconds") { + Some(val) => { + let v: i64 = val.as_ref().try_into()?; + v as i32 } - } + None => 0, + }, None => 0, }; unsafe { let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); if result != 0 { - return Err(mrubyedge::Error::RuntimeError( - format!("Failed to retry message: return code {}", result), - )); + return Err(mrubyedge::Error::RuntimeError(format!( + "Failed to retry message: return code {}", + result + ))); } } Ok(RObject::boolean(true).to_refcount_assigned()) @@ -519,10 +522,12 @@ fn init_vm() -> Result { #[cfg(feature = "queue")] { - let mut consumer_rite = rite::load(CONSUMER_MRB) - .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to load consumer mruby: {:?}", e)))?; - vm.eval_rite(&mut consumer_rite) - .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to run consumer: {:?}", e)))?; + let mut consumer_rite = rite::load(CONSUMER_MRB).map_err(|e| { + mrubyedge::Error::RuntimeError(format!("Failed to load consumer mruby: {:?}", e)) + })?; + vm.eval_rite(&mut consumer_rite).map_err(|e| { + mrubyedge::Error::RuntimeError(format!("Failed to run consumer: {:?}", e)) + })?; } Ok(vm) @@ -622,9 +627,9 @@ fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { let vm = assume_init_vm()?; let buf = unsafe { - MESSAGE_BUF - .as_ref() - .ok_or_else(|| mrubyedge::Error::RuntimeError("Message buffer not initialized".to_string()))? + MESSAGE_BUF.as_ref().ok_or_else(|| { + mrubyedge::Error::RuntimeError("Message buffer not initialized".to_string()) + })? }; let mut offset = 0; @@ -643,7 +648,10 @@ fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { // body (u32 LE size + bytes) let body_size = u32::from_le_bytes([ - buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3], + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], ]) as usize; offset += 4; let body = String::from_utf8_lossy(&buf[offset..offset + body_size]).to_string(); @@ -651,7 +659,10 @@ fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { // attempts (u32 LE) let attempts = u32::from_le_bytes([ - buf[offset], buf[offset + 1], buf[offset + 2], buf[offset + 3], + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], ]) as i64; // Create Uzumibi::Message instance @@ -666,17 +677,21 @@ fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { )); } }; - let message_class = uzumibi_module - .get_const_by_name("Message") - .ok_or_else(|| { - mrubyedge::Error::RuntimeError("Uzumibi::Message class not found".to_string()) - })?; + let message_class = uzumibi_module.get_const_by_name("Message").ok_or_else(|| { + mrubyedge::Error::RuntimeError("Uzumibi::Message class not found".to_string()) + })?; let message = mrb_funcall(vm, Some(message_class), "new", &[])?; message.set_ivar("@id", RObject::string(id).to_refcount_assigned()); - message.set_ivar("@timestamp", RObject::string(timestamp).to_refcount_assigned()); + message.set_ivar( + "@timestamp", + RObject::string(timestamp).to_refcount_assigned(), + ); message.set_ivar("@body", RObject::string(body).to_refcount_assigned()); - message.set_ivar("@attempts", RObject::integer(attempts).to_refcount_assigned()); + message.set_ivar( + "@attempts", + RObject::integer(attempts).to_refcount_assigned(), + ); // Call $CONSUMER.on_receive(message) let consumer = vm @@ -705,8 +720,6 @@ unsafe extern "C" fn uzumibi_initialize_message(size: i32) -> u64 { unsafe extern "C" fn uzumibi_start_message() -> u32 { match do_uzumibi_start_message() { Ok(()) => 0, - Err(e) => { - set_error_to_buf(format!("Error in start_message: {}", e)) as u32 - } + Err(e) => set_error_to_buf(format!("Error in start_message: {}", e)) as u32, } } From e844a80201eb75086f4b95edd3edb330e35b4b5b Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:13:44 +0900 Subject: [PATCH 05/16] It works!!! --- uzumibi-on-cloudflare-spike/lib/consumer.rb | 3 +- uzumibi-on-cloudflare-spike/package.json | 7 ++- .../wasm-app/Cargo.toml | 5 +- .../wasm-app/src/lib.rs | 51 +++++++++++++++-- .../wrangler.queue.jsonc | 55 +++++++++++++++++++ .../wrangler.vanilla.jsonc | 14 +++++ 6 files changed, 125 insertions(+), 10 deletions(-) create mode 100644 uzumibi-on-cloudflare-spike/wrangler.queue.jsonc create mode 100644 uzumibi-on-cloudflare-spike/wrangler.vanilla.jsonc diff --git a/uzumibi-on-cloudflare-spike/lib/consumer.rb b/uzumibi-on-cloudflare-spike/lib/consumer.rb index 24238b4..c844981 100644 --- a/uzumibi-on-cloudflare-spike/lib/consumer.rb +++ b/uzumibi-on-cloudflare-spike/lib/consumer.rb @@ -3,7 +3,8 @@ class Consumer < Uzumibi::Consumer def on_receive(message) debug_console("[Consumer] Received message: id=#{message.id}, body=#{message.body}, attempts=#{message.attempts}") - if message.id == "example" + if message.attempts > 5 + debug_console("[Consumer] Acknowledging message #{message.id} after 5 attempts!!") message.ack! else message.retry(delay_seconds: 10) diff --git a/uzumibi-on-cloudflare-spike/package.json b/uzumibi-on-cloudflare-spike/package.json index 25eeaf2..a4fdeaf 100644 --- a/uzumibi-on-cloudflare-spike/package.json +++ b/uzumibi-on-cloudflare-spike/package.json @@ -5,12 +5,13 @@ "scripts": { "deploy": "npm run build:wasm:asyncify && wrangler deploy", "dev": "npm run build:wasm:asyncify && wrangler dev", - "dev:vanilla": "npm run build:wasm:vanilla && wrangler dev --main src/index.vanilla.js", + "dev:vanilla": "npm run build:wasm:vanilla && wrangler dev --config wrangler.vanilla.jsonc", "build:wasm:vanilla": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --no-default-features && cp -v -f ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm src/", "build:wasm:asyncify": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features enable-external && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike.wasm", "build:wasm:queue": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features queue && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike_queue.wasm", - "dev:queue": "npm run build:wasm:queue && wrangler dev --main src/index.queue.js", - "deploy:queue": "npm run build:wasm:queue && wrangler deploy --main src/index.queue.js", + "build:wasm:queue:debug-stub": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features debug-stub && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike_queue.wasm", + "dev:queue": "npm run build:wasm:queue && wrangler dev --config wrangler.queue.jsonc", + "deploy:queue": "npm run build:wasm:queue && wrangler deploy --config wrangler.queue.jsonc", "start": "wrangler dev", "test": "vitest" }, diff --git a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml index 87f9649..5789375 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml +++ b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml @@ -17,6 +17,7 @@ uzumibi-art-router = ">= 0.3.1" mruby-compiler2-sys = ">= 0.3.0" [features] -default = ["enable-external"] # , "queue"] +default = [] enable-external = [] -queue = ["enable-external"] +queue = [] +debug-stub = ["queue"] diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index 7321a26..d5a01cb 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -9,7 +9,7 @@ use mrubyedge::{ yamrb::{ helpers::{mrb_define_class_cmethod, mrb_define_cmethod, mrb_funcall}, prelude::hash::{mrb_hash_new, mrb_hash_set_index}, - value::{RObject, RValue}, + value::{RObject, RSym, RValue}, vm::VM, }, }; @@ -367,19 +367,33 @@ fn uzumibi_queue_class_send( /// Message.ack! -> delegates to JS #[cfg(feature = "queue")] fn uzumibi_message_ack(vm: &mut VM, args: &[Rc]) -> Result, mrubyedge::Error> { - // self is args[0] - let self_obj = &args[0]; + debug_console_log_internal("[debug] uzumibi_message_ack: called"); + let self_obj = vm.getself()?; let id_obj = self_obj.get_ivar("@id"); if matches!(id_obj.as_ref().value, RValue::Nil) { + debug_console_log_internal("[debug] uzumibi_message_ack: @id is nil"); return Err(mrubyedge::Error::RuntimeError( "Message object does not have @id".to_string(), )); } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; + debug_console_log_internal(&format!("[debug] uzumibi_message_ack: id={}", id)); + #[cfg(feature = "debug-stub")] + { + debug_console_log_internal("[debug] uzumibi_message_ack: STUB - skipping JS call"); + } + #[cfg(not(feature = "debug-stub"))] unsafe { + debug_console_log_internal( + "[debug] uzumibi_message_ack: calling JS uzumibi_cf_message_ack", + ); let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); + debug_console_log_internal(&format!( + "[debug] uzumibi_message_ack: JS returned {}", + result + )); if result != 0 { return Err(mrubyedge::Error::RuntimeError(format!( "Failed to ack message: return code {}", @@ -396,15 +410,18 @@ fn uzumibi_message_retry( vm: &mut VM, args: &[Rc], ) -> Result, mrubyedge::Error> { - let self_obj = &args[0]; + debug_console_log_internal("[debug] uzumibi_message_retry: called"); + let self_obj = vm.getself()?; let id_obj = self_obj.get_ivar("@id"); if matches!(id_obj.as_ref().value, RValue::Nil) { + debug_console_log_internal("[debug] uzumibi_message_retry: @id is nil"); return Err(mrubyedge::Error::RuntimeError( "Message object does not have @id".to_string(), )); } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; + debug_console_log_internal(&format!("[debug] uzumibi_message_retry: id={}", id)); let delay_seconds: i32 = match vm.get_kwargs() { Some(kwargs) => match kwargs.get("delay_seconds") { @@ -416,9 +433,25 @@ fn uzumibi_message_retry( }, None => 0, }; + debug_console_log_internal(&format!( + "[debug] uzumibi_message_retry: delay_seconds={}", + delay_seconds + )); + #[cfg(feature = "debug-stub")] + { + debug_console_log_internal("[debug] uzumibi_message_retry: STUB - skipping JS call"); + } + #[cfg(not(feature = "debug-stub"))] unsafe { + debug_console_log_internal( + "[debug] uzumibi_message_retry: calling JS uzumibi_cf_message_retry", + ); let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); + debug_console_log_internal(&format!( + "[debug] uzumibi_message_retry: JS returned {}", + result + )); if result != 0 { return Err(mrubyedge::Error::RuntimeError(format!( "Failed to retry message: return code {}", @@ -503,6 +536,16 @@ fn init_vm() -> Result { // Uzumibi::Message with ack! and retry methods let message_class = vm.define_class("Message", None, Some(uzumibi_module)); + let message_class_obj = RObject::class(message_class.clone(), &mut vm); + for attr in ["id", "timestamp", "body", "attempts"] { + mrb_funcall( + &mut vm, + Some(message_class_obj.clone()), + "attr_accessor", + &[RObject::symbol(RSym::new(attr.to_string())).to_refcount_assigned()], + ) + .expect("attr_accessor failed"); + } mrb_define_cmethod( &mut vm, message_class.clone(), diff --git a/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc b/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc new file mode 100644 index 0000000..bcfd281 --- /dev/null +++ b/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc @@ -0,0 +1,55 @@ +/** + * Queue consumer build config + * For more details on how to configure Wrangler, refer to: + * https://developers.cloudflare.com/workers/wrangler/configuration/ + */ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "uzumibi-on-cloudflare-spike-queue", + "main": "src/index.queue.js", + "compatibility_date": "2025-12-30", + "observability": { + "enabled": true + }, + /** + * Durable Objects + * Used for Uzumibi::KV.get/set + * https://developers.cloudflare.com/durable-objects/ + */ + // "durable_objects": { + // "bindings": [ + // { + // "name": "UZUMIBI_KV_DATA", + // "class_name": "UzumibiKVObject" + // } + // ] + // }, + // "migrations": [ + // { + // "tag": "v1", + // "new_sqlite_classes": [ + // "UzumibiKVObject" + // ] + // } + // ], + /** + * Queues + * Used for Uzumibi::Queue.send (producer) and Uzumibi::Consumer (consumer) + * https://developers.cloudflare.com/queues/ + */ + "queues": { + "producers": [ + { + "binding": "UZUMIBI_QUEUE", + "queue": "uzumibi-queue" + } + ], + "consumers": [ + { + "queue": "uzumibi-queue", + "max_batch_size": 10, + "max_batch_timeout": 5 + } + ] + } +} \ No newline at end of file diff --git a/uzumibi-on-cloudflare-spike/wrangler.vanilla.jsonc b/uzumibi-on-cloudflare-spike/wrangler.vanilla.jsonc new file mode 100644 index 0000000..c51d73d --- /dev/null +++ b/uzumibi-on-cloudflare-spike/wrangler.vanilla.jsonc @@ -0,0 +1,14 @@ +/** + * Vanilla (non-asyncify) build config + * For more details on how to configure Wrangler, refer to: + * https://developers.cloudflare.com/workers/wrangler/configuration/ + */ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "uzumibi-on-cloudflare-spike", + "main": "src/index.vanilla.js", + "compatibility_date": "2025-12-30", + "observability": { + "enabled": true + } +} \ No newline at end of file From 4a4ba3b07c5a64c85cfa5d57be50f4f0e8c01a6c Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:32:57 +0900 Subject: [PATCH 06/16] Find message from batch by id --- uzumibi-on-cloudflare-spike/lib/consumer.rb | 6 +++--- uzumibi-on-cloudflare-spike/src/index.queue.js | 18 +++++++++++------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/lib/consumer.rb b/uzumibi-on-cloudflare-spike/lib/consumer.rb index c844981..c3a628d 100644 --- a/uzumibi-on-cloudflare-spike/lib/consumer.rb +++ b/uzumibi-on-cloudflare-spike/lib/consumer.rb @@ -3,11 +3,11 @@ class Consumer < Uzumibi::Consumer def on_receive(message) debug_console("[Consumer] Received message: id=#{message.id}, body=#{message.body}, attempts=#{message.attempts}") - if message.attempts > 5 - debug_console("[Consumer] Acknowledging message #{message.id} after 5 attempts!!") + if message.attempts > 3 + debug_console("[Consumer] Acknowledging message #{message.id} after 3 attempts!!") message.ack! else - message.retry(delay_seconds: 10) + message.retry(delay_seconds: 3) end end end diff --git a/uzumibi-on-cloudflare-spike/src/index.queue.js b/uzumibi-on-cloudflare-spike/src/index.queue.js index 85e445b..a0a89cf 100644 --- a/uzumibi-on-cloudflare-spike/src/index.queue.js +++ b/uzumibi-on-cloudflare-spike/src/index.queue.js @@ -9,7 +9,11 @@ export default { const encoder = new TextEncoder(); // Current message being processed (set per iteration) - let currentMessage = null; + const getMessage = (id) => { + const message = batch.messages.find((m) => m.id === id); + if (!message) throw new Error(`Message not found for id: ${id}`); + return message; + }; const importObject = { env: { @@ -20,13 +24,15 @@ export default { return 0; }, - uzumibi_cf_message_ack: async (_idPtr, _idSize) => { - currentMessage.ack(); + uzumibi_cf_message_ack: async (idPtr, idSize) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).ack(); return 0; }, - uzumibi_cf_message_retry: async (_idPtr, _idSize, delaySeconds) => { - currentMessage.retry({ delaySeconds }); + uzumibi_cf_message_retry: async (idPtr, idSize, delaySeconds) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).retry({ delaySeconds }); return 0; }, }, @@ -36,8 +42,6 @@ export default { const exports = instance.exports; for (const message of batch.messages) { - currentMessage = message; - const idBytes = encoder.encode(message.id); const timestampBytes = encoder.encode( message.timestamp.toISOString(), From aca261142493eb2cb29ed0dc729d19a0fde4894d Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:40:19 +0900 Subject: [PATCH 07/16] Remove debugs --- uzumibi-gem/src/helpers.rs | 2 +- uzumibi-gem/src/request.rs | 3 ++- .../wasm-app/src/lib.rs | 24 ------------------- 3 files changed, 3 insertions(+), 26 deletions(-) diff --git a/uzumibi-gem/src/helpers.rs b/uzumibi-gem/src/helpers.rs index 78dbb86..31edf7b 100644 --- a/uzumibi-gem/src/helpers.rs +++ b/uzumibi-gem/src/helpers.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; /// # Example /// ``` /// use uzumibi_gem::helpers::parse_x_www_form_urlencoded; -/// +/// /// let data = b"name=John+Doe&age=30&city=New%20York"; /// let params = parse_x_www_form_urlencoded(data); /// assert_eq!(params.get("name"), Some(&"John Doe".to_string())); diff --git a/uzumibi-gem/src/request.rs b/uzumibi-gem/src/request.rs index 5ea2235..eb5cc42 100644 --- a/uzumibi-gem/src/request.rs +++ b/uzumibi-gem/src/request.rs @@ -260,7 +260,8 @@ impl Request { { let body_rstr = RObject::string_from_vec(self.body.clone()).to_refcount_assigned(); - if let Ok(json_value) = mrubyedge_serde_json::mrb_json_class_load(vm, &[body_rstr]) + if let Ok(json_value) = + mrubyedge_serde_json::mrb_json_class_load(vm, &[body_rstr]) { // If json_value is a Hash, set key-value pairs to params if let RValue::Hash(h) = &json_value.value { diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index d5a01cb..32372b6 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -367,18 +367,15 @@ fn uzumibi_queue_class_send( /// Message.ack! -> delegates to JS #[cfg(feature = "queue")] fn uzumibi_message_ack(vm: &mut VM, args: &[Rc]) -> Result, mrubyedge::Error> { - debug_console_log_internal("[debug] uzumibi_message_ack: called"); let self_obj = vm.getself()?; let id_obj = self_obj.get_ivar("@id"); if matches!(id_obj.as_ref().value, RValue::Nil) { - debug_console_log_internal("[debug] uzumibi_message_ack: @id is nil"); return Err(mrubyedge::Error::RuntimeError( "Message object does not have @id".to_string(), )); } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; - debug_console_log_internal(&format!("[debug] uzumibi_message_ack: id={}", id)); #[cfg(feature = "debug-stub")] { @@ -386,14 +383,7 @@ fn uzumibi_message_ack(vm: &mut VM, args: &[Rc]) -> Result, } #[cfg(not(feature = "debug-stub"))] unsafe { - debug_console_log_internal( - "[debug] uzumibi_message_ack: calling JS uzumibi_cf_message_ack", - ); let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); - debug_console_log_internal(&format!( - "[debug] uzumibi_message_ack: JS returned {}", - result - )); if result != 0 { return Err(mrubyedge::Error::RuntimeError(format!( "Failed to ack message: return code {}", @@ -410,18 +400,15 @@ fn uzumibi_message_retry( vm: &mut VM, args: &[Rc], ) -> Result, mrubyedge::Error> { - debug_console_log_internal("[debug] uzumibi_message_retry: called"); let self_obj = vm.getself()?; let id_obj = self_obj.get_ivar("@id"); if matches!(id_obj.as_ref().value, RValue::Nil) { - debug_console_log_internal("[debug] uzumibi_message_retry: @id is nil"); return Err(mrubyedge::Error::RuntimeError( "Message object does not have @id".to_string(), )); } let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; - debug_console_log_internal(&format!("[debug] uzumibi_message_retry: id={}", id)); let delay_seconds: i32 = match vm.get_kwargs() { Some(kwargs) => match kwargs.get("delay_seconds") { @@ -433,10 +420,6 @@ fn uzumibi_message_retry( }, None => 0, }; - debug_console_log_internal(&format!( - "[debug] uzumibi_message_retry: delay_seconds={}", - delay_seconds - )); #[cfg(feature = "debug-stub")] { @@ -444,14 +427,7 @@ fn uzumibi_message_retry( } #[cfg(not(feature = "debug-stub"))] unsafe { - debug_console_log_internal( - "[debug] uzumibi_message_retry: calling JS uzumibi_cf_message_retry", - ); let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); - debug_console_log_internal(&format!( - "[debug] uzumibi_message_retry: JS returned {}", - result - )); if result != 0 { return Err(mrubyedge::Error::RuntimeError(format!( "Failed to retry message: return code {}", From 1355b0743ef06a5de5405a461ca59e3cd9b26cfc Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:42:40 +0900 Subject: [PATCH 08/16] Remove stubbing --- uzumibi-on-cloudflare-spike/package.json | 1 - uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml | 3 +-- uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs | 10 ---------- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/package.json b/uzumibi-on-cloudflare-spike/package.json index a4fdeaf..6dcb288 100644 --- a/uzumibi-on-cloudflare-spike/package.json +++ b/uzumibi-on-cloudflare-spike/package.json @@ -9,7 +9,6 @@ "build:wasm:vanilla": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --no-default-features && cp -v -f ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm src/", "build:wasm:asyncify": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features enable-external && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike.wasm", "build:wasm:queue": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features queue && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike_queue.wasm", - "build:wasm:queue:debug-stub": "cargo build --package uzumibi-on-cloudflare-spike --target wasm32-unknown-unknown --release --features debug-stub && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 ../target/wasm32-unknown-unknown/release/uzumibi_on_cloudflare_spike.wasm -o ./src/uzumibi_on_cloudflare_spike_queue.wasm", "dev:queue": "npm run build:wasm:queue && wrangler dev --config wrangler.queue.jsonc", "deploy:queue": "npm run build:wasm:queue && wrangler deploy --config wrangler.queue.jsonc", "start": "wrangler dev", diff --git a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml index 5789375..cbe62cd 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml +++ b/uzumibi-on-cloudflare-spike/wasm-app/Cargo.toml @@ -19,5 +19,4 @@ mruby-compiler2-sys = ">= 0.3.0" [features] default = [] enable-external = [] -queue = [] -debug-stub = ["queue"] +queue = ["enable-external"] diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index 32372b6..cb1bb10 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -377,11 +377,6 @@ fn uzumibi_message_ack(vm: &mut VM, args: &[Rc]) -> Result, let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; let id: String = id.as_ref().try_into()?; - #[cfg(feature = "debug-stub")] - { - debug_console_log_internal("[debug] uzumibi_message_ack: STUB - skipping JS call"); - } - #[cfg(not(feature = "debug-stub"))] unsafe { let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); if result != 0 { @@ -421,11 +416,6 @@ fn uzumibi_message_retry( None => 0, }; - #[cfg(feature = "debug-stub")] - { - debug_console_log_internal("[debug] uzumibi_message_retry: STUB - skipping JS call"); - } - #[cfg(not(feature = "debug-stub"))] unsafe { let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); if result != 0 { From 2029adb77be47c5c26b656d7c153675201923f07 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:47:57 +0900 Subject: [PATCH 09/16] Support externals --- .../src/index.queue.js | 126 ++++++++++++++++++ .../wrangler.queue.jsonc | 32 ++--- 2 files changed, 142 insertions(+), 16 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/src/index.queue.js b/uzumibi-on-cloudflare-spike/src/index.queue.js index a0a89cf..82a02dd 100644 --- a/uzumibi-on-cloudflare-spike/src/index.queue.js +++ b/uzumibi-on-cloudflare-spike/src/index.queue.js @@ -1,13 +1,33 @@ +import { DurableObject } from "cloudflare:workers"; import { instantiate } from "asyncify-wasm"; import mod from "./uzumibi_on_cloudflare_spike_queue.wasm"; const wasmModule = mod; +/** + * Durable Object for Uzumibi::KV storage + */ +export class UzumibiKVObject extends DurableObject { + async get(key) { + const value = await this.ctx.storage.get(key); + return value ?? null; + } + + async set(key, value) { + await this.ctx.storage.put(key, value); + } +} + export default { async queue(batch, env, ctx) { const decoder = new TextDecoder(); const encoder = new TextEncoder(); + // Durable Object stub (if binding exists) + const doStub = env.UZUMIBI_KV_DATA + ? env.UZUMIBI_KV_DATA.getByName("default") + : null; + // Current message being processed (set per iteration) const getMessage = (id) => { const message = batch.messages.find((m) => m.id === id); @@ -24,6 +44,112 @@ export default { return 0; }, + // Fetch.fetch(url, method, body) -> packed Uzumibi::Response + // Dummy implementation: queue consumers don't have access to fetch directly, + // but enable-external is enabled when queue feature is active. + uzumibi_cf_fetch: async ( + urlPtr, urlSize, + methodPtr, methodSize, + bodyPtr, bodySize, + resultPtr, resultMaxSize, + ) => { + const memory = exports.memory; + const url = decoder.decode(new Uint8Array(memory.buffer, urlPtr, urlSize)); + const method = decoder.decode(new Uint8Array(memory.buffer, methodPtr, methodSize)); + const body = bodySize > 0 + ? decoder.decode(new Uint8Array(memory.buffer, bodyPtr, bodySize)) + : null; + + const fetchOptions = { method }; + if (body && method !== "GET" && method !== "HEAD") { + fetchOptions.body = body; + } + + const response = await fetch(url, fetchOptions); + const responseBody = await response.text(); + + const respHeaders = []; + response.headers.forEach((value, key) => { + respHeaders.push({ key, value }); + }); + + const resultView = new DataView(memory.buffer, resultPtr, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + let pos = 0; + + resultView.setUint16(pos, response.status, true); + pos += 2; + + resultView.setUint16(pos, respHeaders.length, true); + pos += 2; + + for (const header of respHeaders) { + const keyBytes = encoder.encode(header.key); + resultView.setUint16(pos, keyBytes.length, true); + pos += 2; + resultBuffer.set(keyBytes, pos); + pos += keyBytes.length; + + const valueBytes = encoder.encode(header.value); + resultView.setUint16(pos, valueBytes.length, true); + pos += 2; + resultBuffer.set(valueBytes, pos); + pos += valueBytes.length; + } + + const bodyBytes = encoder.encode(responseBody); + resultView.setUint32(pos, bodyBytes.length, true); + pos += 4; + + const bodyLen = Math.min(bodyBytes.length, resultMaxSize - pos); + resultBuffer.set(bodyBytes.slice(0, bodyLen), pos); + pos += bodyLen; + + return pos; + }, + + // KV.get(key) -> value string (via Durable Object) + uzumibi_cf_durable_object_get: async (keyPtr, keySize, resultPtr, resultMaxSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + + const value = await doStub.get(key); + if (value === null) return -1; + + const valueBytes = encoder.encode(value); + const length = Math.min(valueBytes.length, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + resultBuffer.set(valueBytes.slice(0, length)); + return length; + }, + + // KV.set(key, value) (via Durable Object) + uzumibi_cf_durable_object_set: async (keyPtr, keySize, valuePtr, valueSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + const value = decoder.decode(new Uint8Array(memory.buffer, valuePtr, valueSize)); + + await doStub.set(key, value); + return 0; + }, + + // Queue.send(queue_name, message) + uzumibi_cf_queue_send: async (queueNamePtr, queueNameSize, messagePtr, messageSize) => { + const memory = exports.memory; + const queueName = decoder.decode(new Uint8Array(memory.buffer, queueNamePtr, queueNameSize)); + const message = decoder.decode(new Uint8Array(memory.buffer, messagePtr, messageSize)); + + const queue = env[queueName]; + if (!queue) { + console.error(`Queue binding '${queueName}' not found`); + return -1; + } + await queue.send(message); + return 0; + }, + uzumibi_cf_message_ack: async (idPtr, idSize) => { const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); getMessage(id).ack(); diff --git a/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc b/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc index bcfd281..d988a97 100644 --- a/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc +++ b/uzumibi-on-cloudflare-spike/wrangler.queue.jsonc @@ -16,22 +16,22 @@ * Used for Uzumibi::KV.get/set * https://developers.cloudflare.com/durable-objects/ */ - // "durable_objects": { - // "bindings": [ - // { - // "name": "UZUMIBI_KV_DATA", - // "class_name": "UzumibiKVObject" - // } - // ] - // }, - // "migrations": [ - // { - // "tag": "v1", - // "new_sqlite_classes": [ - // "UzumibiKVObject" - // ] - // } - // ], + "durable_objects": { + "bindings": [ + { + "name": "UZUMIBI_KV_DATA", + "class_name": "UzumibiKVObject" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "UzumibiKVObject" + ] + } + ], /** * Queues * Used for Uzumibi::Queue.send (producer) and Uzumibi::Consumer (consumer) From f07243df39da721e529647d0e2e9835adc09ca7b Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:50:34 +0900 Subject: [PATCH 10/16] Refinements --- uzumibi-on-cloudflare-spike/src/index.queue.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/src/index.queue.js b/uzumibi-on-cloudflare-spike/src/index.queue.js index 82a02dd..6c03e1c 100644 --- a/uzumibi-on-cloudflare-spike/src/index.queue.js +++ b/uzumibi-on-cloudflare-spike/src/index.queue.js @@ -19,6 +19,10 @@ export class UzumibiKVObject extends DurableObject { } export default { + async fetch(request, env, ctx) { + return new Response("This endpoint is for queue processing. Please send messages to the queue instead.", { status: 400 }); + }, + async queue(batch, env, ctx) { const decoder = new TextDecoder(); const encoder = new TextEncoder(); @@ -45,8 +49,6 @@ export default { }, // Fetch.fetch(url, method, body) -> packed Uzumibi::Response - // Dummy implementation: queue consumers don't have access to fetch directly, - // but enable-external is enabled when queue feature is active. uzumibi_cf_fetch: async ( urlPtr, urlSize, methodPtr, methodSize, From 2fc476e4bd2a021f4f53cb0928c010e3bac8978a Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 22:52:36 +0900 Subject: [PATCH 11/16] Use same name --- uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs index cb1bb10..3443aa4 100644 --- a/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs +++ b/uzumibi-on-cloudflare-spike/wasm-app/src/lib.rs @@ -14,9 +14,10 @@ use mrubyedge::{ }, }; +#[cfg(not(feature = "queue"))] static MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/app.mrb")); #[cfg(feature = "queue")] -static CONSUMER_MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/consumer.mrb")); +static MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/consumer.mrb")); static mut MRUBY_VM: MaybeUninit = MaybeUninit::uninit(); static mut MRUBY_VM_LOADED: bool = false; @@ -529,16 +530,6 @@ fn init_vm() -> Result { vm.run() .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to init VM: {:?}", e)))?; - #[cfg(feature = "queue")] - { - let mut consumer_rite = rite::load(CONSUMER_MRB).map_err(|e| { - mrubyedge::Error::RuntimeError(format!("Failed to load consumer mruby: {:?}", e)) - })?; - vm.eval_rite(&mut consumer_rite).map_err(|e| { - mrubyedge::Error::RuntimeError(format!("Failed to run consumer: {:?}", e)) - })?; - } - Ok(vm) } From 452b98cb0c1c2945bcaa70ca639bff796cef2f93 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 23:25:25 +0900 Subject: [PATCH 12/16] Initial support of queue feature --- uzumibi-cli/src/main.rs | 34 +- .../enable-external/wrangler.jsonc | 28 +- .../__features__/queue/lib/consumer.rb | 15 + .../__features__/queue/package.json | 18 + .../__features__/queue/src/index.js | 266 +++++++ .../queue/wasm-app/.cargo/config.toml | 12 + .../__features__/queue/wasm-app/Cargo.toml_ | 22 + .../__features__/queue/wasm-app/build.rs | 16 + .../__features__/queue/wasm-app/src/lib.rs | 727 ++++++++++++++++++ .../__features__/queue/wrangler.jsonc | 55 ++ 10 files changed, 1177 insertions(+), 16 deletions(-) create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/lib/consumer.rb create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/package.json create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/.cargo/config.toml create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/Cargo.toml_ create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/build.rs create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/src/lib.rs create mode 100644 uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc diff --git a/uzumibi-cli/src/main.rs b/uzumibi-cli/src/main.rs index d9d7827..6a55f78 100644 --- a/uzumibi-cli/src/main.rs +++ b/uzumibi-cli/src/main.rs @@ -91,7 +91,12 @@ fn create_project( println!("Creating project '{}'...", project_name); // Collect feature overlay paths to know which files to skip from base - let feature_files = collect_feature_overlay_files(template, features); + let mut feature_files = collect_feature_overlay_files(template, features); + + // When queue feature is active, skip app.rb (consumer.rb replaces it) + if features.iter().any(|f| f == "queue") { + feature_files.insert("lib/app.rb".to_string()); + } // Copy base template files (skip files that will be overridden by feature overlays) copy_dir_recursive( @@ -338,14 +343,17 @@ fn show_diff(existing_file: &Path, new_content: &[u8]) -> Result<(), Box String { let project_name_underscore = project_name.replace('-', "_"); + let project_name_kebab = project_name.replace('_', "-"); content .replace("$$PROJECT_NAME$$", project_name) .replace("$$PROJECT_NAME_UNDERSCORE$$", &project_name_underscore) + .replace("$$PROJECT_NAME_KEBAB$$", &project_name_kebab) } fn print_project_next_steps(template: &str, project_name: &str, features: &[String]) { let has_enable_external = features.iter().any(|f| f == "enable-external"); + let has_queue = features.iter().any(|f| f == "queue"); println!("\nNext steps:"); match template { @@ -358,7 +366,7 @@ fn print_project_next_steps(template: &str, project_name: &str, features: &[Stri println!(" \x1b[36mrustup target add wasm32-unknown-unknown\x1b[0m"); println!(" • Node.js tools:"); println!(" \x1b[36mnpm install -g pnpm wrangler\x1b[0m"); - if has_enable_external { + if has_enable_external || has_queue { println!(" • wasm-opt (Binaryen, required for asyncify):"); println!(" \x1b[36mbrew install binaryen\x1b[0m"); println!(" Or visit: https://github.com/WebAssembly/binaryen/releases"); @@ -370,7 +378,27 @@ fn print_project_next_steps(template: &str, project_name: &str, features: &[Stri println!(" \x1b[36mpnpm run dev\x1b[0m"); println!(" 3. Deploy to Cloudflare:"); println!(" \x1b[36mpnpm run deploy\x1b[0m"); - if has_enable_external { + if has_queue { + println!(); + println!(" \x1b[33mNote:\x1b[0m This project uses queue feature (Cloudflare Queues consumer)."); + println!(" Edit \x1b[36mlib/consumer.rb\x1b[0m to implement your queue consumer logic."); + println!(" The following Uzumibi APIs are available in Ruby:"); + println!( + " • \x1b[36mUzumibi::Consumer\x1b[0m → Base class for queue consumers" + ); + println!( + " • \x1b[36mUzumibi::Message#ack!\x1b[0m / \x1b[36m#retry(delay_seconds: N)\x1b[0m → Message control" + ); + println!( + " • \x1b[36mUzumibi::Fetch.fetch(url, method, body)\x1b[0m → Uzumibi::Response" + ); + println!( + " • \x1b[36mUzumibi::KV.get(key)\x1b[0m / \x1b[36mUzumibi::KV.set(key, value)\x1b[0m → Durable Object storage" + ); + println!( + " • \x1b[36mUzumibi::Queue.send(queue_name, message)\x1b[0m → Cloudflare Queue" + ); + } else if has_enable_external { println!(); println!(" \x1b[33mNote:\x1b[0m This project uses enable-external feature."); println!(" The following Uzumibi APIs are available in Ruby:"); diff --git a/uzumibi-cli/templates/cloudflare/__features__/enable-external/wrangler.jsonc b/uzumibi-cli/templates/cloudflare/__features__/enable-external/wrangler.jsonc index bc7cadc..d194163 100644 --- a/uzumibi-cli/templates/cloudflare/__features__/enable-external/wrangler.jsonc +++ b/uzumibi-cli/templates/cloudflare/__features__/enable-external/wrangler.jsonc @@ -10,6 +10,21 @@ "observability": { "enabled": true }, + /** + * Queues + * Used for Uzumibi::Queue.send + * Uncomment the following block to enable queue producer capabilities. + * And create queue via command wrangler queue create $$PROJECT_NAME_KEBAB$$-queue + * https://developers.cloudflare.com/queues/ + */ + // "queues": { + // "producers": [ + // { + // "binding": "UZUMIBI_QUEUE", + // "queue": "$$PROJECT_NAME_KEBAB$$-queue" + // } + // ] + // }, /** * Durable Objects * Used for Uzumibi::KV.get/set @@ -31,17 +46,4 @@ ] } ] - /** - * Queues - * Used for Uzumibi::Queue.send - * https://developers.cloudflare.com/queues/ - */ - // "queues": { - // "producers": [ - // { - // "binding": "MY_QUEUE", - // "queue": "my-queue-name" - // } - // ] - // } } \ No newline at end of file diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/lib/consumer.rb b/uzumibi-cli/templates/cloudflare/__features__/queue/lib/consumer.rb new file mode 100644 index 0000000..c3a628d --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/lib/consumer.rb @@ -0,0 +1,15 @@ +class Consumer < Uzumibi::Consumer + # @rbs message: Uzumibi::Message + def on_receive(message) + debug_console("[Consumer] Received message: id=#{message.id}, body=#{message.body}, attempts=#{message.attempts}") + + if message.attempts > 3 + debug_console("[Consumer] Acknowledging message #{message.id} after 3 attempts!!") + message.ack! + else + message.retry(delay_seconds: 3) + end + end +end + +$CONSUMER = Consumer.new diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/package.json b/uzumibi-cli/templates/cloudflare/__features__/queue/package.json new file mode 100644 index 0000000..a51c779 --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/package.json @@ -0,0 +1,18 @@ +{ + "name": "$$PROJECT_NAME$$", + "version": "0.0.0", + "private": true, + "scripts": { + "deploy": "npm run build:wasm:asyncify && wrangler deploy", + "dev": "npm run build:wasm:asyncify && wrangler dev", + "build:wasm:asyncify": "cargo build --package $$PROJECT_NAME$$ --target wasm32-unknown-unknown --release --features queue && wasm-opt --enable-bulk-memory --enable-nontrapping-float-to-int --asyncify -O2 target/wasm32-unknown-unknown/release/$$PROJECT_NAME_UNDERSCORE$$.wasm -o ./src/$$PROJECT_NAME_UNDERSCORE$$_queue.wasm", + "start": "wrangler dev", + "test": "vitest" + }, + "devDependencies": { + "@cloudflare/vitest-pool-workers": "^0.8.19", + "asyncify-wasm": "^1.2.0", + "vitest": "~3.2.0", + "wrangler": "^4.54.0" + } +} diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js b/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js new file mode 100644 index 0000000..ba40e06 --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js @@ -0,0 +1,266 @@ +import { DurableObject } from "cloudflare:workers"; +import { instantiate } from "asyncify-wasm"; +import mod from "./$$PROJECT_NAME_UNDERSCORE$$_queue.wasm"; + +const wasmModule = mod; + +/** + * Durable Object for Uzumibi::KV storage + */ +export class UzumibiKVObject extends DurableObject { + async get(key) { + const value = await this.ctx.storage.get(key); + return value ?? null; + } + + async set(key, value) { + await this.ctx.storage.put(key, value); + } +} + +export default { + async fetch(request, env, ctx) { + return new Response("This endpoint is for queue processing. Please send messages to the queue instead.", { status: 400 }); + }, + + async queue(batch, env, ctx) { + const decoder = new TextDecoder(); + const encoder = new TextEncoder(); + + // Durable Object stub (if binding exists) + const doStub = env.UZUMIBI_KV_DATA + ? env.UZUMIBI_KV_DATA.getByName("default") + : null; + + // Look up message by ID for concurrency safety + const getMessage = (id) => { + const message = batch.messages.find((m) => m.id === id); + if (!message) throw new Error(`Message not found for id: ${id}`); + return message; + }; + + const importObject = { + env: { + debug_console_log: (ptr, size) => { + const memory = exports.memory; + const buffer = new Uint8Array(memory.buffer, ptr, size); + console.log(`[debug]: ${decoder.decode(buffer)}`); + return 0; + }, + + // Fetch.fetch(url, method, body) -> packed Uzumibi::Response + uzumibi_cf_fetch: async ( + urlPtr, urlSize, + methodPtr, methodSize, + bodyPtr, bodySize, + resultPtr, resultMaxSize, + ) => { + const memory = exports.memory; + const url = decoder.decode(new Uint8Array(memory.buffer, urlPtr, urlSize)); + const method = decoder.decode(new Uint8Array(memory.buffer, methodPtr, methodSize)); + const body = bodySize > 0 + ? decoder.decode(new Uint8Array(memory.buffer, bodyPtr, bodySize)) + : null; + + const fetchOptions = { method }; + if (body && method !== "GET" && method !== "HEAD") { + fetchOptions.body = body; + } + + const response = await fetch(url, fetchOptions); + const responseBody = await response.text(); + + const respHeaders = []; + response.headers.forEach((value, key) => { + respHeaders.push({ key, value }); + }); + + const resultView = new DataView(memory.buffer, resultPtr, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + let pos = 0; + + resultView.setUint16(pos, response.status, true); + pos += 2; + + resultView.setUint16(pos, respHeaders.length, true); + pos += 2; + + for (const header of respHeaders) { + const keyBytes = encoder.encode(header.key); + resultView.setUint16(pos, keyBytes.length, true); + pos += 2; + resultBuffer.set(keyBytes, pos); + pos += keyBytes.length; + + const valueBytes = encoder.encode(header.value); + resultView.setUint16(pos, valueBytes.length, true); + pos += 2; + resultBuffer.set(valueBytes, pos); + pos += valueBytes.length; + } + + const bodyBytes = encoder.encode(responseBody); + resultView.setUint32(pos, bodyBytes.length, true); + pos += 4; + + const bodyLen = Math.min(bodyBytes.length, resultMaxSize - pos); + resultBuffer.set(bodyBytes.slice(0, bodyLen), pos); + pos += bodyLen; + + return pos; + }, + + // KV.get(key) -> value string (via Durable Object) + uzumibi_cf_durable_object_get: async (keyPtr, keySize, resultPtr, resultMaxSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + + const value = await doStub.get(key); + if (value === null) return -1; + + const valueBytes = encoder.encode(value); + const length = Math.min(valueBytes.length, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + resultBuffer.set(valueBytes.slice(0, length)); + return length; + }, + + // KV.set(key, value) (via Durable Object) + uzumibi_cf_durable_object_set: async (keyPtr, keySize, valuePtr, valueSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + const value = decoder.decode(new Uint8Array(memory.buffer, valuePtr, valueSize)); + + await doStub.set(key, value); + return 0; + }, + + // Queue.send(queue_name, message) + uzumibi_cf_queue_send: async (queueNamePtr, queueNameSize, messagePtr, messageSize) => { + const memory = exports.memory; + const queueName = decoder.decode(new Uint8Array(memory.buffer, queueNamePtr, queueNameSize)); + const message = decoder.decode(new Uint8Array(memory.buffer, messagePtr, messageSize)); + + const queue = env[queueName]; + if (!queue) { + console.error(`Queue binding '${queueName}' not found`); + return -1; + } + await queue.send(message); + return 0; + }, + + uzumibi_cf_message_ack: async (idPtr, idSize) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).ack(); + return 0; + }, + + uzumibi_cf_message_retry: async (idPtr, idSize, delaySeconds) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).retry({ delaySeconds }); + return 0; + }, + }, + }; + + const instance = await instantiate(wasmModule, importObject); + const exports = instance.exports; + + for (const message of batch.messages) { + const idBytes = encoder.encode(message.id); + const timestampBytes = encoder.encode( + message.timestamp.toISOString(), + ); + const bodyBytes = encoder.encode( + typeof message.body === "string" + ? message.body + : JSON.stringify(message.body), + ); + const attempts = message.attempts; + + // Pack message data: + // u16 LE id_size, id bytes, + // u16 LE timestamp_size, timestamp bytes, + // u32 LE body_size, body bytes, + // u32 LE attempts + const totalSize = + 2 + + idBytes.length + + 2 + + timestampBytes.length + + 4 + + bodyBytes.length + + 4; + + const msgResult = + await exports.uzumibi_initialize_message(totalSize); + const msgOffset = Number(msgResult & 0xffffffffn); + if (msgOffset === 0) { + const errOffset = Number( + (msgResult >> 32n) & 0xffffffffn, + ); + const buffer = new Uint8Array( + exports.memory.buffer, + errOffset, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to initialize message: ${errStr}`, + ); + } + + const msgBuffer = new Uint8Array( + exports.memory.buffer, + msgOffset, + totalSize, + ); + const dataView = new DataView( + exports.memory.buffer, + msgOffset, + ); + let pos = 0; + + // id + dataView.setUint16(pos, idBytes.length, true); + pos += 2; + msgBuffer.set(idBytes, pos); + pos += idBytes.length; + + // timestamp + dataView.setUint16(pos, timestampBytes.length, true); + pos += 2; + msgBuffer.set(timestampBytes, pos); + pos += timestampBytes.length; + + // body + dataView.setUint32(pos, bodyBytes.length, true); + pos += 4; + msgBuffer.set(bodyBytes, pos); + pos += bodyBytes.length; + + // attempts + dataView.setUint32(pos, attempts, true); + + const result = await exports.uzumibi_start_message(); + if (result !== 0) { + const buffer = new Uint8Array( + exports.memory.buffer, + result, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to process message: ${errStr}`, + ); + } + } + }, +}; diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/.cargo/config.toml b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/.cargo/config.toml new file mode 100644 index 0000000..cef1057 --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/.cargo/config.toml @@ -0,0 +1,12 @@ +[target.wasm32-unknown-unknown] +rustflags = [ + "-C", + "target-feature=+bulk-memory,+mutable-globals", + "-C", + "link-arg=--import-memory", + "-C", + "link-arg=--export-memory", +] + +[build] +target = "wasm32-unknown-unknown" diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/Cargo.toml_ b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/Cargo.toml_ new file mode 100644 index 0000000..d5978d1 --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/Cargo.toml_ @@ -0,0 +1,22 @@ +[package] +name = "$$PROJECT_NAME$$" +version = "0.1.0" +edition = "2024" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +mrubyedge = { version = ">= 1.1", features = [ + "no-wasi", +], default-features = false } +uzumibi-gem = ">= 0.5.0" +uzumibi-art-router = ">= 0.3.1" + +[build-dependencies] +mruby-compiler2-sys = ">= 0.3.0" + +[features] +default = ["queue"] +enable-external = [] +queue = ["enable-external"] diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/build.rs b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/build.rs new file mode 100644 index 0000000..47fbb9b --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/build.rs @@ -0,0 +1,16 @@ +use std::path::Path; + +extern crate mruby_compiler2_sys; + +fn main() { + let out_dir = std::env::var("OUT_DIR").unwrap(); + let mrb_path = Path::new(&out_dir).join("consumer.mrb"); + let code = include_str!("../lib/consumer.rb"); + println!("cargo:rerun-if-changed=../lib/consumer.rb"); + + unsafe { + let mut ctx = mruby_compiler2_sys::MRubyCompiler2Context::new(); + ctx.compile_to_file(code, &mrb_path) + .expect("failed to compile mruby script"); + } +} diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/src/lib.rs b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/src/lib.rs new file mode 100644 index 0000000..d072d9e --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wasm-app/src/lib.rs @@ -0,0 +1,727 @@ +#![allow(static_mut_refs)] +extern crate mrubyedge; +extern crate uzumibi_gem; + +use std::{mem::MaybeUninit, rc::Rc}; + +use mrubyedge::{ + rite::rite, + yamrb::{ + helpers::{mrb_define_class_cmethod, mrb_define_cmethod, mrb_funcall}, + prelude::hash::{mrb_hash_new, mrb_hash_set_index}, + value::{RObject, RSym, RValue}, + vm::VM, + }, +}; + +#[cfg(not(feature = "queue"))] +static MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/app.mrb")); +#[cfg(feature = "queue")] +static MRB: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/consumer.mrb")); + +static mut MRUBY_VM: MaybeUninit = MaybeUninit::uninit(); +static mut MRUBY_VM_LOADED: bool = false; + +static mut ERROR_BUF: [u8; 4096] = [0; 4096]; + +fn set_error_to_buf(message: impl AsRef) -> *const u8 { + unsafe { + let bytes = message.as_ref().as_bytes(); + let len = bytes.len().min(ERROR_BUF.len() - 1); + ERROR_BUF[..len].copy_from_slice(&bytes[..len]); + ERROR_BUF[len] = 0; + ERROR_BUF.as_ptr() + } +} + +unsafe extern "C" { + unsafe fn debug_console_log(ptr: *const u8, len: usize); +} + +#[cfg(feature = "queue")] +unsafe extern "C" { + unsafe fn uzumibi_cf_message_ack(message_id_ptr: *const u8, message_id_size: usize) -> i32; + unsafe fn uzumibi_cf_message_retry( + message_id_ptr: *const u8, + message_id_size: usize, + delay_seconds: i32, + ) -> i32; +} + +#[cfg(feature = "enable-external")] +unsafe extern "C" { + unsafe fn uzumibi_cf_fetch( + url_ptr: *const u8, + url_size: usize, + method_ptr: *const u8, + method_size: usize, + body_ptr: *const u8, + body_size: usize, + result_ptr: *mut u8, + result_max_size: usize, + ) -> i32; + unsafe fn uzumibi_cf_durable_object_get( + key_ptr: *const u8, + key_size: usize, + result_ptr: *mut u8, + result_max_size: usize, + ) -> i32; + unsafe fn uzumibi_cf_durable_object_set( + key_ptr: *const u8, + key_size: usize, + value_ptr: *const u8, + value_size: usize, + ) -> i32; + unsafe fn uzumibi_cf_queue_send( + queue_name_ptr: *const u8, + queue_name_size: usize, + message_ptr: *const u8, + message_size: usize, + ) -> i32; +} + +fn debug_console_log_internal(message: &str) { + unsafe { + debug_console_log(message.as_ptr(), message.len()); + } +} + +// ---- External API wrappers (only when enable-external feature is active) ---- + +/// Packed response format (same as Uzumibi::Response#to_shared_memory): +/// u16 LE status_code +/// u16 LE headers_count +/// (u16 LE key_size, key bytes, u16 LE value_size, value bytes) * headers_count +/// u32 LE body_size +/// body bytes +#[cfg(feature = "enable-external")] +fn cf_fetch(url: &str, method: &str, body: &str) -> Result, String> { + const BUFFER_SIZE: usize = 65536; + let mut buffer = vec![0u8; BUFFER_SIZE]; + + unsafe { + let result = uzumibi_cf_fetch( + url.as_ptr(), + url.len(), + method.as_ptr(), + method.len(), + body.as_ptr(), + body.len(), + buffer.as_mut_ptr(), + BUFFER_SIZE, + ); + match result { + len if len >= 0 => { + let len = len as usize; + Ok(buffer[..len].to_vec()) + } + _ => Err(format!("Fetch failed with return code: {}", result)), + } + } +} + +#[cfg(feature = "enable-external")] +fn cf_durable_object_get(key: &str) -> Result, String> { + const BUFFER_SIZE: usize = 65536; + let mut buffer = vec![0u8; BUFFER_SIZE]; + + unsafe { + let result = uzumibi_cf_durable_object_get( + key.as_ptr(), + key.len(), + buffer.as_mut_ptr(), + BUFFER_SIZE, + ); + match result { + -1 => Ok(None), + len if len >= 0 => { + let len = len as usize; + let value = String::from_utf8(buffer[..len].to_vec()) + .map_err(|e| format!("Failed to decode UTF-8: {}", e))?; + Ok(Some(value)) + } + _ => Err(format!( + "Unexpected return value from durable_object_get: {}", + result + )), + } + } +} + +#[cfg(feature = "enable-external")] +fn cf_durable_object_set(key: &str, value: &str) -> Result<(), String> { + unsafe { + let result = + uzumibi_cf_durable_object_set(key.as_ptr(), key.len(), value.as_ptr(), value.len()); + match result { + 0 => Ok(()), + _ => Err(format!("Failed to set value: return code {}", result)), + } + } +} + +#[cfg(feature = "enable-external")] +fn cf_queue_send(queue_name: &str, message: &str) -> Result<(), String> { + unsafe { + let result = uzumibi_cf_queue_send( + queue_name.as_ptr(), + queue_name.len(), + message.as_ptr(), + message.len(), + ); + match result { + 0 => Ok(()), + _ => Err(format!( + "Failed to send queue message: return code {}", + result + )), + } + } +} + +// ---- mruby gem method implementations ---- + +fn uzumibi_kernel_debug_console_log( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let msg_obj = &args[0]; + let msg = mrb_funcall(vm, msg_obj.clone().into(), "to_s", &[])?; + let msg: String = msg.as_ref().try_into()?; + unsafe { + debug_console_log(msg.as_ptr(), msg.len()); + } + Ok(RObject::nil().to_refcount_assigned()) +} + +/// Fetch.fetch(url, method="GET", body="") -> Uzumibi::Response +#[cfg(feature = "enable-external")] +fn uzumibi_fetch_class_fetch( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let url_obj = &args[0]; + let url = mrb_funcall(vm, url_obj.clone().into(), "to_s", &[])?; + let url: String = url.as_ref().try_into()?; + + let method = if args.len() > 1 { + let m = mrb_funcall(vm, args[1].clone().into(), "to_s", &[])?; + let m: String = m.as_ref().try_into()?; + m + } else { + "GET".to_string() + }; + + let body = if args.len() > 2 { + let b = mrb_funcall(vm, args[2].clone().into(), "to_s", &[])?; + let b: String = b.as_ref().try_into()?; + b + } else { + String::new() + }; + + let packed = cf_fetch(&url, &method, &body) + .map_err(|e| mrubyedge::Error::RuntimeError(format!("Fetch failed: {}", e)))?; + + // Unpack the packed response into Uzumibi::Response + unpack_response_to_robject(vm, &packed) +} + +/// Unpack packed binary response into Uzumibi::Response mruby object +#[cfg(feature = "enable-external")] +fn unpack_response_to_robject(vm: &mut VM, buf: &[u8]) -> Result, mrubyedge::Error> { + let mut offset = 0; + + // Status code (u16 LE) + let status_code = u16::from_le_bytes([buf[offset], buf[offset + 1]]); + offset += 2; + + // Headers count (u16 LE) + let headers_count = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + + // Parse headers + let headers_hash = mrb_hash_new(vm, &[])?; + for _ in 0..headers_count { + let key_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let key = String::from_utf8_lossy(&buf[offset..offset + key_size]).to_string(); + offset += key_size; + + let value_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let value = String::from_utf8_lossy(&buf[offset..offset + value_size]).to_string(); + offset += value_size; + + mrb_hash_set_index( + headers_hash.clone(), + RObject::string(key).to_refcount_assigned(), + RObject::string(value).to_refcount_assigned(), + )?; + } + + // Body size (u32 LE) + let body_size = u32::from_le_bytes([ + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], + ]) as usize; + offset += 4; + + // Body + let body = String::from_utf8_lossy(&buf[offset..offset + body_size]).to_string(); + + // Create Uzumibi::Response instance + let uzumibi = vm + .get_const_by_name("Uzumibi") + .ok_or_else(|| mrubyedge::Error::RuntimeError("Uzumibi module not found".to_string()))?; + let uzumibi_module = match &uzumibi.as_ref().value { + RValue::Module(m) => m.clone(), + _ => { + return Err(mrubyedge::Error::RuntimeError( + "Uzumibi must be a module".to_string(), + )); + } + }; + let response_class = uzumibi_module + .get_const_by_name("Response") + .ok_or_else(|| { + mrubyedge::Error::RuntimeError("Uzumibi::Response class not found".to_string()) + })?; + let response = mrb_funcall(vm, Some(response_class), "new", &[])?; + + response.set_ivar( + "@status_code", + RObject::integer(status_code as i64).to_refcount_assigned(), + ); + response.set_ivar("@headers", headers_hash); + response.set_ivar("@body", RObject::string(body).to_refcount_assigned()); + + Ok(response) +} + +/// KV.get(key) +#[cfg(feature = "enable-external")] +fn uzumibi_kv_class_get( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let key_obj = &args[0]; + let key = mrb_funcall(vm, key_obj.clone().into(), "to_s", &[])?; + let key: String = key.as_ref().try_into()?; + + match cf_durable_object_get(&key) { + Ok(Some(value)) => Ok(RObject::string(value).to_refcount_assigned()), + Ok(None) => Ok(RObject::nil().to_refcount_assigned()), + Err(e) => Err(mrubyedge::Error::RuntimeError(format!( + "Failed to access storage value: {}", + e + ))), + } +} + +/// KV.set(key, value) +#[cfg(feature = "enable-external")] +fn uzumibi_kv_class_set( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let key_obj = &args[0]; + let key = mrb_funcall(vm, key_obj.clone().into(), "to_s", &[])?; + let key: String = key.as_ref().try_into()?; + + let value_obj = &args[1]; + let value = mrb_funcall(vm, value_obj.clone().into(), "to_s", &[])?; + let value: String = value.as_ref().try_into()?; + + cf_durable_object_set(&key, &value).map_err(|e| { + mrubyedge::Error::RuntimeError(format!("Failed to set storage value: {}", e)) + })?; + + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +/// Queue.send(queue_name, message) +#[cfg(feature = "enable-external")] +fn uzumibi_queue_class_send( + vm: &mut VM, + args: &[Rc], +) -> Result, mrubyedge::Error> { + let queue_name_obj = &args[0]; + let queue_name = mrb_funcall(vm, queue_name_obj.clone().into(), "to_s", &[])?; + let queue_name: String = queue_name.as_ref().try_into()?; + + let message_obj = &args[1]; + let message = mrb_funcall(vm, message_obj.clone().into(), "to_s", &[])?; + let message: String = message.as_ref().try_into()?; + + cf_queue_send(&queue_name, &message).map_err(|e| { + mrubyedge::Error::RuntimeError(format!("Failed to send queue message: {}", e)) + })?; + + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +// ---- Queue consumer support (only when queue feature is active) ---- + +/// Message.ack! -> delegates to JS +#[cfg(feature = "queue")] +fn uzumibi_message_ack( + vm: &mut VM, + _args: &[Rc], +) -> Result, mrubyedge::Error> { + let self_obj = vm.getself()?; + let id_obj = self_obj.get_ivar("@id"); + if matches!(id_obj.as_ref().value, RValue::Nil) { + return Err(mrubyedge::Error::RuntimeError( + "Message object does not have @id".to_string(), + )); + } + let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; + let id: String = id.as_ref().try_into()?; + + unsafe { + let result = uzumibi_cf_message_ack(id.as_ptr(), id.len()); + if result != 0 { + return Err(mrubyedge::Error::RuntimeError(format!( + "Failed to ack message: return code {}", + result + ))); + } + } + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +/// Message.retry(delay_seconds: N) -> delegates to JS +#[cfg(feature = "queue")] +fn uzumibi_message_retry( + vm: &mut VM, + _args: &[Rc], +) -> Result, mrubyedge::Error> { + let self_obj = vm.getself()?; + let id_obj = self_obj.get_ivar("@id"); + if matches!(id_obj.as_ref().value, RValue::Nil) { + return Err(mrubyedge::Error::RuntimeError( + "Message object does not have @id".to_string(), + )); + } + let id = mrb_funcall(vm, id_obj.into(), "to_s", &[])?; + let id: String = id.as_ref().try_into()?; + + let delay_seconds: i32 = match vm.get_kwargs() { + Some(kwargs) => match kwargs.get("delay_seconds") { + Some(val) => { + let v: i64 = val.as_ref().try_into()?; + v as i32 + } + None => 0, + }, + None => 0, + }; + + unsafe { + let result = uzumibi_cf_message_retry(id.as_ptr(), id.len(), delay_seconds); + if result != 0 { + return Err(mrubyedge::Error::RuntimeError(format!( + "Failed to retry message: return code {}", + result + ))); + } + } + Ok(RObject::boolean(true).to_refcount_assigned()) +} + +/// Consumer.on_receive(message) - abstract method, must be overridden +#[cfg(feature = "queue")] +fn uzumibi_consumer_on_receive( + _vm: &mut VM, + _args: &[Rc], +) -> Result, mrubyedge::Error> { + Err(mrubyedge::Error::RuntimeError( + "on_receive must be implemented by subclass of Uzumibi::Consumer".to_string(), + )) +} + +// ---- VM initialization ---- + +fn init_vm() -> Result { + let mut rite = rite::load(MRB) + .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to load mruby: {:?}", e)))?; + let mut vm = VM::open(&mut rite); + uzumibi_gem::init::init_uzumibi(&mut vm); + let object = vm.object_class.clone(); + mrb_define_cmethod( + &mut vm, + object, + "debug_console", + Box::new(uzumibi_kernel_debug_console_log), + ); + + #[cfg(feature = "enable-external")] + { + let uzumibi_module = vm.get_module_by_name("Uzumibi"); + + // Uzumibi::Fetch.fetch(url, method="GET", body="") + let fetch_class = vm.define_class("Fetch", None, Some(uzumibi_module.clone())); + mrb_define_class_cmethod( + &mut vm, + fetch_class, + "fetch", + Box::new(uzumibi_fetch_class_fetch), + ); + + // Uzumibi::KV.get(key) / Uzumibi::KV.set(key, value) + let kv_class = vm.define_class("KV", None, Some(uzumibi_module.clone())); + mrb_define_class_cmethod( + &mut vm, + kv_class.clone(), + "get", + Box::new(uzumibi_kv_class_get), + ); + mrb_define_class_cmethod(&mut vm, kv_class, "set", Box::new(uzumibi_kv_class_set)); + + // Uzumibi::Queue.send(queue_name, message) + let queue_class = vm.define_class("Queue", None, Some(uzumibi_module)); + mrb_define_class_cmethod( + &mut vm, + queue_class, + "send", + Box::new(uzumibi_queue_class_send), + ); + } + + #[cfg(feature = "queue")] + { + let uzumibi_module = vm.get_module_by_name("Uzumibi"); + + // Uzumibi::Consumer (base class for user-defined consumers) + let consumer_class = vm.define_class("Consumer", None, Some(uzumibi_module.clone())); + mrb_define_cmethod( + &mut vm, + consumer_class, + "on_receive", + Box::new(uzumibi_consumer_on_receive), + ); + + // Uzumibi::Message with ack! and retry methods + let message_class = vm.define_class("Message", None, Some(uzumibi_module)); + let message_class_obj = RObject::class(message_class.clone(), &mut vm); + for attr in ["id", "timestamp", "body", "attempts"] { + mrb_funcall( + &mut vm, + Some(message_class_obj.clone()), + "attr_accessor", + &[RObject::symbol(RSym::new(attr.to_string())).to_refcount_assigned()], + ) + .expect("attr_accessor failed"); + } + mrb_define_cmethod( + &mut vm, + message_class.clone(), + "ack!", + Box::new(uzumibi_message_ack), + ); + mrb_define_cmethod( + &mut vm, + message_class, + "retry", + Box::new(uzumibi_message_retry), + ); + } + + vm.run() + .map_err(|e| mrubyedge::Error::RuntimeError(format!("Failed to init VM: {:?}", e)))?; + + Ok(vm) +} + +fn assume_init_vm() -> Result<&'static mut VM, mrubyedge::Error> { + unsafe { + if !MRUBY_VM_LOADED { + MRUBY_VM = MaybeUninit::new(init_vm()?); + MRUBY_VM_LOADED = true; + } + Ok(MRUBY_VM.assume_init_mut()) + } +} + +fn do_uzumibi_initialize_request(size: i32) -> Result<*mut u8, mrubyedge::Error> { + let vm = assume_init_vm()?; + let size = RObject::integer(size as i64).to_refcount_assigned(); + let app = vm + .globals + .get("$APP") + .ok_or_else(|| mrubyedge::Error::RuntimeError("$APP is not defined".to_string()))?; + let ret = mrb_funcall(vm, app.clone().into(), "initialize_request", &[size])?; + ret.as_ref().try_into() +} + +fn do_uzumibi_start_request() -> Result<*mut u8, mrubyedge::Error> { + debug_console_log_internal("uzumibi_start_request called"); + let vm = assume_init_vm()?; + let app = vm + .globals + .get("$APP") + .ok_or_else(|| mrubyedge::Error::RuntimeError("$APP is not defined".to_string()))?; + let ret = mrb_funcall( + vm, + app.clone().into(), + "start_request_and_return_shared_memory", + &[], + )?; + match &ret.as_ref().value { + RValue::SharedMemory(sm) => Ok(sm.borrow_mut().leak()), + _ => Err(mrubyedge::Error::RuntimeError( + "Returned value is not SharedMemory".to_string(), + )), + } +} + +#[unsafe(export_name = "uzumibi_initialize_request")] +unsafe extern "C" fn uzumibi_initialize_request(size: i32) -> u64 { + match do_uzumibi_initialize_request(size) { + Ok(ptr) => (ptr as u32) as u64, + Err(e) => { + let err_buf = set_error_to_buf(format!("Error in initialize_request: {}", e)); + ((err_buf as u32) as u64) << 32 + } + } +} + +#[unsafe(export_name = "uzumibi_start_request")] +unsafe extern "C" fn uzumibi_start_request() -> u64 { + match do_uzumibi_start_request() { + Ok(ptr) => (ptr as u32) as u64, + Err(e) => { + let err_buf = set_error_to_buf(format!("Error in start_request: {}", e)); + ((err_buf as u32) as u64) << 32 + } + } +} + +// ---- Queue message handling (only when queue feature is active) ---- + +/// Allocate a buffer for the message data. +/// Returns a pointer to the buffer (lower 32 bits) or error (upper 32 bits). +#[cfg(feature = "queue")] +static mut MESSAGE_BUF: Option> = None; + +#[cfg(feature = "queue")] +fn do_uzumibi_initialize_message(size: i32) -> Result<*mut u8, mrubyedge::Error> { + let _ = assume_init_vm()?; + unsafe { + MESSAGE_BUF = Some(vec![0u8; size as usize]); + Ok(MESSAGE_BUF.as_mut().unwrap().as_mut_ptr()) + } +} + +/// Unpack message from buffer and call $CONSUMER.on_receive(message). +/// Message binary format: +/// u16 LE id_size, id bytes, +/// u16 LE timestamp_size, timestamp bytes, +/// u32 LE body_size, body bytes, +/// u32 LE attempts +/// +/// Returns 0 on success, or a pointer to an error string. +#[cfg(feature = "queue")] +fn do_uzumibi_start_message() -> Result<(), mrubyedge::Error> { + let vm = assume_init_vm()?; + + let buf = unsafe { + MESSAGE_BUF.as_ref().ok_or_else(|| { + mrubyedge::Error::RuntimeError("Message buffer not initialized".to_string()) + })? + }; + + let mut offset = 0; + + // id (u16 LE size + bytes) + let id_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let id = String::from_utf8_lossy(&buf[offset..offset + id_size]).to_string(); + offset += id_size; + + // timestamp (u16 LE size + bytes) + let ts_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]) as usize; + offset += 2; + let timestamp = String::from_utf8_lossy(&buf[offset..offset + ts_size]).to_string(); + offset += ts_size; + + // body (u32 LE size + bytes) + let body_size = u32::from_le_bytes([ + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], + ]) as usize; + offset += 4; + let body = String::from_utf8_lossy(&buf[offset..offset + body_size]).to_string(); + offset += body_size; + + // attempts (u32 LE) + let attempts = u32::from_le_bytes([ + buf[offset], + buf[offset + 1], + buf[offset + 2], + buf[offset + 3], + ]) as i64; + + // Create Uzumibi::Message instance + let uzumibi = vm + .get_const_by_name("Uzumibi") + .ok_or_else(|| mrubyedge::Error::RuntimeError("Uzumibi module not found".to_string()))?; + let uzumibi_module = match &uzumibi.as_ref().value { + RValue::Module(m) => m.clone(), + _ => { + return Err(mrubyedge::Error::RuntimeError( + "Uzumibi must be a module".to_string(), + )); + } + }; + let message_class = uzumibi_module.get_const_by_name("Message").ok_or_else(|| { + mrubyedge::Error::RuntimeError("Uzumibi::Message class not found".to_string()) + })?; + let message = mrb_funcall(vm, Some(message_class), "new", &[])?; + + message.set_ivar("@id", RObject::string(id).to_refcount_assigned()); + message.set_ivar( + "@timestamp", + RObject::string(timestamp).to_refcount_assigned(), + ); + message.set_ivar("@body", RObject::string(body).to_refcount_assigned()); + message.set_ivar( + "@attempts", + RObject::integer(attempts).to_refcount_assigned(), + ); + + // Call $CONSUMER.on_receive(message) + let consumer = vm + .globals + .get("$CONSUMER") + .ok_or_else(|| mrubyedge::Error::RuntimeError("$CONSUMER is not defined".to_string()))?; + mrb_funcall(vm, consumer.clone().into(), "on_receive", &[message])?; + + Ok(()) +} + +#[cfg(feature = "queue")] +#[unsafe(export_name = "uzumibi_initialize_message")] +unsafe extern "C" fn uzumibi_initialize_message(size: i32) -> u64 { + match do_uzumibi_initialize_message(size) { + Ok(ptr) => (ptr as u32) as u64, + Err(e) => { + let err_buf = set_error_to_buf(format!("Error in initialize_message: {}", e)); + ((err_buf as u32) as u64) << 32 + } + } +} + +#[cfg(feature = "queue")] +#[unsafe(export_name = "uzumibi_start_message")] +unsafe extern "C" fn uzumibi_start_message() -> u32 { + match do_uzumibi_start_message() { + Ok(()) => 0, + Err(e) => set_error_to_buf(format!("Error in start_message: {}", e)) as u32, + } +} diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc b/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc new file mode 100644 index 0000000..0342e93 --- /dev/null +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc @@ -0,0 +1,55 @@ +/** + * Queue consumer build config + * For more details on how to configure Wrangler, refer to: + * https://developers.cloudflare.com/workers/wrangler/configuration/ + */ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "$$PROJECT_NAME$$", + "main": "src/index.js", + "compatibility_date": "2025-12-30", + "observability": { + "enabled": true + }, + /** + * Durable Objects + * Used for Uzumibi::KV.get/set + * https://developers.cloudflare.com/durable-objects/ + */ + "durable_objects": { + "bindings": [ + { + "name": "UZUMIBI_KV_DATA", + "class_name": "UzumibiKVObject" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "UzumibiKVObject" + ] + } + ], + /** + * Queues + * Used for Uzumibi::Queue.send (producer) and Uzumibi::Consumer (consumer) + * https://developers.cloudflare.com/queues/ + */ + "queues": { + "producers": [ + { + "binding": "UZUMIBI_QUEUE", + "queue": "$$PROJECT_NAME_KEBAB$$-queue" + } + ], + "consumers": [ + { + "queue": "$$PROJECT_NAME_KEBAB$$-queue", + "max_batch_size": 10, + "max_batch_timeout": 5 + } + ] + } +} From cbe17e3ad93dd60db55ccd44eb35206602a4fa7c Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 23:26:26 +0900 Subject: [PATCH 13/16] New RC2 --- Cargo.lock | 2 +- uzumibi-cli/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e92b85..22f2e18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,7 +1915,7 @@ dependencies = [ [[package]] name = "uzumibi-cli" -version = "0.6.0-rc1" +version = "0.6.0-rc2" dependencies = [ "clap", "dialoguer", diff --git a/uzumibi-cli/Cargo.toml b/uzumibi-cli/Cargo.toml index 736eb7d..ca4ac53 100644 --- a/uzumibi-cli/Cargo.toml +++ b/uzumibi-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "uzumibi-cli" -version = "0.6.0-rc1" +version = "0.6.0-rc2" edition = "2024" authors = ["Uchio Kondo "] description = "Uzumibi CLI tool to generate serverless mruby/edge apps" From f355ce9304d85eec4c33836f883f2424aec5c014 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 23:29:28 +0900 Subject: [PATCH 14/16] Update fmt --- uzumibi-cli/src/main.rs | 9 +- .../__features__/queue/package.json | 2 +- .../__features__/queue/src/index.js | 504 +++++++++--------- .../__features__/queue/wrangler.jsonc | 2 +- 4 files changed, 259 insertions(+), 258 deletions(-) diff --git a/uzumibi-cli/src/main.rs b/uzumibi-cli/src/main.rs index 6a55f78..a2f7063 100644 --- a/uzumibi-cli/src/main.rs +++ b/uzumibi-cli/src/main.rs @@ -380,12 +380,13 @@ fn print_project_next_steps(template: &str, project_name: &str, features: &[Stri println!(" \x1b[36mpnpm run deploy\x1b[0m"); if has_queue { println!(); - println!(" \x1b[33mNote:\x1b[0m This project uses queue feature (Cloudflare Queues consumer)."); - println!(" Edit \x1b[36mlib/consumer.rb\x1b[0m to implement your queue consumer logic."); - println!(" The following Uzumibi APIs are available in Ruby:"); println!( - " • \x1b[36mUzumibi::Consumer\x1b[0m → Base class for queue consumers" + " \x1b[33mNote:\x1b[0m This project uses queue feature (Cloudflare Queues consumer)." + ); + println!( + " Edit \x1b[36mlib/consumer.rb\x1b[0m to implement your queue consumer logic." ); + println!(" The following Uzumibi APIs are available in Ruby:"); println!( " • \x1b[36mUzumibi::Message#ack!\x1b[0m / \x1b[36m#retry(delay_seconds: N)\x1b[0m → Message control" ); diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/package.json b/uzumibi-cli/templates/cloudflare/__features__/queue/package.json index a51c779..3304c76 100644 --- a/uzumibi-cli/templates/cloudflare/__features__/queue/package.json +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/package.json @@ -15,4 +15,4 @@ "vitest": "~3.2.0", "wrangler": "^4.54.0" } -} +} \ No newline at end of file diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js b/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js index ba40e06..a3c60f1 100644 --- a/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/src/index.js @@ -8,259 +8,259 @@ const wasmModule = mod; * Durable Object for Uzumibi::KV storage */ export class UzumibiKVObject extends DurableObject { - async get(key) { - const value = await this.ctx.storage.get(key); - return value ?? null; - } - - async set(key, value) { - await this.ctx.storage.put(key, value); - } + async get(key) { + const value = await this.ctx.storage.get(key); + return value ?? null; + } + + async set(key, value) { + await this.ctx.storage.put(key, value); + } } export default { - async fetch(request, env, ctx) { - return new Response("This endpoint is for queue processing. Please send messages to the queue instead.", { status: 400 }); - }, - - async queue(batch, env, ctx) { - const decoder = new TextDecoder(); - const encoder = new TextEncoder(); - - // Durable Object stub (if binding exists) - const doStub = env.UZUMIBI_KV_DATA - ? env.UZUMIBI_KV_DATA.getByName("default") - : null; - - // Look up message by ID for concurrency safety - const getMessage = (id) => { - const message = batch.messages.find((m) => m.id === id); - if (!message) throw new Error(`Message not found for id: ${id}`); - return message; - }; - - const importObject = { - env: { - debug_console_log: (ptr, size) => { - const memory = exports.memory; - const buffer = new Uint8Array(memory.buffer, ptr, size); - console.log(`[debug]: ${decoder.decode(buffer)}`); - return 0; - }, - - // Fetch.fetch(url, method, body) -> packed Uzumibi::Response - uzumibi_cf_fetch: async ( - urlPtr, urlSize, - methodPtr, methodSize, - bodyPtr, bodySize, - resultPtr, resultMaxSize, - ) => { - const memory = exports.memory; - const url = decoder.decode(new Uint8Array(memory.buffer, urlPtr, urlSize)); - const method = decoder.decode(new Uint8Array(memory.buffer, methodPtr, methodSize)); - const body = bodySize > 0 - ? decoder.decode(new Uint8Array(memory.buffer, bodyPtr, bodySize)) - : null; - - const fetchOptions = { method }; - if (body && method !== "GET" && method !== "HEAD") { - fetchOptions.body = body; - } - - const response = await fetch(url, fetchOptions); - const responseBody = await response.text(); - - const respHeaders = []; - response.headers.forEach((value, key) => { - respHeaders.push({ key, value }); - }); - - const resultView = new DataView(memory.buffer, resultPtr, resultMaxSize); - const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); - let pos = 0; - - resultView.setUint16(pos, response.status, true); - pos += 2; - - resultView.setUint16(pos, respHeaders.length, true); - pos += 2; - - for (const header of respHeaders) { - const keyBytes = encoder.encode(header.key); - resultView.setUint16(pos, keyBytes.length, true); - pos += 2; - resultBuffer.set(keyBytes, pos); - pos += keyBytes.length; - - const valueBytes = encoder.encode(header.value); - resultView.setUint16(pos, valueBytes.length, true); - pos += 2; - resultBuffer.set(valueBytes, pos); - pos += valueBytes.length; - } - - const bodyBytes = encoder.encode(responseBody); - resultView.setUint32(pos, bodyBytes.length, true); - pos += 4; - - const bodyLen = Math.min(bodyBytes.length, resultMaxSize - pos); - resultBuffer.set(bodyBytes.slice(0, bodyLen), pos); - pos += bodyLen; - - return pos; - }, - - // KV.get(key) -> value string (via Durable Object) - uzumibi_cf_durable_object_get: async (keyPtr, keySize, resultPtr, resultMaxSize) => { - if (!doStub) return -1; - const memory = exports.memory; - const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); - - const value = await doStub.get(key); - if (value === null) return -1; - - const valueBytes = encoder.encode(value); - const length = Math.min(valueBytes.length, resultMaxSize); - const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); - resultBuffer.set(valueBytes.slice(0, length)); - return length; - }, - - // KV.set(key, value) (via Durable Object) - uzumibi_cf_durable_object_set: async (keyPtr, keySize, valuePtr, valueSize) => { - if (!doStub) return -1; - const memory = exports.memory; - const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); - const value = decoder.decode(new Uint8Array(memory.buffer, valuePtr, valueSize)); - - await doStub.set(key, value); - return 0; - }, - - // Queue.send(queue_name, message) - uzumibi_cf_queue_send: async (queueNamePtr, queueNameSize, messagePtr, messageSize) => { - const memory = exports.memory; - const queueName = decoder.decode(new Uint8Array(memory.buffer, queueNamePtr, queueNameSize)); - const message = decoder.decode(new Uint8Array(memory.buffer, messagePtr, messageSize)); - - const queue = env[queueName]; - if (!queue) { - console.error(`Queue binding '${queueName}' not found`); - return -1; - } - await queue.send(message); - return 0; - }, - - uzumibi_cf_message_ack: async (idPtr, idSize) => { - const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); - getMessage(id).ack(); - return 0; - }, - - uzumibi_cf_message_retry: async (idPtr, idSize, delaySeconds) => { - const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); - getMessage(id).retry({ delaySeconds }); - return 0; - }, - }, - }; - - const instance = await instantiate(wasmModule, importObject); - const exports = instance.exports; - - for (const message of batch.messages) { - const idBytes = encoder.encode(message.id); - const timestampBytes = encoder.encode( - message.timestamp.toISOString(), - ); - const bodyBytes = encoder.encode( - typeof message.body === "string" - ? message.body - : JSON.stringify(message.body), - ); - const attempts = message.attempts; - - // Pack message data: - // u16 LE id_size, id bytes, - // u16 LE timestamp_size, timestamp bytes, - // u32 LE body_size, body bytes, - // u32 LE attempts - const totalSize = - 2 + - idBytes.length + - 2 + - timestampBytes.length + - 4 + - bodyBytes.length + - 4; - - const msgResult = - await exports.uzumibi_initialize_message(totalSize); - const msgOffset = Number(msgResult & 0xffffffffn); - if (msgOffset === 0) { - const errOffset = Number( - (msgResult >> 32n) & 0xffffffffn, - ); - const buffer = new Uint8Array( - exports.memory.buffer, - errOffset, - ); - let errStr = ""; - for (let i = 0; buffer[i] !== 0; i++) { - errStr += String.fromCharCode(buffer[i]); - } - throw new Error( - `Failed to initialize message: ${errStr}`, - ); - } - - const msgBuffer = new Uint8Array( - exports.memory.buffer, - msgOffset, - totalSize, - ); - const dataView = new DataView( - exports.memory.buffer, - msgOffset, - ); - let pos = 0; - - // id - dataView.setUint16(pos, idBytes.length, true); - pos += 2; - msgBuffer.set(idBytes, pos); - pos += idBytes.length; - - // timestamp - dataView.setUint16(pos, timestampBytes.length, true); - pos += 2; - msgBuffer.set(timestampBytes, pos); - pos += timestampBytes.length; - - // body - dataView.setUint32(pos, bodyBytes.length, true); - pos += 4; - msgBuffer.set(bodyBytes, pos); - pos += bodyBytes.length; - - // attempts - dataView.setUint32(pos, attempts, true); - - const result = await exports.uzumibi_start_message(); - if (result !== 0) { - const buffer = new Uint8Array( - exports.memory.buffer, - result, - ); - let errStr = ""; - for (let i = 0; buffer[i] !== 0; i++) { - errStr += String.fromCharCode(buffer[i]); - } - throw new Error( - `Failed to process message: ${errStr}`, - ); - } - } - }, + async fetch(request, env, ctx) { + return new Response("This endpoint is for queue processing. Please send messages to the queue instead.", { status: 400 }); + }, + + async queue(batch, env, ctx) { + const decoder = new TextDecoder(); + const encoder = new TextEncoder(); + + // Durable Object stub (if binding exists) + const doStub = env.UZUMIBI_KV_DATA + ? env.UZUMIBI_KV_DATA.getByName("default") + : null; + + // Look up message by ID for concurrency safety + const getMessage = (id) => { + const message = batch.messages.find((m) => m.id === id); + if (!message) throw new Error(`Message not found for id: ${id}`); + return message; + }; + + const importObject = { + env: { + debug_console_log: (ptr, size) => { + const memory = exports.memory; + const buffer = new Uint8Array(memory.buffer, ptr, size); + console.log(`[debug]: ${decoder.decode(buffer)}`); + return 0; + }, + + // Fetch.fetch(url, method, body) -> packed Uzumibi::Response + uzumibi_cf_fetch: async ( + urlPtr, urlSize, + methodPtr, methodSize, + bodyPtr, bodySize, + resultPtr, resultMaxSize, + ) => { + const memory = exports.memory; + const url = decoder.decode(new Uint8Array(memory.buffer, urlPtr, urlSize)); + const method = decoder.decode(new Uint8Array(memory.buffer, methodPtr, methodSize)); + const body = bodySize > 0 + ? decoder.decode(new Uint8Array(memory.buffer, bodyPtr, bodySize)) + : null; + + const fetchOptions = { method }; + if (body && method !== "GET" && method !== "HEAD") { + fetchOptions.body = body; + } + + const response = await fetch(url, fetchOptions); + const responseBody = await response.text(); + + const respHeaders = []; + response.headers.forEach((value, key) => { + respHeaders.push({ key, value }); + }); + + const resultView = new DataView(memory.buffer, resultPtr, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + let pos = 0; + + resultView.setUint16(pos, response.status, true); + pos += 2; + + resultView.setUint16(pos, respHeaders.length, true); + pos += 2; + + for (const header of respHeaders) { + const keyBytes = encoder.encode(header.key); + resultView.setUint16(pos, keyBytes.length, true); + pos += 2; + resultBuffer.set(keyBytes, pos); + pos += keyBytes.length; + + const valueBytes = encoder.encode(header.value); + resultView.setUint16(pos, valueBytes.length, true); + pos += 2; + resultBuffer.set(valueBytes, pos); + pos += valueBytes.length; + } + + const bodyBytes = encoder.encode(responseBody); + resultView.setUint32(pos, bodyBytes.length, true); + pos += 4; + + const bodyLen = Math.min(bodyBytes.length, resultMaxSize - pos); + resultBuffer.set(bodyBytes.slice(0, bodyLen), pos); + pos += bodyLen; + + return pos; + }, + + // KV.get(key) -> value string (via Durable Object) + uzumibi_cf_durable_object_get: async (keyPtr, keySize, resultPtr, resultMaxSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + + const value = await doStub.get(key); + if (value === null) return -1; + + const valueBytes = encoder.encode(value); + const length = Math.min(valueBytes.length, resultMaxSize); + const resultBuffer = new Uint8Array(memory.buffer, resultPtr, resultMaxSize); + resultBuffer.set(valueBytes.slice(0, length)); + return length; + }, + + // KV.set(key, value) (via Durable Object) + uzumibi_cf_durable_object_set: async (keyPtr, keySize, valuePtr, valueSize) => { + if (!doStub) return -1; + const memory = exports.memory; + const key = decoder.decode(new Uint8Array(memory.buffer, keyPtr, keySize)); + const value = decoder.decode(new Uint8Array(memory.buffer, valuePtr, valueSize)); + + await doStub.set(key, value); + return 0; + }, + + // Queue.send(queue_name, message) + uzumibi_cf_queue_send: async (queueNamePtr, queueNameSize, messagePtr, messageSize) => { + const memory = exports.memory; + const queueName = decoder.decode(new Uint8Array(memory.buffer, queueNamePtr, queueNameSize)); + const message = decoder.decode(new Uint8Array(memory.buffer, messagePtr, messageSize)); + + const queue = env[queueName]; + if (!queue) { + console.error(`Queue binding '${queueName}' not found`); + return -1; + } + await queue.send(message); + return 0; + }, + + uzumibi_cf_message_ack: async (idPtr, idSize) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).ack(); + return 0; + }, + + uzumibi_cf_message_retry: async (idPtr, idSize, delaySeconds) => { + const id = decoder.decode(new Uint8Array(exports.memory.buffer, idPtr, idSize)); + getMessage(id).retry({ delaySeconds }); + return 0; + }, + }, + }; + + const instance = await instantiate(wasmModule, importObject); + const exports = instance.exports; + + for (const message of batch.messages) { + const idBytes = encoder.encode(message.id); + const timestampBytes = encoder.encode( + message.timestamp.toISOString(), + ); + const bodyBytes = encoder.encode( + typeof message.body === "string" + ? message.body + : JSON.stringify(message.body), + ); + const attempts = message.attempts; + + // Pack message data: + // u16 LE id_size, id bytes, + // u16 LE timestamp_size, timestamp bytes, + // u32 LE body_size, body bytes, + // u32 LE attempts + const totalSize = + 2 + + idBytes.length + + 2 + + timestampBytes.length + + 4 + + bodyBytes.length + + 4; + + const msgResult = + await exports.uzumibi_initialize_message(totalSize); + const msgOffset = Number(msgResult & 0xffffffffn); + if (msgOffset === 0) { + const errOffset = Number( + (msgResult >> 32n) & 0xffffffffn, + ); + const buffer = new Uint8Array( + exports.memory.buffer, + errOffset, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to initialize message: ${errStr}`, + ); + } + + const msgBuffer = new Uint8Array( + exports.memory.buffer, + msgOffset, + totalSize, + ); + const dataView = new DataView( + exports.memory.buffer, + msgOffset, + ); + let pos = 0; + + // id + dataView.setUint16(pos, idBytes.length, true); + pos += 2; + msgBuffer.set(idBytes, pos); + pos += idBytes.length; + + // timestamp + dataView.setUint16(pos, timestampBytes.length, true); + pos += 2; + msgBuffer.set(timestampBytes, pos); + pos += timestampBytes.length; + + // body + dataView.setUint32(pos, bodyBytes.length, true); + pos += 4; + msgBuffer.set(bodyBytes, pos); + pos += bodyBytes.length; + + // attempts + dataView.setUint32(pos, attempts, true); + + const result = await exports.uzumibi_start_message(); + if (result !== 0) { + const buffer = new Uint8Array( + exports.memory.buffer, + result, + ); + let errStr = ""; + for (let i = 0; buffer[i] !== 0; i++) { + errStr += String.fromCharCode(buffer[i]); + } + throw new Error( + `Failed to process message: ${errStr}`, + ); + } + } + }, }; diff --git a/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc b/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc index 0342e93..7f1510f 100644 --- a/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc +++ b/uzumibi-cli/templates/cloudflare/__features__/queue/wrangler.jsonc @@ -52,4 +52,4 @@ } ] } -} +} \ No newline at end of file From 85035de670d3e214f1cb2cae52725e205662e572 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Mon, 9 Mar 2026 23:31:16 +0900 Subject: [PATCH 15/16] RC2... --- uzumibi-cli/tests/runn/help.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uzumibi-cli/tests/runn/help.yml b/uzumibi-cli/tests/runn/help.yml index 4d8c9e8..ce583f5 100644 --- a/uzumibi-cli/tests/runn/help.yml +++ b/uzumibi-cli/tests/runn/help.yml @@ -1,7 +1,7 @@ desc: Test uzumibi CLI help command vars: binary: ${UZUMIBI_TEST_BINARY:-../target/release/uzumibi} - version: 0.6.0-rc1 + version: 0.6.0-rc2 steps: build: desc: Build release binary From 5a0a07a4166a683ce3a55e5dda94f846b8e9d938 Mon Sep 17 00:00:00 2001 From: Uchio Kondo Date: Tue, 10 Mar 2026 00:19:44 +0900 Subject: [PATCH 16/16] Added kill server --- uzumibi-cli/tests/runn/new_cloudflare.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/uzumibi-cli/tests/runn/new_cloudflare.yml b/uzumibi-cli/tests/runn/new_cloudflare.yml index 6317c8b..5aff7b3 100644 --- a/uzumibi-cli/tests/runn/new_cloudflare.yml +++ b/uzumibi-cli/tests/runn/new_cloudflare.yml @@ -88,6 +88,12 @@ steps: current.res.status == 200 && current.res.rawBody contains 'It works!' + kill_server: + desc: Kill the dev server + exec: + command: kill -INT $(ps -ef | grep 'wrangler.js de[v]' | awk '{print $2}') || true + test: current.exit_code == 0 + cleanup_after: desc: Clean up test directory exec: