Motivation
深度学习模型为什么要量化
模型量化是深度学习Inference加速的关键技术之一, 一般训练之后得到的模型采用float32 (fp32)格式存储, 由于FP32 bit位数宽, 而且浮点计算对硬件资源消耗高,造成内存带宽,模型吞吐率瓶颈。 量化(Quantization) 是解决FP32 的模型在内存带宽消耗,推理速度的主要技术之一, 其采用定点(fixed point)或者整形数据(INT8)代替FP32类型, Hardware friendly。
如何学习和掌握量化技术
模型量化涉及很多概念和算法,比如 对称量化/非对称量化, 线性量化/非线性量化 等等, 了解这些基础概念之后我们最好结合实践这样才能更好的理解和加深记忆。
本文以PyTorch提供的Quantization例子为例,分析简单的模型量化过程。
Requirements
- Ubuntu 20.04
- PyTorch 1.11.0
量化的基本原理
量化本质就是一个映射, 将一组浮点数映射为一组整形(例如INT8)的过程
- 量化: FP32 ---> INT8
- 反量化: INT8 ---> FP32
# 量化
xq = round(x_fp32 / scale + zero_point)
# 反量化
xqd = (xq - zero_point) * scale
量化参数scale, zero_point影响量化的精度
直接根据上面的公式验证一下:
上例子: 对一个FP32的数据进行量化,观察输出结果
import torch
import numpy as np
import random
import matplotlib.pyplot as plt
import os
seed = 1234
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# 量化的基本原理
'''
# 量化
xq = round(x_fp32 / scale + zero_point)
# 反量化
xqd = (xq - zero_point) * scale
量化参数scale, zero_point影响量化的精度
'''
x = torch.rand(3, 4, dtype=torch.float32)
scale = 0.3
zero_point = 8
xq = torch.round((x / scale + zero_point))
xqd = (xq - zero_point) * scale
print('量化前的FP32 Tensor', x)
print('量化之后的INT8 Tensor', xq)
print('反量化之后的FP32 Tensor', xqd)
plt.stem(x.flatten().numpy())
# plt.subplot(1, 2, 2)
plt.stem(xqd.flatten().float().numpy())
plt.show()
由于量化参数 scale, zero_point 是随意选取的,因此反量化之后和原始的FP32数据存在一定的误差:
采用PyTorch 提供的量化工具
PyTorch 提供了对Tensor 进行量化的API函数:
- 逐Tensor/Layer量化(Tensor/Channel wise Quantization):
quantize_per_tensor(input: Tensor, scale: Tensor, zero_point: Tensor, dtype: _dtype) -> Tensor:
- 逐Channel量化 (Channel wise Quantization):
quantize_per_channel(input: Tensor, scales: Tensor, zero_points: Tensor, axis: _int, dtype: _dtype) -> Tensor:
两种量化的区别: Tensor-wise 和Channel-wise Quantization的主要区别是量化的粒度
- Tensor-wise: 粒度粗, 量化误差相对大, 每个Tensor只有一个scale, zero_point 参数
- Chanel-wise: 粒度细, 量化误差相对小, Tensor的每个Channel都有独自的scale, zp参数
Tensor-wise Quantization
# Tensor-wise 量化
def quantize_per_tensor():
x = torch.rand(3, 4, dtype=torch.float32)
# quantize, 量化参数: scale + zp 保存在了QuantizeTensor
xq = torch.quantize_per_tensor(x, scale=0.3, zero_point=8, dtype=torch.qint8)
# 反量化
xqd = torch.dequantize(xq)
print('量化前的FP32 Tensor', x)
print('量化之后的INT8 Tensor', xq)
print('反量化之后的FP32 Tensor', xqd)
# compare the x, xq
# plt.subplot(1, 2, 1)
plt.stem(x.flatten().numpy())
# plt.subplot(1, 2, 2)
plt.stem(xqd.flatten().float().numpy())
plt.show()
quantize_per_tensor()
Channel-wise Quantization
# Channel-wise量化
def quantize_per_channel():
x = torch.rand(3, 4, dtype=torch.float32)
# quantize, 量化参数: scale + zp 保存在了QuantizeTensor
xq = torch.quantize_per_channel(x, scales=torch.tensor([0.3, 0.5, 0.4]),
zero_points=torch.tensor([8, 9, 10]),
axis=0, dtype=torch.qint8)
# 反量化
xqd = torch.dequantize(xq)
print('量化前的FP32 Tensor', x)
print('量化之后的INT8 Tensor', xq)
print('反量化之后的FP32 Tensor', xqd)
# compare the x, xq
# plt.subplot(1, 2, 1)
plt.stem(x.flatten().numpy())
# plt.subplot(1, 2, 2)
plt.stem(xqd.flatten().float().numpy())
plt.show()
quantize_per_channel()
对实际的CNN模型进行量化
上面的例子是对单个Tensor进行量化,方便掌握量化的思想; 下面采用实际的MobileNet-V2模型进行量化.
量化前的准备
ImageNet-2012 数据集准备, 包含ImageNet-train, ImageNet-Val数据集, 在实际的量化中需要进行标定(calibration) 以及精度测试, 因此需要这两个数据集
- ImageNet-train: 训练集,总共120w+ 图像, 量化的标定过程会从imageNet-train中随机选择小部分数据
- ImageNet-val: 验证集, 包含5w张图像, 用于测试量化前后模型精度(Top1 Acc) 的变化情况
MobileNet 量化实现
总体方法介绍:
对于一个CNN/RNN/Transformer等模型, 一般需要量化的对象包含2个部分:
- weight: 即模型训练之后的权重参数, 模型训练好之后这部分是固定的, 因此weight可以直接量化
- featureMap/Activation: 和输入数据有关, Activation指的是模型运行期间的中间数据Intermediate data,如果需要对Activation进行量化,则需要采用一部分代表性的输入样本产生中间数据,通过对中间数据进行观察量化,这个过程成为标定Calibration
根据Activation是否需要离线量化,量化方法可分为2种:
- 动态量化 Dynamic Quantization: Activation的量化参数是在模型运行时动态确定的,这种方式得到的量化参数比较准确,缺点也很明显: 运行效率低,加速效果不好
- 静态量化 Static Quantization: 即采用离线标定的方法实现计算Activation的量化参数
下面的示例代码采用Static Quantization:
-
工程结构:
Utils: 一些PyTorch样板代码, 直接复用即可
utils.py
import os
import torch
import torchvision
import torchvision.transforms as transforms
# Helper functions
# 用于ImageNet 验证
class AverageMeter(object):
def __init__(self, name, fmt=':f') -> None:
self.name = name
self.fmt = fmt
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
def __str__(self):
fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
return fmtstr.format(**self.__dict__)
def accuracy(output, target, topk=(1,)):
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def evaluate(model, criterion, data_loader, neval_batches, n_print=5):
model.eval()
top1 = AverageMeter('Acc@1', ':6.2f')
top5 = AverageMeter('Acc@5', ':6.2f')
cnt = 0
with torch.no_grad():
for image, target in data_loader:
output = model(image)
# loss = criterion(output, target)
cnt += 1
acc1, acc5 = accuracy(output, target, topk=(1, 5))
print('.', end = '')
top1.update(acc1[0], n=image.size(0))
top5.update(acc5[0], n=image.size(0))
if cnt % n_print == 0:
print(f'{top1}, {top5}')
if cnt >= neval_batches:
return top1, top5
return top1, top5
def load_model(model, model_weight_file):
state_dict = torch.load(model_weight_file)
model.load_state_dict(state_dict)
model.to('cpu')
return model
def print_size_of_model(model):
torch.save(model.state_dict(), "temp.p")
print('Size (MB):', os.path.getsize("temp.p")/1e6)
os.remove('temp.p')
def prepare_data_loaders(data_path, train_batch_size, eval_batch_size):
'''
准备ImageNet数据集, 包含train_set, val_set
'''
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
dataset_train = torchvision.datasets.ImageNet(
data_path, split='train',
transform=transforms.Compose(
[
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize
]))
dataset_test = torchvision.datasets.ImageNet(
data_path, split='val',
transform=transforms.Compose(
[
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
normalize
]))
train_sampler = torch.utils.data.RandomSampler(dataset_train)
test_sampler = torch.utils.data.SequentialSampler(dataset_test)
data_loader_train = torch.utils.data.DataLoader(
dataset_train, batch_size=train_batch_size,
sampler=train_sampler
)
data_loader_test = torch.utils.data.DataLoader(
dataset_test, batch_size=eval_batch_size,
num_workers=8, sampler=test_sampler
)
return data_loader_train, data_loader_test
- MobileNet-V2模型
MV2模型基本上和torchvision中提供的模型写法相同,只是个别地方的写法需要修改,为量化进行适配
- Replace torch.add with floatfunctional, 为量化修改:
self.skip_add = nn.quantized.FloatFunctional()
- QuantStub(), DeQuantStub(): 相当于插桩, 告诉量化算法对模型的哪些区域进行数据观察以及量化
model.py
from statistics import mode
import numpy as np
import os
import matplotlib.pyplot as plt
import random
import torch
from torch import nn
import torchvision
from torch.quantization import QuantStub, DeQuantStub
# model = torchvision.models.mobilenet_v2(pretrained=True, progress=True)
# # Setup warnings
import warnings
warnings.filterwarnings(
action='ignore',
category=DeprecationWarning,
module=r'.*'
)
warnings.filterwarnings(
action='default',
module=r'torch.quantization'
)
# https://pytorch.org/tutorials/advanced/static_quantization_tutorial.html
seed = 1234
random.seed(seed)
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# Define MobileNet
# 需要对MobileNetV2模型进行小修改
# Replacing addition with nn.quantized.FloatFunctional
# Insert QuantStub and DeQuantStub at the beginning and end of the network.
# Replace ReLU6 with ReLU
def _make_divisible(v, divisor, min_value=None):
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
:param v:
:param divisor:
:param min_value:
:return:
"""
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v += divisor
return new_v
class ConvBNReLU(nn.Sequential):
def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
padding = (kernel_size - 1) // 2
super(ConvBNReLU, self).__init__(
nn.Conv2d(in_planes, out_planes, kernel_size, stride, padding, groups=groups, bias=False),
nn.BatchNorm2d(out_planes, momentum=0.1),
# Replace with ReLU
nn.ReLU(inplace=False)
)
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio):
super(InvertedResidual, self).__init__()
self.stride = stride
assert stride in [1, 2]
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers = []
if expand_ratio != 1:
# pw
layers.append(ConvBNReLU(inp, hidden_dim, kernel_size=1))
layers.extend([
# dw
ConvBNReLU(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup, momentum=0.1),
])
self.conv = nn.Sequential(*layers)
# Replace torch.add with floatfunctional, 为量化修改
self.skip_add = nn.quantized.FloatFunctional()
def forward(self, x):
if self.use_res_connect:
return self.skip_add.add(x, self.conv(x)) # 为量化修改
else:
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(self, num_classes=1000, width_mult=1.0, inverted_residual_setting=None, round_nearest=8):
"""
MobileNet V2 main class
Args:
num_classes (int): Number of classes
width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
inverted_residual_setting: Network structure
round_nearest (int): Round the number of channels in each layer to be a multiple of this number
Set to 1 to turn off rounding
"""
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
last_channel = 1280
if inverted_residual_setting is None:
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]
# only check the first element, assuming user knows t,c,n,s are required
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError("inverted_residual_setting should be non-empty "
"or a 4-element list, got {}".format(inverted_residual_setting))
# building first layer
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
features = [ConvBNReLU(3, input_channel, stride=2)]
# building inverted residual blocks
for t, c, n, s in inverted_residual_setting:
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t))
input_channel = output_channel
# building last several layers
features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1))
# make it nn.Sequential
self.features = nn.Sequential(*features)
self.quant = QuantStub()
self.dequant = DeQuantStub()
# building classifier
self.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(self.last_channel, num_classes),
)
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)
def forward(self, x):
x = self.quant(x)
x = self.features(x)
x = x.mean([2, 3])
x = self.classifier(x)
x = self.dequant(x)
return x
# 模型Fuse优化
def fuse_model(self):
for m in self.modules():
if type(m) == ConvBNReLU:
torch.quantization.fuse_modules(m, ['0', '1', '2'], inplace=True)
if type(m) == InvertedResidual:
for idx in range(len(m.conv)):
if type(m.conv[idx]) == nn.Conv2d:
torch.quantization.fuse_modules(m.conv, [str(idx), str(idx + 1)], inplace=True)
if __name__ == '__main__':
net = MobileNetV2()
net.eval()
print(net)
inputs = torch.rand(1, 3, 224, 224)
outputs = net(inputs)
print(outputs.size())
- 量化测试
分别测试3种情况:
- 量化前模型的精度, baseline accuracy
- 采用layer-wise 进行量化, 得到模型的量化后大小以及精度
- 采用Channel-wise进行量化, 也得到量化后模型尺寸以及精度
baseline FP32模型精度测试
由于只是演示,而且笔者在CPU上跑模型,因此只测试部分的样本。
预训练模型从torchvision下载, 之后重命名为 mobilenet_pretrained_float.pth
代码片段:
data_path = '/media/wei/Development/Datasets/imagenet'
saved_model_dir = 'data/'
float_model_file = 'mobilenet_pretrained_float.pth'
scripted_float_model_file = 'mobilenet_quantization_scripted.pth'
scripted_quantized_model_file = 'mobilenet_quantization_scripted_quantized.pth'
train_batch_size = 30
eval_batch_size = 512
num_eval_batches = 10
def evaluate_baseline():
net = MobileNetV2()
net.eval()
float_model = load_model(net, saved_model_dir + float_model_file).to('cpu')
print('\n Inverted Residual Block: Before fusion \n\n', float_model.features[1].conv)
float_model.eval()
# Fuses modules, Conv, BN, RelU算子融合
float_model.fuse_model()
# Note fusion of Conv+BN+Relu and Conv+Relu
print('\n Inverted Residual Block: After fusion\n\n',float_model.features[1].conv)
print("Size of baseline model")
print_size_of_model(float_model)
# ImageNet
_, data_loader_test = prepare_data_loaders(data_path=data_path, train_batch_size=train_batch_size, eval_batch_size=eval_batch_size)
# Start to get baseline Accuracy
top1, top5 = evaluate(float_model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))
torch.jit.save(torch.jit.script(float_model), saved_model_dir + scripted_float_model_file)
Baseline模型的Top-1 Acc结果: Acc@Top1 = 78.57%
Layer-wise量化方法测试
需要设置量化方法: model.qconfig = torch.quantization.default_qconfig
查看一下default_qconfig的具体方法:
default_qconfig = QConfig(activation=default_observer,
weight=default_weight_observer)
- 对于Activation的量化方法: MinMax量化, 量化之后的值为0~127
default_observer = MinMaxObserver.with_args(quant_min=0, quant_max=127) - 对于Weight的量化方法: MinMax量化, 量化为INT8, 对称量化
default_weight_observer = MinMaxObserver.with_args(
dtype=torch.qint8, qscheme=torch.per_tensor_symmetric
)
关于 MinMaxObserver
: 统计一个Tensor中的最大和最小值, 因此量化方法为Tensor-wise或者称为Layer-wise
class MinMaxObserver(_ObserverBase):
r"""Observer module for computing the quantization parameters based on the
running min and max values.
This observer uses the tensor min/max statistics to compute the quantization
parameters. The module records the running minimum and maximum of incoming
tensors, and uses this statistic to compute the quantization parameters.
具体的测试代码:
def per_layer_quantize():
num_calibration_batches = 32
net = MobileNetV2()
net.eval()
model = load_model(net, saved_model_dir + float_model_file).to('cpu')
model.eval()
# Fuses modules, Conv, BN, RelU算子融合
model.fuse_model()
# 设置量化配置,方法
# MinMax observer, per-tensor quantize of weight
model.qconfig = torch.quantization.default_qconfig
print(model.qconfig)
# model will be attached with qconfig
torch.quantization.prepare(model, inplace=True)
# Calibrate
# 标定应该采用训练集的一部分
data_loader_train, data_loader_test = prepare_data_loaders(data_path=data_path, eval_batch_size=eval_batch_size, train_batch_size=train_batch_size)
# 会进行标定数据统计
evaluate(model, criterion=None, data_loader=data_loader_train, neval_batches=num_calibration_batches)
print('Calibration done...')
# Convert to quantized model, Int8
torch.quantization.convert(model, inplace=True)
print('Quantize done')
print('Size of quantized model')
print_size_of_model(model)
# test accuracy
top1, top5 = evaluate(model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))
运行结果:
Acc@top1 = 65.37%, 比baseline低很多,精度丢失太严重!!!
Channel-wise 量化方法测试
由于Layer-wise量化之后模型的Top1 Acc下降太严重,因此需要更换量化方式。 Channel-wise是一种比layer-wise Quantization粒度更细的算法, 为Tensor的每个通道分别计算各自的量化参数,因此这个方法的精度预期比Layer-wise的高。
Channel-wise量化的实现:
在PyTorch中为了支持不同的量化算法,Pytorch设置的不同的后端backend:
- 针对x86 CPU: fbgemm backend
- ARM CPU: qnnpack backend
暂时未发现对GPU的支持,不过对于NVIDIA GPU, NVIDIA 开发的TensorRT应该是支持的量化后端。
fbgemm backend:
def get_default_qconfig(backend='fbgemm'):
"""
Returns the default PTQ qconfig for the specified backend.
Args:
* `backend`: a string representing the target backend. Currently supports `fbgemm`
and `qnnpack`.
Return:
qconfig
"""
if backend == 'fbgemm':
qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=True),
weight=default_per_channel_weight_observer)
elif backend == 'qnnpack':
qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=False),
weight=default_weight_observer)
else:
qconfig = default_qconfig
return qconfig
channel-wise的测试代码:
def per_channel_quantize():
num_calibration_batches = 32
net = MobileNetV2()
net.eval()
model = load_model(net, saved_model_dir + float_model_file).to('cpu')
model.eval()
# Fuses modules, Conv, BN, RelU算子融合
model.fuse_model()
# 设置channel-wise 量化
# qconfig = QConfig(activation=HistogramObserver.with_args(reduce_range=True),
# weight=default_per_channel_weight_observer)
model.qconfig = torch.quantization.get_default_qconfig(backend='fbgemm')
print('量化设置:', model.qconfig)
torch.quantization.prepare(model, inplace=True)
# Calibrate
# 标定应该采用训练集的一部分
data_loader_train, data_loader_test = prepare_data_loaders(data_path=data_path, eval_batch_size=eval_batch_size, train_batch_size=train_batch_size)
# 会进行标定数据统计
evaluate(model, criterion=None, data_loader=data_loader_train, neval_batches=num_calibration_batches)
print('Calibration done...')
# Convert to quantized model, Int8
torch.quantization.convert(model, inplace=True)
print('Quantize done')
print('Size of quantized model')
print_size_of_model(model)
# test accuracy
top1, top5 = evaluate(model, None, data_loader_test, neval_batches=num_eval_batches, n_print=1)
print('Evaluation accuracy on %d images, %2.2f'%(num_eval_batches * eval_batch_size, top1.avg))
量化结果:
- 模型大小下降的大约3~4倍: baseline FP32 13.99MB ---> 3.94MB
- 量化精度: baseline FP32=78.57%, layer-wise=65.37%, channel-wise=74.67%; 很显然channel-wise的精度比layer-wise高很多。
量化前后模型推理速度比较
CPU端测试, batch_size=4
- FP32 量化前的模型: 46ms
- INT8量化后的模型: 15ms, 推断耗时仅为原来的1/3,加速效果非常可观
def speed_test():
# Original FP32 model
net = MobileNetV2()
net.eval()
model_fp32 = load_model(net, saved_model_dir + float_model_file).to('cpu')
model_fp32.eval()
# Fuses modules, Conv, BN, RelU算子融合
model_fp32.fuse_model()
# Int8 Quantized model
model_quantized = per_channel_quantize(evaluate_acc=False)
def inference_with_time_ms(BS=4, model=None):
n = 10
total_time = 0.0
inputs = torch.rand(BS, 3, 224, 224)
for i in range(n+2):
t1 = time.time()
model(inputs)
torch.cuda.synchronize()
t2 = time.time()
if i > 1:
total_time += (t2 - t1)
return total_time / n * 1000
print('Start speed test...')
fp32_avg_time = inference_with_time_ms(BS=4, model=model_fp32)
quantized_avg_time = inference_with_time_ms(BS=4, model=model_quantized)
print(f'FP32 Avg time: {fp32_avg_time}ms')
print(f'Quantized Avg time: {quantized_avg_time}ms')