huffman编码简介
参考 这篇文章
python实现
- 开头
# -*- coding: utf-8 -*-
import heapq
import collections
import json
import io
from BitVector import BitVector # 网上的bitbector实现
import hashlib
import sys
import time
import base64
- 二叉树类
class TreeNode(object):
def __init__(self, key=None, val=None, code=None):
self.key, self.val, self.code = key, val, code
self.left = None
self.right = None
def __cmp__(self, other):
return cmp(self.val, other.val)
def __repr__(self):
return 'key=%s\nval=%s\ncode=%s' % (self.key, self.val, self.code)
- 定义装饰器用于计时
def caltime(func):
def wrapper(*args, **kwargs):
st = time.clock()
res = func(*args, **kwargs)
print '%s take %.3f s...' % (func.__name__, time.clock()-st)
return res
return wrapper
- 定义huffman类
class Huffman(object):
def __init__(self, raw_name, action='zip'):
self.raw_name = raw_name
if action=='zip':
with open(self.raw_name, 'rb') as f:
self.data = f.read()
self.save_name = raw_name + '.hfm'
elif action=='unzip':
if not raw_name.endswith('.hfm'):
raise IOError('file name must be ends with .hfm')
self.save_name = raw_name.rsplit('.', 1)[0]
self.root_node = None
self.encode_dict = None
self.tree_str = None
@caltime
def _build_heap(self):
count = collections.Counter(self.data)
count_list = count.items()
return [TreeNode(key=ord(x[0]), val=x[1]) for x in count_list]
@caltime
def _build_huffman_tree(self):
heap = self._build_heap()
while heap:
left_node = heapq.heappop(heap)
right_node = heapq.heappop(heap)
parent_node = TreeNode(val=left_node.val+right_node.val)
parent_node.left = left_node
parent_node.right = right_node
if heap:
heapq.heappush(heap, parent_node)
else:
return parent_node
# =========================================================== #
# 进行huffman编码,返回编码字典
# =========================================================== #
@caltime
def _huffman_encode(self):
def huffman_encode_helper(nd, cur_code=''):
encode_dict = {}
if nd.left is None and nd.right is None:
nd.code = cur_code
encode_dict.update({nd.key: nd.code})
if nd.left:
encode_dict.update(huffman_encode_helper(nd.left, cur_code+'0'))
if nd.right:
encode_dict.update(huffman_encode_helper(nd.right, cur_code+'1'))
return encode_dict
self.root_node = self._build_huffman_tree()
self.encode_dict = huffman_encode_helper(self.root_node)
self.tree_str = self._serialize_huffman_tree()
# =========================================================== #
# huffman树序列化
# =========================================================== #
@caltime
def _serialize_huffman_tree(self):
def pre_order(nd):
if nd is None:
return ['#']
output = [[nd.key, nd.code]] if nd.key is not None else [None]
output += pre_order(nd.left)
output += pre_order(nd.right)
return output
return json.dumps(pre_order(self.root_node))
# =========================================================== #
# 按照码本进行字符压缩
# =========================================================== #
@caltime
def huffman_zip(self):
print 'start huffman encoding...'
self._huffman_encode()
new_data = ''.join([self.encode_dict[ord(x)] for x in self.data])
bit_sz = len(new_data)
pad_sz = 8 - bit_sz%8
new_data += '0'*pad_sz
# bit_data = BitVector(bitstring=new_data) # 使用BitVector很慢
byte_data = []
for ii in range(0, len(new_data), 8):
bit = 0
for j in range(8):
bit = (bit<<1) + int(new_data[ii+j])
byte_data.append(chr(bit))
print 'save to %s...' % self.save_name
with open(self.save_name, 'wb') as fp:
# 把序列化的huffman树写在前面
fp.write('%s-%s-%s-' % (len(self.tree_str), self.tree_str, bit_sz))
# bit_data.write_to_file(fp)
fp.writelines(byte_data)
# =========================================================== #
# huffman树反序列化,重建树
# =========================================================== #
@caltime
def _deserialize_huffman_tree(self):
def pre_order(node_list):
nd_data = node_list.pop(0)
if nd_data=='#':
return None
nd = TreeNode(key=chr(nd_data[0]), code=nd_data[1]) if isinstance(nd_data, list) else TreeNode()
nd.left = pre_order(node_list)
nd.right = pre_order(node_list)
return nd
return pre_order(json.loads(self.tree_str))
# =========================================================== #
# huffman解压缩
# =========================================================== #
@caltime
def huffman_unzip(self):
with open(self.raw_name, 'rb') as fp:
n_tree = ''
while 1:
c = fp.read(1)
if c!='-':
n_tree += c
else:
break
## 重建huffman树
print 'start huffman decoding...'
self.tree_str = fp.read(int(n_tree))
self.root_node = self._deserialize_huffman_tree()
fp.read(1)
n_data = ''
while 1:
c = fp.read(1)
if c!='-':
n_data += c
else:
break
n_data = int(n_data)
byte_data = fp.read()
## 保存到文件
print 'save to %s...' % self.save_name
nd = self.root_node
with open(self.save_name, 'wb') as fp:
ii = 0
data = []
while ii<=n_data:
if nd.left is None and nd.right is None:
data.append(nd.key)
nd = self.root_node
else:
bit = ord(byte_data[ii/8]) & (128>>ii%8)
nd = nd.left if bit==0 else nd.right
ii += 1
fp.writelines(data)
总结
- huffman压缩
- 词频统计
统计每个字符(8位)出现的频率,针对中文编码,每个字节可用
0x??
表示
- 构建二叉树
每次取出词频最低的2个字符进行构建(利用堆),并将其父节点加入词库
- 进行编码
自上而下左0右1给每个叶子节点赋值,得到编码字典
字符->二进制序列
- 压缩
将源文件所有字符按编码字典转换为二进制序列后(需补齐位数为8的倍数)
写入新文件
我在文件头写入了二叉树(编码字典)信息,供解压缩使用
- 运行结果
start huffman encoding...
_build_heap take 0.079 s...
_build_huffman_tree take 0.116 s...
_serialize_huffman_tree take 0.001 s...
_huffman_encode take 0.123 s...
save to xxx.pdf.hfm...
huffman_zip take 2.759 s...
-
看一下生成的压缩文件
- huffman解码
- 重建二叉树
从压缩文件头读入二叉树信息,反序列化后构建二叉树
- 解码
从文件读入字节流,按位操作,从根节点开始,0左1右,直到叶子节点,
复原出原字符,写入新文件
猜想使用2个线程一个读一个写会不会更快
- 运行结果
start huffman decoding...
_deserialize_huffman_tree take 0.003 s...
save to xxx.pdf...
huffman_unzip take 1.030 s...