3.2 基于Keras的LTV模型实践
本节介绍基于Keras的神经网络LTV预测模型,希望读者能够快速上手,加深对LTV建模的认识。
3.2.1 Keras介绍
Keras是一个高层级的开源神经网络API库,使用Python语言编写,并且可以运行在TensorFlow、Theano和CNTK上。它隐去了神经网络的底层实现细节,可以让我们像搭积木一样,快速构建网络,缩短从构思到实现的时间。它对用户友好,使用方法简单,可快速上手,现在已经得到了很广泛的使用,并被内置在最新版的TensorFlow中。下面我们使用Keras建立神经网络模型,进行LTV建模。同时我们也使用了Python生态中常用的数据处理库,如NumPy和Pandas等。如果没有安装Keras,使用pip install Keras命令安装即可。
3.2.2 数据的加载和预处理
本节我们会使用一个公开的零售数据,该数据包含某个以英国顾客为主要消费人群的在线零售商店8月的零售数据,数据可从https://www.kaggle.com/vijayuv/onlineretail页面获取。下载后解压为OnlineRetail.csv。我们首先使用如下命令引入需要的库。
import numpy as np import pandas as pd
现在我们使用pandas读入数据,并放在变量OnlRt中,如代码清单3-1所示。
代码清单3-1 数据读取
OnlRt=pd.read_csv('./OnlineRetail.csv', usecols=['CustomerID','InvoiceDate','UnitPrice','Quantity', 'Country'], encoding = "ISO-8859-1", parse_dates=['InvoiceDate'], dtype={'CustomerID':np.str,'UnitPrice':np.float32,'Quantity':np.int32,'Country':np.str}) OnlRt.head()
图3-2所示是文件内容。这里只使用我们关心的一些特征,包括购买数量(Quantity)、开发票的日期(InvoiceDate)、单价(UnitPrice)、顾客ID(CustomerID)和国家(Country)。这里设置编码协议encoding="ISO-8859-1"确保模型能正确解析字符。特征列中开发票的日期这项我们解析为日期类型,顾客ID和国家为字符类型,单价是浮点数类型,购买数量是整数类型。
图3-2 文件内容
之后做一些初步的数据清理,把关注点集中在英国的顾客上。我们以英国顾客的消费数据作为数据集,如代码清单3-2所示。
代码清单3-2 数据清理
neg_id=OnlRt[(OnlRt['Quantity']<=0)|(OnlRt['UnitPrice']<=0)].loc[:,'CustomerID'] data0=OnlRt[(OnlRt['CustomerID'].notnull())& (~OnlRt['CustomerID'].isin(neg_id))& (OnlRt['Country']=='United Kingdom')].drop('Country',axis=1)
对这部分代码再做一些初步的清洗工作:首先找出购买数量为负或者单价为负的顾客,这些顾客存在退货的情况。我们简单地把这些顾客的数据去掉(id在neg_id中)。另外,还要去掉id不存在的顾客。在这之后挑出所有的英国顾客,我们把经过处理的数据集命名为data0。
我们看一下现在的特征列,其中有单价和购买量两种,这两者的乘积就是顾客这次购物行为的总付费,为便于使用,我们添上这一列,命名为amount,并将数据集重命名为data1,代码如下。
data1=data0.assign(amount=data0['UnitPrice'].multiply(data0['Quantity']))
接下来,对于每个顾客,我们需要找出他或她第一次产生购买行为的时间,如代码清单3-3所示。
代码清单3-3 找到顾客第一次购买行为的时间
first_time=data1['InvoiceDate'].sort_values(ascending=True).groupby(data1['CustomerID']).nth(0)\
.apply(lambda x:x.date()).reset_index().rename(columns={'InvoiceDate':'first_time'})
data2=pd.merge(data1,first_time,how='left',on=['CustomerID'])
我们对开发票的时间进行排序,并按照顾客ID分组,取时间最早的一次开票时间作为顾客第一次产生购买行为的时间(这里我们认为开发票的时间就是购买的时间),将结果存入变量first_time。之后,我们将first_time和原来的数据集进行merge操作,并将新数据集命名为data2。
接下来,我们从顾客购买时间这个属性中提取一些新特征。
首先是购买行为相距第一次购买时间的天数dayth,如代码清单3-4所示。
代码清单3-4 时间天数处理
dayth=(data2['InvoiceDate'].apply(lambda x: x.date())-data2['first_time']).apply(lambda x: x.days)
然后是每次购买时间的月(month)、星期(weekday)、小时(hour)、分(minute)和秒(second),如代码清单3-5所示。
代码清单3-5 时间特征抽取
month=data2['InvoiceDate'].apply(lambda x: x.month) weekday=data2['InvoiceDate'].apply(lambda x: x.weekday()) hour=data2['InvoiceDate'].apply(lambda x: x.hour) minute=data2['InvoiceDate'].apply(lambda x: x.minute) second=data2['InvoiceDate'].apply(lambda x: x.second)
我们发现,顾客产生购买行为所在的月份是一个比较大的时间单位,对预测帮助不大。而具体的时间我们做一下整合hour_ preci,然后和星期一起作为新出现的两列特征,如代码清单3-6所示。
代码清单3-6 时间特征构造
hour_preci=(second/60+minute)/60+hour
整理一下数据集,其中first_time和InvoiceDate里的信息已经用过了,将这两列去掉并重新排序,新的数据集记作data3,如代码清单3-7所示。
代码清单3-7 数据表整理
data3=data2.assign(dayth=dayth).assign(hour=hour_preci).\ assign(weekday=weekday).drop(['first_time','InvoiceDate'],axis=1).\ sort_values(by=['CustomerID','dayth','hour'])
下面我们分离出特征数据X和值数据y。我们用顾客前28天的数据作为训练集的输入数据,把amount去掉,因为它不是独立的特征(但鼓励读者试一下合成多个特征,运行模型看看对效果的提升有多大)。使用CustomerID作为索引,如代码清单3-8所示。
代码清单3-8 处理特征列
X=data3[data3['dayth']<28].set_index('CustomerID').drop('amount',axis=1).sort_index()
在这个特征集合中,特征列有Quantity、UnitPrice、dayth、hour和weekday。
对于y,也就是需要预测的LTV,我们定义为180天的总付费,如代码清单3-9所示。
代码清单3-9 处理目标值
data180=data3[(data3['dayth']<180)&(data3['CustomerID'].isin(X.index))] y=data180['amount'].groupby(data180['CustomerID']).sum().sort_index()
data180包含了X中对应顾客180天的付费情况。我们对此进行求和,得到180天的总付费额度y,并对此排序。
两个变量X和y包含了我们需要的数据。我们把它们保存到文件中以备后续使用,如代码清单3-10所示。
代码清单3-10 文件保存
X.to_csv('bookdata_X.csv',index=True,header=True) y.to_csv('bookdata_y.csv',index=True,header=True)
3.2.3 输入数据的准备
下面开始搭建模型。但在机器学习的问题中,模型本身往往并不是决定结果好坏的标准,如何使用数据在很大程度上影响着输出的结果。
我们这里使用到一种(星期,时序,特征)的数据组织形式。之所以这样做,是因为很多数据存在着时间上的周期性,且这种周期性是具备多种周期的。一年四季,季节变化往复,形成了自然的周期,同样,由于按照星期来组织工作,每周也是个很自然的周期。相对而言,月的周期性并不明显。最后,每天也是一个自然周期。在我们的问题中,时间尺度较短,我们只须考虑星期和时序这两种周期性。具体地说,我们把28天分成4个星期,每个星期内用户购买行为的特征都作为一个时间序列。不同的顾客购买行为存在很大差异,有些顾客购买次数会很多,有些又很少。对此我们采用一种“截断+填充”的方式,把序列的长度控制在一个固定值。假设此固定值为p,特征数量为q,最后每个顾客的特征数据会是一个(4,p,q)形态的张量。
下面我们进行具体的操作,首先引入需要的库,如代码清单3-11所示。
代码清单3-11 引入库
import numpy as np import pandas as pd from sklearn.model_selection import train_test_split from sklearn.preprocessing import MinMaxScaler from Keras.layers import Input, Conv1D, Dropout, LSTM, TimeDistributed, Bidirectional, Dense from Keras.models import Model from Keras.callbacks import EarlyStopping import matplotlib.pyplot as plt
然后把3.2.2节的文件加载进来,如代码清单3-12所示。
代码清单3-12 文件加载
columns_picked=['CustomerID','Quantity', 'UnitPrice', 'dayth', 'hour', 'weekday'] y=pd.read_csv('bookdata_y.csv').rename(columns={'CustomerID':'id'}).set_index('id')['amount'] X=pd.read_csv('bookdata_X.csv',usecols=columns_picked ).rename(columns={'CustomerID':'id'}).set_index('id') columns_picked.remove('CustomerID')
在代码中,为确保列名正确,我们在列表columns_picked中写出想要读入的列,并将CustomerID重命名为id,后者比较简短,下面我们会多次使用这个量。将id设置为索引,并将它从columns_picked中去掉,留下那些真正的特征。
之后,我们使用scikit-learn的train_test_split把数据集分成训练集和测试集。这里我们先对X和y的索引进行拆分,然后根据索引找到对应的数据,如代码清单3-13所示。
代码清单3-13 拆分训练集和测试集
indices=y.index.tolist() ind_train,ind_test=map(sorted,train_test_split(indices, test_size=0.25, random_state=42)) X_train=X.loc[ind_train,:] y_train=y[ind_train] X_test=X.loc[ind_test,:] y_test=y[ind_test]
这里先取出y的index,存在listindices中。之后把索引分成训练数据的索引ind_train和测试数据的索引ind_test。我们选择的测试集样本占比是0.25,即25%。随机状态random_state设为42,这是一种主流的习惯,读者也可尝试其他状态,总之目的是使得随机数序列具有可重复性,方便调试。
另外需要注意的是,我们把索引进行了排序,这是一种方便的做法,可以维护X和y中的数据,使得它们保持相同的顺序,这是通过将sort函数进行map操作到结果序列上实现的。之后通过各自的索引,我们把训练数据和测试数据分开,分别放入X_train、y_train、X_test和y_test变量中。
标准化(或者叫正规化)是特征处理中重要的一步,我们采用scikit-learn中的MinMaxScaler实现标准化操作,先创建一个scaler,把所有特征的范围统一缩放到(-0.5,0.5)区间内,之后对X_train进行处理,把结果变为一个数据集,并称之为X_train_scaled,如代码清单3-14所示。
代码清单3-14 特征标准化
scaler=MinMaxScaler(feature_range=(-0.5,0.5)) X_train_scaled =pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns, index=X_train.index)
对y值,我们则做一种特殊的处理。由于其变动范围很大,不利于模型的处理,我们先对其取对数,如代码清单3-15所示。从图3-3中可以看到处理前和处理后y分布的差异。
代码清单3-15 对数处理和画图
y_train_log=y_train.apply(np.log) plt.figure(); plt.subplot(1,2,1) y.hist(bins=np.arange(0,4000,50)) plt.xlabel('frequency') plt.ylabel('y value') plt.grid('off') plt.axis([0,4000,0,250]) plt.subplot(1,2,2) y.apply(np.log).hist(bins=np.arange(0,10,0.2)) plt.xlabel('frequency') plt.ylabel('log(y) value') plt.grid('off') plt.axis([0,10,0,250])
图3-3 数据变换前后分布对比
图3-3左图展示的是y的分布,图中显示y呈现逐渐衰减的模式,类似幂律递减的分布。如图3-3右图所示,我们对其取对数之后,y的分布有所改变,看起来比较接近正态分布,log(y)的值从(0, 4000)范围缩减到了(0, 10)。
另外,我们需要把每个顾客的28天购买记录分成4个星期,每7天作为一个星期,如代码清单3-16所示,变量week_train作为每个数据所在星期的标识,分为0、1、2、3,我们称其为“week”。
代码清单3-16 处理星期
week_train=X_train['dayth'].apply(lambda x: int(x/7)).rename('week')
现在设定一些参数,如代码清单3-17所示。
代码清单3-17 参数设定
inner_length=32 outer_length=4 feature_len=len(columns_picked)
这些参数是针对我们设计的数据形态而定的。inner_length是每个星期内(内部)我们截取的交易数量,outer_length是外面的时间单位数量,即星期数,在这里是4,feature_length是我们使用的特征数量,即columns_picked的长度。
将inner_length设为32,这是因为每个顾客每星期平均购买次数是23(计算过程留给读者),这里的数值略大于这个平均数。作为一个超参数,读者也可以尝试其他的值来看效果。
既然需要对序列进行截断和填充,就要确保序列长度统一为inner_length,我们写一个函数cut_pad()来完成这个操作,如代码清单3-18所示。
代码清单3-18 序列长度处理
def cut_pad(x,maxl): head=np.array(x)[0:maxl] head_padding=head if len(head)==maxl else np.pad(head,(0,maxl-len(head)),mode='constant') return head_padding
cut_pad()函数是这样实现的:对于任何一个序列x,我们想截断或填充的长度为maxl,先将x转换为数组,然后取前maxl位。这样做的好处是,如果x的位数小于maxl,x的长度是不变的。现在完成了第一步:对过长的序列进行截断。第二步操作是检查长度够不够maxl,如果不够,就补零。pad()函数可以很好地完成这个任务,注意,这里mode的取值是constant,意思是保持长度恒定。通过这两步,我们得到并返回结果。
之后是对每个顾客的购买记录数据的形态进行变形,如代码清单3-19所示。
代码清单3-19 特征数组处理函数
def feature_array(df, col_n, week,len_outer,len_inner): col=df[[col_n]].assign(week=week).reset_index() ids=col['id'].drop_duplicates().values.tolist() weeks=np.arange(0,len_outer).tolist() id_week=pd.DataFrame([(id,week) for id in ids for week in weeks]).rename(columns={0:'id', 1:'week'}).sort_values(by=['id','week']) arr_base=pd.merge(id_week, col, how='left',on=['id','week']).fillna(0) arr_frame=arr_base[col_n].groupby([arr_base['id'],arr_base['week']]).\ apply(lambda x: cut_pad(x,len_inner)).reset_index().drop('week',axis=1).set_index('id')[col_n] userarray=arr_frame.groupby(arr_frame.index).apply(np.vstack).\ apply(lambda x: x.reshape([1,x.shape[0],x.shape[1]])).sort_index() userarray_var=np.vstack(userarray.values.tolist()) return userarray.index.tolist(),userarray_var
这个feature_array函数比较复杂,我们一项一项来说。
- df是要处理的数据集。
- col_n是所选取的特征。
- week是刚刚生成的星期标识。
- len_outer和len_inner是外部时间和内部时间的序列长度。
首先我们生成一个要关注的数据集,命名为col。col选取了所关注的特征,合并了week标识,并且重新设置了索引。得到这个数据集后,每个顾客所拥有的交易记录就都在其中了。但这里有一些空缺,可能是因为某个顾客在第2周整个时间段内都没有任何购买记录。这样为了维护一个完整的表,就要手动找出这些连续某些周都缺失的值,并加以补全。
先找出所有的顾客id,然后生成一个顾客id和week的所有可能匹配(笛卡儿积)。这是最终数据应该具备的所有id-week对,我们使用它和已有的数据进行左连接,对缺失值进行补零,就得到了所需要的数据,将这个数据储存在变量arr_base中。
下一步,将这个数据的特征列根据id和week分组,并使用Lambda表达式将刚才写的cut_pad()函数应用到每一组,用来截取符合所需长度的序列,最后重设索引为id,去除week,取出相关的列,从而实现整齐化顾客的数据。
下面还需要进行一些数据形状的调整。我们依然使用Lambda表达式,首先使用numpy中的vstack()函数把每个用户的数据堆叠起来,变成二维数组,之后使用reshape()函数把形状变成三维,最后再次排序。我们维护一个关于顾客id的有序排列,以此防止在数据调整中出现乱序,维护特征和值的对应关系。这个数据集的值是一个三维的numpy数组,在顾客id维度再次对这个数组进行vstack操作,就得到了最后的数据。为了方便检查,我们在函数返回值的时候把相关的顾客id列表和处理后的特征数据一起返回。
feature_array()函数的作用是把某个特征列截取和变形为指定的形状。如果有多个特征列,就需要对每一列应用这个函数。我们将这个操作封装在函数make_data_array()中。这个函数先根据数据集中的id个数,构建一个符合形状要求的数组,然后对每个特征进行计算并填充到相应的位置,最后仍然将索引和数据一并返回。值得注意的是,代码中的变量the_ind在每次计算的时候都是被覆盖掉的,但实际上由于每次计算得到的值都是相同的,因此我们并不在意这一点,之后我们处理所有用户特征,如代码清单3-20所示。
代码清单3-20 生成用户特征数组的函数
def make_data_array(df,columns,week,len_outer,len_inner): ids_num = len(set(df.index)) df_ready = np.zeros([ids_num,len_outer,len_inner,len(columns)]) for i,item in enumerate(columns): the_ind, df_ready[:,:,:,i] = feature_array(df,item,week,len_outer,len_inner) return the_ind,df_ready
我们使用make_data_array()函数对特征数据进行处理,如代码清单3-21所示。
代码清单3-21 处理训练集用户特征
X_train_ind,X_train_data=make_data_array(X_train_scaled,columns_picked,week_train,outer_length,inner_length)
这样,特征X_train_data就准备好了。下面利用同样的方法对测试集做类似的处理,如代码清单3-22所示。
代码清单3-22 处理测试集用户特征
X_test_scaled =pd.DataFrame(scaler.transform(X_test),
columns=X_test.columns,
index=X_test.index)
y_test_log=y_test.apply(np.log)
week_test=X_test['dayth'].apply(lambda x: int(x/7)).rename('week')
X_test_ind,X_test_data=make_data_array(X_test_scaled,columns_picked,week_test,outer_length,inner_length)
完成以上步骤,数据就可以使用了。
3.2.4 模型搭建和训练
下面开始搭建模型,我们使用Keras的Functional API实现灵活组织网络层。构建一个函数build_model(),如代码清单3-23所示。
代码清单3-23 模型构造函数
def build_model(len_outer,len_inner,len_fea): filters = [64,32] kernel_size = [2,2] dropout_rate=[0.1,0] inner_input = Input(shape=(len_inner,len_fea), dtype='float32') cnn1d=inner_input for i in range(len(filters)): cnn1d = Conv1D(filters=filters[i], kernel_size=kernel_size[i], padding='valid', activation='relu', strides=1)(cnn1d) cnn1d = Dropout(dropout_rate[i])(cnn1d) lstm = LSTM(32, return_sequences=True, dropout=0.1, recurrent_dropout=0.1)(cnn1d) inner_output = LSTM(16, return_sequences=False)(lstm) inner_model = Model(inputs=inner_input, outputs=inner_output) outer_input = Input(shape=(len_outer, len_inner,len_fea), dtype='float32') innered = TimeDistributed(inner_model)(outer_input) outered=Bidirectional(LSTM(16, return_sequences=False))(innered) outered = Dense(8, activation='relu')(outered) outer_output = Dense(1)(outered) model = Model(inputs=outer_input, outputs=outer_output) model.compile(loss='mape', optimizer='adam') return model,inner_model
整个模型由里外两层构成,我们先构造内部模型。
Input层用来引入输入数据,这里输入的数据是(batch, outerlength, innerlength, featurelength)。其中第一个维度batch在Keras中默认不需要指定。对于其他3个维度,内部模型需要处理的是后两个,所以这里指定的输入数据形态是(leninner,len_fea)(这里内部变量名使用了缩写,以示区分),数据类型为float32。
然后构造卷积层cnn1d。这个层被初始化为输入层inner_input,之后添加两层卷积,卷积核的个数分别为64和32,kernel_size均为2,padding方式采用valid,激活函数为relu,步长为1。这一切可以用Keras的Conv1D来完成。两层卷积使得我们可以抽象出不同层次的序列特征。这里,我们在第一层再加上一个dropout操作,参数取0.1,即在训练时随机丢弃1/10的层内结点,以防止模型过拟合。
CNN抽取的特征是偏局部的。因此,输出向量的头部仍对应输入向量的头部,输出向量的尾部对应输入向量的尾部。LSTM非常适合进行这种时间序列信息的提取,因此我们后面续接两个LSTM层,结点个数分别为32和16。同样,对第一层LSTM,dropout取0.1,recurrent_dropout也取0.1。recurrent_dropout也是一种dropout,指的是递归结点之间的连接,用于对递归连接进行正则化处理。
这里有个需要注意的地方:第一层LSTM的return_sequences参数设为True,但第二层则设为False。因为我们希望第一层继续返回一个时间序列类型的数据,而第二层只需要最后提取成一个向量。经过适当处理后的向量就是我们内部模型的output。把输入为inner_input、输出为inner_output的部分用Model模块连接起来,就构造出内部模型了。
下面来构造外部模型。
外部模型的Input形状只比内部模型多了一个维度(lenouter, leninner,lenfea),数据类型仍然不变。我们把内部模型通过TimeDistributed应用到数据上。TimeDistributed是一个特殊的封装器,能把内部模型应用到每一个时间片上(这里的时间片指的是外部的长周期,即这里的星期)。
将这个结果赋给outered变量之后,我们外加一层双向的递归神经网络。在Keras中,这个操作在LSTM层完成,使用Bidirectional操作作用于其上即可,LSTM层使用16个结点。之后我们加上全连接层Dense,并最后使用Dense(1)获得输出结果。这里输出的是一个单一的值。从outer_input到outer_output,完成了外部网络的构建。我们使用Model这个类API实例化整个模型,之后对模型进行编译,这里代价函数使用的是mape,优化器使用的是adam。最后将整个模型作为返回值返回(这里我们把内部模型也单独返回,并查看它的结构),如代码清单3-24所示。
代码清单3-24 模型构造
LTV_model, LTV_inner_model = build_model(outer_length,inner_length,feature_len)
现在我们实例化了模型LTV_model,同时也返回了一个内部模型。现在使用summary()方法看看这两个模型的结构,如代码清单3-25所示。
代码清单3-25 查看模型结构
LTV_model.summary() LTV_inner_model.summary()
先看一下内部模型,如图3-4所示。
图3-4 内部模型结构和参数数量
图3-4中显示了两层卷积加两层LSTM,形状和参数个数列在旁边,以提示我们参数是否过多或过少。
外部模型结构如图3-5所示。
图3-5 外部模型结构和参数数量
外部模型将内部模型应用在周粒度的时间步上,之后接入双向LSTM,最后连到全连接网络,并给出输出值,这里参数的个数已经将内部模型参数包含在内了。
然后我们对这个模型进行训练。训练过程中有可能会出现过拟合的情况,我们知道训练误差随着训练的迭代,是逐渐减小的,而测试集的误差则是先减小后增大。当测试集的误差开始增大时,说明模型开始出现过拟合现象。在机器学习中,通过监测某些条件及时早停或者提前停止训练过程是一种常用的防止过拟合的方法,可以避免进一步增大泛化误差。在Keras中,我们使用EarlyStopping回调函数实现早停,它在训练过程满足指定条件时自动被调用,从而停止训练。
EarlyStopping的参数设置如下:monitor是val_loss,即测试误差;mode是min,即val_loss达到最小时触发;verbose指选择信息模式是详细还是简略;patience是耐心程度,指的是我们要等多少轮训练,如果测试误差没有继续减小,就停止训练。我们实例化一个回调条件,名为cb。以上实现如代码清单3-26所示。
代码清单3-26 模型训练和保存
cb = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=30) history = LTV_model.fit(x=X_train_data, y=y_train_log, validation_data=(X_test_data, y_test_log), epochs=200, batch_size=128, callbacks=[cb], verbose=2) LTV_model.save('LTV_model.h5')
我们使用LTV_model的fit()方法进行训练,给定训练集和测试集,训练的轮数epochs设为100。每次使用的batchsize设为128,使用[cb]作为callbacks的参数(中括号是必需的,表示参数为一个列表,这里只使用了一个元素的参数),verbose信息模式设为2。训练完毕后,把训练好的模型保存在硬盘上,命名为LTVmodel.h5。
3.2.5 模型分析
下面画一下模型训练过程中cost的变化,训练过程中的变量都存储在history变量中,调用这个词典的键值可以获得cost或者loss(我们这里是mape),如代码清单3-27所示。
代码清单3-27 训练过程可视化
plt.figure() plt.plot(history.history['val_loss'],'o',label='val_loss') plt.plot(history.history['loss'],'-',label='loss') plt.title('model loss') plt.legend() plt.axis([0,200,0,100])
图3-6中,实线代表训练集损失函数的值,点代表验证集损失函数的值。可以看到,训练误差和测试误差都随着训练迭代次数的增加而不断降低,最后趋于稳定,说明我们的模型达到了较好的效果。
图3-6 训练过程中损失的变化