说明:本文依据《中文自然语言处理入门实战》完成。目前网上有不少转载的课程,我是从GitChat上购买。
近几年在自然语言处理领域中,HMM(隐马尔可夫模型)和 CRF(条件随机场)算法常常被用于分词、句法分析、命名实体识别、词性标注等。
由于两者之间有很大的共同点,所以在很多应用上往往是重叠的,但在命名实体、句法分析等领域 CRF 似乎更胜一筹。通常来说如果做自然语言处理,这两个模型应该都要了解,下面我们来看看本文的内容。
第八课 从自然语言处理角度看HMM和CRF
基于HMM训练中文分词器
1.模型介绍
HMM模型是由一个“五元组”构成的集合。
1.StatusSet:状态值集合,状态值集合为 (B, M, E, S),其中 B 为词的首个字,M 为词中间的字,E 为词语中最后一个字,S 为单个字,B、M、E、S 每个状态代表的是该字在词语中的位置。
2.ObservedSet:观察值集合,观察值集合就是所有语料的汉字,甚至包括标点符号所组成的集合。
3.TransProbMatrix:转移概率矩阵,状态转移概率矩阵的含义就是从状态 X 转移到状态 Y 的概率,是一个4×4的矩阵,即 {B,E,M,S}×{B,E,M,S}。
4.EmitProbMatrix:发射概率矩阵,发射概率矩阵的每个元素都是一个条件概率,代表 P(Observed[i]|Status[j]) 概率。
5.InitStatus:初始状态分布,初始状态概率分布表示句子的第一个字属于 {B,E,M,S} 这四种状态的概率。
使用HMM进行分词,要解决的问题就是参数已知的情况下,如何求解状态值序列。
2.语料准备
本次训练使用的预料 syj_trainCorpus_utf8.txt 是我爬取的短文本处理生成的。整个语料大小 264M,包含1116903条数据,UTF-8 编码,词与词之间用空格隔开,用来训练分词模型。
教程给的是CSDN地址,很没劲又设置了1分的付费下载,搞得必须去充值才行,于是就在淘宝付费代下,以下为百度云盘地址
链接:https://pan.baidu.com/s/1eyTPOD7kuDdcIVDLVl6OlA
提取码:qf5k
3.实现
import pickle
import json
STATES = {'B', 'M', 'E', 'S'}
EPS = 0.0001
seg_stop_words = {" ", ",", "。", "“", "”", '“', "?", "!", ":", "《", "》", "、", ";", "·", "‘ ", "’", "──", ",", ".", "?",
"!", "`", "~", "@", "#", "$", "%", "^", "&", "*", "(", ")", "-", "_", "+", "=", "[", "]", "{", "}",
'"', "'", "<", ">", "\\", "|" "\r", "\n", "\t"}
class HMM_Model:
def __init__(self):
self.trans_mat = {} # trans_mat:状态转移矩阵,trans_mat[state1][state2] 表示训练集中由 state1 转移到 state2 的次数。
self.emit_mat = {} # emit_mat:观测矩阵,emit_mat[state][char] 表示训练集中单字 char 被标注为 state 的次数。
self.init_vec = {} # init_vec:初始状态分布向量,init_vec[state] 表示状态 state 在训练集中出现的次数。
self.state_count = {} # state_count:状态统计向量,state_count[state]表示状态 state 出现的次数。
self.states = {}
self.inited = False
# 初始化第一个方法中的数据结构
def setup(self):
for state in self.states:
# build trans_mat
self.trans_mat[state] = {}
for target in self.states:
self.trans_mat[state][target] = 0.0
self.emit_mat[state] = {}
self.init_vec[state] = 0
self.state_count[state] = 0
self.inited = True
# filename 指定模型名称,默认模型名称为 hmm.json,这里提供两种格式的保存类型,JSON 或者 pickle 格式,通过参数 code 来决定,code 的值为 code='json' 或者 code = 'pickle',默认为 code='json'
def save(self, filename="hmm.json", code='json'):
fw = open(filename, 'w', encoding='utf-8')
data = {
"trans_mat": self.trans_mat,
"emit_mat": self.emit_mat,
"init_vec": self.init_vec,
"state_count": self.state_count
}
if code == "json":
txt = json.dumps(data)
txt = txt.encode('utf-8').decode('unicode-escape')
fw.write(txt)
elif code == "pickle":
pickle.dump(data, fw)
fw.close()
# filename 指定模型名称,默认模型名称为 hmm.json,这里提供两种格式的保存类型,JSON 或者 pickle 格式,通过参数 code 来决定,code 的值为 code='json' 或者 code = 'pickle',默认为 code='json'
def load(self, filename="hmm.json", code="json"):
fr = open(filename, 'r', encoding='utf-8')
if code == "json":
txt = fr.read()
model = json.loads(txt)
elif code == "pickle":
model = pickle.load(fr)
self.trans_mat = model["trans_mat"]
self.emit_mat = model["emit_mat"]
self.init_vec = model["init_vec"]
self.state_count = model["state_count"]
self.inited = True
fr.close()
# 用来训练模型,因为使用的标注数据集, 因此可以使用更简单的监督学习算法,训练函数输入观测序列和状态序列进行训练, 依次更新各矩阵数据。类中维护的模型参数均为频数而非频率, 这样的设计使得模型可以进行在线训练,使得模型随时都可以接受新的训练数据继续训练,不会丢失前次训练的结果
def do_train(self, observes, states):
if not self.inited:
self.setup()
for i in range(len(states)):
if i == 0:
self.init_vec[states[0]] += 1
self.state_count[states[0]] += 1
else:
self.trans_mat[states[i - 1]][states[i]] += 1
self.state_count[states[i]] += 1
if observes[i] not in self.emit_mat[states[i]]:
self.emit_mat[states[i]][observes[i]] = 1
else:
self.emit_mat[states[i]][observes[i]] += 1
# 在进行预测前,需将数据结构的频数转换为频率
def get_prob(self):
init_vec = {}
trans_mat = {}
emit_mat = {}
default = max(self.state_count.values())
for key in self.init_vec:
if self.state_count[key] != 0:
init_vec[key] = float(self.init_vec[key]) / self.state_count[key]
else:
init_vec[key] = float(self.init_vec[key]) / default
for key1 in self.trans_mat:
trans_mat[key1] = {}
for key2 in self.trans_mat[key1]:
if self.state_count[key1] != 0:
trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / self.state_count[key1]
else:
trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / default
for key1 in self.emit_mat:
emit_mat[key1] = {}
for key2 in self.emit_mat[key1]:
if self.state_count[key1] != 0:
emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / self.state_count[key1]
else:
emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / default
return init_vec, trans_mat, emit_mat
# 采用Viterbi(维特比)算法求得最优路径
def do_predict(self, sequence):
tab = [{}]
path = {}
init_vec, trans_mat, emit_mat = self.get_prob()
# 初始化
for state in self.states:
tab[0][state] = init_vec[state] * emit_mat[state].get(sequence[0], EPS)
path[state] = [state]
# 创建动态搜索表
for t in range(1, len(sequence)):
tab.append({})
new_path = {}
for state1 in self.states:
items = []
for state2 in self.states:
if tab[t - 1][state2] == 0:
continue
prob = tab[t - 1][state2] * trans_mat[state2].get(state1, EPS) * emit_mat[state1].get(sequence[t],
EPS)
items.append((prob, state2))
best = max(items)
tab[t][state1] = best[0]
new_path[state1] = path[best[1]] + [state1]
path = new_path
# 搜索最有路径
prob, state = max([(tab[len(sequence) - 1][state], state) for state in self.states])
return path[state]
def get_tags(src):
tags = []
if len(src) == 1:
tags = ['S']
elif len(src) == 2:
tags = ['B', 'E']
else:
m_num = len(src) - 2
tags.append('B')
tags.extend(['M'] * m_num)
tags.append('E')
return tags
def cut_sent(src, tags):
word_list = []
start = -1
started = False
if len(tags) != len(src):
return None
if tags[-1] not in {'S', 'E'}:
if tags[-2] in {'S', 'E'}:
tags[-1] = 'S'
else:
tags[-1] = 'E'
for i in range(len(tags)):
if tags[i] == 'S':
if started:
started = False
word_list.append(src[start:i])
word_list.append(src[i])
elif tags[i] == 'B':
if started:
word_list.append(src[start:i])
start = i
started = True
elif tags[i] == 'E':
started = False
word = src[start:i + 1]
word_list.append(word)
elif tags[i] == 'M':
continue
return word_list
class HMMSoyoger(HMM_Model):
def __init__(self, *args, **kwargs):
super(HMMSoyoger, self).__init__(*args, **kwargs)
self.states = STATES
self.data = None
# 加载训练数据
def read_txt(self, filename):
self.data = open(filename, 'r', encoding="utf-8")
# 模型训练函数
def train(self):
if not self.inited:
self.setup()
for line in self.data:
line = line.strip()
if not line:
continue
# 观测序列
observes = []
for i in range(len(line)):
if line[i] == " ":
continue
observes.append(line[i])
# 状态序列
words = line.split(" ")
states = []
for word in words:
if word in seg_stop_words:
continue
states.extend(get_tags(word))
# 开始训练
if (len(observes) >= len(states)):
self.do_train(observes, states)
else:
pass
# 模型分词预测
def lcut(self, sentence):
try:
tags = self.do_predict(sentence)
return cut_sent(sentence, tags)
except:
return sentence
#训练模型,首先实例化 HMMSoyoger 类,然后通过 read_txt() 方法加载语料,再通过 train() 进行在线训练,如果训练语料比较大,可能需要等待一点时间
soyoger=HMMSoyoger()
soyoger.read_txt(r'C:\Users\01\Desktop\机器学习作业\sklearn+tensorflow\corpus_data\syj_trainCorpus_utf8.txt')
soyoger.train()
print(soyoger.lcut("中国的人工智能发展进入高潮阶段。"))
print(soyoger.lcut("中文自然语言处理是人工智能技术的一个重要分支。"))
print(soyoger.lcut("关注可了解更多的教程及排版技巧。问题或建议,请公众号留言"))
print(soyoger.lcut("南京市长是个好同志"))
print(soyoger.lcut("南京市长江大桥"))
我按照教程敲下来的代码,分词结果明显有问题……两个两个分词,就这教程还说好……
['中国', '的', '人工', '智能', '发展', '进入', '高潮', '阶段', '。']
['中文', '自然', '语言', '处理', '是', '人工', '智能', '技术', '的一', '个重', '要分', '支。']
['关注', '可了', '解更', '多的', '教程', '及排', '版技', '巧。', '问题', '或建', '议,', '请公', '众号', '留言']
['南京', '市长', '是', '个好', '同志']
['南京', '市长', '江大', '桥']
试一下HanLP分词
from pyhanlp import *
print(HanLP.segment(r'你好,欢迎在Python中调用HanLP的API'))
testCases = [
"中国的人工智能发展进入高潮阶段。",
"中文自然语言处理是人工智能技术的一个重要分支。",
"关注可了解更多的教程及排版技巧。问题或建议,请公众号留言",
"南京市长是个好同志",
"南京市长江大桥"]
for sentence in testCases: print(HanLP.segment(sentence))
#[你好/vl, ,/w, 欢迎/v, 在/p, Python/nx, 中/f, 调用/v, HanLP/nx, 的/ude1, API/nx]
# [中国/ns, 的/ude1, 人工智能/n, 发展/vn, 进入/v, 高潮/n, 阶段/n, 。/w]
# [中文/nz, 自然语言处理/nz, 是/vshi, 人工智能/n, 技术/n, 的/ude1, 一个/mq, 重要/a, 分支/n, 。/w]
# [关注/v, 可/v, 了解/v, 更多/ad, 的/ude1, 教程/n, 及/cc, 排版/vn, 技巧/n, 。/w, 问题/n, 或/c, 建议/n, ,/w, 请/v, 公众/n, 号/q, 留言/n]
# [南京/ns, 市长/nnt, 是/vshi, 个/q, 好/a, 同志/n]
# [南京市/ns, 长江大桥/nz]
结果一目了然,干嘛还要重复造轮子……
关于CRF分词,同样HanLP也有,不过需要通过调用JClass来完成对Java的底层API的调用。