Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
eba27e4
Replace instance status polling with live MQTT updates, send state to…
n-lark Jun 18, 2026
fd04a6f
Merge branch 'main' into 7324-mqtt/ws-device-instances
n-lark Jun 22, 2026
4b63e35
Serialize publishLiveState per project so concurrent state changes ca…
n-lark Jun 22, 2026
bead91a
Wire instance consumers to live MQTT status and gate their pollers on…
n-lark Jun 22, 2026
dcd9d6b
Fix failing spec
n-lark Jun 22, 2026
d6dd028
Rename frontend live-state topics to /state for both instances and de…
n-lark Jun 22, 2026
e593cf9
Wire device consumers to live MQTT status and gate their pollers on t…
n-lark Jun 22, 2026
90e1f37
Rename status-broadcast publishers + ACL verify to state so naming ma…
n-lark Jun 22, 2026
75f93f8
Fix transient stopped flash on restart/start and stuck actions menu a…
n-lark Jun 23, 2026
976637f
Mask transient stopped during device restart and consolidate state-tr…
n-lark Jun 23, 2026
817af8d
Fix the stopped flash sticking under polling, stop rollbacks from fla…
n-lark Jun 23, 2026
57247d8
Stop failed starts from hanging on starting and add tests for the fla…
n-lark Jun 23, 2026
881b1c1
Merge branch 'main' into 7324-mqtt/ws-device-instances
n-lark Jun 23, 2026
6ba8c73
Add additional specs to cover all instance flows
n-lark Jun 23, 2026
9b85f31
Trim redundant specs
n-lark Jun 24, 2026
591ad6c
Remove setTimeout for clearInflightStateIfStill
n-lark Jun 24, 2026
92e5a22
Report running from stub restartFlows so restarts clear the inflight …
n-lark Jun 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion forge/comms/aclManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,29 @@ module.exports = function (app) {
return false
}
},
checkTeamStateSub: async function (requestParts, usernameParts) {
// requestParts = [ fullTopic , <teamHash> , <entityId|'+'> ]
// usernameParts = [ 'fe-team', <userHash>, <teamHash>, <sessionId> ]
// team-wide status wildcard: gate on team membership only. requestParts[2]
// is the instance/device id (or '+'), never the user — don't validate it.
const topicTeamHash = requestParts[1]
const usernameUserHash = usernameParts[1]
const usernameTeamHash = usernameParts[2]
if (topicTeamHash !== usernameTeamHash) {
return false
}
try {
const team = await app.db.models.Team.byId(usernameTeamHash)
if (!team) return false
const user = await app.db.models.User.byId(usernameUserHash)
if (!user) return false
const membership = await app.db.models.TeamMember.getTeamMembership(user.id, team.id, false)
return !!membership
} catch (error) {
app.log.error('Unexpected error during team-channel status ACL check', { requestParts, usernameParts, error })
return false
}
},
checkExpertTopic: async function (topicParts, usernameParts, acl) {
// topicParts = [ fullTopic , <userid>, <sessionid>, <entityType>, <entityId> [, <inflightType>] ]
// usernameParts = [ 'expert-client' | 'expert-agent', <userid> [, <sessionid>] ]
Expand Down Expand Up @@ -333,6 +356,10 @@ module.exports = function (app) {
{ topic: /^ff\/v1\/[^/]+\/t\/updated$/ },
// - ff/v1/<team>/u/<user>/membership
{ topic: /^ff\/v1\/[^/]+\/u\/[^/]+\/membership$/ },
// - ff/v1/<team>/p/<instance>/state
{ topic: /^ff\/v1\/[^/]+\/p\/[^/]+\/state$/ },
// - ff/v1/<team>/d/<device>/state
{ topic: /^ff\/v1\/[^/]+\/d\/[^/]+\/state$/ },

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So devices already publish their raw status to the …/status topics, which forge subscribes to in order to collect it.

If forge re-published the browser-facing copy onto that same …/status topic, it would receive its own message back and loop forever, so the browser updates use a separate …/state topic that nothing on the backend listens to.

Also named instances .../state for consistency. I was also considering .../liveStatus to imply FE, not sold on the name.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

differentiating the topics to avoid loopbacks sounds sensible, the state name for consistency too

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also like the liveStatus approach but in the live-state form to adhere to the ui naming and topic format, up to you if you want to change it

// ff/v1/platform/sync
{ topic: /^ff\/v1\/platform\/sync$/ },
// ff/v1/platform/leader
Expand Down Expand Up @@ -396,7 +423,11 @@ module.exports = function (app) {
// - ff/v1/<team>/t/updated
{ topic: /^ff\/v1\/([^/]+)\/t\/updated$/, verify: 'checkUserIsTeamMember' },
// - ff/v1/<team>/u/<user>/membership
{ topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' }
{ topic: /^ff\/v1\/([^/]+)\/u\/([^/]+)\/membership$/, verify: 'checkUserIsTeamMember' },
// - ff/v1/<team>/p/+/state
{ topic: /^ff\/v1\/([^/]+)\/p\/([^/]+)\/state$/, verify: 'checkTeamStateSub' },
// - ff/v1/<team>/d/+/state
{ topic: /^ff\/v1\/([^/]+)\/d\/([^/]+)\/state$/, verify: 'checkTeamStateSub' }
],
pub: []
},
Expand Down
8 changes: 8 additions & 0 deletions forge/comms/devices.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ class DeviceCommsHandler {
const teamId = this.app.db.models.Team.encodeHashid(device.TeamId)
const startTime = Date.now()
try {
const previousState = device.state
const payload = JSON.parse(status.status)
if (previousState === 'restarting' && payload?.state === 'stopped') {
payload.state = 'restarting'
}
await this.app.db.controllers.Device.updateState(device, payload)

if (payload === null) {
Expand All @@ -130,6 +134,10 @@ class DeviceCommsHandler {
return
}

if (payload.state !== previousState) {
this.app.comms.team.notifyDeviceState(teamId, status.id, payload.state)
}

// If the status state===unknown, the device is waiting for confirmation
// it has the right details. Always response with an 'update' command in
// this scenario
Expand Down
8 changes: 8 additions & 0 deletions forge/comms/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ module.exports = fp(async function (app, _opts) {
if (!teamHash || !userHash) return
const msg = { reason: reason || null, srcId: srcId || null }
client.publish(`ff/v1/${teamHash}/u/${userHash}/membership`, JSON.stringify(msg))
},
notifyDeviceState: function (teamHash, id, state) {
if (!teamHash || !id) return
client.publish(`ff/v1/${teamHash}/d/${id}/state`, JSON.stringify({ id, meta: { state } }))
},
notifyInstanceState: function (teamHash, id, state) {
if (!teamHash || !id) return
client.publish(`ff/v1/${teamHash}/p/${id}/state`, JSON.stringify({ id, meta: { state } }))
}
}
})
Expand Down
15 changes: 14 additions & 1 deletion forge/containers/stub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ module.exports = {
} else {
const startTime = project.name === 'stub-slow-start' ? 6000 : module.exports.START_DELAY
return new Promise((resolve, reject) => {
setTimeout(() => {
setTimeout(async () => {
list[project.id].state = 'running'
// mirror a real launcher reporting its settled state so inflight clears via the confirm path
try {
await this._app.db.controllers.Project.updateLatestProjectState(project.id, 'running')
} catch (err) {}
resolve()
}, startTime)
})
Expand Down Expand Up @@ -218,6 +222,15 @@ module.exports = {
*/
restartFlows: async (project, options) => {
this._app.log.info(`[stub driver] Restarting flows ${project.id}`)
// mirror a real launcher reporting running shortly after the bounce, so the restarting mask clears
setTimeout(async () => {
if (list[project.id]) {
list[project.id].state = 'running'
}
try {
await this._app.db.controllers.Project.updateLatestProjectState(project.id, 'running')
} catch (err) {}
}, module.exports.START_DELAY)
},

/**
Expand Down
47 changes: 47 additions & 0 deletions forge/db/controllers/Project.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,47 @@ const latestProjectState = 'project-latestProjectState'

const inflightDeploys = 'project-inflightDeploys'

const lastPublishedProjectState = 'project-lastPublishedProjectState'

const publishLocks = new Map()

module.exports = {

init (app) {
app.caches.createCache(inflightProjectState)
app.caches.createCache(latestProjectState)
app.caches.createCache(inflightDeploys)
app.caches.createCache(lastPublishedProjectState)
},

// serialized per project so concurrent callers can't intermix and strand a stale state
publishLiveState: async function (app, project) {
if (!app.comms || !project?.TeamId) return
const key = project.id
const previous = publishLocks.get(key) ?? Promise.resolve()
const current = previous.catch(() => {}).then(() => this._publishLiveStateUnlocked(app, project))
const tracked = current.finally(() => {
if (publishLocks.get(key) === tracked) {
publishLocks.delete(key)
}
})
publishLocks.set(key, tracked)
return tracked
},

_publishLiveStateUnlocked: async function (app, project) {
try {
const inflight = await this.getInflightState(app, project)
const latest = await this.getLatestProjectState(app, project.id)
const effective = inflight ?? (project.state === 'suspended' ? 'suspended' : (latest ?? project.state))
if (!effective) return
const cache = app.caches.getCache(lastPublishedProjectState)
if (await cache.get(project.id) === effective) return
await cache.set(project.id, effective)
app.comms.team.notifyInstanceState(app.db.models.Team.encodeHashid(project.TeamId), project.id, effective)
} catch (err) {
app.log.warn(`Failed to broadcast live state for ${project.id}: ${err.toString()}`)
}
},

getProjectPaginationOptions: function (app, request) {
Expand Down Expand Up @@ -58,6 +93,7 @@ module.exports = {
*/
setInflightState: async function (app, project, state) {
await app.caches.getCache(inflightProjectState).set(project.id, state)
await this.publishLiveState(app, project)
},

/**
Expand Down Expand Up @@ -87,6 +123,7 @@ module.exports = {
clearInflightState: async function (app, project) {
await app.caches.getCache(inflightProjectState).del(project.id)
await app.caches.getCache(inflightDeploys).del(project.id)
await this.publishLiveState(app, project)
},

/**
Expand Down Expand Up @@ -810,6 +847,16 @@ module.exports = {
} else {
await this.setLatestProjectState(app, projectId, state)
}
const project = await app.db.models.Project.byId(projectId, { barebone: true })
if (project) {
const inflight = await this.getInflightState(app, project)
const isTransition = inflight === 'starting' || inflight === 'restarting'
if (isTransition && ['running', 'safe', 'crashed'].includes(state)) {
await this.clearInflightState(app, project)
} else if (app.comms) {
await this.publishLiveState(app, project)
}
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions forge/db/models/Project.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ module.exports = {
}
}
},
afterSave: async (project, opts) => {
if (project.changed('state')) {
await Controllers.Project.publishLiveState(project)
}
},
afterDestroy: async (project, opts) => {
await M.AccessToken.destroy({
where: {
Expand Down
13 changes: 9 additions & 4 deletions forge/routes/api/projectActions.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* @namespace project
* @memberof forge.routes.api
*/

module.exports = async function (app) {
app.post('/start', {
preHandler: app.needsPermission('project:change-status'),
Expand Down Expand Up @@ -50,8 +51,11 @@ module.exports = async function (app) {
await app.db.controllers.Project.setInflightState(request.project, 'starting')
const startResult = await app.containers.start(request.project)
startResult.started.then(async () => {
await app.auditLog.Project.project.started(request.session.User, null, request.project)
await app.db.controllers.Project.clearInflightState(request.project)
if (request.project.state === 'suspended') {
await app.db.controllers.Project.clearInflightState(request.project)
} else {
await app.auditLog.Project.project.started(request.session.User, null, request.project)
}
return true
}).catch(err => {
app.log.info(`failed to start project ${request.project.id}`)
Expand Down Expand Up @@ -154,7 +158,6 @@ module.exports = async function (app) {
await request.project.save()
await app.containers.restartFlows(request.project)
await app.auditLog.Project.project.restarted(request.session.User, null, request.project)
await app.db.controllers.Project.clearInflightState(request.project)
reply.send({ status: 'okay' })
} catch (err) {
await app.db.controllers.Project.clearInflightState(request.project)
Expand Down Expand Up @@ -255,10 +258,12 @@ module.exports = async function (app) {
}
await app.db.controllers.Project.setInflightState(request.project, 'rollback')
await app.db.controllers.Project.importProjectSnapshot(request.project, snapshot)
await app.db.controllers.Project.clearInflightState(request.project)
await app.auditLog.Project.project.snapshot.rolledBack(request.session.User, null, request.project, snapshot)
if (restartProject) {
await app.db.controllers.Project.setInflightState(request.project, 'restarting')
await app.containers.restartFlows(request.project)
} else {
await app.db.controllers.Project.clearInflightState(request.project)
}
reply.send({ status: 'okay' })
} catch (err) {
Expand Down
26 changes: 25 additions & 1 deletion frontend/src/components/DevicesBrowser.vue
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ import RemoveDeviceFromGroupDialog from './dialogs/device-group-management/Remov

import { useAccountSettingsStore } from '@/stores/account-settings.js'
import { useContextStore } from '@/stores/context.js'
import { useLiveStatusStore } from '@/stores/live-status'
import { useUxDialogStore } from '@/stores/ux-dialog.js'
import { useUxToursStore } from '@/stores/ux-tours.js'

Expand Down Expand Up @@ -468,6 +469,7 @@ export default {
computed: {
...mapState(useContextStore, ['team']),
...mapState(useAccountSettingsStore, ['featuresCheck']),
...mapState(useLiveStatusStore, { liveDeviceStatuses: 'deviceStatuses', statusChannelLive: 'live' }),
...mapState(useUxDialogStore, ['dialog']),
...mapState(useUxToursStore, ['tours']),
columns () {
Expand Down Expand Up @@ -604,11 +606,19 @@ export default {
if (this.dialog?.is?.payload?.devices) {
this.setDialogDevices(devices)
}
},
liveDeviceStatuses: { handler: 'applyLiveStatus', deep: true },
statusChannelLive (live) {
if (live) {
this.pollTimer?.stop()
} else {
this.pollTimer?.start()
}
}
},
mounted () {
this.fullReloadOfData()
this.pollTimer = createPollTimer(this.pollTimerElapsed, POLL_TIME) // auto starts
this.pollTimer = createPollTimer(this.pollTimerElapsed, POLL_TIME, !this.statusChannelLive)
},
async unmounted () {
this.pollTimer.stop()
Expand All @@ -620,6 +630,20 @@ export default {
},
methods: {
...mapActions(useUxDialogStore, ['setDialogDevices']),
applyLiveStatus () {
for (const id of this.allDeviceStatuses.keys()) {
const state = this.liveDeviceStatuses[id]
if (!state) continue
const statusObj = this.allDeviceStatuses.get(id)
if (statusObj.status !== state) {
this.allDeviceStatuses.set(id, { ...statusObj, status: state })
}
const device = this.devices.get(id)
if (device && device.status !== state) {
this.devices.set(id, { ...device, status: state })
}
}
},
pollTimerElapsed: async function () {
this.pollTimer.pause()
try {
Expand Down
12 changes: 2 additions & 10 deletions frontend/src/composables/DeviceHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,13 @@ import deviceApi from '../api/devices.js'
import Alerts from '../services/alerts.js'
import Dialog from '../services/dialog.js'
import { DeviceStateMutator } from '../utils/DeviceStateMutator.js'
import { isTransitionState } from '../utils/stateTransitions.js'
import { createPollTimer } from '../utils/timers.js'

import { useContextStore } from '@/stores/context.js'

// constants
const POLL_TIME = 5000
const deviceTransitionStates = [
'loading',
'installing',
'starting',
'stopping',
'restarting',
'suspending',
'importing'
]

export function useDeviceHelper () {
const $router = useRouter()
Expand All @@ -33,7 +25,7 @@ export function useDeviceHelper () {
// duplicated functionality because the pollTimer is not reactive
const isPolling = ref(false)

const isInTransitionState = computed(() => deviceTransitionStates.includes(device.value.status))
const isInTransitionState = computed(() => isTransitionState(device.value.status))

const agentSupportsDeviceAccess = computed(() =>
device.value?.agentVersion && semver.gte(device.value?.agentVersion, '0.8.0')
Expand Down
16 changes: 15 additions & 1 deletion frontend/src/mixins/Instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import { InstanceStateMutator } from '../utils/InstanceStateMutator.js'

import { useAccountStore } from '@/stores/account.js'
import { useContextStore } from '@/stores/context.js'
import { useLiveStatusStore } from '@/stores/live-status'

export default {
computed: {
...mapState(useContextStore, ['team']),
...mapState(useLiveStatusStore, { liveInstanceStatuses: 'instanceStatuses', statusChannelLive: 'live' }),
instanceRunning () {
return this.instance?.meta?.state === 'running'
},
Expand All @@ -37,10 +39,22 @@ export default {
instance (instance) {
this.instanceChanged()
this.setContextualInstance(instance)
}
},
liveInstanceStatuses: { handler: 'applyLiveStatus', deep: true }
},
methods: {
...mapActions(useContextStore, { setContextualInstance: 'setInstance' }),
applyLiveStatus () {
const state = this.liveInstanceStatuses[this.instance?.id]
if (!state || this.instance?.meta?.state === state) return
this.instance = {
...this.instance,
status: state,
meta: { ...(this.instance.meta || {}), state },
optimisticStateChange: false,
pendingStateChange: false
}
},
showConfirmDeleteDialog () {
this.$refs.confirmInstanceDeleteDialog.show(this.instance)
},
Expand Down
Loading
Loading