ad.py
import numpy as np


class Node(object):
    def __init__(self):
        self.inputs = []      # list of input (parent) nodes
        self.op = None        # the operation that produces this node
        self.constVal = None  # constant operand, if any
        self.name = ""

    def __add__(self, other):
        # Overloaded addition.
        if isinstance(other, Node):
            # The other operand is also a node.
            retNode = add_op(self, other)
        else:
            # The other operand is a constant.
            retNode = add_const_op(self, other)
        return retNode

    def __mul__(self, other):
        # Overloaded multiplication.
        if isinstance(other, Node):
            retNode = mul_op(self, other)
        else:
            retNode = mul_const_op(self, other)
        return retNode

    def __neg__(self):
        return neg_op(self)

    def __sub__(self, other):
        if isinstance(other, Node):
            retNode = sub_op(self, other)
        else:
            retNode = sub_const_op(self, other)
        return retNode

    def __rsub__(self, other):
        if isinstance(other, Node):
            # other - self: the operand order is reversed.
            retNode = sub_op(other, self)
        else:
            retNode = rsub_const_op(other, self)
        return retNode

    def __truediv__(self, other):
        if isinstance(other, Node):
            retNode = div_op(self, other)
        else:
            retNode = div_const_op(self, other)
        return retNode

    def __rtruediv__(self, other):
        if isinstance(other, Node):
            # other / self: the operand order is reversed.
            retNode = div_op(other, self)
        else:
            retNode = rdiv_const_op(other, self)
        return retNode

    __radd__ = __add__
    __rmul__ = __mul__

    def __str__(self):
        # For convenient printing.
        return self.name


class Op(object):
    # Base class for all operation types.
    def __call__(self):
        # Calling an Op creates a new node that records this operation.
        newNode = Node()
        newNode.op = self
        return newNode

    def forward(self, node, inputVals):
        """
        Forward computation (abstract).
        :param node: the node being evaluated
        :param inputVals: values of the node's input (parent) nodes
        :return: the forward output value
        """
        raise NotImplementedError

    def backward(self, node, gradFromBack):
        """
        Backward gradient (abstract).
        :param node: the node being differentiated through
        :param gradFromBack: sum of the gradients from all of node's child nodes
        :return: list of gradients of node with respect to each of its parents
        """
        raise NotImplementedError


class AddOp(Op):
    # Addition of two nodes; instantiated once below as a singleton object.
    # A singleton is enough because an "operation" never needs to record anything:
    # Op only specifies the kind of computation to perform, and each xxOp acts as a
    # companion object for its nodes. Everything that must be remembered
    # (variable name, inputs, operation type) lives inside the Node; the operation
    # merely reads that information from the Node and performs the computation.
    def __call__(self, inputNode1, inputNode2):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1, inputNode2]
        newNode.name = "(%s+%s)" % (inputNode1.name, inputNode2.name)
        return newNode

    def forward(self, node, inputVals):
        return sum(inputVals)

    def backward(self, node, gradFromBack):
        # d(N1+N2)/dN1 * grad = grad
        # d(N1+N2)/dN2 * grad = grad
        return [gradFromBack, gradFromBack]


add_op = AddOp()  # singleton instance


class AddConstOp(Op):
    # Addition of a node and a constant.
    def __call__(self, inputNode1, const):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const  # the constant is stored on the node
        newNode.name = "(%s+%s)" % (inputNode1.name, str(const))
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] + node.constVal

    def backward(self, node, gradFromBack):
        # The gradient w.r.t. the constant is always zero and never needed,
        # so only the gradient w.r.t. the node input is returned.
        return [gradFromBack]


add_const_op = AddConstOp()


class SubOp(Op):
    def __call__(self, inputNode1, inputNode2):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1, inputNode2]
        newNode.name = "(%s-%s)" % (inputNode1.name, inputNode2.name)
        return newNode

    def forward(self, node, inputVals):
        return inputVals[0] - inputVals[1]

    def backward(self, node, gradFromBack):
        return [gradFromBack, -gradFromBack]


sub_op = SubOp()


class SubConstOp(Op):
    def __call__(self, inputNode1, const):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const  # the constant is stored on the node
        newNode.name = "(%s-%s)" % (inputNode1.name, str(const))
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] - node.constVal

    def backward(self, node, gradFromBack):
        # The constant contributes no gradient.
        return [gradFromBack]


sub_const_op = SubConstOp()


class RSubConstOp(Op):
    def __call__(self, const, inputNode1):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const  # the constant is stored on the node
        newNode.name = "(%s-%s)" % (str(const), inputNode1.name)
        return newNode

    def forward(self, node: Node, inputVals):
        return node.constVal - inputVals[0]

    def backward(self, node, gradFromBack):
        # The constant contributes no gradient; the node input is negated.
        return [-gradFromBack]


rsub_const_op = RSubConstOp()


class NegOp(Op):
    def __call__(self, inputNode):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode]
        newNode.name = "(-%s)" % inputNode.name
        return newNode

    def forward(self, node, inputVals):
        return -inputVals[0]

    def backward(self, node, gradFromBack):
        return [-gradFromBack]


neg_op = NegOp()


class MulOp(Op):
    # Element-wise multiplication of two nodes.
    def __call__(self, inputNode1, inputNode2):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1, inputNode2]
        newNode.name = "(%s*%s)" % (inputNode1.name, inputNode2.name)
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] * inputVals[1]

    def backward(self, node: Node, gradFromBack):
        return [node.inputs[1] * gradFromBack, node.inputs[0] * gradFromBack]


mul_op = MulOp()


class MulConstOp(Op):
    # Element-wise multiplication of a node by a constant.
    def __call__(self, inputNode1, const):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const
        newNode.name = "(%s*%s)" % (inputNode1.name, str(const))
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] * node.constVal

    def backward(self, node: Node, gradFromBack):
        return [node.constVal * gradFromBack]


mul_const_op = MulConstOp()


class DivOp(Op):
    # Element-wise division of two nodes.
    def __call__(self, inputNode1, inputNode2):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1, inputNode2]
        newNode.name = "(%s/%s)" % (inputNode1.name, inputNode2.name)
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] / inputVals[1]

    def backward(self, node: Node, gradFromBack):
        return [gradFromBack / node.inputs[1],
                -gradFromBack * node.inputs[0] / (node.inputs[1] * node.inputs[1])]


div_op = DivOp()


class DivConstOp(Op):
    # Element-wise division of a node by a constant.
    def __call__(self, inputNode1, const):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const
        newNode.name = "(%s/%s)" % (inputNode1.name, str(const))
        return newNode

    def forward(self, node: Node, inputVals):
        return inputVals[0] / node.constVal

    def backward(self, node: Node, gradFromBack):
        return [gradFromBack / node.constVal]


div_const_op = DivConstOp()


class LogOp(Op):
    def __call__(self, node):
        new_node = Op.__call__(self)
        new_node.inputs = [node]
        new_node.name = "log(%s)" % node.name
        return new_node

    def forward(self, node, input_vals):
        return np.log(input_vals[0])

    def backward(self, node, gradFromBack):
        return [gradFromBack / node.inputs[0]]


log_op = LogOp()


class RDivConstOp(Op):
    # Division of a constant by a node.
    def __call__(self, const, inputNode1):
        newNode = Op.__call__(self)
        newNode.inputs = [inputNode1]
        newNode.constVal = const
        newNode.name = "(%s/%s)" % (str(const), inputNode1.name)
        return newNode

    def forward(self, node: Node, inputVals):
        return node.constVal / inputVals[0]

    def backward(self, node: Node, gradFromBack):
        return [-gradFromBack * node.constVal / (node.inputs[0] * node.inputs[0])]


rdiv_const_op = RDivConstOp()


class PlaceHolderOp(Op):
    def __call__(self):
        newNode = Op.__call__(self)
        return newNode

    def forward(self, node, inputVals):
        raise NotImplementedError

    def backward(self, node, gradFromBack):
        return None


placeholder_op = PlaceHolderOp()


class ZerosLikeOp(Op):
    def __call__(self, node):
        new_node = Op.__call__(self)
        new_node.inputs = [node]
        new_node.name = "Zeroslike(%s)" % node.name
        return new_node

    def forward(self, node, inputVals):
        assert isinstance(inputVals[0], np.ndarray)
        return np.zeros(inputVals[0].shape)

    def backward(self, node, gradFromBack):
        return [zeroslike_op(node.inputs[0])]


zeroslike_op = ZerosLikeOp()


class OnesLikeOp(Op):
    def __call__(self, node):
        newNode = Op.__call__(self)
        newNode.inputs = [node]
        newNode.name = "Oneslike(%s)" % node.name
        return newNode

    def forward(self, node, inputVals):
        assert isinstance(inputVals[0], np.ndarray)
        return np.ones(inputVals[0].shape)

    def backward(self, node, gradFromBack):
        return [zeroslike_op(node.inputs[0])]


oneslike_op = OnesLikeOp()


class MatMulOp(Op):
    """Op to matrix multiply two nodes."""

    def __call__(self, node_A, node_B, trans_A=False, trans_B=False):
        new_node = Op.__call__(self)
        new_node.matmul_attr_trans_A = trans_A
        new_node.matmul_attr_trans_B = trans_B
        new_node.inputs = [node_A, node_B]
        new_node.name = "MatMul(%s,%s,%s,%s)" % (node_A.name, node_B.name, str(trans_A), str(trans_B))
        return new_node

    def forward(self, node, input_vals):
        mat_A = input_vals[0]
        mat_B = input_vals[1]
        if node.matmul_attr_trans_A:
            mat_A = mat_A.T
        if node.matmul_attr_trans_B:
            mat_B = mat_B.T
        return np.matmul(mat_A, mat_B)

    def backward(self, node, gradFromBack):
        return [matmul(gradFromBack, node.inputs[1], False, True),
                matmul(node.inputs[0], gradFromBack, True, False)]


matmul = MatMulOp()


class SoftMaxOp(Op):
    def __call__(self, node):
        newNode = Op.__call__(self)
        newNode.inputs = [node]
        newNode.name = "Softmax(%s)" % (node.name)
        return newNode

    def forward(self, node, inputVals):
        assert isinstance(inputVals[0], np.ndarray)
        return np.exp(inputVals[0]) / np.sum(np.exp(inputVals[0]))

    def backward(self, node, gradFromBack):
        raise NotImplementedError


softmax = SoftMaxOp()


class SoftMax_CrossEnt_Op(Op):
    def __call__(self, node, target):
        newNode = Op.__call__(self)
        newNode.inputs = [node, target]
        newNode.name = "Softmax_CrossEnt(%s,%s)" % (node.name, target.name)
        return newNode

    def forward(self, node, inputVals):
        assert isinstance(inputVals[0], np.ndarray)
        return -np.sum(inputVals[1] * np.log(np.exp(inputVals[0]) / np.sum(np.exp(inputVals[0]))))

    def backward(self, node, gradFromBack):
        # The target (label) is treated as a constant, so its gradient slot is 0.
        return [(softmax(node.inputs[0]) - node.inputs[1]) * gradFromBack, 0]


softmax_crossent = SoftMax_CrossEnt_Op()


def Variable(name: str):
    placeholderNode = placeholder_op()
    placeholderNode.name = name
    return placeholderNode


def log(input):
    if isinstance(input, Node):
        return log_op(input)
    else:
        return np.log(input)


class Executor:
    def __init__(self, eval_node_list):
        self.eval_node_list = eval_node_list

    def run(self, feed_dict):
        node_to_val_map = dict(feed_dict)
        topo_order = list(find_topo_sort(self.eval_node_list))
        for node in topo_order:
            if isinstance(node.op, PlaceHolderOp):
                continue
            inputVals = [node_to_val_map[ele] for ele in node.inputs]
            forwardVal = node.op.forward(node, inputVals)
            node_to_val_map[node] = forwardVal if isinstance(forwardVal, np.ndarray) else np.array(forwardVal)
        node_val_results = [node_to_val_map[node] for node in self.eval_node_list]
        return node_val_results


def gradients(output_node, node_list):
    # Maps each node n to a list of gradient contributions of the output y
    # w.r.t. n, one entry per path from y to n.
    node_to_output_grads_list = {}
    # Seed with the gradient of the output w.r.t. itself: an all-ones array.
    node_to_output_grads_list[output_node] = [oneslike_op(output_node)]
    # Maps each node to the (summed) gradient of the output w.r.t. it.
    node_to_output_grad = {}
    reverse_topo_order = list(reversed(find_topo_sort([output_node])))
    for node in reverse_topo_order:
        grad_sum = sum_node_list(node_to_output_grads_list[node])
        node_to_output_grad[node] = grad_sum
        # Gradients of the current node w.r.t. each of its parents.
        all_grads = node.op.backward(node, grad_sum)
        for i in range(len(node.inputs)):
            parent = node.inputs[i]
            # Fetch the gradient list of the parent currently being processed,
            # append this node's contribution, and put the list back into the
            # overall gradient dictionary.
            parent_grad_list = node_to_output_grads_list.get(parent, [])
            parent_grad_list.append(all_grads[i])
            node_to_output_grads_list[parent] = parent_grad_list
    grad_node_list = [node_to_output_grad[node] for node in node_list]
    return grad_node_list


def find_topo_sort(node_list):
    visited = set()
    topo_order = []
    for node in node_list:
        topo_sort_dfs(node, visited, topo_order)
    return topo_order


def topo_sort_dfs(node, visited, topo_order):
    """Post-order DFS"""
    if node in visited:
        return
    visited.add(node)
    for n in node.inputs:
        topo_sort_dfs(n, visited, topo_order)
    topo_order.append(node)


def sum_node_list(node_list):
    """Custom sum function to avoid the redundant nodes created by Python's built-in sum."""
    from operator import add
    from functools import reduce
    return reduce(add, node_list)
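

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative addition, not part of the original file).
# It assumes only the definitions above: build y = x1 * x2 + x1 symbolically,
# derive dy/dx1 and dy/dx2 with gradients(), then evaluate everything with an
# Executor and concrete numpy inputs.
if __name__ == "__main__":
    x1 = Variable("x1")
    x2 = Variable("x2")
    y = x1 * x2 + x1

    grad_x1, grad_x2 = gradients(y, [x1, x2])

    executor = Executor([y, grad_x1, grad_x2])
    y_val, grad_x1_val, grad_x2_val = executor.run(
        feed_dict={x1: np.array([2.0]), x2: np.array([3.0])})

    print(y_val)        # expected [8.]  since 2*3 + 2 = 8
    print(grad_x1_val)  # expected [4.]  since dy/dx1 = x2 + 1
    print(grad_x2_val)  # expected [2.]  since dy/dx2 = x1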