Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/perps-controller/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Bump `@metamask/utils` from `^11.9.0` to `^11.11.0` ([#9074](https://github.com/MetaMask/core/pull/9074))
- On `subscribeToPrices` calls with `includeMarketData: true` (i.e. focused single-symbol screens), the `price` field in each `PriceUpdate` is now driven by the per-symbol `activeAssetCtx` WebSocket stream (`midPx`, falling back to `markPx`) rather than the main-DEX `allMids` snapshot, which Hyperliquid throttles to a ~5 s push cadence ([#TODO](https://github.com/MetaMask/core/pull/TODO))
- The fast-stream price is preferred only while it is fresh (within a 10 s staleness window); `allMids` remains the fallback if the `activeAssetCtx` stream goes silent.
- Subscriptions with `includeMarketData: false` (list/overview screens) are unaffected and continue to use `allMids` exclusively.
- No new WebSocket subscriptions are created; `activeAssetCtx` was already established for `includeMarketData: true` subscriptions.

## [8.1.0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,19 @@ export class HyperLiquidSubscriptionService {
volume24h?: number;
oraclePrice?: number;
lastUpdated: number;
// Fast-stream price from activeAssetCtx (midPx preferred, markPx fallback).
// Populated only for symbols with includeMarketData subscriptions.
// Used in #createPriceUpdate to prefer this over the 5s-cadence allMids snapshot.
activeAssetCtxPrice?: number;
// Timestamp of the last activeAssetCtx price update, used for staleness checks.
priceLastUpdated?: number;
}
>();

// Stale threshold for activeAssetCtx price preference. If the last fast-stream
// price update is older than this, #createPriceUpdate falls back to allMids.
static readonly #activeAssetCtxPriceTtlMs = 10_000;

// Flag to suppress error logging during intentional disconnect
// Set in clearAll() and never reset (service instance is discarded after disconnect)
#isClearing = false;
Expand Down Expand Up @@ -2831,7 +2841,23 @@ export class HyperLiquidSubscriptionService {
#createPriceUpdate(symbol: string, price: string): PriceUpdate {
const marketData = this.#marketDataCache.get(symbol);
const orderBookData = this.#orderBookCache.get(symbol);
const currentPrice = parseFloat(price);

// Prefer the fast-stream price from activeAssetCtx over the passed-in
// allMids price when one is cached and fresh enough. This absorbs the
// upcoming 5s allMids push cadence on detail/ticket screens without
// affecting list/overview screens (which never populate activeAssetCtxPrice).
const now = Date.now();
const hasFreshActiveAssetCtxPrice =
marketData?.activeAssetCtxPrice !== undefined &&
marketData.priceLastUpdated !== undefined &&
now - marketData.priceLastUpdated <=
HyperLiquidSubscriptionService.#activeAssetCtxPriceTtlMs;

const effectivePrice = hasFreshActiveAssetCtxPrice

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description/changelog says list and overview screens with includeMarketData: false are unaffected and continue to use allMids exclusively, but the changed implementation stores the activeAssetCtx-selected price in shared #cachedPriceData and then fans that same PriceUpdate out to every subscriber for the symbol.

Fanout path: packages/perps-controller/src/services/HyperLiquidSubscriptionService.ts:3850

Contract text: packages/perps-controller/CHANGELOG.md:13

Proof this PR introduces/regresses it: this PR adds activeAssetCtxPrice preference inside #createPriceUpdate, writes the selected fast price into shared #cachedPriceData, and leaves #notifyAllPriceSubscribers subscriber agnostic. If a focused subscriber and a list row subscribe to the same symbol, the list subscriber can receive the fast activeAssetCtx price without opting in.

Suggested fix: keep raw price sources separate and project PriceUpdate per subscriber at notify time. A pragmatic version would be a price source aware #createPriceUpdate(symbol, { priceSource }) path or separate cached allMids and activeAssetCtx prices.

Please add a mixed subscriber test where allMids emits 50000, activeAssetCtx emits 50500, the focused subscriber receives 50500, and the list
subscriber remains at 50000.

? (marketData.activeAssetCtxPrice as number).toString()
: price;

const currentPrice = parseFloat(effectivePrice);

let percentChange24h: string | undefined;
if (marketData?.prevDayPx !== undefined) {
Expand All @@ -2847,8 +2873,8 @@ export class HyperLiquidSubscriptionService {

const priceUpdate = {
symbol,
price, // This is the mid price from allMids
timestamp: Date.now(),
price: effectivePrice,
timestamp: now,
percentChange24h,
// Add mark price from activeAssetCtx
markPrice: marketData?.oraclePrice
Expand All @@ -2872,6 +2898,18 @@ export class HyperLiquidSubscriptionService {

/**
* Ensure global allMids subscription is active (singleton pattern)
*
* NOTE ON PUSH CADENCE: Hyperliquid throttles the main-DEX allMids stream to
* push every ~5 seconds. This cadence is acceptable for list/overview screens
* that show many symbols simultaneously, but would make a focused single-symbol
* view (trade detail, order ticket) feel noticeably stale.
*
* Mitigation: when a subscription is created with `includeMarketData: true`,
* #ensureActiveAssetSubscription also establishes a per-symbol activeAssetCtx
* WebSocket that ticks at a faster cadence. #createPriceUpdate prefers that
* fast-stream price over this allMids snapshot (with a 10s staleness gate via
* #ACTIVE_ASSET_CTX_PRICE_TTL_MS), so detail screens stay responsive. Screens
* that use only allMids (includeMarketData: false) are unaffected.
*/
#ensureGlobalAllMidsSubscription(): void {
// Check both the subscription AND the promise to prevent race conditions
Expand Down Expand Up @@ -3020,6 +3058,7 @@ export class HyperLiquidSubscriptionService {

// Cache market data for consolidation with price updates
const ctxPrice = ctx.midPx ?? ctx.markPx;
const now = Date.now();
const openInterestUSD =
isPerpsContext(data) && ctxPrice
? calculateOpenInterestUSD(data.ctx.openInterest, ctxPrice)
Expand All @@ -3042,23 +3081,35 @@ export class HyperLiquidSubscriptionService {
oraclePrice: isPerpsContext(data)
? parseFloat(data.ctx.oraclePx.toString())
: undefined,
lastUpdated: Date.now(),
lastUpdated: now,
// Store fast-stream price so #createPriceUpdate can prefer it over
// the 5s-cadence allMids snapshot on detail/ticket screens.
activeAssetCtxPrice: ctxPrice
? parseFloat(ctxPrice.toString())
: undefined,
priceLastUpdated: ctxPrice ? now : undefined,
};

this.#marketDataCache.set(symbol, marketData);

// Update cached price data with new 24h change if we have current price
const currentCachedPrice = this.#cachedPriceData?.get(symbol);
if (currentCachedPrice) {
const updatedPrice = this.#createPriceUpdate(
symbol,
currentCachedPrice.price,
);

this.#cachedPriceData ??= new Map<string, PriceUpdate>();
this.#cachedPriceData.set(symbol, updatedPrice);
this.#notifyAllPriceSubscribers();
}
// Drive a price update from the fast-stream price. Use the allMids
// fallback string if ctxPrice is absent so #createPriceUpdate still
// has something to work with. The preference logic inside
// #createPriceUpdate will pick up the cached activeAssetCtxPrice.
const fallbackPrice =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. activeAssetCtx can send a 0 price before a real price exists.

When activeAssetCtx has neither midPx nor markPx, and no allMids price is cached yet, the new path falls back to '0', writes that as the cached price, and notifies subscribers immediately.

Changed hunk: packages/perps-controller/src/services/HyperLiquidSubscriptionService.ts:3099

Proof this PR introduces/regresses it: before this PR, activeAssetCtx only refreshed an existing cached price; this PR makes activeAssetCtx drive a first price update and uses '0' when neither source has a real price. That can briefly surface 0 on focused screens and feed 0 into calculations.

Suggested fix: skip the price update until either activeAssetCtx has midPx ?? markPx or a real cached allMids fallback exists. Please add coverage for activeAssetCtx first without price.

this.#cachedPriceData?.get(symbol)?.price ?? '0';
const priceForUpdate = ctxPrice
? ctxPrice.toString()
: fallbackPrice;

// Notify unconditionally so the first activeAssetCtx tick surfaces a
// price even if no allMids tick has arrived yet.
this.#cachedPriceData ??= new Map<string, PriceUpdate>();
this.#cachedPriceData.set(
symbol,
this.#createPriceUpdate(symbol, priceForUpdate),
);
this.#notifyAllPriceSubscribers();
}
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2559,4 +2559,252 @@ describe('HyperLiquidSubscriptionService', () => {
expect(result2).toBeNull();
});
});

describe('activeAssetCtx price preference', () => {
it('uses activeAssetCtx midPx as the displayed price when both streams are active', async () => {
const mockCallback = jest.fn();

// allMids will push 50000, activeAssetCtx will push 50500 (faster stream)
mockSubscriptionClient.allMids.mockImplementation(
(paramsOrCallback: any, maybeCallback?: any) => {
const callback =
typeof paramsOrCallback === 'function'
? paramsOrCallback
: maybeCallback;
setTimeout(() => {
callback({ mids: { BTC: '50000' } });
}, 0);
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

mockSubscriptionClient.activeAssetCtx.mockImplementation(
(params: any, callback: any) => {
setTimeout(() => {
callback({
coin: params.coin,
ctx: {
prevDayPx: '49000',
funding: '0.01',
openInterest: '1000000',
dayNtlVlm: '50000000',
oraclePx: '50100',
midPx: '50500', // faster-stream price – should win
},
});
}, 5); // fire after allMids to exercise preference logic
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

const unsubscribe = await service.subscribeToPrices({
symbols: ['BTC'],
callback: mockCallback,
includeMarketData: true,
});

await jest.runAllTimersAsync();

// The last notification should carry the activeAssetCtx price, not allMids
const lastCall =
mockCallback.mock.calls[mockCallback.mock.calls.length - 1][0];
expect(lastCall).toEqual(
expect.arrayContaining([
expect.objectContaining({
symbol: 'BTC',
price: '50500',
}),
]),
);

unsubscribe();
});

it('does not replace a fresher activeAssetCtx price when allMids ticks again', async () => {
let allMidsCallback: ((data: any) => void) | undefined;
let activeAssetCallback: ((data: any) => void) | undefined;

mockSubscriptionClient.allMids.mockImplementation(
(paramsOrCallback: any, maybeCallback?: any) => {
allMidsCallback =
typeof paramsOrCallback === 'function'
? paramsOrCallback
: maybeCallback;
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

mockSubscriptionClient.activeAssetCtx.mockImplementation(
(params: any, callback: any) => {
activeAssetCallback = callback;
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

const mockCallback = jest.fn();
const unsubscribe = await service.subscribeToPrices({
symbols: ['BTC'],
callback: mockCallback,
includeMarketData: true,
});

await jest.runAllTimersAsync();

// Fire activeAssetCtx first with a fresh price
activeAssetCallback?.({
coin: 'BTC',
ctx: {
prevDayPx: '49000',
funding: '0.01',
openInterest: '1000000',
dayNtlVlm: '50000000',
oraclePx: '50100',
midPx: '50500',
},
});

mockCallback.mockClear();

// allMids fires again with the older/stale price – should NOT win
allMidsCallback?.({ mids: { BTC: '50000' } });

await jest.runAllTimersAsync();

const lastCall =
mockCallback.mock.calls[mockCallback.mock.calls.length - 1][0];
expect(lastCall).toEqual(
expect.arrayContaining([
expect.objectContaining({
symbol: 'BTC',
price: '50500', // still the activeAssetCtx price
}),
]),
);

unsubscribe();
});

it('uses allMids price when includeMarketData is false (list/overview screens)', async () => {
const mockCallback = jest.fn();

mockSubscriptionClient.allMids.mockImplementation(
(paramsOrCallback: any, maybeCallback?: any) => {
const callback =
typeof paramsOrCallback === 'function'
? paramsOrCallback
: maybeCallback;
setTimeout(() => {
callback({ mids: { BTC: '50000' } });
}, 0);
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

const unsubscribe = await service.subscribeToPrices({
symbols: ['BTC'],
callback: mockCallback,
includeMarketData: false,
});

await jest.runAllTimersAsync();

// activeAssetCtx should not have been called at all
expect(mockSubscriptionClient.activeAssetCtx).not.toHaveBeenCalled();

// Price should come from allMids
const lastCall =
mockCallback.mock.calls[mockCallback.mock.calls.length - 1][0];
expect(lastCall).toEqual(
expect.arrayContaining([
expect.objectContaining({
symbol: 'BTC',
price: '50000',
}),
]),
);

unsubscribe();
});

it('falls back to allMids price when activeAssetCtx price is stale (beyond TTL)', async () => {
let allMidsCallback: ((data: any) => void) | undefined;
let activeAssetCallback: ((data: any) => void) | undefined;

mockSubscriptionClient.allMids.mockImplementation(
(paramsOrCallback: any, maybeCallback?: any) => {
allMidsCallback =
typeof paramsOrCallback === 'function'
? paramsOrCallback
: maybeCallback;
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

mockSubscriptionClient.activeAssetCtx.mockImplementation(
(params: any, callback: any) => {
activeAssetCallback = callback;
return Promise.resolve({
unsubscribe: jest.fn().mockResolvedValue(undefined),
});
},
);

const mockCallback = jest.fn();
const unsubscribe = await service.subscribeToPrices({
symbols: ['BTC'],
callback: mockCallback,
includeMarketData: true,
});

await jest.runAllTimersAsync();

// Fire activeAssetCtx with a price
activeAssetCallback?.({
coin: 'BTC',
ctx: {
prevDayPx: '49000',
funding: '0.01',
openInterest: '1000000',
dayNtlVlm: '50000000',
oraclePx: '50100',
midPx: '50500',
},
});

// Advance time beyond the 10s TTL so the cached fast-stream price is stale
jest.advanceTimersByTime(11_000);

mockCallback.mockClear();

// allMids fires – should now win because fast-stream price is stale
allMidsCallback?.({ mids: { BTC: '50000' } });

await jest.runAllTimersAsync();

const lastCall =
mockCallback.mock.calls[mockCallback.mock.calls.length - 1][0];
expect(lastCall).toEqual(
expect.arrayContaining([
expect.objectContaining({
symbol: 'BTC',
price: '50000', // allMids wins after TTL expires
}),
]),
);

unsubscribe();
});
});
});
Loading