Python递归实现随机森林算法

2023-04-16 00:00:00 算法 随机 递归

首先,我们需要了解随机森林算法的原理和流程。随机森林算法是一种基于决策树的集成学习方法,通过组合多个决策树来提高模型的准确度和稳定性。具体流程如下:

  1. 从样本集中随机选择若干样本(有放回抽取),构建训练集。
  2. 针对训练集中的每个样本,随机选择若干特征,构建决策树。
  3. 重复步骤1和2,构建若干个决策树。
  4. 对新数据进行分类时,将该数据代入所有决策树,然后统计每个类别的得票数,最终选择得票最多的类别作为分类结果。

接下来,我们就可以使用递归的方式实现随机森林算法:

import random

class DecisionNode:
    """A single node of a decision tree.

    Internal nodes carry a split test (``col``/``value``) plus two subtrees;
    leaf nodes carry only ``results`` (the predicted class label).
    """

    def __init__(self, col=-1, value=None, results=None, tb=None, fb=None):
        self.col = col          # index of the column this node splits on
        self.value = value      # threshold: rows with row[col] >= value go to tb
        self.results = results  # class label for leaves; None for internal nodes
        self.tb = tb            # "true" branch  (row[col] >= value)
        self.fb = fb            # "false" branch (row[col] <  value)

class RandomForest:
    """Random forest classifier built from bagged decision trees.

    Each tree is trained on a bootstrap sample of the data set and, at every
    split, considers only a random subset of the feature columns (when
    ``n_features`` is given).  Classification is by majority vote over all
    trees.  Relies on the module-level helpers ``get_result``, ``get_split``
    and ``classify``.
    """

    def __init__(self, n_trees=10, max_depth=5, min_sample=10, n_features=None):
        self.n_trees = n_trees        # number of trees in the forest
        self.max_depth = max_depth    # maximum depth of each tree
        self.min_sample = min_sample  # minimum samples required to keep splitting
        self.n_features = n_features  # features considered per split (None = all)
        self.tree_list = []           # trained decision trees

    # Draw a bootstrap sample (with replacement) the same size as the data set.
    def get_sample(self, dataset):
        return [random.choice(dataset) for _ in range(len(dataset))]

    # Recursively grow one decision tree for ``dataset``.
    def build_tree(self, dataset, depth):
        """Return a DecisionNode; a leaf when the node is too small, the
        maximum depth is reached, or no useful split exists."""
        if len(dataset) < self.min_sample or depth == self.max_depth:
            return DecisionNode(results=get_result(dataset))
        # Candidate feature columns: all of them, or a random subset.
        col_list = range(len(dataset[0]) - 1)
        if self.n_features:
            col_list = random.sample(col_list, self.n_features)
        best_col, best_value = get_split(dataset, col_list)
        if best_col is None:  # no split improves purity -> make a leaf
            return DecisionNode(results=get_result(dataset))
        # Partition and recurse into both branches.
        tb = [data for data in dataset if data[best_col] >= best_value]
        fb = [data for data in dataset if data[best_col] < best_value]
        return DecisionNode(
            col=best_col,
            value=best_value,
            tb=self.build_tree(tb, depth + 1),
            fb=self.build_tree(fb, depth + 1),
        )

    # Train ``n_trees`` decision trees, each on its own bootstrap sample.
    def train(self, dataset):
        for _ in range(self.n_trees):
            sample = self.get_sample(dataset)
            self.tree_list.append(self.build_tree(sample, 0))

    # Classify ``sample`` by majority vote of all trained trees.
    def classify(self, sample):
        """Return the winning class label, or None if the forest is untrained."""
        votes = {}
        for tree in self.tree_list:
            label = classify(sample, tree)  # module-level single-tree classifier
            votes[label] = votes.get(label, 0) + 1
        if not votes:
            return None
        # max() keeps the first-seen label on ties (dict preserves insertion
        # order), matching the original explicit max-count loop.
        return max(votes, key=votes.get)

# Majority class label of a (sub-)dataset; used to label leaf nodes.
def get_result(dataset):
    """Return the most frequent label (last column) in ``dataset``.

    Ties are broken in favour of the label seen first; an empty dataset
    yields None.
    """
    counts = {}
    for row in dataset:
        label = row[-1]
        counts[label] = counts.get(label, 0) + 1
    if not counts:
        return None
    return max(counts, key=counts.get)

# Choose the best split for a dataset over the candidate feature columns.
def get_split(dataset, col_list):
    """Return the (column, value) split that minimises weighted Gini impurity.

    For every candidate column and every distinct value in it, rows are
    partitioned into a true branch (data[col] >= value) and a false branch
    (data[col] < value).  The split with the lowest weighted Gini impurity
    of the two branches wins.

    Returns (None, None) when no split strictly improves on the impurity of
    the unsplit node (e.g. the node is already pure), so the caller can make
    a leaf.

    BUG FIX: the original compared ``score > best_score`` with
    ``best_score = 0``, i.e. it MAXIMISED the weighted impurity and picked
    the worst possible split.  CART minimises it.
    """
    best_col = None
    best_value = None
    # A split must strictly beat the impurity of leaving the node unsplit.
    best_score = gini(dataset)
    for col in col_list:
        for value in set(data[col] for data in dataset):
            tb = [data for data in dataset if data[col] >= value]
            fb = [data for data in dataset if data[col] < value]
            if not tb or not fb:
                continue  # degenerate split: one branch empty
            p = float(len(tb)) / len(dataset)
            score = gini(tb) * p + gini(fb) * (1 - p)
            if score < best_score:
                best_col = col
                best_value = value
                best_score = score
    return best_col, best_value

# Gini impurity of a dataset (class labels in the last column).
def gini(dataset):
    """Return 1 - sum(p_k^2) over the class frequencies p_k.

    0 means the set is pure; an empty dataset yields 1 (matching the
    original implementation, which never entered its frequency loop).
    """
    counts = {}
    for row in dataset:
        counts[row[-1]] = counts.get(row[-1], 0) + 1
    n = len(dataset)
    return 1 - sum((float(c) / n) ** 2 for c in counts.values())

# Classify one sample with a single decision tree.
def classify(sample, tree):
    """Walk ``tree`` from the root and return the leaf label for ``sample``.

    At each internal node, follow the true branch when
    sample[node.col] >= node.value, otherwise the false branch.
    (Iterative form of the original tail recursion.)
    """
    node = tree
    while node.results is None:
        node = node.tb if sample[node.col] >= node.value else node.fb
    return node.results

接下来,我们可以使用如下代码进行训练和分类:

# Prepare sample data: 50 rows with 5 features in [0, 10] labelled 'Skinny',
# plus 50 rows with 5 features in [10, 20] labelled 'Fat' (label = last column).
dataset = [[random.randint(0, 10) for i in range(5)] + ['Skinny'] for j in range(50)] +\
    [[random.randint(10, 20) for i in range(5)] + ['Fat'] for j in range(50)]
random.shuffle(dataset)

# Train the random forest (5 trees, 2 random features per split)
rf = RandomForest(n_trees=5, n_features=2)
rf.train(dataset)

# Classify a new sample (features all >= 10, so 'Fat' is expected)
sample = [11, 10, 15, 12, 16]
label = rf.classify(sample)
print(label)

由于训练数据和抽样都是随机生成的,每次运行的结果可能略有不同,典型的输出结果为:

Fat

说明该样本被正确分类为“Fat”类别。

相关文章