-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmacroWorker.ts
More file actions
270 lines (250 loc) · 9.75 KB
/
macroWorker.ts
File metadata and controls
270 lines (250 loc) · 9.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/**
* DataLab-Web macro Worker.
*
* Loads a second Pyodide instance in a dedicated Web Worker so user
* macros run isolated from the main UI thread. Communicates with the
* main thread via ``postMessage``:
*
* ──► main thread sends:
* { type: "init" }
* { type: "run", code: string, name?: string }
* { type: "bridge_reply", id: string, ok: boolean, value?, error? }
*
* ◄── worker sends:
* { type: "ready" }
* { type: "stdout"|"stderr", text: string }
* { type: "started", name: string }
* { type: "finished", ok: boolean, error?: string }
* { type: "bridge_call", id: string, method: string, payload: any }
*
* Stop is implemented main-side via ``Worker.terminate()``.
*/
import macroProxySource from "./macro_proxy.py?raw";
import dlwTitleFormatSource from "./dlw_title_format.py?raw";
// Same JSON Schema / backends shims as the main runtime — required so
// ``guidata.dataset`` (transitively imported by ``sigima``) loads
// cleanly under Pyodide. See runtime.ts for the rationale.
import guidataJsonSchemaShim from "./_guidata_jsonschema_shim.py?raw";
/**
 * Raw source of the optional ``_guidata_backends_shim.py`` file.
 *
 * The shim is looked up through an eager Vite glob instead of a plain
 * import, so the glob simply yields no entry when the file is absent;
 * in that case the value is ``null``.
 */
const guidataBackendsSource = (() => {
  const matches = import.meta.glob("./_guidata_backends_shim.py", {
    query: "?raw",
    import: "default",
    eager: true,
  }) as Record<string, string>;
  // The pattern names a single file, so at most one entry is expected.
  const [shimSource] = Object.values(matches);
  return shimSource ?? null;
})();
/** Settle callbacks of one in-flight bridge call, keyed by call id in the pending map. */
type BridgeReplySettlers = {
  resolve: (v: unknown) => void;
  reject: (e: unknown) => void;
};
/** Extra properties this worker attaches to its global scope for the macro bridge. */
type MacroBridgeGlobals = {
  _dlw_bridge_call?: (method: string, payload: unknown) => Promise<unknown>;
  _dlw_pending_replies?: Map<string, BridgeReplySettlers>;
};
declare const self: DedicatedWorkerGlobalScope & MacroBridgeGlobals;
/** Output callbacks accepted by ``setStdout`` / ``setStderr``. */
type PyodideStreamOptions = {
  batched?: (s: string) => void;
  raw?: (c: number) => void;
};
/** Shape shared by the two stream-redirection setters. */
type PyodideStreamSetter = (opts: PyodideStreamOptions) => void;
/**
 * Structural typing for the subset of the Pyodide instance that this
 * worker declares: code execution, package loading, Python globals,
 * the virtual file system and stdout/stderr redirection.
 */
interface PyodideAPI {
  runPythonAsync: (code: string) => Promise<unknown>;
  loadPackage: (names: string | string[]) => Promise<void>;
  globals: {
    set: (name: string, value: unknown) => void;
    get: (name: string) => unknown;
  };
  FS: {
    writeFile: (path: string, data: string) => void;
    mkdirTree?: (path: string) => void;
  };
  setStdout: PyodideStreamSetter;
  setStderr: PyodideStreamSetter;
}
// Pyodide release tag interpolated into the CDN URL below.
const PYODIDE_VERSION = "v0.26.4";
// Base URL of the "full" Pyodide distribution on jsDelivr. It is used both to
// locate ``pyodide.mjs`` and as the ``indexURL`` handed to ``loadPyodide``.
const PYODIDE_INDEX = `https://cdn.jsdelivr.net/pyodide/${PYODIDE_VERSION}/full/`;
// Cached bootstrap promise managed by ``getPyodide``; starts as ``null``.
let pyPromise: Promise<PyodideAPI> | null = null;
/**
 * Copy ``source`` to ``path`` in the Pyodide virtual file system,
 * ignoring any failure.
 *
 * The caller also executes the same source directly through
 * ``runPythonAsync``, so a failed write must not abort the bootstrap.
 */
function writeModuleBestEffort(
  py: PyodideAPI,
  path: string,
  source: string,
): void {
  try {
    py.FS.writeFile(path, source);
  } catch {
    /* /home/pyodide may not exist on every Pyodide build — ignore. */
  }
}

/**
 * Load Pyodide on first use; subsequent calls return the same instance.
 *
 * The bootstrap loads the scientific packages, installs Sigima and
 * guidata, applies the guidata shims, redirects stdout/stderr to the
 * main thread and installs the bridge plus the Python ``proxy`` global.
 *
 * @returns A promise for the fully provisioned Pyodide instance.
 * @throws Rejects with the underlying error when any bootstrap step
 *   fails. The cached promise is cleared in that case, so the next call
 *   starts a fresh attempt instead of replaying the old rejection.
 */
async function getPyodide(): Promise<PyodideAPI> {
  if (pyPromise) return pyPromise;
  const loading = (async (): Promise<PyodideAPI> => {
    // Module workers don't support ``importScripts`` — use the ESM
    // build of Pyodide and a dynamic ``import()`` instead. Vite needs
    // the ``/* @vite-ignore */`` hint because the URL is dynamic.
    const pyodideMod = (await import(
      /* @vite-ignore */ `${PYODIDE_INDEX}pyodide.mjs`
    )) as { loadPyodide: (opts: { indexURL: string }) => Promise<PyodideAPI> };
    const py = await pyodideMod.loadPyodide({ indexURL: PYODIDE_INDEX });
    // Pin ``LANG=C`` before any guidata/sigima import so gettext-wrapped
    // labels (e.g. signal/image creation types, processing labels) come
    // back in English and stay consistent with the React UI. See the
    // long-form comment in ``runtime.ts`` and the "Internationalisation"
    // section of ``README.md`` for the rationale and the path forward.
    await py.runPythonAsync(`
import os
os.environ["LANG"] = "C"
os.environ["LANGUAGE"] = "C"
`);
    await py.loadPackage(["numpy", "scipy", "h5py", "micropip"]);
    // Install Sigima + guidata so macros can ``import sigima`` and
    // ``import guidata.dataset`` exactly as they do in DataLab desktop.
    // This adds ~10-20s to the first macro run; subsequent runs reuse
    // the same worker.
    await py.runPythonAsync(`
import micropip
await micropip.install(["sigima", "guidata"])
`);
    // Install Sigima's ``PlaceholderTitleFormatter`` so titles produced
    // inside macros use the same placeholder format as the main runtime
    // (later resolved to source ``oid``s by the main bootstrap).
    // The FS copy is best-effort, exactly like the macro_proxy.py copy
    // further down: the source is executed directly right after.
    writeModuleBestEffort(
      py,
      "/home/pyodide/dlw_title_format.py",
      dlwTitleFormatSource,
    );
    await py.runPythonAsync(dlwTitleFormatSource);
    await py.runPythonAsync(guidataJsonSchemaShim);
    if (guidataBackendsSource) {
      await py.runPythonAsync(guidataBackendsSource);
    }
    // Stream stdout/stderr to the main thread, line by line.
    // ``setStdout``/``setStderr`` work for raw C-level writes but
    // user-level ``print`` in the auto-wrapped coroutine doesn't always
    // reach them, so we also patch ``sys.stdout`` / ``sys.stderr`` in
    // Python — this is what ultimately captures ``print`` output.
    py.setStdout({
      batched: (s: string) => self.postMessage({ type: "stdout", text: s }),
    });
    py.setStderr({
      batched: (s: string) => self.postMessage({ type: "stderr", text: s }),
    });
    py.globals.set("_dlw_post_stdout", (s: unknown) =>
      self.postMessage({ type: "stdout", text: String(s) }),
    );
    py.globals.set("_dlw_post_stderr", (s: unknown) =>
      self.postMessage({ type: "stderr", text: String(s) }),
    );
    // The Python below must keep its own indentation: the class and
    // method bodies are nested blocks. ``\\n`` reaches Python as ``\n``.
    await py.runPythonAsync(`
import sys
class _DLWStream:
    def __init__(self, post):
        self._post = post
        self._buf = ""
    def write(self, s):
        if not s:
            return 0
        self._buf += s
        # Flush complete lines immediately; keep the trailing partial.
        if "\\n" in self._buf:
            head, _, tail = self._buf.rpartition("\\n")
            self._post(head + "\\n")
            self._buf = tail
        return len(s)
    def flush(self):
        if self._buf:
            self._post(self._buf)
            self._buf = ""
    def isatty(self):
        return False
sys.stdout = _DLWStream(_dlw_post_stdout)
sys.stderr = _DLWStream(_dlw_post_stderr)
`);
    // Bridge: macro calls ``js._dlw_bridge_call(method, payload)`` and
    // awaits the returned Promise; we resolve it when the main thread
    // posts back ``{type: "bridge_reply", id, ok, value|error}``.
    const pendingReplies = new Map<
      string,
      { resolve: (v: unknown) => void; reject: (e: unknown) => void }
    >();
    self._dlw_pending_replies = pendingReplies;
    let nextId = 0;
    self._dlw_bridge_call = (method: string, payload: unknown) => {
      const id = `b${++nextId}`;
      return new Promise((resolve, reject) => {
        // Register the settlers before posting so the reply always
        // finds its entry.
        pendingReplies.set(id, { resolve, reject });
        self.postMessage({ type: "bridge_call", id, method, payload });
      });
    };
    // Install the Python ``proxy`` global by executing macro_proxy.py.
    // Also persist it to the FS so user code can ``import macro_proxy``
    // explicitly if it wants to.
    writeModuleBestEffort(py, "/home/pyodide/macro_proxy.py", macroProxySource);
    await py.runPythonAsync(macroProxySource);
    return py;
  })();
  pyPromise = loading;
  // Do not keep a rejected promise in the cache: clear it so a later
  // ``init`` / ``run`` can retry. The identity check avoids clobbering a
  // newer attempt. The caller still observes the rejection through the
  // promise returned below; this side branch only resets the cache.
  loading.catch(() => {
    if (pyPromise === loading) pyPromise = null;
  });
  return loading;
}
// Surface async errors that escape the ``onmessage`` try/catch (e.g. a
// timer scheduled by Pyodide that throws, or a detached Promise that
// rejects). Without these handlers the worker would die silently — the
// main thread relies on ``stderr`` / ``finished`` messages to update the
// macro console and Stop button state.
/**
 * Global error hook: forwards the error text to the main thread as a
 * ``stderr`` line followed by a failed ``finished`` message.
 */
self.onerror = (event: Event | string): boolean => {
  let text: string;
  if (typeof event === "string") {
    text = event;
  } else {
    // Fall back to a generic label when the event carries no message.
    text = (event as ErrorEvent).message || "macro worker error";
  }
  self.postMessage({ type: "stderr", text: text + "\n" });
  self.postMessage({ type: "finished", ok: false, error: text });
  // ``false`` keeps the default handling, so the error still shows up
  // in DevTools; returning ``true`` would suppress that logging.
  return false;
};
/**
 * Global hook for promise rejections nobody handled: reports the
 * rejection reason to the main thread as a ``stderr`` line followed by
 * a failed ``finished`` message.
 */
self.onunhandledrejection = (event: PromiseRejectionEvent): void => {
  const { reason } = event;
  // Generic label, used unless the reason is an Error or a plain string.
  let text = "unhandled promise rejection in macro worker";
  if (reason instanceof Error) {
    text = reason.message;
  } else if (typeof reason === "string") {
    text = reason;
  }
  self.postMessage({ type: "stderr", text: text + "\n" });
  self.postMessage({ type: "finished", ok: false, error: text });
};
/**
 * Main-thread message dispatcher.
 *
 * - ``init``: boots Pyodide and answers with ``ready``.
 * - ``bridge_reply``: settles the pending bridge call with the given id;
 *   replies for unknown ids are ignored.
 * - ``run``: posts ``started``, executes the macro source, flushes the
 *   redirected Python streams and posts ``finished``.
 *
 * Any failure is reported as a ``stderr`` line followed by
 * ``{ type: "finished", ok: false, error }``.
 */
self.onmessage = async (event: MessageEvent) => {
  const msg = event.data as
    | { type: "init" }
    | { type: "run"; code: string; name?: string }
    | {
        type: "bridge_reply";
        id: string;
        ok: boolean;
        value?: unknown;
        error?: string;
      };
  // Single place that turns a thrown value into the failure messages.
  const reportFailure = (err: unknown): void => {
    const text = err instanceof Error ? err.message : String(err);
    self.postMessage({ type: "stderr", text: text + "\n" });
    self.postMessage({ type: "finished", ok: false, error: text });
  };
  try {
    if (msg.type === "init") {
      await getPyodide();
      self.postMessage({ type: "ready" });
      return;
    }
    if (msg.type === "bridge_reply") {
      const replies = self._dlw_pending_replies;
      const pending = replies?.get(msg.id);
      if (!replies || !pending) return;
      replies.delete(msg.id);
      if (msg.ok) pending.resolve(msg.value);
      else pending.reject(new Error(msg.error ?? "bridge call failed"));
      return;
    }
    if (msg.type === "run") {
      const py = await getPyodide();
      self.postMessage({ type: "started", name: msg.name ?? "" });
      // Boxed so that a thrown ``undefined`` / ``null`` still counts as
      // a failure.
      let failure: { err: unknown } | null = null;
      try {
        // ``proxy`` is already a global injected by macro_proxy.py.
        // ``runPythonAsync`` supports top-level ``await`` by auto-
        // wrapping the source in a coroutine.
        await py.runPythonAsync(msg.code);
      } catch (err) {
        failure = { err };
      }
      try {
        // Flush any partial line still buffered in our redirected
        // ``sys.stdout`` / ``sys.stderr`` (no trailing ``\n``). This runs
        // after a failed macro too, so its last output is delivered
        // before the error text instead of leaking into the next run.
        await py.runPythonAsync(
          "import sys\nsys.stdout.flush()\nsys.stderr.flush()",
        );
      } catch (flushErr) {
        // Never let a flush problem mask the macro's own error.
        if (!failure) failure = { err: flushErr };
      }
      if (failure) reportFailure(failure.err);
      else self.postMessage({ type: "finished", ok: true });
    }
  } catch (err) {
    reportFailure(err);
  }
};