Source code for nengo.builder.signal

from io import StringIO

import numpy as np

from nengo.exceptions import SignalError
from nengo.rc import rc
from nengo.transforms import SparseMatrix
import nengo.utils.numpy as npext


[docs]def is_sparse(obj): """Check if ``obj`` is a sparse matrix.""" return isinstance(obj, SparseMatrix) or npext.is_spmatrix(obj)
[docs]class Signal: """Represents data or views onto data within a Nengo simulation. Signals are tightly coupled to NumPy arrays, which is how live data is represented in a Nengo simulation. Signals provide a view onto the important metadata of the live NumPy array, and maintain the original value of the array in order to reset the simulation to the initial state. Parameters ---------- initial_value : array_like The initial value of the signal. Much of the metadata tracked by the Signal is based on this array as well (e.g., dtype). name : str, optional Name of the signal. Primarily used for debugging. If None, the memory location of the Signal will be used. base : Signal, optional The base signal, if this signal is a view on another signal. Linking the two signals with the ``base`` argument is necessary to ensure that their live data is also linked. readonly : bool, optional Whether this signal and its related live data should be marked as readonly. Writing to these arrays will raise an exception. offset : int, optional For a signal view this gives the offset of the view from the base ``initial_value`` in bytes. This might differ from the offset of the NumPy array view provided as ``initial_value`` if the base is a view already (in which case the signal base offset will be 0 because it starts where the view starts. That NumPy view can have an offset of itself). """ # Set assert_named_signals True to raise an Exception # if model.signal is used to create a signal with no name. # This can help to identify code that's creating un-named signals, # if you are trying to track down mystery signals that are showing # up in a model. assert_named_signals = False def __init__( self, initial_value=None, shape=None, name=None, base=None, readonly=False, offset=0, ): if self.assert_named_signals: assert name is not None self._name = name if initial_value is None: assert shape is not None initial_value = np.zeros(shape, dtype=rc.float_dtype) elif shape is not None: assert initial_value.shape == shape self._initial_value = initial_value if np.any( np.isnan(self._initial_value.data if self.sparse else self._initial_value) ): raise SignalError("%r contains NaNs." % self) if self.sparse: assert initial_value.ndim == 2 assert offset == 0 assert base is None if npext.is_spmatrix(initial_value): self._initial_value.data.setflags(write=False) else: # To ensure we do not modify data passed into the signal, # we make a view of the data and mark it as not writeable. # Consumers (like SignalDict) are responsible for making copies # that can be modified, or using the readonly view appropriately. readonly_view = np.asarray(self._initial_value) if readonly_view.ndim > 0 and base is None: readonly_view = np.ascontiguousarray(readonly_view) # Ensure we have a view and aren't modifying the original's flags readonly_view = readonly_view.view() readonly_view.setflags(write=False) self._initial_value = readonly_view if base is not None: assert isinstance(base, Signal) and not base.is_view # make sure initial_value uses the same data as base.initial_value assert initial_value.base is base.initial_value.base self._base = base self._offset = offset self._readonly = bool(readonly) def __getstate__(self): state = dict(self.__dict__) if not self.sparse: # For normal arrays, the initial value could be a view on another # signal's data. To make sure we do not make a copy of the data, # we store the underlying metadata in a tuple, which pickle can # inspect to see that v.base is the same in different signals # and avoid serializing it multiple times. v = self._initial_value state["_initial_value"] = ( v.shape, v.base, npext.array_offset(v), v.strides, ) return state def __setstate__(self, state): for k, v in state.items(): setattr(self, k, v) if not self.sparse: shape, base, offset, strides = self._initial_value self._initial_value = np.ndarray( shape, buffer=base, dtype=base.dtype, offset=offset, strides=strides ) self._initial_value.setflags(write=False) def __getitem__(self, item): """Index or slice into array.""" if self.sparse: raise SignalError("Attempting to create a view of a sparse Signal") if item is Ellipsis or (isinstance(item, slice) and item == slice(None)): return self if not isinstance(item, tuple): item = (item,) if not all(npext.is_integer(i) or isinstance(i, slice) for i in item): raise SignalError("Can only index or slice into signals") if all(npext.is_integer(i) for i in item): # turn one index into slice to get a view from numpy item = item[:-1] + (slice(item[-1], item[-1] + 1),) view = self._initial_value[item] offset = npext.array_offset(view) - npext.array_offset(self._initial_value) return Signal( view, name="%s[%s]" % (self.name, item), base=self.base, offset=offset ) def __repr__(self): return "Signal(name=%s, shape=%s)" % (self._name, self.shape) @property def base(self): """(Signal) The base signal, if this signal is a view. Linking the two signals with the ``base`` argument is necessary to ensure that their live data is also linked. """ return self if self._base is None else self._base @property def dtype(self): """(numpy.dtype) Data type of the signal (e.g., float64).""" return self.initial_value.dtype @property def elemoffset(self): """(int) Offset of data from base in elements.""" return self.offset // self.itemsize @property def elemstrides(self): """(int) Strides of data in elements.""" return None if self.sparse else tuple(s // self.itemsize for s in self.strides) @property def initial_value(self): """(numpy.ndarray) Initial value of the signal. Much of the metadata tracked by the Signal is based on this array as well (e.g., dtype). """ return self._initial_value @initial_value.setter def initial_value(self, val): raise SignalError("Cannot change initial value after initialization") @property def is_view(self): """(bool) True if this Signal is a view on another Signal.""" return self._base is not None @property def itemsize(self): """(int) Size of an array element in bytes.""" return self.dtype.itemsize @property def name(self): """(str) Name of the signal. Primarily used for debugging.""" return self._name if self._name is not None else ("0x%x" % id(self)) @name.setter def name(self, name): self._name = name @property def nbytes(self): """(int) Number of bytes consumed by the signal.""" return self.itemsize * self.size @property def ndim(self): """(int) Number of array dimensions.""" return self.initial_value.ndim @property def offset(self): """(int) Offset of data from base in bytes. For a signal view this gives the offset of the view from the base ``initial_value`` in bytes. This might differ from the offset of the NumPy array view provided as ``initial_value`` if the base is a view already (in which case the signal base offset will be 0 because it starts where the view starts. That NumPy view can have an offset of itself). """ return self._offset @property def readonly(self): """(bool) Whether associated live data can be changed.""" return self._readonly @readonly.setter def readonly(self, readonly): self._readonly = bool(readonly) @property def shape(self): """(tuple) Tuple of array dimensions.""" return self.initial_value.shape @property def size(self): """(int) Total number of elements.""" return self.initial_value.size @property def strides(self): """(tuple) Strides of data in bytes.""" return None if self.sparse else self.initial_value.strides @property def sparse(self): """(bool) Whether the signal is sparse.""" return is_sparse(self.initial_value)
[docs] def may_share_memory(self, other): """Determine if two signals might overlap in memory. This comparison is not exact and errs on the side of false positives. See `numpy.may_share_memory` for more details. Parameters ---------- other : Signal The other signal we are investigating. """ return (self.is_view or other.is_view) and np.may_share_memory( self.initial_value, other.initial_value )
[docs] def reshape(self, *shape): """Return a view on this signal with a different shape. Note that ``reshape`` cannot change the overall size of the signal. See `numpy.reshape` for more details. Any number of integers can be passed to this method, describing the desired shape of the returned signal. """ if self.sparse: raise SignalError("Attempting to create a view of a sparse Signal") if len(shape) == 1: shape = shape[0] # in case a tuple is passed in initial_value = self.initial_value.view() try: # this raises AttributeError if cannot reshape without copying initial_value.shape = shape except AttributeError: raise SignalError( "Reshaping %s to %s would require the array to be copied " "(because it is not contiguous), which is not supported" % (self, shape) ) return Signal( initial_value, name="%s.reshape(%s)" % (self.name, shape), base=self.base, offset=self.offset, )
[docs]class SignalDict(dict): """Map from Signal -> ndarray. This dict subclass ensures that the ndarray values aren't overwritten, and instead data are written into them, which ensures that these arrays never get copied, which wastes time and space. Use ``init`` to set the ndarray initially. """ def __getitem__(self, key): try: return dict.__getitem__(self, key) except KeyError: if isinstance(key, Signal) and key.base is not key: # return a view on the base signal base = dict.__getitem__(self, key.base) return np.ndarray( buffer=base, dtype=key.dtype, shape=key.shape, offset=key.offset, strides=key.strides, ) else: raise def __setitem__(self, key, val): """Ensures that ndarrays stay in the same place in memory. Unlike normal dicts, this means that you cannot add a new key to a SignalDict using __setitem__. This is by design, to avoid silent typos when debugging Simulator. Every key must instead be explicitly initialized with SignalDict.init. """ self[key][...] = val def __str__(self): """Pretty-print the signals and current values.""" sio = StringIO() for k in self: sio.write("%s %s\n" % (repr(k), repr(self[k]))) return sio.getvalue()
[docs] def init(self, signal): """Set up a permanent mapping from signal -> data.""" if signal in self: raise SignalError("Cannot add signal twice") assert isinstance(signal, Signal) data = signal.initial_value if isinstance(data, SparseMatrix): data = data.allocate() if signal.is_view: if signal.base not in self: self.init(signal.base) # get a view onto the base data view = np.ndarray( shape=data.shape, strides=data.strides, offset=signal.offset, dtype=data.dtype, buffer=self[signal.base].data, ) assert np.array_equal(view, data) view.setflags(write=not signal.readonly) dict.__setitem__(self, signal, view) else: data = data if signal.readonly else data.copy() dict.__setitem__(self, signal, data)
[docs] def reset(self, signal): """Reset ndarray to the base value of the signal that maps to it.""" if not signal.readonly: self[signal] = signal.initial_value