);
diff --git a/forester/dashboard/src/components/ForesterList.tsx b/forester/dashboard/src/components/ForesterList.tsx
index b6a56736aa..bb9f4c09e2 100644
--- a/forester/dashboard/src/components/ForesterList.tsx
+++ b/forester/dashboard/src/components/ForesterList.tsx
@@ -1,38 +1,163 @@
import type { ForesterInfo } from "@/types/forester";
-import { truncateAddress, formatSol } from "@/lib/utils";
+import { truncateAddress, formatSol, explorerUrl } from "@/lib/utils";
+import type { BalanceTrend } from "@/hooks/useBalanceHistory";
interface ForesterListProps {
+ active: ForesterInfo[];
+ registering: ForesterInfo[];
+ activeEpoch: number;
+ registrationEpoch: number;
+ getTrend?: (authority: string, hours?: number) => BalanceTrend | null;
+}
+
+function BalanceTrendBadge({ trend }: { trend: BalanceTrend }) {
+ const ratePerHour = trend.hourlyRate;
+ const burning = ratePerHour < -0.005; // more than 0.005 SOL/hr
+ const fast = ratePerHour < -0.05; // more than 0.05 SOL/hr
+
+ if (!burning) return null;
+
+ const hoursLeft =
+ ratePerHour < 0 ? Math.abs(trend.current / ratePerHour) : Infinity;
+ const rateStr = Math.abs(ratePerHour).toFixed(3);
+
+ return (
+
+ );
+}
+
+function ForesterRow({
+ info,
+ trend,
+}: {
+ info: ForesterInfo;
+ trend: BalanceTrend | null;
+}) {
+ const low = info.balance_sol != null && info.balance_sol < 0.1;
+ return (
+
+ );
+}
+
+function ForesterSection({
+ title,
+ epoch,
+ foresters,
+ badgeClass,
+ emptyMessage,
+ getTrend,
+}: {
title: string;
+ epoch: number;
foresters: ForesterInfo[];
+ badgeClass: string;
+ emptyMessage: string;
+ getTrend?: (authority: string, hours?: number) => BalanceTrend | null;
+}) {
+ return (
+
+ );
}
-export function ForesterList({ title, foresters }: ForesterListProps) {
+export function ForesterList({
+ active,
+ registering,
+ activeEpoch,
+ registrationEpoch,
+ getTrend,
+}: ForesterListProps) {
+ // Foresters registered for next epoch but not in current
+ const newNextEpoch = registering.filter(
+ (f) => !active.some((a) => a.authority === f.authority)
+ );
+ // Foresters in current epoch that haven't re-registered for next
+ const notReRegistered = active.filter(
+ (f) => !registering.some((r) => r.authority === f.authority)
+ );
+
return (
-
- {title}{" "}
- ({foresters.length})
-
- {foresters.length === 0 ? (
-
No foresters registered
- ) : (
-
- {foresters.map((f, i) => {
- const low = f.balance_sol != null && f.balance_sol < 0.1;
- return (
-
-
- {truncateAddress(f.authority, 6)}
-
-
- {formatSol(f.balance_sol)}
-
-
- );
- })}
+
Foresters
+
+
+
+
+ {/* Continuity warnings */}
+ {(notReRegistered.length > 0 || newNextEpoch.length > 0) && (
+
+ {notReRegistered.length > 0 && (
+
+ {notReRegistered.length} active forester{notReRegistered.length !== 1 ? "s" : ""} not yet registered for next epoch:{" "}
+ {notReRegistered.map((f) => truncateAddress(f.authority, 4)).join(", ")}
+
+ )}
+ {newNextEpoch.length > 0 && (
+
+ {newNextEpoch.length} new forester{newNextEpoch.length !== 1 ? "s" : ""} joining next epoch:{" "}
+ {newNextEpoch.map((f) => truncateAddress(f.authority, 4)).join(", ")}
+
+ )}
)}
diff --git a/forester/dashboard/src/components/MetricsPanel.tsx b/forester/dashboard/src/components/MetricsPanel.tsx
index 7ce372b42c..571ce29e20 100644
--- a/forester/dashboard/src/components/MetricsPanel.tsx
+++ b/forester/dashboard/src/components/MetricsPanel.tsx
@@ -1,125 +1,30 @@
import type { MetricsResponse } from "@/types/forester";
-import { formatNumber, formatSol } from "@/lib/utils";
+import { formatNumber } from "@/lib/utils";
interface MetricsPanelProps {
metrics: MetricsResponse;
}
export function MetricsPanel({ metrics }: MetricsPanelProps) {
- const totalTx = Object.values(metrics.transactions_processed_total).reduce(
- (a, b) => a + b,
- 0
- );
const rates = Object.entries(metrics.transaction_rate);
- const balances = Object.entries(metrics.forester_balances);
- const queues = Object.entries(metrics.queue_lengths);
-
- return (
-
-
-
- 0
- ? new Date(metrics.last_run_timestamp * 1000).toLocaleString()
- : "N/A"
- }
- />
-
-
-
- {rates.length > 0 && (
-
-
- {rates.map(([epoch, rate]) => (
-
-
Epoch {epoch}
-
- {rate.toFixed(2)} tx/s
-
-
- {formatNumber(
- metrics.transactions_processed_total[epoch] ?? 0
- )}{" "}
- total
-
-
- ))}
-
-
- )}
-
- {balances.length > 0 && (
-
-
- {balances.map(([pubkey, balance]) => (
-
-
- {pubkey.slice(0, 8)}...{pubkey.slice(-4)}
-
-
- {formatSol(balance)}
-
-
- ))}
-
-
- )}
-
- {queues.length > 0 && (
-
-
- {queues.map(([tree, length]) => (
-
-
- {tree.slice(0, 8)}...{tree.slice(-4)}
-
-
- {formatNumber(length)}
-
-
- ))}
-
-
- )}
-
- );
-}
-function StatCard({ label, value }: { label: string; value: string }) {
- return (
-
- );
-}
+ if (rates.length === 0) return null;
-function Section({
- title,
- children,
-}: {
- title: string;
- children: React.ReactNode;
-}) {
return (
-
{title}
- {children}
+
+ {rates.map(([epoch, rate]) => (
+
+
Epoch {epoch}
+
+ {rate.toFixed(2)} tx/s
+
+
+ {formatNumber(metrics.transactions_processed_total[epoch] ?? 0)} total
+
+
+ ))}
+
);
}
diff --git a/forester/dashboard/src/components/PhotonStatsPanel.tsx b/forester/dashboard/src/components/PhotonStatsPanel.tsx
new file mode 100644
index 0000000000..52ea820914
--- /dev/null
+++ b/forester/dashboard/src/components/PhotonStatsPanel.tsx
@@ -0,0 +1,106 @@
+import type { PhotonStats } from "@/types/forester";
+import { formatNumber, formatAgeFromUnixSeconds } from "@/lib/utils";
+
+interface PhotonStatsPanelProps {
+ data: PhotonStats;
+}
+
+function compactNumber(n: number): string {
+ if (n >= 1_000_000) return `${(n / 1_000_000).toFixed(1)}M`;
+ if (n >= 1_000) return `${(n / 1_000).toFixed(1)}K`;
+ return formatNumber(n);
+}
+
+export function PhotonStatsPanel({ data }: PhotonStatsPanelProps) {
+ if (data.error) {
+ return (
+
+ );
+ }
+
+ const rows = [
+ {
+ label: "Compressed Accounts",
+ total: data.accounts.total,
+ active: data.accounts.active,
+ },
+ {
+ label: "Token Accounts",
+ total: data.token_accounts.total,
+ active: data.token_accounts.active,
+ },
+ {
+ label: "Compressed by Forester",
+ total: data.compressed_from_onchain.total,
+ active: data.compressed_from_onchain.active,
+ },
+ ];
+
+ return (
+
+ {/* Summary cards */}
+
+
+
+
+
+
+ {/* Detail table */}
+
+
+
+
+ | Category |
+ Total |
+ Active |
+ Spent |
+
+
+
+ {rows.map((row) => (
+
+ |
+ {row.label}
+ |
+
+ {formatNumber(row.total)}
+ |
+
+ {formatNumber(row.active)}
+ |
+
+ {formatNumber(row.total - row.active)}
+ |
+
+ ))}
+
+
+
+
+
+ Source: photon DB
+ {data.timestamp
+ ? ` · ${formatAgeFromUnixSeconds(data.timestamp)}`
+ : ""}
+
+
+ );
+}
+
+function MiniStat({ label, value }: { label: string; value: number }) {
+ return (
+
+
+ {label}
+
+
+ {compactNumber(value)}
+
+
+ );
+}
diff --git a/forester/dashboard/src/components/QueuePressureChart.tsx b/forester/dashboard/src/components/QueuePressureChart.tsx
index 5b6d15eefa..d06b8e6943 100644
--- a/forester/dashboard/src/components/QueuePressureChart.tsx
+++ b/forester/dashboard/src/components/QueuePressureChart.tsx
@@ -6,42 +6,44 @@ interface QueuePressureChartProps {
}
const entries: {
- key: keyof AggregateQueueStats;
+ batchKey: keyof AggregateQueueStats;
+ itemKey: keyof AggregateQueueStats;
label: string;
color: string;
}[] = [
{
- key: "state_v1_total_pending",
- label: "State V1",
- color: "bg-purple-500",
- },
- {
- key: "state_v2_input_pending",
+ batchKey: "state_v2_input_pending_batches",
+ itemKey: "state_v2_input_pending",
label: "State V2 Input",
color: "bg-indigo-500",
},
{
- key: "state_v2_output_pending",
+ batchKey: "state_v2_output_pending_batches",
+ itemKey: "state_v2_output_pending",
label: "State V2 Output",
color: "bg-indigo-300",
},
{
- key: "address_v1_total_pending",
- label: "Addr V1",
- color: "bg-teal-500",
- },
- {
- key: "address_v2_input_pending",
- label: "Addr V2 Input",
+ batchKey: "address_v2_input_pending_batches",
+ itemKey: "address_v2_input_pending",
+ label: "Addr V2",
color: "bg-cyan-500",
},
];
+const v1Entries: {
+ key: keyof AggregateQueueStats;
+ label: string;
+ color: string;
+}[] = [
+ { key: "state_v1_total_pending", label: "State V1", color: "bg-purple-500" },
+ { key: "address_v1_total_pending", label: "Addr V1", color: "bg-teal-500" },
+];
+
export function QueuePressureChart({ stats }: QueuePressureChartProps) {
- const maxVal = Math.max(
- ...entries.map((e) => stats[e.key]),
- 1
- );
+ const allBatches = entries.map((e) => stats[e.batchKey]);
+ const allV1 = v1Entries.map((e) => stats[e.key]);
+ const maxVal = Math.max(...allBatches, ...allV1, 1);
return (
@@ -50,13 +52,36 @@ export function QueuePressureChart({ stats }: QueuePressureChartProps) {
{entries.map((e) => {
+ const batches = stats[e.batchKey];
+ const items = stats[e.itemKey];
+ const pct = (batches / maxVal) * 100;
+ return (
+
+
+ {e.label}
+
+ {formatNumber(batches)}{batches !== 1 ? " batches" : " batch"}
+ ({formatNumber(items)} items)
+
+
+
+
+ );
+ })}
+ {v1Entries.map((e) => {
const val = stats[e.key];
+ if (val === 0) return null;
const pct = (val / maxVal) * 100;
return (
{e.label}
- {formatNumber(val)}
+ {formatNumber(val)} items
-
-
Light Protocol
-
Forester Dashboard
-
-
-
- );
-}
diff --git a/forester/dashboard/src/components/TreeTable.tsx b/forester/dashboard/src/components/TreeTable.tsx
index c49d365c99..2a82faaa8f 100644
--- a/forester/dashboard/src/components/TreeTable.tsx
+++ b/forester/dashboard/src/components/TreeTable.tsx
@@ -10,6 +10,7 @@ import {
formatNumber,
formatPercentage,
treeTypeColor,
+ explorerUrl,
} from "@/lib/utils";
interface TreeTableProps {
@@ -18,6 +19,20 @@ interface TreeTableProps {
currentLightSlot: number | null;
}
+function pendingColor(count: number, isBatches: boolean): string {
+ if (count === 0) return "text-gray-400";
+ if (isBatches) {
+ // V2: batches — 1 is normal, 2+ is busy, 3+ is hot
+ if (count >= 3) return "text-red-600 font-medium";
+ if (count >= 2) return "text-amber-600 font-medium";
+ return "text-gray-700";
+ }
+ // V1: item count
+ if (count >= 500) return "text-red-600 font-medium";
+ if (count >= 100) return "text-amber-600 font-medium";
+ return "text-gray-700";
+}
+
type SortKey = "type" | "fullness" | "pending";
type FilterType = "all" | "StateV1" | "StateV2" | "AddressV1" | "AddressV2";
@@ -99,7 +114,9 @@ export function TreeTable({
-
+
+
+
@@ -131,8 +148,17 @@ export function TreeTable({
color={treeTypeColor(tree.tree_type)}
/>
- |
- {truncateAddress(tree.merkle_tree, 6)}
+ |
+ e.stopPropagation()}
+ >
+ {truncateAddress(tree.merkle_tree, 6)}
+
|
@@ -151,19 +177,18 @@ export function TreeTable({
|
{tree.v2_queue_info ? (
-
- I:{tree.v2_queue_info.input_pending_batches *
- tree.v2_queue_info.zkp_batch_size}{" "}
- {tree.tree_type === "StateV2" && (
+
+ {tree.tree_type === "StateV2" ? (
<>
- O:
- {tree.v2_queue_info.output_pending_batches *
- tree.v2_queue_info.zkp_batch_size}
+ I:{tree.v2_queue_info.input_pending_batches}{" "}
+ O:{tree.v2_queue_info.output_pending_batches}
>
+ ) : (
+ tree.v2_queue_info.input_pending_batches
)}
) : (
-
+
{tree.queue_length != null
? formatNumber(tree.queue_length)
: "-"}
@@ -171,14 +196,24 @@ export function TreeTable({
)}
|
- {tree.assigned_forester
- ? truncateAddress(tree.assigned_forester, 4)
- : "-"}
+ {tree.assigned_forester ? (
+ e.stopPropagation()}
+ >
+ {truncateAddress(tree.assigned_forester, 4)}
+
+ ) : "-"}
|
|
@@ -206,38 +241,106 @@ export function TreeTable({
import { Fragment } from "react";
+// Distinct colors for up to 8 foresters; cycles if more
+const FORESTER_COLORS = [
+ "bg-emerald-400",
+ "bg-blue-400",
+ "bg-amber-400",
+ "bg-rose-400",
+ "bg-violet-400",
+ "bg-cyan-400",
+ "bg-orange-400",
+ "bg-pink-400",
+];
+
+const FORESTER_HEX = [
+ "#34d399",
+ "#60a5fa",
+ "#fbbf24",
+ "#fb7185",
+ "#a78bfa",
+ "#22d3ee",
+ "#fb923c",
+ "#f472b6",
+];
+
+function foresterColor(index: number): string {
+ return FORESTER_COLORS[index % FORESTER_COLORS.length];
+}
+
function ScheduleGrid({
schedule,
currentSlot,
+ foresters,
}: {
schedule: (number | null)[];
currentSlot: number | null;
+ foresters: ForesterInfo[];
}) {
if (schedule.length === 0)
return -;
// Show a compact view: only around the current slot
const start = currentSlot != null ? Math.max(0, currentSlot - 2) : 0;
- const visible = schedule.slice(start, start + 8);
+ const visible = schedule.slice(start, start + 12);
return (
-
+
{visible.map((slot, i) => {
const idx = start + i;
const isCurrent = idx === currentSlot;
+ const foresterName =
+ slot != null && foresters[slot]
+ ? truncateAddress(foresters[slot].authority, 3)
+ : slot != null
+ ? `#${slot}`
+ : "unassigned";
return (
);
})}
- {schedule.length > 8 && (
-
+{schedule.length - 8}
+ {schedule.length > start + 12 && (
+
+{schedule.length - start - 12}
)}
);
}
+
+export function ForesterScheduleLegend({ foresters }: { foresters: ForesterInfo[] }) {
+ if (foresters.length === 0) return null;
+ return (
+
+
Schedule:
+ {foresters.map((f, i) => (
+
+ ))}
+
+
+
+ );
+}
diff --git a/forester/dashboard/src/hooks/useBalanceHistory.ts b/forester/dashboard/src/hooks/useBalanceHistory.ts
new file mode 100644
index 0000000000..1ed0773e26
--- /dev/null
+++ b/forester/dashboard/src/hooks/useBalanceHistory.ts
@@ -0,0 +1,99 @@
+import { useEffect, useRef, useCallback } from "react";
+import type { ForesterInfo } from "@/types/forester";
+
+interface BalanceSnapshot {
+ timestamp: number; // unix ms
+ balances: Record
; // authority -> balance_sol
+}
+
+const STORAGE_KEY = "forester_balance_history";
+const MAX_HISTORY_HOURS = 24;
+const SNAPSHOT_INTERVAL_MS = 60_000; // store at most once per minute
+
+function loadHistory(): BalanceSnapshot[] {
+ try {
+ const raw = localStorage.getItem(STORAGE_KEY);
+ if (!raw) return [];
+ const data = JSON.parse(raw) as BalanceSnapshot[];
+ const cutoff = Date.now() - MAX_HISTORY_HOURS * 3600_000;
+ return data.filter((s) => s.timestamp > cutoff);
+ } catch {
+ return [];
+ }
+}
+
+function saveHistory(history: BalanceSnapshot[]) {
+ try {
+ const cutoff = Date.now() - MAX_HISTORY_HOURS * 3600_000;
+ const trimmed = history.filter((s) => s.timestamp > cutoff);
+ localStorage.setItem(STORAGE_KEY, JSON.stringify(trimmed));
+ } catch {
+ // storage full or unavailable
+ }
+}
+
+export interface BalanceTrend {
+ current: number;
+ hourlyRate: number; // SOL per hour (negative = burning)
+ hoursTracked: number;
+ oldest: number;
+}
+
+export function useBalanceHistory(foresters: ForesterInfo[]) {
+ const lastSnapshot = useRef(0);
+
+ // Record a snapshot if enough time has passed
+ useEffect(() => {
+ if (foresters.length === 0) return;
+ const now = Date.now();
+ if (now - lastSnapshot.current < SNAPSHOT_INTERVAL_MS) return;
+ lastSnapshot.current = now;
+
+ const balances: Record = {};
+ for (const f of foresters) {
+ if (f.balance_sol != null) {
+ balances[f.authority] = f.balance_sol;
+ }
+ }
+ if (Object.keys(balances).length === 0) return;
+
+ const history = loadHistory();
+ history.push({ timestamp: now, balances });
+ saveHistory(history);
+ }, [foresters]);
+
+ const getTrend = useCallback(
+ (authority: string, hours: number = 6): BalanceTrend | null => {
+ const forester = foresters.find((f) => f.authority === authority);
+ if (!forester || forester.balance_sol == null) return null;
+
+ const history = loadHistory();
+ const cutoff = Date.now() - hours * 3600_000;
+ const relevant = history
+ .filter((s) => s.timestamp > cutoff && s.balances[authority] != null)
+ .sort((a, b) => a.timestamp - b.timestamp);
+
+ if (relevant.length < 2) return null;
+
+ const oldest = relevant[0];
+ const newest = relevant[relevant.length - 1];
+ const timeDiffHours =
+ (newest.timestamp - oldest.timestamp) / 3600_000;
+ if (timeDiffHours < 0.05) return null; // less than 3 minutes
+
+ const balanceDiff =
+ newest.balances[authority] - oldest.balances[authority];
+ const hourlyRate = balanceDiff / timeDiffHours;
+
+ return {
+ current: forester.balance_sol,
+ hourlyRate,
+ hoursTracked: timeDiffHours,
+ oldest: oldest.balances[authority],
+ };
+ },
+ [foresters]
+ );
+
+ return { getTrend };
+}
diff --git a/forester/dashboard/src/hooks/useCompressible.ts b/forester/dashboard/src/hooks/useCompressible.ts
index 1158185f79..364ceded13 100644
--- a/forester/dashboard/src/hooks/useCompressible.ts
+++ b/forester/dashboard/src/hooks/useCompressible.ts
@@ -4,7 +4,7 @@ import type { CompressibleResponse } from "@/types/forester";
export function useCompressible() {
return useSWR("/compressible", fetcher, {
- refreshInterval: 15000,
+ refreshInterval: 5000,
revalidateOnFocus: true,
});
}
diff --git a/forester/dashboard/src/hooks/usePhotonStats.ts b/forester/dashboard/src/hooks/usePhotonStats.ts
new file mode 100644
index 0000000000..14207ae0b9
--- /dev/null
+++ b/forester/dashboard/src/hooks/usePhotonStats.ts
@@ -0,0 +1,18 @@
+import useSWR from "swr";
+import type { PhotonStats } from "@/types/forester";
+
+async function fetchPhotonStats(): Promise {
+ const res = await fetch("/api/photon-stats", { cache: "no-store" });
+ if (!res.ok) {
+ const body = await res.json().catch(() => ({}));
+ throw new Error(body.error || `HTTP ${res.status}`);
+ }
+ return res.json();
+}
+
+export function usePhotonStats() {
+ return useSWR("photon-stats", fetchPhotonStats, {
+ refreshInterval: 30_000,
+ revalidateOnFocus: true,
+ });
+}
diff --git a/forester/dashboard/src/lib/api.ts b/forester/dashboard/src/lib/api.ts
index 186359c71f..ff9084b9dd 100644
--- a/forester/dashboard/src/lib/api.ts
+++ b/forester/dashboard/src/lib/api.ts
@@ -1,5 +1,9 @@
const API_URL =
- process.env.NEXT_PUBLIC_FORESTER_API_URL || "http://localhost:8080";
+ process.env.NEXT_PUBLIC_FORESTER_API_URL ?? "/api";
+
+const REQUEST_TIMEOUT_MS = Number(
+ process.env.NEXT_PUBLIC_FORESTER_API_TIMEOUT_MS ?? 8000
+);
export class ApiError extends Error {
constructor(
@@ -12,16 +16,44 @@ export class ApiError extends Error {
}
}
+function isAbortError(error: unknown): boolean {
+ return (
+ typeof error === "object" &&
+ error !== null &&
+ "name" in error &&
+ (error as { name?: string }).name === "AbortError"
+ );
+}
+
export async function fetchApi(path: string): Promise {
+ const url = `${API_URL}${path}`;
+ const timeoutMs =
+ Number.isFinite(REQUEST_TIMEOUT_MS) && REQUEST_TIMEOUT_MS > 0
+ ? REQUEST_TIMEOUT_MS
+ : 8000;
+
+ const controller = new AbortController();
+ const timer = setTimeout(() => controller.abort(), timeoutMs);
+
let res: Response;
try {
- res = await fetch(`${API_URL}${path}`);
+ res = await fetch(url, {
+ cache: "no-store",
+ signal: controller.signal,
+ });
} catch (e) {
- // Network error — server not reachable
+ if (isAbortError(e)) {
+ throw new Error(
+ `Request to forester API timed out after ${timeoutMs}ms (${url}).`
+ );
+ }
throw new Error(
`Cannot connect to forester API at ${API_URL}. Make sure the API server is running.`
);
+ } finally {
+ clearTimeout(timer);
}
+
if (!res.ok) {
let serverMsg = "";
try {
@@ -36,6 +68,7 @@ export async function fetchApi(path: string): Promise {
serverMsg
);
}
+
return res.json();
}
diff --git a/forester/dashboard/src/lib/utils.ts b/forester/dashboard/src/lib/utils.ts
index 0249948133..a505fe2fea 100644
--- a/forester/dashboard/src/lib/utils.ts
+++ b/forester/dashboard/src/lib/utils.ts
@@ -3,6 +3,18 @@ export function truncateAddress(addr: string, chars = 4): string {
return `${addr.slice(0, chars)}...${addr.slice(-chars)}`;
}
+export function explorerUrl(address: string): string {
+ let network = "mainnet";
+ if (typeof window !== "undefined") {
+ const host = window.location.hostname;
+ if (host.includes("-devnet")) network = "devnet";
+ else if (host.includes("-mainnet")) network = "mainnet";
+ else network = process.env.NEXT_PUBLIC_SOLANA_NETWORK ?? "mainnet";
+ }
+ const cluster = network === "mainnet" ? "" : `?cluster=${network}`;
+ return `https://explorer.solana.com/address/${address}${cluster}`;
+}
+
export function formatSol(lamports: number | null | undefined): string {
if (lamports == null) return "-";
return `${lamports.toFixed(4)} SOL`;
@@ -16,8 +28,10 @@ export function formatPercentage(n: number, decimals = 2): string {
return `${n.toFixed(decimals)}%`;
}
+const DEFAULT_SLOT_DURATION_MS = 400;
+
export function slotsToTime(slots: number): string {
- const seconds = Math.round(slots * 0.46);
+ const seconds = Math.round((slots * DEFAULT_SLOT_DURATION_MS) / 1000);
if (seconds < 60) return `${seconds}s`;
if (seconds < 3600) return `${Math.round(seconds / 60)}m`;
const hours = Math.floor(seconds / 3600);
@@ -25,6 +39,34 @@ export function slotsToTime(slots: number): string {
return mins > 0 ? `${hours}h ${mins}m` : `${hours}h`;
}
+export function formatAgeFromUnixSeconds(unixTs: number | null | undefined): string {
+ if (unixTs == null || unixTs <= 0) return "unknown";
+ const ageSec = Math.max(0, Math.floor(Date.now() / 1000) - unixTs);
+ if (ageSec < 5) return "just now";
+ if (ageSec < 60) return `${ageSec}s ago`;
+ if (ageSec < 3600) return `${Math.round(ageSec / 60)}m ago`;
+ const hours = Math.floor(ageSec / 3600);
+ const mins = Math.round((ageSec % 3600) / 60);
+ return mins > 0 ? `${hours}h ${mins}m ago` : `${hours}h ago`;
+}
+
+export function formatSlotCountdown(
+ currentSlot: number | null | undefined,
+ nextReadySlot: number | null | undefined,
+ ready?: number,
+ waiting?: number
+): string {
+ if (nextReadySlot == null) {
+ // No waiting accounts — distinguish "all caught up" from "nothing tracked"
+ if (ready != null && ready > 0 && waiting === 0) return "caught up";
+ return "—";
+ }
+ if (currentSlot == null) return `slot ${nextReadySlot.toLocaleString()}`;
+ if (currentSlot > nextReadySlot) return "ready now";
+ const remaining = nextReadySlot - currentSlot;
+ return `${remaining.toLocaleString()} slots (~${slotsToTime(remaining)})`;
+}
+
export function batchStateLabel(state: number): string {
switch (state) {
case 0:
diff --git a/forester/dashboard/src/types/forester.ts b/forester/dashboard/src/types/forester.ts
index c9f047dfe4..fc094aec26 100644
--- a/forester/dashboard/src/types/forester.ts
+++ b/forester/dashboard/src/types/forester.ts
@@ -46,6 +46,9 @@ export interface AggregateQueueStats {
state_v2_output_pending: number;
address_v1_total_pending: number;
address_v2_input_pending: number;
+ state_v2_input_pending_batches: number;
+ state_v2_output_pending_batches: number;
+ address_v2_input_pending_batches: number;
}
export interface ForesterStatus {
@@ -68,6 +71,7 @@ export interface ForesterStatus {
total_trees: number;
active_trees: number;
rolled_over_trees: number;
+ total_pending_batches: number;
total_pending_items: number;
aggregate_queue_stats: AggregateQueueStats;
}
@@ -83,6 +87,60 @@ export interface MetricsResponse {
export interface CompressibleResponse {
enabled: boolean;
ctoken_count?: number;
+ ata_count?: number;
pda_count?: number;
mint_count?: number;
+ current_slot?: number;
+ total_tracked?: number;
+ total_ready?: number;
+ total_waiting?: number;
+ ctoken?: CompressibleTypeStats;
+ ata?: CompressibleTypeStats;
+ pda?: CompressibleTypeStats;
+ mint?: CompressibleTypeStats;
+ pda_programs?: PdaProgramStats[];
+ upstreams?: CompressibleUpstreamStatus[];
+ note?: string;
+ error?: string;
+ refresh_interval_secs?: number;
+ source?: string;
+ cached_at?: number;
+}
+
+export interface CompressibleTypeStats {
+ tracked: number;
+ compressed?: number;
+ ready?: number;
+ waiting?: number;
+ next_ready_slot?: number;
+}
+
+export interface PdaProgramStats {
+ program_id: string;
+ tracked: number;
+ compressed?: number;
+ ready?: number;
+ waiting?: number;
+ next_ready_slot?: number;
+}
+
+
+export interface CompressibleUpstreamStatus {
+ base_url: string;
+ ok: boolean;
+ source?: string;
+ cached_at?: number;
+ error?: string;
+}
+
+export interface PhotonStats {
+ accounts: { total: number; active: number };
+ token_accounts: { total: number; active: number };
+ compressed_from_onchain: {
+ total: number;
+ active: number;
+ by_owner: { owner: string; total: number; active: number }[];
+ };
+ timestamp: number;
+ error?: string;
}
diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs
index e83b0f04e4..4ea4b091b4 100644
--- a/forester/src/api_server.rs
+++ b/forester/src/api_server.rs
@@ -6,7 +6,10 @@ use tracing::{error, info, warn};
use warp::Filter;
use crate::{
- compressible::{CTokenAccountTracker, MintAccountTracker, PdaAccountTracker},
+ compressible::{
+ traits::{CompressibleState, CompressibleTracker},
+ CTokenAccountState, CTokenAccountTracker, MintAccountTracker, PdaAccountTracker,
+ },
forester_status::get_forester_status,
metrics::REGISTRY,
};
@@ -36,9 +39,76 @@ pub struct CompressibleResponse {
#[serde(skip_serializing_if = "Option::is_none")]
pub ctoken_count: Option,
#[serde(skip_serializing_if = "Option::is_none")]
+ pub ata_count: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
pub pda_count: Option,
#[serde(skip_serializing_if = "Option::is_none")]
pub mint_count: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub current_slot: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub total_tracked: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub total_ready: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub total_waiting: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ctoken: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ata: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub pda: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub mint: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub pda_programs: Option>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub upstreams: Option>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub note: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub error: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub refresh_interval_secs: Option,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompressibleTypeStats {
+ pub tracked: usize,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub compressed: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ready: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub waiting: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub next_ready_slot: Option,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PdaProgramStats {
+ pub program_id: String,
+ pub tracked: usize,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub compressed: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub ready: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub waiting: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub next_ready_slot: Option,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompressibleUpstreamStatus {
+ pub base_url: String,
+ pub ok: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub source: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub cached_at: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub error: Option,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -73,8 +143,22 @@ impl CompressibleSnapshot {
data: CompressibleResponse {
enabled: false,
ctoken_count: None,
+ ata_count: None,
pda_count: None,
mint_count: None,
+ current_slot: None,
+ total_tracked: None,
+ total_ready: None,
+ total_waiting: None,
+ ctoken: None,
+ ata: None,
+ pda: None,
+ mint: None,
+ pda_programs: None,
+ upstreams: None,
+ note: None,
+ error: None,
+ refresh_interval_secs: None,
},
source: "none".to_string(),
cached_at: 0,
@@ -82,6 +166,395 @@ impl CompressibleSnapshot {
}
}
+fn summarize_slots(
+ slots: I,
+ current_slot: Option,
+ compressed: Option,
+) -> CompressibleTypeStats
+where
+ I: IntoIterator- ,
+{
+ let all_slots: Vec = slots.into_iter().collect();
+ let tracked = all_slots.len();
+
+ if let Some(slot) = current_slot {
+ let mut ready = 0usize;
+ let mut next_ready_slot: Option = None;
+ for compressible_slot in all_slots {
+ if slot > compressible_slot {
+ ready += 1;
+ } else {
+ next_ready_slot = Some(
+ next_ready_slot
+ .map(|current_min| current_min.min(compressible_slot))
+ .unwrap_or(compressible_slot),
+ );
+ }
+ }
+ CompressibleTypeStats {
+ tracked,
+ compressed,
+ ready: Some(ready),
+ waiting: Some(tracked.saturating_sub(ready)),
+ next_ready_slot,
+ }
+ } else {
+ CompressibleTypeStats {
+ tracked,
+ compressed,
+ ready: None,
+ waiting: None,
+ next_ready_slot: None,
+ }
+ }
+}
+
+fn is_ctoken_ata(state: &CTokenAccountState) -> bool {
+ state.is_ata
+}
+
+fn summarize_ctoken_and_ata_slots(
+ tracker: &CTokenAccountTracker,
+ current_slot: Option,
+) -> (CompressibleTypeStats, CompressibleTypeStats) {
+ let mut ctoken_slots: Vec = Vec::new();
+ let mut ata_slots: Vec = Vec::new();
+
+ for entry in tracker.accounts().iter() {
+ let state = entry.value();
+ let slot = state.compressible_slot();
+ ctoken_slots.push(slot);
+ if is_ctoken_ata(state) {
+ ata_slots.push(slot);
+ }
+ }
+
+ let compressed = Some(tracker.total_compressed());
+ (
+ summarize_slots(ctoken_slots, current_slot, compressed),
+ summarize_slots(ata_slots, current_slot, None),
+ )
+}
+
+fn aggregate_optional_sum(values: impl Iterator
- >) -> Option {
+ let mut sum = 0usize;
+ let mut seen = false;
+ for v in values.flatten() {
+ sum = sum.saturating_add(v);
+ seen = true;
+ }
+ if seen {
+ Some(sum)
+ } else {
+ None
+ }
+}
+
+fn aggregate_type_stats(
+ stats: impl Iterator
- >,
+) -> Option {
+ let mut tracked = 0usize;
+ let mut compressed = 0u64;
+ let mut compressed_seen = false;
+ let mut ready = 0usize;
+ let mut waiting = 0usize;
+ let mut ready_seen = false;
+ let mut waiting_seen = false;
+ let mut next_ready_slot: Option = None;
+ let mut any = false;
+
+ for stat in stats.flatten() {
+ any = true;
+ tracked = tracked.saturating_add(stat.tracked);
+ if let Some(v) = stat.compressed {
+ compressed = compressed.saturating_add(v);
+ compressed_seen = true;
+ }
+ if let Some(v) = stat.ready {
+ ready = ready.saturating_add(v);
+ ready_seen = true;
+ }
+ if let Some(v) = stat.waiting {
+ waiting = waiting.saturating_add(v);
+ waiting_seen = true;
+ }
+ if let Some(slot) = stat.next_ready_slot {
+ next_ready_slot = Some(
+ next_ready_slot
+ .map(|current| current.min(slot))
+ .unwrap_or(slot),
+ );
+ }
+ }
+
+ if !any {
+ return None;
+ }
+
+ Some(CompressibleTypeStats {
+ tracked,
+ compressed: compressed_seen.then_some(compressed),
+ ready: ready_seen.then_some(ready),
+ waiting: waiting_seen.then_some(waiting),
+ next_ready_slot,
+ })
+}
+
+async fn fetch_upstream_compressible(
+ client: &reqwest::Client,
+ base_url: String,
+) -> (CompressibleUpstreamStatus, Option) {
+ let normalized = base_url.trim_end_matches('/').to_string();
+ let endpoint = format!("{}/compressible", normalized);
+
+ let response =
+ match tokio::time::timeout(Duration::from_secs(6), client.get(&endpoint).send()).await {
+ Ok(Ok(resp)) => resp,
+ Ok(Err(e)) => {
+ return (
+ CompressibleUpstreamStatus {
+ base_url: normalized,
+ ok: false,
+ source: None,
+ cached_at: None,
+ error: Some(format!("request failed: {}", e)),
+ },
+ None,
+ );
+ }
+ Err(_) => {
+ return (
+ CompressibleUpstreamStatus {
+ base_url: normalized,
+ ok: false,
+ source: None,
+ cached_at: None,
+ error: Some("request timed out after 6s".to_string()),
+ },
+ None,
+ );
+ }
+ };
+
+ if !response.status().is_success() {
+ let status = response.status();
+ return (
+ CompressibleUpstreamStatus {
+ base_url: normalized,
+ ok: false,
+ source: None,
+ cached_at: None,
+ error: Some(format!("upstream returned {}", status)),
+ },
+ None,
+ );
+ }
+
+ let snapshot = match response.json::().await {
+ Ok(snapshot) => snapshot,
+ Err(e) => {
+ return (
+ CompressibleUpstreamStatus {
+ base_url: normalized,
+ ok: false,
+ source: None,
+ cached_at: None,
+ error: Some(format!("invalid json payload: {}", e)),
+ },
+ None,
+ );
+ }
+ };
+
+ (
+ CompressibleUpstreamStatus {
+ base_url: normalized,
+ ok: true,
+ source: Some(snapshot.source.clone()),
+ cached_at: Some(snapshot.cached_at),
+ error: None,
+ },
+ Some(snapshot),
+ )
+}
+
+async fn fetch_compressible_snapshot_from_foresters(
+ forester_api_urls: &[String],
+ run_id: &str,
+) -> Option<CompressibleSnapshot> {
+ if forester_api_urls.is_empty() {
+ return None;
+ }
+
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_secs() as i64;
+
+ let client = reqwest::Client::new();
+ let calls = forester_api_urls
+ .iter()
+ .cloned()
+ .map(|url| fetch_upstream_compressible(&client, url));
+
+ let responses = futures::future::join_all(calls).await;
+ let mut upstreams: Vec<CompressibleUpstreamStatus> = Vec::with_capacity(responses.len());
+ let mut snapshots: Vec<CompressibleSnapshot> = Vec::new();
+
+ for (status, snapshot) in responses {
+ upstreams.push(status);
+ if let Some(snapshot) = snapshot {
+ snapshots.push(snapshot);
+ }
+ }
+
+ let ok_count = snapshots.len();
+ let fail_count = upstreams.len().saturating_sub(ok_count);
+
+ if snapshots.is_empty() {
+ warn!(
+ event = "api_server_upstream_compressible_all_failed",
+ run_id = %run_id,
+ total_upstreams = upstreams.len(),
+ "All upstream forester compressible endpoints failed"
+ );
+ return Some(CompressibleSnapshot {
+ data: CompressibleResponse {
+ enabled: false,
+ ctoken_count: None,
+ ata_count: None,
+ pda_count: None,
+ mint_count: None,
+ current_slot: None,
+ total_tracked: None,
+ total_ready: None,
+ total_waiting: None,
+ ctoken: None,
+ ata: None,
+ pda: None,
+ mint: None,
+ pda_programs: None,
+ upstreams: Some(upstreams),
+ note: None,
+ error: Some(
+ "Failed to fetch /compressible from all configured forester APIs".to_string(),
+ ),
+ refresh_interval_secs: Some(5),
+ },
+ source: "forester-apis".to_string(),
+ cached_at: now,
+ });
+ }
+
+ let ctoken_count = aggregate_optional_sum(snapshots.iter().map(|s| s.data.ctoken_count));
+ let ata_count = aggregate_optional_sum(snapshots.iter().map(|s| s.data.ata_count));
+ let pda_count = aggregate_optional_sum(snapshots.iter().map(|s| s.data.pda_count));
+ let mint_count = aggregate_optional_sum(snapshots.iter().map(|s| s.data.mint_count));
+
+ let ctoken = aggregate_type_stats(snapshots.iter().map(|s| s.data.ctoken.clone()));
+ let ata = aggregate_type_stats(snapshots.iter().map(|s| s.data.ata.clone()));
+ let pda = aggregate_type_stats(snapshots.iter().map(|s| s.data.pda.clone()));
+ let mint = aggregate_type_stats(snapshots.iter().map(|s| s.data.mint.clone()));
+
+ let mut pda_program_map: HashMap<String, CompressibleTypeStats> = HashMap::new();
+ for snapshot in &snapshots {
+ if let Some(programs) = &snapshot.data.pda_programs {
+ for row in programs {
+ let entry = pda_program_map.entry(row.program_id.clone()).or_insert(
+ CompressibleTypeStats {
+ tracked: 0,
+ compressed: None,
+ ready: Some(0),
+ waiting: Some(0),
+ next_ready_slot: None,
+ },
+ );
+ entry.tracked = entry.tracked.saturating_add(row.tracked);
+ entry.ready = Some(
+ entry
+ .ready
+ .unwrap_or(0)
+ .saturating_add(row.ready.unwrap_or(0)),
+ );
+ entry.waiting = Some(
+ entry
+ .waiting
+ .unwrap_or(0)
+ .saturating_add(row.waiting.unwrap_or(0)),
+ );
+ if let Some(slot) = row.next_ready_slot {
+ entry.next_ready_slot = Some(
+ entry
+ .next_ready_slot
+ .map(|current| current.min(slot))
+ .unwrap_or(slot),
+ );
+ }
+ }
+ }
+ }
+
+ let mut pda_programs: Vec<PdaProgramStats> = pda_program_map
+ .into_iter()
+ .map(|(program_id, stats)| PdaProgramStats {
+ program_id,
+ tracked: stats.tracked,
+ compressed: stats.compressed,
+ ready: stats.ready,
+ waiting: stats.waiting,
+ next_ready_slot: stats.next_ready_slot,
+ })
+ .collect();
+ pda_programs.sort_by(|a, b| a.program_id.cmp(&b.program_id));
+
+ let current_slot = snapshots.iter().filter_map(|s| s.data.current_slot).max();
+ let total_tracked = aggregate_optional_sum(snapshots.iter().map(|s| s.data.total_tracked))
+ .or_else(|| {
+ Some(ctoken_count.unwrap_or(0) + pda_count.unwrap_or(0) + mint_count.unwrap_or(0))
+ });
+ let total_ready = aggregate_optional_sum(snapshots.iter().map(|s| s.data.total_ready));
+ let total_waiting = aggregate_optional_sum(snapshots.iter().map(|s| s.data.total_waiting));
+
+ let note = if fail_count > 0 {
+ Some(format!(
+ "Aggregated from {}/{} forester API endpoints ({} unavailable).",
+ ok_count,
+ upstreams.len(),
+ fail_count
+ ))
+ } else {
+ Some(format!(
+ "Aggregated from {} forester API endpoint(s).",
+ ok_count
+ ))
+ };
+
+ Some(CompressibleSnapshot {
+ data: CompressibleResponse {
+ enabled: true,
+ ctoken_count,
+ ata_count,
+ pda_count,
+ mint_count,
+ current_slot,
+ total_tracked,
+ total_ready,
+ total_waiting,
+ ctoken,
+ ata,
+ pda,
+ mint,
+ pda_programs: (!pda_programs.is_empty()).then_some(pda_programs),
+ upstreams: Some(upstreams),
+ note,
+ error: None,
+ refresh_interval_secs: Some(5),
+ },
+ source: "forester-apis".to_string(),
+ cached_at: now,
+ })
+}
+
#[derive(Clone)]
pub(crate) struct CompressibleTrackers {
pub ctoken: Option<Arc<CTokenAccountTracker>>,
@@ -98,11 +571,14 @@ pub struct CompressibleDashboardState {
/// Configuration for the HTTP API server.
pub struct ApiServerConfig {
+ pub run_id: Arc<str>,
pub rpc_url: String,
pub port: u16,
pub allow_public_bind: bool,
pub compressible_state: Option,
pub prometheus_url: Option<String>,
+ pub helius_rpc: bool,
+ pub forester_api_urls: Vec<String>,
}
/// Default timeout for status endpoint in seconds
@@ -120,16 +596,23 @@ pub struct ApiServerHandle {
pub thread_handle: JoinHandle<()>,
/// Sender to trigger graceful shutdown
pub shutdown_tx: oneshot::Sender<()>,
+ pub run_id: Arc<str>,
}
impl ApiServerHandle {
/// Trigger graceful shutdown and wait for the server to stop
pub fn shutdown(self) {
+ let run_id = self.run_id.clone();
// Send shutdown signal (ignore error if receiver already dropped)
let _ = self.shutdown_tx.send(());
// Wait for the thread to finish
if let Err(e) = self.thread_handle.join() {
- error!("API server thread panicked: {:?}", e);
+ error!(
+ event = "api_server_thread_panicked",
+ run_id = %run_id,
+ error = ?e,
+ "API server thread panicked"
+ );
}
}
}
@@ -142,6 +625,7 @@ impl ApiServerHandle {
pub(crate) async fn fetch_metrics_snapshot(
client: &reqwest::Client,
prometheus_url: &Option<String>,
+ run_id: &str,
) -> MetricsSnapshot {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
@@ -170,7 +654,12 @@ pub(crate) async fn fetch_metrics_snapshot(
};
}
Err(e) => {
- warn!("Prometheus query failed: {}", e);
+ warn!(
+ event = "api_server_prometheus_query_failed",
+ run_id = %run_id,
+ error = %e,
+ "Prometheus query failed"
+ );
}
}
}
@@ -186,54 +675,235 @@ pub(crate) async fn fetch_metrics_snapshot(
/// Fetch compressible counts: try in-memory trackers first, then RPC.
pub(crate) async fn fetch_compressible_snapshot(
trackers: &Option<CompressibleTrackers>,
+ forester_api_urls: &[String],
rpc_url: &str,
+ helius_rpc: bool,
+ run_id: &str,
) -> CompressibleSnapshot {
- use crate::compressible::traits::CompressibleTracker;
-
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
if let Some(ref t) = trackers {
+ let client = reqwest::Client::new();
+ let current_slot = match crate::compressible::bootstrap_helpers::get_current_slot(
+ &client, rpc_url,
+ )
+ .await
+ {
+ Ok(slot) => Some(slot),
+ Err(e) => {
+ warn!(
+ event = "api_server_compressible_slot_fetch_failed",
+ run_id = %run_id,
+ error = %e,
+ "Failed to fetch current slot for compressible readiness breakdown"
+ );
+ None
+ }
+ };
+ let (ctoken, ata) = if let Some(tracker) = t.ctoken.as_ref() {
+ let (ctoken_stats, ata_stats) = summarize_ctoken_and_ata_slots(tracker, current_slot);
+ (Some(ctoken_stats), Some(ata_stats))
+ } else {
+ (None, None)
+ };
+
+ let pda = t.pda.as_ref().map(|tracker| {
+ summarize_slots(
+ tracker
+ .accounts()
+ .iter()
+ .map(|entry| entry.value().compressible_slot()),
+ current_slot,
+ Some(tracker.total_compressed()),
+ )
+ });
+
+ let mint = t.mint.as_ref().map(|tracker| {
+ summarize_slots(
+ tracker
+ .accounts()
+ .iter()
+ .map(|entry| entry.value().compressible_slot()),
+ current_slot,
+ Some(tracker.total_compressed()),
+ )
+ });
+
+ let mut pda_programs: Option<Vec<PdaProgramStats>> = None;
+ if let Some(pda_tracker) = t.pda.as_ref() {
+ let mut by_program: HashMap<String, Vec<u64>> = HashMap::new();
+ for entry in pda_tracker.accounts().iter() {
+ let state = entry.value();
+ by_program
+ .entry(state.program_id.to_string())
+ .or_default()
+ .push(state.compressible_slot());
+ }
+ let mut rows: Vec<PdaProgramStats> = by_program
+ .into_iter()
+ .map(|(program_id, slots)| {
+ let stats = summarize_slots(slots, current_slot, None);
+ PdaProgramStats {
+ program_id,
+ tracked: stats.tracked,
+ compressed: None,
+ ready: stats.ready,
+ waiting: stats.waiting,
+ next_ready_slot: stats.next_ready_slot,
+ }
+ })
+ .collect();
+ rows.sort_by(|a, b| a.program_id.cmp(&b.program_id));
+ pda_programs = Some(rows);
+ }
+
+ let total_tracked = ctoken.as_ref().map(|s| s.tracked).unwrap_or(0)
+ + pda.as_ref().map(|s| s.tracked).unwrap_or(0)
+ + mint.as_ref().map(|s| s.tracked).unwrap_or(0);
+
+ let (total_ready, total_waiting, note) = if current_slot.is_some() {
+ (
+ Some(
+ ctoken.as_ref().and_then(|s| s.ready).unwrap_or(0)
+ + pda.as_ref().and_then(|s| s.ready).unwrap_or(0)
+ + mint.as_ref().and_then(|s| s.ready).unwrap_or(0),
+ ),
+ Some(
+ ctoken.as_ref().and_then(|s| s.waiting).unwrap_or(0)
+ + pda.as_ref().and_then(|s| s.waiting).unwrap_or(0)
+ + mint.as_ref().and_then(|s| s.waiting).unwrap_or(0),
+ ),
+ None,
+ )
+ } else {
+ (
+ None,
+ None,
+ Some(
+ "Current slot is unavailable; readiness breakdown is temporarily unknown."
+ .to_string(),
+ ),
+ )
+ };
+
return CompressibleSnapshot {
data: CompressibleResponse {
enabled: true,
- ctoken_count: t.ctoken.as_ref().map(|tr| tr.len()),
- pda_count: t.pda.as_ref().map(|tr| tr.len()),
- mint_count: t.mint.as_ref().map(|tr| tr.len()),
+ ctoken_count: ctoken.as_ref().map(|s| s.tracked),
+ ata_count: ata.as_ref().map(|s| s.tracked),
+ pda_count: pda.as_ref().map(|s| s.tracked),
+ mint_count: mint.as_ref().map(|s| s.tracked),
+ current_slot,
+ total_tracked: Some(total_tracked),
+ total_ready,
+ total_waiting,
+ ctoken,
+ ata,
+ pda,
+ mint,
+ pda_programs,
+ upstreams: None,
+ note,
+ error: None,
+ refresh_interval_secs: Some(5),
},
source: "tracker".to_string(),
cached_at: now,
};
}
+ if let Some(snapshot) =
+ fetch_compressible_snapshot_from_foresters(forester_api_urls, run_id).await
+ {
+ return snapshot;
+ }
+
// Standalone mode: RPC with timeout
let fetch_result = tokio::time::timeout(
COMPRESSIBLE_FETCH_TIMEOUT,
- crate::compressible::count_compressible_accounts(rpc_url),
+ crate::compressible::count_compressible_accounts(rpc_url, helius_rpc),
)
.await;
match fetch_result {
- Ok(Ok((ctoken_count, mint_count))) => CompressibleSnapshot {
- data: CompressibleResponse {
- enabled: true,
- ctoken_count: Some(ctoken_count),
- pda_count: None,
- mint_count: Some(mint_count),
- },
- source: "rpc".to_string(),
- cached_at: now,
- },
+ Ok(Ok((ctoken_count, mint_count))) => {
+ let ctoken = CompressibleTypeStats {
+ tracked: ctoken_count,
+ compressed: None,
+ ready: None,
+ waiting: None,
+ next_ready_slot: None,
+ };
+ let mint = CompressibleTypeStats {
+ tracked: mint_count,
+ compressed: None,
+ ready: None,
+ waiting: None,
+ next_ready_slot: None,
+ };
+ let client = reqwest::Client::new();
+ let current_slot =
+ crate::compressible::bootstrap_helpers::get_current_slot(&client, rpc_url)
+ .await
+ .ok();
+
+ CompressibleSnapshot {
+ data: CompressibleResponse {
+ enabled: true,
+ ctoken_count: Some(ctoken_count),
+ ata_count: None,
+ pda_count: None,
+ mint_count: Some(mint_count),
+ current_slot,
+ total_tracked: Some(ctoken_count + mint_count),
+ total_ready: None,
+ total_waiting: None,
+ ctoken: Some(ctoken),
+ ata: None,
+ pda: None,
+ mint: Some(mint),
+ pda_programs: None,
+ upstreams: None,
+ note: Some(
+ "Standalone API mode: readiness and PDA breakdown require in-memory trackers inside this process, or --forester-api-url upstream(s).".to_string(),
+ ),
+ error: None,
+ refresh_interval_secs: Some(30),
+ },
+ source: "rpc".to_string(),
+ cached_at: now,
+ }
+ }
Ok(Err(e)) => {
- warn!("RPC compressible count failed: {}", e);
+ warn!(
+ event = "api_server_compressible_count_failed",
+ run_id = %run_id,
+ error = %e,
+ "RPC compressible count failed"
+ );
CompressibleSnapshot {
data: CompressibleResponse {
enabled: false,
ctoken_count: None,
+ ata_count: None,
pda_count: None,
mint_count: None,
+ current_slot: None,
+ total_tracked: None,
+ total_ready: None,
+ total_waiting: None,
+ ctoken: None,
+ ata: None,
+ pda: None,
+ mint: None,
+ pda_programs: None,
+ upstreams: None,
+ note: None,
+ error: Some(format!("RPC compressible count failed: {}", e)),
+ refresh_interval_secs: Some(30),
},
source: "none".to_string(),
cached_at: now,
@@ -241,15 +911,34 @@ pub(crate) async fn fetch_compressible_snapshot(
}
Err(_) => {
warn!(
- "Compressible count timed out after {}s",
- COMPRESSIBLE_FETCH_TIMEOUT.as_secs()
+ event = "api_server_compressible_count_timeout",
+ run_id = %run_id,
+ timeout_seconds = COMPRESSIBLE_FETCH_TIMEOUT.as_secs(),
+ "Compressible count timed out"
);
CompressibleSnapshot {
data: CompressibleResponse {
enabled: false,
ctoken_count: None,
+ ata_count: None,
pda_count: None,
mint_count: None,
+ current_slot: None,
+ total_tracked: None,
+ total_ready: None,
+ total_waiting: None,
+ ctoken: None,
+ ata: None,
+ pda: None,
+ mint: None,
+ pda_programs: None,
+ upstreams: None,
+ note: None,
+ error: Some(format!(
+ "Compressible count timed out after {} seconds",
+ COMPRESSIBLE_FETCH_TIMEOUT.as_secs()
+ )),
+ refresh_interval_secs: Some(30),
},
source: "none".to_string(),
cached_at: now,
@@ -268,9 +957,10 @@ async fn run_metrics_provider(
client: reqwest::Client,
prometheus_url: Option<String>,
mut shutdown: broadcast::Receiver<()>,
+ run_id: Arc<str>,
) {
loop {
- let snapshot = fetch_metrics_snapshot(&client, &prometheus_url).await;
+ let snapshot = fetch_metrics_snapshot(&client, &prometheus_url, run_id.as_ref()).await;
if tx.send(snapshot).is_err() {
break; // all receivers dropped
}
@@ -279,25 +969,39 @@ async fn run_metrics_provider(
_ = shutdown.recv() => break,
}
}
- info!("Metrics provider stopped");
+ info!(
+ event = "api_server_metrics_provider_stopped",
+ run_id = %run_id,
+ "Metrics provider stopped"
+ );
}
/// Periodically fetches compressible counts and publishes via watch channel.
async fn run_compressible_provider(
tx: watch::Sender<CompressibleSnapshot>,
trackers: Option<CompressibleTrackers>,
+ forester_api_urls: Vec<String>,
rpc_url: String,
mut shutdown: broadcast::Receiver<()>,
+ helius_rpc: bool,
+ run_id: Arc<str>,
) {
- // In-memory trackers are cheap (.len()); RPC is expensive (getProgramAccounts)
- let interval = if trackers.is_some() {
+ // In-memory trackers and upstream forester APIs are cheap compared to full RPC scans.
+ let interval = if trackers.is_some() || !forester_api_urls.is_empty() {
Duration::from_secs(5)
} else {
Duration::from_secs(30)
};
loop {
- let snapshot = fetch_compressible_snapshot(&trackers, &rpc_url).await;
+ let snapshot = fetch_compressible_snapshot(
+ &trackers,
+ &forester_api_urls,
+ &rpc_url,
+ helius_rpc,
+ run_id.as_ref(),
+ )
+ .await;
if tx.send(snapshot).is_err() {
break;
}
@@ -306,7 +1010,11 @@ async fn run_compressible_provider(
_ = shutdown.recv() => break,
}
}
- info!("Compressible provider stopped");
+ info!(
+ event = "api_server_compressible_provider_stopped",
+ run_id = %run_id,
+ "Compressible provider stopped"
+ );
}
// ---------------------------------------------------------------------------
@@ -319,26 +1027,40 @@ async fn run_compressible_provider(
/// An `ApiServerHandle` that can be used to trigger graceful shutdown
pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+ let run_id_for_handle = config.run_id.clone();
let thread_handle = std::thread::spawn(move || {
+ let run_id = config.run_id.clone();
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
- error!("Failed to create tokio runtime for API server: {}", e);
+ error!(
+ event = "api_server_runtime_create_failed",
+ run_id = %run_id,
+ error = %e,
+ "Failed to create tokio runtime for API server"
+ );
return;
}
};
rt.block_on(async move {
let addr = if config.allow_public_bind {
warn!(
- "API server binding to 0.0.0.0:{} - endpoints will be publicly accessible",
- config.port
+ event = "api_server_public_bind_enabled",
+ run_id = %run_id,
+ port = config.port,
+ "API server binding to 0.0.0.0; endpoints will be publicly accessible"
);
SocketAddr::from(([0, 0, 0, 0], config.port))
} else {
SocketAddr::from(([127, 0, 0, 1], config.port))
};
- info!("Starting HTTP API server on {}", addr);
+ info!(
+ event = "api_server_started",
+ run_id = %run_id,
+ address = %addr,
+ "Starting HTTP API server"
+ );
// Shared HTTP client with timeout for external requests (Prometheus)
let http_client = reqwest::Client::builder()
@@ -369,13 +1091,17 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
http_client.clone(),
config.prometheus_url.clone(),
provider_shutdown_tx.subscribe(),
+ run_id.clone(),
));
tokio::spawn(run_compressible_provider(
compressible_tx,
trackers,
+ config.forester_api_urls.clone(),
config.rpc_url.clone(),
provider_shutdown_tx.subscribe(),
+ config.helius_rpc,
+ run_id.clone(),
));
let cors = warp::cors()
@@ -391,8 +1117,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
// --- Status route (unchanged — per-request RPC call) ---
let rpc_url_for_status = config.rpc_url.clone();
+ let run_id_for_status = run_id.clone();
let status_route = warp::path("status").and(warp::get()).and_then(move || {
let rpc_url = rpc_url_for_status.clone();
+ let run_id = run_id_for_status.clone();
async move {
let timeout_duration = Duration::from_secs(STATUS_TIMEOUT_SECS);
match tokio::time::timeout(timeout_duration, get_forester_status(&rpc_url))
@@ -403,7 +1131,12 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
warp::http::StatusCode::OK,
)),
Ok(Err(e)) => {
- error!("Failed to get forester status: {:?}", e);
+ error!(
+ event = "api_server_status_fetch_failed",
+ run_id = %run_id,
+ error = ?e,
+ "Failed to get forester status"
+ );
let error_response = ErrorResponse {
error: format!("Failed to get forester status: {}", e),
};
@@ -414,8 +1147,10 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
}
Err(_elapsed) => {
error!(
- "Forester status request timed out after {}s",
- STATUS_TIMEOUT_SECS
+ event = "api_server_status_timeout",
+ run_id = %run_id,
+ timeout_seconds = STATUS_TIMEOUT_SECS,
+ "Forester status request timed out"
);
let error_response = ErrorResponse {
error: format!(
@@ -453,21 +1188,33 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle {
warp::serve(routes)
.bind(addr)
.await
- .graceful(async move {
- let _ = shutdown_rx.await;
- info!("API server received shutdown signal");
- // Signal providers to stop
- let _ = provider_shutdown_tx.send(());
+ .graceful({
+ let run_id_for_shutdown = run_id.clone();
+ async move {
+ let _ = shutdown_rx.await;
+ info!(
+ event = "api_server_shutdown_signal_received",
+ run_id = %run_id_for_shutdown,
+ "API server received shutdown signal"
+ );
+ // Signal providers to stop
+ let _ = provider_shutdown_tx.send(());
+ }
})
.run()
.await;
- info!("API server shut down gracefully");
+ info!(
+ event = "api_server_stopped",
+ run_id = %run_id,
+ "API server shut down gracefully"
+ );
});
});
ApiServerHandle {
thread_handle,
shutdown_tx,
+ run_id: run_id_for_handle,
}
}
diff --git a/forester/src/cli.rs b/forester/src/cli.rs
index 8da994cc17..41da7cf56b 100644
--- a/forester/src/cli.rs
+++ b/forester/src/cli.rs
@@ -36,7 +36,7 @@ pub struct StartArgs {
env = "INDEXER_URL",
help = "Photon indexer URL. API key can be included as query param: https://host?api-key=KEY"
)]
- pub indexer_url: Option<String>,
+ pub indexer_url: String,
#[arg(long, env = "PROVER_URL")]
pub prover_url: Option<String>,
@@ -270,6 +270,14 @@ pub struct StartArgs {
)]
pub api_server_public_bind: bool,
+ #[arg(
+ long,
+ env = "HELIUS_RPC",
+ help = "Use Helius getProgramAccountsV2 for compressible account queries (default: standard getProgramAccounts)",
+ default_value = "false"
+ )]
+ pub helius_rpc: bool,
+
#[arg(
long,
env = "GROUP_AUTHORITY",
@@ -381,6 +389,14 @@ pub struct DashboardArgs {
help = "Prometheus server URL for querying metrics (e.g. http://prometheus:9090)"
)]
pub prometheus_url: Option<String>,
+
+ #[arg(
+ long = "forester-api-url",
+ env = "FORESTER_API_URLS",
+ value_delimiter = ',',
+ help = "Forester API base URL(s) to aggregate compressible data from (e.g. http://forester-a:8080,http://forester-b:8080)"
+ )]
+ pub forester_api_urls: Vec<String>,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@@ -438,6 +454,7 @@ mod tests {
"forester",
"--processor-mode", "v1",
"--rpc-url", "http://test.com",
+ "--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
@@ -448,6 +465,7 @@ mod tests {
"forester",
"--processor-mode", "v2",
"--rpc-url", "http://test.com",
+ "--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
@@ -457,6 +475,7 @@ mod tests {
let args = StartArgs::try_parse_from([
"forester",
"--rpc-url", "http://test.com",
+ "--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]).unwrap();
@@ -467,6 +486,7 @@ mod tests {
"forester",
"--processor-mode", "invalid-mode",
"--rpc-url", "http://test.com",
+ "--indexer-url", "http://indexer.test.com",
"--payer", "[1,2,3]",
"--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
]);
diff --git a/forester/src/compressible/bootstrap_helpers.rs b/forester/src/compressible/bootstrap_helpers.rs
index 625aa97bf7..e6f622f915 100644
--- a/forester/src/compressible/bootstrap_helpers.rs
+++ b/forester/src/compressible/bootstrap_helpers.rs
@@ -227,6 +227,13 @@ pub fn is_localhost(rpc_url: &str) -> bool {
rpc_url.contains("localhost") || rpc_url.contains("127.0.0.1")
}
+/// Whether to use Helius `getProgramAccountsV2` instead of standard
+/// `getProgramAccounts`. Returns `true` only when `--helius-rpc` CLI flag
+/// is set **and** the URL is not localhost.
+pub fn use_helius_rpc(rpc_url: &str, helius_rpc_flag: bool) -> bool {
+ helius_rpc_flag && !is_localhost(rpc_url)
+}
+
/// Generic bootstrap using standard getProgramAccounts API
///
/// Calls `process_fn` for each account that passes initial extraction.
@@ -390,6 +397,7 @@ pub async fn run_bootstrap(
shutdown_rx: Option>,
process_fn: F,
label: &str,
+ helius_rpc: bool,
) -> Result
where
F: FnMut(RawAccountData) -> bool,
@@ -415,8 +423,8 @@ where
label, program_id
);
- let result = if is_localhost(rpc_url) {
- debug!("Detected localhost, using standard getProgramAccounts");
+ let result = if !use_helius_rpc(rpc_url, helius_rpc) {
+ debug!("Using standard getProgramAccounts");
let api_result = bootstrap_standard_api(
&client,
rpc_url,
@@ -489,13 +497,14 @@ pub async fn count_program_accounts(
rpc_url: &str,
program_id: &Pubkey,
filters: Option<Vec<serde_json::Value>>,
+ helius_rpc: bool,
) -> Result<usize> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?;
- if is_localhost(rpc_url) {
+ if !use_helius_rpc(rpc_url, helius_rpc) {
let mut params = json!({
"encoding": "base64",
"commitment": "confirmed",
@@ -563,7 +572,10 @@ pub async fn count_program_accounts(
/// Uses `count_program_accounts` with `dataSlice` to minimize
/// bandwidth. Both queries run concurrently. Errors are propagated so callers
/// can distinguish "0 accounts" from "RPC failure".
-pub async fn count_compressible_accounts(rpc_url: &str) -> Result<(usize, usize)> {
+pub async fn count_compressible_accounts(
+ rpc_url: &str,
+ helius_rpc: bool,
+) -> Result<(usize, usize)> {
let program_id = Pubkey::new_from_array(light_token_interface::LIGHT_TOKEN_PROGRAM_ID);
let ctoken_filters = vec![json!({"memcmp": {
@@ -579,8 +591,8 @@ pub async fn count_compressible_accounts(rpc_url: &str) -> Result<(usize, usize)
}})];
let (ctoken_result, mint_result) = tokio::join!(
- count_program_accounts(rpc_url, &program_id, Some(ctoken_filters)),
- count_program_accounts(rpc_url, &program_id, Some(mint_filters)),
+ count_program_accounts(rpc_url, &program_id, Some(ctoken_filters), helius_rpc),
+ count_program_accounts(rpc_url, &program_id, Some(mint_filters), helius_rpc),
);
Ok((ctoken_result?, mint_result?))
diff --git a/forester/src/compressible/ctoken/bootstrap.rs b/forester/src/compressible/ctoken/bootstrap.rs
index caf03b79d9..a85e3e28b2 100644
--- a/forester/src/compressible/ctoken/bootstrap.rs
+++ b/forester/src/compressible/ctoken/bootstrap.rs
@@ -11,7 +11,7 @@ use super::state::CTokenAccountTracker;
use crate::{
compressible::{
bootstrap_helpers::{
- bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData,
+ bootstrap_standard_api, bootstrap_v2_api, use_helius_rpc, RawAccountData,
},
config::{ACCOUNT_TYPE_OFFSET, CTOKEN_ACCOUNT_TYPE_FILTER},
},
@@ -24,6 +24,7 @@ pub async fn bootstrap_ctoken_accounts(
rpc_url: String,
tracker: Arc<CTokenAccountTracker>,
shutdown_rx: Option>,
+ helius_rpc: bool,
) -> Result<()> {
info!("Starting bootstrap of CToken accounts");
@@ -88,8 +89,8 @@ pub async fn bootstrap_ctoken_accounts(
true
};
- if is_localhost(&rpc_url) {
- info!("Detected localhost, using standard getProgramAccounts");
+ if !use_helius_rpc(&rpc_url, helius_rpc) {
+ info!("Using standard getProgramAccounts");
let (total_fetched, total_inserted) = bootstrap_standard_api(
&client,
&rpc_url,
diff --git a/forester/src/compressible/ctoken/compressor.rs b/forester/src/compressible/ctoken/compressor.rs
index 79d9898927..1aa8a864d4 100644
--- a/forester/src/compressible/ctoken/compressor.rs
+++ b/forester/src/compressible/ctoken/compressor.rs
@@ -17,10 +17,13 @@ use solana_sdk::{
signature::{Keypair, Signature},
signer::Signer,
};
-use tracing::{debug, info};
+use tracing::debug;
use super::{state::CTokenAccountTracker, types::CTokenAccountState};
-use crate::{compressible::traits::CompressibleTracker, Result};
+use crate::{
+ compressible::traits::{send_and_confirm_with_tracking, CompressibleTracker},
+ Result,
+};
/// Compression executor for CToken accounts via the registry program's compress_and_close instruction.
pub struct CTokenCompressor {
@@ -79,9 +82,38 @@ impl CTokenCompressor {
debug!("Compressible config: {}", compressible_config);
- // Get output tree from RPC
let mut rpc = self.rpc_pool.get_connection().await?;
+ // Pre-check: filter out accounts that no longer exist on-chain
+ let all_pubkeys: Vec<Pubkey> = account_states.iter().map(|a| a.pubkey).collect();
+ let on_chain_accounts = rpc
+ .get_multiple_accounts(&all_pubkeys)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to pre-check accounts: {:?}", e))?;
+
+ let account_states: Vec<&CTokenAccountState> = account_states
+ .iter()
+ .zip(on_chain_accounts.iter())
+ .filter_map(|(state, on_chain)| {
+ if on_chain.is_some() {
+ Some(state)
+ } else {
+ debug!(
+ "CToken account {} no longer exists on-chain, removing from tracker",
+ state.pubkey
+ );
+ self.tracker.remove(&state.pubkey);
+ None
+ }
+ })
+ .collect();
+
+ if account_states.is_empty() {
+ return Err(anyhow::anyhow!(
+ "All accounts already closed, nothing to compress"
+ ));
+ }
+
// Fetch latest active state trees and get a random one
rpc.get_latest_active_state_trees()
.await
@@ -102,7 +134,7 @@ impl CTokenCompressor {
let mut indices_vec = Vec::with_capacity(account_states.len());
- for account_state in account_states {
+ for account_state in &account_states {
let source_index = packed_accounts.insert_or_get(account_state.pubkey);
// Convert mint from light_compressed_account::Pubkey to solana_sdk::Pubkey
@@ -211,41 +243,16 @@ impl CTokenCompressor {
data: instruction.data(),
};
- // Send transaction
- let signature = rpc
- .create_and_send_transaction(
- &[ix],
- &self.payer_keypair.pubkey(),
- &[&self.payer_keypair],
- )
- .await
- .map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?;
-
- info!(
- "compress_and_close tx with ({:?}) accounts sent {}",
- account_states.iter().map(|a| a.pubkey.to_string()),
- signature
- );
-
- // Wait for confirmation before removing from tracker
- let confirmed = rpc
- .confirm_transaction(signature)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {}", e))?;
-
- if confirmed {
- // Only remove from tracker after confirmed
- for account_state in account_states {
- self.tracker.remove(&account_state.pubkey);
- }
- info!("compress_and_close tx confirmed: {}", signature);
- Ok(signature)
- } else {
- // Transaction not confirmed - keep accounts in tracker for retry
- Err(anyhow::anyhow!(
- "compress_and_close tx not confirmed: {} - accounts kept in tracker for retry",
- signature
- ))
- }
+ let pubkeys: Vec<Pubkey> = account_states.iter().map(|a| a.pubkey).collect();
+
+ send_and_confirm_with_tracking(
+ &mut *rpc,
+ &[ix],
+ &self.payer_keypair,
+ &*self.tracker,
+ &pubkeys,
+ "compress_and_close",
+ )
+ .await
}
}
diff --git a/forester/src/compressible/ctoken/state.rs b/forester/src/compressible/ctoken/state.rs
index 5dbc5b9961..1c71e785f3 100644
--- a/forester/src/compressible/ctoken/state.rs
+++ b/forester/src/compressible/ctoken/state.rs
@@ -1,5 +1,7 @@
+use std::sync::atomic::AtomicU64;
+
use borsh::BorshDeserialize;
-use dashmap::DashMap;
+use dashmap::{DashMap, DashSet};
use light_compressible::rent::{get_rent_exemption_lamports, SLOTS_PER_EPOCH};
use light_token_interface::state::Token;
use solana_sdk::pubkey::Pubkey;
@@ -44,12 +46,16 @@ fn calculate_compressible_slot(account: &Token, lamports: u64, account_size: usi
#[derive(Debug)]
pub struct CTokenAccountTracker {
accounts: DashMap<Pubkey, CTokenAccountState>,
+ compressed_count: AtomicU64,
+ pending: DashSet<Pubkey>,
}
impl CTokenAccountTracker {
pub fn new() -> Self {
Self {
accounts: DashMap::new(),
+ compressed_count: AtomicU64::new(0),
+ pending: DashSet::new(),
}
}
@@ -101,11 +107,29 @@ impl CTokenAccountTracker {
}
};
+ let is_ata = {
+ let owner = Pubkey::new_from_array(ctoken.owner.to_bytes());
+ let mint = Pubkey::new_from_array(ctoken.mint.to_bytes());
+ let light_token_program_id =
+ Pubkey::new_from_array(light_token_interface::LIGHT_TOKEN_PROGRAM_ID);
+ let expected_ata = Pubkey::find_program_address(
+ &[
+ owner.as_ref(),
+ light_token_program_id.as_ref(),
+ mint.as_ref(),
+ ],
+ &light_token_program_id,
+ )
+ .0;
+ pubkey == expected_ata
+ };
+
let state = CTokenAccountState {
pubkey,
account: ctoken,
lamports,
compressible_slot,
+ is_ata,
};
debug!(
@@ -126,11 +150,23 @@ impl CompressibleTracker for CTokenAccountTracker {
fn accounts(&self) -> &DashMap<Pubkey, CTokenAccountState> {
&self.accounts
}
+
+ fn compressed_counter(&self) -> &AtomicU64 {
+ &self.compressed_count
+ }
+
+ fn pending(&self) -> &DashSet<Pubkey> {
+ &self.pending
+ }
}
impl Default for CTokenAccountTracker {
fn default() -> Self {
- Self::new()
+ Self {
+ accounts: DashMap::new(),
+ compressed_count: AtomicU64::new(0),
+ pending: DashSet::new(),
+ }
}
}
@@ -142,7 +178,27 @@ impl SubscriptionHandler for CTokenAccountTracker {
data: &[u8],
lamports: u64,
) -> Result<()> {
- self.update_from_account(pubkey, data, lamports)
+ // If account data is empty (account was closed), remove from tracker
+ if data.is_empty() {
+ if self.remove(&pubkey).is_some() {
+ debug!("Removed closed ctoken account {} from tracker", pubkey);
+ }
+ return Ok(());
+ }
+ match self.update_from_account(pubkey, data, lamports) {
+ Ok(()) => Ok(()),
+ Err(e) => {
+ // Deserialization failed — account is no longer a valid cToken,
+ // remove stale entry from tracker
+ if self.remove(&pubkey).is_some() {
+ warn!(
+ "Removed invalid ctoken account {} from tracker: {}",
+ pubkey, e
+ );
+ }
+ Ok(())
+ }
+ }
}
fn handle_removal(&self, pubkey: &Pubkey) {
diff --git a/forester/src/compressible/ctoken/types.rs b/forester/src/compressible/ctoken/types.rs
index 21ab03d3d1..5eb9de4132 100644
--- a/forester/src/compressible/ctoken/types.rs
+++ b/forester/src/compressible/ctoken/types.rs
@@ -10,6 +10,8 @@ pub struct CTokenAccountState {
pub lamports: u64,
/// Ready to compress when current_slot > compressible_slot
pub compressible_slot: u64,
+ /// Whether this account is an ATA (computed once at insert time).
+ pub is_ata: bool,
}
impl CompressibleState for CTokenAccountState {
diff --git a/forester/src/compressible/mint/bootstrap.rs b/forester/src/compressible/mint/bootstrap.rs
index 104c8dd00c..27a55b059e 100644
--- a/forester/src/compressible/mint/bootstrap.rs
+++ b/forester/src/compressible/mint/bootstrap.rs
@@ -18,6 +18,7 @@ pub async fn bootstrap_mint_accounts(
rpc_url: String,
tracker: Arc,
shutdown_rx: Option>,
+ helius_rpc: bool,
) -> Result<()> {
// Light Token Program ID
let program_id =
@@ -50,6 +51,7 @@ pub async fn bootstrap_mint_accounts(
shutdown_rx,
process_account,
"Mint",
+ helius_rpc,
)
.await?;
diff --git a/forester/src/compressible/mint/compressor.rs b/forester/src/compressible/mint/compressor.rs
index 1c2cd6e317..8fb65d4605 100644
--- a/forester/src/compressible/mint/compressor.rs
+++ b/forester/src/compressible/mint/compressor.rs
@@ -10,13 +10,20 @@ use futures::StreamExt;
use light_client::{indexer::Indexer, rpc::Rpc};
use solana_sdk::{
instruction::Instruction,
+ pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
};
use tracing::{debug, info};
use super::{state::MintAccountTracker, types::MintAccountState};
-use crate::{compressible::traits::CompressibleTracker, Result};
+use crate::{
+ compressible::traits::{
+ send_and_confirm_with_tracking, verify_transaction_execution, Cancelled,
+ CompressibleTracker,
+ },
+ Result,
+};
/// Compressor for decompressed Mint accounts - builds and sends CompressAndCloseMint transactions.
pub struct MintCompressor {
@@ -99,51 +106,18 @@ impl MintCompressor {
instructions.len()
);
- // Send all instructions in a single transaction
- let mut rpc = self.rpc_pool.get_connection().await?;
- let signature = rpc
- .create_and_send_transaction(
- &instructions,
- &self.payer_keypair.pubkey(),
- &[&self.payer_keypair],
- )
- .await
- .map_err(|e| {
- anyhow::anyhow!(
- "Failed to send batched CompressAndCloseMint transaction: {:?}",
- e
- )
- })?;
+ let pubkeys: Vec = mint_states.iter().map(|s| s.pubkey).collect();
- info!(
- "Batched CompressAndCloseMint tx for {} mints sent: {}",
- mint_states.len(),
- signature
- );
-
- // Wait for confirmation before removing from tracker
- let confirmed = rpc
- .confirm_transaction(signature)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
-
- if confirmed {
- // Only remove from tracker after confirmed
- for mint_state in mint_states {
- self.tracker.remove(&mint_state.pubkey);
- }
- info!("Batched CompressAndCloseMint tx confirmed: {}", signature);
- Ok(signature)
- } else {
- tracing::warn!(
- "Batch CompressAndCloseMint tx not confirmed: {} - accounts kept in tracker for retry",
- signature
- );
- Err(anyhow::anyhow!(
- "Batch CompressAndCloseMint tx not confirmed: {}",
- signature
- ))
- }
+ let mut rpc = self.rpc_pool.get_connection().await?;
+ send_and_confirm_with_tracking(
+ &mut *rpc,
+ &instructions,
+ &self.payer_keypair,
+ &*self.tracker,
+ &pubkeys,
+ "CompressAndCloseMint",
+ )
+ .await
}
/// Compress a batch of decompressed Mint accounts with concurrent execution.
@@ -171,6 +145,10 @@ impl MintCompressor {
.collect();
}
+ // Mark all as pending upfront
+ let all_pubkeys: Vec = mint_states.iter().map(|s| s.pubkey).collect();
+ self.tracker.mark_pending(&all_pubkeys);
+
// Create futures for each mint
let compression_futures = mint_states.iter().cloned().map(|mint_state| {
let compressor = self.clone();
@@ -178,7 +156,8 @@ impl MintCompressor {
async move {
// Check cancellation before processing
if cancelled.load(Ordering::Relaxed) {
- return Err((mint_state, anyhow::anyhow!("Cancelled")));
+ compressor.tracker.unmark_pending(&[mint_state.pubkey]);
+ return Err((mint_state, Cancelled.into()));
}
match compressor.compress(&mint_state).await {
@@ -194,9 +173,16 @@ impl MintCompressor {
.collect()
.await;
- // Remove successfully compressed mints from tracker
- for (_, mint_state) in results.iter().flatten() {
- self.tracker.remove(&mint_state.pubkey);
+ // Remove successfully compressed mints; unmark failed ones
+ for result in &results {
+ match result {
+ Ok((_, mint_state)) => {
+ self.tracker.remove_compressed(&mint_state.pubkey);
+ }
+ Err((mint_state, _)) => {
+ self.tracker.unmark_pending(&[mint_state.pubkey]);
+ }
+ }
}
results
@@ -215,8 +201,24 @@ impl MintCompressor {
let mut rpc = self.rpc_pool.get_connection().await?;
+ // Pre-check: verify the Mint PDA still exists on-chain to avoid no-op txs
+ let account_info = rpc
+ .get_account(*mint_pda)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to check Mint PDA {}: {:?}", mint_pda, e))?;
+ if account_info.is_none() {
+ debug!(
+ "Mint PDA {} no longer exists on-chain, removing from tracker",
+ mint_pda
+ );
+ self.tracker.remove(mint_pda);
+ return Err(anyhow::anyhow!(
+ "Mint PDA {} already closed, skipping",
+ mint_pda
+ ));
+ }
+
// Build the CompressAndCloseMint instruction
- // This is idempotent - succeeds silently if mint doesn't exist or is already compressed
let ix = create_compress_and_close_mint_instruction(
&mut *rpc,
self.payer_keypair.pubkey(),
@@ -258,6 +260,8 @@ impl MintCompressor {
.map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
if confirmed {
+ verify_transaction_execution(&*rpc, signature).await?;
+
info!("CompressAndCloseMint tx for Mint {} confirmed", mint_pda);
Ok(signature)
} else {
diff --git a/forester/src/compressible/mint/state.rs b/forester/src/compressible/mint/state.rs
index 4ddebb4847..cca5a0b6fe 100644
--- a/forester/src/compressible/mint/state.rs
+++ b/forester/src/compressible/mint/state.rs
@@ -1,5 +1,7 @@
+use std::sync::atomic::AtomicU64;
+
use borsh::BorshDeserialize;
-use dashmap::DashMap;
+use dashmap::{DashMap, DashSet};
use light_compressible::rent::{
get_last_funded_epoch, get_rent_exemption_lamports, SLOTS_PER_EPOCH,
};
@@ -35,13 +37,13 @@ fn calculate_compressible_slot(mint: &Mint, lamports: u64, account_size: usize)
#[derive(Debug)]
pub struct MintAccountTracker {
accounts: DashMap,
+ compressed_count: AtomicU64,
+ pending: DashSet,
}
impl MintAccountTracker {
pub fn new() -> Self {
- Self {
- accounts: DashMap::new(),
- }
+ Self::default()
}
pub fn update_from_account(
@@ -69,7 +71,13 @@ impl MintAccountTracker {
};
if !mint.metadata.mint_decompressed {
- debug!("Mint {} is not decompressed, skipping", pubkey);
+ // Mint was compressed — remove stale entry from tracker if present
+ if self.remove(&pubkey).is_some() {
+ debug!(
+ "Mint {} no longer decompressed, removed from tracker",
+ pubkey
+ );
+ }
return Ok(());
}
@@ -120,11 +128,23 @@ impl CompressibleTracker for MintAccountTracker {
fn accounts(&self) -> &DashMap {
&self.accounts
}
+
+ fn compressed_counter(&self) -> &AtomicU64 {
+ &self.compressed_count
+ }
+
+ fn pending(&self) -> &DashSet {
+ &self.pending
+ }
}
impl Default for MintAccountTracker {
fn default() -> Self {
- Self::new()
+ Self {
+ accounts: DashMap::new(),
+ compressed_count: AtomicU64::new(0),
+ pending: DashSet::new(),
+ }
}
}
@@ -136,6 +156,13 @@ impl SubscriptionHandler for MintAccountTracker {
data: &[u8],
lamports: u64,
) -> Result<()> {
+ // If account data is empty (account was closed), remove from tracker
+ if data.is_empty() {
+ if self.remove(&pubkey).is_some() {
+ debug!("Removed closed Mint account {} from tracker", pubkey);
+ }
+ return Ok(());
+ }
self.update_from_account(pubkey, data, lamports)
}
diff --git a/forester/src/compressible/pda/bootstrap.rs b/forester/src/compressible/pda/bootstrap.rs
index 0142b56570..c2000070b8 100644
--- a/forester/src/compressible/pda/bootstrap.rs
+++ b/forester/src/compressible/pda/bootstrap.rs
@@ -7,7 +7,7 @@ use super::state::PdaAccountTracker;
use crate::{
compressible::{
bootstrap_helpers::{
- bootstrap_standard_api, bootstrap_v2_api, is_localhost, RawAccountData,
+ bootstrap_standard_api, bootstrap_v2_api, use_helius_rpc, RawAccountData,
},
config::PdaProgramConfig,
traits::CompressibleTracker,
@@ -20,6 +20,7 @@ pub async fn bootstrap_pda_accounts(
rpc_url: String,
tracker: Arc,
shutdown_rx: Option>,
+ helius_rpc: bool,
) -> Result<()> {
info!("Starting bootstrap of compressible PDA accounts");
@@ -54,8 +55,15 @@ pub async fn bootstrap_pda_accounts(
program_config.program_id
);
- let result =
- bootstrap_program(&client, &rpc_url, &tracker, &program_config, &shutdown_flag).await;
+ let result = bootstrap_program(
+ &client,
+ &rpc_url,
+ &tracker,
+ &program_config,
+ &shutdown_flag,
+ helius_rpc,
+ )
+ .await;
if let Err(e) = result {
error!(
@@ -81,6 +89,7 @@ async fn bootstrap_program(
tracker: &PdaAccountTracker,
program_config: &PdaProgramConfig,
shutdown_flag: &std::sync::atomic::AtomicBool,
+ helius_rpc: bool,
) -> Result<()> {
let program_id = &program_config.program_id;
@@ -108,7 +117,7 @@ async fn bootstrap_program(
}
})]);
- if is_localhost(rpc_url) {
+ if !use_helius_rpc(rpc_url, helius_rpc) {
let (total_fetched, total_inserted) = bootstrap_standard_api(
client,
rpc_url,
diff --git a/forester/src/compressible/pda/compressor.rs b/forester/src/compressible/pda/compressor.rs
index 3b5664f857..c364d1c5d4 100644
--- a/forester/src/compressible/pda/compressor.rs
+++ b/forester/src/compressible/pda/compressor.rs
@@ -26,7 +26,13 @@ use tracing::{debug, info};
use super::{state::PdaAccountTracker, types::PdaAccountState};
use crate::{
- compressible::{config::PdaProgramConfig, traits::CompressibleTracker},
+ compressible::{
+ config::PdaProgramConfig,
+ traits::{
+ send_and_confirm_with_tracking, verify_transaction_execution, Cancelled,
+ CompressibleTracker,
+ },
+ },
Result,
};
@@ -153,6 +159,10 @@ impl PdaCompressor {
return Vec::new();
}
+ // Mark all accounts as pending upfront so concurrent cycles skip them
+ let all_pubkeys: Vec = account_states.iter().map(|s| s.pubkey).collect();
+ self.tracker.mark_pending(&all_pubkeys);
+
// Create futures for each account
let compression_futures = account_states.iter().cloned().map(|account_state| {
let compressor = self.clone();
@@ -163,7 +173,9 @@ impl PdaCompressor {
async move {
// Check cancellation before processing
if cancelled.load(Ordering::Relaxed) {
- return Err((account_state, anyhow::anyhow!("Cancelled")));
+ // Unmark since we won't process this account
+ compressor.tracker.unmark_pending(&[account_state.pubkey]);
+ return Err((account_state, Cancelled.into()));
}
match compressor
@@ -182,9 +194,16 @@ impl PdaCompressor {
.collect()
.await;
- // Remove successfully compressed PDAs from tracker
- for (_, pda_state) in results.iter().flatten() {
- self.tracker.remove(&pda_state.pubkey);
+ // Remove successfully compressed PDAs; unmark failed ones
+ for result in &results {
+ match result {
+ Ok((_, pda_state)) => {
+ self.tracker.remove_compressed(&pda_state.pubkey);
+ }
+ Err((pda_state, _)) => {
+ self.tracker.unmark_pending(&[pda_state.pubkey]);
+ }
+ }
}
results
@@ -279,48 +298,15 @@ impl PdaCompressor {
program_id
);
- // Send single transaction
- let signature = rpc
- .create_and_send_transaction(
- &[ix],
- &self.payer_keypair.pubkey(),
- &[&self.payer_keypair],
- )
- .await
- .map_err(|e| anyhow::anyhow!("Failed to send transaction: {:?}", e))?;
-
- info!(
- "Batched compress_accounts_idempotent tx for {} PDAs sent: {}",
- account_states.len(),
- signature
- );
-
- // Wait for confirmation before removing from tracker
- let confirmed = rpc
- .confirm_transaction(signature)
- .await
- .map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
-
- if confirmed {
- // Only remove from tracker after confirmed
- for state in account_states {
- self.tracker.remove(&state.pubkey);
- }
- info!(
- "Batched compress_accounts_idempotent tx confirmed: {}",
- signature
- );
- Ok(signature)
- } else {
- tracing::warn!(
- "compress_accounts_idempotent tx not confirmed: {} - accounts kept in tracker for retry",
- signature
- );
- Err(anyhow::anyhow!(
- "Batch transaction not confirmed: {}",
- signature
- ))
- }
+ send_and_confirm_with_tracking(
+ &mut *rpc,
+ &[ix],
+ &self.payer_keypair,
+ &*self.tracker,
+ &pubkeys,
+ "compress_accounts_idempotent",
+ )
+ .await
}
/// Compress a single PDA account using cached config
@@ -342,6 +328,20 @@ impl PdaCompressor {
let mut rpc = self.rpc_pool.get_connection().await?;
+ // Pre-check: verify the PDA still exists on-chain to avoid no-op txs
+ let account_info = rpc
+ .get_account(*pda)
+ .await
+ .map_err(|e| anyhow::anyhow!("Failed to check PDA {}: {:?}", pda, e))?;
+ if account_info.is_none() {
+ debug!(
+ "PDA {} no longer exists on-chain, removing from tracker",
+ pda
+ );
+ self.tracker.remove(pda);
+ return Err(anyhow::anyhow!("PDA {} already closed, skipping", pda));
+ }
+
// Get the compressed account
let compressed_account = rpc
.get_compressed_account(compressed_address, None)
@@ -378,7 +378,7 @@ impl PdaCompressor {
pda, program_id
);
- // Send transaction
+ // Send transaction (pending is managed by the caller)
let signature = rpc
.create_and_send_transaction(
&[ix],
@@ -400,6 +400,8 @@ impl PdaCompressor {
.map_err(|e| anyhow::anyhow!("Failed to confirm transaction: {:?}", e))?;
if confirmed {
+ verify_transaction_execution(&*rpc, signature).await?;
+
info!("compress_accounts_idempotent tx for PDA {} confirmed", pda);
Ok(signature)
} else {
diff --git a/forester/src/compressible/pda/state.rs b/forester/src/compressible/pda/state.rs
index af00bc806e..92f9fe0a61 100644
--- a/forester/src/compressible/pda/state.rs
+++ b/forester/src/compressible/pda/state.rs
@@ -1,5 +1,7 @@
+use std::sync::atomic::AtomicU64;
+
use borsh::BorshDeserialize;
-use dashmap::DashMap;
+use dashmap::{DashMap, DashSet};
use light_account::CompressionInfo;
use light_compressible::rent::{
get_last_funded_epoch, get_rent_exemption_lamports, SLOTS_PER_EPOCH,
@@ -48,6 +50,8 @@ fn calculate_compressible_slot(
pub struct PdaAccountTracker {
accounts: DashMap,
programs: Vec,
+ compressed_count: AtomicU64,
+ pending: DashSet,
}
impl PdaAccountTracker {
@@ -55,6 +59,8 @@ impl PdaAccountTracker {
Self {
accounts: DashMap::new(),
programs,
+ compressed_count: AtomicU64::new(0),
+ pending: DashSet::new(),
}
}
@@ -110,10 +116,10 @@ impl PdaAccountTracker {
};
if compression_info.is_compressed() {
- debug!(
- "Account {} is already compressed; skipping re-compression",
- pubkey
- );
+ // Account was compressed — remove stale entry from tracker if present
+ if self.remove(&pubkey).is_some() {
+ debug!("PDA {} already compressed, removed from tracker", pubkey);
+ }
return Ok(());
}
@@ -150,11 +156,24 @@ impl CompressibleTracker for PdaAccountTracker {
fn accounts(&self) -> &DashMap {
&self.accounts
}
+
+ fn compressed_counter(&self) -> &AtomicU64 {
+ &self.compressed_count
+ }
+
+ fn pending(&self) -> &DashSet {
+ &self.pending
+ }
}
impl Default for PdaAccountTracker {
fn default() -> Self {
- Self::new(Vec::new())
+ Self {
+ accounts: DashMap::new(),
+ programs: Vec::new(),
+ compressed_count: AtomicU64::new(0),
+ pending: DashSet::new(),
+ }
}
}
@@ -166,6 +185,13 @@ impl SubscriptionHandler for PdaAccountTracker {
data: &[u8],
lamports: u64,
) -> Result<()> {
+ // If account data is empty (account was closed), remove from tracker
+ if data.is_empty() {
+ if self.remove(&pubkey).is_some() {
+ debug!("Removed closed PDA account {} from tracker", pubkey);
+ }
+ return Ok(());
+ }
self.update_from_account(pubkey, program_id, data, lamports)
}
diff --git a/forester/src/compressible/traits.rs b/forester/src/compressible/traits.rs
index d7df33abb3..0255d89458 100644
--- a/forester/src/compressible/traits.rs
+++ b/forester/src/compressible/traits.rs
@@ -1,10 +1,34 @@
//! Shared traits for compressible account tracking.
-use dashmap::DashMap;
-use solana_sdk::pubkey::Pubkey;
+use std::{
+ fmt,
+ sync::atomic::{AtomicU64, Ordering},
+};
+
+use dashmap::{DashMap, DashSet};
+use light_client::rpc::Rpc;
+use solana_sdk::{
+ instruction::Instruction,
+ pubkey::Pubkey,
+ signature::{Keypair, Signature},
+ signer::Signer,
+};
+use tracing::info;
use crate::Result;
+/// Typed error for compressor cancellation, used instead of string matching.
+#[derive(Debug)]
+pub struct Cancelled;
+
+impl fmt::Display for Cancelled {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Cancelled")
+ }
+}
+
+impl std::error::Error for Cancelled {}
+
pub trait CompressibleState: Clone + Send + Sync {
fn pubkey(&self) -> &Pubkey;
fn lamports(&self) -> u64;
@@ -15,18 +39,56 @@ pub trait CompressibleState: Clone + Send + Sync {
}
}
-/// Implementors only need to provide `accounts()` - all other methods have default implementations.
+/// Implementors only need to provide `accounts()`, `compressed_counter()`,
+/// and `pending()` — all other methods have default implementations.
pub trait CompressibleTracker: Send + Sync {
fn accounts(&self) -> &DashMap;
+ /// Counter for total accounts successfully compressed and removed from the tracker.
+ fn compressed_counter(&self) -> &AtomicU64;
+
+ /// Set of account pubkeys with in-flight compression transactions.
+ /// Accounts in this set are skipped by `get_ready_to_compress()`.
+ fn pending(&self) -> &DashSet;
+
fn insert(&self, state: S) {
self.accounts().insert(*state.pubkey(), state);
}
fn remove(&self, pubkey: &Pubkey) -> Option
{
+ self.pending().remove(pubkey);
self.accounts().remove(pubkey).map(|(_, v)| v)
}
+ /// Remove an account after successful compression, incrementing the compressed counter.
+ fn remove_compressed(&self, pubkey: &Pubkey) -> Option {
+ let removed = self.remove(pubkey);
+ if removed.is_some() {
+ self.compressed_counter().fetch_add(1, Ordering::Relaxed);
+ }
+ removed
+ }
+
+ /// Total number of accounts successfully compressed since startup.
+ fn total_compressed(&self) -> u64 {
+ self.compressed_counter().load(Ordering::Relaxed)
+ }
+
+ /// Mark accounts as pending (in-flight tx). They will be skipped by
+ /// `get_ready_to_compress()` until confirmed or returned to the pool.
+ fn mark_pending(&self, pubkeys: &[Pubkey]) {
+ for pk in pubkeys {
+ self.pending().insert(*pk);
+ }
+ }
+
+ /// Return accounts to the work pool after a failed transaction.
+ fn unmark_pending(&self, pubkeys: &[Pubkey]) {
+ for pk in pubkeys {
+ self.pending().remove(pk);
+ }
+ }
+
fn len(&self) -> usize {
self.accounts().len()
}
@@ -36,9 +98,12 @@ pub trait CompressibleTracker: Send + Sync {
}
fn get_ready_to_compress(&self, current_slot: u64) -> Vec {
+ let pending = self.pending();
self.accounts()
.iter()
- .filter(|entry| entry.value().is_ready_to_compress(current_slot))
+ .filter(|entry| {
+ entry.value().is_ready_to_compress(current_slot) && !pending.contains(entry.key())
+ })
.map(|entry| entry.value().clone())
.collect()
}
@@ -56,3 +121,109 @@ pub trait SubscriptionHandler: Send + Sync {
fn handle_removal(&self, pubkey: &Pubkey);
}
+
+pub async fn verify_transaction_execution(rpc: &impl Rpc, signature: Signature) -> Result<()> {
+ const MAX_RETRIES: u32 = 3;
+ const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500);
+
+ for attempt in 0..MAX_RETRIES {
+ let statuses = rpc
+ .get_signature_statuses(&[signature])
+ .await
+ .map_err(|e| {
+ anyhow::anyhow!("Failed to get signature status for {}: {:?}", signature, e)
+ })?;
+
+ match statuses.first() {
+ Some(Some(status)) => {
+ if let Some(err) = &status.err {
+ return Err(anyhow::anyhow!(
+ "Transaction {} confirmed but execution failed: {:?}",
+ signature,
+ err
+ ));
+ }
+ return Ok(());
+ }
+ _ if attempt < MAX_RETRIES - 1 => {
+ tracing::debug!(
+ "Transaction {} status not yet available, retrying ({}/{})",
+ signature,
+ attempt + 1,
+ MAX_RETRIES
+ );
+ tokio::time::sleep(RETRY_DELAY).await;
+ }
+ _ => {}
+ }
+ }
+
+ Err(anyhow::anyhow!(
+ "Transaction {} status unavailable after {} retries",
+ signature,
+ MAX_RETRIES
+ ))
+}
+
+/// Marks `pubkeys` as pending, sends the transaction, waits for confirmation,
+/// verifies execution, and either marks accounts as compressed or unmarks pending
+/// on any failure.
+pub async fn send_and_confirm_with_tracking(
+ rpc: &mut impl Rpc,
+ instructions: &[Instruction],
+ payer: &Keypair,
+ tracker: &impl CompressibleTracker,
+ pubkeys: &[Pubkey],
+ tx_label: &str,
+) -> Result {
+ tracker.mark_pending(pubkeys);
+
+ let signature = match rpc
+ .create_and_send_transaction(instructions, &payer.pubkey(), &[payer])
+ .await
+ {
+ Ok(sig) => sig,
+ Err(e) => {
+ tracker.unmark_pending(pubkeys);
+ return Err(anyhow::anyhow!(
+ "Failed to send {} transaction: {:?}",
+ tx_label,
+ e
+ ));
+ }
+ };
+
+ info!("{} tx sent: {}", tx_label, signature);
+
+ let confirmed = match rpc.confirm_transaction(signature).await {
+ Ok(confirmed) => confirmed,
+ Err(e) => {
+ tracker.unmark_pending(pubkeys);
+ return Err(anyhow::anyhow!(
+ "Failed to confirm {} transaction: {:?}",
+ tx_label,
+ e
+ ));
+ }
+ };
+
+ if confirmed {
+ if let Err(e) = verify_transaction_execution(rpc, signature).await {
+ tracker.unmark_pending(pubkeys);
+ return Err(e);
+ }
+
+ for pubkey in pubkeys {
+ tracker.remove_compressed(pubkey);
+ }
+ info!("{} tx confirmed: {}", tx_label, signature);
+ Ok(signature)
+ } else {
+ tracker.unmark_pending(pubkeys);
+ Err(anyhow::anyhow!(
+ "{} tx not confirmed: {} - accounts returned to work pool",
+ tx_label,
+ signature
+ ))
+ }
+}
diff --git a/forester/src/config.rs b/forester/src/config.rs
index 0dc22b9227..42eee8d2ee 100644
--- a/forester/src/config.rs
+++ b/forester/src/config.rs
@@ -97,6 +97,8 @@ pub struct GeneralConfig {
pub sleep_when_idle_ms: u64,
pub queue_polling_mode: QueuePollingMode,
pub group_authority: Option,
+ /// Use Helius getProgramAccountsV2 instead of standard getProgramAccounts
+ pub helius_rpc: bool,
}
impl Default for GeneralConfig {
@@ -114,6 +116,7 @@ impl Default for GeneralConfig {
sleep_when_idle_ms: 45_000,
queue_polling_mode: QueuePollingMode::Indexer,
group_authority: None,
+ helius_rpc: false,
}
}
}
@@ -121,35 +124,23 @@ impl Default for GeneralConfig {
impl GeneralConfig {
pub fn test_address_v2() -> Self {
GeneralConfig {
- slot_update_interval_seconds: 10,
- tree_discovery_interval_seconds: 1,
- enable_metrics: true,
skip_v1_state_trees: true,
skip_v1_address_trees: true,
skip_v2_state_trees: true,
- skip_v2_address_trees: false,
- tree_ids: vec![],
sleep_after_processing_ms: 50,
sleep_when_idle_ms: 100,
- queue_polling_mode: QueuePollingMode::Indexer,
- group_authority: None,
+ ..Default::default()
}
}
pub fn test_state_v2() -> Self {
GeneralConfig {
- slot_update_interval_seconds: 10,
- tree_discovery_interval_seconds: 1,
- enable_metrics: true,
skip_v1_state_trees: true,
skip_v1_address_trees: true,
- skip_v2_state_trees: false,
skip_v2_address_trees: true,
- tree_ids: vec![],
sleep_after_processing_ms: 50,
sleep_when_idle_ms: 100,
- queue_polling_mode: QueuePollingMode::Indexer,
- group_authority: None,
+ ..Default::default()
}
}
}
@@ -245,12 +236,13 @@ impl ForesterConfig {
.rpc_url
.clone()
.ok_or(ConfigError::MissingField { field: "rpc_url" })?;
+ let indexer_url = args.indexer_url.clone();
Ok(Self {
external_services: ExternalServicesConfig {
rpc_url,
ws_rpc_url: args.ws_rpc_url.clone(),
- indexer_url: args.indexer_url.clone(),
+ indexer_url: Some(indexer_url),
prover_url: args.prover_url.clone(),
prover_append_url: args
.prover_append_url
@@ -341,6 +333,7 @@ impl ForesterConfig {
})
})
.transpose()?,
+ helius_rpc: args.helius_rpc,
},
rpc_pool_config: RpcPoolConfig {
max_size: args.rpc_pool_size,
@@ -436,18 +429,10 @@ impl ForesterConfig {
indexer_config: IndexerConfig::default(),
transaction_config: TransactionConfig::default(),
general_config: GeneralConfig {
- slot_update_interval_seconds: 10,
tree_discovery_interval_seconds: 60,
enable_metrics: args.enable_metrics(),
- skip_v1_state_trees: false,
- skip_v2_state_trees: false,
- skip_v1_address_trees: false,
- skip_v2_address_trees: false,
- tree_ids: vec![],
- sleep_after_processing_ms: 10_000,
- sleep_when_idle_ms: 45_000,
queue_polling_mode: QueuePollingMode::OnChain, // Status uses on-chain reads
- group_authority: None,
+ ..Default::default()
},
rpc_pool_config: RpcPoolConfig {
max_size: 10,
diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs
index 49da980118..6a18e2a059 100644
--- a/forester/src/epoch_manager.rs
+++ b/forester/src/epoch_manager.rs
@@ -4,12 +4,12 @@ use std::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
- time::Duration,
+ time::{Duration, SystemTime, UNIX_EPOCH},
};
use anyhow::{anyhow, Context};
use borsh::BorshSerialize;
-use dashmap::{mapref::entry::Entry, DashMap};
+use dashmap::DashMap;
use forester_utils::{
forester_epoch::{get_epoch_phases, Epoch, ForesterSlot, TreeAccounts, TreeForesterSchedule},
rpc_pool::SolanaRpcPool,
@@ -41,15 +41,19 @@ use solana_sdk::{
use tokio::{
sync::{broadcast, broadcast::error::RecvError, mpsc, oneshot, Mutex},
task::JoinHandle,
- time::{sleep, Instant},
+ time::{sleep, Instant, MissedTickBehavior},
};
use tracing::{debug, error, info, info_span, instrument, trace, warn};
use crate::{
- compressible::{traits::CompressibleTracker, CTokenAccountTracker, CTokenCompressor},
+ compressible::{
+ traits::{Cancelled, CompressibleTracker},
+ CTokenAccountTracker, CTokenCompressor,
+ },
errors::{
ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError,
},
+ logging::{should_emit_rate_limited_warning, ServiceHeartbeat},
metrics::{push_metrics, queue_metric_update, update_forester_sol_balance},
pagerduty::send_pagerduty_alert,
processor::{
@@ -84,6 +88,7 @@ type StateBatchProcessorMap =
Arc>>)>>;
type AddressBatchProcessorMap =
Arc>>)>>;
+type ProcessorInitLockMap = Arc>>>;
/// Timing for a single circuit type (circuit inputs + proof generation)
#[derive(Copy, Clone, Debug, Default)]
@@ -207,12 +212,16 @@ pub struct EpochManager {
proof_caches: Arc>>,
state_processors: StateBatchProcessorMap,
address_processors: AddressBatchProcessorMap,
+ state_processor_init_locks: ProcessorInitLockMap,
+ address_processor_init_locks: ProcessorInitLockMap,
compressible_tracker: Option>,
pda_tracker: Option>,
mint_tracker: Option>,
/// Cached zkp_batch_size per tree to filter queue updates below threshold
zkp_batch_sizes: Arc>,
address_lookup_tables: Arc>,
+ heartbeat: Arc,
+ run_id: Arc,
}
impl Clone for EpochManager {
@@ -234,11 +243,15 @@ impl Clone for EpochManager {
proof_caches: self.proof_caches.clone(),
state_processors: self.state_processors.clone(),
address_processors: self.address_processors.clone(),
+ state_processor_init_locks: self.state_processor_init_locks.clone(),
+ address_processor_init_locks: self.address_processor_init_locks.clone(),
compressible_tracker: self.compressible_tracker.clone(),
pda_tracker: self.pda_tracker.clone(),
mint_tracker: self.mint_tracker.clone(),
zkp_batch_sizes: self.zkp_batch_sizes.clone(),
address_lookup_tables: self.address_lookup_tables.clone(),
+ heartbeat: self.heartbeat.clone(),
+ run_id: self.run_id.clone(),
}
}
}
@@ -259,6 +272,8 @@ impl EpochManager {
pda_tracker: Option>,
mint_tracker: Option>,
address_lookup_tables: Arc>,
+ heartbeat: Arc,
+ run_id: String,
) -> Result {
let authority = Arc::new(config.payer_keypair.insecure_clone());
Ok(Self {
@@ -278,11 +293,15 @@ impl EpochManager {
proof_caches: Arc::new(DashMap::new()),
state_processors: Arc::new(DashMap::new()),
address_processors: Arc::new(DashMap::new()),
+ state_processor_init_locks: Arc::new(DashMap::new()),
+ address_processor_init_locks: Arc::new(DashMap::new()),
compressible_tracker,
pda_tracker,
mint_tracker,
zkp_batch_sizes: Arc::new(DashMap::new()),
address_lookup_tables,
+ heartbeat,
+ run_id: Arc::::from(run_id),
})
}
@@ -324,7 +343,11 @@ impl EpochManager {
balance_check_handle,
),
|(h2, h3, h4)| {
- info!("Aborting EpochManager background tasks");
+ info!(
+ event = "background_tasks_aborting",
+ run_id = %self.run_id,
+ "Aborting EpochManager background tasks"
+ );
h2.abort();
h3.abort();
h4.abort();
@@ -336,16 +359,49 @@ impl EpochManager {
epoch_opt = rx.recv() => {
match epoch_opt {
Some(epoch) => {
- debug!("Received new epoch: {}", epoch);
+ debug!(
+ event = "epoch_queued_for_processing",
+ run_id = %self.run_id,
+ epoch,
+ "Received epoch from monitor"
+ );
let self_clone = Arc::clone(&self);
tokio::spawn(async move {
if let Err(e) = self_clone.process_epoch(epoch).await {
- error!("Error processing epoch {}: {:?}", epoch, e);
+ if let Some(ForesterError::Registration(
+ RegistrationError::FinalizeRegistrationPhaseEnded {
+ epoch,
+ current_slot,
+ active_phase_end_slot,
+ },
+ )) = e.downcast_ref::()
+ {
+ debug!(
+ event = "epoch_processing_skipped_finalize_registration_phase_ended",
+ run_id = %self_clone.run_id,
+ epoch = *epoch,
+ current_slot = *current_slot,
+ active_phase_end_slot = *active_phase_end_slot,
+ "Skipping epoch processing because FinalizeRegistration is no longer possible"
+ );
+ } else {
+ error!(
+ event = "epoch_processing_failed",
+ run_id = %self_clone.run_id,
+ epoch,
+ error = ?e,
+ "Error processing epoch"
+ );
+ }
}
});
}
None => {
- error!("Epoch monitor channel closed unexpectedly!");
+ error!(
+ event = "epoch_monitor_channel_closed",
+ run_id = %self.run_id,
+ "Epoch monitor channel closed unexpectedly"
+ );
break Err(anyhow!(
"Epoch monitor channel closed - forester cannot function without it"
));
@@ -355,13 +411,27 @@ impl EpochManager {
result = &mut monitor_handle => {
match result {
Ok(Ok(())) => {
- error!("Epoch monitor exited unexpectedly with Ok(())");
+ error!(
+ event = "epoch_monitor_exited_unexpected_ok",
+ run_id = %self.run_id,
+ "Epoch monitor exited unexpectedly with Ok(())"
+ );
}
Ok(Err(e)) => {
- error!("Epoch monitor exited with error: {:?}", e);
+ error!(
+ event = "epoch_monitor_exited_with_error",
+ run_id = %self.run_id,
+ error = ?e,
+ "Epoch monitor exited with error"
+ );
}
Err(e) => {
- error!("Epoch monitor task panicked or was cancelled: {:?}", e);
+ error!(
+ event = "epoch_monitor_task_failed",
+ run_id = %self.run_id,
+ error = ?e,
+ "Epoch monitor task panicked or was cancelled"
+ );
}
}
if let Some(pagerduty_key) = &self.config.external_services.pagerduty_routing_key {
@@ -396,11 +466,26 @@ impl EpochManager {
&self.config.payer_keypair.pubkey().to_string(),
balance_in_sol,
);
- debug!("Current SOL balance: {} SOL", balance_in_sol);
+ debug!(
+ event = "forester_balance_updated",
+ run_id = %self.run_id,
+ balance_sol = balance_in_sol,
+ "Current SOL balance updated"
+ );
}
- Err(e) => error!("Failed to get balance: {:?}", e),
+ Err(e) => error!(
+ event = "forester_balance_fetch_failed",
+ run_id = %self.run_id,
+ error = ?e,
+ "Failed to get balance"
+ ),
},
- Err(e) => error!("Failed to get RPC connection for balance check: {:?}", e),
+ Err(e) => error!(
+ event = "forester_balance_rpc_connection_failed",
+ run_id = %self.run_id,
+ error = ?e,
+ "Failed to get RPC connection for balance check"
+ ),
}
}
}
@@ -410,18 +495,37 @@ impl EpochManager {
loop {
match receiver.recv().await {
Ok(new_tree) => {
- info!("Received new tree: {:?}", new_tree);
+ info!(
+ event = "new_tree_received",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ tree_type = ?new_tree.tree_type,
+ "Received new tree"
+ );
if let Err(e) = self.add_new_tree(new_tree).await {
- error!("Failed to add new tree: {:?}", e);
+ error!(
+ event = "new_tree_add_failed",
+ run_id = %self.run_id,
+ error = ?e,
+ "Failed to add new tree"
+ );
// Continue processing other trees instead of crashing
}
}
Err(e) => match e {
RecvError::Lagged(lag) => {
- warn!("Lagged in receiving new trees: {:?}", lag);
+ warn!(
+ event = "new_tree_receiver_lagged",
+ run_id = %self.run_id,
+ lag, "Lagged while receiving new trees"
+ );
}
RecvError::Closed => {
- info!("New tree receiver closed");
+ info!(
+ event = "new_tree_receiver_closed",
+ run_id = %self.run_id,
+ "New tree receiver closed"
+ );
break;
}
},
@@ -431,23 +535,54 @@ impl EpochManager {
}
async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> {
- info!("Adding new tree: {:?}", new_tree);
+ info!(
+ event = "new_tree_add_started",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ tree_type = ?new_tree.tree_type,
+ "Adding new tree"
+ );
let mut trees = self.trees.lock().await;
trees.push(new_tree);
drop(trees);
- info!("New tree added to the list of trees");
+ info!(
+ event = "new_tree_added",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ "New tree added to tracked list"
+ );
let (current_slot, current_epoch) = self.get_current_slot_and_epoch().await?;
let phases = get_epoch_phases(&self.protocol_config, current_epoch);
// Check if we're currently in the active phase
if current_slot >= phases.active.start && current_slot < phases.active.end {
- info!("Currently in active phase. Attempting to process the new tree immediately.");
- info!("Recovering registration info...");
+ info!(
+ event = "new_tree_active_phase_injection",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ current_slot,
+ active_phase_start_slot = phases.active.start,
+ active_phase_end_slot = phases.active.end,
+ "In active phase; attempting immediate processing for new tree"
+ );
+ info!(
+ event = "new_tree_recover_registration_started",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ epoch = current_epoch,
+ "Recovering registration info for new tree"
+ );
match self.recover_registration_info(current_epoch).await {
Ok(mut epoch_info) => {
- info!("Recovered registration info for current epoch");
+ info!(
+ event = "new_tree_recover_registration_succeeded",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ epoch = current_epoch,
+ "Recovered registration info for current epoch"
+ );
let tree_schedule = TreeForesterSchedule::new_with_schedule(
&new_tree,
current_slot,
@@ -458,8 +593,15 @@ impl EpochManager {
let self_clone = Arc::new(self.clone());
- info!("Spawning task to process new tree in current epoch");
+ info!(
+ event = "new_tree_processing_task_spawned",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ epoch = current_epoch,
+ "Spawning task to process new tree in current epoch"
+ );
tokio::spawn(async move {
+ let tree_pubkey = tree_schedule.tree_accounts.merkle_tree;
if let Err(e) = self_clone
.process_queue(
&epoch_info.epoch,
@@ -468,9 +610,20 @@ impl EpochManager {
)
.await
{
- error!("Error processing queue for new tree: {:?}", e);
+ error!(
+ event = "new_tree_process_queue_failed",
+ run_id = %self_clone.run_id,
+ tree = %tree_pubkey,
+ error = ?e,
+ "Error processing queue for new tree"
+ );
} else {
- info!("Successfully processed new tree in current epoch");
+ info!(
+ event = "new_tree_process_queue_succeeded",
+ run_id = %self_clone.run_id,
+ tree = %tree_pubkey,
+ "Successfully processed new tree in current epoch"
+ );
}
});
}
@@ -482,19 +635,33 @@ impl EpochManager {
) {
debug!("Not registered for current epoch yet, new tree will be picked up during next registration");
} else {
- warn!("Failed to recover registration info for new tree: {:?}", e);
+ warn!(
+ event = "new_tree_recover_registration_failed",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ epoch = current_epoch,
+ error = ?e,
+ "Failed to recover registration info for new tree"
+ );
}
}
}
info!(
- "Injected new tree into current epoch {}: {:?}",
- current_epoch, new_tree
+ event = "new_tree_injected_into_current_epoch",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ epoch = current_epoch,
+ "Injected new tree into current epoch"
);
} else {
info!(
- "Not in active phase (current slot: {}, active start: {}). Tree will be picked up in next registration.",
- current_slot, phases.active.start
+ event = "new_tree_queued_for_next_registration",
+ run_id = %self.run_id,
+ tree = %new_tree.merkle_tree,
+ current_slot,
+ active_phase_start_slot = phases.active.start,
+ "Not in active phase; new tree will be picked up in next registration"
);
}
@@ -507,15 +674,20 @@ impl EpochManager {
let mut consecutive_failures = 0u32;
const MAX_BACKOFF_SECS: u64 = 60;
- info!("Starting epoch monitor");
+ info!(
+ event = "epoch_monitor_started",
+ run_id = %self.run_id,
+ "Starting epoch monitor"
+ );
loop {
let (slot, current_epoch) = match self.get_current_slot_and_epoch().await {
Ok(result) => {
if consecutive_failures > 0 {
info!(
- "Epoch monitor recovered after {} consecutive failures",
- consecutive_failures
+ event = "epoch_monitor_recovered",
+ run_id = %self.run_id,
+ consecutive_failures, "Epoch monitor recovered after failures"
);
}
consecutive_failures = 0;
@@ -528,13 +700,21 @@ impl EpochManager {
if consecutive_failures == 1 {
warn!(
- "Epoch monitor: failed to get slot/epoch: {:?}. Retrying in {:?}",
- e, backoff
+ event = "epoch_monitor_slot_epoch_failed",
+ run_id = %self.run_id,
+ consecutive_failures,
+ error = ?e,
+ backoff_ms = backoff.as_millis() as u64,
+ "Epoch monitor failed to get slot/epoch; retrying"
);
} else if consecutive_failures.is_multiple_of(10) {
error!(
- "Epoch monitor: {} consecutive failures, last error: {:?}. Still retrying every {:?}",
- consecutive_failures, e, backoff
+ event = "epoch_monitor_slot_epoch_failed_repeated",
+ run_id = %self.run_id,
+ consecutive_failures,
+ error = ?e,
+ backoff_ms = backoff.as_millis() as u64,
+ "Epoch monitor still failing repeatedly"
);
}
@@ -544,19 +724,36 @@ impl EpochManager {
};
debug!(
- "last_epoch: {:?}, current_epoch: {:?}, slot: {:?}",
- last_epoch, current_epoch, slot
+ event = "epoch_monitor_tick",
+ run_id = %self.run_id,
+ last_epoch = ?last_epoch,
+ current_epoch,
+ slot,
+ "Epoch monitor tick"
);
if last_epoch.is_none_or(|last| current_epoch > last) {
- debug!("New epoch detected: {}", current_epoch);
+ debug!(
+ event = "epoch_monitor_new_epoch_detected",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ "New epoch detected"
+ );
let phases = get_epoch_phases(&self.protocol_config, current_epoch);
if slot < phases.registration.end {
- debug!("Sending current epoch {} for processing", current_epoch);
+ debug!(
+ event = "epoch_monitor_send_current_epoch",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ "Sending current epoch for processing"
+ );
if let Err(e) = tx.send(current_epoch).await {
error!(
- "Failed to send current epoch {} for processing: {:?}. Channel closed, exiting.",
- current_epoch, e
+ event = "epoch_monitor_send_current_epoch_failed",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ error = ?e,
+ "Failed to send current epoch for processing; channel closed"
);
return Err(anyhow!("Epoch channel closed: {}", e));
}
@@ -577,7 +774,13 @@ impl EpochManager {
let mut rpc = match self.rpc_pool.get_connection().await {
Ok(rpc) => rpc,
Err(e) => {
- warn!("Failed to get RPC connection for slot waiting: {:?}", e);
+ warn!(
+ event = "epoch_monitor_wait_rpc_connection_failed",
+ run_id = %self.run_id,
+ target_epoch,
+ error = ?e,
+ "Failed to get RPC connection while waiting for registration slot"
+ );
tokio::time::sleep(Duration::from_secs(1)).await;
break;
}
@@ -591,36 +794,59 @@ impl EpochManager {
let slots_to_wait = wait_target.saturating_sub(slot);
debug!(
- "Waiting for epoch {} registration phase. Current slot: {}, Wait target: {} (registration starts at {}), Slots to wait: {}",
- target_epoch, slot, wait_target, target_phases.registration.start, slots_to_wait
+ event = "epoch_monitor_wait_for_registration",
+ run_id = %self.run_id,
+ target_epoch,
+ current_slot = slot,
+ wait_target_slot = wait_target,
+ registration_start_slot = target_phases.registration.start,
+ slots_to_wait,
+ "Waiting for target epoch registration phase"
);
if let Err(e) =
wait_until_slot_reached(&mut *rpc, &self.slot_tracker, wait_target)
.await
{
- error!("Error waiting for registration phase: {:?}", e);
+ error!(
+ event = "epoch_monitor_wait_for_registration_failed",
+ run_id = %self.run_id,
+ target_epoch,
+ error = ?e,
+ "Error waiting for registration phase"
+ );
break;
}
let current_slot = self.slot_tracker.estimated_current_slot();
if current_slot >= target_phases.registration.end {
debug!(
- "Epoch {} registration ended while waiting (current slot {} >= end {}), trying next epoch",
- target_epoch, current_slot, target_phases.registration.end
+ event = "epoch_monitor_registration_ended_while_waiting",
+ run_id = %self.run_id,
+ target_epoch,
+ current_slot,
+ registration_end_slot = target_phases.registration.end,
+ "Target epoch registration ended while waiting; trying next epoch"
);
target_epoch += 1;
continue;
}
debug!(
- "Epoch {} registration phase ready, sending for processing (current slot: {}, registration end: {})",
- target_epoch, current_slot, target_phases.registration.end
+ event = "epoch_monitor_send_target_epoch_after_wait",
+ run_id = %self.run_id,
+ target_epoch,
+ current_slot,
+ registration_end_slot = target_phases.registration.end,
+ "Target epoch registration phase ready; sending for processing"
);
if let Err(e) = tx.send(target_epoch).await {
error!(
- "Failed to send epoch {} for processing: {:?}",
- target_epoch, e
+ event = "epoch_monitor_send_target_epoch_failed",
+ run_id = %self.run_id,
+ target_epoch,
+ error = ?e,
+ "Failed to send target epoch for processing"
);
break;
}
@@ -631,13 +857,20 @@ impl EpochManager {
// If we're within the registration window, send it
if slot < target_phases.registration.end {
debug!(
- "Epoch {} registration phase is open (slot {} < end {}), sending for processing",
- target_epoch, slot, target_phases.registration.end
+ event = "epoch_monitor_send_target_epoch_window_open",
+ run_id = %self.run_id,
+ target_epoch,
+ slot,
+ registration_end_slot = target_phases.registration.end,
+ "Target epoch registration window is open; sending for processing"
);
if let Err(e) = tx.send(target_epoch).await {
error!(
- "Failed to send epoch {} for processing: {:?}",
- target_epoch, e
+ event = "epoch_monitor_send_target_epoch_failed",
+ run_id = %self.run_id,
+ target_epoch,
+ error = ?e,
+ "Failed to send target epoch for processing"
);
break;
}
@@ -647,8 +880,12 @@ impl EpochManager {
// Registration already ended, try next epoch
debug!(
- "Epoch {} registration already ended (slot {} >= end {}), checking next epoch",
- target_epoch, slot, target_phases.registration.end
+ event = "epoch_monitor_target_epoch_registration_closed",
+ run_id = %self.run_id,
+ target_epoch,
+ slot,
+ registration_end_slot = target_phases.registration.end,
+ "Target epoch registration already ended; checking next epoch"
);
target_epoch += 1;
}
@@ -718,7 +955,13 @@ impl EpochManager {
if slot > current_phases.registration.start {
debug!("Processing previous epoch: {}", previous_epoch);
if let Err(e) = tx.send(previous_epoch).await {
- error!("Failed to send previous epoch for processing: {:?}", e);
+ error!(
+ event = "initial_epoch_send_previous_failed",
+ run_id = %self.run_id,
+ epoch = previous_epoch,
+ error = ?e,
+ "Failed to send previous epoch for processing"
+ );
return Ok(());
}
}
@@ -731,7 +974,13 @@ impl EpochManager {
current_epoch
);
if let Err(e) = tx.send(current_epoch).await {
- error!("Failed to send current epoch for processing: {:?}", e);
+ error!(
+ event = "initial_epoch_send_current_failed",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ error = ?e,
+ "Failed to send current epoch for processing"
+ );
return Ok(()); // Channel closed, exit gracefully
}
} else {
@@ -752,20 +1001,32 @@ impl EpochManager {
current_epoch
);
if let Err(e) = tx.send(current_epoch).await {
- error!("Failed to send current epoch for processing: {:?}", e);
+ error!(
+ event = "initial_epoch_send_current_registered_failed",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ error = ?e,
+ "Failed to send current epoch for processing"
+ );
return Ok(()); // Channel closed, exit gracefully
}
} else {
- warn!(
- "Skipping current epoch {} - registration ended at slot {} (current slot: {})",
- current_epoch, current_phases.registration.end, slot
+ info!(
+ event = "skip_current_epoch_registration_closed",
+ run_id = %self.run_id,
+ epoch = current_epoch,
+ registration_end_slot = current_phases.registration.end,
+ current_slot = slot,
+ "Skipping current epoch because registration has ended"
);
}
}
Err(e) => {
warn!(
- "Failed to get RPC connection to check registration, skipping: {:?}",
- e
+ event = "registration_check_rpc_failed",
+ run_id = %self.run_id,
+ error = ?e,
+ "Failed to get RPC connection to check registration, skipping"
);
}
}
@@ -818,7 +1079,13 @@ impl EpochManager {
epoch
);
} else {
- warn!("Failed to recover registration info: {:?}", e);
+ warn!(
+ event = "recover_registration_info_failed",
+ run_id = %self.run_id,
+ epoch,
+ error = ?e,
+ "Failed to recover registration info"
+ );
}
// Attempt to register
match self
@@ -839,9 +1106,16 @@ impl EpochManager {
next_phases.registration.start.saturating_sub(current_slot);
info!(
- "Too late to register for epoch {} (registration ended at slot {}, current slot: {}). Next available epoch: {}. Registration opens at slot {} ({} slots to wait).",
- failed_epoch, registration_end, current_slot, next_epoch, next_phases.registration.start, slots_to_wait
- );
+ event = "registration_window_missed",
+ run_id = %self.run_id,
+ failed_epoch,
+ registration_end_slot = registration_end,
+ current_slot,
+ next_epoch,
+ next_registration_start_slot = next_phases.registration.start,
+ slots_to_wait,
+ "Too late to register for requested epoch; next epoch will be used"
+ );
return Ok(());
}
Err(e) => return Err(e.into()),
@@ -870,16 +1144,25 @@ impl EpochManager {
if self.sync_slot().await? < phases.report_work.end {
self.report_work_onchain(&registration_info).await?;
} else {
+ let current_slot = self.slot_tracker.estimated_current_slot();
info!(
- "Skipping on-chain work report for epoch {} (report_work phase ended)",
- registration_info.epoch.epoch
+ event = "skip_onchain_work_report_phase_ended",
+ run_id = %self.run_id,
+ epoch = registration_info.epoch.epoch,
+ current_slot,
+ report_work_end_slot = phases.report_work.end,
+ "Skipping on-chain work report because report_work phase has ended"
);
}
// TODO: implement
// self.claim(&registration_info).await?;
- info!("Exiting process_epoch");
+ info!(
+ event = "process_epoch_completed",
+ run_id = %self.run_id,
+ epoch, "Exiting process_epoch"
+ );
Ok(())
}
@@ -920,8 +1203,13 @@ impl EpochManager {
if slot < phases.registration.start {
let slots_to_wait = phases.registration.start.saturating_sub(slot);
info!(
- "Registration for epoch {} hasn't started yet (current slot: {}, starts at: {}). Waiting {} slots...",
- epoch, slot, phases.registration.start, slots_to_wait
+ event = "registration_wait_for_window",
+ run_id = %self.run_id,
+ epoch,
+ current_slot = slot,
+ registration_start_slot = phases.registration.start,
+ slots_to_wait,
+ "Registration window not open yet; waiting"
);
let wait_duration = slot_duration() * slots_to_wait as u32;
sleep(wait_duration).await;
@@ -931,11 +1219,38 @@ impl EpochManager {
match self.register_for_epoch(epoch).await {
Ok(registration_info) => return Ok(registration_info),
Err(e) => {
+ if let Some(RegistrationError::RegistrationPhaseEnded {
+ epoch: ended_epoch,
+ current_slot,
+ registration_end,
+ }) = e.downcast_ref::<RegistrationError>()
+ {
+ warn!(
+ event = "registration_attempt_non_retryable",
+ run_id = %self.run_id,
+ epoch,
+ attempt = attempt + 1,
+ max_attempts = max_retries,
+ error = ?e,
+ "Registration phase ended; stopping retries for this epoch"
+ );
+ return Err(ForesterError::Registration(
+ RegistrationError::RegistrationPhaseEnded {
+ epoch: *ended_epoch,
+ current_slot: *current_slot,
+ registration_end: *registration_end,
+ },
+ ));
+ }
+
warn!(
- "Failed to register for epoch {} (attempt {}): {:?}",
+ event = "registration_attempt_failed",
+ run_id = %self.run_id,
epoch,
- attempt + 1,
- e
+ attempt = attempt + 1,
+ max_attempts = max_retries,
+ error = ?e,
+ "Failed to register for epoch; retrying"
);
if attempt < max_retries - 1 {
sleep(retry_delay).await;
@@ -954,7 +1269,13 @@ impl