diff --git a/examples/trace_call_watch.py b/examples/trace_call_watch.py new file mode 100644 index 0000000..5fa9eed --- /dev/null +++ b/examples/trace_call_watch.py @@ -0,0 +1,100 @@ +import os +import asyncio +import hypersync +from dotenv import load_dotenv +from hypersync import TraceField + +load_dotenv() + +ADDR = "1e037f97d730Cc881e77F01E409D828b0bb14de0" +USDC = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" +BALANCE_OF_SIGHASH = "0x70a08231" +SIG_IN = "balanceOf(address)" +SIG_OUT = "balanceOf(address)(uint256)" + + +def make_client(): + token = os.getenv("ENVIO_API_TOKEN") + if not token: + raise ValueError("ENVIO_API_TOKEN is required") + return hypersync.HypersyncClient( + hypersync.ClientConfig( + url="https://eth.hypersync.xyz", + api_token=token, + ) + ) + + +async def main(): + client = make_client() + height = await client.get_height() + + # recent window; increase if you get no traces + from_block = max(0, height - 500_000) + + query = hypersync.Query( + from_block=from_block, + to_block=height, + traces=[ + hypersync.TraceSelection( + to=[USDC], + sighash=[BALANCE_OF_SIGHASH], + ) + ], + field_selection=hypersync.FieldSelection( + trace=[ + TraceField.INPUT, + TraceField.OUTPUT, + TraceField.TO, + TraceField.SIGHASH, + TraceField.CALL_TYPE, + TraceField.TRANSACTION_HASH, + ] + ), + max_num_traces=50, + ) + + res = await client.get(query) + traces = [t for t in res.data.traces if t.input and t.output] + + print(f"got traces={len(res.data.traces)} usable={len(traces)}") + if not traces: + print("No usable traces with both input+output. Increase from_block window.") + return + + decoder = hypersync.CallDecoder([SIG_IN]) + + # 1) decode input (should contain the address argument) + decoded_inputs = decoder.decode_traces_input_sync(traces) + + # filter to traces where the decoded address equals your ADDR + target = "0x" + ADDR.lower() + filtered_traces = [] + for tr, din in zip(traces, decoded_inputs): + if not din: + continue + # din[0].val should be the address argument + if isinstance(din[0].val, str) and din[0].val.lower() == target: + filtered_traces.append(tr) + + print(f"traces calling balanceOf({target}) = {len(filtered_traces)}") + if not filtered_traces: + print("No traces found for that specific holder address in this window.") + print("Still: decode_traces_output_sync can be tested without this filter.") + filtered_traces = traces[:5] + + # 2) decode output (uint256 balance) + sigs = [SIG_OUT] * len(filtered_traces) + decoded_outputs = decoder.decode_traces_output_sync(filtered_traces, sigs) + + for i, (tr, dout) in enumerate(zip(filtered_traces[:5], decoded_outputs[:5])): + print(f"\n[{i}] tx={tr.transaction_hash} call_type={tr.call_type} to={tr.to} sighash={tr.sighash}") + print(f" output={tr.output}") + if dout is None: + print(" decoded_output=None (missing output or signature mismatch)") + else: + print(f" decoded_output_uint256={dout[0].val}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hypersync/__init__.py b/hypersync/__init__.py index 5d0ab40..7bfc878 100644 --- a/hypersync/__init__.py +++ b/hypersync/__init__.py @@ -205,12 +205,12 @@ def disable_checksummed_addresses(self): self.inner.disable_checksummed_addresses() async def decode_inputs(self, inputs: list[str]) -> list[list[DecodedSolValue]]: - """Parse log and return decoded event. Returns None if topic0 not found.""" - return await self.inner.decode_inputs(input) + """Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder.""" + return await self.inner.decode_inputs(inputs) - def decode_inputs_sync(self, inputs: list[str]) -> list[list[DecodedSolValue]]: - """Parse log and return decoded event. Returns None if topic0 not found.""" - return self.inner.decode_input_syncs(input) + def decode_inputs_sync(self, inputs: list[str]) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder.""" + return self.inner.decode_inputs_sync(inputs) async def decode_transactions_input( self, txs: list[Transaction] @@ -236,6 +236,54 @@ def decode_traces_input_sync( """Parse log and return decoded event. Returns None if topic0 not found.""" return self.inner.decode_traces_input_sync(traces) + async def decode_outputs( + self, outputs: list[str], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded return data using the provided function signatures. + + Each output hex string is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for entries where decoding + fails or the output is empty. + """ + return await self.inner.decode_outputs(outputs, signatures) + + def decode_outputs_sync( + self, outputs: list[str], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded return data using the provided function signatures. + + Each output hex string is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for entries where decoding + fails or the output is empty. + """ + return self.inner.decode_outputs_sync(outputs, signatures) + + async def decode_traces_output( + self, traces: list[Trace], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded output data from traces using the provided function signatures. + + Each trace's output field is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for traces with no output + or where decoding fails. + """ + return await self.inner.decode_traces_output(traces, signatures) + + def decode_traces_output_sync( + self, traces: list[Trace], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded output data from traces using the provided function signatures. + + Each trace's output field is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for traces with no output + or where decoding fails. + """ + return self.inner.decode_traces_output_sync(traces, signatures) + class DataType(StrEnum): """Data types supported for mapping.""" @@ -823,7 +871,7 @@ class ArrowStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[ArrowResponse]: - await self.inner.recv() + return await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): @@ -835,7 +883,7 @@ class EventStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[EventResponse]: - await self.inner.recv() + return await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): diff --git a/src/decode_call.rs b/src/decode_call.rs index 3f1e178..e37bcf5 100644 --- a/src/decode_call.rs +++ b/src/decode_call.rs @@ -131,4 +131,91 @@ impl CallDecoder { .collect() }) } + + pub fn decode_outputs<'py>( + &self, + outputs: Vec, + signatures: Vec, + py: Python<'py>, + ) -> PyResult> { + let decoder = self.clone(); + + future_into_py(py, async move { + let result = tokio::task::spawn_blocking(move || { + Python::attach(|py| decoder.decode_outputs_sync(outputs, signatures, py)) + }) + .await + .unwrap(); + Ok(result) + }) + } + + pub fn decode_traces_output<'py>( + &self, + traces: Vec, + signatures: Vec, + py: Python<'py>, + ) -> PyResult> { + let decoder = self.clone(); + + future_into_py(py, async move { + let result = tokio::task::spawn_blocking(move || { + Python::attach(|py| decoder.decode_traces_output_sync(traces, signatures, py)) + }) + .await + .unwrap(); + Ok(result) + }) + } + + pub fn decode_outputs_sync( + &self, + outputs: Vec, + signatures: Vec, + py: Python, + ) -> Vec>> { + assert_eq!( ++ outputs.len(), ++ signatures.len(), ++ "outputs and signatures must have the same length" ++ ); + outputs + .into_iter() + .zip(signatures.into_iter()) + .map(|(output, sig)| self.decode_output_impl(output.as_str(), sig.as_str(), py)) + .collect() + } + + pub fn decode_traces_output_sync( + &self, + traces: Vec, + signatures: Vec, + py: Python, + ) -> Vec>> { + traces + .into_iter() + .zip(signatures.into_iter()) + .map(|(trace, sig)| { + trace + .output + .as_ref() + .and_then(|out| self.decode_output_impl(out.as_str(), sig.as_str(), py)) + }) + .collect() + } + + pub fn decode_output_impl(&self, output: &str, signature: &str, py: Python) -> Option> { + let data = Data::decode_hex(output).context("decode output").unwrap(); + let decoded_output = self + .inner + .decode_output(&data, signature) + .context("decode output") + .unwrap(); + decoded_output.map(|decoded| { + decoded + .into_iter() + .map(|value| DecodedSolValue::new(py, value, self.checksummed_addresses)) + .collect() + }) + } }