Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions examples/trace_call_watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import os
import asyncio
import hypersync
from dotenv import load_dotenv
from hypersync import TraceField

load_dotenv()

ADDR = "1e037f97d730Cc881e77F01E409D828b0bb14de0"
USDC = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
BALANCE_OF_SIGHASH = "0x70a08231"
SIG_IN = "balanceOf(address)"
SIG_OUT = "balanceOf(address)(uint256)"


def make_client():
token = os.getenv("ENVIO_API_TOKEN")
if not token:
raise ValueError("ENVIO_API_TOKEN is required")
return hypersync.HypersyncClient(
hypersync.ClientConfig(
url="https://eth.hypersync.xyz",
api_token=token,
)
)


async def main():
client = make_client()
height = await client.get_height()

# recent window; increase if you get no traces
from_block = max(0, height - 500_000)

query = hypersync.Query(
from_block=from_block,
to_block=height,
traces=[
hypersync.TraceSelection(
to=[USDC],
sighash=[BALANCE_OF_SIGHASH],
)
],
field_selection=hypersync.FieldSelection(
trace=[
TraceField.INPUT,
TraceField.OUTPUT,
TraceField.TO,
TraceField.SIGHASH,
TraceField.CALL_TYPE,
TraceField.TRANSACTION_HASH,
]
),
max_num_traces=50,
)

res = await client.get(query)
traces = [t for t in res.data.traces if t.input and t.output]

print(f"got traces={len(res.data.traces)} usable={len(traces)}")
if not traces:
print("No usable traces with both input+output. Increase from_block window.")
return

decoder = hypersync.CallDecoder([SIG_IN])

# 1) decode input (should contain the address argument)
decoded_inputs = decoder.decode_traces_input_sync(traces)

# filter to traces where the decoded address equals your ADDR
target = "0x" + ADDR.lower()
filtered_traces = []
for tr, din in zip(traces, decoded_inputs):
if not din:
continue
# din[0].val should be the address argument
if isinstance(din[0].val, str) and din[0].val.lower() == target:
filtered_traces.append(tr)

print(f"traces calling balanceOf({target}) = {len(filtered_traces)}")
if not filtered_traces:
print("No traces found for that specific holder address in this window.")
print("Still: decode_traces_output_sync can be tested without this filter.")
filtered_traces = traces[:5]

# 2) decode output (uint256 balance)
sigs = [SIG_OUT] * len(filtered_traces)
decoded_outputs = decoder.decode_traces_output_sync(filtered_traces, sigs)

for i, (tr, dout) in enumerate(zip(filtered_traces[:5], decoded_outputs[:5])):
print(f"\n[{i}] tx={tr.transaction_hash} call_type={tr.call_type} to={tr.to} sighash={tr.sighash}")
print(f" output={tr.output}")
if dout is None:
print(" decoded_output=None (missing output or signature mismatch)")
else:
print(f" decoded_output_uint256={dout[0].val}")


if __name__ == "__main__":
asyncio.run(main())
62 changes: 55 additions & 7 deletions hypersync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ def disable_checksummed_addresses(self):
self.inner.disable_checksummed_addresses()

async def decode_inputs(self, inputs: list[str]) -> list[list[DecodedSolValue]]:
"""Parse log and return decoded event. Returns None if topic0 not found."""
return await self.inner.decode_inputs(input)
"""Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder."""
return await self.inner.decode_inputs(inputs)

def decode_inputs_sync(self, inputs: list[str]) -> list[list[DecodedSolValue]]:
"""Parse log and return decoded event. Returns None if topic0 not found."""
return self.inner.decode_input_syncs(input)
def decode_inputs_sync(self, inputs: list[str]) -> list[Optional[list[DecodedSolValue]]]:
"""Decode ABI-encoded calldata strings; forwards to the Rust CallDecoder."""
return self.inner.decode_inputs_sync(inputs)

async def decode_transactions_input(
self, txs: list[Transaction]
Expand All @@ -236,6 +236,54 @@ def decode_traces_input_sync(
"""Parse log and return decoded event. Returns None if topic0 not found."""
return self.inner.decode_traces_input_sync(traces)

async def decode_outputs(
self, outputs: list[str], signatures: list[str]
) -> list[Optional[list[DecodedSolValue]]]:
"""Decode ABI-encoded return data using the provided function signatures.

Each output hex string is decoded using the corresponding entry in
signatures. The signature must include output types, e.g.
'balanceOf(address)(uint256)'. Returns None for entries where decoding
fails or the output is empty.
"""
return await self.inner.decode_outputs(outputs, signatures)

def decode_outputs_sync(
self, outputs: list[str], signatures: list[str]
) -> list[Optional[list[DecodedSolValue]]]:
"""Decode ABI-encoded return data using the provided function signatures.

Each output hex string is decoded using the corresponding entry in
signatures. The signature must include output types, e.g.
'balanceOf(address)(uint256)'. Returns None for entries where decoding
fails or the output is empty.
"""
return self.inner.decode_outputs_sync(outputs, signatures)

async def decode_traces_output(
self, traces: list[Trace], signatures: list[str]
) -> list[Optional[list[DecodedSolValue]]]:
"""Decode ABI-encoded output data from traces using the provided function signatures.

Each trace's output field is decoded using the corresponding entry in
signatures. The signature must include output types, e.g.
'balanceOf(address)(uint256)'. Returns None for traces with no output
or where decoding fails.
"""
return await self.inner.decode_traces_output(traces, signatures)

def decode_traces_output_sync(
self, traces: list[Trace], signatures: list[str]
) -> list[Optional[list[DecodedSolValue]]]:
"""Decode ABI-encoded output data from traces using the provided function signatures.

Each trace's output field is decoded using the corresponding entry in
signatures. The signature must include output types, e.g.
'balanceOf(address)(uint256)'. Returns None for traces with no output
or where decoding fails.
"""
return self.inner.decode_traces_output_sync(traces, signatures)


class DataType(StrEnum):
"""Data types supported for mapping."""
Expand Down Expand Up @@ -823,7 +871,7 @@ class ArrowStream(object):

# receive the next response, returns None if the stream is finished
async def recv(self) -> Optional[ArrowResponse]:
await self.inner.recv()
return await self.inner.recv()

# close the stream so it doesn't keep loading data in the background
async def close(self):
Expand All @@ -835,7 +883,7 @@ class EventStream(object):

# receive the next response, returns None if the stream is finished
async def recv(self) -> Optional[EventResponse]:
await self.inner.recv()
return await self.inner.recv()

# close the stream so it doesn't keep loading data in the background
async def close(self):
Expand Down
87 changes: 87 additions & 0 deletions src/decode_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,91 @@ impl CallDecoder {
.collect()
})
}

pub fn decode_outputs<'py>(
&self,
outputs: Vec<String>,
signatures: Vec<String>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let decoder = self.clone();

future_into_py(py, async move {
let result = tokio::task::spawn_blocking(move || {
Python::attach(|py| decoder.decode_outputs_sync(outputs, signatures, py))
})
.await
.unwrap();
Ok(result)
})
}

pub fn decode_traces_output<'py>(
&self,
traces: Vec<Trace>,
signatures: Vec<String>,
py: Python<'py>,
) -> PyResult<Bound<'py, PyAny>> {
let decoder = self.clone();

future_into_py(py, async move {
let result = tokio::task::spawn_blocking(move || {
Python::attach(|py| decoder.decode_traces_output_sync(traces, signatures, py))
})
.await
.unwrap();
Ok(result)
})
}

pub fn decode_outputs_sync(
&self,
outputs: Vec<String>,
signatures: Vec<String>,
py: Python,
) -> Vec<Option<Vec<DecodedSolValue>>> {
assert_eq!(
+ outputs.len(),
+ signatures.len(),
+ "outputs and signatures must have the same length"
+ );
outputs
.into_iter()
.zip(signatures.into_iter())
.map(|(output, sig)| self.decode_output_impl(output.as_str(), sig.as_str(), py))
.collect()
}

pub fn decode_traces_output_sync(
&self,
traces: Vec<Trace>,
signatures: Vec<String>,
py: Python,
) -> Vec<Option<Vec<DecodedSolValue>>> {
traces
.into_iter()
.zip(signatures.into_iter())
.map(|(trace, sig)| {
trace
.output
.as_ref()
.and_then(|out| self.decode_output_impl(out.as_str(), sig.as_str(), py))
})
.collect()
}

pub fn decode_output_impl(&self, output: &str, signature: &str, py: Python) -> Option<Vec<DecodedSolValue>> {
let data = Data::decode_hex(output).context("decode output").unwrap();
let decoded_output = self
.inner
.decode_output(&data, signature)
.context("decode output")
.unwrap();
decoded_output.map(|decoded| {
decoded
.into_iter()
.map(|value| DecodedSolValue::new(py, value, self.checksummed_addresses))
.collect()
})
}
}