Distributed Decision Trees and Random Forests with Ray


WeChat official account: 大数据高性能计算 (Big Data High-Performance Computing)


In finance and many other domains, we often need to derive rules or strategies from the features that go into a model. Two requirements come up repeatedly. First, the feature splits should yield interpretable rules, which naturally points to algorithms such as decision trees and random forests. Second, in production the data volume is usually far too large for a single machine, so the pipeline has to be made distributed to meet production needs. The rest of this article works through solutions to these two problems.
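As a concrete illustration of the interpretability point (a sketch added here, not from the original pipeline): sklearn can render a fitted decision tree as human-readable if/else threshold rules via export_text, which is exactly the kind of rule extraction these models enable.

from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier, export_text

# Fit a shallow tree and print it as nested if/else threshold rules
iris = load_iris()
clf = DecisionTreeClassifier(max_depth=2).fit(iris.data, iris.target)
print(export_text(clf, feature_names=iris.feature_names))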

1 Distributed Decision Trees and Random Forests with Ray

1.1 A First Version with Ray, and Its Problems

We implement a first version of the decision tree using the Ray API and the sklearn package:

import ray
import numpy as np  # needed for np.array_split (missing from the original listing)
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split

# Initialize Ray
ray.init()

@ray.remote
def train_decision_tree(subset_x, subset_y):
    """Train a decision tree on a subset of data."""
    clf = DecisionTreeClassifier()
    clf.fit(subset_x, subset_y)
    return clf

def distribute_training(data, labels, num_splits=4):
    """Distribute training across multiple cores using Ray."""
    # Split the data into multiple subsets
    chunked_data = np.array_split(data, num_splits)
    chunked_labels = np.array_split(labels, num_splits)
    # Distribute the training tasks
    futures = [train_decision_tree.remote(chunked_data[i], chunked_labels[i])
               for i in range(num_splits)]
    # Fetch results
    models = ray.get(futures)
    return models

if __name__ == "__main__":
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42)
    models = distribute_training(X_train, y_train)
    for model in models:
        score = model.score(X_test, y_test)
        print(f"Model accuracy: {score:.2f}")
    # Shutdown Ray
    ray.shutdown()

Running the script prints an accuracy line for each of the four trained models.

Looking at the code above, however, only the training is distributed; the data loading is still single-machine. In practice the dataset is often very large, so we also need to load it in a distributed fashion before training in parallel.

1.2 A Distributed DecisionTree Version with Ray

To make the pipeline genuinely distributed, we will mock up several dataset files, read them in parallel, and then train in parallel.

Assume the dataset is spread across multiple files, e.g. CSVs; in a real deployment these would correspond to tables in a data warehouse. Below we show how to load the data in a distributed, parallel fashion and upgrade the decision-tree pipeline accordingly.

1.2.1 Mocking the Data

Write the mock data to CSV files:

from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

def create_mock_csv_files(num_files=4):
    iris = load_iris()
    df = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                      columns=iris['feature_names'] + ['target'])
    # Split and save to CSV
    chunked_data = np.array_split(df, num_files)
    file_paths = []
    for i, chunk in enumerate(chunked_data):
        file_name = f"data{i + 1}.csv"
        chunk.to_csv(file_name, index=False)
        file_paths.append(file_name)
    return file_paths

file_paths = create_mock_csv_files()
print(f"Created CSV files: {file_paths}")

1.2.2 Distributed Data Loading and Distributed Training

import ray
import pandas as pd
from sklearn.tree import DecisionTreeClassifier

# Initialize Ray
ray.init()

@ray.remote
def load_data(file_path):
    """Load data from a given file (assumed to be CSV for this demonstration)."""
    data = pd.read_csv(file_path)
    X = data.drop('target', axis=1).values
    y = data['target'].values
    return X, y

@ray.remote
def train_decision_tree(X, y):
    """Train a decision tree on given data."""
    clf = DecisionTreeClassifier()
    clf.fit(X, y)
    return clf

def distributed_loading_and_training(file_paths):
    """Distribute data loading and training across cores using Ray."""
    # Distribute the data loading tasks
    data_futures = [load_data.remote(file) for file in file_paths]
    datasets = ray.get(data_futures)
    # Distribute the training tasks
    training_futures = [train_decision_tree.remote(X, y) for X, y in datasets]
    models = ray.get(training_futures)
    return models

if __name__ == "__main__":
    # Assume the data is split across 4 CSV files
    file_paths = ["data1.csv", "data2.csv", "data3.csv", "data4.csv"]
    models = distributed_loading_and_training(file_paths)
    # Note: this example omits testing and accuracy reporting;
    # it can be extended as in the previous example.
    # Shutdown Ray
    ray.shutdown()
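As an aside: recent Ray releases also ship a built-in data layer, Ray Data. Assuming a Ray version where the ray.data module is available, the hand-rolled load_data tasks above could be replaced by a parallel CSV read, roughly like this sketch:

import ray

ray.init()
# Ray Data reads the CSV shards in parallel across the cluster
ds = ray.data.read_csv(["data1.csv", "data2.csv", "data3.csv", "data4.csv"])
df = ds.to_pandas()  # materialize locally; fine for a small demo
X = df.drop('target', axis=1).values
y = df['target'].values
ray.shutdown()

For genuinely large datasets one would keep the data as a distributed Dataset rather than collecting it with to_pandas().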

1.2.3 Testing Model Performance

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# ... [distributed_loading_and_training code from above] ...

if __name__ == "__main__":
    file_paths = create_mock_csv_files()
    print(f"Created CSV files: {file_paths}")
    models = distributed_loading_and_training(file_paths)

    # Test the performance of one of the models
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42)
    predictions = models[0].predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Model accuracy: {accuracy:.2f}")
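Checking only models[0] hides how the other chunks fare. A small extension (reusing the models, file_paths, X_test and y_test variables from the block above) scores every per-chunk model:

# Score each per-chunk model on the same held-out test set
for i, model in enumerate(models):
    acc = accuracy_score(y_test, model.predict(X_test))
    print(f"Model {i} (trained on {file_paths[i]}): accuracy = {acc:.2f}")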

The complete implementation:

from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
import ray
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

def create_mock_csv_files(num_files=4):
    iris = load_iris()
    df = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                      columns=iris['feature_names'] + ['target'])
    # Split and save to CSV
    chunked_data = np.array_split(df, num_files)
    file_paths = []
    for i, chunk in enumerate(chunked_data):
        file_name = f"data{i + 1}.csv"
        chunk.to_csv(file_name, index=False)
        file_paths.append(file_name)
    return file_paths

@ray.remote
def load_data(file_path):
    """Load data from a given file (assumed to be CSV for this demonstration)."""
    data = pd.read_csv(file_path)
    X = data.drop('target', axis=1).values
    y = data['target'].values
    return X, y

@ray.remote
def train_decision_tree(X, y):
    """Train a decision tree on given data."""
    clf = DecisionTreeClassifier()
    clf.fit(X, y)
    return clf

def distributed_loading_and_training(file_paths):
    """Distribute data loading and training across cores using Ray."""
    # Distribute the data loading tasks
    data_futures = [load_data.remote(file) for file in file_paths]
    datasets = ray.get(data_futures)
    # Distribute the training tasks
    training_futures = [train_decision_tree.remote(X, y) for X, y in datasets]
    models = ray.get(training_futures)
    return models

if __name__ == "__main__":
    file_paths = create_mock_csv_files()
    print(f"Created CSV files: {file_paths}")

    # Initialize Ray
    ray.init()
    models = distributed_loading_and_training(file_paths)

    # Test the performance of one of the models
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42)
    predictions = models[0].predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Model accuracy: {accuracy:.2f}")

    # Shutdown Ray
    ray.shutdown()

Now a problem appears: the accuracy is low, only 0.33, far worse than the single-machine version. What causes this?

This happens because the dataset is tiny to begin with: each worker only sees one slice of Iris, so no single chunk is representative of the whole distribution. Worse, load_iris() returns its rows sorted by class, so a plain np.array_split hands each worker data dominated by a single class; the resulting sample bias drags accuracy down.
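A quick check makes the bias visible. This sketch (assuming the data1.csv..data4.csv files produced in 1.2.1) prints the class distribution of each shard, each of which turns out to contain essentially one class:

import pandas as pd

# Inspect the label distribution of every mocked shard
for path in ["data1.csv", "data2.csv", "data3.csv", "data4.csv"]:
    counts = pd.read_csv(path)['target'].value_counts().to_dict()
    print(path, '->', counts)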

To address this, we can borrow the idea of bagging (bootstrap aggregating): train many models on resampled data and aggregate their predictions, which evens out the errors of any single model, counteracts sample bias, and improves accuracy. A natural embodiment of this idea is the random forest.
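To make the bagging idea concrete before we move on, here is a minimal single-machine sketch (illustrative only, not part of the distributed pipeline): every tree is fit on a bootstrap resample of the full training set, and predictions are combined by majority vote.

import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42)

rng = np.random.default_rng(0)
trees = []
for _ in range(10):
    # Bootstrap: sample training rows with replacement
    idx = rng.integers(0, len(X_train), size=len(X_train))
    trees.append(DecisionTreeClassifier().fit(X_train[idx], y_train[idx]))

# Majority vote across the bootstrap-trained trees
votes = np.stack([t.predict(X_test) for t in trees]).astype(int)
pred = np.apply_along_axis(lambda col: np.bincount(col).argmax(), 0, votes)
print(f"Bagged accuracy: {accuracy_score(y_test, pred):.2f}")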

1.3 Distributed Random Forests with Ray

How should we adapt the code above?

First, change the training function to use a random forest:


from sklearn.ensemble import RandomForestClassifier

@ray.remote
def train_random_forest(X, y):
    """Train a random forest on given data."""
    clf = RandomForestClassifier(n_estimators=10)  # Using 10 trees for demonstration
    clf.fit(X, y)
    return clf

Next, update the distributed-training function and the main block:

def distributed_loading_and_training(file_paths):
    """Distribute data loading and training across cores using Ray."""
    # Distribute the data loading tasks
    data_futures = [load_data.remote(file) for file in file_paths]
    datasets = ray.get(data_futures)
    # Distribute the training tasks
    training_futures = [train_random_forest.remote(X, y) for X, y in datasets]
    models = ray.get(training_futures)
    return models

if __name__ == "__main__":
    file_paths = create_mock_csv_files()
    print(f"Created CSV files: {file_paths}")
    models = distributed_loading_and_training(file_paths)

    # Testing the ensemble's performance
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42)

    # Combine the per-forest predictions into a final prediction
    predictions = []
    for x in X_test:
        model_predictions = [model.predict([x])[0] for model in models]
        # Using majority voting for classification
        final_prediction = max(set(model_predictions), key=model_predictions.count)
        predictions.append(final_prediction)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Ensemble model accuracy: {accuracy:.2f}")

The complete code is as follows:

from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
import ray
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier

def create_mock_csv_files(num_files=4):
    iris = load_iris()
    df = pd.DataFrame(data=np.c_[iris['data'], iris['target']],
                      columns=iris['feature_names'] + ['target'])
    # Split and save to CSV
    chunked_data = np.array_split(df, num_files)
    file_paths = []
    for i, chunk in enumerate(chunked_data):
        file_name = f"data{i + 1}.csv"
        chunk.to_csv(file_name, index=False)
        file_paths.append(file_name)
    return file_paths

@ray.remote
def load_data(file_path):
    """Load data from a given file (assumed to be CSV for this demonstration)."""
    data = pd.read_csv(file_path)
    X = data.drop('target', axis=1).values
    y = data['target'].values
    return X, y

@ray.remote
def train_random_forest(X, y):
    """Train a random forest on given data."""
    clf = RandomForestClassifier(n_estimators=10)  # Using 10 trees for demonstration
    clf.fit(X, y)
    return clf

def distributed_loading_and_training(file_paths):
    """Distribute data loading and training across cores using Ray."""
    data_futures = [load_data.remote(file) for file in file_paths]
    datasets = ray.get(data_futures)
    training_futures = [train_random_forest.remote(X, y) for X, y in datasets]
    models = ray.get(training_futures)
    return models

if __name__ == "__main__":
    file_paths = create_mock_csv_files()
    print(f"Created CSV files: {file_paths}")

    # Initialize Ray
    ray.init()
    models = distributed_loading_and_training(file_paths)

    # Testing the ensemble's performance
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42)

    # Combine the per-forest predictions into a final prediction
    predictions = []
    for x in X_test:
        model_predictions = [model.predict([x])[0] for model in models]
        # Using majority voting for classification
        final_prediction = max(set(model_predictions), key=model_predictions.count)
        predictions.append(final_prediction)
    accuracy = accuracy_score(y_test, predictions)
    print(f"Ensemble model accuracy: {accuracy:.2f}")

    # Shutdown Ray
    ray.shutdown()

As we can see, the accuracy immediately improves.
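One design note on the voting loop above: predicting one sample at a time is slow. Assuming the same models, X_test and y_test as in the full program, the vote can be vectorized so each forest predicts the whole test set in a single call:

import numpy as np

# Each row holds one forest's predictions for the entire test set
all_preds = np.stack([model.predict(X_test) for model in models]).astype(int)
# Majority vote per test sample (i.e., per column)
final = np.apply_along_axis(lambda col: np.bincount(col).argmax(), 0, all_preds)
print(f"Ensemble model accuracy: {accuracy_score(y_test, final):.2f}")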
