Source code for nengo.builder.transforms

import numpy as np

from nengo.builder import Builder, Operator, Signal
from nengo.builder.operator import DotInc, ElementwiseInc, Reset, SparseDotInc
from nengo.exceptions import BuildError
from nengo.rc import rc
from nengo.transforms import Convolution, Dense, Sparse
from nengo._vendor.npconv2d import conv2d


[docs]def multiply(x, y): """Matrix-matrix multiply, interpreting vectors as diagonal matrices.""" if x.ndim <= 2 and y.ndim < 2: return x * y elif x.ndim < 2 and y.ndim == 2: return x.reshape(-1, 1) * y elif x.ndim == 2 and y.ndim == 2: return np.dot(x, y) else: raise BuildError( "Tensors not supported (x.ndim=%d, y.ndim=%d)" % (x.ndim, y.ndim) )
[docs]@Builder.register(Dense) def build_dense(model, transform, sig_in, decoders=None, encoders=None, rng=np.random): """Build a `.Dense` transform object.""" weights = transform.sample(rng=rng).astype(rc.float_dtype) if decoders is not None: weights = multiply(weights, decoders.astype(rc.float_dtype)) if encoders is not None: weights = multiply(encoders.astype(rc.float_dtype).T, weights) # Add operator for applying weights weight_sig = Signal(weights, readonly=True, name="%s.weights" % transform) weighted = Signal( shape=transform.size_out if encoders is None else weights.shape[0], name="%s.weighted" % transform, ) model.add_op(Reset(weighted)) op = ElementwiseInc if weights.ndim < 2 else DotInc model.add_op(op(weight_sig, sig_in, weighted, tag="%s.apply_weights" % transform)) return weighted, weight_sig
[docs]@Builder.register(Sparse) def build_sparse(model, transform, sig_in, decoders=None, encoders=None, rng=np.random): """Build a `.Sparse` transform object.""" if decoders is not None: raise BuildError( "Applying a sparse transform to a decoded connection is not supported" ) # Shouldn't be possible for encoders to be non-None, since that only # occurs for a connection solver with weights=True, and those can only # be applied to decoded connections (which are disallowed above) assert encoders is None # Add output signal weighted = Signal(shape=transform.size_out, name="%s.weighted" % transform) model.add_op(Reset(weighted)) weights = transform.sample(rng=rng) assert weights.ndim == 2 # Add operator for applying weights weight_sig = Signal(weights, name="%s.weights" % transform, readonly=True) model.add_op( SparseDotInc(weight_sig, sig_in, weighted, tag="%s.apply_weights" % transform) ) return weighted, weight_sig
[docs]@Builder.register(Convolution) def build_convolution( model, transform, sig_in, decoders=None, encoders=None, rng=np.random ): """Build a `.Convolution` transform object.""" if decoders is not None: raise BuildError( "Applying a convolution transform to a decoded " "connection is not supported" ) # Shouldn't be possible for encoders to be non-None, since that only # occurs for a connection solver with weights=True, and those can only # be applied to decoded connections (which are disallowed above) assert encoders is None weights = transform.sample(rng=rng) weight_sig = Signal(weights, readonly=True, name="%s.weights" % transform) weighted = Signal(shape=transform.size_out, name="%s.weighted" % transform) model.add_op(Reset(weighted)) model.add_op( ConvInc( weight_sig, sig_in, weighted, transform, tag="%s.apply_weights" % transform ) ) return weighted, weight_sig
[docs]class ConvInc(Operator): """Apply convolutional weights to input signal. .. versionadded:: 3.0.0 Parameters ---------- W : Signal The convolutional weights (a.k.a. the kernel). X : Signal The input signal. Y : Signal Output signal to be incremented. conv : `~nengo.Convolution` The Convolution object being applied. tag : str, optional A label associated with the operator, for debugging purposes. Attributes ---------- W : Signal The convolutional weights. X : Signal The input signal. Y : Signal Output signal to be incremented. conv : `~nengo.Convolution` The Convolution object being applied. tag : str, optional A label associated with the operator, for debugging purposes. Notes ----- 1. sets ``[]`` 2. incs ``[Y]`` 3. reads ``[W, X]`` 4. updates ``[]`` """ def __init__(self, W, X, Y, conv, tag=None): super().__init__(tag=tag) self.conv = conv self.sets = [] self.incs = [Y] self.reads = [W, X] self.updates = [] @property def W(self): return self.reads[0] @property def X(self): return self.reads[1] @property def Y(self): return self.incs[0] def _descstr(self): return "conv2d(%s, %s) -> %s" % (self.W, self.X, self.Y)
[docs] def make_step(self, signals, dt, rng): if self.conv.dimensions > 2: # note: we raise the error here, rather than earlier, because # other backends might support different convolutions raise NotImplementedError("Convolution > 2D not supported") W = signals[self.W] X = signals[self.X] Y = signals[self.Y] pad = self.conv.padding.upper() stride = self.conv.strides X = X.reshape(self.conv.input_shape.shape) Y = Y.reshape(self.conv.output_shape.shape) if not self.conv.channels_last: X = np.moveaxis(X, 0, -1) Y = np.moveaxis(Y, 0, -1) if self.conv.dimensions == 1: # add extra dimension to make it a 2D convolution X = X[None, :, :] W = W[None, :, :, :] Y = Y[None, :, :] stride = (1,) + stride # add empty batch dimension X = X[None, ...] def step_conv(): Y[...] += conv2d.conv2d(X, W, pad=pad, stride=stride)[0] return step_conv