Source code for IDSimPy.analysis.trajectory

# -*- coding: utf-8 -*-

import gzip
import json
import io
import h5py
import numpy as np
from enum import Enum

import IDSimPy


[docs] class OptionalAttribute(Enum): """ Optional trajectory attributes identifiers: Distinct and extensible identifiers for optional trajectory data attributes. """ PARTICLE_MASSES = 1 #: Particle masses PARTICLE_CHARGES = 2 #: Particle charges
[docs] class ParticleAttributes: """ Container class for heterogeneous particle attributes. This container class can store a set of named additional attributes for every particle of a trajectory, for every time step. The attributes can be of type float or integer. The two sets of attributes are stored in different arrays internally, and are therefore passed seperately to the constructor. Similarly to internal representation of ``positions`` in :py:class:`Trajectory` class, the shape of the internal arrays depends if the trajectory this ParticleAttribute object is belonging to is static or not: * If the trajectory is static: The internal arrays are ``numpy.ndarray`` with the shape ``[n ions, particle attribute, n time steps]``. With 5 particles, 4 additional numerical attributes (e.g. x,y,z and velocity) and 15 time steps the shape would be ``[5, 4, 15]``. * If the trajectory is not static: The internal arrays are ``lists`` of ``numpy.ndarray`` with the shape ``[particle attribute, n ions]`` """ def __init__(self, attribute_names_float=None, attributes_float=None, attribute_names_int=None, attributes_int=None): """ Constructs a new particle attribute container. The names and actual attribute data for floating point (float) and integer attributes are passed seperately. :param attribute_names_float: List of attribute names of the float attributes, can be None if no floating point attributes are stored :type attributes_names_float: list of str :param attributes_float: Floating point attribute data. Either three dimensional np.ndarray for static data or list of two dimensioal np.ndarray for non static data :type attributes_float: np.ndarray or list of np.ndarray :param attribute_names_int: List of attribute names of the integer attributes, can be None if no integer point attributes are stored :type attributes_names_int: list of str :param attributes_int: Integer point attribute data. Either three dimensional np.ndarray for static data or list of two dimensioal np.ndarray for non static data :type attributes_float: np.ndarray or list of np.ndarray """ self.attr_names_float = attribute_names_float self.attr_names_int = attribute_names_int self.attr_dat_float = attributes_float self.attr_dat_int = attributes_int self.attr_names = [] float_static = None float_n_ts = None n_attr_float = 0 if self.attr_dat_float is not None: self.attr_names += self.attr_names_float n_attr_float = len(attribute_names_float) if type(self.attr_dat_float) == np.ndarray: float_static = True float_n_ts = np.shape(self.attr_dat_float)[2] n_columns_float = np.shape(self.attr_dat_float)[1] elif type(self.attr_dat_float) == list: float_static = False float_n_ts = len(self.attr_dat_float) n_columns_float = [np.shape(i)[1] for i in self.attr_dat_float if np.size(np.shape(i)) == 2][0] else: raise TypeError('Wrong type for float particle attributes, has to be an numpy.ndarray or a list of numpy.ndarrays') if n_columns_float != len(self.attr_names_float): raise ValueError('Wrong number of data columns for particle attributes (float)') int_static = None int_n_ts = None n_attr_int = 0 if self.attr_dat_int is not None: self.attr_names += self.attr_names_int n_attr_int = len(attribute_names_int) if type(self.attr_dat_int) == np.ndarray: int_static = True int_n_ts = np.shape(self.attr_dat_int)[2] n_columns_int = np.shape(self.attr_dat_int)[1] elif type(self.attr_dat_int) == list: int_static = False int_n_ts = len(self.attr_dat_int) n_columns_int = [np.shape(i)[1] for i in self.attr_dat_int if np.size(np.shape(i)) == 2][0] else: raise TypeError('Wrong type for int particle attributes, has to be an numpy.ndarray or a list of numpy.ndarrays') if n_columns_int != len(self.attr_names_int): raise ValueError('Wrong number of data columns for particle attributes (int)') self.n_attr = n_attr_float + n_attr_int if float_static is not None and int_static is not None: if float_static == int_static: self.is_static = float_static else: raise ValueError('Float and int particle attributes have to be both equally static or non static') if float_n_ts == int_n_ts: self.n_timesteps = float_n_ts else: raise ValueError('Float and int attribute data arrays inconsistent in time step axis') else: if float_static is not None: self.is_static = float_static self.n_timesteps = float_n_ts else: self.is_static = int_static self.n_timesteps = int_n_ts attributes_ids = [] if attribute_names_float: attributes_ids += [(self.attr_names_float[i], True, i) for i in range(len(self.attr_names_float))] if attribute_names_int: attributes_ids += [(self.attr_names_int[i], False, i) for i in range(len(self.attr_names_int))] self.attr_name_map = {ai[0]: (ai[1], ai[2]) for ai in attributes_ids} @property def attribute_names(self): """ Names of all stored parameter attributes """ return self.attr_names @property def number_of_timesteps(self): """ Number of time steps particle attribute data is stored for """ return self.n_timesteps @property def number_of_attributes(self): """ Number of stored particle attributes """ return self.n_attr def _select_nonstatic(self, selected_particle_indices): if len(selected_particle_indices) != self.n_timesteps: raise ValueError('Length of list of selected particle ids differs from number of time steps') if self.is_static: if self.attr_names_float: selected_attrs_float = [self.attr_dat_float[selected_particle_indices[i], :, i] for i in range(self.n_timesteps)] else: selected_attrs_float = None if self.attr_names_int: selected_attrs_int = [self.attr_dat_int[selected_particle_indices[i], :, i] for i in range(self.n_timesteps)] else: selected_attrs_int = None else: if self.attr_names_float: selected_attrs_float = [self.attr_dat_float[i][selected_particle_indices[i], :] for i in range(self.n_timesteps)] else: selected_attrs_float = None if self.attr_names_int: selected_attrs_int = [self.attr_dat_int[i][selected_particle_indices[i], :] for i in range(self.n_timesteps)] else: selected_attrs_int = None return selected_attrs_float, selected_attrs_int
[docs] def select(self, selected_particle_indices): """ Select individual particles based on their indices and return new ParticleAttribute container for the selected particles. The selector data in `selected_particle_indices` can be a simple vector of indices to select, if the particle attributes are static. If not, a list of selected indices is expected, one entry per time step. Note that this variable selection mode is also possible for static particle attributes. :param selected_particle_indices: Indices to select :type selected_particle_indices: vector of int or list of vector of int """ if type(selected_particle_indices) == np.ndarray: static_selector = True else: static_selector = False if not self.is_static and static_selector: return TypeError("Particle attribute selection with static selection for multiple time steps" " is only possible with static trajectories") if static_selector: if self.attr_names_float: selected_attrs_float = self.attr_dat_float[selected_particle_indices, :, :] else: selected_attrs_float = None if self.attr_names_int: selected_attrs_int = self.attr_dat_int[selected_particle_indices, :, :] else: selected_attrs_int = None else: selected_attrs_float, selected_attrs_int = self._select_nonstatic(selected_particle_indices) return ParticleAttributes( self.attr_names_float, selected_attrs_float, self.attr_names_int, selected_attrs_int )
[docs] def get(self, attrib_name, timestep_index=None): """ Gets and returns individual particle attributes. Attributes are specified by their name. If the time step index is not specified, the attribute is returned for all time steps. If the particle attributes are static this is an two dimensional array with the dimensions [n particles, n time steps]. For non static particle attributes this is a list of vectors (arrays with one column) with the length of [n particles]. If the time step index is specified, a vector (arrays with one column) with length of [n particles] is returned. :param attrib_name: Name of the particle attribute to return. :type attrib_name: str :param timestep_index: Index of the time step to return (can be Null for all time steps) :type timestep_index: int """ ap = self.attr_name_map[attrib_name] if ap[0]: attr_dat = self.attr_dat_float else: attr_dat = self.attr_dat_int if self.is_static: if timestep_index is not None: return attr_dat[:, ap[1], timestep_index] else: return attr_dat[:, ap[1], :] else: if timestep_index is not None: return attr_dat[timestep_index][:, ap[1]] else: return [ attr_dat[i][:, ap[1]] if attr_dat[i].ndim == 2 else attr_dat[i] for i in range(self.n_timesteps) ]
[docs] def get_attribs_for_particle(self, particle_index, timestep_index): """ Returns a list of the particle attributes for a single particle at a specified time step. :param particle_index: The index of the particle to get the attributes for :type particle_index: int :param timestep_index: The index of the time step to get the attributes for :type timestep_index: int """ if self.attr_names_float: if self.is_static: float_attribs = self.attr_dat_float[particle_index, :, timestep_index].tolist() else: float_attribs = self.attr_dat_float[timestep_index][particle_index, :].tolist() else: float_attribs = [] if self.attr_names_int: if self.is_static: int_attribs = self.attr_dat_int[particle_index, :, timestep_index].tolist() else: int_attribs = self.attr_dat_int[timestep_index][particle_index, :].tolist() else: int_attribs = [] return float_attribs + int_attribs
[docs] class StartSplatTrackingData: """ Simple container class for start / splat data of simulated particles """ def __init__(self, start_times, start_positions, start_velocities, splat_times, splat_positions, splat_velocities, splat_states): """ Constructs a new StartSplatTrackingData container :param start_times: A vector of start times for the particles (1 dimensional numpy array) :type start_times: np.ndarray with shape (n_ions, 1) :param start_positions: A vector of start positions of the particles (numpy array with three columns) :type start_times: np.ndarray with shape (n_ions, 3) :param start_velocities: A vector of start velocities of the particles (numpy array with three columns) :type start_velocities: np.ndarray with shape (n_ions, 3) :param splat_times: A vector of splat times for the particles (1 dimensional numpy array) :type splat_times: np.ndarray with shape (n_ions, 1) :param splat_positions: A vector of splat positions of the particles (numpy array with three columns) :type splat_positions: np.ndarray with shape (n_ions, 3) :param splat_velocities: A vector of splat velocities of the particles (numpy array with three columns) :type splat_velocities: np.ndarray with shape (n_ions, 3) :param splat_states: A vector of splat state for the particles (1 dimensional integer numpy array) :type splat_states: np.ndarray with shape (n_ions, 1) """ self.start_times: np.ndarray = start_times self.start_positions: np.ndarray = start_positions self.splat_times: np.ndarray = splat_times self.splat_positions: np.ndarray = splat_positions self.splat_states: np.ndarray = splat_states self.start_velocities: np.ndarray = start_velocities self.splat_velocities: np.ndarray = splat_velocities
[docs] class Trajectory: """ An IDSimF particle simulation trajectory. The simulation trajectory combines the result of an IDSimF particle simulation in one object. The trajectory consists of the positions of simulated particles at the time steps of the simulation, the times of the time steps and optional attributes of the simulated particles. The trajectory can be "static" which means that the number of particles is not changing between the time steps. :ivar positions: Particle positions. The particles are stored in a different scheme, depending if the trajectory is static: * If the trajectory is static: **positions** is a ``numpy.ndarray`` with the shape ``[n ions, spatial dimensions, n time steps]``. With 5 particles and 15 time steps the shape would be ``[5, 3, 15]``. * If the trajectory is not static: **positions** is a ``list`` of ``numpy.ndarray`` with the shape ``[spatial dimensions, n ions]`` :ivar times: Vector of simulated times for the individual time frames. :type times: numpy.ndarray :ivar n_timesteps: Number of time steps in the trajectory :type n_timesteps: int :ivar particle_attributes: Optional simulation result attributes for the simulated particles. Basically, particle attributes are a vector of numeric additional particle attributes, attached to every particle in every time step. The particle attributes are provided in a dedicated container class :py:class:`ParticleAttributes` :type particle_attributes: ParticleAttributes :ivar start_splat_data: Optional information about particle start and particle termination (splat) times and locations, which is also stored in a dedicated container class :py:class:`StartSplatTrackingData` :type start_splat_data: StartSplatTrackingData :ivar optional_attributes: dictionary of optional / free form additional attributes for the trajectory :type optional_attributes: dict :ivar is_static_trajectory: Flag if the trajectory is static. :type is_static_trajectory: bool """ def __init__(self, positions=None, times=None, particle_attributes=None, start_splat_data=None, optional_attributes=None, file_version_id=0): """ Constructor: (for details about the shape of the parameters see the class docsting) :param positions: Particle positions :type positions: numpy.ndarray or list[numpy.ndarray] :param times: Times of the simulation time steps :type times: numpy.ndarray with shape ``[n timesteps, 1]`` :param particle_attributes: Additional attributes for every particle for every time step, provided by a ParticleAttributes container object :type particle_attributes: ParticleAttributes :param start_splat_data: Particle start and termination ("splat") tracking data (start / splat times and positions) :type start_splat_data: StartSplatTrackingData :param optional_attributes: Optional attributes dictionary :type optional_attributes: dict :param file_version_id: File version id :type file_version_id: int """ if type(positions) == np.ndarray: self.is_static_trajectory = True if len(positions.shape) != 3 or positions.shape[1] != 3: raise ValueError('Static positions have wrong shape') self.n_timesteps = positions.shape[2] elif type(positions) == list: self.is_static_trajectory = False self.n_timesteps = len(positions) else: raise TypeError('Wrong type for positions, has to be an numpy.ndarray or a list of numpy.ndarrays') if type(times) != np.ndarray: raise TypeError('Wrong type for times, a numpy vector is expected') if times.shape[0] != self.n_timesteps: raise ValueError('Times vector has wrong length') if particle_attributes is not None: if type(particle_attributes) != ParticleAttributes: raise ValueError('Particle attributes argument has to be of type ParticleAttributes') if particle_attributes.is_static != self.is_static_trajectory: if self.is_static_trajectory: raise ValueError('Non static particle attributes passed for static trajectory') else: raise ValueError('Static particle attributes passed for non static trajectory') if start_splat_data is not None: if type(start_splat_data) != StartSplatTrackingData: raise ValueError('Start / splat tracking data has wrong type, has to be of type StartSplatTrackingData') self.positions = positions self.times: np.ndarray = times self.particle_attributes: ParticleAttributes = particle_attributes self.start_splat_data: StartSplatTrackingData = start_splat_data self.optional_attributes = optional_attributes self.file_version_id: int = file_version_id def __len__(self): return self.n_timesteps def __getitem__(self, timestep_index): if self.is_static_trajectory: return self.positions[:, :, timestep_index] else: return self.positions[timestep_index]
[docs] def get_n_particles(self, timestep_index=None): """ Returns the static number of particles in a static trajectory :return: Number of particles in the static trajectory :rtype: int """ if self.is_static_trajectory: return self.positions.shape[0] else: if timestep_index is not None: return self.positions[timestep_index].shape[0] else: raise AttributeError("Time step independent number of ions is only defined for static trajectories")
n_particles = property(get_n_particles)
[docs] def get_positions(self, timestep_index): """ Get particle positions for a time step :param timestep_index: The index of the time step to get the positions for :type timestep_index: int :return: Array of particle positions for a time step. Dimensions are ``[n particles, spatial dimensions]`` :rtype: numpy.ndarray """ return self[timestep_index]
[docs] def get_particle(self, particle_index, timestep_index): """ Get particle values (positions and additional attributes) for a particle at a specified time step :param particle_index: The index of the particle :type particle_index: int :param timestep_index: The index of the time step :type timestep_index: int :return: Tuple with the position and the attribute vector for the particle at the selected time step :rtype: tuple of two numpy.ndarray """ if self.is_static_trajectory: pos = self.positions[particle_index, :, timestep_index] attributes = self.particle_attributes.get_attribs_for_particle(particle_index, timestep_index) else: pos = self.positions[timestep_index][particle_index, :] attributes = self.particle_attributes.get_attribs_for_particle(particle_index, timestep_index) return pos, attributes
# -------------- Trajectory input -------------- #
[docs] def read_trajectory_file_for_project(project_name, file_type): if file_type == 'hdf5': file_ext = "_trajectories.h5" tr = read_hdf5_trajectory_file(project_name + file_ext) elif file_type == 'legacy_hdf5': file_ext = "_trajectories.h5" tr = read_legacy_hdf5_trajectory_file(project_name + file_ext) elif file_type == 'compressed': file_ext = "_trajectories.json.gz" tr = read_json_trajectory_file(project_name + file_ext) elif file_type == 'json': file_ext = "_trajectories.json" tr = read_json_trajectory_file(project_name + file_ext) else: raise ValueError('illegal file type flag (not hdf5, json or compressed)') return tr
[docs] def read_json_trajectory_file(trajectory_filename): """ Reads a json trajectory file and returns a trajectory object :param trajectory_filename: File name of the file to read :type trajectory_filename: str :return: Trajectory object with trajectory data :rtype: Trajectory """ if trajectory_filename[-8:] == ".json.gz": with gzip.open(trajectory_filename) as tf: tj = json.load(io.TextIOWrapper(tf)) else: with open(trajectory_filename) as tf: tj = json.load(tf) steps = tj["steps"] n_timesteps = len(steps) nIons = len(steps[0]["ions"]) times = np.zeros(len(steps)) positions = np.zeros([nIons, 3, len(steps)]) n_additional_parameters = len(steps[0]["ions"][0]) - 1 additional_parameters = np.zeros([nIons, n_additional_parameters, n_timesteps]) additional_parameters_names = ['attribute '+str(i+1) for i in range(n_additional_parameters)] for i in range(n_timesteps): for j in range(nIons): positions[j, :, i] = np.array(steps[i]["ions"][j][0]) additional_parameters[j, :, i] = np.array(steps[i]["ions"][j][1:]) times[i] = float(steps[i]["time"]) masses = np.zeros([nIons]) masses_json = tj["ionMasses"] for i in range(len(masses_json)): masses[i] = float(masses_json[i]) splat_times = np.zeros([nIons]) splat_times_json = tj["splatTimes"] for i in range(len(splat_times_json)): splat_times[i] = float(splat_times_json[i]) optional_attributes = {OptionalAttribute.PARTICLE_MASSES: masses} result = Trajectory( positions=positions, times=times, particle_attributes=ParticleAttributes(additional_parameters_names, additional_parameters), optional_attributes=optional_attributes) return result
def _read_hdf5_v2_trajectory(tra_group): attribs = tra_group.attrs file_version_id = attribs['file version'][0] n_timesteps = attribs['number of timesteps'][0] timesteps_group = tra_group['timesteps'] times = tra_group['times'] particle_attributes_names = None if 'auxiliary parameter names' in attribs.keys(): particle_attributes_names = [ name.decode('UTF-8') if isinstance(name, bytes) else name for name in attribs['auxiliary parameter names'] ] positions = [] particle_attributes = [] n_ion_per_frame = [] for ts_i in range(n_timesteps): ts_group = timesteps_group[str(ts_i)] ion_positions = np.array(ts_group['positions']) n_ion_per_frame.append(np.shape(ion_positions)[0]) positions.append(ion_positions) if particle_attributes_names: particle_attributes.append(np.array(ts_group['aux_parameters'])) unique_n_ions = len(set(n_ion_per_frame)) # if more than one number of ions are present in the frames, the trajectory is not static # and has variable frames # if the trajectory is static, transform the trajectory to the old format returned by # legacy hdf5 and json files to allow compatibility with the visualization methods if unique_n_ions > 1: static_trajectory = False else: static_trajectory = True positions = np.dstack(positions) particle_attributes_dat = None if particle_attributes_names: particle_attributes_dat = particle_attributes if static_trajectory: particle_attributes_dat = np.dstack(np.array(particle_attributes_dat)) result = Trajectory( positions=positions, times=np.array(times), particle_attributes=ParticleAttributes(particle_attributes_names, particle_attributes_dat), file_version_id=file_version_id) return result
[docs] def read_hdf5_trajectory_file(trajectory_file_name): """ Reads a version 2 or 3 hdf5 trajectory file (which allows also exported simulation frames with variable number of particles. :param trajectory_file_name: Name of the file to read :type trajectory_file_name: str :return: Trajectory object with trajectory data :rtype: Trajectory """ with h5py.File(trajectory_file_name, 'r') as hdf5file: tra_group = hdf5file['particle_trajectory'] attribs = tra_group.attrs file_version_id = attribs['file version'][0] if file_version_id == 2: return _read_hdf5_v2_trajectory(tra_group) n_timesteps = attribs['number of timesteps'][0] timesteps_group = tra_group['timesteps'] times = tra_group['times'] particle_attributes_names_float = None if 'attributes names' in attribs.keys(): particle_attributes_names_float = [ name.decode('UTF-8') if isinstance(name, bytes) else name for name in attribs['attributes names'] ] particle_attributes_names_int = None if 'integer attributes names' in attribs.keys(): particle_attributes_names_int = [ name.decode('UTF-8') if isinstance(name, bytes) else name for name in attribs['integer attributes names'] ] positions = [] particle_attributes_float = [] particle_attributes_int = [] n_ion_per_frame = [] for ts_i in range(n_timesteps): ts_group = timesteps_group[str(ts_i)] if 'positions' in ts_group.keys(): ion_positions = np.array(ts_group['positions']) else: ion_positions = np.empty([0, 3]) # maintain correct dimensionality even in empty array n_ion_per_frame.append(np.shape(ion_positions)[0]) positions.append(ion_positions) if particle_attributes_names_float: if n_ion_per_frame[ts_i] == 0: particle_attributes_float.append(np.empty([0, len(particle_attributes_names_float)])) else: particle_attributes_float.append(np.array(ts_group['particle_attributes_float'])) if particle_attributes_names_int: if n_ion_per_frame[ts_i] == 0: particle_attributes_int.append(np.empty([0, len(particle_attributes_names_int)], dtype=int)) else: particle_attributes_int.append(np.array(ts_group['particle_attributes_integer'], dtype=int)) unique_n_ions = len(set(n_ion_per_frame)) # if more than one number of ions are present in the frames, the trajectory is not static # and has variable frames # if the trajectory is static, transform the trajectory to the old format returned by # legacy hdf5 and json files to allow compatibility with the visualization methods if unique_n_ions > 1: static_trajectory = False else: static_trajectory = True positions = np.dstack(positions) # read auxiliary trajectory datasets optional_data_sets = None if 'optional_datasets' in tra_group.keys(): op_ds_group = tra_group['optional_datasets'] optional_data_sets = {ds_key: np.array(op_ds_group[ds_key]) for ds_key in op_ds_group.keys()} # read particle attributes p_attr_final_float = None if particle_attributes_names_float: p_attr_final_float = particle_attributes_float if static_trajectory: p_attr_final_float = np.dstack(np.array(p_attr_final_float)) p_attr_final_int = None if particle_attributes_names_int: p_attr_final_int = particle_attributes_int if static_trajectory: p_attr_final_int = np.dstack(np.array(p_attr_final_int, dtype=int)) p_attribs = ParticleAttributes( particle_attributes_names_float, p_attr_final_float, particle_attributes_names_int, p_attr_final_int) # read start / splat data start_splat_data = None if 'start_splat' in tra_group.keys(): ss_grp = tra_group['start_splat'] start_pos = np.array(ss_grp['particle start locations']) splat_pos = np.array(ss_grp['particle splat locations']) start_times = np.array(ss_grp['particle start times']) splat_times = np.array(ss_grp['particle splat times']) p_states = np.array(ss_grp['particle splat state'], dtype=int) if 'particle start velocities' in ss_grp.keys(): start_velocities = np.array(ss_grp['particle start velocities']) else: start_velocities = None if 'particle splat velocities' in ss_grp.keys(): splat_velocities = np.array(ss_grp['particle splat velocities']) else: splat_velocities = None start_splat_data = StartSplatTrackingData( start_times, start_pos, start_velocities, splat_times, splat_pos, splat_velocities, p_states ) # Read legacy splat data: if 'splattimes' in tra_group.keys(): splat_times = np.array(tra_group['splattimes']) start_splat_data = StartSplatTrackingData( None, None, splat_times, None, None, None, None ) result = Trajectory( positions=positions, times=np.array(times), optional_attributes=optional_data_sets, particle_attributes=p_attribs, start_splat_data=start_splat_data, file_version_id=file_version_id) return result
[docs] def read_legacy_hdf5_trajectory_file(trajectory_file_name): """ Reads a legacy hdf5 trajectory file (with static particles per exported simulation frame) :param trajectory_file_name: The name of the file to read :type trajectory_file_name: str :return: Trajectory object with trajectory data :rtype: Trajectory """ hdf5file = h5py.File(trajectory_file_name, 'r') tra_group = hdf5file['particle_trajectory'] attribs = tra_group.attrs positions = tra_group['positions'] times = tra_group['times'] aux_parameters_names = None aux_parameters = None if 'aux_parameters' in tra_group.keys(): #aux_parameters_names = [name.decode('UTF-8') for name in attribs['auxiliary parameter names']] aux_parameters_names = [ name.decode('UTF-8') if isinstance(name, bytes) else name for name in attribs['auxiliary parameter names'] ] aux_parameters = np.array(tra_group['aux_parameters']) result = Trajectory( positions=np.array(positions), times=np.array(times), particle_attributes=ParticleAttributes(aux_parameters_names, aux_parameters), file_version_id=1) return result
# -------------- Trajectory output / translation -------------- #
[docs] def export_trajectory_to_vtk(trajectory: Trajectory, vtk_file_base_name): """ Translates and exports an ion trajectory to a set of legacy VTK ascii files :param trajectory: The trajectory to translate and export :type json_file_name: Trajectory :param vtk_file_base_name: The base name of the vtk files to generate :type vtk_file_base_name: str """ header = """# vtk DataFile Version 2.0 BTree Test ASCII DATASET POLYDATA POINTS """ n_steps = trajectory.n_timesteps for i in range(n_steps): vtk_file_name = vtk_file_base_name + "%05d" % i + ".vtk" with open(vtk_file_name, 'w') as vtk_file: vtk_file.write(header + str(trajectory.get_n_particles()) + " float\n") ion_positions = trajectory.get_positions(i) for i_pos in ion_positions[:, :]: vtk_file.write(str(i_pos[0]) + " " + str(i_pos[1]) + " " + str(i_pos[2]) + " \n")
# -------------- Data Processing Methods -------------- #
[docs] def filter_attribute(trajectory, attribute_name, value): """ Filters select ions according to a value of a specified particle attribute. The method takes a :py:class:`Trajectory` the name of a particle attribute and a value which is selected for and constructs a new :py:class:`Trajectory` from the filtered data. Currently optional trajectory attributes and splat times are **not** retained. This method returns always a variable, non static Trajectory. :param trajectory: Trajectory object with the trajectory data to filter for :type trajectory: Trajectory :param attribute_name: Name of a particle attribute to filter for :type attribute_name: str :param value: Value to filter for: This value is used as selector :return: A Trajectory object with filtered particle positions :rtype: Trajectory """ n_ts = trajectory.n_timesteps # iterate through time steps and construct time step wise selected index arrays filtered_indexes = [ np.nonzero(trajectory.particle_attributes.get(attribute_name, i) == value)[0] for i in range(n_ts) ] new_positions = [trajectory.get_positions(i)[filtered_indexes[i], :] for i in range(n_ts)] new_particle_attributes = trajectory.particle_attributes.select(filtered_indexes) result = Trajectory( positions=new_positions, particle_attributes=new_particle_attributes, times=trajectory.times) return result
[docs] def select(trajectory, selector_data, value): """ Selects simulated particles according to given value in an array of selector data and constructs a new :py:class:`Trajectory` with the selected data. This method is primarily intended to provide a flexible mechanism to select particles from a trajectory with a custom constructed parameter to be used for selection. :param trajectory: The trajectory object to be selected from :param selector_data: Selector data which assigns one value of a parameter to be used for selection to every particle. * if a vector (one dimensional array) is provided it is assumed, that the particle related parameter which is filtered for is stable across all time steps. This is only possible for static :py:class:`Trajectory` objects. * An individual filtering for every time step is done when a list of selector data vectors, one per time step, is provided. Every time step then has an vector of parameter values for the individual particles. This mode is possible for static an variable :py:class:`Trajectory` objects. :type selector_data: numpy.ndarray or list of numpy.ndarray :param value: Value to select for: Particles with this value are selected from the trajectory object. :return: Trajectory with selected data :rtype: Trajectory """ # if selector is a vector: Same filtering for all time steps if type(selector_data) is np.ndarray and selector_data.ndim == 1: static_selector = True selected_indices = np.nonzero(selector_data == value)[0] elif type(selector_data) is list: # we have a different filter parameter vector per time step # filtered particles per time step could variate: generate a vector per time step static_selector = False n_ts = len(selector_data) selected_indices = [np.nonzero(selector_data[i] == value)[0] for i in range(n_ts)] else: raise TypeError('Wrong data type for selector_data. One dimensional numpy array or list of one dimensional ' 'numpy arrays expected') new_splat_times = None if static_selector: if trajectory.is_static_trajectory: new_positions = trajectory.positions[selected_indices, :, :] new_particle_attributes = trajectory.particle_attributes.select(selected_indices) else: raise TypeError('Variable trajectory can not be filtered with static selector_data') else: new_positions = [trajectory.get_positions(i)[selected_indices[i], :] for i in range(n_ts)] new_particle_attributes = trajectory.particle_attributes.select(selected_indices) result = Trajectory( positions=new_positions, times=trajectory.times, particle_attributes=new_particle_attributes, start_splat_data=trajectory.start_splat_data ) return result
[docs] def is_active_particle(trajectory, true_val=True, false_val=False): """ Constructs a selection map / boolean array which particles are active in the individual time frames :param trajectory: Input trajectory """ splat_times = trajectory.start_splat_data.splat_times global_index = trajectory.particle_attributes.get('global index') times = trajectory.times # create boolean array for selection if trajectory.is_static_trajectory: is_active = [ np.array([true_val if times[frame_number] < splat_times[gi][0] else false_val for gi in global_index[:,frame_number]]) for frame_number in range(len(times))] else: is_active = [ np.array([true_val if times[frame_number] < splat_times[gi][0] else false_val for gi in global_index[frame_number]]) for frame_number in range(len(times))] return is_active
[docs] def filter_for_active_particles(trajectory): """ Select only active (non splatted) particles from a trajectory and constructs a new trajectory from it :param trajectory: Input trajectory """ is_active = is_active_particle(trajectory) filtered_trajectory = select(trajectory, is_active, True) return filtered_trajectory
[docs] def center_of_charge(trajectory): """ Calculates the center of charge of an ensemble of particles in a Trajectory. **Note:** If there is no explicit information about the particle charges in the input trajectory object (the optional trajectory attribute ``OptionalAttribute.PARTICLE_MASSES`` is not present) *all* particles are assumed to be singly positively charged. :param trajectory: Trajectory to calculate the center of charge for :type trajectory: Trajectory :return: Vector of the spatial position of the center of mass: Array with time steps as first and spatial dimension (x,y,z) as second dimension :rtype: numpy.ndarray """ n_timesteps = trajectory.n_timesteps coc = np.zeros((n_timesteps, 3)) particle_charges = None if trajectory.optional_attributes and OptionalAttribute.PARTICLE_CHARGES in trajectory.optional_attributes: particle_charges = trajectory.optional_attributes[OptionalAttribute.PARTICLE_CHARGES] for i in range(n_timesteps): p_pos = trajectory.get_positions(i) x_mean = np.average(p_pos[:, 0], weights=particle_charges) y_mean = np.average(p_pos[:, 1], weights=particle_charges) z_mean = np.average(p_pos[:, 2], weights=particle_charges) coc[i, :] = np.array([x_mean, y_mean, z_mean]) return coc