import numpy as np
import tensorflow as tf

from utils import *

DEFAULT_PADDING = 'SAME'

"""
A wrapper for TensorFlow network layers.
Adapted from SubCNN_TF (https://github.com/yuxng/SubCNN_TF).
"""
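# A minimal usage sketch (hypothetical `ToyNet`, tensor names, and shapes; not
# part of this repository). A subclass builds its graph in `setup()` by
# chaining the decorated layer methods, since each one registers its output
# and returns `self`:
#
#   class ToyNet(Network):
#       def setup(self):
#           (self.feed('data')
#                .fc(64, name='fc1')
#                .fc(10, name='fc2', relu=False)
#                .softmax(name='prob'))
#
#   net = ToyNet({'data': tf.placeholder(tf.float32, [None, 128])})
#   prob = net.get_output('prob')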
def layer(op):
    def layer_decorated(self, *args, **kwargs):
        # Automatically set a name if not provided.
        name = kwargs.setdefault('name', self.get_unique_name(op.__name__))
        # Figure out the layer inputs.
        if len(self.inputs) == 0:
            raise RuntimeError('No input variables found for layer %s.' % name)
        elif len(self.inputs) == 1:
            layer_input = self.inputs[0]
        else:
            layer_input = list(self.inputs)
        # Perform the operation and get the output.
        layer_output = op(self, layer_input, *args, **kwargs)
        # Add to layer LUT.
        if name in self.layers:
            print('WARNING: overriding layer %s!' % name)
        self.layers[name] = layer_output
        # Feed the output forward and return self for chaining.
        self.feed(layer_output)
        return self
    return layer_decorated

class Network(object):
    def __init__(self, inputs, trainable=True):
        self.inputs = []
        self.layers = dict(inputs)
        self.trainable = trainable
        self.setup()

    def setup(self):
        raise NotImplementedError('Must be subclassed.')
    def feed(self, *args):
        assert len(args) != 0
        self.inputs = []
        for layer in args:
            # Accept either a layer name (looked up in the LUT) or a tensor.
            if isinstance(layer, str):
                try:
                    layer = self.layers[layer]
                    print("feed function")
                    print(layer)
                except KeyError:
                    print("feed function")
                    print(self.layers.keys())
                    raise KeyError('Unknown layer name fed: %s' % layer)
            self.inputs.append(layer)
        return self
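    # Note: `feed` accepts layer names and raw tensors interchangeably, e.g.
    # `self.feed('conv1', some_tensor)` (hypothetical names) queues both as
    # inputs for the next decorated layer call, and returns `self` so calls
    # can be chained.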

    def wrap(self, tensor, name):
        self.layers[name] = tensor
        return self.feed(tensor)

    @layer
    def stop_gradient(self, input, name):
        return tf.stop_gradient(input, name=name)
    def get_output(self, layer):
        try:
            layer = self.layers[layer]
        except KeyError:
            print("get_output function")
            print(self.layers.keys())
            raise KeyError('Unknown layer name fed: %s' % layer)
        return layer

    def activation_summary(self, layer_name):
        try:
            layer = self.layers[layer_name]
            self._variable_summaries(layer, layer_name)
        except KeyError:
            print(self.layers.keys())
            raise KeyError('Unknown layer name fed: %s' % layer_name)
        return layer
    def _variable_summaries(self, var, name):
        """Attach mean and histogram summaries to a Tensor."""
        with tf.name_scope('summaries'):
            mean = tf.reduce_mean(var)
            tf.summary.scalar('mean/' + name, mean)
            tf.summary.histogram(name, var)

    def get_unique_name(self, prefix):
        # Count existing layers sharing this prefix to build a unique name.
        ident = sum(t.startswith(prefix) for t, _ in self.layers.items()) + 1
        return '%s_%d' % (prefix, ident)

    def make_var(self, name, shape, initializer=None, trainable=True):
        return tf.get_variable(name, shape, initializer=initializer, trainable=trainable)

    def validate_padding(self, padding):
        assert padding in ('SAME', 'VALID')
    def conv_residual_block(self, input_layer, k_size, output_channel, init_weight, name=None, leaky_value=0.0,
                            padding='SAME', trainable=True, reuse=False, bottle_neck=False):
        input_channel = input_layer.get_shape().as_list()[-1]

        if not (input_channel * 2 == output_channel or input_channel == output_channel):
            raise ValueError("Output and input channels do not match in convolution residual block!")

        # A bottleneck block downsamples spatially with stride 2; otherwise stride 1.
        if bottle_neck:
            valid_strides = [1, 2, 2, 2, 1]
        else:
            valid_strides = [1, 1, 1, 1, 1]

        same_strides = [1, 1, 1, 1, 1]

        with tf.variable_scope(name) as scope:
            if reuse: tf.get_variable_scope().reuse_variables()

            # Shortcut branch: a strided 1x1x1 convolution matches the channel
            # count and spatial size of the residual branch when bottlenecking.
            short_cut = input_layer
            if bottle_neck:
                left_weight_name = 'left_conv'
                left_kernel = self.make_var(left_weight_name, [1, 1, 1, input_channel, output_channel],
                                            init_weight, trainable=trainable)
                left_conv_out = tf.nn.conv3d(input_layer, left_kernel, strides=valid_strides,
                                             padding=padding, name='left_conv')
                short_cut = left_conv_out

            right_first_depth = output_channel // 2
            right_second_depth = output_channel // 2
            right_third_depth = output_channel

            right_first_layer_name = 'right_conv_0'
            right_second_layer_name = 'right_conv_1'
            right_third_layer_name = 'right_conv_2'

            # Residual branch: 1x1x1 reduce -> k x k x k conv -> 1x1x1 expand.
            right_first_conv = self.conv3d(input_layer, 1, right_first_depth, right_first_layer_name, init_weight,
                                           strides=valid_strides, padding=DEFAULT_PADDING, leaky_value=leaky_value,
                                           relu=True, has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            right_second_conv = self.conv3d(right_first_conv, k_size, right_second_depth, right_second_layer_name, init_weight,
                                            strides=same_strides, padding=DEFAULT_PADDING, leaky_value=leaky_value, relu=True,
                                            has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            right_third_conv = self.conv3d(right_second_conv, 1, right_third_depth, right_third_layer_name, init_weight,
                                           strides=same_strides, padding=DEFAULT_PADDING, leaky_value=leaky_value, relu=False,
                                           has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            res_block_out = tf.add(short_cut, right_third_conv)
            res_block_out = lrelu(res_block_out, leaky_value, name=scope.name)
            return res_block_out
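    # Example call (hypothetical input and initializer): a bottleneck block
    # that halves each spatial dimension of a 32-channel 5-D volume while
    # doubling its channels:
    #
    #   init_w = tf.truncated_normal_initializer(stddev=0.01)
    #   out = self.conv_residual_block(vol, k_size=3, output_channel=64,
    #                                  init_weight=init_w, name='res1',
    #                                  leaky_value=0.1, bottle_neck=True)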

    def deconv_residual_block(self, input_layer, k_size, output_channel, target_shape, init_weights, name=None, leaky_value=0.0,
                              padding='SAME', trainable=True, reuse=False, bottle_neck=True):
        input_channel = input_layer.get_shape().as_list()[-1]

        if not (input_channel == output_channel * 2 or input_channel == output_channel):
            raise ValueError("Output and input channels do not match in deconvolution residual block!")

        # A bottleneck block upsamples spatially with stride 2; otherwise stride 1.
        if bottle_neck:
            valid_strides = [1, 2, 2, 2, 1]
        else:
            valid_strides = [1, 1, 1, 1, 1]
        same_strides = [1, 1, 1, 1, 1]

        with tf.variable_scope(name) as scope:
            if reuse: tf.get_variable_scope().reuse_variables()

            # Shortcut branch: a strided 1x1x1 transposed convolution matches the
            # channel count and spatial size of the residual branch when bottlenecking.
            short_cut = input_layer
            if bottle_neck:
                left_weight_name = 'left_deconv'
                left_deconv_out = self.deconv3d(input_layer, 1, output_channel, left_weight_name, target_shape, init_weights,
                                                strides=valid_strides, leaky_value=leaky_value, padding=DEFAULT_PADDING,
                                                relu=False, has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)
                short_cut = left_deconv_out

            right_first_depth = output_channel // 2
            right_second_depth = output_channel // 2
            right_third_depth = output_channel

            right_first_layer_name = 'right_deconv_0'
            right_second_layer_name = 'right_deconv_1'
            right_third_layer_name = 'right_deconv_2'

            # Each stage keeps the target spatial shape but uses its own depth.
            first_target_shape = [target_shape[0], target_shape[1], target_shape[2], target_shape[3], right_first_depth]
            second_target_shape = [target_shape[0], target_shape[1], target_shape[2], target_shape[3], right_second_depth]
            third_target_shape = [target_shape[0], target_shape[1], target_shape[2], target_shape[3], right_third_depth]

            right_first_deconv = self.deconv3d(input_layer, 1, right_first_depth, right_first_layer_name,
                                               first_target_shape, init_weights, strides=valid_strides,
                                               leaky_value=leaky_value, padding=DEFAULT_PADDING, relu=True,
                                               has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            right_second_deconv = self.deconv3d(right_first_deconv, k_size, right_second_depth, right_second_layer_name,
                                                second_target_shape, init_weights, strides=same_strides,
                                                leaky_value=leaky_value, padding=DEFAULT_PADDING, relu=True,
                                                has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            right_third_deconv = self.deconv3d(right_second_deconv, 1, right_third_depth, right_third_layer_name,
                                               third_target_shape, init_weights, strides=same_strides,
                                               leaky_value=leaky_value, padding=DEFAULT_PADDING, relu=False,
                                               has_biases=False, batch_norm=True, trainable=trainable, reuse=reuse)

            res_block_out = tf.add(short_cut, right_third_deconv)
            res_block_out = lrelu(res_block_out, leaky_value, name=scope.name)
            return res_block_out
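    # Example call (hypothetical shapes): `target_shape` is the full 5-D output
    # shape [batch, depth, height, width, channels] the transposed convolutions
    # should produce, e.g. upsampling a [4, 8, 8, 8, 64] volume:
    #
    #   out = self.deconv_residual_block(vol, k_size=3, output_channel=32,
    #                                    target_shape=[4, 16, 16, 16, 32],
    #                                    init_weights=init_w, name='deres1',
    #                                    bottle_neck=True)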

    def conv3d(self, input, k_size, out_depth, name, init_weights, strides, init_biases=None, padding=DEFAULT_PADDING,
               leaky_value=0.0, relu=True, has_biases=True, batch_norm=True, trainable=True, reuse=False):
        self.validate_padding(padding)
        weight_name = 'weight'
        biases_name = 'biases'
        in_depth = input.get_shape()[-1]
        with tf.variable_scope(name) as scope:
            if reuse: tf.get_variable_scope().reuse_variables()

            kernel = self.make_var(weight_name, [k_size, k_size, k_size, in_depth, out_depth],
                                   init_weights, trainable=trainable)
            conv_out = tf.nn.conv3d(input, kernel, strides=strides, padding=padding, name=name)
            if has_biases:
                biases = self.make_var(biases_name, [out_depth], init_biases, trainable=trainable)
                conv_out = tf.nn.bias_add(conv_out, biases)

            # `trainable` doubles as the batch-norm training flag here.
            if batch_norm:
                conv_out = tf.contrib.layers.batch_norm(conv_out, is_training=trainable)
            if relu:
                conv_out = lrelu(conv_out, leaky_value, scope.name)
            return conv_out
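    # Direct (non-decorated) use, assuming a 5-D input volume and hypothetical
    # initializers; strides follow tf.nn.conv3d's [1, d, h, w, 1] layout:
    #
    #   init_w = tf.truncated_normal_initializer(stddev=0.01)
    #   init_b = tf.constant_initializer(0.0)
    #   out = self.conv3d(vol, k_size=3, out_depth=16, name='conv1',
    #                     init_weights=init_w, strides=[1, 1, 1, 1, 1],
    #                     init_biases=init_b)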

    def deconv3d(self, input, k_size, out_depth, name, target_shape, init_weights, strides, init_biases=None, leaky_value=0.0,
                 relu=True, batch_norm=True, padding=DEFAULT_PADDING, has_biases=False, trainable=True, reuse=False):
        self.validate_padding(padding)
        weight_name = 'weight'
        biases_name = 'biases'
        in_depth = input.get_shape()[-1]
        with tf.variable_scope(name) as scope:
            if reuse: tf.get_variable_scope().reuse_variables()

            # Transposed-convolution kernels are shaped [k, k, k, out_depth, in_depth].
            kernel = self.make_var(weight_name, [k_size, k_size, k_size, out_depth, in_depth],
                                   init_weights, trainable=trainable)
            deconv_out = tf.nn.conv3d_transpose(input, kernel, target_shape, strides=strides, padding=padding, name=name)
            if has_biases:
                biases = self.make_var(biases_name, [out_depth], init_biases, trainable=trainable)
                deconv_out = tf.nn.bias_add(deconv_out, biases)

            if batch_norm:
                deconv_out = tf.contrib.layers.batch_norm(deconv_out, is_training=trainable)
            if relu:
                deconv_out = lrelu(deconv_out, leaky_value, name=scope.name)
            return deconv_out

    @layer
    def relu(self, input, name):
        return tf.nn.relu(input, name=name)

    @layer
    def sigmoid(self, input, name):
        return tf.sigmoid(input, name=name)

    @layer
    def tanh(self, input, name):
        return tf.tanh(input, name=name)

    @layer
    def lrelu(self, input, leaky_value, name):
        return lrelu(input, leaky_value, name=name)

    @layer
    def max_pool(self, input, k_h, k_w, s_h, s_w, name, padding=DEFAULT_PADDING):
        self.validate_padding(padding)
        return tf.nn.max_pool(input,
                              ksize=[1, k_h, k_w, 1],
                              strides=[1, s_h, s_w, 1],
                              padding=padding,
                              name=name)

    @layer
    def avg_pool(self, input, k_h, k_w, s_h, s_w, name, padding=DEFAULT_PADDING):
        self.validate_padding(padding)
        return tf.nn.avg_pool(input,
                              ksize=[1, k_h, k_w, 1],
                              strides=[1, s_h, s_w, 1],
                              padding=padding,
                              name=name)

    @layer
    def lrn(self, input, radius, alpha, beta, name, bias=1.0):
        return tf.nn.local_response_normalization(input,
                                                  depth_radius=radius,
                                                  alpha=alpha,
                                                  beta=beta,
                                                  bias=bias,
                                                  name=name)

    @layer
    def concat(self, inputs, axis, name):
        return tf.concat(axis=axis, values=inputs, name=name)
    @layer
    def fc(self, input, num_out, name, relu=True, leaky_value=0.0, trainable=True, reuse=False):
        with tf.variable_scope(name) as scope:
            if reuse: tf.get_variable_scope().reuse_variables()
            # Only use the first input.
            if isinstance(input, tuple):
                input = input[0]

            input_shape = input.get_shape()
            print(name)
            print(input_shape)
            if input_shape.ndims == 5:
                # Flatten a 5-D volume into [batch, features].
                dim = 1
                for d in input_shape[1:].as_list():
                    dim *= d
                feed_in = tf.reshape(input, [-1, dim])
            elif input_shape.ndims == 3:
                # Fold the first two dimensions into the batch dimension.
                dim = 1
                for d in input_shape[2:].as_list():
                    dim *= d
                feed_in = tf.reshape(input, [-1, dim])
            else:
                feed_in, dim = (input, int(input_shape[-1]))

            init_weights = tf.truncated_normal_initializer(0.0, stddev=0.001)
            init_biases = tf.constant_initializer(0.0)
            weights = self.make_var('weights', [dim, num_out], init_weights, trainable)
            biases = self.make_var('biases', [num_out], init_biases, trainable)

            if relu:
                fc = lrelu(tf.matmul(feed_in, weights) + biases, leaky_value, scope.name)
            else:
                fc = tf.add(tf.matmul(feed_in, weights), biases, name=scope.name)

            return fc
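    # The reshapes above let a 5-D activation volume be fed straight into `fc`:
    # e.g. (hypothetical shape) a [batch, 4, 4, 4, 32] tensor is flattened to
    # [batch, 2048] before the matmul.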

    @layer
    def softmax(self, input, name):
        input = tf.cast(input, dtype=tf.float64)
        return tf.nn.softmax(input, name=name)

    @layer
    def dropout(self, input, keep_prob, name):
        return tf.nn.dropout(input, keep_prob, name=name)

    @layer
    def argmax(self, input, dim, name):
        return tf.argmax(input, dim, name=name)

    @layer
    def identity(self, input, name):
        return tf.identity(input, name=name)

    @layer
    def batch_norm(self, input, name, is_training=True, relu=False, reuse=False):
        with tf.variable_scope(name) as scope:
            if reuse:
                tf.get_variable_scope().reuse_variables()

            t = tf.contrib.layers.batch_norm(input, is_training=is_training)
            if relu:
                t = tf.nn.relu(t)
            return t