186 changes: 178 additions & 8 deletions polykit/analysis/polymer_analyses.py
@@ -33,10 +33,10 @@

"""

from math import sqrt

import numpy as np
import pandas as pd

from scipy.spatial import cKDTree
from scipy.ndimage import gaussian_filter1d

@@ -66,6 +66,45 @@ def calculate_contacts(data, cutoff=1.7):
return pairs


def calculate_contacts_chunked(data, cutoff=1.7, chunk_size=int(1e8)):
"""
Calculate contacts between particles, chunked.

Args:
data (ndarray): Array of shape Nx3 representing the polymer data.
cutoff (float, optional): Distance cutoff for identifying contacts. Defaults to 1.7.
chunk_size (int, optional): Approximate number of pairs to yield per chunk. Defaults to int(1e8).

Yields:
ndarray: Array of shape Mx2 representing the pairs of particles that are in contact.

Raises:
ValueError: If the shape of the polymer data is not Nx3.
RuntimeError: If the polymer data contains NaN values.
"""

if data.shape[1] != 3:
raise ValueError("Incorrect polymer data shape. Must be Nx3.")

if np.isnan(data).any():
raise RuntimeError("Data contains NaNs")

tree = cKDTree(data)

# Estimate the average neighbourhood size from a small random sample,
# so that each chunk of query points yields roughly chunk_size pairs.
N_RANDOM = 10
random_sample = tree.query_ball_point(x=data[np.random.choice(len(data), size=N_RANDOM)], r=cutoff, workers=1)
n_pairs_avg = np.mean([len(i) for i in random_sample])
particles_per_chunk = max(int(chunk_size // n_pairs_avg), 1)

for i in range(0, len(data), particles_per_chunk):
chunk = data[i:i + particles_per_chunk]

neighbours = tree.query_ball_point(x=chunk, r=cutoff, workers=1)
# offset the query indices by i so that both columns hold global particle indices
pairs = np.vstack([np.repeat(np.arange(i, i + len(neighbours)), [len(n) for n in neighbours]), np.concatenate(neighbours)]).T
pairs = pairs[pairs[:, 0] != pairs[:, 1]]  # drop self-pairs

yield pairs
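
A minimal usage sketch for the chunked generator (illustrative only; the random-coil coordinates and the final concatenation are assumptions, not part of this PR). Note that each contact is reported in both orientations, so duplicates are dropped at the end:

import numpy as np

coords = np.cumsum(np.random.normal(size=(1_000_000, 3)), axis=0)  # a random coil
chunks = calculate_contacts_chunked(coords, cutoff=1.7, chunk_size=int(1e7))
pairs = np.concatenate(list(chunks), axis=0)
pairs = pairs[pairs[:, 0] < pairs[:, 1]]  # keep each contact once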


def smart_contacts(data, cutoff=1.7, min_cutoff=2.1, percent_func=lambda x: 1 / x):
"""Calculates contacts for a polymer, give the contact radius (cutoff)
This method takes a random fraction of the monomers that is equal to (
@@ -120,16 +159,62 @@ def smart_contacts(data, cutoff=1.7, min_cutoff=2.1, percent_func=lambda x: 1 /
return calculate_contacts(data, cutoff)


def generate_bins(N, start=4, bins_per_order_magn=10):
def generate_bins(end, start=1, bins_decade=10):
"""Generate integer bin edges from `start` to `end`, spaced approximately uniformly in log10 with ~`bins_decade` edges per decade."""
lstart = np.log10(start)
lend = np.log10(N - 1) + 1e-6
num = int(np.ceil((lend - lstart) * bins_per_order_magn))
bins = np.unique(np.logspace(lstart, lend, dtype=int, num=max(num, 0)))
if len(bins) > 0:
assert bins[-1] == N - 1
lend = np.log10(end)
num = int(np.round(np.ceil((lend - lstart) * bins_decade)))
bins = np.unique(
np.round(np.logspace(lstart, lend, num=max(num, 0))).astype(np.int64)
)
assert bins[-1] == end
return bins
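
A quick worked example (exact edges depend on the rounding above): generate_bins(1000, start=1, bins_decade=3) computes num = ceil(3 * 3) = 9 log-spaced values between 10^0 and 10^3, which round and deduplicate to array([1, 2, 6, 13, 32, 75, 178, 422, 1000]); the assert guarantees the last edge is exactly `end`.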


def _bin_contacts_df(contacts, N, bins_decade=10, bins=None, ring=False):
if ring:
contour_dists = np.abs(contacts[:, 1] - contacts[:, 0])
mask = contour_dists > N // 2
contour_dists[mask] = N - contour_dists[mask]
else:
contour_dists = np.abs(contacts[:, 1] - contacts[:, 0])

if bins is None and bins_decade is None:
bins = np.arange(0, N + 1)
contacts_per_bin = np.bincount(contour_dists, minlength=N)
else:
if bins is None:
bins = generate_bins(N, bins_decade=bins_decade)
else:
bins = np.array(bins)

contacts_per_bin = np.bincount(
np.searchsorted(bins, contour_dists, side="right"), minlength=len(bins)
)
contacts_per_bin = contacts_per_bin[
1 : len(bins)
]  # ignore contacts that fall outside of the distance bins

if ring:
pairs_per_bin = np.diff(N * bins)
else:
pairs_per_bin = np.diff(N * bins + 0.5 * bins - 0.5 * (bins ** 2))

contact_freqs = contacts_per_bin / pairs_per_bin

bin_mids = np.sqrt(bins[:-1] * bins[1:])

return pd.DataFrame(
{
"dist": bin_mids,
"contact_freq": contact_freqs,
"n_particle_pairs": pairs_per_bin,
"n_contacts": contacts_per_bin,
"min_dist": bins[:-1],
"max_dist": bins[1:],
}
)
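
The closed-form pair counts can be sanity-checked against a brute-force count (a sketch with assumed toy values, not part of the PR). For a linear chain, the number of particle pairs at contour distance s is N - s, so the cumulative count up to an edge b is N*b + 0.5*b - 0.5*b**2, and np.diff over the edges recovers the per-bin counts:

import numpy as np

N = 100
bins = np.array([1, 4, 10, 50])
brute = np.array([sum(N - s for s in range(lo, hi)) for lo, hi in zip(bins[:-1], bins[1:])])
closed = np.diff(N * bins + 0.5 * bins - 0.5 * bins ** 2)
assert np.array_equal(brute, closed)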


def contact_scaling(data, bins0=None, cutoff=1.1, *, ring=False):
"""
Returns contact probability scaling for a given polymer conformation
@@ -185,7 +270,7 @@ def contact_scaling(data, bins0=None, cutoff=1.1, *, ring=False):

connumbers = connumbers / possible

a = [sqrt(i[0] * (i[1] - 1)) for i in bins]
a = [np.sqrt(i[0] * (i[1] - 1)) for i in bins]
return a, connumbers


@@ -204,6 +289,91 @@ def smooth(x):
return mids[1:], slope


def contact_vs_dist_df(
coords,
contact_radius=1.1,
bins_decade=10,
bins=None,
ring=False,
):
"""
Returns the P(s) statistics of a polymer, i.e. the average contact frequency
vs contour distance. Contact frequencies are averaged across ranges (bins)
of contour distance.

Parameters
----------
coords : Nx3 array of ints/floats
An array of coordinates of the polymer particles.
bins : array.
Bins to divide the total span of possible contour distances.
If neither `bins` nor `bins_decade` are provided, distances are not binned.
bins_decade : int.
If provided, distance bins are generated automatically
in the range of [1, N_PARTICLES - 1],
such that bin edges are approximately equally spaced in
log space, with approx. `bins_decade` bins per decade.
If neither `bins` nor `bins_decade` are provided, distances are not binned.

contact_radius : float
Particles separated in 3D by less than `contact_radius` are
considered to be in contact.
ring : bool, optional
If True, will calculate contacts for the ring

Returns
-------
pd.DataFrame
A table with one row per contour-distance bin, with columns
`dist` (geometric mean of the bin edges), `contact_freq`,
`n_particle_pairs`, `n_contacts`, `min_dist` and `max_dist`.


"""
coords = np.asarray(coords)
if coords.shape[1] != 3:
raise ValueError(
"coords must contain an Nx3 array of particle coordinates in 3D"
)
N = coords.shape[0]

contacts = np.array(
calculate_contacts(coords, contact_radius)
)

assert np.sum(contacts[:, 1] < contacts[:, 0]) == 0

cvd = _bin_contacts_df(contacts, N, bins_decade=bins_decade, bins=bins, ring=ring)

return cvd
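
A minimal usage sketch (the freely jointed chain and the parameter values are illustrative assumptions):

import numpy as np

polymer = np.cumsum(np.random.normal(size=(10_000, 3)), axis=0)
cvd = contact_vs_dist_df(polymer, contact_radius=2.0, bins_decade=8)
print(cvd[["dist", "contact_freq"]].head())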


def gaussian_contact_vs_dist(
coords, contact_vs_dist_func, random_sigma=3.0, random_reps=10, **kwargs
):
"""Calculates contact_vs_dist_func for a polymer with random normal shifts.

Args:
coords (np.ndarray): coordinates of a polymer.
contact_vs_dist_func (callable): function used to compute contact frequency vs contour distance (e.g. contact_vs_dist_df).
random_sigma (float, optional): the standard deviation of the random Gaussian shift along each coordinate. Defaults to 3.0.
random_reps (int, optional): number of random shifts to average over. Defaults to 10.

Returns:
pd.DataFrame: a pandas dataframe with the average contact frequency at different contour distances.
"""
if random_sigma is None:
return contact_vs_dist_func(coords, **kwargs)

contact_freqs = 0
for _ in range(random_reps):
shifts = np.random.normal(scale=random_sigma, size=coords.shape)
res = contact_vs_dist_func(coords + shifts, **kwargs)
contact_freqs += res["contact_freq"] / random_reps

res["contact_freq"] = contact_freqs

return res
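
A usage sketch for the Gaussian-smoothed variant (parameter values are illustrative assumptions); averaging over random shifts smooths the contact statistics of lattice or noisy conformations:

import numpy as np

polymer = np.cumsum(np.random.normal(size=(10_000, 3)), axis=0)
cvd_smooth = gaussian_contact_vs_dist(
    polymer,
    contact_vs_dist_func=contact_vs_dist_df,
    random_sigma=3.0,
    random_reps=10,
    contact_radius=2.0,
    bins_decade=8,
)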


def Rg2_scaling(data, bins=None, ring=False):
"""Calculates average gyration radius of subchains a function of s

53 changes: 53 additions & 0 deletions polykit/generators/initial_conformations.py
@@ -311,3 +311,56 @@ def grow_cubic(N, boxSize, method="standard"):
break
# print a
return np.array(a) - 1


def space_filling_curve(N, cond, init_point=None):
"""Grow a space-filling curve in 3D.

Args:
N (int): the number of points to generate (including the initial point).
cond (function): a function that takes a point in 3D and returns True if the point is valid and False otherwise.
init_point (tuple, optional): the initial point. Defaults to None.

Raises:
Exception: if the curve cannot be grown any further (all three candidate moves are rejected by `cond`).

Returns:
np.array: the generated points.
"""

i = 0
d = np.zeros((N,3))
if init_point is not None:
d[0] = init_point
nx, ny, nz = 0, 0, 0

while i < N-1:
dx = ((ny + 1) % 2) * 2 - 1
new_p = d[i] + (dx, 0, 0)

if cond(new_p):
d[i+1] = new_p
i += 1
nx += 1
continue

dy = ((nz + 1) % 2) * 2 - 1
new_p = d[i] + (0, dy, 0)

if cond(new_p):
d[i+1] = new_p
i += 1
ny += 1
continue

new_p = d[i] + (0, 0, 1)

if cond(new_p):
d[i+1] = new_p
i += 1
nz += 1
continue

raise Exception('Cannot grow the curve any further: all three candidate moves were rejected.')

return d
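
A usage sketch (the box condition and sizes are illustrative assumptions; N must be small enough that the serpentine never gets stuck):

import numpy as np

def in_box(p, size=10):
    return np.all(p >= 0) and np.all(p < size)

curve = space_filling_curve(500, cond=in_box, init_point=(0, 0, 0))
# consecutive points differ by exactly one unit step
assert np.all(np.abs(np.diff(curve, axis=0)).sum(axis=1) == 1)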
Empty file added polykit/io/__init__.py
Empty file.
72 changes: 72 additions & 0 deletions polykit/io/cache.py
@@ -0,0 +1,72 @@
from ..analysis import polymer_analyses
from . import fetch_hoomd

import collections
import pathlib
import shelve

SCALING_CACHE_FILENAME = "scalings.shlv"


def cached_contact_vs_dist(
folder,
fetch_func=fetch_hoomd.fetch_frame,
get_abs_frame_idx=fetch_hoomd.get_abs_frame_idx,
frame_idx=-1,
contact_radius=1.1,
bins_decade=10,
bins=None,
ring=False,
particle_slice=(0, None),
random_sigma=None,
random_reps=10,
cache_file=SCALING_CACHE_FILENAME,
):
if random_sigma is None:
random_reps = None

# use the injected get_abs_frame_idx callback rather than the HOOMD-specific default
frame_idx = get_abs_frame_idx(folder, frame_idx)

path = pathlib.Path(folder) / cache_file
cache_f = shelve.open(path.as_posix(), "c")

key_dict = {}
for k in [
"frame_idx",
"bins",
"bins_decade",
"contact_radius",
"ring",
"random_sigma",
"random_reps",
"particle_slice"
]:
key_dict[k] = locals()[k]

if isinstance(key_dict["bins"], collections.abc.Iterable):
key_dict["bins"] = tuple(key_dict["bins"])

key = repr(tuple(sorted(key_dict.items())))

if key in cache_f:
sc = cache_f[key]
cache_f.close()
return sc

coords = fetch_func(folder, frame_idx)
coords = coords[particle_slice[0]:particle_slice[1]]
sc = polymer_analyses.gaussian_contact_vs_dist(
coords,
contact_vs_dist_func=polymer_analyses.contact_vs_dist_df,
random_sigma=random_sigma,
random_reps=random_reps,
bins_decade=bins_decade,
bins=bins,
contact_radius=contact_radius,
ring=ring,
)
cache_f[key] = sc
cache_f.close()

return sc
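
A usage sketch (the folder name is a placeholder; assumes a HOOMD trajectory readable by fetch_hoomd.fetch_frame):

cvd = cached_contact_vs_dist(
    "path/to/simulation_folder",
    frame_idx=-1,
    contact_radius=1.1,
    bins_decade=10,
    ring=False,
)
# the result is computed once, then served from scalings.shlv on subsequent calls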
