前言
在了解深度学习框架之前,我们需要自己去理解甚至去实现一个网络学习和调参的过程,进而理解深度学习的机理;
为此,博主这里提供了一个自己编写的一个例子,带领大家理解一下网络学习的正向传播和反向传播的过程;
除此之外,为了实现batch读取,我还设计并提供了一个简单的DataLoader类去模拟深度学习中数据迭代器的取样;并且提供了存取模型的函数;
值得注意的是仅仅使用python实现,因此对于环境的需求不是很大,希望各位可以多多star我的博客和github,学习到更有用的知识!!
目录
一、实现效果
二、整体代码框架
三、详细代码说明
1.数据处理
2.网络设计
3.激活函数
4.训练
四、训练演示
五、总结
一、实现效果
实现一个由多个Linear层构成的网络来拟合函数,项目地址:https://github.com/nickhuang1996/HJLNet,运行:
python demo.py
拟合函数为:
以下结果从左到右依次为(学习率为0.03,batchsize为90):
Epoch:400,1000, 2000, 10000以上
二、整体代码框架
三、详细代码说明
1.数据处理
Dataset.py
x****是0到2之间的数据,步长为0.01,因此是200个数据;
y****是目标函数,振幅为20;
length****是数据长度;
_build_items()是建立一个dict存储x和y;
_transform()是对x和y进行数据的变换;
import numpy as np
class Dataset:
def __init__(self):
self.x = np.arange(0.0, 2.0, 0.01)
self.y = 20 * np.sin(2 * np.pi * self.x)
self.length = len(list(self.x))
self._build_items()
self._transform()
def _build_items(self):
self.items = [{
'x': list(self.x)[i],
'y': list(self.y)[i]
}for i in range(self.length)]
def _transform(self):
self.x = self.x.reshape(1, self.__len__())
self.y = self.y.reshape(1, self.__len__())
def __len__(self):
return self.length
def __getitem__(self, index):
return self.items[index]
DataLoader.py
类似于Pytorch里的DataLoader,博主这里初始化也传入两个参数:dataset和batch_size
__next__()就是每次迭代执行的函数,利用__len__()得到dataset的长度,利用__getitem__()得到数据集里的数据;
_concate()就是把一个batch的数据拼接起来;
_transform()就是转换一个batch的数据形式;
import numpy as np
class DataLoader:
def __init__(self, dataset, batch_size):
self.dataset = dataset
self.batch_size = batch_size
self.current = 0
def __next__(self):
if self.current < self.dataset.__len__():
if self.current + self.batch_size <= self.dataset.__len__():
item = self._concate([self.dataset.__getitem__(index) for index in range(self.current, self.current + self.batch_size)])
self.current += self.batch_size
else:
item = self._concate([self.dataset.__getitem__(index) for index in range(self.current, self.dataset.__len__())])
self.current = self.dataset.__len__()
return item
else:
self.current = 0
raise StopIteration
def _concate(self, dataset_items):
concated_item = {}
for item in dataset_items:
for k, v in item.items():
if k not in concated_item:
concated_item[k] = [v]
else:
concated_item[k].append(v)
concated_item = self._transform(concated_item)
return concated_item
def _transform(self, concated_item):
for k, v in concated_item.items():
concated_item[k] = np.array(v).reshape(1, len(v))
return concated_item
def __iter__(self):
return self
2.网络设计
Linear.py
类似于Pytorch里的Linear,博主这里初始化也传入三个参数:in_features, out_features, bias
*_init_parameters()是初始化权重weight和偏置bias,weight大小是[out_features, in_features],bias大小是[out_features, 1]*
forward就是前向传播:
import numpy as np
class Linear:
def __init__(self, in_features, out_features, bias=False):
self.in_features = in_features
self.out_features = out_features
self.bias = bias
self._init_parameters()
def _init_parameters(self):
self.weight = np.random.random([self.out_features, self.in_features])
if self.bias:
self.bias = np.zeros([self.out_features, 1])
else:
self.bias = None
def forward(self, input):
return self.weight.dot(input) + self.bias
*network.py
一个简单的多层Linear网络
_init_parameters()是把Linear层里的权重和偏执都放在一个dict里存储;
forward()就是前向传播,最后一层不经过Sigmoid;
backward()就是反向传播,利用梯度下降实现误差传递和调参:例如一个两层的Linear层的反向传播如下
update_grads()是更新权重和偏置;
# -*- coding: UTF-8 -*-
import numpy as np
from ..lib.Activation.Sigmoid import sigmoid_derivative, sigmoid
from ..lib.Module.Linear import Linear
class network:
def __init__(self, layers_dim):
self.layers_dim = layers_dim
self.linear_list = [Linear(layers_dim[i - 1], layers_dim[i], bias=True) for i in range(1, len(layers_dim))]
self.parameters = {}
self._init_parameters()
def _init_parameters(self):
for i in range(len(self.layers_dim) - 1):
self.parameters["w" + str(i)] = self.linear_list[i].weight
self.parameters["b" + str(i)] = self.linear_list[i].bias
def forward(self, x):
a = []
z = []
caches = {}
a.append(x)
z.append(x)
layers = len(self.parameters) // 2
for i in range(layers):
z_temp = self.linear_list[i].forward(a[i])
self.parameters["w" + str(i)] = self.linear_list[i].weight
self.parameters["b" + str(i)] = self.linear_list[i].bias
z.append(z_temp)
if i == layers - 1:
a.append(z_temp)
else:
a.append(sigmoid(z_temp))
caches["z"] = z
caches["a"] = a
return caches, a[layers]
def backward(self, caches, output, y):
layers = len(self.parameters) // 2
grads = {}
m = y.shape[1]
for i in reversed(range(layers)):
# 假设最后一层不经历激活函数
# 就是按照上面的图片中的公式写的
if i == layers - 1:
grads["dz" + str(i)] = output - y
else: # 前面全部都是sigmoid激活
grads["dz" + str(i)] = self.parameters["w" + str(i + 1)].T.dot(
grads["dz" + str(i + 1)]) * sigmoid_derivative(
caches["z"][i + 1])
grads["dw" + str(i)] = grads["dz" + str(i)].dot(caches["a"][i].T) / m
grads["db" + str(i)] = np.sum(grads["dz" + str(i)], axis=1, keepdims=True) / m
return grads
# 就是把其所有的权重以及偏执都更新一下
def update_grads(self, grads, learning_rate):
layers = len(self.parameters) // 2
for i in range(layers):
self.parameters["w" + str(i)] -= learning_rate * grads["dw" + str(i)]
self.parameters["b" + str(i)] -= learning_rate * grads["db" + str(i)]
3.激活函数
Sigmoid.py
公式定义:
导数可由自身表示:
import numpy as np
def sigmoid(x):
return 1.0 / (1.0 + np.exp(-x))
def sigmoid_derivative(x):
return sigmoid(x) * (1 - sigmoid(x))
4.训练
demo.py
训练模型的入口文件,包含训练、测试和****存储模型
from code.scripts.trainer import Trainer
from code.config.default_config import _C
if __name__ == '__main__':
trainer = Trainer(cfg=_C)
trainer.train()
trainer.test()
trainer.save_models()
default_config.py
配置文件****:
layers_dim****代表Linear层的输入输出维度;
batch_size****是batch的大小;
total_epochs****是总体的训练时间,训练一次x为一个epoch;
resume****是判断继续训练;
result_img_path****是结果存储的路径;
ckpt_path****是模型存储的路径;
from easydict import EasyDict
_C = EasyDict()
_C.layers_dim = [1, 25, 1] # [1, 30, 10, 1]
_C.batch_size = 90
_C.total_epochs = 40000
_C.resume = True # False means retraining
_C.result_img_path = "D:/project/Pycharm/HJLNet/result.png"
_C.ckpt_path = 'D:/project/Pycharm/HJLNet/ckpt.npy'
trainer.py
*这里不多赘述,主要利用***train()这个函数进行训练,test()**进行测试
from ..lib.Data.DataLoader import DataLoader
from ..scripts.Dataset import Dataset
from ..scripts.network import network
import matplotlib.pyplot as plt
import numpy as np
class Trainer:
def __init__(self, cfg):
self.ckpt_path = cfg.ckpt_path
self.result_img_path = cfg.result_img_path
self.layers_dim = cfg.layers_dim
self.net = network(self.layers_dim)
if cfg.resume:
self.load_models()
self.dataset = Dataset()
self.dataloader = DataLoader(dataset=self.dataset, batch_size=cfg.batch_size)
self.total_epochs = cfg.total_epochs
self.iterations = 0
self.x = self.dataset.x
self.y = self.dataset.y
self.draw_data(self.x, self.y)
def train(self):
for i in range(self.total_epochs):
for item in self.dataloader:
caches, output = self.net.forward(item['x'])
grads = self.net.backward(caches, output, item['y'])
self.net.update_grads(grads, learning_rate=0.03)
if i % 100 == 0:
print("Epoch: {}/{} Iteration: {} Loss: {}".format(i + 1,
self.total_epochs,
self.iterations,
self.compute_loss(output, item['y'])))
self.iterations += 1
def test(self):
caches, output = self.net.forward(self.x)
self.draw_data(self.x, output)
self.save_results()
self.show()
def save_models(self):
ckpt = {
"layers_dim": self.net.layers_dim,
"parameters": self.net.linear_list
}
np.save(self.ckpt_path, ckpt)
print('Save models finish!!')
def load_models(self):
ckpt = np.load(self.ckpt_path).item()
self.net.layers_dim = ckpt["layers_dim"]
self.net.linear_list = ckpt["parameters"]
print('load models finish!!')
def draw_data(self, x, y):
plt.scatter(x, y)
def show(self):
plt.show()
def save_results(self):
plt.savefig(fname=self.result_img_path, figsize=[10, 10])
# 计算误差值
def compute_loss(self, output, y):
return np.mean(np.square(output - y))
四、训练演示
训练期间会输出训练的时间,迭代次数和损失变化,训练结束存储模型和结果。
1.开始训练
2.训练完毕,读取上次的模型继续训练
3.结果展示
五、总结
如此一来便知晓了一个基本网络训练过程中正向反向传播过程,之后会更新更加详细的代码和原理,帮助各位学习深度学习的知识和概念~