{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# [PyBroMo](http://tritemio.github.io/PyBroMo/) 5. Two-state dynamics - Dynamic smFRET simulation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "This notebook is part of PyBroMo a \n", "python-based single-molecule Brownian motion diffusion simulator \n", "that simulates confocal smFRET\n", "experiments.\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## *Overview*\n", "\n", "*In this notebook we simulate a freely-diffusing smFRET experiment with dynamics between two states.*\n", "*The input are two smFRET files, one for each static state.*\n", "*These input files need to be simulations from the same particles trajectories \n", "*but with different E*." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Load static FRET data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pathlib import Path\n", "from textwrap import dedent, indent\n", "import numpy as np\n", "import tables\n", "from scipy.stats import expon\n", "import phconvert as phc\n", "print('phconvert version:', phc.__version__)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "SIM_PATH = 'data/'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "filelist = list(Path(SIM_PATH).glob('smFRET_*_600s.hdf5'))\n", "[f.name for f in filelist]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "filename_a = str([f for f in filelist if '11_E_40_Em' in f.name][0])\n", "filename_b = str([f for f in filelist if '11_E_75_Em' in f.name][0])\n", "filename_a, filename_b" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "da = tables.open_file(filename_a)\n", "db = tables.open_file(filename_b)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "da.filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "db.filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(da.root.description.read().decode())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(db.root.description.read().decode())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Make sure files a re using the same trajectories\n", "assert da.root.description.read().decode().split('\\n')[5] == db.root.description.read().decode().split('\\n')[5]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Timestamps, detectors and particles for the two states" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Timestamps\n", "times_a = da.root.photon_data.timestamps.read()\n", "times_b = db.root.photon_data.timestamps.read()\n", "\n", "# Detectors\n", "det_a = da.root.photon_data.detectors.read()\n", "det_b = db.root.photon_data.detectors.read()\n", "\n", "# Particle number for each timestamp\n", "par_a = da.root.photon_data.particles.read()\n", "par_b = db.root.photon_data.particles.read()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "par_a" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "acquisition_duration = da.root.acquisition_duration.read()\n", "assert acquisition_duration == db.root.acquisition_duration.read()\n", "print('Acquisition duration: %d s' % acquisition_duration)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_unit = da.root.photon_data.timestamps_specs.timestamps_unit.read()\n", "times_unit" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Simulation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Mean residence times for the two states a and b" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tau_a = 1e-3 / 5\n", "tau_b = 0.5e-3 / 5\n", "\n", "tau_s = [tau_a, tau_b]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exponential distributions of residence times for the two states" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "expon_s = tuple(expon(scale=tau / times_unit) for tau in tau_s)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "n = int(1.1 * acquisition_duration / (tau_a + tau_b)) # Number of transitions (upper limit)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def sim_two_states_single_particle(times_s, taus_s):\n", " \"\"\"Simulate 2-state transitions for a single particle.\n", " \n", " Arguments:\n", " times_s (tuple or arrays): 2-tuple of timestamps arrays\n", " for the two states a (times_s[0]) and b (times_s[1]).\n", " taus_s (tuple or arrays): 2-tuple of residence times arrays\n", " for the two states a (taus_s[0]) and b (taus_s[1]).\n", " \n", " Returns:\n", " List of index pairs. Each pair is a start/stop index for \n", " for the timestamps of current state for a specific residence time. \n", " The states are strictly alternating starting from 0 (i.e. a).\n", " \n", " - first pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) \n", " corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][0]`\n", " - second pair: (state = 1) start/stop index for array `times_s[1]` (where 1 = state) \n", " corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[1][0]`\n", " - third pair: (state = 0) start/stop index for array `times_s[0]` (where 0 = state) \n", " corresponding to the residence time `taus_s[state][i_residence_time] = taus_s[0][1]`\n", " \n", " and so on.\n", " \"\"\"\n", " slices_list = []\n", "\n", " index_s = [0, 0] # indexes for looping thorugh the timestamps arrays\n", " index_start_s = [0, 0] # indexes of current state start in each timestamps array\n", " index_tau_s = [0, 0] # index of current time window duration\n", " t_start = 0 # time of current state start\n", "\n", " state, otherstate = 0, 1\n", " while ((index_s[0] < len(times_s[0]) - 1) and\n", " (index_s[1] < len(times_s[1]) - 1)):\n", " # Duration of current time window (i.e. duration of current state)\n", " tau = taus_s[state][index_tau_s[state]]\n", "\n", " # Find timestamps in current time window \n", " # for both timestamps arrays\n", " for state_i in (0, 1):\n", " times = times_s[state_i]\n", " delta_t = times[index_s[state_i]] - t_start\n", " while delta_t < tau and index_s[state_i] < len(times) - 1:\n", " index_s[state_i] += 1\n", " delta_t = times[index_s[state_i]] - t_start\n", " #print(state, index_s[state])\n", " \n", " # Save the timestamps only for current state\n", " slices_list.append((index_start_s[state], index_s[state]))\n", "\n", " # Save index of first timestamp in next time window\n", " index_start_s = index_s.copy()\n", " \n", " # Discard current \"used\" tau\n", " index_tau_s[state] += 1\n", "\n", " # Switch to a new state\n", " t_start += tau\n", " state, otherstate = otherstate, state\n", " return slices_list" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def sim_two_states(num_particles, times_states, det_states, par_states, times_unit, expon_s, seed=1):\n", " \"\"\"Simulate 2-state transitions for a set of particles.\n", " \n", " Arguments:\n", " num_particles (int): number of simulated particles.\n", " times_states (tuple of arrays): 2-tuple of timestamps arrays, one for each state\n", " det_states (tuple of arrays): 2-tuple of detectors arrays, one for each state\n", " par_states (tuple of arrays): 2-tuple of particles arrays, one for each state\n", " times_unit (float): timestamps unit in seconds.\n", " expon_s (tuple of scipy.stats distributions): 2-tuple of exponential distributions\n", " used to simulate residency times for each state. Each element is a frozen\n", " `scipy.stats.expon` distribution with scale parameter set according to the \n", " residency time for the corresponding state.\n", " \n", " Returns:\n", " Tuple of 2 lists:\n", " \n", " - List of timestamps arrays, one for each particle, after 2-states dynamics simulation.\n", " - List of detectors arrays, one for each particle, after 2-states dynamics simulation.\n", " \"\"\"\n", " np.random.seed(seed)\n", " times_p = []\n", " det_p = []\n", " for particle in range(num_particles):\n", " print('\\n- Simulating particle %d: ' % particle, end='', flush=True)\n", "\n", " # Get timestamps and detectors for current particle in each state\n", " print('timestamps..', end='', flush=True)\n", " masks_states = [par == particle for par in par_states]\n", " times_s = [memoryview(t_par[mask_par]) for t_par, mask_par in zip(times_states, masks_states)]\n", " det_s = [memoryview(det_par[mask_par]) for det_par, mask_par in zip(det_states, masks_states)]\n", " print('[done] ', end='', flush=True)\n", "\n", " # Simulate residence times\n", " print('residence..', end='', flush=True)\n", " taus_s = [memoryview(exp_dist.rvs(n)) for exp_dist in expon_s]\n", " sim_duration = np.sum(np.sum(taus) for taus in taus_s) * times_unit\n", " assert sim_duration > acquisition_duration\n", " print('[done] ', end='', flush=True)\n", "\n", " # Compute start/stop indexes for the timestamps for each residence time\n", " print('transition-index..', end='', flush=True)\n", " slices_list = sim_two_states_single_particle(times_s, taus_s)\n", " print('[done] ', end='', flush=True)\n", "\n", " # Create new timestamps and detectors to store dynamics simulation results \n", " print('merge..', end='', flush=True)\n", " times_size = sum([s[1] - s[0] for s in slices_list])\n", " times = np.zeros(times_size, dtype='int64')\n", " det = np.zeros(times_size, dtype='uint8')\n", " par = np.ones(times_size, dtype='uint8') * particle\n", " times_m = memoryview(times)\n", " det_m = memoryview(det)\n", "\n", " # istart, istop are indexes of times_m/det_m while the\n", " # start, stop indexes in slices_list refer to `times_s[state]`\n", " # where state = 0 for odd elements and state = 1 for even elements.\n", " # See `sim_two_states_single_particle()` for more info on `slice_list`.\n", " istart = 0\n", " state, otherstate = 0, 1\n", " for start, stop in slices_list:\n", " istop = istart + stop - start\n", " times_m[istart:istop] = times_s[state][start:stop]\n", " det_m[istart:istop] = det_s[state][start:stop]\n", " istart = istop\n", " state, otherstate = otherstate, state\n", "\n", " print('[done]', flush=True)\n", " assert (times != 0).all()\n", " times_p.append(times)\n", " det_p.append(det)\n", " return times_p, det_p" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simulate dynamics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "seed = 987123654 # random number generator seed\n", "\n", "times_p, det_p = sim_two_states(35, (times_a, times_b), (det_a, det_b), (par_a, par_b), \n", " times_unit=times_unit, expon_s=expon_s, seed=seed)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert all(all(np.diff(t) >= 0) for t in times_p)\n", "assert len(times_p) == len(det_p) == 35" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "det_p[0][:10]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "det_p[1][:10]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_p[0][:10]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_p[1][:10]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add background\n", "\n", "Background is the same Poisson process in the two static files\n", "and it is saved as a virtual particle (index = 35, the \"36th\" virtual particle).\n", "Let's take the background from file **a** for simplicity." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_a[par_a == 35]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "det_a[par_a == 35]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_p.append(times_a[par_a == 35])\n", "det_p.append(det_a[par_a == 35])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Merge arrays from individual particles" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "times_dyn = np.hstack(times_p)\n", "det_dyn = np.hstack(det_p)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "argsort = times_dyn.argsort(kind='mergesort')\n", "times_dyn = times_dyn[argsort]\n", "det_dyn = det_dyn[argsort]\n", "det_dyn" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "par_dyn = np.hstack([det_p_i.size * [idx] for idx, det_p_i in enumerate(det_p)])\n", "assert par_dyn.shape[0] == sum(d.size for d in det_p)\n", "par_dyn = par_dyn[argsort]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Save data to Photon-HDF5" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def make_photon_hdf5(times, det, par, times_unit, description, identity=None):\n", " photon_data = dict(\n", " timestamps = times,\n", " timestamps_specs = dict(timestamps_unit=times_unit),\n", " detectors = det,\n", " particles = par,\n", " measurement_specs = dict(\n", " measurement_type = 'smFRET',\n", " detectors_specs = dict(spectral_ch1 = np.atleast_1d(0),\n", " spectral_ch2 = np.atleast_1d(1))))\n", "\n", " setup = dict(\n", " num_pixels = 2,\n", " num_spots = 1,\n", " num_spectral_ch = 2,\n", " num_polarization_ch = 1,\n", " num_split_ch = 1,\n", " modulated_excitation = False,\n", " lifetime = False,\n", " excitation_alternated=(False,),\n", " excitation_cw=(True,))\n", "\n", " provenance = dict(software='', software_version='', filename='')\n", "\n", " if identity is None:\n", " identity = dict()\n", "\n", " description = description\n", " acquisition_duration = np.round((times[-1] - times[0]) * times_unit)\n", " data = dict(\n", " acquisition_duration = round(acquisition_duration),\n", " description = description,\n", " photon_data = photon_data,\n", " setup=setup,\n", " provenance=provenance,\n", " identity=identity)\n", " return data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create description string" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "da.filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "db.filename" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "traj_descr = dedent('\\n'.join(da.root.description.read().decode().split('\\n')[4:7]))\n", "print(traj_descr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "part_D_descr = indent(dedent('\\n'.join(da.root.description.read().decode().split('\\n')[9:11])), ' ')\n", "print(part_D_descr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "state0_descr = indent(dedent('\\n'.join(da.root.description.read().decode().split('\\n')[11:13])), ' ')\n", "print(state0_descr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "state1_descr = indent(dedent('\\n'.join(db.root.description.read().decode().split('\\n')[11:13])), ' ')\n", "print(state1_descr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bg_descr = dedent('\\n'.join(da.root.description.read().decode().split('\\n')[-4:]))\n", "print(bg_descr)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tau_a_ms = tau_a * 1e3\n", "tau_b_ms = tau_b * 1e3\n", "\n", "tau_a_us = tau_a * 1e6\n", "tau_b_us = tau_b * 1e6" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "filename_a_name = Path(filename_a).name\n", "filename_b_name = Path(filename_b).name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "description = f\"\"\"\\\n", "PyBroMo simulation of 2-states dynamics\n", "----------------------------------------\n", "\n", "{traj_descr}\n", "{part_D_descr}\n", "\n", "State 0:\n", " Residency time: {tau_a_ms} ms\n", "{state0_descr}\n", " filename: {filename_a_name}\n", " \n", "State 1:\n", " Residency time: {tau_b_ms} ms\n", "{state0_descr}\n", " filename: {filename_b_name}\n", " \n", "{bg_descr}\n", "\"\"\"\n", "print(description)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "identity=dict(author='Antonino Ingargiola', \n", " author_affiliation='UCLA')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data = make_photon_hdf5(times_dyn, det_dyn, par_dyn, times_unit, description, identity=identity)\n", "data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "h5_fname = f'smFRET_0eb9b3_P_35_s0_D_2.5e-11_dynamics_E_40-75_Tau_{tau_a_us:.0f}-{tau_b_us:.0f}us_EmTot_226k-200k_BgD900_BgA600_t_max_600s.hdf5'\n", "h5_fname" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "phc.hdf5.save_photon_hdf5(data, h5_fname=h5_fname, overwrite=True)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.2" }, "toc": { "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "toc_cell": false, "toc_position": {}, "toc_section_display": "block", "toc_window_display": true } }, "nbformat": 4, "nbformat_minor": 1 }