import numpy as np import random # sigmoid阶跃函数 def sigmoid(inX): # return 1.0 / (1 + exp(-inX)) return 1.0 / (1 + np.exp(-inX)) # 随机梯度上升算法(随机化) def stocGradAscent1(dataMatIn, classLabels, numIter=150): m, n = np.shape(dataMatIn) # 创建与列数相同的矩阵的系数矩阵,1行3列 weights = np.ones(n) # 随机梯度, 循环150,观察是否收敛 for j in range(numIter): # [0, 1, 2 .. m-1] dataIndex = list(range(m)) for i in range(m): # i和j的不断增大,导致alpha的值不断减少,但是不为0 alpha = 4 / (1.0 + j + i) + 0.01 # alpha 会随着迭代不断减小,但永远不会减小到0,因为后边还有一个常数项0.0001 # 随机产生一个 0~len()之间的一个值 # random.uniform(x, y) 方法将随机生成下一个实数,它在[x,y]范围内,x是这个范围内的最小值,y是这个范围内的最大值。 randIndex = int(random.uniform(0, len(dataIndex))) # sum(dataMatrix[i]*weights)为了求 f(x)的值, f(x)=a1*x1+b2*x2+..+nn*xn h = sigmoid(sum(dataMatIn[dataIndex[randIndex]] * weights)) error = classLabels[dataIndex[randIndex]] - h weights = weights + alpha * error * dataMatIn[dataIndex[randIndex]] del (dataIndex[randIndex]) return weights # 分类函数,根据回归系数和特征向量来计算 Sigmoid的值 def classifyVector(inX, weights): ''' Desc: 最终的分类函数,根据回归系数和特征向量来计算 Sigmoid 的值,大于0.5函数返回1,否则返回0 Args: inX -- 特征向量,features weights -- 根据梯度下降/随机梯度下降 计算得到的回归系数 Returns: 如果 prob 计算大于 0.5 函数返回 1 否则返回 0 ''' prob = sigmoid(sum(inX * weights)) if prob > 0.5: return 1.0 else: return 0.0 # 打开测试集和训练集,并对数据进行格式化处理 def colicTest(): ''' Desc: 打开测试集和训练集,并对数据进行格式化处理 Args: None Returns: errorRate -- 分类错误率 ''' frTrain = open('horseColicTraining.txt') frTest = open('horseColicTest.txt') trainingSet = [] trainingLabels = [] # 解析训练数据集中的数据特征和Labels # trainingSet 中存储训练数据集的特征,trainingLabels 存储训练数据集的样本对应的分类标签 for line in frTrain.readlines(): currLine = line.strip().split('\t') lineArr = [] for i in range(21): lineArr.append(float(currLine[i])) trainingSet.append(lineArr) trainingLabels.append(float(currLine[21])) # 使用 改进后的 随机梯度下降算法 求得在此数据集上的最佳回归系数 trainWeights trainWeights = stocGradAscent1(np.array(trainingSet), trainingLabels, 500) errorCount = 0 numTestVec = 0.0 # 读取 测试数据集 进行测试,计算分类错误的样本条数和最终的错误率 for line in frTest.readlines(): numTestVec += 1.0 currLine = line.strip().split('\t') lineArr = [] for i in range(21): lineArr.append(float(currLine[i])) if int(classifyVector(np.array(lineArr), trainWeights)) != int(currLine[21]): errorCount += 1 errorRate = (float(errorCount) / numTestVec) print("the error rate of this test is: %f" % errorRate) return errorRate # 调用 colicTest() 10次并求结果的平均值 def multiTest(): numTests = 10 errorSum = 0.0 for k in range(numTests): errorSum += colicTest() print("after %d iterations the average error rate is: %f" % (numTests, errorSum / float(numTests))) multiTest()