Files
very_short_lightning/merge_data.py
2025-07-28 11:08:04 +08:00

155 lines
5.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import os, glob
import numpy as np
from scipy.stats import skew, kurtosis
import pywt
from scipy.signal import butter, filtfilt
# --- 1. Data loading and preprocessing ---
# Load and concatenate the per-chunk Parquet files for station 212.
# NOTE(review): the glob path is hard-coded; adjust to the real data location.
df_list = [pd.read_parquet(path) for path in glob.glob("./data/ef/212/212_*.parquet")]
df = pd.concat(df_list, ignore_index=True)
df = df.rename(columns={"HappenTime": "time", "ElectricField": "ef"})
df["time"] = pd.to_datetime(df["time"])
# Sort chronologically and keep the first observation per duplicated timestamp.
df = df.sort_values('time').drop_duplicates(subset='time', keep='first')
# Build a gap-free 1-second index spanning the whole recording.
# Uppercase 'S' is a deprecated frequency alias (pandas >= 2.2); use 's'.
full_time_index = pd.date_range(start=df["time"].min(), end=df["time"].max(), freq="s")
df = df.set_index("time").reindex(full_time_index).rename_axis("time")
# Fill the NaNs introduced by reindexing before any signal processing:
# linear interpolation is the standard choice for time-series gaps,
# limit_direction='both' covers leading/trailing NaNs, and fillna(0)
# catches anything still left.
df['ef'] = df['ef'].interpolate(method='linear', limit_direction='both').fillna(0)
df = df.reset_index()
# --- 2. Statistical feature extraction ---
window_sizes = [2, 5, 10, 30, 60, 300, 600, 1200]
stat_features_list = []  # one feature DataFrame per window size

# A DatetimeIndex is required for time-based rolling windows.
df_indexed = df.set_index('time').sort_index()
# The first-order difference does not depend on the window size, so
# compute it once here instead of recomputing it on every loop iteration.
ef_diff = df_indexed['ef'].diff()

for win in window_sizes:
    rolling_obj = df_indexed['ef'].rolling(f'{win}s', min_periods=1)
    # Basic statistics over the trailing time window.
    mean = rolling_obj.mean()
    std = rolling_obj.std()
    var = rolling_obj.var()
    min_ = rolling_obj.min()
    max_ = rolling_obj.max()
    ptp_ = max_ - min_  # peak-to-peak range
    # Unbiased skewness/kurtosis; windows too short to estimate yield NaN.
    skew_ = rolling_obj.apply(lambda x: skew(x, bias=False), raw=False)
    kurt_ = rolling_obj.apply(lambda x: kurtosis(x, bias=False), raw=False)
    # Rolling statistics of the (precomputed) first-order difference.
    diff_rolling = ef_diff.rolling(f'{win}s', min_periods=1)
    diff_mean = diff_rolling.mean()
    diff_std = diff_rolling.std()
    feature_df = pd.DataFrame({
        f'mean_{win}s': mean,
        f'std_{win}s': std,
        f'var_{win}s': var,
        f'min_{win}s': min_,
        f'max_{win}s': max_,
        f'ptp_{win}s': ptp_,
        f'skew_{win}s': skew_,
        f'kurt_{win}s': kurt_,
        f'diff_mean_{win}s': diff_mean,
        f'diff_std_{win}s': diff_std,
    })
    stat_features_list.append(feature_df)

# All frames share the same time index, so concat aligns them automatically.
stat_features_all = pd.concat(stat_features_list, axis=1)
# --- 3. Wavelet and spectral feature extraction ---
# Helper functions
def calc_spectral_bandwidth(cwt_matrix, freqs):
    """Per-sample spectral bandwidth: the power-weighted standard deviation
    of frequency, computed column-wise from a CWT matrix whose rows
    correspond to the entries of `freqs`."""
    power = np.square(np.abs(cwt_matrix))
    # Normalize power per time step; epsilon guards against division by zero.
    weights = power / (power.sum(axis=0) + 1e-12)
    freq_col = freqs[:, None]
    centroid = (weights * freq_col).sum(axis=0)
    return np.sqrt((weights * (freq_col - centroid) ** 2).sum(axis=0))
def highpass_filter(data, fs, cutoff=1.0, order=4):
    """Zero-phase Butterworth high-pass filter.

    `cutoff` is in the same units as `fs` and must lie below the Nyquist
    frequency (fs / 2) for `butter` to accept it; filtfilt gives zero
    phase distortion at the cost of filtering twice.
    """
    normalized_cutoff = cutoff / (0.5 * fs)
    b, a = butter(order, normalized_cutoff, btype='high', analog=False)
    return filtfilt(b, a, data)
# The 'ef' series is NaN-free here (interpolated during preprocessing).
raw = df_indexed['ef'].values
fs = 1.0  # one sample per second

# Point-wise features computed once over the entire signal.
# 1. Continuous wavelet transform (Morlet mother wavelet, scales 1..63).
wavelet_scales = np.arange(1, 64)
coeffs, cwt_freqs = pywt.cwt(raw, wavelet_scales, 'morl', sampling_period=1 / fs)
# 2. Spectral bandwidth B(n).
bandwidth = calc_spectral_bandwidth(coeffs, cwt_freqs)
# 3. First-order difference ΔB(n); prepend keeps the original length.
bandwidth_delta = np.diff(bandwidth, prepend=bandwidth[0])
# 4. High-frequency energy E'(n): square of the high-passed signal.
energy_hf = np.square(highpass_filter(raw, fs, cutoff=0.49))
# Collect the point-wise features on the master 1-second time index.
spectral_features_pointwise = pd.DataFrame(
    {'B_n': bandwidth, 'delta_B_abs': np.abs(bandwidth_delta), 'E_n': energy_hf},
    index=df_indexed.index,
)
# --- 4. Rolling statistics of the spectral features ---
spectral_features_list = []
for win in window_sizes:
    window_label = f'{win}s'
    rolled = spectral_features_pointwise.rolling(window_label, min_periods=1)
    # Aggregate every point-wise spectral feature in a single pass.
    win_features = rolled.agg(['mean', 'max', 'std'])
    # Flatten the MultiIndex columns, e.g. ('B_n', 'mean') -> 'B_n_mean',
    # then tag every column with the window size.
    win_features.columns = ['_'.join(pair).strip() for pair in win_features.columns.values]
    spectral_features_list.append(win_features.add_suffix(f'_{window_label}'))
# All frames share the time index, so concat aligns them automatically.
spectral_features_all = pd.concat(spectral_features_list, axis=1)
# --- 5. Final merge and save ---
# Index-based join is the safest way to combine the feature frames,
# then restore 'time' as a column and drop all-NaN columns (small
# windows can leave std/skew/kurt entirely undefined).
result = (
    df_indexed
    .join([stat_features_all, spectral_features_all])
    .reset_index()
    .dropna(axis=1, how='all')
)
print("最终DataFrame的维度:", result.shape)
result.to_parquet("212_final_corrected.parquet", index=False)
print("\n成功处理数据并保存到 '212_final_corrected.parquet'")