DataFrame,数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度

编程入门 行业动态 更新时间:2024-10-26 15:17:47

DataFrame,<a href=https://www.elefans.com/category/jswz/34/1771445.html style=数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度"/>

DataFrame,数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度

运行环境

笔记本win10,i5-6200U,16G
python3.8
pandas1.3.5

程序目的

在一张接近45M的csv表格,63.7w行数据,从中提取符合时间段范围的(每天20到第二天9点的数据),并且信号数值小于一定范围的数据,接着按每天为一份,去重统计和提取这些数据的原始详细信息

最原始的运算方式,一行一行遍历去判断每个单元格

def date_clean(file: str, time_interval: tuple, distance: tuple):"""按时间区间,信号范围来提取出数据:param file: 读取csv文件:param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点:param distance: 开始结束信号,元组类型(500,30000) 500到30000:return:"""df_list = []  # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间print(f"loading... {datetime.datetime.now()}")data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]")  # 防止读取长数值时,用的是科学计数法导致精度丢失print(f"加载完毕... {datetime.datetime.now()}")start_time = datetime.time(time_interval[0], 0, 0)end_time = datetime.time(time_interval[1], 0, 0)if time_interval[0] > time_interval[1]:  # 如果时间区间是跨天for ind, row in data_frame.iterrows():if (datetime.time(time_interval[0], 0, 0) <= pd.to_datetime(row["采集时间"]).time() orpd.to_datetime(row["采集时间"]).time() <= datetime.time(time_interval[1], 0, 0)):  # 只比较小时:if distance[0] <= int(float(row["信号"])) <= distance[1]:#print(data_frame.loc[[row]])#print(row)df_list.append(row.to_list())else:passdf = data_frame.reset_index(drop=True)#df = pd.DataFrame(df_list, columns=["数据", "地点", "采集时间", "归属地", "信号"])  # to_list()后没有了表头,需要手动添加# .reset_index()列表里存的是Series,他还有一组与数组数据对应的标签索引。转为DataFrame后需要重设index#print(df)print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")print(f"数据清理... {datetime.datetime.now()}")subsection_count_filt(df, time_interval, 28)def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):"""按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数:param unique_data: 去重后的唯一值列表:param full_data: 全量数据:param time_interval: 时间范围区间,(开始时间,结束时间):param times: 筛选出现的次数 大于等于:return:"""full_result_list = []  # 保存统计次数的结果full_result_details = pd.DataFrame()   # 保存结果的详情内容dt = full_data["采集时间"].str.split(' ', expand=True)  # 将年月日 时分秒拆分成两列# print(dt)# full_data["年月日"] = dt[0]# full_data["时分秒"] = dt[1]day_times = dt[0].drop_duplicates().to_list()  # 获取表格中包含的日期day_times.sort()for day in day_times:  # 按时间归为一组print(day)day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)df_day_list = []  # 分类一天的数据to_drop = []have_data = False  # 判断插入了数据#for row in range(0, len(full_data)):for index, row in full_data.iterrows():if day_time1 <= pd.to_datetime(row["采集时间"]) <= day_time2:# 一天内的数据汇总统计df_day_list.append(row.to_list())to_drop.append(index)have_data = True# 删除已不需要的后能加快速度下次遍历速度full_data = full_data.drop(to_drop)  # 快了1/3if have_data:df_day = pd.DataFrame(df_day_list, columns=["数据", "地点", "采集时间", "归属地", "信号"])one_day_counts = df_day["数据"].value_counts()  # 计数one_day_result = one_day_counts[one_day_counts >= times]  # 筛选出现次数大于n次的result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})# print(result)for ind, row in result.iterrows():full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True)  # 从一天数据中检索并插入详情,concat拼接# print(full_result)full_result = pd.DataFrame(full_result_list)# 结果去重print("结果去重")print(full_result.drop_duplicates(subset="数据").reset_index())

运行时间 900s+,时间太长了,查资料说用pandas2.0之后的,可以用pyarrow来提高速率,但是2.0很多方法不适用了,影响到之前的一些脚本运行,后来仔细想了想,是遍历那里的时间复杂度太大了,提取出结果的数据,大概要遍历10亿次,决定从这里的循环逻辑做改善

取消遍历的方式,用DataFrame本身的数据筛选

之前一直难到的点是,觉得时间这一列要提取出来转换成time后才能作比较,现在找到直接转换成时间而不保留日期的函数了

def date_clean(file: str, time_interval: tuple, distance: tuple):"""按时间区间,信号范围来提取出数据:param file: 读取csv文件:param time_interval: 开始小时和结束小时,元组类型(20,9)20点到9点:param distance: 开始结束信号,元组类型(500,30000) 500到30000:return:"""df_list = []  # 存储筛选好的数据,先存列表或字典,最后再转为DataFrame能缩短一半时间print(f"loading... {datetime.datetime.now()}")data_frame = pd.read_csv(file, encoding='utf-8', dtype="string[pyarrow]")  # 防止读取长数值时,用的是科学计数法导致精度丢失print(f"加载完毕... {datetime.datetime.now()}")start_time = datetime.time(time_interval[0], 0, 0)end_time = datetime.time(time_interval[1], 0, 0)if time_interval[0] > time_interval[1]:  # 如果时间区间是跨天#  !!! 取消用一行行遍历的方式,来减少时间复杂度,直接根据列值来筛选后使用data_frame["小时间"] = pd.to_datetime(data_frame["采集时间"]).dt.time  # 只提取小时data_frame["信号"] = pd.to_numeric(data_frame["信号"])  # 转换数值行data_frame = data_frame.loc[(data_frame["小时间"] >= start_time) | (data_frame["小时间"] <= end_time)]  # 跨天时间用 或data_frame = data_frame.loc[(data_frame["信号"] >= float(distance[0])) & (data_frame["信号"] <= float(distance[1]))]data_frame.drop(['小时间'], axis=1, inplace=True)  # 删除小时else:passdf = data_frame.reset_index(drop=True)#print(df)print(f"依据时间和信号值过滤的数据比为 {len(df)}/{len(data_frame)}")print(f"数据清理... {datetime.datetime.now()}")# print(f"再次去重后 {len(df_unique)}/{find_num}")subsection_count_filt(df, time_interval, 28)def subsection_count_filt(full_data: pd.DataFrame, time_interval: tuple, times: int):"""按时间段划分为一段数据区间,在从去重后的唯一值列表去筛选出这段区间内出现的次数:param unique_data: 去重后的唯一值列表:param full_data: 全量数据:param time_interval: 时间范围区间,(开始时间,结束时间):param times: 筛选出现的次数 大于等于:return:"""full_result_list = []  # 保存统计次数的结果full_result_details = pd.DataFrame()   # 保存结果的详情内容dt = full_data["采集时间"].str.split(' ', expand=True)  # 将年月日 时分秒拆分成两列day_times = dt[0].drop_duplicates().to_list()  # 获取表格中包含的日期day_times.sort()full_data["采集时间"] = pd.to_datetime(full_data["采集时间"])  # 转换为时间格式for day in day_times:  # 按时间归为一组print(day)#  划分一组的开始时间和结束时间day_time1 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[0], 0, 0)day_time2 = datetime.datetime(int(day.split('-')[0]), int(day.split('-')[1]), int(day.split('-')[2]), time_interval[1], 0, 0) + datetime.timedelta(days=1)#  取消使用上面的遍历模式df_day = full_data.loc[(full_data["采集时间"] >= day_time1) & (full_data["采集时间"] <= day_time2)]  # 具体日期时间用 与if len(df_day) != 0:  # 有数据df_day = df_day.reset_index(drop=True)one_day_counts = df_day["数据"].value_counts()  # 计数one_day_result = one_day_counts[one_day_counts >= times]  # 筛选出现次数大于n次的result = pd.DataFrame({'数据': one_day_result.index, '次数': one_day_result.values})# print(result)for ind, row in result.iterrows():full_result_list.append({"日期": day, "数据": row["数据"], "次数": row["次数"]})full_result_details = pd.concat([full_result_details, (df_day[df_day["数据"] == row["数据"]])], ignore_index=True)  # 从一天数据中检索并插入详情,concat拼接# print(full_result)full_result = pd.DataFrame(full_result_list)# 结果去重print("结果去重")print(full_result.drop_duplicates(subset="数据").reset_index())

经过这种方法后,运算时间在30s+,快了30倍啊!!!,所以有时候运行慢,找到不是系统io读写文件这种问题后,考虑一下程序的时间复杂度

更多推荐

DataFrame,数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度

本文发布于:2023-12-03 08:44:26,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1653359.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:数据   遍历   速度   DataFrame

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!