import copy
import math
import multiprocessing
import os
import sys
import traceback
from multiprocessing import Process, Queue, Pool
from pathlib import Path
from queue import Empty

import h5py
import numpy as np
import pandas as pd
import scipy.stats as stats
import yaml
from KDEpy import FFTKDE
from diptest import diptest
from numba import njit
from scipy.special import rel_entr

# pyqtgraph must see this variable before it is first imported
os.environ['PYQTGRAPH_QT_LIB'] = 'PyQt5'
import pyqtgraph as pg
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QTimer, QObject, pyqtSignal, Qt, QThread
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import QMainWindow, QApplication, QWidget, QLineEdit, QLabel, QPushButton, \
    QFormLayout, QFileDialog, QTableWidgetItem, QTableWidget, QVBoxLayout, QHeaderView, QAbstractItemView, QSplitter, \
    QHBoxLayout, QTreeWidget, QTreeWidgetItem, QProgressBar

import alignment
from data_classes import *  # provides LinkedList, DatasetHeaders, TreeWidget, LogWidget, ...

# --- Class declarations ---
class Const:
"""
    Container for application-wide constants.
    Attributes
    ----------
    RAW : str or None
        Path to the HDF5 file with raw spectra.
    ALN : str or None
        Path to the HDF5 file with aligned spectra.
    DATASET_RAW : str or None
        HDF5 path to the raw spectra dataset.
    DATASET_ALN : str or None
        HDF5 path to the aligned spectra dataset.
    REF : float or None
        Reference m/z value used to locate the reference peak.
        .. warning::
            This parameter is currently **not used** in the pipeline and will be removed
            in a future version.
    DEV : float or None
        Acceptable deviation (±) around `REF` when searching for the reference peak.
    N_DOTS : int or None
        Number of points for KDE evaluation.
    BW : float or None
        Bandwidth parameter for KDE.
    """
    # class attributes (populated from the UI / config file at run time)
    RAW: str | None = None
    ALN: str | None = None
    DATASET_RAW: str | None = None
    DATASET_ALN: str | None = None
    REF: float | None = None
    DEV: float | None = None
    N_DOTS: int | None = None
    BW: float | None = None
class WorkerSignals(QObject):
"""
Signals and processing pipeline for background computations.
Signals
-------
output : pyqtSignal(str)
Emitted for redirected standard output messages.
error : pyqtSignal(str)
Emitted for redirected standard error messages or exceptions.
result : pyqtSignal(object)
Emitted with computation results to be consumed by the main thread.
finished : pyqtSignal()
Emitted when the processing pipeline finishes.
progress : pyqtSignal(int)
Emitted to update a progress bar.
create_pbar : pyqtSignal(tuple)
Emitted to initialize a progress bar. Expected tuple is (min, max).
"""
output = pyqtSignal(str)
error = pyqtSignal(str)
result = pyqtSignal(object)
finished = pyqtSignal()
progress = pyqtSignal(int)
create_pbar = pyqtSignal(tuple)
def find_dots_process(self):
"""
Run the main data processing pipeline.
The pipeline reads raw and aligned spectra from HDF5, computes KDEs,
performs peak picking, aligns peak lists, and computes
descriptive and inferential statistics. Results are emitted via the
`result` signal as a tuple of render instructions and statistics.
Notes
-----
Emits
- ``create_pbar``: tuple of (min, max) for a progress bar.
- ``progress``: updates during dataset iteration.
- ``result``: composite payload for UI updates.
- ``finished``: upon completion or on handled exception.
- ``error``: formatted traceback on exception.
"""
try:
features_raw, attrs_raw = File(Const.RAW).read(Const.DATASET_RAW)
features_aln, attrs_aln = File(Const.ALN).read(Const.DATASET_ALN)
            # enable parallel processing by default (CPU count minus one, capped at 3 workers)
            processes = max(1, min((os.cpu_count() or 2) - 1, 3))
            distance_list = read_dataset(self, features_raw, attrs_raw, features_aln, attrs_aln, Const.REF, Const.DEV, processes=processes)
distance_list_prepared = prepare_array(distance_list)
raw_concat, aln_concat, id_concat = distance_list_prepared
kde_x_raw, kde_y_raw = FFTKDE(bw=Const.BW, kernel='gaussian').fit(raw_concat).evaluate(Const.N_DOTS)
kde_x_aln, kde_y_aln = FFTKDE(bw=Const.BW, kernel='gaussian').fit(aln_concat).evaluate(Const.N_DOTS)
center_r, left_r, right_r = peak_picking(kde_x_raw, kde_y_raw)
center_a, left_a, right_a = peak_picking(kde_x_aln, kde_y_aln)
            # recover the peak heights at the detected centers
            max_center_r = np.interp(center_r, kde_x_raw, kde_y_raw)
            max_center_a = np.interp(center_a, kde_x_aln, kde_y_aln)
borders_r = np.stack((left_r, right_r), axis=1)
borders_a = np.stack((left_a, right_a), axis=1)
c_ds_raw = LinkedList(center_r, borders_r)#.sync_delete(np.where(max_center_r <= epsilon)[0])
c_ds_aln = LinkedList(center_a, borders_a)#.sync_delete(np.where(max_center_a <= epsilon)[0])
            c_ds_raw_intensity = np.interp(c_ds_raw, kde_x_raw, kde_y_raw)
            c_ds_aln_intensity = np.interp(c_ds_aln, kde_x_aln, kde_y_aln)
peak_lists_raw = sort_dots(raw_concat, c_ds_raw.linked_array[:, 0], c_ds_raw.linked_array[:, 1])
peak_lists_aln = sort_dots(aln_concat, c_ds_aln.linked_array[:, 0], c_ds_aln.linked_array[:, 1])
            # print(raw_concat)      # debug
            # print(peak_lists_raw)  # debug
aln_peak_lists_raw, aln_peak_lists_aln, aln_kde_raw, aln_kde_aln = alignment.munkres(peak_lists_raw,
peak_lists_aln,
c_ds_raw,
c_ds_aln,
c_ds_raw_intensity,
c_ds_aln_intensity,
segmentation_threshold=400)
s_p = np.array(pd.DataFrame(np.array(
[stat_params_paired_single(x_el, y_el) for x_el, y_el in zip(aln_peak_lists_raw, aln_peak_lists_aln)],
dtype='float')).dropna())
result_text,result_type = construct_output(p_value=s_p[:,-1],
var_raw=s_p[:,1],
var_aln=s_p[:,2])
ret = (
('show', (((kde_x_raw, kde_y_raw), 'raw', 'red', 'p', 'kde'),
((kde_x_aln, kde_y_aln), 'aln', 'blue', 'p', 'kde'),
                   (aln_kde_aln, np.max(kde_y_aln), 'aln_peaks', 'mult', 'vln', 'kde'),
                   (aln_kde_raw, np.max(kde_y_aln), 'raw_peaks', 'mult', 'vln', 'kde'))),
#(c_ds_raw, np.max(kde_y_raw), 'raw_peaks', 'red', 'vln', 'kde'),
#(c_ds_aln, np.max(kde_y_aln), 'aln_peaks', 'blue', 'vln', 'kde'))),
#('stats', (stat_params_unpaired(peak_lists_raw).T, stat_params_unpaired(peak_lists_aln).T)),
#('stats_p', (stat_params_unpaired(aln_peak_lists_raw).T, stat_params_unpaired(aln_peak_lists_aln).T)),
('stats',
((stat_params_unpaired(peak_lists_raw).T, 'raw'), (stat_params_unpaired(peak_lists_aln).T, 'aln'))),
('stats_p', ((stat_params_unpaired(aln_peak_lists_raw).T, 'raw'),
(stat_params_unpaired(aln_peak_lists_aln).T, 'aln'))),
('stats_table',s_p),
('final',(result_text,result_type))
)
            # print(s_p.shape)  # debug
self.result.emit(ret)
self.finished.emit()
        except Exception:
            # forward the full traceback to the UI console
            self.error.emit(traceback.format_exc())
            self.finished.emit()
class StreamRedirect:
"""
Redirect-like object writing messages into a multiprocessing queue.
Parameters
----------
q : multiprocessing.Queue
Target queue where messages will be put.
"""
def __init__(self, q):
"""
Initialize the redirector.
Parameters
----------
q : multiprocessing.Queue
Target queue for messages.
"""
self.q = q
def write(self, msg: str):
"""
Write a message to the queue if it's not empty or whitespace only.
Parameters
----------
msg : str
Message to forward.
"""
if msg.strip():
self.q.put(msg)
def flush(self):
"""
        No-op; present to satisfy the file-like stream interface.
"""
pass
class ProcessManager:
"""
Manage background processes and multiplex their stdout, stderr and results.
Parameters
----------
signals : WorkerSignals
Signals object to emit collected outputs to the main thread.
Attributes
----------
output_q, error_q, return_q : multiprocessing.Queue
Internal queues used to collect outputs from child processes.
process_set : set[str]
Names of currently running processes.
"""
def __init__(self, signals):
"""
Create the manager and internal queues.
Parameters
----------
signals : WorkerSignals
Signals sink for emitting messages.
"""
self.signals = signals
self.output_q = Queue()
self.error_q = Queue()
self.return_q = Queue()
self.process_set = set()
def run_process(self, target, target_name, args=None, kwargs=None):
"""
Start a target function in a separate process.
Parameters
----------
target : callable
Function to execute in a child process.
target_name : str
Name used to track the process.
args : list, optional
Positional arguments for `target`.
kwargs : dict, optional
Keyword arguments for `target`.
Returns
-------
multiprocessing.Process
The started process instance.
"""
if args is None:
args = []
if kwargs is None:
kwargs = {}
p = Process(target=self._std_wrapper, args=(target, self.output_q, self.error_q, self.return_q, args, kwargs))
self.process_set.add(target_name)
p.start()
return p
def end_process(self, process, target_name):
"""
Join the process if tracked by name and report join errors to `error_q`.
Parameters
----------
process : multiprocessing.Process
Process to join.
target_name : str
Name that identifies the process in `process_set`.
"""
if target_name in self.process_set:
try:
process.join()
except Exception as e:
self.error_q.put(e)
        else:
            self.error_q.put('no process with name {} running'.format(target_name))
@staticmethod
def _std_wrapper(target, out_q, error_q, ret_q, args=(), kwargs=None):
"""
Wrap a target callable to redirect stdio and return its result via queues.
Parameters
----------
target : callable
Function to execute.
out_q, error_q, ret_q : multiprocessing.Queue
Queues for stdout, stderr, and return payloads.
args : tuple
Positional arguments for `target`.
kwargs : dict, optional
Keyword arguments for `target`.
"""
if kwargs is None:
kwargs = {}
try:
sys.stdout = StreamRedirect(out_q)
sys.stderr = StreamRedirect(error_q)
result = target(*args, **kwargs)
ret_q.put((target.__name__, result))
except Exception as e:
error_q.put(e)
finally:
sys.stdout.flush()
sys.stderr.flush()
def __check_return(self):
"""
Drain the return queue and emit results via `signals.result`.
"""
while not self.return_q.empty():
try:
func_name, content = self.return_q.get_nowait()
self.signals.result.emit(content)
except Empty:
break
except Exception as e:
self.error_q.put(e)
def __check_error(self):
"""
Drain the error queue and emit messages via `signals.error`.
"""
while not self.error_q.empty():
try:
msg = self.error_q.get_nowait()
self.signals.error.emit(str(msg))
except Empty:
break
def __check_out(self):
"""
Drain the stdout queue and emit messages via `signals.output`.
"""
while not self.output_q.empty():
try:
msg = self.output_q.get_nowait()
# print(msg)
self.signals.output.emit(msg)
except Empty:
break
def check_queues(self):
"""
Poll all internal queues and forward their content via signals.
"""
self.__check_return()
self.__check_error()
self.__check_out()
class Dataset(LinkedList):
"""
LinkedList with an optional reference value attached.
Parameters
----------
input_array : array_like
Primary data.
linked_array : array_like or None, optional
Secondary linked data.
reference : float or None, optional
Reference m/z value associated with the dataset.
Attributes
----------
reference : float or None
The attached reference value.
"""
def __new__(cls, input_array, linked_array=None, reference=None):
"""
Create a Dataset and attach an optional reference value.
Parameters
----------
input_array : array_like
Primary data.
linked_array : array_like or None, optional
Secondary linked data.
reference : float or None, optional
Reference value to attach.
"""
obj = super().__new__(cls, input_array, linked_array)
obj.reference = reference
return obj
def __array_finalize__(self, obj):
"""
Propagate `reference` and linked array when creating views.
Parameters
----------
obj : ndarray or None
Source object for the view.
"""
super().__array_finalize__(obj)
if obj is None: return
self.reference = getattr(obj, 'reference', None)
def __setitem__(self, index, value):
"""
Assign items in the primary array and mirror to the linked array.
"""
super().__setitem__(index, value)
class File:
"""
Thin wrapper around an HDF5 file to read datasets and their headers.
Parameters
----------
file_name : str or Path
Path to the HDF5 file.
Attributes
----------
real_path : Path
Resolved path to the file.
"""
def __init__(self, file_name):
"""
Initialize the file wrapper.
Parameters
----------
file_name : str or Path
Path to the HDF5 file.
"""
self.real_path = Path(file_name)
def exist(self):
"""
Check whether the file exists.
Returns
-------
bool
True if the file exists.
"""
return self.real_path.exists()
def read(self, dataset):
"""
Read a dataset and its column headers from the HDF5 file.
Parameters
----------
dataset : str
HDF5 path to the dataset to read.
Returns
-------
tuple
A tuple ``(data, attr)`` where ``data`` is a NumPy array and
``attr`` is a list/array of column headers. Returns None on error.
Raises
------
Exception
If the number of headers does not match the number of columns.
FileNotFoundError
If the file does not exist.
"""
try:
if not self.exist():
raise FileNotFoundError
with h5py.File(self.real_path, 'r') as f:
if dataset in f:
data = f[dataset][:]
attr = f[dataset].attrs["Column headers"]
                    if len(attr) != f[dataset].shape[0] and len(attr) == f[dataset].shape[1]:
                        # headers describe the second axis; transpose so they index rows
                        data = data.T
                    elif len(attr) != f[dataset].shape[0] and len(attr) != f[dataset].shape[1]:
                        raise Exception("The number of columns does not match the number of headers")
return data, attr
except FileNotFoundError:
print(f'File {self.real_path} not found')
return None
except Exception as err:
print(f'Reading Error: {err}')
return None
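# A minimal sketch of the on-disk layout ``File.read`` expects: a 2-D dataset
# whose "Column headers" attribute names the rows (or the columns, in which
# case the data is transposed on read). The file name below is hypothetical.
def _demo_file_read(path='demo_spectra.h5'):
    with h5py.File(path, 'w') as f:
        dset = f.create_dataset('spectra', data=np.arange(12.0).reshape(3, 4))
        dset.attrs['Column headers'] = ['mz', 'Intensity', 'spectra_ind']
    data, attr = File(path).read('spectra')
    # headers match the first axis here, so no transpose: data.shape == (3, 4)
    return data.shape, list(attr)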
class MainWindow(QMainWindow):
"""
Main application window that hosts pages and coordinates background work.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the main window, tabs, and signal wiring.
"""
super().__init__(*args, **kwargs)
self.setWindowTitle('MZ alignment quality evaluation')
self.resize(800, 600)
self.tabs = QtWidgets.QTabWidget(self)
self.tabs.setTabPosition(QtWidgets.QTabWidget.North)
self.tabs.setMovable(True)
self.setCentralWidget(self.tabs)
self.tabs.currentChanged.connect(self.adjust_tab_sizes)
self.const = Const
self.console_log = LogWidget()
self.main = MainPage(self, 'Main')
self.tabs.addTab(self.main, self.main.title)
self.graph = GraphPage(self, x_labels=['m/z'], y_labels=['dens'], title='KDE', title_plots=('kde',))
self.tabs.addTab(self.graph, self.graph.title)
self.stats = StatGraphPage(self, title='Statistics(Unpaired)')
self.tabs.addTab(self.stats, self.stats.title)
self.stats_p = StatGraphPage(self, title='Statistics(Paired)')
self.tabs.addTab(self.stats_p, self.stats_p.title)
        self.table = TablePage(self, title='Stat per peak', columns=6)
        self.tabs.addTab(self.table, self.table.title)
        self.table.set_title(['Distance', 'Var(raw)', 'Var(aln)', 'JSD', 'neq_mean?', 'neq_var?'])
self.signals = WorkerSignals()
self.manager = ProcessManager(self.signals)
self.timer = QTimer()
self.timer.timeout.connect(self.manager.check_queues)
self.timer.start(100)
self.signals.output.connect(self.console_log.updateText)
self.signals.error.connect(self.console_log.updateText)
self.signals.result.connect(self.console_log.updateText)
# self.signals.result.connect(lambda ds: self.graph.add_plot_mul(ds))
self.signals.result.connect(self.redirect_outputs)
def redirect_outputs(self, ret):
"""
Dispatch a composite results payload to the respective UI pages.
Parameters
----------
ret : Sequence[tuple]
Iterable of (key, payload) pairs where key selects a handler.
"""
        self.aval_func = {'show': self.graph.add_plot_mul,
                          'stats': self.stats.add_plot_mul,
                          'stats_p': self.stats_p.add_plot_mul,
                          'stats_table': self.table.add_data,
                          'final': self.main.result}
for output in ret:
self.aval_func[output[0]](output[1])
def adjust_tab_sizes(self):
"""
Resize tab widgets to fit the current tab area.
"""
tab_size = self.tabs.size()
for i in range(self.tabs.count()):
tab = self.tabs.widget(i)
tab.resize(tab_size)
def resizeEvent(self, event):
"""
Handle window resize events and adjust child sizes.
Parameters
----------
event : QResizeEvent
The resize event.
"""
super().resizeEvent(event)
self.adjust_tab_sizes()
def start_calc(self, target, process_name=None, args=None, kwargs=None):
"""
Start a background calculation using the process manager.
Parameters
----------
target : callable
Function to run in background.
process_name : str, optional
Name for the process; defaults to ``target.__name__``.
args : list, optional
Positional arguments for the target.
kwargs : dict, optional
Keyword arguments for the target.
"""
if kwargs is None:
kwargs = {}
if args is None:
args = []
if process_name is None:
process_name = target.__name__
        if process_name in self.manager.process_set:
            return  # already started; do not launch a duplicate process
        self.manager.run_process(target, process_name, args, kwargs)
class MainPage(QWidget):
"""
Main configuration page for selecting files, datasets and parameters.
Parameters
----------
parent : QWidget
Parent main window.
title : str
Page title.
"""
def __init__(self, parent, title):
"""
Build the configuration UI and wire controls.
"""
super().__init__()
self.thread = QThread()
self.processing = WorkerSignals()
self.title = title
self.parent = parent
self.main_layout = QHBoxLayout()
self.setLayout(self.main_layout)
self.main_splitter = QSplitter()
self.left_main_widget = QWidget()
self.right_main_widget = QWidget()
self.left_layout = QVBoxLayout()
self.right_layout = QVBoxLayout()
self.left_main_widget.setLayout(self.left_layout)
self.right_main_widget.setLayout(self.right_layout)
self.main_splitter.addWidget(self.left_main_widget)
self.main_splitter.addWidget(self.right_main_widget)
self.main_layout.addWidget(self.main_splitter)
self.left_splitter = QSplitter(Qt.Vertical)
self.raw_tree = TreeWidget()
self.aln_tree = TreeWidget()
self.left_splitter.addWidget(self.raw_tree)
self.left_splitter.addWidget(self.aln_tree)
self.left_layout.addWidget(self.left_splitter)
self.splitter = QSplitter()
self.const = Const
form_panel = QtWidgets.QWidget()
form_layout = QFormLayout()
form_panel.setLayout(form_layout)
config_panel = QtWidgets.QWidget()
config_layout = QtWidgets.QVBoxLayout()
config_panel.setLayout(config_layout)
#self.setLayout(self.right_layout)
# Raw
self.raw_layout = QtWidgets.QHBoxLayout()
self.raw_filename = QLineEdit()
self.raw_open_button = QPushButton("Browse")
self.raw_open_button.clicked.connect(lambda: self.open_file(self.raw_filename))
self.raw_layout.addWidget(self.raw_filename)
self.raw_layout.addWidget(self.raw_open_button)
# aln
self.aln_layout = QtWidgets.QHBoxLayout()
self.aln_filename = QLineEdit()
self.aln_open_button = QPushButton("Browse")
self.aln_open_button.clicked.connect(lambda: self.open_file(self.aln_filename))
self.aln_layout.addWidget(self.aln_filename)
self.aln_layout.addWidget(self.aln_open_button)
# ref and dev
self.dataset_raw = QLineEdit()
self.dataset_aln = QLineEdit()
self.ref_set = QLineEdit()
self.raw_filename.setEnabled(False)
self.aln_filename.setEnabled(False)
self.dev_set = QLineEdit()
self.bw_set = QLineEdit()
self.n_dots_set = QLineEdit()
self.global_result = QLabel()
form_layout.addRow(QLabel("Raw data:"), self.raw_layout)
form_layout.addRow(QLabel("Alignment data:"), self.aln_layout)
form_layout.addRow(QLabel("Dataset (raw):"), self.dataset_raw)
form_layout.addRow(QLabel("Dataset (aln):"), self.dataset_aln)
form_layout.addRow(QLabel("Reference point:"), self.ref_set)
form_layout.addRow(QLabel("Acceptable deviation for msalign:"), self.dev_set)
form_layout.addRow(QLabel("Bandwidth:"), self.bw_set)
form_layout.addRow(QLabel("Number of dots:"), self.n_dots_set)
self.config_button = QPushButton("Open config file")
self.config_button.clicked.connect(lambda: self.open_config())
# self.load_config_button = QPushButton("Save configs")
# self.load_config_button.clicked.connect(lambda: self.save_config())
self.calc_button = QPushButton("Calculate")
config_layout.addWidget(self.config_button)
# config_layout.addWidget(self.load_config_button)
config_layout.addWidget(self.calc_button)
self.calc_button.clicked.connect(lambda: self.signal())
# self.calc_button.setEnabled(False)
self.pbar_widget = QWidget()
self.pbar_layout = QFormLayout(self.pbar_widget)
self.pbar = QProgressBar()
self.pbar_label = QLabel("Spectra processing:")
self.pbar_layout.addRow(self.pbar_label,self.pbar)
self.splitter.addWidget(form_panel)
self.splitter.addWidget(config_panel)
self.right_layout.addWidget(self.splitter)
self.right_layout.addWidget(self.parent.console_log)
self.raw_filename.textChanged.connect(lambda text: self.raw_tree.update_tree(text))
self.aln_filename.textChanged.connect(lambda text: self.aln_tree.update_tree(text))
self.raw_tree.path_signal.connect(lambda path: self.dataset_raw.setText(path))
self.aln_tree.path_signal.connect(lambda path: self.dataset_aln.setText(path))
self.right_layout.addWidget(self.pbar_widget)
#temp: result out
self.right_layout.addWidget(self.global_result)
self.global_result.hide()
self.pbar_widget.hide()
        # restore the last used configuration, if any
        try:
with open("last_config.yaml", 'r', encoding='utf8') as f:
yaml_config = yaml.load(f, Loader=yaml.FullLoader)
self.raw_filename.setText(yaml_config['FILE_NAMES'][0])
self.aln_filename.setText(yaml_config['FILE_NAMES'][1])
self.ref_set.setText(str(yaml_config['REF']))
self.dev_set.setText(str(yaml_config['DEV']))
self.dataset_raw.setText(str(yaml_config['DATASET_R']))
self.dataset_aln.setText(str(yaml_config['DATASET_A']))
self.bw_set.setText(str(yaml_config['BW']))
self.n_dots_set.setText(str(yaml_config['NDOTS']))
except Exception as error:
print(error)
def open_file(self, raw_filename):
"""
Open a file dialog and set the selected path to the provided line edit.
Parameters
----------
raw_filename : QLineEdit
Line edit to receive the selected file path.
"""
filename, _ = QFileDialog.getOpenFileName(self, "Open File", "", "HDF (*.hdf *.hdf5 *.h5);;All Files (*)")
if not filename: return
raw_filename.setText(filename)
def open_config(self):
"""
Load configuration from a YAML file and populate the UI fields.
"""
filename, _ = QFileDialog.getOpenFileName(self, "Open File", "", "yaml (*.yaml);;All Files (*)")
if not filename: return
with open(filename, 'r', encoding='utf8') as f:
try:
yaml_config = yaml.load(f, Loader=yaml.FullLoader)
self.raw_filename.setText(yaml_config['FILE_NAMES'][0])
self.aln_filename.setText(yaml_config['FILE_NAMES'][1])
self.ref_set.setText(str(yaml_config['REF']))
self.dev_set.setText(str(yaml_config['DEV']))
self.dataset_raw.setText(str(yaml_config['DATASET_R']))
self.dataset_aln.setText(str(yaml_config['DATASET_A']))
self.bw_set.setText(str(yaml_config['BW']))
self.n_dots_set.setText(str(yaml_config['NDOTS']))
except Exception as e:
print(e)
def Pbar_set_ranges(self, ranges):
"""
Initialize the progress bar range and reset its value.
Parameters
----------
ranges : tuple[int, int]
Minimum and maximum for the progress bar.
"""
self.pbar.setRange(*ranges)
self.pbar.setValue(ranges[0])
def Pbar_forwarder(self, n):
"""
Update progress bar value.
Parameters
----------
n : int
New progress value.
"""
self.pbar.setValue(n)
def signal(self):
"""
Validate inputs, persist the last configuration, and start processing.
"""
self.pbar_widget.show()
self.pbar.show()
self.pbar_label.setText("Spectra processing:")
try:
data = (self.raw_filename.text(),
self.aln_filename.text(),
self.ref_set.text(),
self.dev_set.text(),
self.dataset_raw.text(),
self.dataset_aln.text(),
self.bw_set.text(),
self.n_dots_set.text())
            if '' in data:
                raise ValueError('one or more input fields are empty')
            with open('last_config.yaml', 'w') as outfile:
                yaml.dump({
                    'FILE_NAMES': (data[0], data[1]),
                    'REF': float(data[2]),
                    'DEV': float(data[3]),
                    'DATASET_R': data[4],
                    'DATASET_A': data[5],
                    'BW': float(data[6]),
                    'NDOTS': int(data[7])
                }, outfile, default_flow_style=False)
            Const.RAW, Const.ALN = data[0], data[1]
            Const.REF, Const.DEV = float(data[2]), float(data[3])
            Const.DATASET_RAW, Const.DATASET_ALN = data[4], data[5]
            Const.BW, Const.N_DOTS = float(data[6]), int(data[7])
        except Exception as e:
            print(e)
            return  # abort: invalid or incomplete input
        # build a fresh worker/thread pair for this run; the previous pair is
        # destroyed via deleteLater() once its thread finishes
        self.thread = QThread()
        self.processing = WorkerSignals()
        self.processing.moveToThread(self.thread)
        self.thread.started.connect(self.processing.find_dots_process)
        self.processing.finished.connect(self.thread.quit)
        self.processing.finished.connect(self.processing.deleteLater)
        self.processing.finished.connect(lambda: self.pbar_label.setText("Process done"))
        self.thread.finished.connect(self.thread.deleteLater)
        self.thread.finished.connect(lambda: self.config_button.setEnabled(True))
        self.thread.finished.connect(lambda: self.calc_button.setEnabled(True))
        self.thread.finished.connect(lambda: self.pbar.hide())
        self.processing.create_pbar.connect(self.Pbar_set_ranges)
        self.processing.progress.connect(self.Pbar_forwarder)
        self.processing.result.connect(self.parent.redirect_outputs)
        self.processing.error.connect(self.parent.console_log.updateText)
        self.processing.error.connect(lambda: self.pbar_label.setText("Error occurred during processing"))
        self.processing.error.connect(lambda: self.config_button.setEnabled(True))
        self.processing.error.connect(lambda: self.calc_button.setEnabled(True))
        self.processing.error.connect(lambda: self.pbar.hide())
        self.config_button.setEnabled(False)
        self.calc_button.setEnabled(False)
        # start only after every connection is in place
        self.thread.start()
def result(self, result):
"""
        Show the final verdict message.
        Parameters
        ----------
        result : tuple
            ``(text, type)``: ``text`` is displayed in the ``global_result``
            QLabel; ``type`` selects the text color.
        """
        result_text, result_type = result
        match result_type:
            case -1:
                color_name = 'red'
            case 1:
                color_name = 'green'
            case _:  # 0 or anything unexpected
                color_name = 'black'
        self.global_result.setStyleSheet("""
            QLabel {
                color: %s;
                font-size: 16px;
                padding: 10px;
            }
        """ % color_name)
self.global_result.setText(result_text)
self.global_result.show()
class TablePage(QWidget):
"""
Page containing a detailed statistics table and its row-wise average.
Parameters
----------
parent : QWidget
Parent widget.
title : str, optional
Page title.
columns : int, optional
Number of columns in the tables.
"""
    def __init__(self, parent, title='TablePage', columns=1):
super().__init__()
self.parent = parent
self.title = title
self.layout = QVBoxLayout()
self.splitter = QSplitter(Qt.Vertical)
self.table = QTableWidget()
self.aver_table = QTableWidget()
self.setLayout(self.layout)
self.splitter.addWidget(self.table)
self.splitter.addWidget(self.aver_table)
self.splitter.setSizes([int(self.height()*0.95),int(self.height()*0.05)])
self.layout.addWidget(self.splitter)
self.table.setColumnCount(columns)
self.aver_table.setColumnCount(columns)
self.aver_table.setRowCount(1)
self.aver_table.verticalHeader().setDefaultSectionSize(self.aver_table.height())
self.table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
self.aver_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
self.table.setSelectionBehavior(QAbstractItemView.SelectRows)
self.table.setSelectionMode(QAbstractItemView.ExtendedSelection)
self.table.itemSelectionChanged.connect(self.average_selected)
def set_title(self,title):
"""
Set column headers for both the main and average tables.
Parameters
----------
title : list[str]
Column titles.
"""
self.table.setHorizontalHeaderLabels(title)
self.aver_table.setHorizontalHeaderLabels(title)
def add_row(self,data):
"""
Append a single row to the main table.
Parameters
----------
data : Sequence
Row values.
"""
row_index = self.table.rowCount()
self.table.insertRow(row_index)
for col,value in enumerate(data):
self.table.setItem(row_index, col, QTableWidgetItem(str(value)))
def add_data(self,data):
"""
Append multiple rows to the main table.
Parameters
----------
data : Iterable[Sequence]
Rows to append.
"""
for line in data:
self.add_row(line)
def average_selected(self):
"""
Compute the column-wise average for selected rows and show it below.
"""
        selected = self.table.selectedItems()
        if not selected:
            return
        n_cols = self.table.columnCount()
        temp = np.array([el.text() for el in selected]).reshape(-1, n_cols).T
        data = temp.astype(float).mean(axis=1)
        for col, value in enumerate(data):
            self.aver_table.setItem(0, col, QTableWidgetItem(str(value)))
class GraphPage(QWidget):
"""
Page with one or more plotting canvases built on pyqtgraph.
Parameters
----------
parent : QWidget
Parent widget.
canvas_count : int, optional
Number of plot canvases.
title : str, optional
Page title.
title_plots : Sequence[str] or None, optional
Titles for each canvas.
x_labels, y_labels : Sequence[str] or None, optional
Axis labels for each canvas.
color : tuple, optional
Default foreground color.
bg_color : tuple, optional
Background color.
n_colors : int, optional
Size of the categorical color palette.
autoSize : bool, optional
Whether to enable auto-ranging on the Y axis.
"""
    def __init__(self, parent, canvas_count=1, title='PlotPage', title_plots=None, x_labels=None, y_labels=None,
                 color=(255, 255, 255), bg_color=(240, 240, 230), n_colors=8, autoSize=True):
super().__init__()
self.autoSize = autoSize
self.bg_color = bg_color
self.color = color
        self.fixed_colors = [
            pg.mkColor('blue'),
            pg.mkColor('red')
        ]
if x_labels is None: x_labels = ['x'] * canvas_count
if y_labels is None: y_labels = ['y'] * canvas_count
if title_plots is None: title_plots = [f'plot{i}' for i in range(canvas_count)]
self.canvas_adj = {title_plots[i]: i for i in range(canvas_count)}
self.parent = parent
self.title = title
self.plot_spaces = [pg.PlotWidget() for _ in range(canvas_count)]
        for pw in self.plot_spaces:
            self.pyqt_settings(pw)
self.layout = QtWidgets.QVBoxLayout()
self.setLayout(self.layout)
for i in range(canvas_count):
self.plot_spaces[i].showGrid(x=True, y=True)
self.plot_spaces[i].setTitle(title_plots[i])
self.layout.addWidget(self.plot_spaces[i])
self.plot_spaces[i].setLabel('bottom', x_labels[i])
self.plot_spaces[i].setLabel('left', y_labels[i])
self.palette_colors = [pg.intColor(i,hues=n_colors) for i in range(n_colors)]
def pyqt_settings(self,plot_widget):
"""
Apply common pyqtgraph settings to a plot widget.
Parameters
----------
plot_widget : pg.PlotWidget
Target plot widget.
"""
plot_widget.setBackground(self.bg_color)
plot_widget.addLegend(brush=self.bg_color)
plot_widget.setMouseEnabled(y=False, x=True)
        vb = plot_widget.getViewBox()
        if self.autoSize:
            # enable auto-ranging on the Y axis
            vb.enableAutoRange(axis='y')
            # auto-range using only the currently visible data
            vb.setAutoVisible(y=True)
def add_plot(self, data, plot_name, color='w', canvas_name=None):
"""
Plot a 2D curve on the specified canvas.
Parameters
----------
data : tuple(ndarray, ndarray)
X and Y arrays.
plot_name : str
Name for the legend.
color : str or tuple, optional
Pen color.
canvas_name : str or None, optional
Canvas identifier; when None, use the first canvas.
"""
if canvas_name is None:
plot_id = 0
else:
plot_id = self.canvas_adj[canvas_name]
pen = pg.mkPen(color=color)
self.plot_spaces[plot_id].plot(data[0], data[1], name=plot_name, pen=pen)
self.plot_spaces[plot_id].getAxis('bottom').setVisible(True)
def add_line(self, data, y_max, color='w',canvas_name=None):
"""
Draw vertical reference lines at X positions up to `y_max`.
Parameters
----------
data : array_like
X positions of lines.
y_max : float
Maximum Y extent for the lines.
color : str or tuple, optional
Pen color or 'mult' to use a color palette.
canvas_name : str or None, optional
Canvas identifier; when None, use the first canvas.
"""
try:
if canvas_name is None:
plot_id = 0
else:
plot_id = self.canvas_adj[canvas_name]
y_min = 0
x = np.column_stack([data,
data,
np.full_like(data, np.nan)])
y = np.column_stack([np.full_like(data, y_min),
np.full_like(data, y_max),
np.full_like(data, np.nan)])
            if color == 'mult':
                length = x.shape[0]
                n_pal = len(self.palette_colors)
                # distribute the lines round-robin across the palette colors
                indices = [np.arange(i, length, n_pal) for i in range(n_pal)]
                x_s = [np.take(x, idx, axis=0) for idx in indices]
                y_s = [np.take(y, idy, axis=0) for idy in indices]
                pens = [pg.mkPen(color=c) for c in self.palette_colors]
                for figure_index in range(n_pal):
                    self.plot_spaces[plot_id].plot(x_s[figure_index].ravel(), y_s[figure_index].ravel(), pen=pens[figure_index])
else:
x = x.ravel()
y = y.ravel()
pen = pg.mkPen(color=color, style=QtCore.Qt.DashLine)
self.plot_spaces[plot_id].plot(x, y, pen=pen)
self.plot_spaces[plot_id].getAxis('bottom').setVisible(True)
except Exception as e:
print(e)
# self.plot_space.addItem(pg.InfiniteLine(pos=x,angle=90,pen=pen,movable=False))
def add_dot(self,data,y_level,color = 'w',canvas_name = None,symbol = 'o'):
"""
Scatter plot of points at a fixed Y level.
Parameters
----------
data : array_like
X positions for the markers.
y_level : float
Y coordinate for all markers.
color : str or tuple, optional
Color or 'mult' to use a palette.
canvas_name : str or None, optional
Canvas identifier; when None, use the first canvas.
symbol : str, optional
Marker symbol.
"""
if canvas_name is None:
plot_id = 0
else:
plot_id = self.canvas_adj[canvas_name]
        x = data.ravel()
        y = np.full_like(x, y_level)
        if color == 'mult':
            length = x.size
            n_pal = len(self.palette_colors)
            # distribute the points round-robin across the palette colors
            indices = [np.arange(i, length, n_pal) for i in range(n_pal)]
            x_s = [np.take(x, idx) for idx in indices]
            y_s = [np.take(y, idy) for idy in indices]
            for figure_index in range(n_pal):
                self.plot_spaces[plot_id].plot(x=x_s[figure_index], y=y_s[figure_index], symbol=symbol,
                                               symbolSize=10, symbolBrush=self.palette_colors[figure_index])
        else:
            self.plot_spaces[plot_id].plot(x=x, y=y, symbol=symbol, symbolSize=10, symbolBrush=color)
        self.plot_spaces[plot_id].getAxis('bottom').setVisible(True)
def add_plot_mul(self, ds):
"""
Render multiple plot primitives given a compact descriptor list.
Parameters
----------
        ds : Iterable[tuple]
            Each entry encodes one plot instruction. Curves arrive as
            ``((x, y), name, color, 'p', canvas)``; vertical peak markers as
            ``(positions, y_max, name, color, 'vln', canvas)``.
        """
for data in ds:
if data[-2] == 'p':
self.add_plot(data[0], data[1], data[2], data[-1])
elif data[-2] == 'vln':
self.add_dot(data[0], 0, data[3], data[-1])
#self.add_line(data[0], data[1], data[3], data[-1])
class StatGraphPage(GraphPage):
"""
Page for visualizing summary statistics distributions across datasets.
Plots include standard deviation, dip test statistic/p-value, skewness,
and kurtosis histograms for raw and aligned data.
"""
def __init__(self, parent, title='StatPage', x_labels=None, y_labels=None,
color=(255, 255, 255), bg_color=(240, 240, 230), p_val=0.05):
super().__init__(parent, canvas_count=4, title=title, title_plots=('std_dev', 'modality (dip test) p-value', 'skewness', 'kurtosis'),
x_labels=x_labels, y_labels=y_labels, color=color, bg_color=bg_color,autoSize=False)
        #self.table_un = pg.TableWidget()  # total points, median deviation, number of non-unimodal points
        #self.table_list = {'unpaired': self.table_un}
        self.p = p_val
        #self.table_data = np.zeros((3, 2))
        # give every plot canvas equal stretch
        self.layout.setStretch(0, 1)
        self.layout.setStretch(1, 1)
        self.layout.setStretch(2, 1)
        self.layout.setStretch(3, 1)
        self.table_layout = QtWidgets.QHBoxLayout()
        self.layout.addLayout(self.table_layout)
        #self.table_layout.addWidget(self.table_un)
    def add_row(self, table_name, data):
        """
        Append a row into an auxiliary table by name.
        .. warning::
            Relies on ``self.table_data`` being a mapping of table widgets;
            that wiring is currently commented out in ``__init__``.
        """
row_index = self.table_data[table_name].rowCount()
self.table_data[table_name].insertRow(row_index)
for col,value in enumerate(data):
self.table_data[table_name].setItem(row_index, col, QTableWidgetItem(str(value)))
def add_data(self,table_name,data):
"""
Append multiple rows into an auxiliary table by name.
"""
for line in data:
self.add_row(table_name,line)
def add_plot_mul(self, ds):
"""
Plot multiple histogram-based statistics for provided datasets.
Parameters
----------
ds : Sequence
Sequence of ``((data_arrays), label)`` pairs.
"""
        for n in range(len(ds)):
            data = ds[n][0]
            data_name = ds[n][1]
            ds_color = self.fixed_colors[n]
            #self.table_data[0, n] = len(data[0])
            self.add_plot(data[0], f'st dev {data_name}', ds_color, 'std_dev')
            self.add_plot(data[2], f'dip {data_name}', ds_color, 'modality (dip test) p-value')
            # significance threshold for the dip test
            self.add_line(data=self.p, y_max=len(data[2]), canvas_name='modality (dip test) p-value', color='black')
            self.add_plot(data[3], f'skew {data_name}', ds_color, 'skewness')
            self.add_plot(data[4], f'kurt {data_name}', ds_color, 'kurtosis')
#self.table_data[1, n] = np.where(data[2] < self.p)[0].size
#self.table_data[2, n] = np.median(data[0])
#self.table_un.setData(self.table_data)
#self.table_un.setHorizontalHeaderLabels([str(i) for i in range(len(ds))])
#self.table_un.setVerticalHeaderLabels(['total', 'is multimodal', 'median std dev'])
def add_plot(self, data, plot_name, color, canvas_name=None):
"""
Plot a histogram-like step curve of the provided data.
Parameters
----------
data : array_like
Data to histogram.
plot_name : str
Name for the legend.
color : str or tuple
Pen color.
canvas_name : str or None, optional
Canvas identifier.
"""
if canvas_name is None:
plot_id = 0
else:
plot_id = self.canvas_adj[canvas_name]
pen = pg.mkPen(color=color)
        clean = np.asarray(data)
        clean = clean[~np.isnan(clean)]  # drop NaNs before histogramming
        y, x = np.histogram(clean, bins=1000)
        self.plot_spaces[plot_id].plot(x, y, stepMode=True, name=plot_name, pen=pen)
self.plot_spaces[plot_id].getAxis('bottom').setVisible(True)
# --- Function declarations ---
def construct_output(p_value, var_raw, var_aln, alpha=0.05):
    """
    Build the final verdict message comparing aligned data against raw data.
    Parameters
    ----------
    p_value : np.ndarray
        Array with all per-peak p-values.
    var_raw : np.ndarray
        Array with the variance of every peak in the raw data.
    var_aln : np.ndarray
        Array with the variance of every peak in the aligned data.
    alpha : float
        Significance level. Default is 0.05.
    Returns
    -------
    result_text : str
        HTML text of the verdict message to display.
    result_type : int
        Type of result: -1 is negative, +1 is positive, 0 is not statistically significant.
    """
    s_val, simes_significance = simes(p_value, alpha)
    delta_var_all = np.mean(var_raw - var_aln)
    delta_var_significant = np.mean((var_raw - var_aln)[np.where(p_value <= alpha)])
    if simes_significance:
        if delta_var_significant > 0:
            result_type = 1
            verdict = 'Alignment is better than raw data'
        else:
            result_type = -1
            verdict = 'Alignment is worse than raw data'
    else:
        result_type = 0
        verdict = 'The differences are not significant'
    result_text = f"""
    <div style='font-family: Times New Roman; font-size: 12px;'>
    <b>{verdict}</b><br>
    Simes = <b>{s_val:.2e}</b><br>
    Δσ<sup>2</sup>(total) = <b>{delta_var_all:.2e}</b><br>
    Δσ<sup>2</sup>(sign.) = <b>{delta_var_significant:.2e}</b>
    </div>
    """
    return result_text, result_type
def peak_picking(X, Y, oversegmentation_filter=None, peak_location=1):
"""
Detect peaks in a KDE curve and return their centers and boundaries.
Parameters
----------
X : ndarray
Monotonic array of X coordinates (e.g., m/z grid).
Y : ndarray
Corresponding density/height values.
oversegmentation_filter : float or None, optional
Minimal allowed separation between adjacent peaks; when provided, peaks
closer than this threshold are merged.
peak_location : float, optional
Fraction of the peak height to compute a barycentric center; used in
boundary calculations as a threshold. Default is 1.
Returns
-------
pk_x : ndarray
Estimated peak centers (X positions). May contain NaNs if a region has
no samples above the threshold.
left : ndarray
Left boundary (valley position) for each peak.
right : ndarray
Right boundary (valley position) for each peak.
"""
n = X.size
# Robust valley finding
valley_dots = np.concatenate((np.where(np.diff(Y) != 0)[0], [n-1]))
loc_min = np.diff(Y[valley_dots])
loc_min = (np.array([True,*(loc_min < 0)])) & np.array(([*(loc_min > 0),True]))
left_min = np.concatenate([[-1],valley_dots[:-1]])[loc_min][:-1] + 1
right_min = valley_dots[loc_min][1:]
# Compute max and min for every peak
size = left_min.shape
val_max = np.empty(size)
pos_peak = np.empty(size)
for idx, [lm, rm] in enumerate(zip(left_min, right_min)):
pp = lm + np.argmax(Y[lm:rm])
vm = np.max(Y[lm:rm])
val_max[idx] = vm
pos_peak[idx] = pp
    def _centers():
        # barycentric center of every peak region above its height threshold
        peak_threshold = val_max * peak_location - math.sqrt(np.finfo(float).eps)
        pk = np.empty(left_min.shape)
        for idx, (lm, rm, th) in enumerate(zip(left_min, right_min, peak_threshold)):
            mask = Y[lm:rm] >= th
            if np.sum(mask) == 0:
                pk[idx] = np.nan
            else:
                pk[idx] = np.sum(Y[lm:rm][mask] * X[lm:rm][mask]) / np.sum(Y[lm:rm][mask])
        return pk
    # Remove over-segmented peaks
    if oversegmentation_filter:
        while True:
            pk_x = _centers()
            dpk_x = np.concatenate(([np.inf], np.diff(pk_x), [np.inf]))
            j = np.where((dpk_x[1:-1] <= oversegmentation_filter) & (dpk_x[1:-1] <= dpk_x[:-2]) & (dpk_x[1:-1] < dpk_x[2:]))[0]
            if j.size == 0:
                break
            # merge each flagged peak with its right neighbour
            left_min = np.delete(left_min, j + 1)
            right_min = np.delete(right_min, j)
            val_max[j] = np.maximum(val_max[j], val_max[j + 1])
            val_max = np.delete(val_max, j + 1)
    else:
        pk_x = _centers()
    return pk_x, X[left_min], X[right_min]
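# A small synthetic check of the valley-based peak picking above: two Gaussian
# bumps on a shared grid should yield two centers with valley boundaries
# between them. Values here are illustrative only.
def _demo_peak_picking():
    x = np.linspace(0.0, 10.0, 501)
    y = np.exp(-0.5 * ((x - 3.0) / 0.4) ** 2) + 0.6 * np.exp(-0.5 * ((x - 7.0) / 0.5) ** 2)
    centers, left, right = peak_picking(x, y)
    return centers, left, right  # centers ≈ [3.0, 7.0]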
@njit()
def sort_dots_numba(ds: np.ndarray, left: np.ndarray, right: np.ndarray):
"""
Group values into bins defined by paired left/right boundaries.
Parameters
----------
ds : ndarray
Values to be grouped.
left : ndarray
Left boundaries for each bin.
right : ndarray
Right boundaries for each bin.
Returns
-------
flat_grouped_values : ndarray
Concatenated values from all bins.
split_indices : ndarray
Indices to split `flat_grouped_values` into original bins.
"""
    num_intervals = left.size
    num_ds = ds.size
    # 1. Count how many elements fall into each interval
    counts = np.zeros(num_intervals, dtype=np.int64)
    for i in range(num_intervals):
        # np.sum() over a boolean array works in nopython mode
        counts[i] = np.sum((ds >= left[i]) & (ds <= right[i]))
    # 2. Compute the split indices: [0, count_0, count_0 + count_1, ...]
    split_indices = np.zeros(num_intervals + 1, dtype=np.int64)
    split_indices[1:] = np.cumsum(counts)
    # 3. Allocate a flat array for all matched values
    total_elements = split_indices[-1]
    flat_grouped_values = np.empty(total_elements, dtype=ds.dtype)
    # Track the next insertion position for every interval
    current_indices = split_indices.copy()
    for i in range(num_ds):
        val = ds[i]
        for j in range(num_intervals):
            if left[j] <= val <= right[j]:
                # insert the value at the interval's next free slot
                idx_to_insert = current_indices[j]
                flat_grouped_values[idx_to_insert] = val
                current_indices[j] += 1
                # intervals do not overlap, so the inner loop can stop here
                break
    return flat_grouped_values, split_indices
def sort_dots(ds: np.ndarray, left: np.ndarray, right: np.ndarray) -> list:
"""
    Wrapper around ``sort_dots_numba`` that returns the grouped values as a list.
Parameters
----------
ds : ndarray
Values to be grouped.
left : ndarray
Left boundaries for each bin.
right : ndarray
Right boundaries for each bin.
Returns
-------
list of ndarray
For each interval [left[i], right[i]], the subset of `ds` within it.
"""
if len(left) == 0:
return []
flat_values, split_ind = sort_dots_numba(ds, left, right)
ret = np.split(flat_values, split_ind[1:-1])
return ret
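# A quick illustration of ``sort_dots``: values are grouped into the inclusive
# intervals [left[i], right[i]]; values outside every interval are dropped.
def _demo_sort_dots():
    ds = np.array([0.1, 0.5, 1.2, 1.3, 2.9, 5.0])
    left = np.array([0.0, 1.0, 2.5])
    right = np.array([0.6, 1.5, 3.0])
    return sort_dots(ds, left, right)
    # -> [array([0.1, 0.5]), array([1.2, 1.3]), array([2.9])]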
def get_long_and_short(arr_1: np.ndarray, arr_2: np.ndarray) -> (np.ndarray, np.ndarray, bool):
"""
Return the longer and shorter of two arrays and a flag indicating order.
Parameters
----------
arr_1, arr_2 : ndarray
Arrays to compare by first-dimension length.
Returns
-------
long : ndarray
The longer array.
short : ndarray
The shorter array.
flag : bool
True if ``arr_1`` is the longer array, else False.
"""
size1, size2 = arr_1.shape[0], arr_2.shape[0]
if size1 > size2:
return arr_1, arr_2, True
else:
return arr_2, arr_1, False
def get_opt_strip(arr_long: Dataset, arr_short: Dataset, flag: bool) -> (Dataset, Dataset):
"""
Align two sequences by shifting the longer to minimize mean squared error.
Parameters
----------
arr_long : Dataset
Longer dataset.
arr_short : Dataset
Shorter dataset.
flag : bool
True if `arr_long` corresponds to the original first argument from
``get_long_and_short``.
Returns
-------
Dataset, Dataset
Sliced/shifted versions with equal length, ordered to match the flag.
"""
size = arr_short.shape[0]
long_size = arr_long.shape[0]
max_shift = long_size - size + 1
shift_array = np.arange(max_shift)
score_array = np.zeros(max_shift)
for i in shift_array:
fit_score = np.mean((arr_short - arr_long[i:i + size]) ** 2)
score_array[i] = fit_score
    opt_shift = int(np.argmin(score_array))
opt_long = arr_long[opt_shift:opt_shift + size]
if flag:
return opt_long, arr_short
else:
return arr_short, opt_long
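# Sketch of the shift search performed by ``get_opt_strip``: the longer array
# is slid over the shorter one and the window with the smallest mean squared
# error wins. Plain NumPy arrays stand in for ``Dataset`` here.
def _demo_opt_strip():
    short = np.array([10.0, 11.0, 12.0])
    long = np.array([0.0, 10.1, 11.1, 12.1, 50.0])
    scores = [np.mean((short - long[i:i + short.size]) ** 2) for i in range(long.size - short.size + 1)]
    best = int(np.argmin(scores))
    return best, long[best:best + short.size]  # -> (1, array([10.1, 11.1, 12.1]))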
def verify_datasets(data_1: LinkedList, data_2: LinkedList, threshold=1.0) -> (LinkedList, LinkedList):
"""
Verify and co-trim two sorted datasets so that element-wise differences are bounded.
The function optionally removes one outlier (by index) and re-aligns to
satisfy the threshold, returning two arrays of equal length.
Parameters
----------
data_1, data_2 : LinkedList
Input datasets to verify.
threshold : float or str, optional
Maximum allowed absolute difference between paired values. If
``'dist_based'``, the mean difference is used as the threshold.
Returns
-------
LinkedList, LinkedList
Verified (possibly trimmed) datasets of equal size.
"""
if data_1.size != data_2.size:
data1_new, data2_new = get_opt_strip(*get_long_and_short(data_1, data_2))
else:
data1_new = data_1
data2_new = data_2
dist_array = data1_new - data2_new
if threshold == 'dist_based':
threshold = np.mean(dist_array)
score_fit = np.max(np.abs(dist_array))
if score_fit > threshold:
        cut_index = np.where(np.abs(dist_array) >= threshold)[0].min()
if data1_new[cut_index] < data2_new[cut_index]:
data1_new2 = data1_new.sync_delete(cut_index)
data2_new2 = data2_new
else:
data2_new2 = data2_new.sync_delete(cut_index)
data1_new2 = data1_new
return get_opt_strip(*get_long_and_short(data1_new2, data2_new2))
return data1_new, data2_new
_DATA_RAW = None
_DATA_ALN = None
_IDX = None # (mz_idx_raw, intensity_idx_raw, spectra_idx_raw, mz_idx_aln, intensity_idx_aln, spectra_idx_aln)
_REF_DEV = None # (REF, DEV)
def pool_initializer(data_raw, data_aln, idx_tuple, ref, dev):
"""
Pool initializer: store global references to datasets, indices, and params.
Parameters
----------
data_raw : ndarray
Raw dataset array loaded from HDF5.
data_aln : ndarray
Aligned dataset array loaded from HDF5.
idx_tuple : tuple[int, int, int, int, int, int]
``(mz_idx_raw, intensity_idx_raw, spectra_idx_raw, mz_idx_aln,
intensity_idx_aln, spectra_idx_aln)`` indices into the datasets.
ref : float
Reference m/z value for ``find_ref``.
dev : float
Allowed deviation (±) around ``ref`` for reference search.
Notes
-----
Stores the arguments into module-level globals (``_DATA_RAW``, ``_DATA_ALN``,
``_IDX``, ``_REF_DEV``) to avoid repeated pickling and argument passing to
worker processes.
"""
global _DATA_RAW, _DATA_ALN, _IDX, _REF_DEV
_DATA_RAW = data_raw
_DATA_ALN = data_aln
_IDX = idx_tuple
_REF_DEV = (ref, dev)
def process_spectrum(task):
"""
Process a single spectrum task and return datasets for raw and aligned.
Parameters
----------
task : tuple[int, int, int, int, int]
``(spec_id, r0, r1, a0, a1)`` where ``[r0:r1]`` and ``[a0:a1]`` are
inclusive slices for raw and aligned blocks belonging to ``spec_id``.
Returns
-------
tuple
``(spec_id, arr_raw, arr_aln)`` where ``arr_raw`` and ``arr_aln`` are
NumPy arrays representing ``Dataset`` instances for the spectrum.
"""
spec_id, r0, r1, a0, a1 = task
mz_idx_raw, intensity_idx_raw, _s_idx_r, mz_idx_aln, intensity_idx_aln, _s_idx_a = _IDX
REF, DEV = _REF_DEV
    # extract both blocks and sort them by m/z
data_raw_unsorted = _DATA_RAW[[mz_idx_raw, intensity_idx_raw], r0:r1 + 1]
data_aln_unsorted = _DATA_ALN[[mz_idx_aln, intensity_idx_aln], a0:a1 + 1]
order_raw = np.argsort(data_raw_unsorted[0])
order_aln = np.argsort(data_aln_unsorted[0])
data_raw = data_raw_unsorted[:, order_raw]
data_aln = data_aln_unsorted[:, order_aln]
data_raw_mz, data_aln_mz = data_raw[0], data_aln[0]
data_raw_int, data_aln_int = data_raw[1], data_aln[1]
data_raw_linked = Dataset(data_raw_mz, data_raw_int)
data_aln_linked = Dataset(data_aln_mz, data_aln_int)
checked_raw, checked_aln = verify_datasets(data_raw_linked, data_aln_linked, 1)
_, ref_aln = find_ref(checked_aln, REF, DEV)
_, ref_raw = find_ref(checked_raw, REF, DEV)
checked_raw.reference = ref_raw
checked_aln.reference = ref_aln
return spec_id, np.array(checked_raw), np.array(checked_aln)
def find_ref(dataset: Dataset, approx_mz: float, deviation=1.0) -> tuple:
"""
Locate a reference peak near an approximate m/z within a deviation window.
Parameters
----------
dataset : Dataset
Sorted m/z values (primary) with intensities as linked data.
approx_mz : float
Approximate m/z for the reference.
deviation : float, optional
Allowed deviation around `approx_mz` for candidate search.
Returns
-------
tuple
Pair ``(index, mz)`` of the selected reference peak.
"""
condition_1 = approx_mz - deviation <= dataset
condition_2 = approx_mz + deviation >= dataset
where_construct = np.where(condition_1 & condition_2)
if where_construct[0].size:
ref_index = where_construct[0][np.argmax(dataset.linked_array[where_construct])]
else:
ref_index = np.argmin(np.abs(dataset - approx_mz))
return ref_index, dataset[ref_index]
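# The reference-peak rule of ``find_ref`` restated with plain NumPy arrays
# (assumed ``Dataset`` interface: the array holds m/z, ``linked_array`` holds
# intensities): inside the ±deviation window the most intense point wins,
# otherwise the closest m/z is the fallback. Numbers are illustrative.
def _demo_find_ref(approx_mz=100.5, deviation=1.0):
    mz = np.array([99.0, 100.1, 100.4, 102.0])
    intensity = np.array([5.0, 2.0, 9.0, 1.0])
    window = np.where((mz >= approx_mz - deviation) & (mz <= approx_mz + deviation))[0]
    if window.size:
        ref_index = window[np.argmax(intensity[window])]
    else:
        ref_index = int(np.argmin(np.abs(mz - approx_mz)))
    return ref_index, mz[ref_index]  # -> (2, 100.4)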
def read_dataset(self, dataset_raw: np.ndarray, attrs_raw: list, dataset_aln: np.ndarray,
attrs_aln: list, REF, DEV, limit=None, processes: int = 0):
"""
Prepare per-spectrum datasets and emit progress for the UI, with optional
sequential or parallel execution (multiprocessing.Pool).
Overview
--------
- Resolve indices of required columns by headers (m/z and intensity).
- Build contiguous segments for each spectrum id based on the spectra index.
- Create tasks only for spectrum ids present in both raw and aligned inputs.
- For each task: slice the subarrays, sort by m/z, verify alignment
(``verify_datasets``), find a reference peak around ``REF`` within ``DEV``
(``find_ref``), and store the result as a ``Dataset`` with a ``reference``.
- Emit progress after each spectrum is processed.
Modes
-----
- Sequential (``processes <= 0``): runs in the main thread, preserving
existing variable names and logic.
- Parallel (``processes > 0``): uses ``multiprocessing.Pool`` with an
initializer (``pool_initializer``) and worker (``process_spectrum``).
Tasks are processed in parallel; results may arrive unordered and are
placed by ``spec_id``.
Parameters
----------
self : WorkerSignals
Object used to emit progress bar initialization and updates.
dataset_raw, dataset_aln : ndarray
Raw and aligned datasets read from HDF5.
attrs_raw, attrs_aln : list of str
Column headers for the respective datasets.
REF : float
Reference m/z seed.
DEV : float
Acceptable deviation (±) around ``REF`` for reference search.
limit : int or None, optional
Optional limit on the number of spectra to process (debugging).
processes : int, optional
Number of processes for ``multiprocessing.Pool``. ``<= 0`` means
sequential mode. Default is 0.
Returns
-------
ndarray
Array of shape ``(2, N)`` with ``dtype=Dataset``, where ``N`` is the
number of processed spectra. ``dataset_list[0, spec_id]`` corresponds to
the raw dataset; ``dataset_list[1, spec_id]`` to the aligned dataset.
Notes
-----
- Only spectrum ids present in both raw and aligned datasets are processed.
- The progress bar is initialized based on the number of tasks (common ids).
- In parallel mode, result arrival order is not guaranteed.
"""
row_raw = DatasetHeaders(attrs_raw)
row_aln = DatasetHeaders(attrs_aln)
int_type = None
if "mz" in attrs_raw:
mz_type = "mz"
else:
mz_type = "peak"
if "Intensity" not in attrs_raw:
for column in ["Area","SNR"]:
if column in attrs_raw:
int_type = column
break
else:
int_type = "Intensity"
if int_type is None:
raise Exception('Intensity type not stated in attrs_raw, check file input')
index_row_raw = dataset_raw[row_raw("spectra_ind")]
index_row_aln = dataset_aln[row_aln("spectra_ind")]
start_index, end_index = int(min(index_row_raw)), int(max(index_row_raw))
if limit is not None:
if start_index + limit <= end_index:
end_index = start_index + limit
set_num = end_index - start_index + 1
dataset_list = np.empty((2, set_num), dtype=Dataset)
    # precompute contiguous segments for every spectrum index
segments_raw = build_segments(index_row_raw)
segments_aln = build_segments(index_row_aln)
    # build tasks only for spectrum ids present in both raw and aln
tasks = []
for spec_id in range(start_index, end_index + 1):
if spec_id in segments_raw and spec_id in segments_aln:
r0, r1 = segments_raw[spec_id]
a0, a1 = segments_aln[spec_id]
tasks.append((spec_id, r0, r1, a0, a1))
    # initialize the progress bar with the number of tasks
self.create_pbar.emit((0, len(tasks)))
    # sequential branch — keeps the original variable names and logic
if processes <= 0:
for spec_n, (spec_id, r0, r1, a0, a1) in enumerate(tasks):
index_raw, index_aln = np.where(index_row_raw == spec_id)[0], np.where(index_row_aln == spec_id)[0]
data_raw_unsorted = dataset_raw[row_raw([mz_type,int_type]), index_raw[0]:index_raw[-1] + 1]
data_aln_unsorted = dataset_aln[row_aln([mz_type,int_type]), index_aln[0]:index_aln[-1] + 1]
data_raw = data_raw_unsorted[:, np.argsort(data_raw_unsorted, axis=1)[0]]
data_aln = data_aln_unsorted[:, np.argsort(data_aln_unsorted, axis=1)[0]]
data_raw_mz, data_aln_mz = data_raw[0], data_aln[0]
data_raw_int, data_aln_int = data_raw[1], data_aln[1]
data_raw_linked = Dataset(data_raw_mz, data_raw_int)
data_aln_linked = Dataset(data_aln_mz, data_aln_int)
checked_raw, checked_aln = verify_datasets(data_raw_linked, data_aln_linked, 1)
_, ref_aln = find_ref(checked_aln, REF, DEV)
_, ref_raw = find_ref(checked_raw, REF, DEV)
checked_raw.reference = ref_raw
checked_aln.reference = ref_aln
            # store by position relative to the first spectrum id
            dataset_list[0, spec_id - start_index] = np.array(checked_raw)
            dataset_list[1, spec_id - start_index] = np.array(checked_aln)
            self.progress.emit(spec_n)
return dataset_list
    # parallel branch — multiprocessing.Pool with minimal changes
mz_idx_raw = row_raw(mz_type)
intensity_idx_raw = row_raw(int_type)
spectra_idx_raw = row_raw("spectra_ind")
mz_idx_aln = row_aln(mz_type)
intensity_idx_aln = row_aln(int_type)
spectra_idx_aln = row_aln("spectra_ind")
init_args = (
dataset_raw,
dataset_aln,
(mz_idx_raw, intensity_idx_raw, spectra_idx_raw,
mz_idx_aln, intensity_idx_aln, spectra_idx_aln),
REF, DEV,
)
multiprocessing.util.FINALIZE_MAX_DELAY = 10
with Pool(processes=processes, initializer=pool_initializer, initargs=init_args) as pool:
        for spec_n, (spec_id, arr_raw, arr_aln) in enumerate(pool.imap_unordered(process_spectrum, tasks)):
            # results may arrive unordered; store by position relative to the first id
            dataset_list[0, spec_id - start_index] = arr_raw
            dataset_list[1, spec_id - start_index] = arr_aln
            self.progress.emit(spec_n)
return dataset_list
def build_segments(spectra_index_row: np.ndarray) -> dict[int, tuple[int, int]]:
"""
Build contiguous [start, end] slices for each value of ``spectra_ind``.
Parameters
----------
spectra_index_row : ndarray
1-D array of spectrum identifiers, typically the ``spectra_ind`` row
from an HDF5 dataset.
Returns
-------
dict[int, tuple[int, int]]
Mapping from spectrum id to an inclusive ``(start, end)`` slice within
``spectra_index_row`` covering its contiguous block.
"""
segments: dict[int, tuple[int, int]] = {}
if spectra_index_row.size == 0:
return segments
change_pos = np.where(np.diff(spectra_index_row) != 0)[0]
starts = np.concatenate(([0], change_pos + 1))
ends = np.concatenate((change_pos, [spectra_index_row.size - 1]))
ids = spectra_index_row[ends]
for s, e, spec_id in zip(starts, ends, ids):
segments[int(spec_id)] = (int(s), int(e))
return segments
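# ``build_segments`` in action: each run of equal spectrum ids is mapped to
# its inclusive (start, end) slice.
def _demo_build_segments():
    row = np.array([0, 0, 0, 1, 1, 3, 3, 3])
    return build_segments(row)
    # -> {0: (0, 2), 1: (3, 4), 3: (5, 7)}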
def prepare_array(distances):
"""
    Concatenate per-spectrum arrays and sort them jointly by the raw values.
    Parameters
    ----------
    distances : ndarray or Sequence
        A 2 x N collection: row 0 holds per-spectrum raw arrays, row 1 the
        matching aligned arrays (equal length per spectrum).
    Returns
    -------
    ndarray
        A 3 x K array: sorted raw values (row 0), the correspondingly ordered
        aligned values (row 1), and the originating spectrum index (row 2).
"""
concatenated = np.array([np.concatenate(sub) for sub in distances])
indexes = np.repeat(np.arange(len(distances[0])), [len(sub_arr) for sub_arr in distances[0]])
pre_sorted = np.vstack((concatenated, indexes))
result = pre_sorted[:, pre_sorted[0].argsort()]
return result
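# ``prepare_array`` flattens the two rows and sorts every row by the raw
# values, keeping the originating spectrum index in the third row. Inputs
# below are illustrative.
def _demo_prepare_array():
    raw = [np.array([3.0, 1.0]), np.array([2.0])]
    aln = [np.array([3.1, 1.1]), np.array([2.1])]
    return prepare_array([raw, aln])
    # row 0: [1.0, 2.0, 3.0]; row 1: [1.1, 2.1, 3.1]; row 2: [0, 1, 0]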
def simes(p_value, alpha=0.05):
    """
    Calculate the Simes combined p-value over the whole spectrum.
    Parameters
    ----------
    p_value : ndarray
        Array of per-peak p-values.
    alpha : float
        Significance level. Default is 0.05.
    Returns
    -------
    float
        Simes combined p-value.
    bool
        Whether the combined test is statistically significant.
    """
p_vals = np.sort(p_value)
count = len(p_vals)
simes_value = np.min(count*p_vals/np.arange(1, count+1))
simes_significance = simes_value < alpha
return simes_value, simes_significance
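# The Simes rule on a toy set of p-values: sort them, scale the i-th smallest
# by n / i, and take the minimum.
def _demo_simes():
    p = np.array([0.01, 0.04, 0.30, 0.50])
    # scaled values: [0.04, 0.08, 0.40, 0.50] -> Simes value 0.04, significant at 0.05
    return simes(p, alpha=0.05)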
def concover(arr1: np.ndarray, arr2: np.ndarray):
"""
Compare two distributions using a rank-based variance (Conover-like) test.
Parameters
----------
arr1, arr2 : ndarray
Samples from two distributions.
Returns
-------
float
p-value for the test of equal scale/dispersion.
"""
dev = lambda data: np.abs(data - np.median(data))
dev1 = dev(arr1)
dev2 = dev(arr2)
all_devs = np.hstack((dev1, dev2))
ranks = stats.rankdata(all_devs)
rank1 = ranks[:len(dev1)]
rank2 = ranks[len(dev1):]
n = len(dev1)+len(dev2)
mean_rank = (n+1)/2
ss_between = (
len(dev1)*(np.mean(rank1)-mean_rank)**2 +
len(dev2)*(np.mean(rank2)-mean_rank)**2)
ss_total = np.sum((ranks-mean_rank)**2)
t = (n-1)*ss_between/ss_total
    # chi-square with k - 1 = 1 degree of freedom for two groups
    p_value = 1 - stats.chi2.cdf(t, 1)
return p_value
def stat_params_paired_single(peak_raw, peak_aln, alpha=0.05, return_p=True):
"""
Compute paired statistics between raw and aligned peak positions.
For each matched peak, compute mean difference, variances, normality check (to choose acceptable hypothesis tests) and JS-divergence
Parameters
----------
peak_raw, peak_aln : array_like
Samples of raw and aligned values for a single peak.
alpha : float, optional
Significance level used in tests. Default is 0.05.
return_p : bool, optional
If True, function will return exact p-value, otherwise result of comparison with significance level. Default is True.
Returns
-------
tuple
``(mean_diff, var_raw, var_aln, js_div, neq_mean, neq_var)``
where boolean flags are returned as floats (0.0/1.0).
"""
    # compute means and variances, check normality, test the hypotheses that the
    # means and the variances differ; optionally characterize the distribution shape
jsd = lambda p,q: 0.5*(sum(rel_entr(p,q))+sum(rel_entr(q,p)))
kde_single_peak = lambda dots,n_eval: FFTKDE(bw='silverman',kernel='gaussian').fit(dots).evaluate(n_eval)[1]
norm_var = lambda data: np.var(data-np.mean(data),ddof=1)
mean_r, mean_a = np.mean(peak_raw), np.mean(peak_aln)
var_r, var_a = norm_var(peak_raw), norm_var(peak_aln)
check_normal_func = lambda data,p: stats.kstest(data,'norm',args=(np.mean(data),np.std(data)))[1]>p
check_normal = check_normal_func(peak_raw, alpha) & check_normal_func(peak_aln, alpha)
neq_var_p_val = stats.levene(peak_raw, peak_aln)[1]
if check_normal:
#neq_var_p_val = stats.levene(peak_raw, peak_aln)[1]
neq_mean_p_val = stats.ttest_ind(peak_raw, peak_aln,nan_policy='omit')[1]
else:
neq_mean_p_val = stats.mannwhitneyu(peak_raw, peak_aln)[1]
#neq_var_p_val = stats.fligner(peak_raw, peak_aln)[1]
if return_p:
neq_var = neq_var_p_val
neq_mean = neq_mean_p_val
else:
neq_var = neq_var_p_val < alpha
neq_mean = neq_mean_p_val < alpha
return np.array([mean_r - mean_a, var_r, var_a,jsd(kde_single_peak(peak_raw,20),kde_single_peak(peak_aln,20)), float(neq_mean), float(neq_var)])
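# Illustrative call: two samples drawn around the same center, where the
# "aligned" one is tighter, should show var_raw > var_aln and a small JSD.
def _demo_stat_params_paired():
    rng = np.random.default_rng(0)
    peak_raw = rng.normal(100.0, 0.05, 200)
    peak_aln = rng.normal(100.0, 0.02, 200)
    return stat_params_paired_single(peak_raw, peak_aln)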
def stat_params_unpaired(ds):
"""
Compute unpaired per-group statistics for a list of arrays.
Parameters
----------
ds : Sequence[array_like]
Sequence of samples (e.g., peak positions per bin).
Returns
-------
ndarray
Array with columns: variance, dip statistic, dip p-value, skewness,
kurtosis for each group.
"""
res = np.array([[np.var(dot), *diptest(dot), stats.skew(dot), stats.kurtosis(dot)] for dot in ds])
return res
def moving_average(a, n=2):
"""
Compute the simple moving average over a 1D array.
Parameters
----------
a : ndarray
Input array.
n : int, optional
Window size. Default is 2.
Returns
-------
ndarray
Averaged array of length ``len(a) - n + 1``.
"""
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
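# The cumulative-sum trick above in action: each output element is the mean of
# a length-n window.
def _demo_moving_average():
    return moving_average(np.array([1.0, 2.0, 3.0, 4.0]), n=2)
    # -> array([1.5, 2.5, 3.5])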
def out_criteria(mz, intensity, int_threshold=0.01, max_diff=0.4, width_eps=0.1):
"""
.. warning::
Current version of pipeline doesn't use this function
Identify outlier peak intervals based on intensity and width heuristics.
Parameters
----------
mz : Dataset or LinkedList
Peak centers with linked boundaries in `linked_array`.
intensity : ndarray
Intensities corresponding to `mz` centers.
int_threshold : float, optional
Fraction of the maximum intensity below which points are flagged.
max_diff : float, optional
Maximum relative change between consecutive intensities (as |a/b - 1|).
width_eps : float, optional
Threshold on normalized width ratio used for flagging.
Returns
-------
ndarray
Indices of points considered outliers.
"""
min_int = np.max(intensity) * int_threshold
first_or = intensity < min_int
int_criteria = abs(intensity[:-1] / intensity[1:] - 1) < max_diff
width_criteria = np.diff(mz) / moving_average(np.diff(mz.linked_array).flatten()) <= width_eps
second_or = np.full(mz.shape, False)
second_or[1:] = np.logical_and(int_criteria, width_criteria)
return np.where(np.logical_or(first_or, second_or))[0]
def criteria_apply(arr, intensity):
"""
.. warning::
Current version of pipeline doesn't use this function
Merge narrow neighboring intervals and drop flagged indices.
Parameters
----------
arr : LinkedList
Peak centers with linked left/right boundaries.
intensity : ndarray
Intensities used to evaluate the criteria.
Returns
-------
LinkedList
Filtered peaks with adjusted boundaries.
"""
arr_out = copy.deepcopy(arr)
indexes = out_criteria(arr, intensity)
for index in indexes:
        arr_out.linked_array[index - 1] = sorted([arr.linked_array[index - 1, 0], arr.linked_array[index, 1]])
return arr_out.sync_delete(indexes)
if __name__ == '__main__':
multiprocessing.freeze_support()
multiprocessing.set_start_method('spawn')
app = QApplication(sys.argv)
app_icon = QIcon('main_ico.png')
app.setWindowIcon(app_icon)
main_window = MainWindow()
main_window.showMaximized()
sys.exit(app.exec_())