亲手打造你的神经网络大脑:从零开始理解 TensorFlow 的核心构建
想深入了解深度学习的运作机制,仅仅停留在调用现成的库肯定是不够的。你渴望理解那些复杂的计算图是如何构建的?数据是如何在节点间流动的?模型是如何一步步优化的?那么,这篇文章就是为你准备的——我们将一起揭开 TensorFlow 神秘的面纱,亲手构建一个精简版的“神经网络大脑”,从根本上理解它的核心设计理念。
你可能觉得“开发 TensorFlow”听起来遥不可及,就像要从头造一辆汽车。但实际上,我们关注的重点是 TensorFlow 的“灵魂”——计算图和自动微分。通过构建一个简化的框架,你可以直观地感受到这些核心概念是如何工作的。
第一步:理解计算图的本质
想象一下,你的神经网络就像一个精密运作的计算工厂。输入数据是原材料,经过一系列的加工单元(操作符),最终产出结果。TensorFlow 的核心理念就是将这个计算过程可视化成一张“计算图”。
节点 (Nodes): 代表操作符,比如加法 (`+`)、乘法 (``)、矩阵乘法 (`matmul`),或者变量 (`Variable`)、占位符 (`Placeholder`)。
边 (Edges): 代表数据流,也就是 Tensor(张量),它们在节点之间传递。
我们来模拟一下 TensorFlow 的图构建过程。首先,你需要一个数据结构来表示计算图的节点。一个简单的类就足够了:
```python
class Node:
def __init__(self, op, inputs, name=None):
self.op = op 操作类型,例如 '+' 或 'matmul'
self.inputs = inputs 输入的节点列表
self.name = name if name else f"{op}_{id(self)}" 节点的唯一标识
self.value = None 存储计算结果,用于调试或中间结果保存
self.grad = None 存储梯度值
def __str__(self):
return f"Node(op='{self.op}', name='{self.name}', inputs={[inp.name for inp in self.inputs]})"
def __repr__(self):
return self.__str__()
```
然后,我们需要一个类来代表这张计算图本身,它负责存储所有的节点,并提供一个“执行”的方法。
```python
class Graph:
def __init__(self):
self.nodes = {} 存储所有节点,key为节点名,value为Node对象
def add_node(self, node):
if node.name in self.nodes:
raise ValueError(f"Node with name '{node.name}' already exists.")
self.nodes[node.name] = node
def get_node(self, name):
return self.nodes.get(name)
def execute(self, output_node_names, input_values=None):
"""
执行计算图,计算指定输出节点的数值.
output_node_names: 需要计算的节点名称列表
input_values: 字典,key为输入节点的名称,value为输入数值
"""
if input_values is None:
input_values = {}
为了计算,我们需要确定节点的执行顺序.
通常,这可以通过拓扑排序实现,但为了简化,我们假设可以一次性计算.
实际的TensorFlow会构建一个依赖关系图并进行优化执行.
初始化所有节点的计算结果为None
for node in self.nodes.values():
node.value = None
设置输入节点的数值
for node_name, value in input_values.items():
node = self.get_node(node_name)
if node and node.op == 'placeholder': 假设输入节点op为'placeholder'
node.value = value
else:
raise ValueError(f"Input node '{node_name}' not found or is not a placeholder.")
递归地计算节点的值
def _calculate_node_value(node):
if node.value is not None:
return node.value
if node.op == 'placeholder':
raise ValueError(f"Placeholder node '{node.name}' has no value provided.")
获取输入节点的计算结果
input_values_for_op = []
for inp_node in node.inputs:
input_values_for_op.append(_calculate_node_value(inp_node))
执行操作
try:
if node.op == '+':
node.value = input_values_for_op[0] + input_values_for_op[1]
elif node.op == '':
node.value = input_values_for_op[0] input_values_for_op[1]
elif node.op == 'matmul':
简单的矩阵乘法实现,实际TensorFlow会调用底层库
node.value = np.matmul(input_values_for_op[0], input_values_for_op[1])
elif node.op == 'variable':
变量通常在图构建时就被赋值了
这里为了统一,我们直接返回它的值,但实际情况会更复杂
node.value = node.value if node.value is not None else np.zeros(1) 默认值
elif node.op == 'add': 演示简单的加法节点
node.value = np.add(input_values_for_op[0], input_values_for_op[1])
... 其他操作符可以根据需要添加
else:
raise ValueError(f"Unsupported operation: {node.op}")
except Exception as e:
print(f"Error executing node '{node.name}' with op '{node.op}': {e}")
raise
return node.value
results = {}
for output_name in output_node_names:
output_node = self.get_node(output_name)
if not output_node:
raise ValueError(f"Output node '{output_name}' not found in the graph.")
results[output_name] = _calculate_node_value(output_node)
return results
```
现在,我们来构建一个简单的计算图来测试它: `a + b c`。
```python
import numpy as np
创建一个图
graph = Graph()
创建输入节点 (placeholder)
a = Node(op='placeholder', inputs=[], name='a')
b = Node(op='placeholder', inputs=[], name='b')
c = Node(op='placeholder', inputs=[], name='c')
创建变量节点 (例如,一个权重)
w = Node(op='variable', inputs=[], name='w')
w.value = np.array([[2.0]]) 初始化变量的值
创建计算节点
mul_node = Node(op='', inputs=[b, c], name='mul_bc')
add_node = Node(op='+', inputs=[a, mul_node], name='add_a_mul')
将节点添加到图中
graph.add_node(a)
graph.add_node(b)
graph.add_node(c)
graph.add_node(w)
graph.add_node(mul_node)
graph.add_node(add_node)
准备输入数据
input_data = {
'a': np.array([1.0]),
'b': np.array([2.0]),
'c': np.array([3.0])
}
执行计算
results = graph.execute(output_node_names=['add_a_mul'], input_values=input_data)
print(f"Result of 'add_a_mul': {results['add_a_mul']}") 预期输出: [10.]
假设我们需要计算 a + b c w
创建一个新的乘法节点
mul_bcw = Node(op='', inputs=[mul_node, w], name='mul_bc_w')
graph.add_node(mul_bcw)
add_final = Node(op='+', inputs=[a, mul_bcw], name='final_output')
graph.add_node(add_final)
results_final = graph.execute(output_node_names=['final_output'], input_values=input_data)
print(f"Result of 'final_output': {results_final['final_output']}") 预期输出: [10.] + ([2.] [3.]) [[2.]] = [10.] + [12.] = [22.]
```
到这里,我们就构建了一个最基础的计算图引擎。它能接收输入,执行预设的操作,并输出结果。这就像是TensorFlow的“静态图”模式的核心。
第二步:自动微分的魔力(Backpropagation)
深度学习训练的核心是 反向传播 (Backpropagation),它本质上就是一种 自动微分 (Automatic Differentiation) 技术。它能够根据计算图,自动地计算出每个参数(变量)相对于损失函数的梯度。
要实现自动微分,我们需要为每个操作符定义其 梯度计算规则。这就像工厂里的每个加工单元都清楚自己是如何影响最终产品的,并且知道如何“回溯”计算影响力。
我们需要修改 `Node` 类,使其能够记录其输入节点的梯度,并有一个方法来计算自身的梯度。
```python
class Node:
def __init__(self, op, inputs, name=None):
self.op = op
self.inputs = inputs
self.name = name if name else f"{op}_{id(self)}"
self.value = None
self.grad = None 这里存储的是关于这个节点本身的梯度
self.grad_fn = None 存储计算自身梯度的函数
def __str__(self):
return f"Node(op='{self.op}', name='{self.name}', inputs={[inp.name for inp in self.inputs]})"
def __repr__(self):
return self.__str__()
修改 Graph 类,增加一个执行反向传播的方法
class Graph:
def __init__(self):
self.nodes = {}
self.gradients = {} 存储每个节点的梯度,key为节点名
def add_node(self, node):
if node.name in self.nodes:
raise ValueError(f"Node with name '{node.name}' already exists.")
self.nodes[node.name] = node
def get_node(self, name):
return self.nodes.get(name)
def execute(self, output_node_names, input_values=None):
... (execute 方法保持不变) ...
if input_values is None:
input_values = {}
for node in self.nodes.values():
node.value = None
node.grad = None 清空梯度
for node_name, value in input_values.items():
node = self.get_node(node_name)
if node and node.op == 'placeholder':
node.value = value
else:
raise ValueError(f"Input node '{node_name}' not found or is not a placeholder.")
def _calculate_node_value(node):
if node.value is not None:
return node.value
if node.op == 'placeholder':
raise ValueError(f"Placeholder node '{node.name}' has no value provided.")
input_values_for_op = []
for inp_node in node.inputs:
input_values_for_op.append(_calculate_node_value(inp_node))
try:
if node.op == '+':
node.value = np.add(input_values_for_op[0], input_values_for_op[1])
elif node.op == '':
node.value = np.multiply(input_values_for_op[0], input_values_for_op[1])
elif node.op == 'matmul':
node.value = np.matmul(input_values_for_op[0], input_values_for_op[1])
elif node.op == 'variable':
Variable nodes are initialized with values, not computed from inputs.
If they were computed, they'd have inputs. Here they are treated as constants in the forward pass.
node.value = node.value
elif node.op == 'add':
node.value = np.add(input_values_for_op[0], input_values_for_op[1])
elif node.op == 'square': 添加一个计算平方的操作
node.value = np.square(input_values_for_op[0])
else:
raise ValueError(f"Unsupported operation: {node.op}")
except Exception as e:
print(f"Error executing node '{node.name}' with op '{node.op}': {e}")
raise
return node.value
results = {}
for output_name in output_node_names:
output_node = self.get_node(output_name)
if not output_node:
raise ValueError(f"Output node '{output_name}' not found in the graph.")
results[output_name] = _calculate_node_value(output_node)
return results
def backward(self, output_node_name, loss_node_name):
"""
执行反向传播,计算所有叶子节点(variable)的梯度.
output_node_name: 图的输出节点名称 (例如,预测结果)
loss_node_name: 损失函数节点名称
"""
loss_node = self.get_node(loss_node_name)
if not loss_node:
raise ValueError(f"Loss node '{loss_node_name}' not found.")
初始化所有节点的梯度为0 (除了loss节点本身,它的梯度是1)
for node in self.nodes.values():
node.grad = np.zeros_like(node.value if node.value is not None else np.array(0)) 确保形状一致
loss 节点相对于自身的梯度是1
loss_node.grad = np.array(1.0)
需要一个方法来决定节点的执行顺序进行反向传播,这里用一个简单的反向拓扑排序思路
实际TensorFlow会构建一个依赖图并遍历
為了简化,我们从loss节点开始,向上回溯计算梯度
使用一个堆栈来存储需要处理的节点 (后进先出,模拟反向执行)
我们需要知道节点被哪些节点使用,以便进行反向传播
实际实现会构建一个“反向图”或者记录每个节点的“下游节点”
为了简化,我们直接通过访问 `node.inputs` 来寻找上游节点
并使用一个已访问集合来避免重复计算
processed_nodes = set()
nodes_to_process = [loss_node] 从损失节点开始处理
对节点进行反向排序,保证子节点梯度先计算
这是一个简化的实现,实际需要更精密的图遍历
sorted_nodes = self._topological_sort_reverse()
for node in sorted_nodes:
if node.grad_fn is None:
continue 没有定义梯度计算函数的节点(例如 placeholder)
触发计算该节点相对于其输入的梯度
node.grad_fn(node, ...) 会被调用,并更新node.inputs节点的梯度
这里我们需要一个机制来传递 `node.grad` 给 `grad_fn`
并且 `grad_fn` 需要知道如何根据 `node.grad` 来更新其输入节点的 `grad`
这意味着 `grad_fn` 需要知道 `node.op` 和 `node.inputs`
模拟 grad_fn 的调用
if node.op == '+':
d(a+b)/da = 1, d(a+b)/db = 1
node.grad 是 d(Loss)/d(node)
d(Loss)/da = d(Loss)/d(node) d(node)/da = node.grad 1
d(Loss)/db = d(Loss)/d(node) d(node)/db = node.grad 1
if len(node.inputs) == 2:
node.inputs[0].grad = node.inputs[0].grad + node.grad 1.0 累加梯度
node.inputs[1].grad = node.inputs[1].grad + node.grad 1.0
elif node.op == '':
d(ab)/da = b, d(ab)/db = a
d(Loss)/da = d(Loss)/d(node) d(node)/da = node.grad b
d(Loss)/db = d(Loss)/d(node) d(node)/db = node.grad a
if len(node.inputs) == 2:
需要 node.inputs[0] 的值(即 'a' 的值)和 node.inputs[1] 的值(即 'b' 的值)
在实际 TensorFlow 中,这里会通过中间值获取
input1_value = node.inputs[0].value 假设输入节点的值已经计算并存储
input2_value = node.inputs[1].value
node.inputs[0].grad = node.inputs[0].grad + node.grad input2_value
node.inputs[1].grad = node.inputs[1].grad + node.grad input1_value
elif node.op == 'matmul':
d(A @ B)/dA = B^T, d(A @ B)/dB = A^T
d(Loss)/dA = d(Loss)/d(node) @ B^T
d(Loss)/dB = A^T @ d(Loss)/d(node)
if len(node.inputs) == 2:
input1_value = node.inputs[0].value
input2_value = node.inputs[1].value
node.inputs[0].grad = node.inputs[0].grad + np.dot(node.grad, input2_value.T)
node.inputs[1].grad = node.inputs[1].grad + np.dot(input1_value.T, node.grad)
elif node.op == 'square':
d(x^2)/dx = 2x
d(Loss)/dx = d(Loss)/d(node) d(node)/dx = node.grad 2 x
if len(node.inputs) == 1:
input_value = node.inputs[0].value
node.inputs[0].grad = node.inputs[0].grad + node.grad 2 input_value
... 其他操作的梯度计算 ...
def _topological_sort_reverse(self):
"""
对计算图进行反向拓扑排序,以便反向传播。
这只是一个简化的实现,实际需要一个健壮的拓扑排序算法。
"""
visited = set()
order = []
def dfs(node):
visited.add(node)
遍历节点的输入节点(逆序访问依赖关系)
for neighbor in reversed(node.inputs):
if neighbor not in visited:
dfs(neighbor)
order.append(node)
从输出节点开始,进行反向遍历
为了简化,我们这里假设直接知道所有的节点并进行反向遍历
实际会从损失节点开始遍历
all_nodes_list = list(self.nodes.values())
反转列表,模拟从输出往输入反向执行
for node in reversed(all_nodes_list):
if node not in visited:
dfs(node)
return order
```
现在,我们来模拟一个简单的训练过程。假设我们的目标是让 `output = a + b c w` 这个表达式的结果接近某个目标值 `y_true`。我们定义一个损失函数,比如均方误差 (MSE): `loss = (output y_true)^2`。
```python
重新构建图,加入损失函数
graph = Graph()
输入节点
a = Node(op='placeholder', inputs=[], name='a')
b = Node(op='placeholder', inputs=[], name='b')
c = Node(op='placeholder', inputs=[], name='c')
y_true = Node(op='placeholder', inputs=[], name='y_true')
变量节点 (需要训练的参数)
w = Node(op='variable', inputs=[], name='w')
w.value = np.array([[2.0]]) 初始权重
计算图
mul_bc = Node(op='', inputs=[b, c], name='mul_bc')
mul_bcw = Node(op='', inputs=[mul_bc, w], name='mul_bcw') 注意这里乘法顺序,与之前不同
add_final = Node(op='+', inputs=[a, mul_bcw], name='final_output')
损失函数 (MSE)
diff = Node(op='+', inputs=[add_final, Node(op='', inputs=[y_true, Node(op='variable', inputs=[], name='minus_one', value=1.0)])] , name='difference') (output y_true) = output + (1 y_true)
squared_diff = Node(op='square', inputs=[diff], name='squared_difference')
loss = Node(op='+', inputs=[squared_diff, Node(op='variable', inputs=[], name='zero_loss', value=0.0)], name='loss') Simplification: loss = squared_diff
添加节点到图
graph.add_node(a)
graph.add_node(b)
graph.add_node(c)
graph.add_node(y_true)
graph.add_node(w)
graph.add_node(mul_bc)
graph.add_node(mul_bcw)
graph.add_node(add_final)
graph.add_node(diff)
graph.add_node(squared_diff)
graph.add_node(loss)
辅助节点
graph.add_node(Node(op='variable', inputs=[], name='minus_one', value=1.0))
graph.add_node(Node(op='variable', inputs=[], name='zero_loss', value=0.0))
模拟训练数据
input_data = {
'a': np.array([1.0]),
'b': np.array([2.0]),
'c': np.array([3.0]),
'y_true': np.array([10.0]) 目标值
}
训练过程
learning_rate = 0.01
num_epochs = 100
for epoch in range(num_epochs):
1. 前向传播,计算输出和损失
results = graph.execute(output_node_names=['final_output', 'loss'], input_values=input_data)
current_loss = results['loss'][0]
2. 反向传播,计算梯度
我们需要指定损失节点以及从哪个输出节点开始计算
graph.backward('final_output', 'loss') 指定输出节点和损失节点
3. 更新变量 (只更新 'w')
找到 'w' 节点,并根据梯度更新其值
w_node = graph.get_node('w')
if w_node and w_node.grad is not None:
w_node.value = w_node.value learning_rate w_node.grad
w_node.grad = None 清空本次的梯度,为下次计算做准备
if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {current_loss:.4f}, w: {w_node.value}")
最终权重
print(f"
Final w: {w_node.value}")
```
思考一下我们做了什么:
1. 计算图构建: 我们像搭建积木一样,将每个操作和数据定义为图的节点和边。
2. 前向传播 (`execute`): 按照图的依赖关系,从输入到输出计算出数值。
3. 反向传播 (`backward`):
定义了每个操作符的梯度计算规则(例如,乘法的梯度是根据另一个输入乘以当前梯度)。
从损失函数开始,利用链式法则,将损失的梯度层层传递回输入,计算出每个节点相对于损失的梯度。
最重要的是,我们将梯度累加到 叶子节点(我们称之为 变量 或 参数),因为这些是我们希望通过训练来更新的。
4. 参数更新: 使用计算出的梯度和学习率,调整变量的值,以减小损失。
第三步:进一步的抽象和优化
我们上面实现的只是一个非常简化的骨架。一个成熟的深度学习框架会包含更多复杂的功能和优化:
自动图构建: 你不需要手动创建 `Node` 和 `Graph` 对象。像 Keras 或 PyTorch 那样,你可以直接编写数学表达式,框架会自动构建计算图。例如,`output = tf.add(a, tf.matmul(b, c))`。
`tf.Tensor` 对象: 框架的核心数据结构,它不仅仅是一个 NumPy 数组,还包含了关于它在计算图中位置、依赖关系等元信息。
操作符注册: 将大量的数学操作(如卷积 `conv2d`、激活函数 `relu`、损失函数 `mse` 等)注册到框架中,并为它们提供高效的梯度计算方法。
设备管理: TensorFlow 能够将计算分配到 CPU 或 GPU 上运行,这需要底层的 CUDA/cuDNN 集成。
优化器 (`Optimizer`): 将梯度下降、Adam、RMSprop 等优化算法抽象成独立的类,方便用户选择和使用。
自动内存管理: 高效地分配和释放张量占用的内存。
图优化: 在执行前对计算图进行各种优化,如操作融合、死代码消除等,以提高执行效率。
会话 (`Session` 在 TensorFlow 1.x 中): 负责管理计算图的执行和变量值的维护。在 TensorFlow 2.x 中,引入了即时执行 (Eager Execution),使得执行更加直观。
如何“使用”你的自制 TensorFlow 进行训练
在你构建的这个基础上,你可以模拟一个更完整的训练循环:
1. 定义模型: 构建计算图,包含输入、权重(`variable` 节点)、计算层和输出。
2. 定义损失函数: 构建一个计算损失的节点,通常依赖于模型的输出和真实标签。
3. 初始化: 为所有 `variable` 节点赋予初始值。
4. 训练循环:
前向传播: 执行计算图,计算模型的输出和损失。
反向传播: 调用 `backward` 方法,计算所有 `variable` 节点的梯度。
更新参数: 使用一个优化器(例如,我们在这里简单实现的梯度下降)和学习率来更新 `variable` 节点的 `value`。
重复: 对多批数据或多个 epoch 重复以上过程。
总结
通过亲手构建这个简化的计算图和自动微分系统,你应该对 TensorFlow 的核心工作原理有了更深刻的理解。这一切都围绕着 将计算表示为图 和 利用链式法则进行自动微分。
当然,这只是冰山一角。真正的 TensorFlow 是一个庞大而复杂的系统,但理解了这些基础概念,你就能更好地欣赏和利用现有的深度学习框架,甚至在需要时进行定制和扩展。
希望这次“亲手打造”的体验,能让你对神经网络的学习和训练过程有更直观的认识!