Source code for alignment

from scipy.optimize import linear_sum_assignment

from data_classes import *


[docs] def munkres(x_arr,y_arr,x_linked,y_linked,intens_x,intens_y,skip_fraction=0.4,skip_level=0.8,segmentation_threshold = 300): """ Chunked alignment wrapper over `munkres_align` to handle large inputs. This function splits both input sequences and their linked/intensity arrays into chunks to control memory usage and runtime, aligns each chunk independently via the Hungarian algorithm, and concatenates the results. Parameters ---------- x_arr, y_arr : Sequence[array_like] or array_like Sequences of items for the X and Y sides. Each item is reduced to its mean value inside `munkres_align` for distance computation. x_linked, y_linked : array_like Auxiliary arrays associated with `x_arr`/`y_arr` that are returned in aligned order (e.g., original structures or metadata). Must be indexable by the same indices as the corresponding inputs. intens_x, intens_y : array_like Intensity values linked to `x_arr`/`y_arr`. Used to compute intensity-aware costs in `munkres_align`. skip_fraction : float, optional Fraction of extra dummy rows/columns added to each chunk's cost matrix to allow skipping matches. Example: 0.2 means add ~20% rows/cols. skip_level : float, optional Penalty multiplier applied to the maximum base cost to build dummy rows/columns. Larger values discourage skipping. segmentation_threshold : int, optional Target maximum chunk length. The input is split into roughly ceil(len/segmentation_threshold) segments to avoid huge cost matrices and memory errors. Returns ------- tuple (aln_x, aln_y, aln_x_linked, aln_y_linked) where - aln_x, aln_y: lists of items from x_arr/y_arr in aligned order - aln_x_linked, aln_y_linked: np.ndarray of linked items aligned with aln_x/aln_y respectively. Notes ----- - Chunking reduces peak memory: each cost matrix is built per-chunk. - Choose `skip_fraction` modestly (e.g., 0.1–0.5); large values can blow up the matrix and cause MemoryError. - The function preserves per-chunk order; cross-chunk matches are not considered. If global cross-chunk matches matter, increase `segmentation_threshold` or consider overlapping chunks. """ print('size x:', len(x_arr)) print('size y:', len(y_arr)) x_len, y_len = len(x_arr), len(y_arr) ch_count_arr = lambda n, ln : n//ln + int(n%ln != 0) choose_split = lambda arr, ch_count: arr.sync_split(ch_count) if type(arr) is LinkedList else np.array_split(np.asarray(arr,dtype='object'), ch_count, axis=0) chunk_count = min(ch_count_arr(x_len, segmentation_threshold), ch_count_arr(y_len, segmentation_threshold)) chunk_ags = [choose_split(x_arr, chunk_count), choose_split(y_arr, chunk_count), choose_split(x_linked, chunk_count), choose_split(y_linked,chunk_count), choose_split(np.asarray(intens_x), chunk_count), choose_split(np.asarray(intens_y), chunk_count)] # Инициализируем аккумуляторы до цикла aln_x, aln_y = [], [] aln_x_linked = np.array([], dtype=float) aln_y_linked = np.array([], dtype=float) for i in range(chunk_count): print(f'Processing chunk {i+1}/{chunk_count} with sizes {len(chunk_ags[0][i])} and {len(chunk_ags[1][i])}') if i == 0: aln_x, aln_y, aln_x_linked, aln_y_linked = munkres_align(chunk_ags[0][i], chunk_ags[1][i], chunk_ags[2][i], chunk_ags[3][i], chunk_ags[4][i], chunk_ags[5][i], skip_fraction, skip_level) else: ax, ay, axl, ayl = munkres_align(chunk_ags[0][i], chunk_ags[1][i], chunk_ags[2][i], chunk_ags[3][i], chunk_ags[4][i], chunk_ags[5][i], skip_fraction, skip_level) aln_x.extend(ax) aln_y.extend(ay) aln_x_linked = np.concatenate([aln_x_linked, axl]) aln_y_linked = np.concatenate([aln_y_linked, ayl]) return aln_x, aln_y, aln_x_linked, aln_y_linked
[docs] def munkres_align(x_arr,y_arr,x_linked,y_linked,intens_x,intens_y,skip_fraction=0.4,skip_level=0.8): """ Align two sequences using the Hungarian (Munkres) algorithm with an intensity-aware distance cost and optional skipping. Parameters ---------- x_arr : Sequence[array_like] or array_like Sequence of items for the X side. Each item is reduced to its mean value for distance computation. y_arr : Sequence[array_like] or array_like Sequence of items for the Y side, treated analogously to `x_arr`. x_linked : array_like Array of auxiliary data associated with `x_arr` (returned aligned; e.g., original structures or metadata). Must be indexable by the same indices as `x_arr`. y_linked : array_like Auxiliary data associated with `y_arr` (returned aligned). intens_x : array_like Intensity values associated with `x_arr`, used as the `linked_array` to compute intensity differences in the cost. intens_y : array_like Intensity values associated with `y_arr`. skip_fraction : float, optional Fraction of additional dummy rows/cols to add to the cost matrix to enable skipping matches. ``round(n * skip_fraction)`` rows/cols are padded, where ``n`` is the current matrix size. Default is 0.3. skip_level : float, optional Multiplier applied to the maximum cost to set the padding value for dummy rows/cols. Higher values discourage skipping. Default is 0.8. Returns ------- aln_x : list Elements of `x_arr` selected by the optimal assignment, in match order. aln_y : list Elements of `y_arr` selected by the optimal assignment, in match order. aln_x_linked : ndarray Items from `x_linked` corresponding to `aln_x`. aln_y_linked : ndarray Items from `y_linked` corresponding to `aln_y`. """ convert = lambda arr, arr_linked: LinkedList(np.array([np.array(el).mean() for el in arr]), arr_linked) x, y = convert(x_arr, intens_x), convert(y_arr, intens_y) x_len,y_len = len(x),len(y) x_n,y_n = __equal_size(x,y) matrix = __make_matrix(x_n,y_n,skip_fraction,skip_level) #print(f'matrix shape {matrix.shape}') indexes = np.array(linear_sum_assignment(matrix)) condition = (indexes[0,:] < x_len) & (indexes[1,:] < y_len) xind = indexes[:,condition][0] yind = indexes[:,condition][1] aln_x = [x_arr[i] for i in xind] aln_y = [y_arr[i] for i in yind] aln_x_linked = np.asarray(x_linked)[xind] aln_y_linked = np.asarray(y_linked)[yind] return aln_x,aln_y,aln_x_linked,aln_y_linked
def __w(x, y,alpha_dist=1,alpha_int=0.01, k = 20): """ Compute the pairwise cost between two LinkedLists combining distance and intensity differences with exponential scaling and tanh normalization. Parameters ---------- x : LinkedList or array_like Values for the first axis. When `LinkedList`, its `linked_array` is used to compute intensity differences. y : LinkedList or array_like Values for the second axis. When `LinkedList`, its `linked_array` is used to compute intensity differences. alpha_dist : float, optional Exponential scaling factor for absolute distance differences. Default is 0.01. alpha_int : float, optional Exponential scaling factor for absolute intensity differences. Default is 0.01. k : float, optional Scaling parameter for the tanh normalization. Default is 20. Returns ------- ndarray A 2-D cost matrix broadcast from `x` and `y` shapes, with entries in the range (-1, 1), approaching 1 for large combined differences. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) # Важно: linked_array тоже принудительно приводим к числовому типу, # чтобы избежать dtype=object и object-циклов ufunc'ов x_linked = np.asarray(x.linked_array, dtype=float) y_linked = np.asarray(y.linked_array, dtype=float) scale = lambda var, alpha: np.exp(var * alpha) - 1 norm = lambda var,k: np.tanh(var/k) dist_scaled = scale(abs(x_arr-y_arr),alpha_dist) dint_scaled = scale(abs(x_linked-y_linked),alpha_int) weight = (dist_scaled**2+dint_scaled**2)**0.5 return norm(weight,k) def __make_matrix(x,y,skip_fraction=None,skip_level=1): """ Build a padded cost matrix for assignment, allowing optional skipping via dummy rows/columns filled with a constant penalty. Parameters ---------- x : LinkedList Input values (possibly reshaped to 2-D) for the first axis. y : LinkedList Input values (possibly reshaped to 2-D) for the second axis. skip_fraction : float or None, optional Fraction of the base matrix size to determine how many dummy rows/cols to pad. If None, no padding is applied. Default is None. skip_level : float, optional Multiplier for the maximum base cost to set the pad penalty. Default is 1. Returns ------- ndarray The (possibly padded) 2-D cost matrix suitable for ``scipy.optimize.linear_sum_assignment``. """ matrix = __w(x.sync_reshape((-1,1)), y.sync_reshape((1,-1))) if skip_fraction is not None: add_lines = round(matrix.shape[0]*skip_fraction) pad_value = matrix.max()*skip_level matrix = np.pad(matrix,((0,add_lines),(0,add_lines)),'constant',constant_values=pad_value) return matrix def __equal_size(x,y): """ Make two LinkedLists equal in length by padding the shorter one with ``np.inf`` both in values and in its linked array. Parameters ---------- x : LinkedList First sequence. y : LinkedList Second sequence. Returns ------- tuple of LinkedList A pair ``(x_equal, y_equal)`` with matching lengths. """ change_linked = lambda linked_arr, delta: LinkedList(np.concatenate([linked_arr,np.full(delta,np.inf)]),linked_array=np.concatenate([linked_arr.linked_array,np.full(delta,np.inf)])) x_len,y_len = len(x),len(y) d_len = abs(x_len-y_len) if x_len == y_len: return x,y elif x_len > y_len: return x, change_linked(y,d_len)#np.concatenate([y,np.full(x_len-y_len,np.inf)]) else: return change_linked(x,d_len),y#np.concatenate([x, np.full(y_len - x_len, np.inf)]),y