From 47a6cc00e75282d5eb0c2742c435c0146875fa27 Mon Sep 17 00:00:00 2001 From: lvhao Date: Mon, 28 Jul 2025 11:08:04 +0800 Subject: [PATCH] init --- 1_get_ef_data.py | 233 +++++++++++++++++++++++++++++++++++++++++++ 1_get_ef_data_bak.py | 85 ++++++++++++++++ 2_get_flash.py | 78 +++++++++++++++ 2_get_flash_v2.py | 92 +++++++++++++++++ 3_train.py | 181 +++++++++++++++++++++++++++++++++ 3_train_v2.py | 51 ++++++++++ 3_train_v3.py | 64 ++++++++++++ 3_train_v4.py | 191 +++++++++++++++++++++++++++++++++++ 4_predict.py | 170 +++++++++++++++++++++++++++++++ 4_predict_v2.py | 170 +++++++++++++++++++++++++++++++ merge_data.py | 155 ++++++++++++++++++++++++++++ 11 files changed, 1470 insertions(+) create mode 100644 1_get_ef_data.py create mode 100644 1_get_ef_data_bak.py create mode 100644 2_get_flash.py create mode 100644 2_get_flash_v2.py create mode 100644 3_train.py create mode 100644 3_train_v2.py create mode 100644 3_train_v3.py create mode 100644 3_train_v4.py create mode 100644 4_predict.py create mode 100644 4_predict_v2.py create mode 100644 merge_data.py diff --git a/1_get_ef_data.py b/1_get_ef_data.py new file mode 100644 index 0000000..7e8d144 --- /dev/null +++ b/1_get_ef_data.py @@ -0,0 +1,233 @@ +import pandas as pd +import os, glob +import numpy as np +from scipy.stats import skew, kurtosis +import pywt +from scipy.signal import butter, filtfilt +from concurrent.futures import ProcessPoolExecutor, as_completed +from loguru import logger +import warnings +import traceback + +logger.add("1_get_ef_data.log", rotation="10 MB", retention="10 days", level="DEBUG") + +# 抑制特定的警告 +warnings.filterwarnings("ignore", category=RuntimeWarning, + message=".*Precision loss occurred in moment calculation.*") +warnings.filterwarnings("ignore", category=RuntimeWarning, + message=".*invalid value encountered in divide.*") + +def safe_skew(arr, axis=1, bias=False, batch_size=50000): + try: + logger.debug(f"safe_skew: input array shape: {arr.shape}") + var = np.var(arr, axis=axis, 
ddof=1 if not bias else 0) + result = np.full(arr.shape[0] if axis == 1 else arr.shape[1], np.nan) + valid_mask = var > 1e-10 + logger.debug(f"safe_skew: valid_mask count: {np.sum(valid_mask)}/{len(valid_mask)}") + + valid_indices = np.where(valid_mask)[0] + for start in range(0, len(valid_indices), batch_size): + batch_idx = valid_indices[start:start + batch_size] + if axis == 1: + sub_arr = arr[batch_idx] + else: + sub_arr = arr[:, batch_idx] + + try: + sub_skew = skew(sub_arr, axis=axis, bias=bias, nan_policy='omit') + result[batch_idx] = sub_skew + except Exception as e: + logger.warning(f"safe_skew batch error: {e}") + continue + + logger.debug(f"safe_skew: result shape: {result.shape}, nan count: {np.sum(np.isnan(result))}") + return result + except Exception as e: + logger.error(f"safe_skew error: {e}") + logger.error(traceback.format_exc()) + return np.full(arr.shape[0] if axis == 1 else arr.shape[1], np.nan) + + +def safe_kurtosis(arr, axis=1, bias=False, batch_size=50000): + try: + logger.debug(f"safe_kurtosis: input array shape: {arr.shape}") + var = np.var(arr, axis=axis, ddof=1 if not bias else 0) + result = np.full(arr.shape[0] if axis == 1 else arr.shape[1], np.nan) + valid_mask = var > 1e-10 + logger.debug(f"safe_kurtosis: valid_mask count: {np.sum(valid_mask)}/{len(valid_mask)}") + + valid_indices = np.where(valid_mask)[0] + for start in range(0, len(valid_indices), batch_size): + batch_idx = valid_indices[start:start + batch_size] + if axis == 1: + sub_arr = arr[batch_idx] + else: + sub_arr = arr[:, batch_idx] + + try: + sub_kurt = kurtosis(sub_arr, axis=axis, bias=bias, nan_policy='omit') + result[batch_idx] = sub_kurt + except Exception as e: + logger.warning(f"safe_kurtosis batch error: {e}") + continue + + logger.debug(f"safe_kurtosis: result shape: {result.shape}, nan count: {np.sum(np.isnan(result))}") + return result + except Exception as e: + logger.error(f"safe_kurtosis error: {e}") + logger.error(traceback.format_exc()) + return 
np.full(arr.shape[0] if axis == 1 else arr.shape[1], np.nan) + + +# --- 1. 数据加载与预处理 --- +def single_station(station_id="212"): + try: + logger.info(f"开始处理站点 {station_id}") + + # 检查输入目录是否存在 + input_dir = f"./data/ef/{station_id}" + if not os.path.exists(input_dir): + logger.error(f"输入目录不存在: {input_dir}") + return False + + # 加载并合并Parquet文件 + df_list = [] + file_pattern = f"./data/ef/{station_id}/{station_id}_*.parquet" + files = glob.glob(file_pattern) + + if not files: + logger.error(f"没有找到匹配的文件: {file_pattern}") + return False + + logger.info(f"找到 {len(files)} 个文件: {files}") + + for file_path in files: + try: + df_temp = pd.read_parquet(file_path) + df_list.append(df_temp) + logger.info(f"成功加载文件: {file_path}, 数据量: {len(df_temp)}") + except Exception as e: + logger.error(f"加载文件失败 {file_path}: {e}") + continue + + if not df_list: + logger.error(f"没有成功加载任何数据文件") + return False + + df = pd.concat(df_list, ignore_index=True) + logger.info(f"合并后数据量: {len(df)}") + + df = df.rename(columns={"HappenTime": "time", "ElectricField": "ef"}) + df["time"] = pd.to_datetime(df["time"]) # 确保'time'列为datetime类型 + df = df.sort_values('time').drop_duplicates(subset='time', keep='first') + df = df.drop_duplicates(subset='time', keep='first') + + if len(df) == 0: + logger.error(f"处理后数据为空") + return False + + # 创建一个从头到尾每秒连续的时间索引 + full_time_index = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s") + df = df.set_index("time").reindex(full_time_index).rename_axis("time") + + # --- 修正 1:处理 NaN (空值) --- + # 在线性插值前处理reindex引入的NaN值 + df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both') + df = df.reset_index() + + # --- 2. 
统计特征提取 --- + + window_sizes = [2, 5, 10, 30, 60, 300, 600, 1200] + + def rolling_window(a, window): + shape = (a.size - window + 1, window) + strides = (a.strides[0], a.strides[0]) + return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) + + stat_features_all = pd.DataFrame({'time': df['time']}) + ef = df['ef'].values + + def pad_result(res, win): + # 前面补 nan,使长度与原始数据一致 + return np.concatenate([np.full(win-1, np.nan), res]) + + for win in window_sizes: + logger.info(f"正在处理{win}") + if len(ef) < win: + # 数据太短,全部填 nan + n = len(ef) + stat_features_all[f'mean_{win}s'] = np.full(n, np.nan) + stat_features_all[f'std_{win}s'] = np.full(n, np.nan) + stat_features_all[f'var_{win}s'] = np.full(n, np.nan) + stat_features_all[f'min_{win}s'] = np.full(n, np.nan) + stat_features_all[f'max_{win}s'] = np.full(n, np.nan) + stat_features_all[f'ptp_{win}s'] = np.full(n, np.nan) + stat_features_all[f'skew_{win}s'] = np.full(n, np.nan) + stat_features_all[f'kurt_{win}s'] = np.full(n, np.nan) + stat_features_all[f'diff_mean_{win}s'] = np.full(n, np.nan) + stat_features_all[f'diff_std_{win}s'] = np.full(n, np.nan) + continue + + win_arr = rolling_window(ef, win) + stat_features_all[f'mean_{win}s'] = pad_result(win_arr.mean(axis=1), win) + stat_features_all[f'std_{win}s'] = pad_result(win_arr.std(axis=1, ddof=1), win) + stat_features_all[f'var_{win}s'] = pad_result(win_arr.var(axis=1, ddof=1), win) + stat_features_all[f'min_{win}s'] = pad_result(win_arr.min(axis=1), win) + stat_features_all[f'max_{win}s'] = pad_result(win_arr.max(axis=1), win) + stat_features_all[f'ptp_{win}s'] = pad_result(np.ptp(win_arr, axis=1), win) + stat_features_all[f'skew_{win}s'] = pad_result(safe_skew(win_arr, axis=1, bias=False), win) + stat_features_all[f'kurt_{win}s'] = pad_result(safe_kurtosis(win_arr, axis=1, bias=False), win) + + # 差分特征 + diff = np.diff(ef, prepend=ef[0]) + diff_win_arr = rolling_window(diff, win) + stat_features_all[f'diff_mean_{win}s'] = 
pad_result(diff_win_arr.mean(axis=1), win) + stat_features_all[f'diff_std_{win}s'] = pad_result(diff_win_arr.std(axis=1, ddof=1), win) + + save_path = f"./data/preprocessed_ef/ef_{station_id}.parquet" + os.makedirs(os.path.dirname(save_path), exist_ok=True) + logger.info(f"开始保存{save_path}") + stat_features_all.to_parquet(save_path, index=False) + logger.info(f"成功保存到 {save_path}, 数据量: {len(stat_features_all)}") + logger.info(f"数据预览:\n{stat_features_all.head()}") + + return True + + except Exception as e: + logger.error(f"处理站点 {station_id} 时发生错误: {e}") + logger.error(traceback.format_exc()) + return False + +if __name__ == "__main__": + stations = ["213","249","251","252","253","254","261","262","263","266","267","268","269","270","271","272","276","281","282","283","285","286","212"] + + success_count = 0 + failed_stations = [] + + for station_id in stations: + logger.info(f"\n{'='*50}") + logger.info(f"开始处理站点: {station_id}") + + if single_station(station_id): + success_count += 1 + logger.info(f"站点 {station_id} 处理成功") + else: + failed_stations.append(station_id) + logger.error(f"站点 {station_id} 处理失败") + + logger.info(f"\n{'='*50}") + logger.info(f"处理完成!") + logger.info(f"成功处理: {success_count}/{len(stations)} 个站点") + + if failed_stations: + logger.error(f"失败站点: {failed_stations}") + + # 检查输出目录中的文件 + output_dir = "./data/preprocessed_ef" + if os.path.exists(output_dir): + output_files = [f for f in os.listdir(output_dir) if f.endswith('.parquet')] + logger.info(f"输出目录中共有 {len(output_files)} 个parquet文件") + for file in output_files: + logger.info(f" - {file}") + else: + logger.error(f"输出目录不存在: {output_dir}") diff --git a/1_get_ef_data_bak.py b/1_get_ef_data_bak.py new file mode 100644 index 0000000..917b479 --- /dev/null +++ b/1_get_ef_data_bak.py @@ -0,0 +1,85 @@ +import pandas as pd +import os +import glob +import numpy as np + +# --- 1. 
数据加载与预处理 --- + +# 加载并合并Parquet文件 +df_list: list[pd.DataFrame] = [] +# 请确保这里的路径是您实际的文件路径,此处仅为示例 +for i in glob.glob("./data/ef/212/212_*.parquet"): + df_list.append(pd.read_parquet(i)) +df: pd.DataFrame = pd.concat(df_list, ignore_index=True) + +# 重命名列并转换时间格式 +df = df.rename(columns={"HappenTime": "time", "ElectricField": "ef"}) +df["time"] = pd.to_datetime(df["time"]) +df = df.sort_values('time').drop_duplicates(subset='time', keep='first') + +# 创建一个从头到尾每秒连续的时间索引 +full_time_index: pd.DatetimeIndex = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s") +df = df.set_index("time").reindex(full_time_index).rename_axis("time") + +# --- 修正 1:处理 NaN (空值) --- +# 在线性插值前处理reindex引入的NaN值 +df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both') +df = df.reset_index() + +# --- 2. 统计特征提取 --- + +window_sizes: list[int] = [2, 5, 10, 30, 60, 300, 600, 1200] + + +def rolling_window(a: np.ndarray, window: int) -> np.ndarray: + """ + 创建滚动窗口数组 + + Args: + a: 输入数组 + window: 窗口大小 + + Returns: + 滚动窗口数组 + """ + shape: tuple[int, int] = (a.size - window + 1, window) + strides: tuple[int, int] = (a.strides[0], a.strides[0]) + return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) + + +def pad_result(res: np.ndarray, win: int) -> np.ndarray: + """ + 在前面补 nan,使长度与原始数据一致 + + Args: + res: 结果数组 + win: 窗口大小 + + Returns: + 填充后的数组 + """ + return np.concatenate([np.full(win-1, np.nan), res]) + + +# 初始化特征DataFrame +stat_features_all: pd.DataFrame = pd.DataFrame({'time': df['time']}) +ef: np.ndarray = df['ef'].values + +# 只计算mean特征 +for win in window_sizes: + if len(ef) < win: + # 数据太短,全部填 nan + n: int = len(ef) + stat_features_all[f'mean_{win}s'] = np.full(n, np.nan) + continue + + # 计算滚动窗口均值 + win_arr: np.ndarray = rolling_window(ef, win) + stat_features_all[f'mean_{win}s'] = pad_result(win_arr.mean(axis=1), win) + +# 保存结果 +save_path: str = "./data/preprocessed_ef/ef_212.parquet" +os.makedirs(os.path.dirname(save_path), exist_ok=True) 
+stat_features_all.to_parquet(save_path, index=True) +print(f"Saved to {save_path}") +print(stat_features_all.head()) diff --git a/2_get_flash.py b/2_get_flash.py new file mode 100644 index 0000000..54c6113 --- /dev/null +++ b/2_get_flash.py @@ -0,0 +1,78 @@ +import pandas as pd + +import os +df = pd.read_parquet("./data/flash/FLASH_2024.parquet") + +''' + 时间 微秒 纬度 经度 电流 回击 ... 误差椭圆长半轴方向 标志 陡度 时间差 RawID GUID +0 1/1/2024 03:16:40 5888618 25.850505 106.476598 -17.1 1 ... 118 0 0 0 1050961381,1050961380; 165612266 +1 1/1/2024 03:42:33 7173002 25.684971 106.972500 73.1 1 ... 104 0 0 0 1050979230,1050979226,1050979231;1050979227,10... 165612269 +2 1/1/2024 04:11:28 1344128 25.544373 106.873714 45.7 1 ... 87 0 0 0 1050991636,1050991653,1050991654;1050991651,10... 165612271 +3 1/1/2024 04:37:00 2312000 26.191353 105.803525 -12.9 1 ... 118 0 0 0 1051001690,1051001680; 165612285 +4 1/1/2024 05:22:22 1754035 26.291300 106.622184 -29.1 1 ... 0 0 0 0 1051017776,1051017786,1051017785; 165612299 +''' +# 只保留需要的列并解析时间 +df = df[['时间', '微秒', '纬度', '经度', '电流']].copy() + +# 解析时间列,将微秒信息合并到时间中 +df['时间'] = pd.to_datetime(df['时间'], format='%d/%m/%Y %H:%M:%S') + pd.to_timedelta(df['微秒'], unit='us') + + + +lon = 108.068889 +lat = 28.118333 + + +# 将时间列转换为datetime格式 +df['时间'] = pd.to_datetime(df['时间']) + +# 创建整5秒的时间范围(即每个时间点的秒数都为0或5的倍数) +start_time = df['时间'].min().replace(microsecond=0) +if start_time.second % 5 != 0: + start_time = start_time + pd.Timedelta(seconds=(5 - start_time.second % 5)) +end_time = df['时间'].max().replace(microsecond=0) +if end_time.second % 5 != 0: + end_time = end_time - pd.Timedelta(seconds=(end_time.second % 5)) +time_range = pd.date_range(start=start_time, end=end_time, freq='5S') + +# 向量化加速:先筛选地理范围,再用pandas的rolling count方法高效统计时间窗口内的闪电次数 + +# 先筛选地理范围(0.15度内) +geo_mask = ( + (df['纬度'] >= lat - 0.15) & (df['纬度'] <= lat + 0.15) & + (df['经度'] >= lon - 0.15) & (df['经度'] <= lon + 0.15) +) +df_geo = df[geo_mask].copy() + +# 按时间排序 +df_geo = 
df_geo.sort_values('时间').reset_index(drop=True) + +# 设置时间为索引 +df_geo = df_geo.set_index('时间') + +# 用pandas的rolling+count方法,先构造一个以秒为频率的时间序列 +full_time_index = pd.date_range(start=time_range.min(), end=time_range.max(), freq='5S') +# 新建一个Series,所有闪电事件置1,其他为0 +flash_series = pd.Series(1, index=df_geo.index) +# 重新索引到完整时间轴,NaN填0 +flash_series = flash_series.reindex(full_time_index, fill_value=0) + +# 计算未来30分钟(1800秒)内的闪电次数 +# rolling的窗口是“右闭左开”,所以要用window=1801(含当前点) +flash_count = flash_series[::-1].rolling(window=1801, min_periods=1).sum()[::-1] + +# 只保留每5秒的时间点 +result_df = pd.DataFrame({ + 'time': full_time_index, + 'flash_count': flash_count.values +}) + +print("结果DataFrame的前10行:") +print(result_df.head(10)) +print(f"\n总时间点数:{len(result_df)}") +print(f"最大闪电次数:{result_df['flash_count'].max()}") + + +save_path = "./data/preprocessed_flash/212/flash_count.csv" +os.makedirs(os.path.dirname(save_path), exist_ok=True) +result_df.to_csv(save_path, index=False) diff --git a/2_get_flash_v2.py b/2_get_flash_v2.py new file mode 100644 index 0000000..200ce73 --- /dev/null +++ b/2_get_flash_v2.py @@ -0,0 +1,92 @@ +import pandas as pd +from tqdm import tqdm +import os +df = pd.read_parquet("./data/flash/FLASH_2024.parquet") + +''' + 时间 微秒 纬度 经度 电流 回击 ... 误差椭圆长半轴方向 标志 陡度 时间差 RawID GUID +0 1/1/2024 03:16:40 5888618 25.850505 106.476598 -17.1 1 ... 118 0 0 0 1050961381,1050961380; 165612266 +1 1/1/2024 03:42:33 7173002 25.684971 106.972500 73.1 1 ... 104 0 0 0 1050979230,1050979226,1050979231;1050979227,10... 165612269 +2 1/1/2024 04:11:28 1344128 25.544373 106.873714 45.7 1 ... 87 0 0 0 1050991636,1050991653,1050991654;1050991651,10... 165612271 +3 1/1/2024 04:37:00 2312000 26.191353 105.803525 -12.9 1 ... 118 0 0 0 1051001690,1051001680; 165612285 +4 1/1/2024 05:22:22 1754035 26.291300 106.622184 -29.1 1 ... 
0 0 0 0 1051017776,1051017786,1051017785; 165612299 +''' +# 只保留需要的列并解析时间 +df = df[['时间', '微秒', '纬度', '经度', '电流']].copy() + +# 解析时间列,只保留日期和时分秒(不需要微秒) +# 先将'时间'列转换为datetime对象,格式为日/月/年 时:分:秒 +df['时间'] = pd.to_datetime(df['时间'], format='%d/%m/%Y %H:%M:%S') + + +df_station_id = pd.read_csv("station_id.csv") +EFID = df_station_id["EFID"].to_list() +Longitude = df_station_id["Longitude"].to_list() +Latitude = df_station_id["Latitude"].to_list() + +for id,lat,lon in zip(EFID,Latitude,Longitude): + + # 2. 定义目标点和范围 (与之前相同) + target_lon = lon + target_lat = lat + degree_range = 0.15 + + # 3. 筛选在指定经纬度范围内的点 (与之前相同) + df_filtered = df[ + (df['经度'] >= target_lon - degree_range) & + (df['经度'] <= target_lon + degree_range) & + (df['纬度'] >= target_lat - degree_range) & + (df['纬度'] <= target_lat + degree_range) + ].copy() + + # 4. 核心逻辑修正:使用 resample 和 rolling + if df_filtered.empty: + # 输出未找到雷电数据的提示,并显示经纬度范围 + lon_min: float = target_lon - degree_range # 计算经度下限 + lon_max: float = target_lon + degree_range # 计算经度上限 + lat_min: float = target_lat - degree_range # 计算纬度下限 + lat_max: float = target_lat + degree_range # 计算纬度上限 + print(f"{id}在指定范围内(经度: {lon_min:.5f}~{lon_max:.5f}, 纬度: {lat_min:.5f}~{lat_max:.5f})没有找到雷电数据。") + else: + # 将时间设置为索引 + df_filtered.set_index('时间', inplace=True) + + # 创建一个表示雷电事件的 Series,每个事件计为 1 + # 然后按5秒的间隔重采样,并对每个间隔内的事件求和 + # 这会得到一个时间序列,其中索引是每5秒,值是该5秒内发生的雷电次数 + lightning_counts_per_5s = df_filtered.resample('5S').size() + + # 使用滚动窗口计算未来30分钟的雷电数 + # window='30min' 定义了窗口大小 + # 为了实现“未来”30分钟的计数,我们使用一个技巧: + # 1. 将时间序列反转 (iloc[::-1]) + # 2. 使用一个标准的向后看的滚动窗口 + # 3. 
再将结果反转回来 + # 这样就等效于一个向前看的滚动窗口 + # closed='right' 确保窗口包含其起始点(在反转前) + future_counts = lightning_counts_per_5s.iloc[::-1].rolling(window='30min', closed='right').sum().iloc[::-1] + + # 上一步的结果只包含有雷电的5秒间隔。现在我们需要创建一个完整的时间范围。 + # 定义我们关心的完整时间范围的开始和结束 + start_time = df_filtered.index.min().floor('D') # 从数据第一天的零点开始 + end_time = df_filtered.index.max().ceil('D') # 到数据最后一天的零点结束 + + # 创建一个从开始到结束,每隔5秒一个点的完整时间索引 + full_time_index = pd.date_range(start=start_time, end=end_time, freq='5S') + + # 将我们计算出的结果填充到这个完整的时间索引上 + # reindex 会自动对齐索引,没有值的地方会填充为 NaN,我们再用 fillna(0) 填充为 0 + final_result = future_counts.reindex(full_time_index).fillna(0).astype(int) + + # 重命名列并计算flash_count的分布 + result_df = final_result.reset_index() + result_df.columns = ['time', 'flash_count'] + + # 计算flash_count列中每个数值的数量 + flash_count_distribution = result_df['flash_count'].value_counts().sort_index() + print("\nflash_count列中每个数值的数量:") + print(flash_count_distribution) + + # 您也可以将结果保存到文件 + os.makedirs(f'data/preprocessed_flash/{id}/',exist_ok=True) + result_df.to_parquet(f'data/preprocessed_flash/{id}/flash_{id}.parquet') \ No newline at end of file diff --git a/3_train.py b/3_train.py new file mode 100644 index 0000000..c970cde --- /dev/null +++ b/3_train.py @@ -0,0 +1,181 @@ +import pandas as pd +import lightgbm as lgb +from sklearn.model_selection import train_test_split +from sklearn.metrics import mean_squared_error, r2_score, classification_report, balanced_accuracy_score +import numpy as np +import os +import pickle +from imblearn.over_sampling import SMOTE +from imblearn.pipeline import Pipeline # 引入 imblearn Pipeline 以防止数据泄露 + +# --------------------------------------------------------------------------- +# --- 主流程 --- +# --------------------------------------------------------------------------- +df_flash = pd.read_parquet("data/preprocessed_flash/212/flash_212.parquet") +df_ef = pd.read_parquet("data/preprocessed_ef/212/ef_212.parquet") +df_ef['time'] = pd.to_datetime(df_ef['time']) +df_flash['time'] 
= pd.to_datetime(df_flash['time']) +merged_df = pd.merge(df_flash, df_ef, on='time', how='inner') +merged_df = merged_df.drop('time', axis=1) + +print("数据框形状:", merged_df.shape) +print(merged_df.head()) + +# 分离特征和目标变量 +# 确保列名是字符串,以避免 LightGBM 的问题 +X = merged_df.drop('flash_count', axis=1) +X.columns = ["".join (c if c.isalnum() else "_" for c in str(x)) for x in X.columns] + +y = merged_df['flash_count'] + +# 划分训练集和测试集 +X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42, stratify=(y > 0) # 使用stratify保证训练集和测试集中0/1比例相似 +) + +# --------------------------------------------------------------------------- +# --- 步骤 1: 训练不平衡分类器 (使用SMOTE) --- +# --------------------------------------------------------------------------- +print("\n--- 步骤 1: 正在训练分类模型 (使用 SMOTE) ---") + +# 创建二元目标变量 (0 vs >0) +y_train_binary = (y_train > 0).astype(int) +y_test_binary = (y_test > 0).astype(int) + +# 定义分类器 +classifier = lgb.LGBMClassifier(n_estimators=100, random_state=42) + +# 定义SMOTE +# n_jobs=-1 表示使用所有可用的CPU核心 +smote = SMOTE(random_state=42,) + +# 使用 imblearn 的 Pipeline +# 这是使用SMOTE的标准做法,可以防止SMOTE在交叉验证时对验证集进行过采样,避免数据泄露 +smote_pipeline = Pipeline([ + ('smote', smote), + ('classifier', classifier) +]) + +# 训练分类器 +print("正在对训练数据进行SMOTE过采样并训练分类器...") +smote_pipeline.fit(X_train, y_train_binary) +print("分类器训练完成。") + +# --------------------------------------------------------------------------- +# --- 步骤 2: 训练回归器 (仅在非零数据上) --- +# --------------------------------------------------------------------------- +print("\n--- 步骤 2: 正在训练回归模型 (仅在非零数据上) ---") + +# 定义回归器 +regressor = lgb.LGBMRegressor(n_estimators=150, learning_rate=0.05, random_state=42) + +# 筛选出训练集中的非零数据 +mask_pos_train = (y_train > 0) +X_train_pos = X_train[mask_pos_train] +y_train_pos = y_train[mask_pos_train] + +# 检查是否有非零数据可供训练 +if X_train_pos.shape[0] > 0: + # 对目标变量进行 log1p 转换,以处理右偏分布 + y_train_pos_log = np.log1p(y_train_pos) + + print(f"正在使用 {X_train_pos.shape[0]} 个非零样本训练回归器...") + 
regressor.fit(X_train_pos, y_train_pos_log) + print("回归器训练完成。") +else: + print("警告:训练集中没有非零数据,回归器未被训练。") + + +# --------------------------------------------------------------------------- +# --- 模型保存 --- +# --------------------------------------------------------------------------- +os.makedirs("model", exist_ok=True) +# 分别保存两个模型 +with open("model/212_classifier_model.pkl", "wb") as f: + pickle.dump(smote_pipeline, f) +print("\n分类器模型已保存到 model/212_classifier_model.pkl") + +with open("model/212_regressor_model.pkl", "wb") as f: + pickle.dump(regressor, f) +print("回归器模型已保存到 model/212_regressor_model.pkl") + + +# --------------------------------------------------------------------------- +# --- 预测与评估 --- +# --------------------------------------------------------------------------- +print("\n--- 正在加载模型并进行预测 ---") +# 加载模型 (用于演示,实际应用中可以在新脚本中加载) +with open("model/212_classifier_model.pkl", "rb") as f: + loaded_classifier_pipeline = pickle.load(f) +with open("model/212_regressor_model.pkl", "rb") as f: + loaded_regressor = pickle.load(f) + +# --- 组合预测 --- +# 1. 分类器预测为正类的概率 +prob_positive = loaded_classifier_pipeline.predict_proba(X_test)[:, 1] + +# 2. 回归器预测数值 (对数尺度) +# 如果回归器被训练过,则进行预测 +if hasattr(loaded_regressor, 'n_features_in_'): + predictions_log = loaded_regressor.predict(X_test) + # 转换回原始尺度 + predictions_pos = np.expm1(predictions_log) +else: + # 如果回归器未被训练,则预测为0 + predictions_pos = np.zeros(X_test.shape[0]) + +# 3. 
最终预测 = 概率 * 预测值 +final_predictions = prob_positive * predictions_pos + + +# --- 分类器评估 --- +print("\n--- 内部二元分类器评估 (使用SMOTE) ---") +y_pred_binary = loaded_classifier_pipeline.predict(X_test) +print(classification_report(y_test_binary, y_pred_binary, target_names=['是 零 (class 0)', '非 零 (class 1)'])) +print(f"平衡准确率 (Balanced Accuracy): {balanced_accuracy_score(y_test_binary, y_pred_binary):.4f}") + + +# --- 最终回归任务评估 --- +print("\n--- 最终回归任务评估 ---") +mse = mean_squared_error(y_test, final_predictions) +rmse = np.sqrt(mse) +r2 = r2_score(y_test, final_predictions) +print(f"均方误差 (MSE): {mse:.4f}") +print(f"均方根误差 (RMSE): {rmse:.4f}") +print(f"决定系数 (R²): {r2:.4f}") + +# --------------------------------------------------------------------------- +# --- 获取并打印特征重要性 --- +# --------------------------------------------------------------------------- +print("\n--- 特征重要性排序 ---") + +# 从 Pipeline 中提取分类器 +final_classifier = smote_pipeline.named_steps['classifier'] + +# 分类器的特征重要性 +clf_imp = pd.DataFrame({ + 'feature': X_train.columns, + 'importance_classifier': final_classifier.feature_importances_ +}) + +# 回归器的特征重要性 +if hasattr(regressor, 'n_features_in_'): + reg_imp = pd.DataFrame({ + 'feature': X_train_pos.columns, # 使用训练回归器时的列名 + 'importance_regressor': regressor.feature_importances_ + }) + # 合并两个重要性DataFrame + importances = pd.merge(clf_imp, reg_imp, on='feature', how='outer').fillna(0) + # 将重要性转换为整数类型以便查看 + importances['importance_classifier'] = importances['importance_classifier'].astype(int) + importances['importance_regressor'] = importances['importance_regressor'].astype(int) +else: + importances = clf_imp + importances['importance_regressor'] = 0 + +print("\n分类器重要性 (用于预测'零'或'非零'):") +print(importances.sort_values('importance_classifier', ascending=False).head(10)) + +if hasattr(regressor, 'n_features_in_'): + print("\n回归器重要性 (用于预测'非零值'的大小):") + print(importances.sort_values('importance_regressor', ascending=False).head(10)) \ No newline at end of file diff --git 
a/3_train_v2.py b/3_train_v2.py new file mode 100644 index 0000000..ed52a23 --- /dev/null +++ b/3_train_v2.py @@ -0,0 +1,51 @@ +import pandas as pd +import lightgbm as lgb +from sklearn.model_selection import train_test_split +from sklearn.metrics import mean_squared_error, r2_score, classification_report, balanced_accuracy_score +import numpy as np +import os +import pickle +from imblearn.over_sampling import SMOTE +from imblearn.pipeline import Pipeline # 引入 imblearn Pipeline 以防止数据泄露 + +# --------------------------------------------------------------------------- +# --- 主流程 --- +# --------------------------------------------------------------------------- +df_flash = pd.read_parquet("data/preprocessed_flash/212/flash_212.parquet") +df_ef = pd.read_parquet("data/preprocessed_ef/212/ef_212.parquet") +df_ef['time'] = pd.to_datetime(df_ef['time']) +df_flash['time'] = pd.to_datetime(df_flash['time']) +merged_df = pd.merge(df_flash, df_ef, on='time', how='inner') +merged_df = merged_df.drop('time', axis=1) + +print("数据框形状:", merged_df.shape) +print(merged_df.head()) + +# 分离特征和目标变量 +# 确保列名是字符串,以避免 LightGBM 的问题 +X = merged_df.drop('flash_count', axis=1) +X.columns = ["".join (c if c.isalnum() else "_" for c in str(x)) for x in X.columns] + +y = merged_df['flash_count'] + +# 划分训练集和测试集 +X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42, stratify=(y > 0) # 使用stratify保证训练集和测试集中0/1比例相似 +) + + +# 定义回归器 +regressor = lgb.LGBMRegressor(n_estimators=150, learning_rate=0.05, random_state=42) + +regressor.fit(X_train, y_train) + + +# --------------------------------------------------------------------------- +# --- 模型保存 --- +# --------------------------------------------------------------------------- +os.makedirs("model", exist_ok=True) + +with open("model/212_regressor_model.pkl", "wb") as f: + pickle.dump(regressor, f) +print("回归器模型已保存到 model/212_regressor_model.pkl") + diff --git a/3_train_v3.py b/3_train_v3.py new file mode 100644 index 
0000000..322d2ed --- /dev/null +++ b/3_train_v3.py @@ -0,0 +1,64 @@ +import pandas as pd +import lightgbm as lgb +from sklearn.model_selection import train_test_split +from sklearn.metrics import mean_squared_error, r2_score, classification_report, balanced_accuracy_score +import numpy as np +import os +import pickle +from imblearn.over_sampling import SMOTE +from imblearn.pipeline import Pipeline # 引入 imblearn Pipeline 以防止数据泄露 + +# --------------------------------------------------------------------------- +# --- 主流程 --- +# --------------------------------------------------------------------------- +df_flash = pd.read_parquet("data/preprocessed_flash/212/flash_212.parquet") +df_ef = pd.read_parquet("data/preprocessed_ef/212/ef_212.parquet") +df_ef['time'] = pd.to_datetime(df_ef['time']) +df_flash['time'] = pd.to_datetime(df_flash['time']) +merged_df = pd.merge(df_flash, df_ef, on='time', how='inner') +merged_df = merged_df.drop('time', axis=1) + +print("数据框形状:", merged_df.shape) +print(merged_df.head()) + +# 分离特征和目标变量 +# 确保列名是字符串,以避免 LightGBM 的问题 +X = merged_df.drop('flash_count', axis=1) +X.columns = ["".join (c if c.isalnum() else "_" for c in str(x)) for x in X.columns] + +y = merged_df['flash_count'] + +# 1. 先训练分类模型,判断flash_count是否为0 +y_cls = (y > 0).astype(int) # 0为无闪电,1为有闪电 + +X_train_cls, X_test_cls, y_train_cls, y_test_cls = train_test_split( + X, y_cls, test_size=0.2, random_state=42, stratify=y_cls +) + +classifier = lgb.LGBMClassifier(n_estimators=100, learning_rate=0.05, random_state=42) +classifier.fit(X_train_cls, y_train_cls) + +# 2. 
只用flash_count>0的数据训练回归模型 +X_reg = X[y > 0] +y_reg = y[y > 0] + +X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split( + X_reg, y_reg, test_size=0.2, random_state=42 +) + +regressor = lgb.LGBMRegressor(n_estimators=150, learning_rate=0.05, random_state=42) +regressor.fit(X_train_reg, y_train_reg) + +# --------------------------------------------------------------------------- +# --- 模型保存 --- +# --------------------------------------------------------------------------- +os.makedirs("model", exist_ok=True) + +with open("model/212_classifier_model.pkl", "wb") as f: + pickle.dump(classifier, f) +print("分类器模型已保存到 model/212/212_classifier_model.pkl") + +with open("model/212_regressor_model.pkl", "wb") as f: + pickle.dump(regressor, f) +print("回归器模型已保存到 model/212/212_regressor_model.pkl") + diff --git a/3_train_v4.py b/3_train_v4.py new file mode 100644 index 0000000..c40b652 --- /dev/null +++ b/3_train_v4.py @@ -0,0 +1,191 @@ +import os +import pickle + +import lightgbm as lgb +import numpy as np +import pandas as pd +import xgboost as xgb +from sklearn.metrics import (accuracy_score, balanced_accuracy_score, + classification_report, mean_absolute_error, + mean_squared_error, r2_score) +from sklearn.model_selection import train_test_split + + +def load_station_ids(station_csv_path: str) -> list: + """ + 读取所有站点的EFID。 + + Args: + station_csv_path (str): 站点信息csv文件路径 + + Returns: + list: EFID列表 + """ + # 读取站点信息 + df_station: pd.DataFrame = pd.read_csv(station_csv_path) + # 获取EFID列并转为列表 + efid_list: list = df_station["EFID"].tolist() + return efid_list + +def load_and_merge_data_for_station(efid: int) -> pd.DataFrame | None: + """ + 加载指定站点的flash和ef数据,并按时间合并,若有一个不存在则返回None。 + + Args: + efid (int): 站点EFID + + Returns: + pd.DataFrame | None: 合并后的数据框,若数据缺失则为None + """ + # 构建文件路径 + flash_path: str = f"data/preprocessed_flash/{efid}/flash_{efid}.parquet" + ef_path: str = f"data/preprocessed_ef/ef_{efid}.parquet" + # 检查文件是否存在 + if not (os.path.exists(flash_path) and 
def prepare_full_dataset(station_csv_path: str) -> pd.DataFrame:
    """Merge flash and ef data for every station listed in the CSV.

    Stations whose data files are missing are skipped (their loader
    returns None).

    Args:
        station_csv_path: Path to the station-info CSV file.

    Returns:
        One DataFrame concatenating all available stations' merged data.

    Raises:
        ValueError: If no station yields usable data.
    """
    efid_list: list = load_station_ids(station_csv_path)
    merged_list: list[pd.DataFrame] = []
    for efid in efid_list:
        merged_df: pd.DataFrame | None = load_and_merge_data_for_station(efid)
        if merged_df is not None:
            merged_list.append(merged_df)
    if not merged_list:
        raise ValueError("没有可用的站点数据。")
    return pd.concat(merged_list, ignore_index=True)


# ---------------------------------------------------------------------------
# --- Main pipeline: two-stage model (classifier gate + regressor) ---
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Load and merge all stations' data; station_id.csv holds station metadata.
    full_merged_df: pd.DataFrame = prepare_full_dataset("station_id.csv")
    print("合并后数据框形状:", full_merged_df.shape)
    print(full_merged_df.head())

    # Separate features from the target. Column names are sanitized because
    # gradient-boosting libraries reject non-alphanumeric feature names.
    X: pd.DataFrame = full_merged_df.drop('flash_count', axis=1)
    if "EFID" in X.columns:
        X = X.drop("EFID", axis=1)  # station id is an identifier, not a feature
    X.columns = ["".join(c if c.isalnum() else "_" for c in str(x)) for x in X.columns]
    y: pd.Series = full_merged_df['flash_count']

    # Stage 1: classification — is there any lightning (flash_count > 0)?
    y_cls: pd.Series = (y > 0).astype(int)  # 0 = no lightning, 1 = lightning
    X_train_cls, X_test_cls, y_train_cls, y_test_cls = train_test_split(
        X, y_cls, test_size=0.2, random_state=42, stratify=y_cls
    )

    # BUGFIX: the deprecated `use_label_encoder=False` kwarg was removed — it
    # only silenced a warning on old xgboost and is rejected by xgboost >= 2.0.
    classifier: xgb.XGBClassifier = xgb.XGBClassifier(
        n_estimators=300,       # more trees for the large dataset
        learning_rate=0.05,
        max_depth=6,            # cap depth to limit overfitting
        subsample=0.8,          # row subsampling improves generalization
        colsample_bytree=0.8,   # feature subsampling improves generalization
        random_state=42,
        eval_metric='logloss',
        n_jobs=-1,              # use all CPU cores
    )
    classifier.fit(X_train_cls, y_train_cls)

    # Stage 2: regression, trained only on samples with flash_count > 0.
    X_reg: pd.DataFrame = X.loc[y > 0]
    y_reg: pd.Series = y.loc[y > 0]
    X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
        X_reg, y_reg, test_size=0.2, random_state=42
    )
    regressor: xgb.XGBRegressor = xgb.XGBRegressor(
        n_estimators=400,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        n_jobs=-1,
    )
    regressor.fit(X_train_reg, y_train_reg)

    # ---------------------------------------------------------------------------
    # --- Model persistence ---
    # ---------------------------------------------------------------------------
    os.makedirs("model", exist_ok=True)
    with open("model/classifier_model.pkl", "wb") as f:
        pickle.dump(classifier, f)
    print("分类器模型已保存到 model/classifier_model.pkl")
    with open("model/regressor_model.pkl", "wb") as f:
        pickle.dump(regressor, f)
    print("回归器模型已保存到 model/regressor_model.pkl")

    # ===========================================
    # Evaluation
    # ===========================================
    # NOTE: predict() returns numpy arrays; the original `list[int]`/
    # `list[float]` annotations were wrong and have been dropped.
    y_pred_cls = classifier.predict(X_test_cls)
    cls_accuracy: float = accuracy_score(y_test_cls, y_pred_cls)
    print(f"分类器在测试集上的准确率: {cls_accuracy:.4f}")
    print("分类器详细分类报告:")
    print(classification_report(y_test_cls, y_pred_cls, digits=4))

    # The regressor is evaluated only on the positive (flash_count > 0) split.
    y_pred_reg = regressor.predict(X_test_reg)
    reg_mse: float = mean_squared_error(y_test_reg, y_pred_reg)
    reg_mae: float = mean_absolute_error(y_test_reg, y_pred_reg)
    reg_r2: float = r2_score(y_test_reg, y_pred_reg)
    print(f"回归器在测试集上的均方误差(MSE): {reg_mse:.4f}")
    print(f"回归器在测试集上的平均绝对误差(MAE): {reg_mae:.4f}")
    print(f"回归器在测试集上的R2分数: {reg_r2:.4f}")
def get_current_features(station_id: str) -> tuple[pd.DataFrame, pd.Timestamp] | tuple[None, None]:
    """Load a station's latest realtime data and compute rolling-window
    features for the most recent timestamp.

    Args:
        station_id: Station ID used to locate './realtime_data/<station_id>/'.

    Returns:
        A (single-row feature DataFrame, latest timestamp) pair, or
        (None, None) when no usable data exists.
    """
    logger.info(f"\n--- 开始为站点 {station_id} 计算当前特征 ---")

    # 1. Load the newest CSV files (enough history for the 1200 s window).
    file_list: list[str] = sorted(glob.glob(f"./realtime_data/{station_id}/*.csv"))
    if not file_list:
        logger.info(f"错误: 在 './realtime_data/{station_id}/' 目录下未找到任何CSV文件。")
        return None, None
    files_to_process: list[str] = file_list[-NUM_FILES_TO_LOAD:]
    logger.info(f"准备加载最新的 {len(files_to_process)} 个文件: {files_to_process}")

    df_list: list[pd.DataFrame] = [pd.read_csv(f) for f in files_to_process]
    if not df_list:
        logger.info("错误: 未能成功读取任何文件。")
        return None, None
    df: pd.DataFrame = pd.concat(df_list, ignore_index=True)
    logger.info(df.head())

    # 2. Preprocess: parse timestamps, drop duplicate seconds.
    df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%d %H:%M:%S")
    df = df.sort_values('time').drop_duplicates(subset='time', keep='first')
    # BUGFIX: loguru uses str.format-style "{}" placeholders, not printf "%s";
    # the original call never interpolated the DataFrame into the log line.
    logger.info("line49 df, {}", df)
    if df.empty:
        logger.info("错误: 处理后的数据为空。")
        return None, None

    # Build a gap-free 1-second index and linearly interpolate missing 'ef'.
    full_time_index: pd.DatetimeIndex = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s")
    df = df.set_index("time").reindex(full_time_index).rename_axis("time")
    df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both')
    df = df.reset_index()

    # 3. Compute features for the last timestamp only.
    latest_time: pd.Timestamp = df['time'].iloc[-1]
    logger.info(f"数据已预处理完毕,将为最新时间点 {latest_time} 计算特征。")

    ef_values: np.ndarray = df['ef'].values
    diff_values: np.ndarray = np.diff(ef_values, prepend=ef_values[0])

    # Feature-name order must match the training feature layout.
    stat_names: tuple[str, ...] = ('mean', 'std', 'var', 'min', 'max', 'ptp',
                                   'skew', 'kurt', 'diff_mean', 'diff_std')
    current_features: dict = {'time': latest_time}

    for win in WINDOW_SIZES:
        if len(ef_values) < win:
            # Not enough history: emit NaNs so the column set stays identical
            # to what the models were trained on.
            logger.info(f"数据长度 ({len(ef_values)}s) 不足以计算 {win}s 窗口,特征将填充为NaN。")
            for name in stat_names:
                current_features[f'{name}_{win}s'] = np.nan
            continue

        # Use only the trailing `win` samples of the signal.
        window_ef: np.ndarray = ef_values[-win:]
        window_diff: np.ndarray = diff_values[-win:]

        current_features[f'mean_{win}s'] = window_ef.mean()
        current_features[f'std_{win}s'] = window_ef.std(ddof=1)
        current_features[f'var_{win}s'] = window_ef.var(ddof=1)
        current_features[f'min_{win}s'] = window_ef.min()
        current_features[f'max_{win}s'] = window_ef.max()
        current_features[f'ptp_{win}s'] = np.ptp(window_ef)
        current_features[f'skew_{win}s'] = skew(window_ef, bias=False)
        current_features[f'kurt_{win}s'] = kurtosis(window_ef, bias=False)
        current_features[f'diff_mean_{win}s'] = window_diff.mean()
        current_features[f'diff_std_{win}s'] = window_diff.std(ddof=1)

    # 4. Single-row DataFrame plus the timestamp it describes.
    return pd.DataFrame([current_features]), latest_time


if __name__ == "__main__":
    station_id_list: list[str] = ["212"]

    logger.info("--- 正在创建演示用的实时数据文件 ---")
    demo_data_path: str = "./realtime_data/212/"
    os.makedirs(demo_data_path, exist_ok=True)

    for sid in station_id_list:
        latest_features_df, latest_time = get_current_features(sid)
        if latest_features_df is None or latest_time is None:
            logger.info("未能获取到有效的特征数据,跳过该站点。")
            continue

        # Keep only the feature columns for prediction.
        features_for_pred: pd.DataFrame = latest_features_df.drop('time', axis=1)

        # 1. Classifier gate: is any lightning expected at all?
        # BUGFIX: model and result paths were hard-coded to station "212"
        # inside a loop over station_id_list; derive them from `sid` so every
        # station resolves to its own files.
        classifier_model_path: str = f"model/{sid}_classifier_model.pkl"
        if not os.path.exists(classifier_model_path):
            logger.error(f"分类模型文件不存在: {classifier_model_path}")
            continue
        with open(classifier_model_path, "rb") as f:
            classifier_model = pickle.load(f)
        try:
            classifier_pred = classifier_model.predict(features_for_pred)[0]
        except Exception as e:
            logger.error(f"分类模型预测出错: {e}")
            continue
        logger.info(f"分类模型预测结果: {classifier_pred}")

        # 2. Regressor runs only when the classifier predicts lightning.
        prediction: float
        if classifier_pred == 0:
            prediction = 0
            logger.info("分类结果为0,回归预测直接设为0。")
        else:
            regressor_model_path: str = f"model/{sid}_regressor_model.pkl"
            if not os.path.exists(regressor_model_path):
                logger.error(f"回归模型文件不存在: {regressor_model_path}")
                continue
            with open(regressor_model_path, "rb") as f:
                regressor_model = pickle.load(f)
            try:
                prediction = regressor_model.predict(features_for_pred)[0]
            except Exception as e:
                logger.error(f"回归模型预测出错: {e}")
                continue
            logger.info(f"回归模型预测结果: {prediction}")

        # 3. Append the result to an hourly CSV; header is written only when
        # the file is new (f.tell() == 0 in append mode).
        result: pd.DataFrame = pd.DataFrame({'time': [latest_time], 'prediction': [prediction]})
        file_name: str = f"result/{sid}/{latest_time:%Y%m%d%H}.csv"
        os.makedirs(os.path.dirname(file_name), exist_ok=True)
        with open(file_name, 'a', encoding='utf-8') as f:
            if f.tell() == 0:
                f.write(",".join(result.columns) + "\n")
            f.write(result.to_csv(index=False, header=False))

        print(f"结果已通过文本追加方式写入: {file_name}")
def get_current_features(station_id: str) -> tuple[pd.DataFrame, pd.Timestamp] | tuple[None, None]:
    """Load a station's latest realtime data and compute rolling-window
    features for the most recent timestamp.

    Args:
        station_id: Station ID used to locate './realtime_data/<station_id>/'.

    Returns:
        A (single-row feature DataFrame, latest timestamp) pair, or
        (None, None) when no usable data exists.
    """
    logger.info(f"\n--- 开始为站点 {station_id} 计算当前特征 ---")

    # 1. Load the newest CSV files (enough history for the 1200 s window).
    file_list: list[str] = sorted(glob.glob(f"./realtime_data/{station_id}/*.csv"))
    if not file_list:
        logger.info(f"错误: 在 './realtime_data/{station_id}/' 目录下未找到任何CSV文件。")
        return None, None
    files_to_process: list[str] = file_list[-NUM_FILES_TO_LOAD:]
    logger.info(f"准备加载最新的 {len(files_to_process)} 个文件: {files_to_process}")

    df_list: list[pd.DataFrame] = [pd.read_csv(f) for f in files_to_process]
    if not df_list:
        logger.info("错误: 未能成功读取任何文件。")
        return None, None
    df: pd.DataFrame = pd.concat(df_list, ignore_index=True)
    logger.info(df.head())

    # 2. Preprocess: parse timestamps, drop duplicate seconds.
    df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%d %H:%M:%S")
    df = df.sort_values('time').drop_duplicates(subset='time', keep='first')
    # BUGFIX: loguru uses str.format-style "{}" placeholders, not printf "%s";
    # the original call never interpolated the DataFrame into the log line.
    logger.info("line49 df, {}", df)
    if df.empty:
        logger.info("错误: 处理后的数据为空。")
        return None, None

    # Build a gap-free 1-second index and linearly interpolate missing 'ef'.
    full_time_index: pd.DatetimeIndex = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s")
    df = df.set_index("time").reindex(full_time_index).rename_axis("time")
    df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both')
    df = df.reset_index()

    # 3. Compute features for the last timestamp only.
    latest_time: pd.Timestamp = df['time'].iloc[-1]
    logger.info(f"数据已预处理完毕,将为最新时间点 {latest_time} 计算特征。")

    ef_values: np.ndarray = df['ef'].values
    diff_values: np.ndarray = np.diff(ef_values, prepend=ef_values[0])

    # Feature-name order must match the training feature layout.
    stat_names: tuple[str, ...] = ('mean', 'std', 'var', 'min', 'max', 'ptp',
                                   'skew', 'kurt', 'diff_mean', 'diff_std')
    current_features: dict = {'time': latest_time}

    for win in WINDOW_SIZES:
        if len(ef_values) < win:
            # Not enough history: emit NaNs so the column set stays identical
            # to what the models were trained on.
            logger.info(f"数据长度 ({len(ef_values)}s) 不足以计算 {win}s 窗口,特征将填充为NaN。")
            for name in stat_names:
                current_features[f'{name}_{win}s'] = np.nan
            continue

        # Use only the trailing `win` samples of the signal.
        window_ef: np.ndarray = ef_values[-win:]
        window_diff: np.ndarray = diff_values[-win:]

        current_features[f'mean_{win}s'] = window_ef.mean()
        current_features[f'std_{win}s'] = window_ef.std(ddof=1)
        current_features[f'var_{win}s'] = window_ef.var(ddof=1)
        current_features[f'min_{win}s'] = window_ef.min()
        current_features[f'max_{win}s'] = window_ef.max()
        current_features[f'ptp_{win}s'] = np.ptp(window_ef)
        current_features[f'skew_{win}s'] = skew(window_ef, bias=False)
        current_features[f'kurt_{win}s'] = kurtosis(window_ef, bias=False)
        current_features[f'diff_mean_{win}s'] = window_diff.mean()
        current_features[f'diff_std_{win}s'] = window_diff.std(ddof=1)

    # 4. Single-row DataFrame plus the timestamp it describes.
    return pd.DataFrame([current_features]), latest_time


if __name__ == "__main__":
    # Discover station IDs from the sub-directories of realtime_data/.
    all_paths: list = glob.glob("realtime_data/*")
    station_id_list: list = [os.path.basename(path) for path in all_paths if os.path.isdir(path)]
    print(f"station_id_list:{station_id_list}")

    for sid in station_id_list:
        latest_features_df, latest_time = get_current_features(sid)
        if latest_features_df is None or latest_time is None:
            logger.info("未能获取到有效的特征数据,跳过该站点。")
            continue

        # Keep only the feature columns for prediction.
        features_for_pred: pd.DataFrame = latest_features_df.drop('time', axis=1)

        # 1. Classifier gate: is any lightning expected at all?
        # (v2 uses station-independent global models, unlike 4_predict.py.)
        classifier_model_path: str = "model/classifier_model.pkl"
        if not os.path.exists(classifier_model_path):
            logger.error(f"分类模型文件不存在: {classifier_model_path}")
            continue
        with open(classifier_model_path, "rb") as f:
            classifier_model = pickle.load(f)
        try:
            classifier_pred = classifier_model.predict(features_for_pred)[0]
        except Exception as e:
            logger.error(f"分类模型预测出错: {e}")
            continue
        logger.info(f"分类模型预测结果: {classifier_pred}")

        # 2. Regressor runs only when the classifier predicts lightning.
        # NOTE: `prediction` is annotated once — the original re-annotated the
        # same name as both int and float.
        prediction: float
        if classifier_pred == 0:
            prediction = 0
            logger.info("分类结果为0,回归预测直接设为0。")
        else:
            regressor_model_path: str = "model/regressor_model.pkl"
            if not os.path.exists(regressor_model_path):
                logger.error(f"回归模型文件不存在: {regressor_model_path}")
                continue
            with open(regressor_model_path, "rb") as f:
                regressor_model = pickle.load(f)
            try:
                prediction = regressor_model.predict(features_for_pred)[0]
            except Exception as e:
                logger.error(f"回归模型预测出错: {e}")
                continue
            logger.info(f"回归模型预测结果: {prediction}")

        # 3. Append the result to an hourly CSV; header is written only when
        # the file is new (f.tell() == 0 in append mode).
        result: pd.DataFrame = pd.DataFrame({'time': [latest_time], 'prediction': [prediction]})
        file_name: str = f"result/{sid}/{latest_time:%Y%m%d%H}.csv"
        os.makedirs(os.path.dirname(file_name), exist_ok=True)
        with open(file_name, 'a', encoding='utf-8') as f:
            if f.tell() == 0:
                f.write(",".join(result.columns) + "\n")
            f.write(result.to_csv(index=False, header=False))

        print(f"结果已通过文本追加方式写入: {file_name}")
# --- 1. Data loading & preprocessing ---

# Load and concatenate this station's parquet files (example path; adjust to
# the real data location).
df_list = []
for i in glob.glob("./data/ef/212/212_*.parquet"):
    df_list.append(pd.read_parquet(i))
df = pd.concat(df_list, ignore_index=True)

df = df.rename(columns={"HappenTime": "time", "ElectricField": "ef"})
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values('time').drop_duplicates(subset='time', keep='first')

# Build a gap-free 1-second index covering the whole span.
# BUGFIX: the uppercase alias freq="S" is deprecated (pandas >= 2.2 warns and
# later versions reject it); use lowercase "s", which also matches the
# predict scripts.
full_time_index = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s")
df = df.set_index("time").reindex(full_time_index).rename_axis("time")

# Fill the NaNs introduced by reindex BEFORE any signal processing. Linear
# interpolation suits a 1 Hz time series; limit_direction='both' covers the
# head/tail and fillna(0) catches anything left over.
df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both').fillna(0)
df = df.reset_index()

# --- 2. Statistical feature extraction ---

window_sizes = [2, 5, 10, 30, 60, 300, 600, 1200]
stat_features_list = []  # one feature DataFrame per window size

# Time-based rolling windows require a (sorted) DatetimeIndex.
df_indexed = df.set_index('time').sort_index()

for win in window_sizes:
    rolling_obj = df_indexed['ef'].rolling(f'{win}s', min_periods=1)

    # Basic window statistics.
    mean = rolling_obj.mean()
    std = rolling_obj.std()
    var = rolling_obj.var()
    min_ = rolling_obj.min()
    max_ = rolling_obj.max()
    ptp_ = max_ - min_  # peak-to-peak range
    skew_ = rolling_obj.apply(lambda x: skew(x, bias=False), raw=False)
    kurt_ = rolling_obj.apply(lambda x: kurtosis(x, bias=False), raw=False)

    # First-difference statistics over the same window.
    diff = df_indexed['ef'].diff()
    diff_rolling = diff.rolling(f'{win}s', min_periods=1)
    diff_mean = diff_rolling.mean()
    diff_std = diff_rolling.std()

    feature_df = pd.DataFrame({
        f'mean_{win}s': mean,
        f'std_{win}s': std,
        f'var_{win}s': var,
        f'min_{win}s': min_,
        f'max_{win}s': max_,
        f'ptp_{win}s': ptp_,
        f'skew_{win}s': skew_,
        f'kurt_{win}s': kurt_,
        f'diff_mean_{win}s': diff_mean,
        f'diff_std_{win}s': diff_std,
    })
    stat_features_list.append(feature_df)

# All window frames share the same time index, so concat aligns them.
stat_features_all = pd.concat(stat_features_list, axis=1)


# --- 3. Wavelet & spectral feature extraction ---

def calc_spectral_bandwidth(cwt_matrix, freqs):
    """Per-sample spectral bandwidth (std of frequency weighted by CWT power)."""
    power = np.abs(cwt_matrix) ** 2
    # Normalize each time column into a probability distribution over frequency;
    # the epsilon guards against division by zero for silent samples.
    power_norm = power / (np.sum(power, axis=0) + 1e-12)
    freq_col = freqs[:, None]
    mean_freq = np.sum(power_norm * freq_col, axis=0)
    return np.sqrt(np.sum(power_norm * (freq_col - mean_freq) ** 2, axis=0))


def highpass_filter(data, fs, cutoff=1.0, order=4):
    """Zero-phase Butterworth high-pass filter of *data* sampled at *fs* Hz."""
    normal_cutoff = cutoff / (0.5 * fs)  # normalize by Nyquist frequency
    b, a = butter(order, normal_cutoff, btype='high', analog=False)
    return filtfilt(b, a, data)


# The signal is NaN-free at this point (interpolated/filled earlier).
ef_signal = df_indexed['ef'].values
fs = 1.0

# Point-wise features are computed once over the full signal.
# 1. Continuous wavelet transform (Morlet, 63 scales).
scales = np.arange(1, 64)
cwt_matrix, freqs = pywt.cwt(ef_signal, scales, 'morl', sampling_period=1 / fs)

# 2. Spectral bandwidth B(n).
B_n = calc_spectral_bandwidth(cwt_matrix, freqs)

# 3. First difference of the bandwidth, ΔB(n).
delta_B = np.diff(B_n, prepend=B_n[0])

# 4. High-frequency energy E'(n) (cutoff just below Nyquist).
E_n = highpass_filter(ef_signal, fs, cutoff=0.49) ** 2

# Align the point-wise features with the main time index.
spectral_features_pointwise = pd.DataFrame(
    {'B_n': B_n, 'delta_B_abs': np.abs(delta_B), 'E_n': E_n},
    index=df_indexed.index,
)


# --- 4. Rolling statistics of the spectral features ---
spectral_features_list = []
for win in window_sizes:
    windowed = spectral_features_pointwise.rolling(f'{win}s', min_periods=1)

    # Compute mean/max/std for every spectral column in one pass.
    agg_frame = windowed.agg(['mean', 'max', 'std'])

    # Flatten the MultiIndex columns (('B_n', 'mean') -> 'B_n_mean') and tag
    # each with its window size.
    agg_frame.columns = ['_'.join(col).strip() for col in agg_frame.columns.values]
    spectral_features_list.append(agg_frame.add_suffix(f'_{win}s'))

# Shared index again -> concat aligns all windows.
spectral_features_all = pd.concat(spectral_features_list, axis=1)
# --- 5. Final merge & save ---

# Index-based join is the safest way to combine frames that share the same
# DatetimeIndex; then restore 'time' as an ordinary column.
final_frame = df_indexed.join([stat_features_all, spectral_features_all]).reset_index()

# Small windows can produce all-NaN std columns — drop any such column.
final_frame = final_frame.dropna(axis=1, how='all')

print("最终DataFrame的维度:", final_frame.shape)
final_frame.to_parquet("212_final_corrected.parquet", index=False)

print("\n成功处理数据并保存到 '212_final_corrected.parquet'")