Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions apps/memos-local-openclaw/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,19 @@ const pluginConfigSchema = {
},
};

// ─── Module-level singleton guard ───
// Prevents duplicate viewers/stores when OpenClaw calls register() multiple
// times for the SAME stateDir (e.g. deferred reload, gateway restart).
// Keyed by stateDir so that truly independent instances (different data dirs,
// as in tests) are not accidentally torn down.
const activeInstances = new Map<string, {
viewer: InstanceType<typeof ViewerServer>;
hubServer: InstanceType<typeof HubServer> | null;
worker: InstanceType<typeof IngestWorker>;
store: InstanceType<typeof SqliteStore>;
telemetry: InstanceType<typeof Telemetry>;
}>();

const memosLocalPlugin = {
id: "memos-local-openclaw-plugin",
name: "MemOS Local Memory",
Expand All @@ -160,6 +173,19 @@ const memosLocalPlugin = {
configSchema: pluginConfigSchema,

register(api: OpenClawPluginApi) {
// Resolve stateDir early so we can check for a duplicate instance with the
// same data directory (deferred reload / gateway restart scenario).
const stateDir = process.env.OPENCLAW_STATE_DIR || api.resolvePath("~/.openclaw");

// Snapshot the previous instance for this stateDir so startServiceCore()
// can tear it down asynchronously (awaiting port release) before starting
// the new viewer. Instances with a different stateDir are left untouched.
const instanceToReplace = activeInstances.get(stateDir) ?? null;
activeInstances.delete(stateDir);
if (instanceToReplace) {
api.logger.info("memos-local: previous instance detected, will stop before starting new viewer");
}

api.registerMemoryCapability({
promptBuilder: buildMemoryPromptSection,
});
Expand Down Expand Up @@ -286,7 +312,6 @@ const memosLocalPlugin = {
}

let pluginCfg = (api.pluginConfig ?? {}) as Record<string, unknown>;
const stateDir = process.env.OPENCLAW_STATE_DIR || api.resolvePath("~/.openclaw");

// Fallback: read config from file if not provided by OpenClaw
const configPath = path.join(stateDir, "state", "memos-local", "config.json");
Expand Down Expand Up @@ -2378,6 +2403,9 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
? new HubServer({ store, log: ctx.log, config: ctx.config, dataDir: stateDir, embedder, defaultHubPort: derivedHubPort })
: null;

// Track this instance so the next register() with the same stateDir can tear it down
activeInstances.set(stateDir, { viewer, hubServer, worker, store, telemetry });

// ─── Service lifecycle ───

let serviceStarted = false;
Expand All @@ -2386,6 +2414,21 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
if (serviceStarted) return;
serviceStarted = true;

// Gracefully shut down the previous instance before binding new ports
if (instanceToReplace) {
api.logger.info("memos-local: stopping previous instance...");
try {
await instanceToReplace.viewer.stop();
await instanceToReplace.hubServer?.stop();
await instanceToReplace.worker.flush().catch(() => {});
await instanceToReplace.telemetry.shutdown().catch(() => {});
instanceToReplace.store.close();
} catch (err) {
api.logger.warn(`memos-local: previous instance cleanup error: ${err instanceof Error ? err.message : String(err)}`);
}
api.logger.info("memos-local: previous instance stopped");
}

if (hubServer) {
const hubUrl = await hubServer.start();
api.logger.info(`memos-local: hub started at ${hubUrl}`);
Expand Down Expand Up @@ -2429,10 +2472,11 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
id: "memos-local-openclaw-plugin",
start: async () => { await startServiceCore(); },
stop: async () => {
activeInstances.delete(stateDir);
await worker.flush();
await telemetry.shutdown();
await hubServer?.stop();
viewer.stop();
await viewer.stop();
store.close();
api.logger.info("memos-local: stopped");
},
Expand All @@ -2445,6 +2489,8 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
// service.start() immediately after registration.
const SELF_START_DELAY_MS = 0;
setTimeout(() => {
// Abort if this instance was already replaced by a newer register() call
if (activeInstances.get(stateDir)?.viewer !== viewer) return;
if (!serviceStarted) {
api.logger.info("memos-local: service.start() not called by host, self-starting viewer...");
startServiceCore().catch((err) => {
Expand Down
13 changes: 13 additions & 0 deletions apps/memos-local-openclaw/src/hub/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ export class HubServer {

private static readonly OFFLINE_THRESHOLD_MS = 2 * 60 * 1000;
private static readonly OFFLINE_CHECK_INTERVAL_MS = 30 * 1000;
private static readonly REMOTE_HIT_CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
private offlineCheckTimer?: ReturnType<typeof setInterval>;
private remoteHitCleanupTimer?: ReturnType<typeof setInterval>;
private knownOnlineUsers = new Set<string>();

constructor(private opts: HubServerOptions) {
Expand Down Expand Up @@ -123,6 +125,7 @@ export class HubServer {

this.initOnlineTracking();
this.offlineCheckTimer = setInterval(() => this.checkOfflineUsers(), HubServer.OFFLINE_CHECK_INTERVAL_MS);
this.remoteHitCleanupTimer = setInterval(() => this.cleanExpiredRemoteHits(), HubServer.REMOTE_HIT_CLEANUP_INTERVAL_MS);

this.backfillMemoryEmbeddings();

Expand All @@ -131,6 +134,7 @@ export class HubServer {

async stop(): Promise<void> {
if (this.offlineCheckTimer) { clearInterval(this.offlineCheckTimer); this.offlineCheckTimer = undefined; }
if (this.remoteHitCleanupTimer) { clearInterval(this.remoteHitCleanupTimer); this.remoteHitCleanupTimer = undefined; }
if (!this.server) return;

try {
Expand Down Expand Up @@ -1133,6 +1137,15 @@ export class HubServer {
} catch { /* best-effort */ }
}

private cleanExpiredRemoteHits(): void {
const now = Date.now();
let removed = 0;
for (const [key, hit] of this.remoteHitMap) {
if (hit.expiresAt < now) { this.remoteHitMap.delete(key); removed++; }
}
if (removed > 0) this.opts.log.debug(`Hub: cleaned ${removed} expired remote hit(s) (remaining=${this.remoteHitMap.size})`);
}

private authenticate(req: http.IncomingMessage) {
const header = req.headers.authorization;
if (!header || !header.startsWith("Bearer ")) return null;
Expand Down
45 changes: 42 additions & 3 deletions apps/memos-local-openclaw/src/viewer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,30 @@ export class ViewerServer {
}
}

stop(): void {
stop(): Promise<void> {
this.stopHubHeartbeat();
this.stopNotifPoll();
for (const c of this.notifSSEClients) { try { c.end(); } catch {} }
this.notifSSEClients = [];
this.server?.close();
if (!this.server) return Promise.resolve();
const srv = this.server;
this.server = null;
return new Promise<void>((resolve) => {
const timeout = setTimeout(() => resolve(), 3000);
srv.close(() => { clearTimeout(timeout); resolve(); });
// Force-close idle keep-alive sockets. closeAllConnections is
// available from Node 18.2; fall back to destroying tracked sockets.
if (typeof srv.closeAllConnections === "function") {
srv.closeAllConnections();
} else {
// Older Node: close idle connections via closeIdleConnections
// (18.0+) or just unref so the event loop can exit.
if (typeof (srv as any).closeIdleConnections === "function") {
(srv as any).closeIdleConnections();
}
srv.unref();
}
});
}

getResetToken(): string {
Expand Down Expand Up @@ -3014,6 +3031,28 @@ export class ViewerServer {
res.end(JSON.stringify({ ips }));
}

/**
* Recursively resolve environment variable references in config values.
* OpenClaw stores env refs as { source: "env", id: "VAR_NAME" }.
*/
private static resolveEnvVars(obj: unknown): unknown {
if (obj === null || obj === undefined) return obj;
if (typeof obj === "string") return obj;
if (Array.isArray(obj)) return obj.map(item => ViewerServer.resolveEnvVars(item));
if (typeof obj === "object") {
const entry = obj as Record<string, unknown>;
if (entry.source === "env" && typeof entry.id === "string") {
return process.env[entry.id] ?? "";
}
const result: Record<string, unknown> = {};
for (const [key, value] of Object.entries(entry)) {
result[key] = ViewerServer.resolveEnvVars(value);
}
return result;
}
return obj;
}

private serveConfig(res: http.ServerResponse): void {
try {
const cfgPath = this.getOpenClawConfigPath();
Expand All @@ -3028,7 +3067,7 @@ export class ViewerServer {
?? entries["memos-lite-openclaw-plugin"]?.config
?? entries["memos-lite"]?.config
?? {};
const result: Record<string, unknown> = { ...pluginEntry };
const result: Record<string, unknown> = ViewerServer.resolveEnvVars(pluginEntry) as Record<string, unknown>;
const topEntry = entries["memos-local-openclaw-plugin"]
?? entries["memos-local"]
?? entries["memos-lite-openclaw-plugin"]
Expand Down
33 changes: 33 additions & 0 deletions apps/memos-local-openclaw/tests/hub-server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,3 +498,36 @@ describe("hub skill pipeline", () => {
expect(store.getHubSkillBySource(userId, "skill-source-1")).toBeNull();
});
});

describe("hub server remote hit cleanup", () => {
it("should clean up expired remote hits", async () => {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "memos-hub-cleanup-"));
dirs.push(dir);
const dbPath = path.join(dir, "test.db");
const store = new SqliteStore(dbPath, noopLog);
stores.push(store);
const server = new HubServer({
store,
log: noopLog,
config: { sharing: { enabled: true, role: "hub", hub: { port: 18919, teamName: "Cleanup", teamToken: "cleanup-token" } } },
dataDir: dir,
} as any);
servers.push(server);
await server.start();

const map = (server as any).remoteHitMap as Map<string, any>;
const hitId = "test-hit-1";
map.set(hitId, {
chunkId: "chunk-1",
type: "chunk",
expiresAt: Date.now() - 1000, // Already expired
requesterUserId: "user-1",
});
expect(map.size).toBe(1);

(server as any).cleanExpiredRemoteHits();

expect(map.size).toBe(0);
expect(map.get(hitId)).toBeUndefined();
});
});
2 changes: 1 addition & 1 deletion apps/memos-local-openclaw/tests/shutdown-lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe("shutdown lifecycle", () => {

class MockViewer {
async start(): Promise<string> { return "http://127.0.0.1:18799"; }
stop(): void { events.push("viewer-stop"); }
async stop(): Promise<void> { events.push("viewer-stop"); }
getResetToken(): string { return "token"; }
}

Expand Down
24 changes: 23 additions & 1 deletion apps/memos-local-plugin/src/viewer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3080,6 +3080,28 @@ export class ViewerServer {
res.end(JSON.stringify({ ips }));
}

/**
* Recursively resolve environment variable references in config values.
* OpenClaw stores env refs as { source: "env", id: "VAR_NAME" }.
*/
private static resolveEnvVars(obj: unknown): unknown {
if (obj === null || obj === undefined) return obj;
if (typeof obj === "string") return obj;
if (Array.isArray(obj)) return obj.map(item => ViewerServer.resolveEnvVars(item));
if (typeof obj === "object") {
const entry = obj as Record<string, unknown>;
if (entry.source === "env" && typeof entry.id === "string") {
return process.env[entry.id] ?? "";
}
const result: Record<string, unknown> = {};
for (const [key, value] of Object.entries(entry)) {
result[key] = ViewerServer.resolveEnvVars(value);
}
return result;
}
return obj;
}

private serveConfig(res: http.ServerResponse): void {
try {
const cfgPath = this.getOpenClawConfigPath();
Expand All @@ -3094,7 +3116,7 @@ export class ViewerServer {
?? entries["memos-lite-openclaw-plugin"]?.config
?? entries["memos-lite"]?.config
?? {};
const result: Record<string, unknown> = { ...pluginEntry };
const result: Record<string, unknown> = ViewerServer.resolveEnvVars(pluginEntry) as Record<string, unknown>;
const topEntry = entries["memos-local-openclaw-plugin"]
?? entries["memos-local"]
?? entries["memos-lite-openclaw-plugin"]
Expand Down