一、迁移学习
机器学习有一个基本假设:数据同分布。可参考之前这篇介绍:《数据不同分布,怎么整?》 然而,现实中的数据情况通常有点坎坷,数据不同分布的也很常见,比如,已有的大量其他领域的数据如何适用以当前任务的问题,这也是迁移学习所要解决的!
迁移学习,即Transfer learning 可以简单定义为:将在源域S的知识迁移到目标域T任务,提高在目标域T的任务下模型预测的性能。
顾名思义就是迁移其他领域的数据经验,用于当前的任务,以解决当前领域任务的数据积累不充分的问题。而通常地,其他领域的数据和当前目标域任务的数据分布是有差异的。
在NLP、CV任务中很天然地适合做迁移学习,各种的CV、NLP预训练模型层出不穷(如Bert、GPT、MAE)。一个例子如文本分类任务,我们可以借用在其他文本数据已经训练好的预训练模型(word2vec、BERT、GPT等等),用于当前的新闻分类任务里面。这样可以大大减少当前任务需要的数据,而且预训练模型是基于大样本学习的经验,做下微调,应用效果也杠杠的。
二、风控的迁移学习
回到金融风控任务,需要寄望于迁移学习的场景还是挺多的。很经常的,业务有扩展,引入了新的一个经营客群,而新的客群样本量刚开始肯定是很少的,这时就很需要借助下旧客群的数据。
而难点在于,风控领域很难像NLP领域那样的文字表示直接迁移,NLP中一个任务的文本表示可能就很适用另一文本任务。
风控面对的主要是结构化数据(Tabular Data),一个任务的数据组成、特征含义就很复杂多样了,尽管可以抽取出同一组特征表示,数据分布可能也是天差地别。这种情况下怎么做迁移学习呢?
下面结合风控的信用评分卡的任务,具体介绍迁移学习方法及项目代码实践。
首先先做下任务的背景介绍。信用评分卡是风控领域的核心任务之一,依据如个人基本信息、经济能力、贷款历史信息,用于判断借贷用户的按时还款的概率。本文数据来源github.com/aialgorithm/Blog《一文梳理金融风控建模全流程(Python)》
引用下迁移学习经典论文《A survey on transfer learning》,迁移学习可以分为四种基本的方法:
- 基于样本的迁移
- 基于特征的迁移
- 基于模型的迁移
- 基于关系的迁移(此项相关研究较少,本文略)。
2.1 基于样本的迁移
基于样本的迁移,是通过迁移源域的某些样本或设定样本权重到目标域学习。
基于样本应该是结构化数据任务应用较为广泛的迁移方法,但这类方法通常只在领域间分布差异较小时有效。最简单直接的,可以把原领域的样本直接加入目标领域一起学习训练,但由于数据分布差异的问题,这样可能效果反而更差。
# 代码来源:公众号-算法进阶
# 完整代码:github.com/aialgorithm/Blog 请见文章对应项目代码
train_xa = pd.concat([train_x,train_source[filter_feas]]) # 直接把全量原领域数据加到目标领域训练
train_ya = pd.concat([train_y,train_source['isDefault']])
lgb_source=lightgbm.LGBMClassifier(n_estimators=100, num_leaves=4,class_weight= 'balanced',metric = 'AUC',lambda_l1=1,lambda_l2=1)
lgb_source.fit(train_xa, train_ya)
print(train_ya.value_counts())
# 以目标领域的测试集判断效果
print('train ',model_metrics(lgb_source,train_x, train_y))
print('test ',model_metrics(lgb_source,test_x,test_y))
对于本项目直接拼凑两份领域数据用于训练,效果提升并不大。相比暴力拼凑,还有有两种做法可以减少分布差异。
2.1.1 样本选择法
只从原领域迁移与目标域分布一致(相似)的那部分样本。
我们可以通过像one class-SVM 、孤立森林等异常检测方法,以目标域样本为训练单分类SVM,选取预测原领域的样本中与目标域相似概率(阈值)较高的样本。
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
ir = IsolationForest()#OneClassir()
ir.fit(train_x.fillna(0))
# 预测
train_source['isnormal'] = ir.predict(train_source[filter_feas].fillna(0)) #-1为分布异常的样本
train_source['normal_score'] = ir.score_samples(train_source[filter_feas].fillna(0))+1 #正常程度数值
train_source['normal_score'].hist(bins=10)
#筛选出分布正常的样本,拼接训练集
train_xa = pd.concat([train_x,train_source.loc[train_source['isnormal']==1,filter_feas]])
train_ya = pd.concat([train_y,train_source.loc[train_source['isnormal']==1,'isDefault']])
lgb_source.fit(train_xa, train_ya)
print(train_ya.value_counts())
# 以目标领域的测试集判断效果
print('train ',model_metrics(lgb_source,train_x, train_y))
print('test ',model_metrics(lgb_source,test_x,test_y))
- 或者另一种方式, 结合我们的信用违约二分类模型的任务,也可以通过目标域样本训练一个二分类模型,通过分类模型对原领域样本的预测结果,选取出在原领域中预测较准的那部分样本(原领域预测较准的样本说明和目标领域学习的样本比较一致)。
其中预测概率的阈值是个经验值,可以通过搜索不同阈值下实际效果确定。另外,可以结合实际效果,进行多次迭代选择。
2.1.2 样本权重法
对与目标域分布一致性较高的样本,就给以较高的样本学习权重。反之亦然。通过样本权重控制对原领域样本的有效利用。
- 简单点的方法,可以通过异常检测模型评价原领域的样本的正常的程度作为权重;
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
ir = IsolationForest()#OneClassir()
ir.fit(train_x.fillna(0))
# 预测
train_source['isnormal'] = ir.predict(train_source[filter_feas].fillna(0)) #-1为与目标域分布异常的样本
train_source['normal_score'] = ir.score_samples(train_source[filter_feas].fillna(0))**2 #正常程度数值
train_source['normal_score'].hist(bins=10)
#拼接训练集
train_xa = pd.concat([train_x,train_source.loc[:,filter_feas]])
train_ya = pd.concat([train_y,train_source.loc[:,'isDefault']])
weights = np.append(np.array([1]*train_x.shape[0]) ,train_source['normal_score']) #目标域权重固定为1,原领域按照正常程度加权
lgb_source.fit(train_xa, train_ya,sample_weight=weights)
print(train_ya.value_counts())
# 以目标领域的测试集判断效果
print('train ',model_metrics(lgb_source,train_x, train_y))
print('test ',model_metrics(lgb_source,test_x,test_y))
也可以对目标域,原领域的样本分别标注为’1‘和’0‘标签,训练一个分类模型,验证分类模型的预测效果,模型效果越好说明两个领域的不一样(区分)程度越明显,越有必要做样本选择或样本权重。 这里原领域的样本权重可以用归一化后的odds作为权重(odds是指一个事件的发生比,为该事件发生的概率与该事件不发生的概率的比值)。在这里,原领域的样本权重就是归一化后的【样本属于目标域的概率p(t|x)除以不属于目标域的概率1-p(t|x)】。
或者还有一种方式, 以我们的信用违约二分类模型的任务,可以通过目标域样本训练一个二分类模型,以分类模型对原领域样本的预测结果的准确性为权重。预测准确的样本权重越高。权重=1- abs(预测概率 - 实际标签值),如实际标签为1的样本,模型预测的概率为0.9权重就为0.9,预测概率如为0.2,与标签差异比较大,权重就是0.2 也比较小。
#拼接训练集
train_xa = pd.concat([train_x,train_source.loc[:,filter_feas]])
train_ya = pd.concat([train_y,train_source.loc[:,'isDefault']])
train_source['proba'] = lgb_tar.predict_proba(train_source[filter_feas])[:,1]
train_source['samplew'] = 1-abs(train_source['proba']-train_source['isDefault'])
weights = np.append(np.array([1]*train_x.shape[0]) ,train_source['samplew']) #目标域权重固定为1,原领域按照分类准确程度加权
lgb_source.fit(train_xa, train_ya,sample_weight=weights)
print(train_ya.value_counts())
# 以目标领域的测试集判断效果
print('train ',model_metrics(lgb_source,train_x, train_y))
print('test ',model_metrics(lgb_source,test_x,test_y))
以上几种方法都可以在原领域样本引入一个先验的样本权重,加入到模型训练的sample_weights。其中,目标领域的样本就没必要加样本权重或加入个较大的权重。当然,权重方法的好坏最终还是要结合实际效果。
- TrAdaboost
不同于上面直接给一个固定的样本权重,我们还可以在训练学习中动态地调整好权重,这里有个现成的迁移学习boosting框架TrAdaboost 《Boosting for Transfer Learning 》
TrAdaboost可以通过动态调整样本权重,在目标领域的任务上利用好原领域的数据。权重的调整的大概思路:以目标域样本评估训练损失,对于造成较大误差的原领域的那部分样本,则认为其和目标领域数据差异较大,不断降低它的权重。而对于目标领域分类错误的样本则提升样本权重(这个和Adaboost难样本一脉相承)。核心代码如下
"""" 附录相关资料
论文地址:cse.hkust.edu.hk/~qyang/Docs/2007/tradaboost.pdf
代码来源:github.com/loyalzc/transfer_learning/blob/master/TrAdaboost
论文解读:by马东什么zhuanlan.zhihu.com/p/109540481
""""
class TrAdaboost:
def __init__(self, base_classifier=DecisionTreeClassifier(), N=10):
self.base_classifier = base_classifier
self.N = N
self.beta_all = np.zeros([1, self.N])
self.classifiers = []
def fit(self, x_source, x_target, y_source, y_target):
x_train = np.concatenate((x_source, x_target), axis=0)
y_train = np.concatenate((y_source, y_target), axis=0)
x_train = np.asarray(x_train, order='C')
y_train = np.asarray(y_train, order='C')
y_source = np.asarray(y_source, order='C')
y_target = np.asarray(y_target, order='C')
row_source = x_source.shape[0]
row_target = x_target.shape[0]
# 初始化权重
weight_source = np.ones([row_source, 1]) / row_source
weight_target = np.ones([row_target, 1]) / row_target
weights = np.concatenate((weight_source, weight_target), axis=0)
beta = 1 / (1 + np.sqrt(2 * np.log(row_source / self.N)))
result = np.ones([row_source + row_target, self.N])
for i in range(self.N):
weights = self._calculate_weight(weights)
self.base_classifier.fit(x_train, y_train, sample_weight=weights[:, 0])
self.classifiers.append(self.base_classifier)
result[:, i] = self.base_classifier.predict(x_train)
error_rate = self._calculate_error_rate(y_target,
result[row_source:, i],
weights[row_source:, :])
print("Error Rate in target data: ", error_rate, 'round:', i, 'all_round:', self.N)
if error_rate > 0.5:
error_rate = 0.5
if error_rate == 0:
self.N = i
print("Early stopping...")
break
self.beta_all[0, i] = error_rate / (1 - error_rate)
# 调整 target 样本权重 错误样本权重变大
for t in range(row_target):
weights[row_source + t] = weights[row_source + t] * np.power(self.beta_all[0, i], -np.abs(result[row_source + t, i] - y_target[t]))
# 调整 source 样本 错分样本变小
for s in range(row_source):
weights[s] = weights[s] * np.power(beta, np.abs(result[s, i] - y_source[s]))
def predict(self, x_test):
result = np.ones([x_test.shape[0], self.N + 1])
predict = []
i = 0
for classifier in self.classifiers:
y_pred = classifier.predict(x_test)
result[:, i] = y_pred
i += 1
for i in range(x_test.shape[0]):
left = np.sum(result[i, int(np.ceil(self.N / 2)): self.N] *
np.log(1 / self.beta_all[0, int(np.ceil(self.N / 2)):self.N]))
right = 0.5 * np.sum(np.log(1 / self.beta_all[0, int(np.ceil(self.N / 2)): self.N]))
if left >= right:
predict.append(1)
else:
predict.append(0)
return predict
def predict_prob(self, x_test):
result = np.ones([x_test.shape[0], self.N + 1])
predict = []
i = 0
for classifier in self.classifiers:
y_pred = classifier.predict(x_test)
result[:, i] = y_pred
i += 1
for i in range(x_test.shape[0]):
left = np.sum(result[i, int(np.ceil(self.N / 2)): self.N] *
np.log(1 / self.beta_all[0, int(np.ceil(self.N / 2)):self.N]))
right = 0.5 * np.sum(np.log(1 / self.beta_all[0, int(np.ceil(self.N / 2)): self.N]))
predict.append([left, right])
return predict
def _calculate_weight(self, weights):
sum_weight = np.sum(weights)
return np.asarray(weights / sum_weight, order='C')
def _calculate_error_rate(self, y_target, y_predict, weight_target):
sum_weight = np.sum(weight_target)
return np.sum(weight_target[:, 0] / sum_weight * np.abs(y_target - y_predict))
2.2 基于特征的迁移
基于特征的迁移方法是指将源域和目标域的数据特征变换到统一特征空间中,来减少源域和目标域之间的差距。它基于这样的假设:“为了有效的迁移,良好的表征应该是对主要学习任务的区别性,以及对源域和目标域的不加区分。”
基于特征的迁移学习方法也是一大热门,大多是与神经网络的表示学习进行结合,应用于文本、图像数据等任务。
对于结构化数据,个人理解基于特征的迁移通常是要结合样本、模型层面的迁移。对应着,可以通过特征加工转换到同一分布减少些差异(如归一化什么的),以及可以做下特征选择,寻找既适用于源域又适用于目标域的可迁移特征。比如可以通过特征重要性及稳定性PSI进行筛选,在原领域 与目标领域的分布比较一致的特征。PSI表示的是实际分布与预期两个分布间的差异,PSI=SUM( (实际占比 - 预期占比)* ln(实际占比 / 预期占比) )。特征选择方法可以参考这篇《特征选择》
特征处理完,再进一步选择合适的样本或模型迁移方法。
2.3 基于模型的迁移
基于模型的方法是NLP、CV领域较为普遍的思路,通过大数据训一个预训练模型,然后下游任务只需要微调下就可以用了。
而对于结构化数据任务,这里我们可以采用熟悉的模型融合方式,简单高效地利用好原领域的这部分数据信息,基于模型的方法很大程度也包含了特征层面的加工及迁移。
一个融合的思路是,我们可以先以原领域的数据训练模型A,将模型A的预测概率作为特征加入 以目标领域训练的模型。这个思路还是和预训练微调很像的。
# 先从原领域学习一个模型
lgb_source=lightgbm.LGBMClassifier(n_estimators=100, num_leaves=6,class_weight= 'balanced',metric = 'AUC',lambda_l1=1,lambda_l2=1)
lgb_source.fit(train_source.loc[:,filter_feas], train_source.loc[:,'isDefault'])
# 原领域模型评分作为特征输入目标领域模型
train_bank['source_score'] = lgb_source.predict_proba(train_bank.loc[:,filter_feas])[:,1]
train_x, test_x, train_y, test_y = train_test_split(train_bank[filter_feas+['source_score']], train_bank.isDefault,test_size=0.3, random_state=0)
lgb_tar=lightgbm.LGBMClassifier(n_estimators=100, num_leaves=6,class_weight= 'balanced',metric = 'AUC',lambda_l1=1,lambda_l2=1)
lgb_tar.fit(train_x, train_y)
print(train_y.value_counts())
print('train ',model_metrics(lgb_tar,train_x, train_y))
print('test ',model_metrics(lgb_tar,test_x,test_y))
## 输出特征重要性
from lightgbm import plot_importance
plot_importance(lgb_tar)