From eba27e48e7cb771fc5c986ac20ba8ff2bf246f3c Mon Sep 17 00:00:00 2001 From: Noley Holland Date: Thu, 18 Jun 2026 13:45:01 -0700 Subject: [PATCH 01/15] Replace instance status polling with live MQTT updates, send state to the team channel, add Pinia store, connect for Instances list only --- forge/comms/aclManager.js | 32 +++++++- forge/comms/devices.js | 5 ++ forge/comms/index.js | 8 ++ forge/db/controllers/Project.js | 24 ++++++ forge/db/models/Project.js | 5 ++ frontend/src/pages/team/Instances.vue | 28 ++++++- frontend/src/stores/live-status.ts | 31 ++++++++ .../subscribers/team-channel.subscriber.ts | 30 +++++++- test/unit/forge/comms/authRoutesV2_spec.js | 36 +++++++++ test/unit/forge/comms/devices_spec.js | 32 ++++++++ .../unit/forge/db/controllers/Project_spec.js | 41 ++++++++++ test/unit/forge/db/models/Project_spec.js | 36 +++++++++ test/unit/frontend/stores/live-status.spec.js | 76 +++++++++++++++++++ .../team-channel.subscriber.spec.js | 73 +++++++++++++++++- 14 files changed, 449 insertions(+), 8 deletions(-) create mode 100644 frontend/src/stores/live-status.ts create mode 100644 test/unit/frontend/stores/live-status.spec.js diff --git a/forge/comms/aclManager.js b/forge/comms/aclManager.js index d760fb7db9..f388d31816 100644 --- a/forge/comms/aclManager.js +++ b/forge/comms/aclManager.js @@ -140,6 +140,29 @@ module.exports = function (app) { return false } }, + checkTeamStatusSub: async function (requestParts, usernameParts) { + // requestParts = [ fullTopic , , ] + // usernameParts = [ 'fe-team', , , ] + // team-wide status wildcard: gate on team membership only. requestParts[2] + // is the instance/device id (or '+'), never the user — don't validate it. + const topicTeamHash = requestParts[1] + const usernameUserHash = usernameParts[1] + const usernameTeamHash = usernameParts[2] + if (topicTeamHash !== usernameTeamHash) { + return false + } + try { + const team = await app.db.models.Team.byId(usernameTeamHash) + if (!team) return false + const user = await app.db.models.User.byId(usernameUserHash) + if (!user) return false + const membership = await app.db.models.TeamMember.getTeamMembership(user.id, team.id, false) + return !!membership + } catch (error) { + app.log.error('Unexpected error during team-channel status ACL check', { requestParts, usernameParts, error }) + return false + } + }, checkExpertTopic: async function (topicParts, usernameParts, acl) { // topicParts = [ fullTopic , , , , [, ] ] // usernameParts = [ 'expert-client' | 'expert-agent', [, ] ] @@ -333,6 +356,9 @@ module.exports = function (app) { { topic: /^ff\/v1\/[^/]+\/t\/updated$/ }, // - ff/v1//u//membership { topic: /^ff\/v1\/[^/]+\/u\/[^/]+\/membership$/ }, + // - ff/v1//p//status + { topic: /^ff\/v1\/[^/]+\/p\/[^/]+\/status$/ }, + // TODO NOLEY: devices here too // ff/v1/platform/sync { topic: /^ff\/v1\/platform\/sync$/ }, // ff/v1/platform/leader @@ -396,7 +422,11 @@ module.exports = function (app) { // - ff/v1//t/updated { topic: /^ff\/v1\/([^/]+)\/t\/updated$/, verify: 'checkUserIsTeamMember' }, // - ff/v1//u//membership - { topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' } + { topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' }, + // - ff/v1//p/+/status + { topic: /^ff\/v1\/([^/]+)\/p\/([^/]+)\/status$/, verify: 'checkTeamStatusSub' }, + // - ff/v1//d/+/status + { topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/status$/, verify: 'checkTeamStatusSub' } ], pub: [] }, diff --git a/forge/comms/devices.js b/forge/comms/devices.js index ab85b6d9cc..2bd38e45bb 100644 --- a/forge/comms/devices.js +++ b/forge/comms/devices.js @@ -121,6 +121,7 @@ class DeviceCommsHandler { const teamId = this.app.db.models.Team.encodeHashid(device.TeamId) const startTime = Date.now() try { + const previousState = device.state const payload = JSON.parse(status.status) await this.app.db.controllers.Device.updateState(device, payload) @@ -130,6 +131,10 @@ class DeviceCommsHandler { return } + if (payload.state !== previousState) { + this.app.comms.team.notifyDeviceStatus(teamId, status.id, payload.state) + } + // If the status state===unknown, the device is waiting for confirmation // it has the right details. Always response with an 'update' command in // this scenario diff --git a/forge/comms/index.js b/forge/comms/index.js index 04a7c463e0..3c8c889400 100644 --- a/forge/comms/index.js +++ b/forge/comms/index.js @@ -71,6 +71,14 @@ module.exports = fp(async function (app, _opts) { if (!teamHash || !userHash) return const msg = { reason: reason || null, srcId: srcId || null } client.publish(`ff/v1/${teamHash}/u/${userHash}/membership`, JSON.stringify(msg)) + }, + notifyDeviceStatus: function (teamHash, id, state) { + if (!teamHash || !id) return + client.publish(`ff/v1/${teamHash}/d/${id}/status`, JSON.stringify({ id, meta: { state } })) + }, + notifyInstanceStatus: function (teamHash, id, state) { + if (!teamHash || !id) return + client.publish(`ff/v1/${teamHash}/p/${id}/status`, JSON.stringify({ id, meta: { state } })) } } }) diff --git a/forge/db/controllers/Project.js b/forge/db/controllers/Project.js index 19ee3b8b23..c0e73b56d1 100644 --- a/forge/db/controllers/Project.js +++ b/forge/db/controllers/Project.js @@ -14,12 +14,28 @@ const latestProjectState = 'project-latestProjectState' const inflightDeploys = 'project-inflightDeploys' +const lastPublishedProjectState = 'project-lastPublishedProjectState' + module.exports = { init (app) { app.caches.createCache(inflightProjectState) app.caches.createCache(latestProjectState) app.caches.createCache(inflightDeploys) + app.caches.createCache(lastPublishedProjectState) + }, + + // NOLEY TODO: this has issues with restarting states + publishLiveState: async function (app, project) { + if (!app.comms || !project?.TeamId) return + const inflight = await this.getInflightState(app, project) + const latest = await this.getLatestProjectState(app, project.id) + const effective = inflight ?? (project.state === 'suspended' ? 'suspended' : (latest ?? project.state)) + if (!effective) return + const cache = app.caches.getCache(lastPublishedProjectState) + if (await cache.get(project.id) === effective) return + await cache.set(project.id, effective) + app.comms.team.notifyInstanceStatus(app.db.models.Team.encodeHashid(project.TeamId), project.id, effective) }, getProjectPaginationOptions: function (app, request) { @@ -58,6 +74,7 @@ module.exports = { */ setInflightState: async function (app, project, state) { await app.caches.getCache(inflightProjectState).set(project.id, state) + await this.publishLiveState(app, project) }, /** @@ -87,6 +104,7 @@ module.exports = { clearInflightState: async function (app, project) { await app.caches.getCache(inflightProjectState).del(project.id) await app.caches.getCache(inflightDeploys).del(project.id) + await this.publishLiveState(app, project) }, /** @@ -810,6 +828,12 @@ module.exports = { } else { await this.setLatestProjectState(app, projectId, state) } + if (app.comms) { + const project = await app.db.models.Project.byId(projectId, { barebone: true }) + if (project) { + await this.publishLiveState(app, project) + } + } } } diff --git a/forge/db/models/Project.js b/forge/db/models/Project.js index fc4817f0e0..a6dab47dfd 100644 --- a/forge/db/models/Project.js +++ b/forge/db/models/Project.js @@ -166,6 +166,11 @@ module.exports = { } } }, + afterSave: async (project, opts) => { + if (project.changed('state')) { + await Controllers.Project.publishLiveState(project) + } + }, afterDestroy: async (project, opts) => { await M.AccessToken.destroy({ where: { diff --git a/frontend/src/pages/team/Instances.vue b/frontend/src/pages/team/Instances.vue index 82cd2dadea..681d463193 100644 --- a/frontend/src/pages/team/Instances.vue +++ b/frontend/src/pages/team/Instances.vue @@ -156,7 +156,9 @@ - + @@ -187,6 +189,7 @@ import InstanceStatusBadge from '../instance/components/InstanceStatusBadge.vue' import { useAccountSettingsStore } from '@/stores/account-settings.js' import { useContextStore } from '@/stores/context.js' +import { useLiveStatusStore } from '@/stores/live-status' export default { name: 'TeamInstances', @@ -273,6 +276,7 @@ export default { computed: { ...mapState(useContextStore, ['team']), ...mapState(useAccountSettingsStore, ['featuresCheck']), + ...mapState(useLiveStatusStore, { liveInstanceStatuses: 'instanceStatuses', statusChannelLive: 'live' }), instances () { return Array.from(this.instancesMap.values()) }, @@ -289,7 +293,8 @@ export default { } }, watch: { - team: 'fullReload' + team: 'fullReload', + liveInstanceStatuses: { handler: 'applyLiveStatus', deep: true } }, mounted () { this.fullReload() @@ -336,12 +341,31 @@ export default { nextMap.set(instance.id, instance) }) this.instancesMap = nextMap + this.applyLiveStatus() } catch (e) { Alerts.emit('Failed to load instances.', 'warning') } finally { this.loading = false } }, + applyLiveStatus () { + const statuses = this.liveInstanceStatuses + for (const id of this.instancesMap.keys()) { + const state = statuses[id] + if (!state) continue + const row = this.instancesMap.get(id) + if (row.status === state && row.meta?.state === state) continue + this.instancesMap.set(id, { + ...row, + status: state, + meta: { ...(row.meta || {}), state }, + running: this.isRunningState(state), + notSuspended: state !== 'suspended', + pendingStateChange: false, + optimisticStateChange: false + }) + } + }, updateSearch: debounce(function (term) { this.searchTerm = term this.page = 1 diff --git a/frontend/src/stores/live-status.ts b/frontend/src/stores/live-status.ts new file mode 100644 index 0000000000..a4af110424 --- /dev/null +++ b/frontend/src/stores/live-status.ts @@ -0,0 +1,31 @@ +import { defineStore } from 'pinia' +import { computed, ref } from 'vue' + +export const useLiveStatusStore = defineStore('live-status', () => { + const instanceStatuses = ref>({}) + const deviceStatuses = ref>({}) + const live = ref(false) + + const getInstanceStatus = computed(() => (id: string) => instanceStatuses.value[id]) + const getDeviceStatus = computed(() => (id: string) => deviceStatuses.value[id]) + + function setInstanceStatus (id: string, state: string): void { + instanceStatuses.value[id] = state + } + + function setDeviceStatus (id: string, state: string): void { + deviceStatuses.value[id] = state + } + + function setLive (value: boolean): void { + live.value = value + } + + function clear (): void { + instanceStatuses.value = {} + deviceStatuses.value = {} + live.value = false + } + + return { instanceStatuses, deviceStatuses, live, getInstanceStatus, getDeviceStatus, setDeviceStatus, setInstanceStatus, setLive, clear } +}) diff --git a/frontend/src/subscribers/team-channel.subscriber.ts b/frontend/src/subscribers/team-channel.subscriber.ts index c4a3bb2e6a..72ee33aa1f 100644 --- a/frontend/src/subscribers/team-channel.subscriber.ts +++ b/frontend/src/subscribers/team-channel.subscriber.ts @@ -3,11 +3,14 @@ import { BaseSubscriber } from './subscriber.contract' import teamApi from '@/api/team.js' import { useAccountAuthStore } from '@/stores/account-auth.js' import { useContextStore } from '@/stores/context.js' +import { useLiveStatusStore } from '@/stores/live-status' import { Maybe } from '@/types/common/types' import type { CreateSubscriberOptions, TeamChannelSubscriberI, TeamRef } from '@/types/subscribers/subscriber.types' const MEMBERSHIP_TOPIC_REGEX = /^ff\/v1\/[^/]+\/u\/([^/]+)\/membership$/ const TEAM_UPDATED_TOPIC_REGEX = /^ff\/v1\/[^/]+\/t\/updated$/ +const DEVICE_STATUS_UPDATED_TOPIC_REGEX = /^ff\/v1\/[^/]+\/d\/[^/]+\/status$/ +const INSTANCE_STATUS_UPDATED_TOPIC_REGEX = /^ff\/v1\/[^/]+\/p\/[^/]+\/status$/ function connectionKey (teamId: string): string { return `team:${teamId}` @@ -73,6 +76,7 @@ class TeamChannelSubscriber extends BaseSubscriber implements TeamChannelSubscri if (!transport) return try { await transport.disconnect(key) + useLiveStatusStore().clear() } catch { // ignore teardown failures } @@ -88,8 +92,12 @@ class TeamChannelSubscriber extends BaseSubscriber implements TeamChannelSubscri try { await transport.subscribe(connectionKey(teamId), [ `ff/v1/${teamId}/t/updated`, - `ff/v1/${teamId}/u/${userId}/membership` + `ff/v1/${teamId}/u/${userId}/membership`, + `ff/v1/${teamId}/p/+/status`, + `ff/v1/${teamId}/d/+/status` ], { qos: 1 }) + + useLiveStatusStore().setLive(true) } catch { // non-fatal — the transport replays subscriptions on reconnect } @@ -116,13 +124,29 @@ class TeamChannelSubscriber extends BaseSubscriber implements TeamChannelSubscri // topic pattern → store action; the store owns interpretation (what a // reason means, what to refresh). Add a row per new sync-able entity. - protected _topicRoutes (): Array<{ pattern: RegExp, handle: (payload: { reason?: string }) => void }> { + protected _topicRoutes (): Array<{ pattern: RegExp, handle: (payload: { reason?: string, id?: string, meta?: { state?: string } }) => void }> { return [ { pattern: MEMBERSHIP_TOPIC_REGEX, handle: (payload) => this._onMembership(payload) }, - { pattern: TEAM_UPDATED_TOPIC_REGEX, handle: () => this._onTeamUpdated() } + { pattern: TEAM_UPDATED_TOPIC_REGEX, handle: () => this._onTeamUpdated() }, + { pattern: DEVICE_STATUS_UPDATED_TOPIC_REGEX, handle: (payload) => this._onDeviceStatus(payload) }, + { pattern: INSTANCE_STATUS_UPDATED_TOPIC_REGEX, handle: (payload) => this._onInstanceStatus(payload) } ] } + protected _onInstanceStatus (payload: { id?: string, meta?: { state?: string } }): void { + if (!payload?.id || !payload.meta?.state) return + try { + useLiveStatusStore().setInstanceStatus(payload.id, payload.meta.state) + } catch {} + } + + protected _onDeviceStatus (payload: { id?: string, meta?: { state?: string } }): void { + if (!payload?.id || !payload.meta?.state) return + try { + useLiveStatusStore().setDeviceStatus(payload.id, payload.meta.state) + } catch {} + } + protected _onMembership (payload: { reason?: string }): void { try { useContextStore().onTeamChannelMembership(payload).catch(() => undefined) diff --git a/test/unit/forge/comms/authRoutesV2_spec.js b/test/unit/forge/comms/authRoutesV2_spec.js index 2175f48742..5ccf362ae5 100644 --- a/test/unit/forge/comms/authRoutesV2_spec.js +++ b/test/unit/forge/comms/authRoutesV2_spec.js @@ -1376,6 +1376,42 @@ describe('Broker Auth v2 API', async function () { topic: teamUpdatedTopic }) }) + it('allows a team member to subscribe to the team instance-status wildcard', async function () { + await allowRead({ + username: teamFrontendUsername, + topic: `ff/v1/${TestObjects.ATeam.hashid}/p/+/status` + }) + }) + it('allows a team member to subscribe to the team device-status wildcard', async function () { + await allowRead({ + username: teamFrontendUsername, + topic: `ff/v1/${TestObjects.ATeam.hashid}/d/+/status` + }) + }) + it('denies subscribe to another team\'s status wildcard', async function () { + await denyRead({ + username: teamFrontendUsername, + topic: `ff/v1/${otherTeam.hashid}/p/+/status` + }) + await denyRead({ + username: teamFrontendUsername, + topic: `ff/v1/${otherTeam.hashid}/d/+/status` + }) + }) + it('denies status subscribe for a user who is not a member of the team', async function () { + const dave = await factory.createUser({ username: 'dave', name: 'Dave', email: 'dave@example.com', password: 'ddPassword1!' }) + const daveUsername = `fe-team:${dave.hashid}:${TestObjects.ATeam.hashid}:session-1234567890` + await denyRead({ + username: daveUsername, + topic: `ff/v1/${TestObjects.ATeam.hashid}/p/+/status` + }) + }) + it('denies fe-team from publishing to status (read-only client)', async function () { + await denyWrite({ + username: teamFrontendUsername, + topic: `ff/v1/${TestObjects.ATeam.hashid}/p/+/status` + }) + }) it('denies fe-team from publishing (read-only client)', async function () { await denyWrite({ username: teamFrontendUsername, diff --git a/test/unit/forge/comms/devices_spec.js b/test/unit/forge/comms/devices_spec.js index df915412d7..b316796233 100644 --- a/test/unit/forge/comms/devices_spec.js +++ b/test/unit/forge/comms/devices_spec.js @@ -1,6 +1,8 @@ const sleep = require('util').promisify(setTimeout) const should = require('should') // eslint-disable-line +const sinon = require('sinon') + const setup = require('../routes/setup') const FF_UTIL = require('flowforge-test-utils') @@ -216,6 +218,36 @@ describe('DeviceCommsHandler', function () { payload.should.have.property('project', null) payload.should.have.property('snapshot', null) }) + + it('forwards a team status notification only when the device state changes', async function () { + const device = await app.factory.createDevice({ name: 'status-forward-device' }, TestObjects.ATeam) + const notifySpy = sinon.spy(app.comms.team, 'notifyDeviceStatus') + try { + // establish a known baseline regardless of the factory default state + client.emit('status/device', { id: device.hashid, status: JSON.stringify({ state: 'stopped' }) }) + await sleep(100) + notifySpy.resetHistory() + + // state changes -> notifies with { teamHash, deviceHashid, state } + client.emit('status/device', { id: device.hashid, status: JSON.stringify({ state: 'running' }) }) + await sleep(100) + notifySpy.calledOnce.should.be.true() + notifySpy.firstCall.args.should.eql([TestObjects.ATeam.hashid, device.hashid, 'running']) + + // same state again -> no further notification + client.emit('status/device', { id: device.hashid, status: JSON.stringify({ state: 'running' }) }) + await sleep(100) + notifySpy.calledOnce.should.be.true() + + // state changes again -> notifies again + client.emit('status/device', { id: device.hashid, status: JSON.stringify({ state: 'stopped' }) }) + await sleep(100) + notifySpy.calledTwice.should.be.true() + notifySpy.secondCall.args.should.eql([TestObjects.ATeam.hashid, device.hashid, 'stopped']) + } finally { + notifySpy.restore() + } + }) }) describe('sendCommandAwaitReply', async function () { diff --git a/test/unit/forge/db/controllers/Project_spec.js b/test/unit/forge/db/controllers/Project_spec.js index f9d77922dd..cc18252cef 100644 --- a/test/unit/forge/db/controllers/Project_spec.js +++ b/test/unit/forge/db/controllers/Project_spec.js @@ -1,6 +1,8 @@ const crypto = require('crypto') const should = require('should') // eslint-disable-line +const sinon = require('sinon') + const { encryptCreds, decryptCreds } = require('../../../../lib/credentials') const setup = require('../setup') // const FF_UTIL = require('flowforge-test-utils') @@ -525,5 +527,44 @@ describe('Project controller', function () { tempResult = await app.db.controllers.Project.getLatestProjectState('project-id') should(tempResult).equal('stopped') }) + + it('broadcasts a launcher-reported state change to the team channel, only on change', async function () { + const notifySpy = sinon.spy(app.comms.team, 'notifyInstanceStatus') + const team = await app.db.models.Team.create({ name: 'Cache Status Team', TeamTypeId: 1 }) + const instance = await app.db.models.Project.create({ name: 'cache-status-p1', type: '', url: '', state: 'running', TeamId: team.id }) + notifySpy.resetHistory() // ignore the create-time save + try { + // running -> stopped is a change + await app.db.controllers.Project.updateLatestProjectState(instance.id, 'stopped') + notifySpy.calledOnce.should.be.true() + notifySpy.firstCall.args.should.eql([app.db.models.Team.encodeHashid(team.id), instance.id, 'stopped']) + + // same state again -> no further notification + await app.db.controllers.Project.updateLatestProjectState(instance.id, 'stopped') + notifySpy.calledOnce.should.be.true() + } finally { + notifySpy.restore() + } + }) + + it('broadcasts inflight transition states, then reverts when cleared', async function () { + const notifySpy = sinon.spy(app.comms.team, 'notifyInstanceStatus') + const team = await app.db.models.Team.create({ name: 'Inflight Status Team', TeamTypeId: 1 }) + const instance = await app.db.models.Project.create({ name: 'inflight-status-p1', type: '', url: '', state: 'running', TeamId: team.id }) + const teamHash = app.db.models.Team.encodeHashid(team.id) + notifySpy.resetHistory() // ignore the create-time save + try { + // inflight wins over the db/latest state + await app.db.controllers.Project.setInflightState(instance, 'starting') + notifySpy.calledWith(teamHash, instance.id, 'starting').should.be.true() + + // clearing reverts to the effective db state (running) + notifySpy.resetHistory() + await app.db.controllers.Project.clearInflightState(instance) + notifySpy.calledWith(teamHash, instance.id, 'running').should.be.true() + } finally { + notifySpy.restore() + } + }) }) }) diff --git a/test/unit/forge/db/models/Project_spec.js b/test/unit/forge/db/models/Project_spec.js index 06de3b979d..7d6c2e77a3 100644 --- a/test/unit/forge/db/models/Project_spec.js +++ b/test/unit/forge/db/models/Project_spec.js @@ -666,4 +666,40 @@ describe('Project model', function () { await team.destroy() }) }) + + describe('Status change broadcast (afterSave hook)', function () { + let app + let notifySpy + before(async function () { + app = await setup({ limits: { instances: 20 } }) + }) + after(async function () { + await app.close() + }) + beforeEach(function () { + notifySpy = sinon.spy(app.comms.team, 'notifyInstanceStatus') + }) + afterEach(function () { + notifySpy.restore() + }) + + it('notifies the team channel when an instance state changes on save', async function () { + const team = await app.db.models.Team.create({ name: 'Broadcast Team', TeamTypeId: 1 }) + const instance = await app.db.models.Project.create({ name: 'broadcast-p1', type: '', url: '', state: 'running', TeamId: team.id }) + notifySpy.resetHistory() // ignore the create-time save + instance.state = 'suspended' + await instance.save() + notifySpy.calledOnce.should.be.true() + notifySpy.firstCall.args.should.eql([app.db.models.Team.encodeHashid(team.id), instance.id, 'suspended']) + }) + + it('does not notify when a save leaves state unchanged', async function () { + const team = await app.db.models.Team.create({ name: 'Broadcast Team 2', TeamTypeId: 1 }) + const instance = await app.db.models.Project.create({ name: 'broadcast-p2', type: '', url: '', state: 'suspended', TeamId: team.id }) + notifySpy.resetHistory() + instance.name = 'broadcast-p2-renamed' // name change is allowed while suspended + await instance.save() + notifySpy.called.should.be.false() + }) + }) }) diff --git a/test/unit/frontend/stores/live-status.spec.js b/test/unit/frontend/stores/live-status.spec.js new file mode 100644 index 0000000000..3fbcfd0fab --- /dev/null +++ b/test/unit/frontend/stores/live-status.spec.js @@ -0,0 +1,76 @@ +import { createPinia, setActivePinia } from 'pinia' +import { beforeEach, describe, expect, it } from 'vitest' + +import { useLiveStatusStore } from '@/stores/live-status' + +describe('live-status store', () => { + beforeEach(() => { + setActivePinia(createPinia()) + }) + + it('starts empty and not live', () => { + const store = useLiveStatusStore() + expect(store.instanceStatuses).toEqual({}) + expect(store.deviceStatuses).toEqual({}) + expect(store.live).toBe(false) + }) + + describe('setInstanceStatus / setDeviceStatus', () => { + it('records instance status and exposes it via map and getter', () => { + const store = useLiveStatusStore() + store.setInstanceStatus('inst-1', 'running') + expect(store.instanceStatuses['inst-1']).toBe('running') + expect(store.getInstanceStatus('inst-1')).toBe('running') + }) + + it('records device status independently of instance status', () => { + const store = useLiveStatusStore() + store.setInstanceStatus('inst-1', 'running') + store.setDeviceStatus('dev-1', 'stopped') + expect(store.deviceStatuses['dev-1']).toBe('stopped') + expect(store.getDeviceStatus('dev-1')).toBe('stopped') + // the two maps don't bleed into each other + expect(store.instanceStatuses['dev-1']).toBeUndefined() + expect(store.deviceStatuses['inst-1']).toBeUndefined() + }) + + it('overwrites an existing id with the latest state', () => { + const store = useLiveStatusStore() + store.setInstanceStatus('inst-1', 'running') + store.setInstanceStatus('inst-1', 'suspended') + expect(store.instanceStatuses['inst-1']).toBe('suspended') + }) + + it('returns undefined for an unknown id', () => { + const store = useLiveStatusStore() + expect(store.getInstanceStatus('nope')).toBeUndefined() + expect(store.getDeviceStatus('nope')).toBeUndefined() + }) + }) + + describe('setLive', () => { + it('toggles the live flag', () => { + const store = useLiveStatusStore() + store.setLive(true) + expect(store.live).toBe(true) + store.setLive(false) + expect(store.live).toBe(false) + }) + }) + + describe('clear', () => { + it('resets both maps and the live flag (team-switch teardown)', () => { + const store = useLiveStatusStore() + store.setInstanceStatus('inst-1', 'running') + store.setDeviceStatus('dev-1', 'stopped') + store.setLive(true) + + store.clear() + + expect(store.instanceStatuses).toEqual({}) + expect(store.deviceStatuses).toEqual({}) + expect(store.live).toBe(false) + expect(store.getInstanceStatus('inst-1')).toBeUndefined() + }) + }) +}) diff --git a/test/unit/frontend/subscribers/team-channel.subscriber.spec.js b/test/unit/frontend/subscribers/team-channel.subscriber.spec.js index 8a7596461b..94e2d61b4a 100644 --- a/test/unit/frontend/subscribers/team-channel.subscriber.spec.js +++ b/test/unit/frontend/subscribers/team-channel.subscriber.spec.js @@ -6,12 +6,18 @@ const refreshTeam = vi.fn().mockResolvedValue(undefined) const onTeamChannelMembership = vi.fn().mockResolvedValue(undefined) const useContextStore = vi.fn(() => ({ refreshTeam, onTeamChannelMembership })) const useAccountAuthStore = vi.fn(() => ({ user: { id: 'user-hashid-1' }, getSessionId: () => 'session-test-id' })) +const setInstanceStatus = vi.fn() +const setDeviceStatus = vi.fn() +const setLive = vi.fn() +const clear = vi.fn() +const useLiveStatusStore = vi.fn(() => ({ setInstanceStatus, setDeviceStatus, setLive, clear })) vi.mock('@/api/team.js', () => ({ default: { getTeamCommsCreds: (...args) => getTeamCommsCreds(...args) } })) vi.mock('@/stores/context.js', () => ({ useContextStore })) vi.mock('@/stores/account-auth.js', () => ({ useAccountAuthStore })) +vi.mock('@/stores/live-status', () => ({ useLiveStatusStore })) function makeTransport () { return { @@ -45,6 +51,10 @@ describe('TeamChannelSubscriber', async () => { onTeamChannelMembership.mockClear() useContextStore.mockClear() useAccountAuthStore.mockClear().mockReturnValue({ user: { id: 'user-hashid-1' }, getSessionId: () => 'session-test-id' }) + setInstanceStatus.mockClear() + setDeviceStatus.mockClear() + setLive.mockClear() + clear.mockClear() await destroyTeamChannelSubscriber() }) @@ -145,7 +155,7 @@ describe('TeamChannelSubscriber', async () => { }) describe('subscribe on connect', () => { - test('subscribes to t/updated and membership topics with qos 1', async () => { + test('subscribes to t/updated, membership and the status wildcards with qos 1', async () => { const { subscriber, transport } = createSubscriber() let onConnect transport.connect.mockImplementation(async (_key, opts) => { @@ -155,11 +165,39 @@ describe('TeamChannelSubscriber', async () => { await onConnect() expect(transport.subscribe).toHaveBeenCalledWith( 'team:team-1', - ['ff/v1/team-1/t/updated', 'ff/v1/team-1/u/user-hashid-1/membership'], + [ + 'ff/v1/team-1/t/updated', + 'ff/v1/team-1/u/user-hashid-1/membership', + 'ff/v1/team-1/p/+/status', + 'ff/v1/team-1/d/+/status' + ], { qos: 1 } ) }) + test('marks the status channel live once subscribed', async () => { + const { subscriber, transport } = createSubscriber() + let onConnect + transport.connect.mockImplementation(async (_key, opts) => { + onConnect = opts.onConnect + }) + await subscriber.connect({ id: 'team-1' }) + await onConnect() + expect(setLive).toHaveBeenCalledWith(true) + }) + + test('does not mark live when the subscribe fails', async () => { + const { subscriber, transport } = createSubscriber() + transport.subscribe.mockRejectedValue(new Error('subscribe failed')) + let onConnect + transport.connect.mockImplementation(async (_key, opts) => { + onConnect = opts.onConnect + }) + await subscriber.connect({ id: 'team-1' }) + await onConnect() + expect(setLive).not.toHaveBeenCalled() + }) + test('swallows subscribe errors (transport reconnect will retry)', async () => { const { subscriber, transport } = createSubscriber() transport.subscribe.mockRejectedValue(new Error('subscribe failed')) @@ -215,6 +253,28 @@ describe('TeamChannelSubscriber', async () => { expect(refreshTeam).not.toHaveBeenCalled() expect(onTeamChannelMembership).not.toHaveBeenCalled() }) + + test('instance status routes id + state into setInstanceStatus', async () => { + const { onMessage } = await connectAndCaptureOnMessage() + onMessage('ff/v1/team-1/p/inst-1/status', Buffer.from(JSON.stringify({ id: 'inst-1', meta: { state: 'running' } }))) + expect(setInstanceStatus).toHaveBeenCalledWith('inst-1', 'running') + expect(setDeviceStatus).not.toHaveBeenCalled() + }) + + test('device status routes id + state into setDeviceStatus', async () => { + const { onMessage } = await connectAndCaptureOnMessage() + onMessage('ff/v1/team-1/d/dev-1/status', Buffer.from(JSON.stringify({ id: 'dev-1', meta: { state: 'stopped' } }))) + expect(setDeviceStatus).toHaveBeenCalledWith('dev-1', 'stopped') + expect(setInstanceStatus).not.toHaveBeenCalled() + }) + + test('ignores a status message missing id or state', async () => { + const { onMessage } = await connectAndCaptureOnMessage() + onMessage('ff/v1/team-1/p/inst-1/status', Buffer.from(JSON.stringify({ id: 'inst-1' }))) + onMessage('ff/v1/team-1/d/dev-1/status', Buffer.from(JSON.stringify({ meta: { state: 'running' } }))) + expect(setInstanceStatus).not.toHaveBeenCalled() + expect(setDeviceStatus).not.toHaveBeenCalled() + }) }) describe('disconnect / destroy', () => { @@ -227,10 +287,19 @@ describe('TeamChannelSubscriber', async () => { expect(subscriber.isConnected()).toBe(false) }) + test('disconnect clears the live-status store (no prior-team leak)', async () => { + const { subscriber, transport } = createSubscriber() + transport.connect.mockResolvedValue(undefined) + await subscriber.connect({ id: 'team-1' }) + await subscriber.disconnect() + expect(clear).toHaveBeenCalledTimes(1) + }) + test('disconnect is a no-op when not connected', async () => { const { subscriber, transport } = createSubscriber() await subscriber.disconnect() expect(transport.disconnect).not.toHaveBeenCalled() + expect(clear).not.toHaveBeenCalled() }) test('destroy disconnects the active client', async () => { From 4b63e35c05f71ddd61f590210632134c4b87b3f6 Mon Sep 17 00:00:00 2001 From: Noley Holland Date: Mon, 22 Jun 2026 11:12:04 -0700 Subject: [PATCH 02/15] Serialize publishLiveState per project so concurrent state changes can't intermix and broadcast a stale instance status --- forge/db/controllers/Project.js | 17 ++++++++++++- .../unit/forge/db/controllers/Project_spec.js | 25 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/forge/db/controllers/Project.js b/forge/db/controllers/Project.js index c0e73b56d1..c5b2232c3f 100644 --- a/forge/db/controllers/Project.js +++ b/forge/db/controllers/Project.js @@ -16,6 +16,8 @@ const inflightDeploys = 'project-inflightDeploys' const lastPublishedProjectState = 'project-lastPublishedProjectState' +const publishLocks = new Map() + module.exports = { init (app) { @@ -25,9 +27,22 @@ module.exports = { app.caches.createCache(lastPublishedProjectState) }, - // NOLEY TODO: this has issues with restarting states + // serialized per project so concurrent callers can't intermix and strand a stale state publishLiveState: async function (app, project) { if (!app.comms || !project?.TeamId) return + const key = project.id + const previous = publishLocks.get(key) ?? Promise.resolve() + const current = previous.catch(() => {}).then(() => this._publishLiveStateUnlocked(app, project)) + const tracked = current.finally(() => { + if (publishLocks.get(key) === tracked) { + publishLocks.delete(key) + } + }) + publishLocks.set(key, tracked) + return tracked + }, + + _publishLiveStateUnlocked: async function (app, project) { const inflight = await this.getInflightState(app, project) const latest = await this.getLatestProjectState(app, project.id) const effective = inflight ?? (project.state === 'suspended' ? 'suspended' : (latest ?? project.state)) diff --git a/test/unit/forge/db/controllers/Project_spec.js b/test/unit/forge/db/controllers/Project_spec.js index cc18252cef..94302ace8f 100644 --- a/test/unit/forge/db/controllers/Project_spec.js +++ b/test/unit/forge/db/controllers/Project_spec.js @@ -14,7 +14,8 @@ describe('Project controller', function () { before(async function () { app = await setup({ limits: { - instances: 50 + instances: 50, + teams: 50 } }) }) @@ -566,5 +567,27 @@ describe('Project controller', function () { notifySpy.restore() } }) + + it('serializes concurrent publishLiveState so a state is broadcast once', async function () { + const notifySpy = sinon.spy(app.comms.team, 'notifyInstanceStatus') + const team = await app.db.models.Team.create({ name: 'Serialize Status Team', TeamTypeId: 1 }) + const instance = await app.db.models.Project.create({ name: 'serialize-status-p1', type: '', url: '', state: 'running', TeamId: team.id }) + notifySpy.resetHistory() // ignore the create-time save (cached 'running') + try { + // make the effective state 'stopped' without publishing it + await app.db.controllers.Project.setLatestProjectState(instance.id, 'stopped') + // two concurrent publishes: without serialization both read the stale + // 'running' cache and broadcast 'stopped' twice; serialized, the second + // sees the first's write and skips + await Promise.all([ + app.db.controllers.Project.publishLiveState(instance), + app.db.controllers.Project.publishLiveState(instance) + ]) + notifySpy.calledOnce.should.be.true() + notifySpy.firstCall.args.should.eql([app.db.models.Team.encodeHashid(team.id), instance.id, 'stopped']) + } finally { + notifySpy.restore() + } + }) }) }) From bead91a11ef696cef741a22dde036da7fa7b0412 Mon Sep 17 00:00:00 2001 From: Noley Holland Date: Mon, 22 Jun 2026 11:45:36 -0700 Subject: [PATCH 03/15] Wire instance consumers to live MQTT status and gate their pollers on the connection --- frontend/src/mixins/Instance.js | 14 +++++++++++- frontend/src/pages/application/index.vue | 22 ++++++++++++++++++- frontend/src/pages/instance/Editor/index.vue | 1 + frontend/src/pages/instance/index.vue | 2 +- .../components/compact/InstanceTile.vue | 16 ++++++++++++-- 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/frontend/src/mixins/Instance.js b/frontend/src/mixins/Instance.js index 7a2d00b903..e82f4eb717 100644 --- a/frontend/src/mixins/Instance.js +++ b/frontend/src/mixins/Instance.js @@ -9,10 +9,12 @@ import { InstanceStateMutator } from '../utils/InstanceStateMutator.js' import { useAccountStore } from '@/stores/account.js' import { useContextStore } from '@/stores/context.js' +import { useLiveStatusStore } from '@/stores/live-status' export default { computed: { ...mapState(useContextStore, ['team']), + ...mapState(useLiveStatusStore, { liveInstanceStatuses: 'instanceStatuses', statusChannelLive: 'live' }), instanceRunning () { return this.instance?.meta?.state === 'running' }, @@ -37,10 +39,20 @@ export default { instance (instance) { this.instanceChanged() this.setContextualInstance(instance) - } + }, + liveInstanceStatuses: { handler: 'applyLiveStatus', deep: true } }, methods: { ...mapActions(useContextStore, { setContextualInstance: 'setInstance' }), + applyLiveStatus () { + const state = this.liveInstanceStatuses[this.instance?.id] + if (!state || this.instance?.meta?.state === state) return + this.instance = { + ...this.instance, + status: state, + meta: { ...(this.instance.meta || {}), state } + } + }, showConfirmDeleteDialog () { this.$refs.confirmInstanceDeleteDialog.show(this.instance) }, diff --git a/frontend/src/pages/application/index.vue b/frontend/src/pages/application/index.vue index 5a9cafe6ee..e6fac4ce1b 100644 --- a/frontend/src/pages/application/index.vue +++ b/frontend/src/pages/application/index.vue @@ -25,7 +25,9 @@ @instance-delete="instanceShowConfirmDelete" /> - + @@ -45,6 +47,7 @@ import ConfirmApplicationDeleteDialog from './Settings/dialogs/ConfirmApplicatio import { useAccountSettingsStore } from '@/stores/account-settings.js' import { useContextStore } from '@/stores/context.js' +import { useLiveStatusStore } from '@/stores/live-status' export default { name: 'ApplicationPage', @@ -62,6 +65,7 @@ export default { computed: { ...mapState(useContextStore, ['team']), ...mapState(useAccountSettingsStore, ['features']), + ...mapState(useLiveStatusStore, { liveInstanceStatuses: 'instanceStatuses', statusChannelLive: 'live' }), navigation () { const routes = [ { @@ -133,6 +137,22 @@ export default { '$route.params': { handler: 'updateApplication', immediate: true + }, + liveInstanceStatuses: { handler: 'applyLiveStatus', deep: true } + }, + methods: { + applyLiveStatus () { + for (const id of this.applicationInstances.keys()) { + const state = this.liveInstanceStatuses[id] + if (!state) continue + const row = this.applicationInstances.get(id) + if (row?.status === state && row?.meta?.state === state) continue + this.applicationInstances.set(id, { + ...row, + status: state, + meta: { ...(row?.meta || {}), state } + }) + } } } } diff --git a/frontend/src/pages/instance/Editor/index.vue b/frontend/src/pages/instance/Editor/index.vue index 82e9ae15dd..c771fa66e3 100644 --- a/frontend/src/pages/instance/Editor/index.vue +++ b/frontend/src/pages/instance/Editor/index.vue @@ -43,6 +43,7 @@ diff --git a/frontend/src/pages/instance/index.vue b/frontend/src/pages/instance/index.vue index 7f4cd6b85d..0920682f50 100644 --- a/frontend/src/pages/instance/index.vue +++ b/frontend/src/pages/instance/index.vue @@ -70,7 +70,7 @@ /> - + diff --git a/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue b/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue index 6c3ecc9e1f..f1068a9467 100644 --- a/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue +++ b/frontend/src/pages/team/Applications/components/compact/InstanceTile.vue @@ -82,7 +82,7 @@ - +