refactor/sync: convert to the new memory management API
Use .read()/.write_only()/.read_write() instead of .sync()/.add_device()/.get() calls.

REFERENCE: autumnai/collenchyma#37, autumnai/collenchyma#62
alexandermorozov committed Apr 30, 2016
1 parent 37d1994 commit e20fc95
Showing 17 changed files with 126 additions and 210 deletions.
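
Before the per-file diffs, here is a minimal sketch of the conversion pattern, assembled only from calls that appear in this commit: the old collenchyma API required the caller to register memory per device and synchronize it explicitly, while the new API declares the intent of each access and lets the tensor handle transfers itself. The `native_backend()` helper and crate imports are assumed from the repository, so treat this as an illustration rather than standalone compilable code.

// Construction also changes: SharedTensor::new(device, &shape).unwrap()  ->  SharedTensor::new(&shape)

// Old API: register the device, synchronize, then borrow.
fn read_first_old(tensor: &mut SharedTensor<f32>) -> f32 {
    let native = native_backend();
    let _ = tensor.add_device(native.device());      // ignore "already added"
    tensor.sync(native.device()).unwrap();           // copy the latest data over
    tensor.get(native.device()).unwrap()
        .as_native().unwrap().as_slice::<f32>()[0]
}

// New API: the borrow itself states the intent, and any needed copy is implicit.
fn read_first_new(tensor: &SharedTensor<f32>) -> f32 {
    let native = native_backend();
    tensor.read(native.device()).unwrap()            // read-only access
        .as_native().unwrap().as_slice::<f32>()[0]
    // .write_only(device): memory will be overwritten, so no copy-in is done.
    // .read_write(device): borrow for an in-place update.
}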
114 changes: 14 additions & 100 deletions src/layer.rs
@@ -210,8 +210,8 @@ impl<B: IBackend> Layer<B> {
}

let backend: Rc<IBackend<F=B::F>> = self.backend.clone();
blob_data = Arc::new(RwLock::new(SharedTensor::new(backend.device(), &vec![1,1,1]).unwrap())); // [1,1,1] for CUDA
blob_gradient = Arc::new(RwLock::new(SharedTensor::new(backend.device(), &vec![1,1,1]).unwrap())); // [1,1,1] for CUDA
blob_data = Arc::new(RwLock::new(SharedTensor::new(&[1,1,1]))); // [1,1,1] for CUDA
blob_gradient = Arc::new(RwLock::new(SharedTensor::new(&[1,1,1]))); // [1,1,1] for CUDA
}
self.output_blob_names.push(blob_name.clone());
self.output_blobs_data.push(blob_data.clone());
@@ -234,8 +234,8 @@ impl<B: IBackend> Layer<B> {
info!("{} -> {}", self.name, blob_name);

let backend: Rc<IBackend<F=B::F>> = self.backend.clone();
let output_data = Arc::new(RwLock::new(SharedTensor::new(backend.device(), &vec![1,1,1]).unwrap())); // [1,1,1] for CUDA
let output_gradient = Arc::new(RwLock::new(SharedTensor::new(backend.device(), &vec![1,1,1]).unwrap())); // [1,1,1] for CUDA
let output_data = Arc::new(RwLock::new(SharedTensor::new(&[1,1,1]))); // [1,1,1] for CUDA
let output_gradient = Arc::new(RwLock::new(SharedTensor::new(&[1,1,1]))); // [1,1,1] for CUDA
self.output_blobs_data.push(output_data);
self.output_blobs_gradient.push(output_gradient);
}
@@ -264,8 +264,8 @@ impl<B: IBackend> Layer<B> {
let net_weight_id = weights_len;
let output_data = self.output_blobs_data[weight_id].read().unwrap();
debug!("Layer {} - creating weight and gradient of size {:?}", &layer_config.name, output_data.desc());
let weight_data = Arc::new(RwLock::new(SharedTensor::<f32>::new(output_data.latest_device(), output_data.desc()).unwrap()));
let weight_gradient = Arc::new(RwLock::new(SharedTensor::<f32>::new(output_data.latest_device(), output_data.desc()).unwrap()));
let weight_data = Arc::new(RwLock::new(SharedTensor::new(output_data.desc())));
let weight_gradient = Arc::new(RwLock::new(SharedTensor::new(output_data.desc())));
self.weights_data.push(weight_data.clone());
self.weights_gradient.push(weight_gradient.clone());

@@ -460,11 +460,6 @@ impl<B: IBackend> Layer<B> {
self.input_blobs_data[input_i].write().unwrap().reshape(&reshaped_shape).unwrap();
}

self.worker.sync(&self.backend,
&mut self.input_blobs_data, &mut self.input_blobs_gradient,
&mut self.weights_data, &mut self.weights_gradient,
&mut self.output_blobs_data, &mut self.output_blobs_gradient);

let forward_time = timeit_loops!(1, {
if self.is_using_in_place() {
self.worker.forward(&self.backend, &vec![], &self.weights_data, &mut self.output_blobs_data);
@@ -497,11 +492,6 @@ impl<B: IBackend> Layer<B> {
self.output_blobs_gradient[output_i] = output.clone();
}

self.worker.sync(&self.backend,
&mut self.input_blobs_data, &mut self.input_blobs_gradient,
&mut self.weights_data, &mut self.weights_gradient,
&mut self.output_blobs_data, &mut self.output_blobs_gradient);

if self.is_using_in_place() {
self.worker.backward_input(&self.backend,
&self.weights_data,
@@ -527,11 +517,6 @@ impl<B: IBackend> Layer<B> {
///
/// This method is mostly used when doing backpropagation.
pub fn backward_parameters(&mut self) {
self.worker.sync(&self.backend,
&mut self.input_blobs_data, &mut self.input_blobs_gradient,
&mut self.weights_data, &mut self.weights_gradient,
&mut self.output_blobs_data, &mut self.output_blobs_gradient);

self.worker.backward_parameters(&self.backend,
&self.output_blobs_data,
&self.output_blobs_gradient,
@@ -553,13 +538,11 @@ impl<B: IBackend> Layer<B> {
///
/// [3]: ../solver/enum.LRPolicy.html
pub fn update_weights<SolverB: IBackend + ::util::SolverOps<f32>>(&mut self, backend: &SolverB) {
let mut shared_a = ::util::native_scalar(-1f32);
let _ = shared_a.add_device(IBackend::device(backend));
shared_a.sync(IBackend::device(backend)).unwrap();
// PERF: allocate this scalar once
let shared_a = ::util::native_scalar(-1f32);
for (weight_gradient, weight_data) in self.learnable_weights_gradients().iter().zip(&mut self.learnable_weights_data()) {
weight_gradient.write().unwrap().sync(IBackend::device(backend)).unwrap();
weight_data.write().unwrap().sync(IBackend::device(backend)).unwrap();
backend.axpy_plain(&shared_a, &weight_gradient.read().unwrap(), &mut weight_data.write().unwrap()).unwrap();
backend.axpy(&shared_a, &weight_gradient.read().unwrap(),
&mut weight_data.write().unwrap()).unwrap();
}
}

@@ -695,7 +678,6 @@ impl<B: IBackend> Layer<B> {
}

let mut weight_lock = weight.write().unwrap();
weight_lock.sync(native_backend.device()).unwrap();

let capnp_tensor = capnp_weight.get_tensor().unwrap();
let mut shape = Vec::new();
@@ -705,7 +687,7 @@ impl<B: IBackend> Layer<B> {
}
weight_lock.reshape(&shape).unwrap();

let mut native_slice = weight_lock.get_mut(native_backend.device()).unwrap().as_mut_native().unwrap().as_mut_slice::<f32>();
let mut native_slice = weight_lock.write_only(native_backend.device()).unwrap().as_mut_native().unwrap().as_mut_slice::<f32>();
let data = capnp_tensor.get_data().unwrap();
for k in 0..data.len() {
native_slice[k as usize] = data.get(k);
@@ -814,8 +796,7 @@ impl<'a, B: IBackend> CapnpWrite<'a> for Layer<B> {
let mut capnp_weight = weights.borrow().get(i as u32);
capnp_weight.set_name(name);

let mut weight_lock = weight.write().unwrap();
weight_lock.sync(native_backend.device()).unwrap();
let weight_lock = weight.write().unwrap();

let mut tensor = capnp_weight.init_tensor();
{
@@ -825,7 +806,8 @@ impl<'a, B: IBackend> CapnpWrite<'a> for Layer<B> {
}
}
{
let native_slice = weight_lock.get(native_backend.device()).unwrap().as_native().unwrap().as_slice::<f32>();
let native_slice = weight_lock.read(native_backend.device())
.unwrap().as_native().unwrap().as_slice::<f32>();
let mut tensor_data = tensor.borrow().init_data(native_slice.len() as u32);
for (i, datum) in native_slice.iter().enumerate() {
tensor_data.set(i as u32, *datum);
@@ -1025,74 +1007,6 @@ pub trait ILayer<B: IBackend> : ComputeOutput<f32, B> + ComputeInputGradient<f32
self.compute_parameters_gradient(backend, &output_data_, &output_gradients_, &input_data_, &mut weights_gradients_);
}

/// Synchronize the blobs before doing a forward or backward operation.
///
/// This is necessary because the forward_layer and backward_layer methods only immutably
/// borrow the corresponding input blobs and weights which they are not supposed to change.
/// However synchronizing all blobs to the same device may be neccessary for some computations,
/// which can only be done with a mutable borrow.
fn sync(&self,
backend: &B,
input_data: &mut [ArcLock<SharedTensor<f32>>],
input_gradients: &mut [ArcLock<SharedTensor<f32>>],
weights_data: &mut [ArcLock<SharedTensor<f32>>],
weights_gradients: &mut [ArcLock<SharedTensor<f32>>],
output_data: &mut Vec<ArcLock<SharedTensor<f32>>>,
output_gradients: &mut Vec<ArcLock<SharedTensor<f32>>>) {
if self.sync_native() {
let backend = native_backend();
for tensor in input_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in input_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in weights_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in weights_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in output_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in output_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
} else {
for tensor in input_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in input_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in weights_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in weights_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in output_data {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
for tensor in output_gradients {
let mut sync = tensor.write().unwrap();
match sync.add_device(backend.device()) { _ => sync.sync(backend.device()).unwrap() }
}
}
}

/// Return whether "anonymous" output blobs are created automatically for the layer.
///
/// If this method returns true, Network::init will create enough "anonymous" output
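The update_weights hunk above is representative of the whole change: previously every tensor had to be pushed to the backend's device before calling the `_plain` variant of an op, whereas now the op is called directly and the read/write borrows it takes perform any required transfers. A condensed sketch of the new-style weight update under the same assumptions (a backend implementing axpy via the crate's SolverOps, ArcLock as used throughout layer.rs):

// W := W + (-1.0) * dW, with device placement handled inside axpy's borrows.
fn apply_gradients<B: IBackend + ::util::SolverOps<f32>>(
    backend: &B,
    weights: &[ArcLock<SharedTensor<f32>>],
    gradients: &[ArcLock<SharedTensor<f32>>]) {
    // No add_device()/sync() calls are needed any more.
    let minus_one = ::util::native_scalar(-1f32);
    for (gradient, data) in gradients.iter().zip(weights) {
        backend.axpy(&minus_one,
                     &gradient.read().unwrap(),
                     &mut data.write().unwrap()).unwrap();
    }
}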
12 changes: 6 additions & 6 deletions src/layers/activation/relu.rs
@@ -56,8 +56,8 @@ impl<B: IBackend + Relu<f32> + ReluPointwise<f32>> ComputeOutput<f32, B> for ReL
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.relu_plain(input, output_data[0]).unwrap(),
None => backend.relu_pointwise_plain(output_data[0]).unwrap(),
Some(input) => backend.relu(input, output_data[0]).unwrap(),
None => backend.relu_pointwise(output_data[0]).unwrap(),
}
}
}
@@ -72,8 +72,8 @@ impl<B: IBackend + Relu<f32> + ReluPointwise<f32>> ComputeInputGradient<f32, B>
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.relu_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
None => backend.relu_pointwise_grad_plain(input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.relu_grad(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
None => backend.relu_pointwise_grad(input_data[0], input_gradients[0]).unwrap(),
}
}
}
@@ -115,7 +115,7 @@ impl<B: IBackend + Relu<f32>> ComputeOutput<f32, B> for ReLU {
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.relu_plain(input, output_data[0]).unwrap(),
Some(input) => backend.relu(input, output_data[0]).unwrap(),
None => panic!("No input provided for ReLU layer."),
}
}
@@ -131,7 +131,7 @@ impl<B: IBackend + Relu<f32>> ComputeInputGradient<f32, B> for ReLU {
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.relu_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.relu_grad(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
None => panic!("No output_data provided for ReLU layer backward."),
}
}
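The match in these activation layers encodes the in-place convention: when a layer runs in place, layer.rs passes an empty input vector, so `input_data.get(0)` is `None` and the pointwise variant updates the output tensor directly; otherwise the two-tensor op reads the input and writes the output. A minimal sketch of that dispatch, with the trait bounds copied from the impls above and everything else assumed:

// Out-of-place vs. in-place (pointwise) ReLU forward pass.
fn relu_forward<B: IBackend + Relu<f32> + ReluPointwise<f32>>(
    backend: &B,
    input: Option<&SharedTensor<f32>>,
    output: &mut SharedTensor<f32>) {
    match input {
        // separate input: read `input`, write the result into `output`
        Some(x) => backend.relu(x, output).unwrap(),
        // in-place: `output` already holds the data and is transformed directly
        None => backend.relu_pointwise(output).unwrap(),
    }
}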
14 changes: 8 additions & 6 deletions src/layers/activation/sigmoid.rs
@@ -60,8 +60,8 @@ impl<B: IBackend + conn::Sigmoid<f32> + conn::SigmoidPointwise<f32>> ComputeOutp
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.sigmoid_plain(input, output_data[0]).unwrap(),
None => backend.sigmoid_pointwise_plain(output_data[0]).unwrap(),
Some(input) => backend.sigmoid(input, output_data[0]).unwrap(),
None => backend.sigmoid_pointwise(output_data[0]).unwrap(),
}
}
}
@@ -76,8 +76,8 @@ impl<B: IBackend + conn::Sigmoid<f32> + conn::SigmoidPointwise<f32>> ComputeInpu
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.sigmoid_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
None => backend.sigmoid_pointwise_grad_plain(input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.sigmoid_grad(output_data[0], output_gradients[0],
input_data[0], input_gradients[0]).unwrap(),
None => backend.sigmoid_pointwise_grad(input_data[0], input_gradients[0]).unwrap(),
}
}
}
@@ -119,7 +120,7 @@ impl<B: IBackend + conn::Sigmoid<f32>> ComputeOutput<f32, B> for Sigmoid {
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.sigmoid_plain(input, output_data[0]).unwrap(),
Some(input) => backend.sigmoid(input, output_data[0]).unwrap(),
None => panic!("No input provided for Sigmoid layer."),
}
}
@@ -135,7 +136,8 @@ impl<B: IBackend + conn::Sigmoid<f32>> ComputeInputGradient<f32, B> for Sigmoid
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.sigmoid_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.sigmoid_grad(output_data[0], output_gradients[0],
input_data[0], input_gradients[0]).unwrap(),
None => panic!("No output_data provided for Sigmoid layer backward."),
}
}
14 changes: 8 additions & 6 deletions src/layers/activation/tanh.rs
@@ -57,8 +57,8 @@ impl<B: IBackend + conn::Tanh<f32> + conn::TanhPointwise<f32>> ComputeOutput<f32
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.tanh_plain(input, output_data[0]).unwrap(),
None => backend.tanh_pointwise_plain(output_data[0]).unwrap(),
Some(input) => backend.tanh(input, output_data[0]).unwrap(),
None => backend.tanh_pointwise(output_data[0]).unwrap(),
}
}
}
@@ -73,8 +73,8 @@ impl<B: IBackend + conn::Tanh<f32> + conn::TanhPointwise<f32>> ComputeInputGradi
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.tanh_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
None => backend.tanh_pointwise_grad_plain(input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.tanh_grad(output_data[0], output_gradients[0],
input_data[0], input_gradients[0]).unwrap(),
None => backend.tanh_pointwise_grad(input_data[0], input_gradients[0]).unwrap(),
}
}
}
@@ -116,7 +117,7 @@ impl<B: IBackend + conn::Tanh<f32>> ComputeOutput<f32, B> for TanH {
input_data: &[&SharedTensor<f32>],
output_data: &mut [&mut SharedTensor<f32>]) {
match input_data.get(0) {
Some(input) => backend.tanh_plain(input, output_data[0]).unwrap(),
Some(input) => backend.tanh(input, output_data[0]).unwrap(),
None => panic!("No input provided for TanH layer."),
}
}
@@ -132,7 +133,8 @@ impl<B: IBackend + conn::Tanh<f32>> ComputeInputGradient<f32, B> for TanH {
input_data: &[&SharedTensor<f32>],
input_gradients: &mut [&mut SharedTensor<f32>]) {
match output_data.get(0) {
Some(_) => backend.tanh_grad_plain(output_data[0], output_gradients[0], input_data[0], input_gradients[0]).unwrap(),
Some(_) => backend.tanh_grad(output_data[0], output_gradients[0],
input_data[0], input_gradients[0]).unwrap(),
None => panic!("No output_data provided for TanH layer backward."),
}
}