from __future__ import division
from collections import namedtuple, OrderedDict
import six
import numpy as np
import xarray as xr
import logging
from itertools import chain
import iucm.energy_consumption as en
from iucm.utils import docstrings
import iucm.utils as utils
import psyplot.data as psyd
logger = logging.getLogger(__name__)
def _nothing():
return None, None
#: Meta information for the variables in the 1D-output of the
#: :class:`PopulationModel`
fields = OrderedDict([
    ('cons', {
        'units': 'MJ / inh',
        'long_name': 'Energy consumption',
        }),
    ('dist', {
        'units': 'km',
        'long_name': 'Average distance between two individuals',
        }),
    ('entrop', {
        'units': '-',
        'long_name': 'Entropy',
        }),
    ('rss', {
        'units': '-',
        'long_name': 'Rank-Size Slope',
        }),
    ('cons_det', {
        'units': 'MJ / inh',
        'long_name': 'Deterministic energy consumption',
        # bugfix: the comment contained a duplicated 'based on based on'
        'comments': ('Energy consumption calculated based on '
                     'Le Nechet, 2012'),
        }),
    ('cons_std', {
        'units': 'MJ / inh',
        'long_name': 'std. deviation of Energy consumption',
        }),
    ('left_over', {
        'units': 'inhabitants',
        'long_name': 'Left over population that could not be subtracted',
        }),
    ('nscenarios', {
        'units': '-',
        'long_name': (
            'The number of scenarios that have been changed during this '
            'step'),
        }),
    ])
Output = utils.append_doc(
namedtuple('Output', list(fields)),
"""The state of the model
The :func:`collections.namedtuple` defining the state of the model during
a single step. Each field of this class corresponds to one output variable
in the output netCDF of the :class:`PopulationModel`. Meta information are
taken from the :attr:`fields` dictionary
Parameters
----------
cons: float
Energy consumption
dist: float
Average distance between two individuals
entrop: float
Entropy
rss: float
Rank-Size Slope
cons_det: float
The deterministic energy consumption based on
:attr:`iucm.energy_consumption.weights_LeNechet`
cons_std: float
The standard deviation of the energy consumption
left_over: float
The left over inhabitants that could not be subtracted in the last step
nscenarios: float
The number of scenarios that have been changed during this step
See Also
--------
Output2D
PopulationModel.state
PopulationModel.allocate_output
""")
#: Meta information for the variables in the 2D-output of the
#: :class:`PopulationModel`
fields2D = fields.copy()
fields2D['scenarios'] = {'long_name': 'Scenario identifier'}
Output2D = utils.append_doc(
namedtuple('Output2D', list(fields2D)),
"""The 2D state variables of the model
2D output variables. Each variable corresponds to the same variable as in
0-d :class:`Output` objects, but instead of saving only the best state of
the model, this output saves all scenarios
Parameters
----------
cons: np.ndarray of dtype float
Energy consumption
dist: np.ndarray of dtype float
Average distance between two individuals
entrop: np.ndarray of dtype float
Entropy
rss: np.ndarray of dtype float
Rank-Size Slope
cons_det: np.ndarray of dtype float
The deterministic energy consumption based on
:attr:`iucm.energy_consumption.weights_LeNechet`
cons_std: np.ndarray of dtype float
The standard deviations within each probabilistic scenario
left_over: float
The left over inhabitants that could not be subtracted in the last step
nscenarios: float
The weight of each scenario that has been used during this step
scenarios: np.ndarray of dtype int
The number of the scenario
See Also
--------
Output
PopulationModel.state2d""")
[docs]class PopulationModel(object):
"""
Class that runs and manages the population model
This class represents one instance of the model for one experiment and is
responsible for all the computation, parallelization and input-output
coordination.
The major important features of this class are
:meth:`from_da` method
A classmethod to construct the model from a :class:`xarray.DataArray`
:meth:`step` method
The method that is brings the model to the next step
:attr:`init_step_methods` and :attr:`update_methods`
The available update methods how to bring the model to the next change
:attr:`selection_methods`
The available selection methods that define the available scenarios
:attr:`update_methods`
The available update methods that define how the population change
for the given selection is pursued
:attr:`state` attribute
The current state of the model. It's an instance of the :class:`Output`
class containing all 1D-variables of the current :attr:`data` attribute
Most of the other routines are related to input/output and parallelization
"""
@property
def dist(self):
    """Denumerator of average distance between two individuals"""
    return self.state.dist

@property
def consumption(self):
    """Energy consumption"""
    return self.state.cons

@property
def population_change(self):
    """The population change during this time step"""
    # indexed with the absolute step so restarted runs read the right value
    return self._change[self.total_step]

@property
def movement(self):
    """The population movement during this time step"""
    return self._movement[self.total_step]

@property
def total_change(self):
    """The total population change during this time step"""
    # forced movement + forced change + what could not be applied last step
    return self.movement + self.population_change + self.left_over
@property
def state(self):
    """The state as a namedtuple. You may set it with an iterable defined
    by the :attr:`Output` class"""
    return self._state

@state.setter
def state(self, value):
    # any iterable works -- it is unpacked into the Output namedtuple
    self._state = Output(*value)

@property
def state2d(self):
    """The different values from the :attr:`state` of the model for each
    of the scenarios"""
    return self._state2d

@state2d.setter
def state2d(self, value):
    self._state2d = Output2D(*value)

@property
def state_dict(self):
    """The state as a dictionary. Mapping from state variable to the
    corresponding value. You may also set it with a dictionary"""
    return self._state._asdict()

@state_dict.setter
def state_dict(self, kwargs):
    self._state = Output(**kwargs)
@property
def state2d_dict(self):
    """The 2D state as a dictionary. Mapping from state variable to the
    corresponding value. You may also set it with a dictionary"""
    return self._state2d._asdict()

@state2d_dict.setter
def state2d_dict(self, kwargs):
    # bugfix: the 2D state must be an Output2D, not an Output -- the 2D
    # state dict contains the additional 'scenarios' field which would make
    # Output(**kwargs) fail with a TypeError
    self._state2d = Output2D(**kwargs)
@property
def left_over(self):
    """Left over population that could not have been subtracted for within
    the :meth:`PopulationModel.value_update` method and should be
    considered in the next step"""
    return self.state.left_over

#: Left over population that could not have been subtracted for within the
#: :meth:`PopulationModel.value_update` method
_left_over = 0

#: :class:`int`. The number of cells that are modified for one scenario
ncells = 4

#: The current step since the last output. -1 means, output has just been
#: written or the model has been initialized
current_step = -1

#: The current step since the initialization of the model. -1 means, the
#: model has been initialized
total_step_run = -1

#: The absolute current step in case the model has been restarted. If -1,
#: the model has not yet been started
total_step = -1

#: np.ndarray. The current simulation data of the model
data = None

#: np.ndarray. The x-coordinates of the points in :attr:`data`
x = None

#: np.ndarray. The y-coordinates of the points in :attr:`data`
y = None

#: np.ndarray. The positions of the points in :attr:`data` that can be
#: modified
data2modify = None

#: Attribute that is either False or the name of the output file where the
#: model data just has been written to
output_written = False

#: The selection method from :attr:`selection_methods` we use to define the
#: scenarios
select = None

#: The step initialization method from :attr:`init_step_methods` we use to
#: define the scenarios
init_step = None

#: The update method from :attr:`update_methods` we use to compute that
#: changes for each scenario
update = None
@docstrings.get_sectionsf('PopulationModel')
def __init__(self, data, x, y, selection_method='consecutive',
             update_method='categorical', ncells=4, categories=None,
             state=None, forcing=None, probabilistic=0, max_pop=None,
             use_pctls=False, last_step=0, data2modify=None):
    """
    Parameters
    ----------
    data: np.ndarray
        The 1D-data array (without NaN!) of the model
    x: np.ndarray
        The x-coordinates in *km* of each point in `data` (same shape as
        `data`)
    y: np.ndarray
        The y-coordinates in *km* of each point in `data` (same shape as
        `data`)
    selection_method: { 'consecutive' | 'random' }
        The available selection scenarios (see :attr:`selection_methods`)
    update_method: { 'categorical' | 'random' | 'forced' }
        The available update methods (see :attr:`update_methods`)
    ncells: int
        The number of cells that shall be modified for one scenario. The
        higher the number, the less computationally expensive is the
        computation
    categories: list of str
        The categories to use. If `update_method` is ``'categorical'``, it
        describes the categories and if `use_pctls` is True, it the
        each category is interpreted as a quantile in `data`
    state: list of float
        The current state of `data`. Must be a list corresponding to the
        :class:`Output` class
    forcing: xarray.Dataset
        The input dataset for the model containing variables with
        population evolution information. Possible variables in the netCDF
        file are *movement* containing the number of people to move and
        *change* containing the population change (positive or negative)
    probabilistic: int or tuple
        The number of probabilistic scenarios. For each scenario the energy
        consumption is calculated and the final population is distributed
        to the cells with the ideal energy consumption. Set this to 0 to
        only use the weights by [LeNechet2012]_. If tuple, then they are
        considered as the weights
    max_pop: np.ndarray
        A 1d-array with the maximum population for each cell in `data`. If
        None, the last value in `categories` will be used
    use_pctls: bool
        If True, values given in `categories` are interpreted as quantiles
    last_step: int
        If the model is restarted, the total number of already made steps
        (see :attr:`total_step` attribute)
    data2modify: np.ndarray
        The indices of points in `data` which are allowed to be modified.
        If None, all points are allowed to be modified

    See Also
    --------
    from_da: A more convenient initialization method using a xarray.Dataset
    """
    self.data = data
    self.x = x
    self.y = y
    if data2modify is not None:
        data2modify = data2modify.astype(np.int64)
    self.data2modify = data2modify
    self.selection_method = selection_method
    # set the maximum population
    # NOTE: np.asarray(None) gives a 0-dim (falsy) array, which is why the
    # 0-dim branch below covers both the scalar and the None case
    max_pop = np.asarray(max_pop)
    if max_pop.ndim == 0:
        if not max_pop and categories is None:
            raise ValueError(
                "The categories must be specified if max_pop is None!")
        elif not max_pop:  # max_pop is None
            max_pop = categories[-1]
        else:
            categories = np.append(categories, max_pop)
        self.max_pop = np.empty_like(data)
        self.max_pop[:] = max_pop
    else:
        assert max_pop.shape == data.shape, (
            "Shape of max_pop must %r equal the shape of the data %r" % (
                max_pop.shape, data.shape))
        self.max_pop = max_pop
    if use_pctls:
        # interpret the categories as percentiles of the positive data
        categories = np.percentile(
            self.data[~np.isnan(self.data) & (self.data > 0)],
            categories)
    self.categories = np.asarray(categories)
    try:
        self.select = self.selection_methods[selection_method]
    except KeyError:
        raise ValueError(
            "Unknown selection method %s! Possibilities are {%s}" % (
                selection_method, ', '.join(self.selection_methods)))
    self.ncells = ncells
    self.update_method = update_method
    try:
        self.update = self.update_methods[update_method]
    except KeyError:
        raise ValueError(
            "Unknown update method %s! Possibilities are {%s}" % (
                update_method, ', '.join(self.update_methods)))
    self.init_step = self.init_step_methods[update_method]
    if update_method == 'forced' and forcing is None:
        raise ValueError(
            "Need forcing information for forced update method! Please "
            "use the forcing keyword!")
    # determine the weights
    self.nprobabilistic = probabilistic
    if probabilistic:
        if update_method != 'forced':
            raise ValueError(
                "Can only do probabilistic runs for a forced model! "
                "Set probabilistic to 0 or update_method to 'forced'!")
        # one (initially zero) weight array per energy variable; filled
        # with random values in each step via en.random_weights
        self.weights = en.EnVariables(
            *(np.zeros(probabilistic) for var in en.EnVariables._fields))
    else:  # use the weights from LeNechet, 2012
        self.weights = en.EnVariables(
            *np.reshape(en.weights_LeNechet,
                        (len(en.EnVariables._fields), 1)))
    # set the state
    if state is None:
        # calculate data
        logger.debug('Calculating state')
        en_info = en.energy_consumption(self.data, self.x, self.y,
                                        weights=self.weights)
        self.state = Output(
            en_info[0].mean(), *en_info[1:],
            cons_det=en._calculate_en(en_info[1:]),
            cons_std=en_info[0].std(), left_over=0, nscenarios=0)
        logger.debug('Done')
    else:
        self.state = state
    # allocate the 2D state (one array per Output field plus the scenario
    # identifier), initialized with NaN
    self.state2d = [np.zeros_like(self.data) for _ in range(
        len(self.state) + 1)]
    for arr in self.state2d:
        arr[:] = np.nan
    self.forcing = forcing
    if forcing is not None:
        self._change = forcing.variables['change'].values
        self._movement = forcing.variables['movement'].values
    self.current_step = -1
    self.total_step = last_step - 1
    self.total_step_run = -1
@docstrings.get_sectionsf('PopulationModel.initialize_model')
@docstrings.dedent
def initialize_model(self, da, dsi, ofiles, osteps, mask=None):
    """
    Initialize the model on the I/O processor

    Parameters
    ----------
    da: xr.DataArray
        The dataarray during the initialization
    dsi: xr.Dataset
        The base dataset of the `da`
    ofiles: list of str
        The name of the output files
    osteps: list of int
        Steps when to make the output
    mask: np.ndarray
        A boolean array that maps from the :attr:`data` attribute into the
        2D output data array
    """
    # a single file name is interpreted as a one-element list
    if isinstance(ofiles, six.string_types):
        ofiles = [ofiles]
    ofiles = iter(ofiles)
    try:
        osteps = iter(osteps)
    except TypeError:  # a scalar: one single output step
        osteps = iter([osteps])
    self._ofiles = ofiles
    self._osteps = osteps
    self._next_ostep = next(self._osteps)
    if mask is None:
        mask = np.zeros(da.shape, dtype=bool)
    self.mask = mask
    self.allocate_output(da, steps=self._next_ostep, dsi=dsi)
# remove the parameters that are computed internally by from_da from the
# reused docstring sections
docstrings.delete_params('PopulationModel.parameters',
                         'data', 'x', 'y', 'data2modify')
docstrings.delete_params('PopulationModel.initialize_model.parameters',
                         'mask')
@classmethod
@docstrings.dedent
def from_da(cls, da, dsi, ofiles=None, osteps=None,
            coord_transform=1, **kwargs):
    """
    Construct the model from a :class:`psyplot.data.InteractiveArray`

    Parameters
    ----------
    %(PopulationModel.initialize_model.parameters.no_mask)s
    coord_transform: float
        The transformation factor to transform the coordinate values into
        kilometres

    Other Parameters
    ----------------
    %(PopulationModel.parameters.no_data|x|y|data2modify)s

    Returns
    -------
    PopulationModel
        The model created ready to use"""
    def get_val(name):
        # last time step for time-dependent variables, the scalar otherwise
        var = dsi.variables[name]
        return var.values[() if var.ndim == 0 else -1]
    # use the last time step if the array is 3D (time, y, x)
    current = da if da.ndim < 3 else da[-1]
    data = current.values.ravel().copy()
    x = current.coords[current.dims[-1]].values
    y = current.coords[current.dims[-2 if current.ndim > 1 else -1]].values
    if x.shape != data.shape:
        # 1D coordinates: expand to the full 2D grid
        x, y = [arr.ravel() for arr in np.meshgrid(x, y)]
    # apply the mask to filter out NaN values
    mask = ~np.isnan(data)
    data = data[mask].ravel().astype(np.float64)
    x = x[mask.ravel()]
    y = y[mask.ravel()]
    # reuse the stored state if the dataset contains all state variables
    if not set(fields).difference(dsi.variables):
        state = map(get_val, fields)
    else:
        state = None
    # check for values that should not be modified
    if 'mask' in dsi.variables and 'data2modify' not in kwargs:
        modi_mask = dsi.mask.values.ravel()[mask].astype(bool)
        data2modify = np.arange(data.size, dtype=np.int64)[~modi_mask]
        kwargs['data2modify'] = data2modify
    if 'max_pop' in dsi.variables and not kwargs.get('max_pop'):
        kwargs['max_pop'] = dsi.variables['max_pop'].values.ravel()[
            mask.ravel()]
    ret = cls(data, x * coord_transform, y * coord_transform, state=state,
              **kwargs)
    # initialize the model and allocate the output
    if ofiles is not None:
        ret.initialize_model(da, ofiles=ofiles, osteps=osteps, dsi=dsi,
                             mask=mask.reshape(current.shape))
    return ret
def best_scenario(self, all_slices, all_indices):
    """
    Compute the best scenario

    This method computes the best scenario for the given scenarios defined
    through the given `all_slices` and `all_indices`

    Parameters
    ----------
    all_slices: list of ``None``, :class:`slice` or boolean arrays
        The slicing objects for each scenario that allow us to create a
        view of the :attr:`data` attribute that we modify in place. If list
        of ``None``, it is computed using `all_indices`
    all_indices: list of list of :class:`int`
        The numpy array containing the integer position in :attr:`data` of
        the cells modified for each scenario

    Returns
    -------
    1-dim np.ndarray of dtype float
        The consumptions of the best scenarios for each set of weights used
    2-dim np.ndarray of dtype float with shape ``(nprob, len(self.state))``
        The state of the best scenario for each probabilistic scenario
        which can be used for the :attr:`state` attribute.
    list of :class:`slice` or boolean array
        The slicing object from `all_slices` that corresponds to the best
        scenario and can be used to create a view on the :attr:`data`
    list of list of float
        The numbers of the modified cells for the best scenario
    list of 2d-np.ndarrays
        The 2d variables of the :attr:`state2d` attribute
    """
    self.current_step += 1
    self.total_step += 1
    self.total_step_run += 1
    # bind frequently used attributes to locals
    data = self.data
    x = self.x
    y = self.y
    update = self.update
    to_slice = self._bool_or_slice
    dist = self.dist
    nprobabilistic = self.nprobabilistic or 1
    # one consumption per scenario and weight set; np.inf marks scenarios
    # without any change so that argmin never picks them
    consumptions = np.empty((len(all_slices), nprobabilistic))
    consumptions.fill(np.inf)
    others = np.empty((len(all_slices), len(self.state) - 1))
    ret_cells = [[] for _ in range(len(all_slices))]
    state2d = self.state2d
    weights = self.weights
    # positions within `others` (the consumption itself is excluded,
    # hence the ``- 1``)
    i_left_over = state2d._fields.index('left_over') - 1
    i_cons_std = state2d._fields.index('cons_std') - 1
    i_cons_det = state2d._fields.index('cons_det') - 1
    for arr in state2d:
        arr[:] = np.nan
    # loop through the scenarios
    for i, (sl, ind) in enumerate(zip(all_slices, all_indices)):
        all_slices[i] = sl = to_slice(data, sl, ind)
        #: store the old values
        old = data[sl].copy()
        #: The cells that are modified for this scenario
        cell_values = update(old.copy(), sl)
        #: compute the energy consumption for this scenario if (if we have
        #: any changes to the initial state)
        if (old != cell_values).any():
            en_info = en.energy_consumption(
                data, x, y, indices=ind, increase=cell_values - old,
                dist0=dist, slicer=sl, weights=weights)
            consumptions[i] = en_info[0]
            others[i][:len(en_info) - 1] = en_info[1:]
            others[i][i_cons_det] = en._calculate_en(en_info[1:])
            others[i][i_cons_std] = en_info[0].std()
            others[i][i_left_over] = self._left_over
            ret_cells[i] = cell_values
            # record this scenario in the 2D output
            state2d[0][sl] = consumptions[i].mean()
            for j, val in enumerate(others[i], 1):
                state2d[j][sl] = val
            state2d.scenarios[sl] = i
    # get the index of the scenario with the least energy consumption
    argmins = consumptions.argmin(axis=0)
    others[:, state2d._fields.index('nscenarios') - 1] = np.unique(
        argmins).size
    return (
        # the consumptions (1-d array)
        consumptions[argmins, np.arange(nprobabilistic)],
        # the other state variables
        others[argmins],
        # the slicers
        [all_slices[i] for i in argmins],
        # the cell values
        [ret_cells[i] for i in argmins],
        # the 2d state
        state2d)
def step(self):
    """
    Bring the model to the next step and eventually write the output

    This method is the core of the entire :class:`PopulationModel` API
    connecting the necessary functions to compute the next best scenario.
    The general structure is

    1. initialize the step (see :attr:`init_step_methods`)
    2. define the scenarios (see :attr:`selection_methods`)
    3. choose the best scenario (see :attr:`best_scenario` and
       :attr:`update_methods`)
    4. write the output (see :meth:`write` method)

    Depending on whether the :meth:`start_processes` method has been called
    earlier, this is either done serial or in parallel"""
    self.output_written = False
    # ---- initialize the step
    current = self.data.copy()
    init_sl, indices = self.init_step()
    if init_sl is not None:
        # the init step changed the data (e.g. subtracted the movement):
        # recompute the state for the changed cells
        en_info = en.energy_consumption(
            self.data, self.x, self.y, indices=indices,
            dist0=self.dist, slicer=init_sl,
            increase=self.data[init_sl] - current[init_sl],
            weights=self.weights)
        self.state = Output(en_info[0].mean(), *en_info[1:],
                            cons_det=en._calculate_en(en_info[1:]),
                            cons_std=en_info[0].std(),
                            left_over=self._left_over, nscenarios=0)
    nprobabilistic = self.nprobabilistic
    # ---- define the scenarios
    all_slices, all_indices = self.select()
    if nprobabilistic:
        # draw a new random weight set for each probabilistic scenario
        en.random_weights(self.weights)
    else:
        nprobabilistic = 1
    if self.procs is None:
        # ---- get the best scenario in serial
        consumptions, others, sl, changed_values, self.state2d = \
            self.best_scenario(all_slices, all_indices)
        nscenarios = others[
            0, self.state2d._fields.index('nscenarios') - 1]
    else:
        # ---- get the best scenario in parallel
        self.current_step += 1
        self.total_step += 1
        self.total_step_run += 1
        # synchronize the workers with the init-step changes
        for conn in self.parent_conns:
            conn.send(['update', init_sl,
                       self.data[init_sl] if init_sl is not None else None,
                       self.state, self.weights])
            conn.recv()
        nprocs = self.nprocs
        splitted_slices = np.array_split(all_slices, nprocs)
        splitted_indices = np.array_split(all_indices, nprocs)
        for i, (conn, slices, indices) in enumerate(zip(
                self.parent_conns, splitted_slices, splitted_indices)):
            if len(slices):
                conn.send(['best', slices, indices])
            else:
                # fewer scenarios than processes: the remaining workers
                # get no work in this step
                nprocs = i
                break
        #: The states of the model of the best scenario for each process
        all_consumptions = np.zeros((nprocs, nprobabilistic))
        all_others = np.zeros((nprocs, nprobabilistic,
                               len(self.state) - 1))
        #: The slicers for the best scenario for each process
        proc_slices = [[0] * nprobabilistic] * nprocs
        #: The values of the changed cells of the best scenario for each
        #: process
        all_changed = [
            [[] for _ in range(nprobabilistic)] for _ in self.procs]
        # ---- receive results
        state2d = self.state2d
        for arr in state2d:
            arr[:] = np.nan
        for i, conn in zip(range(nprocs), self.parent_conns):
            all_consumptions[i], all_others[i], proc_slices[i], \
                all_changed[i], st2d = conn.recv()
            # merge the worker's 2D state into the global one
            mask = ~np.isnan(st2d.scenarios)
            for j, arr in enumerate(st2d):
                state2d[j][mask] = arr[mask]
            # add the number of previous scenarios to the one of the
            # current process `i` which doesn't see the other processes
            if i:
                state2d.scenarios[mask] += sum(map(len,
                                                   splitted_indices[:i]))
        # choose the best scenario from all processes
        argmins = all_consumptions.argmin(axis=0)
        consumptions = all_consumptions[argmins, np.arange(nprobabilistic)]
        others = all_others[argmins]
        sl = [proc_slices[j][i] for i, j in enumerate(argmins)]
        changed_values = [all_changed[j][i] for i, j in enumerate(argmins)]
        nscenarios = np.unique(argmins).size
    if self.nprobabilistic >= 2:
        # distribute the population to the best probabilistic scenarios
        self.state, sl, changed_values = self.distribute_probabilistic(
            sl, nscenarios)
    else:
        self.state = chain(consumptions, np.squeeze(others))
        sl = sl[0]
        changed_values = changed_values[0]
        self.state2d.nscenarios[:] = 0.0
        self.state2d.nscenarios[sl] = 1.0
    if len(changed_values):
        self.data[sl] = changed_values
    # ---- send the changes to the new processes
    if self.procs is not None:
        for conn in self.parent_conns:
            conn.send(['update', sl, changed_values, self.state])
            conn.recv()
    # ---- store the current state and data of the model
    self.write()
def distribute_probabilistic(self, slices, nscenarios):
    """
    Redistribute the population increase to the best scenarios

    This method distributes the population changes to the cells that have
    been computed as the best scenarios. It takes the input of the
    :meth:`best_scenario` method

    Parameters
    ----------
    slices: list
        The slicers of the best scenarios
    nscenarios: float
        The number of distinct best scenarios (stored in the returned
        state)

    Returns
    -------
    Output
        The new state of the model
    np.ndarray of dtype bool
        The slicer marking the changed cells in :attr:`data`
    np.ndarray
        The new values of the changed cells"""
    change_per_cell = self.total_change / len(slices)
    scenario_fract = 1. / len(slices)
    left_over = 0
    arr = self.data.copy()
    # bugfix: the np.bool alias was deprecated in numpy 1.20 and removed in
    # numpy 1.24; the builtin bool is the documented replacement
    changed_cells = np.zeros_like(arr, dtype=bool)
    nscenarios_2d = self.state2d.nscenarios
    nscenarios_2d[:] = 0
    for sl in slices:
        arr[sl] = self.value_update(arr[sl], sl, change_per_cell)
        changed_cells[sl] = True
        left_over += self._left_over
        nscenarios_2d[sl] += scenario_fract
    if left_over:  # redistribute the left_over to the other cells
        for sl in slices:
            self.value_update(arr[sl], sl, left_over)
            left_over = self._left_over
            if not left_over:
                break
    en_info = en.energy_consumption(
        arr, self.x, self.y, dist0=self.dist,
        increase=arr[changed_cells] - self.data[changed_cells],
        slicer=changed_cells,
        indices=np.arange(len(arr))[changed_cells], weights=self.weights)
    return (
        # the state
        Output(en_info[0].mean(), *en_info[1:],
               cons_det=en._calculate_en(en_info[1:]),
               cons_std=en_info[0].std(),
               left_over=self._left_over, nscenarios=nscenarios),
        # the slicer
        changed_cells,
        # the changed cells
        arr[changed_cells])
# -------------------------------------------------------------------------
# --------------------------- First step part -----------------------------
# ---------------------------- Initialization -----------------------------
# -------------------------------------------------------------------------
@property
def init_step_methods(self):
    """Mapping from *update_method* name to the corresponding init function

    This property defines the init_step methods. Those methods are called
    at the beginning of each step on the main processor (I/O-processor).
    Each init_step method must accept no arguments and return a tuple with

    - an :attr:`slice` object or boolean array containing the information
      where the data changed
    - the indices of the cells in :attr:`data` that changed"""
    # only the 'forced' method needs an initialization (subtracting the
    # moving population); the others are no-ops
    return {'categorical': _nothing,
            'random': _nothing,
            'forced': self.subtract_random}
def subtract_random(self):
    """Subtract the people moving during this timestep

    Random cells are reduced by a random fraction of their population
    until (at least) the :attr:`movement` of this step is reached.

    Returns
    -------
    np.ndarray of dtype bool or None
        Boolean array flagging the modified cells in :attr:`data`, or
        ``None`` if there is no movement in this step
    np.ndarray of dtype int or None
        The integer positions of the modified cells, or ``None``"""
    movement = self.movement
    if movement == 0.0:
        return None, None
    data = self.data
    changed = 0.0
    # bugfix: the np.bool alias was deprecated in numpy 1.20 and removed in
    # numpy 1.24; the builtin bool is the documented replacement
    sl = np.zeros_like(data, dtype=bool)
    for i in np.random.permutation(np.arange(data.size, dtype=int)):
        cell = data[i]
        if not np.isnan(cell):
            # remove a random fraction of the cell population
            cell_change = cell * np.random.rand()
            data[i] -= cell_change
            sl[i] = True
            changed += cell_change
            if changed >= movement:
                break
    return sl, np.arange(len(data), dtype=np.int64)[sl]
# -------------------------------------------------------------------------
# --------------------------- Second step part ----------------------------
# ------------------------------- Selection -------------------------------
# -------------------------------------------------------------------------
@property
def selection_methods(self):
    """Mapping from *selection_method* name to the corresponding function

    This property defines the selection methods. Those methods are called
    at the beginning of each step on the main processor (I/O-processor).
    Each selection_step method must accept no arguments and return a tuple
    with

    - an :attr:`slice` object or boolean array containing the information
      where the data changed. Alternatively it can be a list of ``None``
      and those slicing objects will be computed from the second argument
    - a 2D list of dtype integer containing the indices of the cells that
      for changed for the corresponding scenario"""
    return {'consecutive': self.consecutive_selection,
            'random': self.random_selection}
def consecutive_selection(self):
    """Define the scenarios by walking through the data in consecutive
    chunks of :attr:`ncells` cells

    Returns
    -------
    list
        One slicer (or ``None``) per scenario
    list of np.ndarray
        The cell indices of each scenario"""
    chunk = self.ncells
    if self.data2modify is None:
        ntotal = self.data.size
        starts = range(0, ntotal, chunk)
        slices = [slice(start, start + chunk) for start in starts]
        indices = [np.arange(start, min(start + chunk, ntotal))
                   for start in starts]
    else:
        modifiable = self.data2modify
        indices = [modifiable[start:start + chunk]
                   for start in range(0, modifiable.size, chunk)]
        slices = [None] * len(indices)
    return slices, indices
def random_selection(self):
    """Define the scenarios by randomly grouping the (modifiable) cells
    into chunks of :attr:`ncells` cells

    Returns
    -------
    list of None
        Placeholder slicers (computed later from the indices)
    list of np.ndarray
        The cell indices of each scenario"""
    # permuting an int permutes arange(int); otherwise the given indices
    pool = (self.data.size if self.data2modify is None
            else self.data2modify)
    shuffled = np.random.permutation(pool)
    step = self.ncells
    groups = [shuffled[start:start + step]
              for start in range(0, shuffled.size, step)]
    return [None] * len(groups), groups
# -------------------------------------------------------------------------
# ---------------------------- Third step part ----------------------------
# -------------------------------- Update ---------------------------------
# -------------------------------------------------------------------------
@property
def update_methods(self):
    """Mapping from *update_method* name to the corresponding function

    This property defines the update methods. Each update method must
    accept and return a 1D numpy array of dtype float64 containing a view
    of the :attr:`data` attribute. The data must be modified in place!"""
    return {'categorical': self.categorical_update,
            'random': self.randomized_update,
            'forced': self.value_update}
def value_update(self, cell_values, slicer, remaining=None):
    """Change the cells by using the forcing

    This update method changes the given cells based upon the
    :attr:`movement` information and the :attr:`population_change`
    information from the :attr:`forcing` dataset

    Parameters
    ----------
    cell_values: np.ndarray
        The values of the cells to modify (modified in place)
    slicer: slice or boolean np.ndarray
        The slicer selecting `cell_values` within :attr:`data`
    remaining: float
        The population to distribute. If None, :attr:`total_change` is
        used. What cannot be applied is stored in ``self._left_over``"""
    if remaining is None:
        remaining = self.total_change
    max_pop = self.max_pop[slicer]
    n = cell_values.size
    popsum = cell_values.sum()
    final_pop = max(popsum + remaining, 0)
    # NOTE(review): ``max_pop.sum() * n`` looks suspicious -- the plain
    # ``max_pop.sum()`` already is the capacity of the selection. Confirm
    # whether the factor ``n`` is intended
    if remaining < 0 or final_pop <= max_pop.sum() * n:
        if n == 1:
            cell_values += remaining
            if cell_values[0] < 0:
                # cannot go below 0: keep the overshoot as left over
                remaining = cell_values[0]
                cell_values[:] = 0
            else:
                remaining = 0
        else:
            for i, cell in enumerate(cell_values):
                # the maximum the current population inside the cell and
                # otherwise the corresponding fraction at the popsum
                change = max(-cell_values[i], min(
                    0 if popsum == 0 else (
                        remaining * (popsum - cell) / (
                            popsum * (n - 1))),
                    max_pop[i] - cell))
                remaining -= change
                cell_values[i] += change
    self._left_over = remaining
    return cell_values
def categorical_update(self, cell_values, slicer):
    """Change the values through an update to the next category

    This method increases the population by updating the cells to the next
    (possible) category"""
    mask = ~np.isnan(cell_values)
    categories = self.categories
    max_pop = self.max_pop[slicer]
    # position of the next higher category for each non-NaN cell
    indices = categories.searchsorted(cell_values[mask]) + 1
    # add the difference between the next and the current category
    cell_values[mask] += (
        categories[indices.clip(0, len(categories) - 1)] -
        categories[(indices-1).clip(0, len(categories) - 1)])
    # never exceed the maximum population of each cell
    cell_values[mask] = np.c_[cell_values[mask], max_pop[mask]].min(axis=1)
    return cell_values
def randomized_update(self, cell_values, slicer):
    """Change the values through an update to a number within the next
    category

    This method increases the population by updating the cells to a random
    value within the next (possible) category"""
    def get_random(i):
        # late-binding closure: ``categories`` is assigned below
        return np.random.randint(categories[i], categories[i + 1])
    categories = self.categories
    max_pop = self.max_pop[slicer]
    mask = ~np.isnan(cell_values)
    # position of the next higher category for each non-NaN cell
    indices = categories.searchsorted(cell_values[mask]) + 1
    cell_values[mask] = list(map(get_random, indices.clip(
        0, len(categories) - 2)))
    # never exceed the maximum population of each cell
    cell_values[mask] = np.c_[cell_values[mask], max_pop[mask]].min(axis=1)
    return cell_values
# -------------------------------------------------------------------------
# ------------------------- Parallel Processing ---------------------------
# -------- Necessary parts for letting the model run in parallel ----------
# -------------------------------------------------------------------------
#: :class:`multiprocessing.Process`. The processes of this model
procs = None

@property
def nprocs(self):
    """:class:`int`. The number of processes started for this model"""
    # 0 if start_processes has not been called
    return len(self.procs or [])
def start_processes(self, nprocs):
    """Start `nprocs` worker processes for the model

    Each worker runs this instance's :meth:`__call__` loop with the child
    end of a pipe; the parent ends are stored in ``self.parent_conns``."""
    import multiprocessing as mp
    logger.debug('Start %i processes', nprocs)
    pipes = [mp.Pipe() for _ in range(nprocs)]
    parents = [pair[0] for pair in pipes]
    children = [pair[1] for pair in pipes]
    workers = []
    for child in children:
        worker = mp.Process(target=self, args=(child, ))
        worker.daemon = True
        worker.start()
        workers.append(worker)
    self.procs = workers
    self.parent_conns = parents
def stop_processes(self):
    """Stop the processes for the model"""
    # sending a falsy message makes the worker loop in __call__ exit
    for conn in self.parent_conns:
        conn.send(None)
def __call__(self, conn):
    """Worker loop executed in each child process

    Receives ``[method_name, *args]`` messages on `conn`, dispatches them
    through :attr:`_step_methods` and sends the result back. A falsy
    message (or a closed connection) terminates the loop."""
    logger.debug('Enter loop with %s and %s', self.consumption,
                 self.dist)
    received = True
    try:
        while received:
            received = conn.recv()
            if received:
                try:
                    conn.send(self._step_methods[received[0]](
                        *received[1:]))
                except Exception as e:
                    # close the pipe so the parent gets an EOFError instead
                    # of hanging on recv
                    conn.close()
                    raise e
            else:
                logger.debug('Stopping processes with %s and %s',
                             self.consumption, self.dist)
    except EOFError:  # the connection has been closed
        pass
@property
def _step_methods(self):
    """The methods that are called during one :meth:`step` to synchronize
    and manage the different processes"""
    return {'best': self.best_scenario,
            'update': self.sync_state}
def sync_state(self, sl, cell_values, state=None, weights=None):
    """
    Synchronize the :attr:`data` attribute between the processes

    This method is called to synchronize the states of the model in the
    different processes

    Parameters
    ----------
    sl: :class:`slice` or boolean np.ndarray
        The slicer that can be used to create a view on the :attr:`data`.
        If None, :attr:`data` is left unchanged
    cell_values: np.ndarray of dtype float
        The values of the cells described by `sl`
    state: Output
        The state of the model. If None, :attr:`state` is left unchanged
    weights: namedtuple
        The energy consumption weights (see :attr:`weights`). If None,
        :attr:`weights` is left unchanged
    """
    if sl is not None:
        self.data[sl] = cell_values
    if state is not None:
        self.state = state
    if weights is not None:
        self.weights = weights
def __reduce__(self):
    """Return the pickling information for the model.

    The model is reconstructed by calling the class with the positional
    constructor arguments; the remaining attributes (``total_step``,
    ``data2modify`` and ``weights``) are restored afterwards from the
    returned state dictionary."""
    constructor_args = (
        self.data, self.x, self.y, self.selection_method,
        self.update_model, self.ncells, self.categories, self.state,
        self.forcing, self.nprobabilistic, self.max_pop)
    extra_state = {'total_step': self.total_step,
                   'data2modify': self.data2modify,
                   'weights': self.weights}
    return self.__class__, constructor_args, extra_state
# -------------------------------------------------------------------------
# ------------------------------- I/O Part --------------------------------
# ----- Necessary parts for Input/Output (only called on main process) ----
# -------------------------------------------------------------------------

#: :class:`xarray.Dataset`. The output dataset, allocated by
#: :meth:`allocate_output`, filled by :meth:`write` and dumped to disk by
#: :meth:`write_output`
_dso = None

#: Flag that is True if the data was written to a file during the last
#: step (:meth:`write_output` sets it to the path of the written file)
output_written = False

#: :class:`int`. The step (corresponding to :attr:`current_step`) when the
#: next output is written
_next_ostep = None
def allocate_output(self, da, dsi, steps):
    """Create the dataset for the output

    Parameters
    ----------
    da: psyplot.data.IneractiveArray
        The input data for the model
    dsi: xarray.Dataset
        The dataset `data` belongs to. If None, the
        :attr:`psyplot.data.InteractiveArray.base` attribute is used
    steps: int
        The number of steps
    """
    def empty_2d():
        # NOTE: np.empty_like leaves the values uninitialized; the array
        # is expected to be filled in :meth:`write` before being read
        ret = np.empty_like(odata)
        return ret
    logger.debug('Allocating new output dataset...')
    # start from the coordinates of the input array only
    dso = da.to_dataset()[list(da.coords)]
    forcing = self.forcing
    if forcing is not None:
        # take the time coordinate from the forcing dataset, starting
        # right after the steps that were already performed
        self._tname = tname = forcing.movement.dims[0]
        current_step = self.total_step + 1
        time = forcing.coords[tname][current_step:current_step + steps]
        steps = len(time)
    else:
        tname = None
    if da.ndim == 2:
        # 2D input: prepend a new time dimension
        odata = np.zeros([steps] + list(da.shape[-2:]))
        if tname is None:
            self._tname = tname = 'time'
            time = xr.Variable(('time', ), range(1, steps + 1))
        dims = [tname] + list(da.dims)
    else:
        # input already has a leading time dimension; continue its
        # coordinate after the last existing value
        odata = np.zeros([steps] + list(da.shape[-2:]))
        dims = da.dims
        if tname is None:
            last_time = dso[dims[0]][-1]
            self._tname = tname = last_time.name
            time = xr.concat([last_time + i for i in range(1, steps + 1)],
                             dim=last_time.name)
    # NOTE(review): the return value of ``dso.drop`` is discarded
    # (xarray's ``drop`` is not in-place); presumably harmless because
    # ``dso[tname]`` is replaced on the next line -- confirm
    dso.drop(tname)
    dso[tname] = time
    dso[da.name] = vout = xr.Variable(dims, odata, attrs=da.attrs)
    # create other output fields; meta data already present in the input
    # dataset takes precedence over the defaults in :attr:`fields`
    vars2d = []
    for key, meta in fields.items():
        if key in dsi.variables:
            meta = dsi.variables[key].attrs
        dso[key] = xr.Variable((tname, ), np.zeros(steps), meta)
    # ``fields2D`` is presumably the 2D counterpart of :attr:`fields`,
    # defined elsewhere in this module -- each gets a '<key>_2d' variable
    for key, meta in fields2D.items():
        if key in dsi.variables:
            meta = dsi.variables[key].attrs
        vars2d.append(xr.Variable(dims, empty_2d(), meta))
        dso[key + '_2d'] = vars2d[-1]
    # create weights output field
    dso['weights'] = v_weights = xr.Variable(
        (tname, 'en_variables', 'probabilistic'),
        np.zeros((len(time), len(self.weights), self.nprobabilistic or 1)),
        {'long_name': 'Used energy consumption weights'})
    dso['en_variables'] = xr.Variable(
        ('en_variables', ), np.array(self.weights._fields),
        {'long_name': 'Variables to calculate the energy consumption'})
    self._dso = dso
    # plain numpy views into the dataset, filled in :meth:`write`
    self._data_out = vout.values
    self._state2d_out = self.state2d.__class__(*[v.values for v in vars2d])
    self._weights_out = v_weights.values
    self._vname = da.name
def write(self):
    """Write the current state to the output dataset"""
    dso = self._dso
    if dso is None:
        # no output dataset has been allocated yet
        return
    step = self.current_step
    mask = self.mask
    # population data: valid cells get the model data, the rest NaN
    self._data_out[step, mask] = self.data
    self._data_out[step, ~mask] = np.nan
    # 1D time series variables
    for key, val in six.iteritems(self.state_dict):
        dso[key][step] = val
    # 2D output variables
    for pos, arr in enumerate(self.state2d):
        self._state2d_out[pos][step, mask] = arr
    self._weights_out[step][:, :] = self.weights
    # dump to file if the next output step has been reached
    if self._next_ostep is not None and step == self._next_ostep - 1:
        self.write_output()
def write_output(self, complete=True):
    """Write the data to the next netCDF file

    Parameters
    ----------
    complete: bool
        If True, write the complete dataset, otherwise only until the
        current step"""
    ofile = next(self._ofiles)
    logger.info('Saving to %s...', ofile)
    if complete:
        dso = self._dso
    else:
        # restrict the time dimension to the steps performed so far
        dso = self._dso.isel(**{
            self._tname: slice(0, self.current_step + 1)})
    psyd.to_netcdf(dso, ofile)
    # allocate a fresh dataset for the next output interval and reset
    # the step counter
    self._next_ostep = next(self._osteps)
    self.allocate_output(dso[self._vname], dsi=dso, steps=self._next_ostep)
    self.current_step = -1
    # remember the file we wrote to (also serves as a truthy flag)
    self.output_written = ofile
# reuse the ``data`` and ``dsi`` parameter descriptions of
# ``PopulationModel.initialize_model`` in other docstrings
docstrings.keep_params('PopulationModel.initialize_model.parameters',
                       'data', 'dsi')
# -------------------------------------------------------------------------
# ------------------------------ Miscellaneous ----------------------------
# -------------------------------------------------------------------------

@classmethod
def _bool_or_slice(cls, arr, slicer, indices):
    """Transform the given indices to a slice object or boolean array

    Convenience method such that we do not have to share large boolean
    arrays with other processes

    Parameters
    ----------
    arr: np.ndarray
        The data whose shape to use
    slicer: :class:`slice` object, boolean numpy.ndarray or None
        If None, a boolean array will be created from `indices` and the
        shape of `arr`. Otherwise, the given `slicer` is returned
        unchanged
    indices: np.ndarray of dtype int
        The indices of the cells that changed in `arr`

    Returns
    -------
    :class:`slice` object, boolean numpy.ndarray
        The `slicer` that can be used to create a view on `arr` for the
        given `indices`"""
    if slicer is not None:
        return slicer
    # build a boolean mask that is True exactly at the given indices
    mask = np.zeros(arr.shape, dtype=bool)
    for index in indices:
        mask[index] = True
    return mask