# Electric-field (EF) time-series feature-extraction pipeline.
import glob
import os

import numpy as np
import pandas as pd
import pywt
from scipy.signal import butter, filtfilt
from scipy.stats import skew, kurtosis
# --- 1. Data loading and preprocessing ---

# Load and merge the Parquet files.
# NOTE: adjust this glob pattern to the actual data location.
parquet_paths = sorted(glob.glob("./data/ef/212/212_*.parquet"))
if not parquet_paths:
    # Fail fast with a clear message instead of letting pd.concat raise
    # an opaque "No objects to concatenate" ValueError.
    raise FileNotFoundError(
        "No parquet files matched './data/ef/212/212_*.parquet'")
df_list = [pd.read_parquet(path) for path in parquet_paths]
df = pd.concat(df_list, ignore_index=True)

df = df.rename(columns={"HappenTime": "time", "ElectricField": "ef"})
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values('time').drop_duplicates(subset='time', keep='first')

# Build a continuous 1-second-resolution index spanning the recording.
# (lowercase "s": the uppercase "S" alias is deprecated since pandas 2.2)
full_time_index = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s")
df = df.set_index("time").reindex(full_time_index).rename_axis("time")

# --- Fix 1: handle NaN values ---
# The reindex above introduces NaNs for missing seconds; they must be
# filled before any signal processing.  Linear interpolation suits a
# time series; limit_direction='both' also fills leading/trailing gaps,
# and fillna(0) catches anything still left.
df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both').fillna(0)
df = df.reset_index()
# --- 2. Statistical feature extraction ---

window_sizes = [2, 5, 10, 30, 60, 300, 600, 1200]
stat_features_list = []  # one feature DataFrame per window size

# Time-based rolling windows require a sorted DatetimeIndex.
df_indexed = df.set_index('time').sort_index()

# The first-order difference is independent of the window size, so
# compute it once here instead of once per loop iteration.
ef_diff = df_indexed['ef'].diff()

for win in window_sizes:
    rolling_obj = df_indexed['ef'].rolling(f'{win}s', min_periods=1)

    # Basic rolling statistics.
    mean = rolling_obj.mean()
    std = rolling_obj.std()
    var = rolling_obj.var()
    min_ = rolling_obj.min()
    max_ = rolling_obj.max()
    ptp_ = max_ - min_  # peak-to-peak range
    # Bias-corrected sample skewness/kurtosis; NOTE(review): with
    # min_periods=1, very small windows may produce NaN from scipy.
    skew_ = rolling_obj.apply(lambda x: skew(x, bias=False), raw=False)
    kurt_ = rolling_obj.apply(lambda x: kurtosis(x, bias=False), raw=False)

    # Rolling statistics of the first-order difference.
    diff_rolling = ef_diff.rolling(f'{win}s', min_periods=1)
    diff_mean = diff_rolling.mean()
    diff_std = diff_rolling.std()

    feature_df = pd.DataFrame({
        f'mean_{win}s': mean,
        f'std_{win}s': std,
        f'var_{win}s': var,
        f'min_{win}s': min_,
        f'max_{win}s': max_,
        f'ptp_{win}s': ptp_,
        f'skew_{win}s': skew_,
        f'kurt_{win}s': kurt_,
        f'diff_mean_{win}s': diff_mean,
        f'diff_std_{win}s': diff_std,
    })
    stat_features_list.append(feature_df)

# All per-window frames share the same time index, so concatenating
# along columns aligns them automatically.
stat_features_all = pd.concat(stat_features_list, axis=1)
# --- 3. Wavelet and spectral feature extraction ---

# Helper functions
def calc_spectral_bandwidth(cwt_matrix, freqs):
    """Per-sample spectral bandwidth of a CWT power distribution.

    For each time step, the power across rows is normalised to a
    distribution over ``freqs``; the bandwidth is the standard
    deviation of that distribution.

    cwt_matrix : (n_freqs, n_samples) CWT coefficients
    freqs      : (n_freqs,) frequencies corresponding to the rows
    Returns a (n_samples,) array of bandwidths.
    """
    spectrum = np.abs(cwt_matrix) ** 2
    # Normalise each column; the small epsilon guards against all-zero columns.
    weights = spectrum / (spectrum.sum(axis=0) + 1e-12)
    freq_col = freqs[:, None]
    centroid = (weights * freq_col).sum(axis=0)
    variance = (weights * (freq_col - centroid) ** 2).sum(axis=0)
    return np.sqrt(variance)
def highpass_filter(data, fs, cutoff=1.0, order=4):
    """Zero-phase Butterworth high-pass filter.

    data   : 1-D signal
    fs     : sampling frequency in Hz
    cutoff : cutoff frequency in Hz (must lie below the Nyquist fs/2)
    order  : filter order
    Returns the filtered signal, same length as ``data``.
    """
    nyquist = fs / 2.0
    # Design the filter with the cutoff normalised to the Nyquist rate.
    b, a = butter(order, cutoff / nyquist, btype='high', analog=False)
    # filtfilt runs forward and backward for zero phase distortion.
    return filtfilt(b, a, data)
# The signal contains no NaN values at this point.
signal = df_indexed['ef'].values
fs = 1.0  # the series was reindexed to one sample per second

# Compute "point-wise" features over the whole signal (done only once).
# 1. Continuous wavelet transform (CWT) with a Morlet wavelet.
cwt_coeffs, cwt_freqs = pywt.cwt(signal, np.arange(1, 64), 'morl',
                                 sampling_period=1 / fs)

# 2. Spectral bandwidth B(n) per sample.
bandwidth = calc_spectral_bandwidth(cwt_coeffs, cwt_freqs)

# 3. First-order difference ΔB(n); prepend keeps the original length.
bandwidth_delta = np.diff(bandwidth, prepend=bandwidth[0])

# 4. High-frequency energy E'(n): square of the high-passed signal.
#    cutoff=0.49 Hz sits just below the 0.5 Hz Nyquist frequency.
high_freq_energy = highpass_filter(signal, fs, cutoff=0.49) ** 2

# Collect the new point-wise features, aligned with the master time index.
spectral_features_pointwise = pd.DataFrame({
    'B_n': bandwidth,
    'delta_B_abs': np.abs(bandwidth_delta),
    'E_n': high_freq_energy
}, index=df_indexed.index)
# --- 4. Rolling statistics of the spectral features ---

def _rolling_spectral_stats(win):
    """Rolling mean/max/std of every point-wise spectral feature for one window."""
    stats = (spectral_features_pointwise
             .rolling(f'{win}s', min_periods=1)
             .agg(['mean', 'max', 'std']))
    # Flatten the MultiIndex columns (e.g. ('B_n', 'mean') -> 'B_n_mean'),
    # then tag every column with the window length.
    stats.columns = ['_'.join(col).strip() for col in stats.columns.values]
    return stats.add_suffix(f'_{win}s')

# Merge the rolling spectral features for all window sizes.
spectral_features_all = pd.concat(
    [_rolling_spectral_stats(win) for win in window_sizes], axis=1)
# --- 5. Final merge and save ---

# Index-based 'join' is the safest way to combine the feature tables:
# they all share the master 1-second time index.
result = (df_indexed
          .join([stat_features_all, spectral_features_all])
          .reset_index())  # restore the time index as a 'time' column

# Drop columns that are entirely NaN (can happen for std on the
# smallest windows).
result = result.dropna(axis=1, how='all')

# Persist the final feature table as Parquet.
print("最终DataFrame的维度:", result.shape)
result.to_parquet("212_final_corrected.parquet", index=False)

print("\n成功处理数据并保存到 '212_final_corrected.parquet'")