From 4eae58b84e8b999e765397a1d0d9646074296cc1 Mon Sep 17 00:00:00 2001 From: Pratham-gluex Date: Thu, 2 Apr 2026 10:24:33 +0000 Subject: [PATCH 1/4] Expose CallDecoder decode_output APIs --- examples/trace_call_watch.py | 100 +++++++++++++++++++++++++++++++++++ hypersync/__init__.py | 64 +++++++++++++++++++--- src/decode_call.rs | 82 ++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+), 8 deletions(-) create mode 100644 examples/trace_call_watch.py diff --git a/examples/trace_call_watch.py b/examples/trace_call_watch.py new file mode 100644 index 0000000..9ca19a3 --- /dev/null +++ b/examples/trace_call_watch.py @@ -0,0 +1,100 @@ +import os +import asyncio +import hypersync + +from hypersync import TraceField + + + +ADDR = "1e037f97d730Cc881e77F01E409D828b0bb14de0" +USDC = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" +BALANCE_OF_SIGHASH = "0x70a08231" +SIG_IN = "balanceOf(address)" +SIG_OUT = "balanceOf(address)(uint256)" + + +def make_client(): + token = "5864d512-089a-4841-b4be-082bb19bc060" + if not token: + raise ValueError("ENVIO_API_TOKEN is required") + return hypersync.HypersyncClient( + hypersync.ClientConfig( + url="https://eth.hypersync.xyz", + api_token=token, + ) + ) + + +async def main(): + client = make_client() + height = await client.get_height() + + # recent window; increase if you get no traces + from_block = max(0, height - 500_000) + + query = hypersync.Query( + from_block=from_block, + to_block=height, + traces=[ + hypersync.TraceSelection( + to=[USDC], + sighash=[BALANCE_OF_SIGHASH], + ) + ], + field_selection=hypersync.FieldSelection( + trace=[ + TraceField.INPUT, + TraceField.OUTPUT, + TraceField.TO, + TraceField.SIGHASH, + TraceField.CALL_TYPE, + TraceField.TRANSACTION_HASH, + ] + ), + max_num_traces=50, + ) + + res = await client.get(query) + traces = [t for t in res.data.traces if t.input and t.output] + + print(f"got traces={len(res.data.traces)} usable={len(traces)}") + if not traces: + print("No usable traces with both input+output. Increase from_block window.") + return + + decoder = hypersync.CallDecoder([SIG_IN]) + + # 1) decode input (should contain the address argument) + decoded_inputs = decoder.decode_traces_input_sync(traces) + + # filter to traces where the decoded address equals your ADDR + target = "0x" + ADDR.lower() + filtered_traces = [] + for tr, din in zip(traces, decoded_inputs): + if not din: + continue + # din[0].val should be the address argument + if isinstance(din[0].val, str) and din[0].val.lower() == target: + filtered_traces.append(tr) + + print(f"traces calling balanceOf({target}) = {len(filtered_traces)}") + if not filtered_traces: + print("No traces found for that specific holder address in this window.") + print("Still: decode_traces_output_sync can be tested without this filter.") + filtered_traces = traces[:5] + + # 2) decode output (uint256 balance) + sigs = [SIG_OUT] * len(filtered_traces) + decoded_outputs = decoder.decode_traces_output_sync(filtered_traces, sigs) + + for i, (tr, dout) in enumerate(zip(filtered_traces[:5], decoded_outputs[:5])): + print(f"\n[{i}] tx={tr.transaction_hash} call_type={tr.call_type} to={tr.to} sighash={tr.sighash}") + print(f" output={tr.output}") + if dout is None: + print(" decoded_output=None (missing output or signature mismatch)") + else: + print(f" decoded_output_uint256={dout[0].val}") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/hypersync/__init__.py b/hypersync/__init__.py index 5d0ab40..862b1a0 100644 --- a/hypersync/__init__.py +++ b/hypersync/__init__.py @@ -205,12 +205,12 @@ def disable_checksummed_addresses(self): self.inner.disable_checksummed_addresses() async def decode_inputs(self, inputs: list[str]) -> list[list[DecodedSolValue]]: - """Parse log and return decoded event. Returns None if topic0 not found.""" - return await self.inner.decode_inputs(input) + """Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder.""" + return await self.inner.decode_inputs(inputs) - def decode_inputs_sync(self, inputs: list[str]) -> list[list[DecodedSolValue]]: - """Parse log and return decoded event. Returns None if topic0 not found.""" - return self.inner.decode_input_syncs(input) + def decode_inputs_sync(self, inputs: list[str]) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder.""" + return self.inner.decode_inputs_sync(inputs) async def decode_transactions_input( self, txs: list[Transaction] @@ -236,6 +236,54 @@ def decode_traces_input_sync( """Parse log and return decoded event. Returns None if topic0 not found.""" return self.inner.decode_traces_input_sync(traces) + async def decode_outputs( + self, outputs: list[str], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded return data using the provided function signatures. + + Each output hex string is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for entries where decoding + fails or the output is empty. + """ + return await self.inner.decode_outputs(outputs, signatures) + + def decode_outputs_sync( + self, outputs: list[str], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded return data using the provided function signatures. + + Each output hex string is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for entries where decoding + fails or the output is empty. + """ + return self.inner.decode_outputs_sync(outputs, signatures) + + async def decode_traces_output( + self, traces: list[Trace], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded output data from traces using the provided function signatures. + + Each trace's output field is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for traces with no output + or where decoding fails. + """ + return await self.inner.decode_traces_output(traces, signatures) + + def decode_traces_output_sync( + self, traces: list[Trace], signatures: list[str] + ) -> list[Optional[list[DecodedSolValue]]]: + """Decode ABI-encoded output data from traces using the provided function signatures. + + Each trace's output field is decoded using the corresponding entry in + signatures. The signature must include output types, e.g. + 'balanceOf(address)(uint256)'. Returns None for traces with no output + or where decoding fails. + """ + return self.inner.decode_traces_output_sync(traces, signatures) + class DataType(StrEnum): """Data types supported for mapping.""" @@ -823,7 +871,7 @@ class ArrowStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[ArrowResponse]: - await self.inner.recv() + return await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): @@ -835,7 +883,7 @@ class EventStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[EventResponse]: - await self.inner.recv() + return await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): @@ -847,7 +895,7 @@ class QueryResponseStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[QueryResponse]: - await self.inner.recv() + await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): diff --git a/src/decode_call.rs b/src/decode_call.rs index 3f1e178..2b924f9 100644 --- a/src/decode_call.rs +++ b/src/decode_call.rs @@ -131,4 +131,86 @@ impl CallDecoder { .collect() }) } + + pub fn decode_outputs<'py>( + &self, + outputs: Vec, + signatures: Vec, + py: Python<'py>, + ) -> PyResult> { + let decoder = self.clone(); + + future_into_py(py, async move { + let result = tokio::task::spawn_blocking(move || { + Python::attach(|py| decoder.decode_outputs_sync(outputs, signatures, py)) + }) + .await + .unwrap(); + Ok(result) + }) + } + + pub fn decode_traces_output<'py>( + &self, + traces: Vec, + signatures: Vec, + py: Python<'py>, + ) -> PyResult> { + let decoder = self.clone(); + + future_into_py(py, async move { + let result = tokio::task::spawn_blocking(move || { + Python::attach(|py| decoder.decode_traces_output_sync(traces, signatures, py)) + }) + .await + .unwrap(); + Ok(result) + }) + } + + pub fn decode_outputs_sync( + &self, + outputs: Vec, + signatures: Vec, + py: Python, + ) -> Vec>> { + outputs + .into_iter() + .zip(signatures.into_iter()) + .map(|(output, sig)| self.decode_output_impl(output.as_str(), sig.as_str(), py)) + .collect() + } + + pub fn decode_traces_output_sync( + &self, + traces: Vec, + signatures: Vec, + py: Python, + ) -> Vec>> { + traces + .into_iter() + .zip(signatures.into_iter()) + .map(|(trace, sig)| { + trace + .output + .as_ref() + .and_then(|out| self.decode_output_impl(out.as_str(), sig.as_str(), py)) + }) + .collect() + } + + pub fn decode_output_impl(&self, output: &str, signature: &str, py: Python) -> Option> { + let data = Data::decode_hex(output).context("decode output").unwrap(); + let decoded_output = self + .inner + .decode_output(&data, signature) + .context("decode output") + .unwrap(); + decoded_output.map(|decoded| { + decoded + .into_iter() + .map(|value| DecodedSolValue::new(py, value, self.checksummed_addresses)) + .collect() + }) + } } From 02627d0bd8cc63d7886b41317e13722dcb63795d Mon Sep 17 00:00:00 2001 From: Pratham khatri Date: Thu, 2 Apr 2026 16:10:10 +0530 Subject: [PATCH 2/4] Load ENVIO_API_TOKEN from environment variables --- examples/trace_call_watch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/trace_call_watch.py b/examples/trace_call_watch.py index 9ca19a3..5fa9eed 100644 --- a/examples/trace_call_watch.py +++ b/examples/trace_call_watch.py @@ -1,10 +1,10 @@ import os import asyncio import hypersync - +from dotenv import load_dotenv from hypersync import TraceField - +load_dotenv() ADDR = "1e037f97d730Cc881e77F01E409D828b0bb14de0" USDC = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" @@ -14,7 +14,7 @@ def make_client(): - token = "5864d512-089a-4841-b4be-082bb19bc060" + token = os.getenv("ENVIO_API_TOKEN") if not token: raise ValueError("ENVIO_API_TOKEN is required") return hypersync.HypersyncClient( @@ -97,4 +97,4 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) From d5355d9b11d55be730f72c92c7ed7b889bbeb947 Mon Sep 17 00:00:00 2001 From: Pratham khatri Date: Thu, 2 Apr 2026 16:18:35 +0530 Subject: [PATCH 3/4] Update __init__.py --- hypersync/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hypersync/__init__.py b/hypersync/__init__.py index 862b1a0..7bfc878 100644 --- a/hypersync/__init__.py +++ b/hypersync/__init__.py @@ -895,7 +895,7 @@ class QueryResponseStream(object): # receive the next response, returns None if the stream is finished async def recv(self) -> Optional[QueryResponse]: - await self.inner.recv() + await self.inner.recv() # close the stream so it doesn't keep loading data in the background async def close(self): From f4a627c1046b03036e28fe24d1738e208bbee568 Mon Sep 17 00:00:00 2001 From: Pratham khatri Date: Thu, 2 Apr 2026 16:38:04 +0530 Subject: [PATCH 4/4] Update decode_call.rs --- src/decode_call.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/decode_call.rs b/src/decode_call.rs index 2b924f9..e37bcf5 100644 --- a/src/decode_call.rs +++ b/src/decode_call.rs @@ -174,6 +174,11 @@ impl CallDecoder { signatures: Vec, py: Python, ) -> Vec>> { + assert_eq!( ++ outputs.len(), ++ signatures.len(), ++ "outputs and signatures must have the same length" ++ ); outputs .into_iter() .zip(signatures.into_iter())