diff --git a/.gitignore b/.gitignore index 47650c0e..1583fd4f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.DS_Store +.idea build PyORBIT.egg-info .vscode diff --git a/examples/Envelope/plot.py b/examples/Envelope/plot.py new file mode 100644 index 00000000..0a088a66 --- /dev/null +++ b/examples/Envelope/plot.py @@ -0,0 +1,133 @@ +import numpy as np +import matplotlib.pyplot as plt +import matplotlib.patches as patches + + +def calc_rms_ellipse_params(cov_matrix: np.ndarray) -> tuple[float, float, float]: + """Return rms ellipse dimensions and orientation.""" + (i, j) = (0, 1) + + sii = cov_matrix[i, i] + sjj = cov_matrix[j, j] + sij = cov_matrix[i, j] + + angle = -0.5 * np.arctan2(2.0 * sij, sii - sjj) + + _sin = np.sin(angle) + _cos = np.cos(angle) + _sin2 = _sin**2 + _cos2 = _cos**2 + + c1 = np.sqrt(abs(sii * _cos2 + sjj * _sin2 - 2 * sij * _sin * _cos)) + c2 = np.sqrt(abs(sii * _sin2 + sjj * _cos2 + 2 * sij * _sin * _cos)) + + return (c1, c2, angle) + + +def plot_ellipse( + r1: float = 1.0, + r2: float = 1.0, + angle: float = 0.0, + center: tuple[float, float] = None, + ax=None, + **kws, +): + kws.setdefault("fill", False) + kws.setdefault("color", "black") + kws.setdefault("lw", 1.25) + + if center is None: + center = (0.0, 0.0) + + d1 = r1 * 2.0 + d2 = r2 * 2.0 + angle = -np.degrees(angle) + + ax.add_patch(patches.Ellipse(center, d1, d2, angle=angle, **kws)) + return ax + + +def plot_rms_ellipse( + cov_matrix: np.ndarray, + level: float = 1.0, + ax=None, + **ellipse_kws, +): + """Plot rms ellipse from 2 x 2 covariance matrix.""" + r1, r2, angle = calc_rms_ellipse_params(cov_matrix) + plot_ellipse(r1 * level, r2 * level, angle=angle, ax=ax, **ellipse_kws) + return ax + + +def plot_corner( + particles: np.ndarray, + limits: list[tuple[float, float]] = None, + bins: int = 64, + labels: list[str] = None, + blur: float = None, +) -> tuple: + """Generate corner plot.""" + ndim = particles.shape[1] + + if limits is None: + xmax = np.max(particles, axis=0) + xmin = np.min(particles, axis=0) + limits = list(zip(xmin, xmax)) + + if labels is None: + labels = ndim * [""] + + fig, axs = plt.subplots( + ncols=ndim, nrows=ndim, sharex=None, sharey=None, figsize=(8, 8) + ) + for i in range(ndim): + for j in range(ndim): + axis = (j, i) + ax = axs[i, j] + if i > j: + values, edges = np.histogramdd( + particles[:, axis], bins=bins, range=[limits[k] for k in axis] + ) + if blur: + values = scipy.ndimage.gaussian_filter(values, sigma=blur) + ax.pcolormesh( + edges[0], + edges[1], + values.T, + linewidth=0.0, + rasterized=True, + shading="auto", + ) + elif i == j: + values, edges = np.histogram( + particles[:, i], bins=bins, range=limits[i] + ) + if blur: + values = scipy.ndimage.gaussian_filter(values, sigma=blur) + ax.stairs(values, edges, lw=1.5, color="black") + else: + ax.axis("off") + + for i in range(0, ndim - 1): + for j in range(0, ndim): + axs[i, j].set_xticklabels([]) + for i in range(0, ndim): + for j in range(1, ndim): + axs[i, j].set_yticklabels([]) + + for ax in axs.flat: + for loc in ["top", "right"]: + ax.spines[loc].set_visible(False) + + for i, label in enumerate(labels): + axs[-1, i].set_xlabel(label) + for i, label in enumerate(labels[1:], start=1): + axs[i, 0].set_ylabel(label) + + axs[0, 0].set_yticklabels([]) + axs[0, 0].set_ylabel(None) + + fig.align_ylabels() + fig.align_xlabels() + + return fig, axs diff --git a/examples/Envelope/style.mplstyle b/examples/Envelope/style.mplstyle new file mode 100644 index 00000000..71f36605 --- /dev/null +++ b/examples/Envelope/style.mplstyle @@ -0,0 +1,8 @@ +axes.linewidth: 1.25 +axes.titlesize: "medium" +image.cmap: "Greys" +figure.constrained_layout.use: True +savefig.dpi: 300 +savefig.format: "png" +xtick.minor.visible: True +ytick.minor.visible: True diff --git a/examples/Envelope/test_env_fodo.py b/examples/Envelope/test_env_fodo.py new file mode 100644 index 00000000..671b00f9 --- /dev/null +++ b/examples/Envelope/test_env_fodo.py @@ -0,0 +1,332 @@ +"""Test envelope tracker in FODO lattice.""" + +import argparse +import copy +import math +import os +import pathlib + +import numpy as np +import matplotlib.pyplot as plt + +from orbit.core.bunch import Bunch +from orbit.core.bunch import BunchTwissAnalysis +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.bunch_utils import collect_bunch +from orbit.envelope import Envelope +from orbit.envelope import EnvelopeTracker +from orbit.lattice import AccLattice +from orbit.lattice import AccNode +from orbit.core.spacecharge import SpaceChargeCalc2p5D +from orbit.space_charge.sc2p5d import setSC2p5DAccNodes +from orbit.teapot import DriftTEAPOT +from orbit.teapot import QuadTEAPOT +from orbit.teapot import TEAPOT_Lattice +from orbit.teapot import TEAPOT_MATRIX_Lattice +from orbit.utils.consts import mass_proton + +from plot import plot_rms_ellipse +from plot import plot_corner +from utils import gen_dist +from utils import build_rotation_matrix_xy +from utils import project_cov_matrix + +plt.style.use("style.mplstyle") + + +# Parse arguments +# ------------------------------------------------------------------------------ + +parser = argparse.ArgumentParser() +parser.add_argument("--zrms", type=float, default=5.0) +parser.add_argument("--kin-energy", type=float, default=0.025) +parser.add_argument("--intensity", type=float, default=5e10) + +parser.add_argument( + "--dist-name", type=str, default="kv", choices=["kv", "waterbag", "gauss"] +) +parser.add_argument("--dist-mismatch-x", type=float, default=0.0) +parser.add_argument("--dist-mismatch-y", type=float, default=0.0) +parser.add_argument("--dist-offset-x", type=float, default=0.0) +parser.add_argument("--dist-offset-y", type=float, default=0.0) +parser.add_argument("--dist-tilt", action="store_true") + +parser.add_argument("--nslice", type=int, default=10) +parser.add_argument("--kq", type=float, default=0.25) + +parser.add_argument("--nparts", type=int, default=100_000) +parser.add_argument("--turns", type=int, default=25) +parser.add_argument("--sc", type=int, default=0) +args = parser.parse_args() + + +# Setup +# ------------------------------------------------------------------------------ + +path = pathlib.Path(__file__) +output_dir = os.path.join("outputs", path.stem) +os.makedirs(output_dir, exist_ok=True) + + +# Create lattice +# ------------------------------------------------------------------------------ + +nodes = [ + QuadTEAPOT(length=0.5, kq=+args.kq), + DriftTEAPOT(length=1.0), + QuadTEAPOT(length=1.0, kq=-args.kq), + DriftTEAPOT(length=1.0), + QuadTEAPOT(length=0.5, kq=+args.kq), +] + +lattice = TEAPOT_Lattice() +for node in nodes: + node.setnParts(args.nslice) + node.setUsageFringeFieldIN(False) + node.setUsageFringeFieldOUT(False) + lattice.addNode(node) + +lattice.initialize() + + +# Create envelope +# ------------------------------------------------------------------------------ + +# Create bunch +bunch = Bunch() +bunch.mass(mass_proton) +sync_part = bunch.getSyncParticle() +sync_part.kinEnergy(args.kin_energy) + +# Find periodic lattice parameters +matrix_lattice = TEAPOT_MATRIX_Lattice(lattice, bunch) +matrix_lattice_params = matrix_lattice.getRingParametersDict() +alpha_x = matrix_lattice_params["alpha x"] +alpha_y = matrix_lattice_params["alpha y"] +beta_x = matrix_lattice_params["beta x [m]"] +beta_y = matrix_lattice_params["beta y [m]"] +eps_x = 0.25e-06 +eps_y = eps_x + +# Generate covariance matrix +cov_matrix = np.zeros((6, 6)) +cov_matrix[0, 0] = eps_x * beta_x +cov_matrix[2, 2] = eps_y * beta_y +cov_matrix[0, 1] = cov_matrix[1, 0] = -eps_x * alpha_x +cov_matrix[2, 3] = cov_matrix[3, 2] = -eps_y * alpha_y +cov_matrix[1, 1] = eps_x * (1.0 + alpha_x**2) / beta_x +cov_matrix[3, 3] = eps_y * (1.0 + alpha_y**2) / beta_y +cov_matrix[4, 4] = args.zrms**2 +cov_matrix[5, 5] = 0.0 + +# Tilt +if args.dist_tilt: + rot_matrix = np.identity(6) + rot_matrix[:4, :4] = build_rotation_matrix_xy(angle=(0.25 * math.pi)) + cov_matrix = np.linalg.multi_dot([rot_matrix, cov_matrix, rot_matrix.T]) + +# Mismatch +cov_matrix[0, 0] *= (1.0 + args.dist_mismatch_x) ** 2 +cov_matrix[2, 2] *= (1.0 + args.dist_mismatch_y) ** 2 +cov_matrix_init = np.copy(cov_matrix) + +# Offset +centroid_init = np.zeros(6) +centroid_init[0] += args.dist_offset_x +centroid_init[2] += args.dist_offset_y + +# Create envelope +envelope = Envelope( + sync_part=sync_part, + cov_matrix=cov_matrix_init, + centroid=centroid_init, + intensity=args.intensity, +) + + +# Track envelope +# ------------------------------------------------------------------------------ + +print("TRACK ENVELOPE") + +tracker = EnvelopeTracker(lattice, space_charge=("2d" if args.sc else None)) + +history = {"xrms": [], "yrms": [], "xavg": [], "yavg": []} +for turn in range(args.turns): + if turn > 0: + tracker.track(envelope) + + cov_matrix = envelope.cov() + centroid = envelope.centroid() + + xrms = 1000.0 * math.sqrt(cov_matrix[0, 0]) + yrms = 1000.0 * math.sqrt(cov_matrix[2, 2]) + xavg = 1000.0 * centroid[0] + yavg = 1000.0 * centroid[2] + + print( + f"turn={turn} xrms={xrms:0.3f} yrms={yrms:0.3f} xavg={xavg:0.3f} yavg={yavg:0.3f}" + ) + + history["xrms"].append(xrms) + history["yrms"].append(yrms) + history["xavg"].append(xavg) + history["yavg"].append(yavg) + +histories = {} +histories["envelope"] = copy.deepcopy(history) + + +# Track bunch +# ------------------------------------------------------------------------------ + +print("TRACK BUNCH") + +# Generate particles from KV distribution + uniform longitudinal distribution. +rng = np.random.default_rng() + +bunch_coords = np.zeros((args.nparts, 6)) +bunch_coords[:, :4] = gen_dist(args.nparts, cov_matrix_init[0:4, 0:4], args.dist_name) +bunch_coords[:, 4] = 2.0 * rng.uniform(-args.zrms, args.zrms, size=args.nparts) +bunch_coords += centroid_init[None, :6] + +for i in range(bunch_coords.shape[0]): + bunch.addParticle(*bunch_coords[i]) + + +# Add space charge nodes to lattice. +if args.sc: + sc_calc = SpaceChargeCalc2p5D(128, 128, 1) + sc_path_length_min = 1.00e-06 + sc_nodes = setSC2p5DAccNodes(lattice, sc_path_length_min, sc_calc) + + bunch_size = bunch.getSizeGlobal() + bunch.macroSize(args.intensity / bunch_size) + + +# Track bunch through lattice. +history = {"xrms": [], "yrms": [], "xavg": [], "yavg": []} +for turn in range(args.turns): + if turn > 0: + lattice.trackBunch(bunch) + + twiss_calc = BunchTwissAnalysis() + twiss_calc.computeBunchMoments(bunch, 2, 0, 0) + + cov_matrix = np.zeros((6, 6)) + for i in range(6): + for j in range(i + 1): + cov_matrix[i, j] = twiss_calc.getCorrelation(j, i) + cov_matrix[j, i] = cov_matrix[i, j] + + xrms = 1000.0 * math.sqrt(cov_matrix[0, 0]) + yrms = 1000.0 * math.sqrt(cov_matrix[2, 2]) + xavg = 1000.0 * twiss_calc.getAverage(0) + yavg = 1000.0 * twiss_calc.getAverage(2) + + print( + f"turn={turn} xrms={xrms:0.3f} yrms={yrms:0.3f} xavg={xavg:0.3f} yavg={yavg:0.3f}" + ) + + history["xrms"].append(xrms) + history["yrms"].append(yrms) + history["xavg"].append(xavg) + history["yavg"].append(yavg) + +histories["bunch"] = copy.deepcopy(history) + + +# Analysis +# ------------------------------------------------------------------------------ + +for history in histories.values(): + for key in history: + history[key] = np.array(history[key]) + +# Print errors +for key in histories["envelope"]: + deltas = histories["bunch"][key] - histories["envelope"][key] + print("key:", key) + print("max_abs_delta:", np.max(np.abs(deltas))) + print("avg_abs_delta:", np.mean(np.abs(deltas))) + +# Plot rms bunch sizes +for key in ["xrms", "yrms"]: + fig, ax = plt.subplots(figsize=(5, 3)) + for i, model in enumerate(["envelope", "bunch"]): + color = ["black", "red"][i] + lw = [None, 0][i] + ax.plot(histories[model][key], marker=".", lw=lw, color=color, label=model) + ax.set_ylim(0.0, ax.get_ylim()[1] * 2.0) + ax.set_xlabel("Turn") + ax.set_ylabel("RMS [mm]") + ax.legend(loc="upper right") + plt.savefig(os.path.join(output_dir, f"fig_{key}")) + plt.close() + +# Plot centroids +for key in ["xavg", "yavg"]: + fig, ax = plt.subplots(figsize=(5, 3)) + for i, model in enumerate(["envelope", "bunch"]): + color = ["black", "red"][i] + lw = [None, 0][i] + ax.plot(histories[model][key], marker=".", lw=lw, color=color, label=model) + ax.set_ylim(-5.0, 5.0) + ax.set_xlabel("Turn") + ax.set_ylabel("AVG [mm]") + ax.legend(loc="upper right") + plt.savefig(os.path.join(output_dir, f"fig_{key}")) + plt.close() + + +# Collect bunch/envelope data on final turn. +particles = collect_bunch(bunch)["coords"] +particles[:, :4] *= 1000.0 + +env_cov_matrix = envelope.cov() +env_cov_matrix[:4, :4] *= 1000.0**2 + +print(env_cov_matrix) + +env_centroid = envelope.centroid() +env_centroid[:4] *= 1000.0 + +xmax = 4.0 * np.std(particles, axis=0) +limits = list(zip(-xmax, xmax)) +labels = ["x [mm]", "xp [mrad]", "y [mm]", "yp [mrad]", "z [m]", "dE [GeV]"] + + +# Plot x-x' +fig, ax = plt.subplots(figsize=(4, 4)) +ax.hist2d(particles[:, 0], particles[:, 1], bins=100, range=[limits[0], limits[1]]) +plot_rms_ellipse( + env_cov_matrix[0:2, 0:2], + center=(env_centroid[0], env_centroid[1]), + level=2.0, + color="red", + ax=ax, +) +ax.set_xlabel(labels[0]) +ax.set_ylabel(labels[1]) +plt.savefig(os.path.join(output_dir, "fig_dist_x_xp")) +plt.close() + +# Plot corner +fig, axs = plot_corner( + particles, + limits=limits, + bins=100, + labels=labels, +) +for i in range(6): + for j in range(i): + env_cov_matrix_proj = project_cov_matrix(env_cov_matrix, axis=(j, i)) + plot_rms_ellipse( + env_cov_matrix_proj, + center=(env_centroid[j], env_centroid[i]), + level=2.0, + color="red", + ax=axs[i, j], + ) +plt.savefig(os.path.join(output_dir, "fig_dist_corner")) +plt.close() diff --git a/examples/Envelope/utils.py b/examples/Envelope/utils.py new file mode 100644 index 00000000..432e4945 --- /dev/null +++ b/examples/Envelope/utils.py @@ -0,0 +1,59 @@ +import numpy as np + + +def gen_dist_gauss(n: int, cov_matrix: np.ndarray) -> np.ndarray: + return np.random.multivariate_normal( + mean=np.zeros(cov_matrix.shape[0]), + cov=cov_matrix, + size=n, + ) + + +def gen_dist_kv(n: int, cov_matrix: np.ndarray) -> np.ndarray: + X = np.random.normal(size=(n, cov_matrix.shape[0])) + X /= np.linalg.norm(X, axis=1)[:, None] + X /= np.std(X, axis=0) + return X + + +def gen_dist_waterbag(n: int, cov_matrix: np.ndarray) -> np.ndarray: + X = gen_dist_kv(n, cov_matrix) + dim = X.shape[1] + r = np.random.uniform(size=n) ** (1.0 / dim) + X *= r[:, None] + X /= np.std(X, axis=0) + return X + + +def gen_dist(n: int, cov_matrix: np.ndarray, name: str) -> np.ndarray: + if name == "kv": + X = gen_dist_kv(n, cov_matrix) + elif name == "waterbag": + X = gen_dist_waterbag(n, cov_matrix) + elif name == "gauss": + X = gen_dist_gauss(n, cov_matrix) + else: + raise ValueError(f"Invalid distribution name: {name}") + + L = np.linalg.cholesky(cov_matrix) + return np.matmul(X, L.T) + + +def build_rotation_matrix_xy(angle: float) -> np.ndarray: + cs = np.cos(angle) + sn = np.sin(angle) + + matrix = np.identity(4) + matrix[0, 0] = matrix[1, 1] = +cs + matrix[0, 2] = matrix[1, 3] = +sn + matrix[2, 0] = matrix[3, 1] = -sn + matrix[2, 2] = matrix[3, 3] = +cs + return matrix + + +def project_cov_matrix(cov_matrix: np.ndarray, axis: tuple[int, ...]) -> np.ndarray: + cov_matrix_proj = np.zeros((len(axis), len(axis))) + for i in range(len(axis)): + for j in range(len(axis)): + cov_matrix_proj[i, j] = cov_matrix[axis[i], axis[j]] + return cov_matrix_proj diff --git a/py/orbit/envelope/__init__.py b/py/orbit/envelope/__init__.py new file mode 100644 index 00000000..70a5fcbc --- /dev/null +++ b/py/orbit/envelope/__init__.py @@ -0,0 +1,2 @@ +from .envelope import Envelope +from .envelope import EnvelopeTracker diff --git a/py/orbit/envelope/envelope.py b/py/orbit/envelope/envelope.py new file mode 100644 index 00000000..596be246 --- /dev/null +++ b/py/orbit/envelope/envelope.py @@ -0,0 +1,154 @@ +import math + +import numpy as np + +from ..core.bunch import SyncParticle +from ..lattice import AccNode +from ..lattice import AccLattice + +from .matrix import MatrixFactory +from .utils import gen_dist + + +ENTRANCE = AccNode.ENTRANCE +BODY = AccNode.BODY +EXIT = AccNode.EXIT + +BEFORE = AccNode.BEFORE +AFTER = AccNode.AFTER + + +def get_perveance(mass: float, kin_energy: float, line_density: float) -> float: + classical_proton_radius = 1.53469e-18 # [m] + gamma = 1.0 + (kin_energy / mass) # Lorentz factor + beta = np.sqrt(1.0 - (1.0 / gamma) ** 2) # velocity/speed_of_light + return (classical_proton_radius * line_density) / (beta**2 * gamma**3) + + +class Envelope: + """Represents beam envelope and centroid. + + Attributes: + matrix: 7 x 7 covariance matrix for augmented phase space vector. + + Define the phase space vector X = [x, x', y, y', z, dE]^T and + augmented vector Y = [x, x', y, y', z, dE, 1]. + + Let X evolve according to X -> MX + U, where M is a 6 x 6 transfer matrix + and U is 6 x 1 "driving" vector. The augmented vector Y evolves according + to Y -> NY, where N = [[M, U], [0, 1]] is a 7 x 7 matrix. + + We track the 7 x 7 covariance matrix of Y: + + R = = [[, ], [, 1]], + + which contains both the phase space covariance matrix and centroid vector. + R evolves according to R -> N R N^T. + """ + def __init__( + self, + sync_part: SyncParticle, + cov_matrix: np.ndarray = None, + centroid: np.ndarray = None, + intensity: float = 0.0, + ) -> None: + self.sync_part = sync_part + + if centroid is None: + centroid = np.zeros(6) + + if cov_matrix is None: + cov_matrix = np.eye(6) + + self.matrix = np.zeros((7, 7)) + self.matrix[0:6, 0:6] = cov_matrix + self.matrix[0:6, 6] = centroid + self.matrix[6, 0:6] = centroid + self.matrix[6, 6] = 1.0 + + self.intensity = 0.0 + self.perveance = 0.0 + self.set_intensity(intensity) + + def set_intensity(self, intensity: float) -> None: + self.intensity = intensity + cov_matrix = self.cov() + length = 2.0 * math.sqrt(cov_matrix[4, 4]) # assume uniform density + self.perveance = get_perveance( + mass=self.sync_part.mass(), + kin_energy=self.sync_part.kinEnergy(), + line_density=(self.intensity / length), + ) + + def centroid(self) -> np.ndarray: + return self.matrix[0:6, 6] + + def cov(self) -> np.ndarray: + return self.matrix[0:6, 0:6] + + def rms(self) -> np.ndarray: + return np.sqrt(np.diag(self.cov())) + + def apply_transfer_matrix(self, transfer_matrix: np.ndarray) -> None: + self.matrix = np.linalg.multi_dot([transfer_matrix, self.matrix, transfer_matrix.T]) + + def sample(self, n: int, dist: str = "kv") -> np.ndarray: + # Issue: covariance matrix is becoming non semi-positive definite, + # giving error in cholesky decomposition. + particles = gen_dist(n=n, cov_matrix=self.cov(), name=dist) + particles = particles + self.centroid() + return particles + + +class EnvelopeTracker: + """Tracks envelope through linear lattice with optional linear space charge kicks.""" + def __init__(self, lattice: AccLattice, space_charge: str | None = None) -> None: + self.lattice = lattice + self.matrix_factory = MatrixFactory() + self.space_charge = space_charge + + def track(self, envelope: Envelope) -> None: + for node in self.lattice.getNodes(): + # Child nodes before node + for child_node in node.getChildNodes(ENTRANCE): + envelope.apply_transfer_matrix(self.matrix_factory(child_node, envelope.sync_part)) + + for part_index in range(node.getnParts()): + # Child nodes before part + for child_node in node.getChildNodes(BODY, part_index, place_in_part=BEFORE): + envelope.apply_transfer_matrix( + self.matrix_factory(child_node, envelope.sync_part) + ) + + # Space charge + if self.space_charge: + length = node.getLength(part_index) + cov_matrix = envelope.cov() + + if self.space_charge == "2d": + matrix = self.matrix_factory.space_charge_2d( + length=length, cov_matrix=cov_matrix, perveance=envelope.perveance + ) + elif self.space_charge == "3d": + matrix = self.matrix_factory.space_charge_3d( + length=length, cov_matrix=cov_matrix, intensity=envelope.intensity + ) + else: + raise ValueError(f"Invalid space charge model: {self.space_charge}") + + envelope.apply_transfer_matrix(matrix) + + # Main node part + envelope.apply_transfer_matrix( + self.matrix_factory(node, envelope.sync_part, part_index) + ) + + # Child nodes after part + for child_node in node.getChildNodes(BODY, part_index, place_in_part=AFTER): + envelope.apply_transfer_matrix( + self.matrix_factory(child_node, envelope.sync_part) + ) + + # Child nodes after node + for child_node in node.getChildNodes(EXIT): + envelope.apply_transfer_matrix(self.matrix_factory(child_node, envelope.sync_part)) diff --git a/py/orbit/envelope/matrix.py b/py/orbit/envelope/matrix.py new file mode 100644 index 00000000..f93e4fda --- /dev/null +++ b/py/orbit/envelope/matrix.py @@ -0,0 +1,182 @@ +import math + +import numpy as np + +from ..core.bunch import Bunch +from ..core.bunch import SyncParticle +from ..lattice import AccNode +from ..teapot import DriftTEAPOT +from ..teapot import QuadTEAPOT +from ..teapot import BendTEAPOT +from ..teapot import TiltTEAPOT +from ..teapot import KickTEAPOT +from ..teapot import ApertureTEAPOT +from ..teapot import BunchWrapTEAPOT +from ..teapot import FringeFieldTEAPOT +from ..teapot import MonitorTEAPOT +from ..teapot import TurnCounterTEAPOT + + +class MatrixFactory: + """Factory for 7x7 transfer matrices.""" + + def __init__(self) -> None: + self.ignore_node_types = [ + ApertureTEAPOT, + BunchWrapTEAPOT, + FringeFieldTEAPOT, + MonitorTEAPOT, + TurnCounterTEAPOT, + ] + + def drift(self, length: float, gamma: float) -> np.ndarray: + matrix = np.identity(7) + matrix[0, 1] = length + matrix[2, 3] = length + matrix[4, 5] = length / gamma**2 + return matrix + + def quad(self, length: float, kq: float) -> np.ndarray: + sqrt_abs_kq = math.sqrt(abs(kq)) + + matrix = np.identity(7) + if kq > 0: + cx = np.cos(sqrt_abs_kq * length) + sx = np.sin(sqrt_abs_kq * length) + cy = np.cosh(sqrt_abs_kq * length) + sy = np.sinh(sqrt_abs_kq * length) + matrix[0, 0] = cx + matrix[0, 1] = +sx / sqrt_abs_kq + matrix[1, 0] = -sx * sqrt_abs_kq + matrix[1, 1] = cx + matrix[2, 2] = cy + matrix[2, 3] = sy / sqrt_abs_kq + matrix[3, 2] = sy * sqrt_abs_kq + matrix[3, 3] = cy + elif kq < 0: + cx = np.cosh(sqrt_abs_kq * length) + sx = np.sinh(sqrt_abs_kq * length) + cy = np.cos(sqrt_abs_kq * length) + sy = np.sin(sqrt_abs_kq * length) + matrix[0, 0] = cx + matrix[0, 1] = sx / sqrt_abs_kq + matrix[1, 0] = sx * sqrt_abs_kq + matrix[1, 1] = cx + matrix[2, 2] = cy + matrix[2, 3] = +sy / sqrt_abs_kq + matrix[3, 2] = -sy * sqrt_abs_kq + matrix[3, 3] = cy + return matrix + + def bend(self, length: float, theta: float, gamma: float) -> np.ndarray: + rho = length / theta + + cx = math.cos(theta) + sx = math.sin(theta) + + matrix = np.identity(7) + matrix[0, 0] = cx + matrix[0, 1] = sx * rho + matrix[0, 5] = (1.0 - cx) * rho + matrix[1, 0] = -sx / rho + matrix[1, 1] = cx + matrix[1, 5] = sx + matrix[2, 3] = length + matrix[4, 0] = -sx + matrix[4, 1] = -(1.0 - cx) * rho + matrix[4, 5] = (length / gamma**2) - rho * (theta - sx) + return matrix + + def tilt(self, angle: float) -> np.ndarray: + cs = math.cos(angle) + sn = math.sin(angle) + + matrix = np.identity(7) + matrix[0, 0] = matrix[1, 1] = +cs + matrix[0, 2] = matrix[1, 3] = +sn + matrix[2, 0] = matrix[3, 1] = -sn + matrix[2, 2] = matrix[3, 3] = +cs + return matrix + + def kick(self, kx: float, ky: float, dE: float) -> np.ndarray: + matrix = np.identity(7) + matrix[1, 6] = kx + matrix[3, 6] = ky + matrix[5, 6] = dE + return matrix + + def space_charge_2d( + self, length: float, cov_matrix: np.ndarray, perveance: float + ) -> np.ndarray: + cov_xx = cov_matrix[0, 0] + cov_yy = cov_matrix[2, 2] + cov_xy = cov_matrix[0, 2] + + angle = -0.5 * math.atan2(2.0 * cov_xy, cov_xx - cov_yy) + _sin = math.sin(angle) + _cos = math.cos(angle) + + rx = 2.0 * np.sqrt(abs(cov_xx * _cos**2 + cov_yy * _sin**2 - 2.0 * cov_xy * _sin * _cos)) + ry = 2.0 * np.sqrt(abs(cov_xx * _sin**2 + cov_yy * _cos**2 + 2.0 * cov_xy * _sin * _cos)) + + kappa_x = 2.0 * perveance / (rx * (rx + ry)) + kappa_y = 2.0 * perveance / (ry * (rx + ry)) + + matrix = np.identity(7) + matrix[1, 0] = kappa_x * length + matrix[3, 2] = kappa_y * length + + R = self.tilt(angle) + M = np.linalg.multi_dot([R, matrix, np.linalg.inv(R)]) + return M + + def space_charge_3d( + self, length: float, cov_matrix: np.ndarray, intensity: float + ) -> np.ndarray: + raise NotImplementedError() + + def __call__(self, node: AccNode, sync_part: SyncParticle, part_index: int = 0) -> np.ndarray: + if type(node) is DriftTEAPOT: + length = node.getLength(part_index) + gamma = sync_part.gamma() + return self.drift(length=length, gamma=gamma) + + elif type(node) is QuadTEAPOT: + nparts = node.getnParts() + length = node.getLength(part_index) + + scale = 1.0 + if node.waveform: + scale = node.waveform.getStrength() + + kq = scale * node.getParam("kq") + return self.quad(length=length, kq=kq) + + elif type(node) is BendTEAPOT: + nparts = node.getnParts() + length = node.getLength(part_index) + theta = node.getParam("theta") / (nparts - 1) + gamma = sync_part.gamma() + return self.bend(length=length, theta=theta, gamma=gamma) + + elif type(node) is KickTEAPOT: + nparts = node.getnParts() + + scale = 1.0 + if node.waveform: + scale = node.waveform.getStrength() + + kx = scale * node.getParam("kx") / (nparts - 1) + ky = scale * node.getParam("ky") / (nparts - 1) + dE = node.getParam("dE") / (nparts - 1) + return self.kick(kx, ky, dE) + + elif type(node) is TiltTEAPOT: + angle = node.getTiltAngle() + return self.tilt(angle) + + elif type(node) in self.ignore_node_types: + return np.identity(7) + + else: + raise NotImplementedError("Unsupported node type: {}".format(type(node))) diff --git a/py/orbit/envelope/meson.build b/py/orbit/envelope/meson.build new file mode 100644 index 00000000..31c9033b --- /dev/null +++ b/py/orbit/envelope/meson.build @@ -0,0 +1,13 @@ +py_sources = files([ + '__init__.py', + 'envelope.py', + 'matrix.py', + 'utils.py' +]) + +python.install_sources( + py_sources, + subdir: 'orbit/envelope', + # pure: true, +) + diff --git a/py/orbit/envelope/utils.py b/py/orbit/envelope/utils.py new file mode 100644 index 00000000..e846d83c --- /dev/null +++ b/py/orbit/envelope/utils.py @@ -0,0 +1,39 @@ +import numpy as np + + +def gen_dist_gauss(n: int, cov_matrix: np.ndarray) -> np.ndarray: + return np.random.multivariate_normal( + mean=np.zeros(cov_matrix.shape[0]), + cov=cov_matrix, + size=n, + ) + + +def gen_dist_kv(n: int, cov_matrix: np.ndarray) -> np.ndarray: + X = np.random.normal(size=(n, cov_matrix.shape[0])) + X /= np.linalg.norm(X, axis=1)[:, None] + X /= np.std(X, axis=0) + return X + + +def gen_dist_waterbag(n: int, cov_matrix: np.ndarray) -> np.ndarray: + X = gen_dist_kv(n, cov_matrix) + dim = X.shape[1] + r = np.random.uniform(size=n) ** (1.0 / dim) + X *= r[:, None] + X /= np.std(X, axis=0) + return X + + +def gen_dist(n: int, cov_matrix: np.ndarray, name: str) -> np.ndarray: + if name == "kv": + X = gen_dist_kv(n, cov_matrix) + elif name == "waterbag": + X = gen_dist_waterbag(n, cov_matrix) + elif name == "gauss": + X = gen_dist_gauss(n, cov_matrix) + else: + raise ValueError(f"Invalid distribution name: {name}") + + L = np.linalg.cholesky(cov_matrix) + return np.matmul(X, L.T) diff --git a/py/orbit/meson.build b/py/orbit/meson.build index 2ba16e73..651b14d5 100644 --- a/py/orbit/meson.build +++ b/py/orbit/meson.build @@ -24,6 +24,7 @@ subdir('space_charge') subdir('errors') subdir('matrix_lattice') subdir('teapot') +subdir('envelope') py_sources = files([ diff --git a/py/orbit/teapot/__init__.py b/py/orbit/teapot/__init__.py index 945450d4..9886b7fe 100644 --- a/py/orbit/teapot/__init__.py +++ b/py/orbit/teapot/__init__.py @@ -16,6 +16,10 @@ from .teapot import SolenoidTEAPOT from .teapot import TiltTEAPOT from .teapot import NodeTEAPOT +from .teapot import TurnCounterTEAPOT +from .teapot import ApertureTEAPOT +from .teapot import MonitorTEAPOT +from .teapot import BunchWrapTEAPOT from .teapot import TPB @@ -38,3 +42,6 @@ __all__.append("NodeTEAPOT") __all__.append("TPB") __all__.append("TEAPOT_MATRIX_Lattice") +__all__.append("TurnCounterTEAPOT") +__all__.append("ApertureTEAPOT") +__all__.append("MonitorTEAPOT")