时频分析方法使用时-频域联合分布描述时间序列信号的瞬态特征,并通过瞬时频率估计来表征信号的特征频率随时间变化的趋势,在时间序列信号处理中得到了广泛的应用。STFT 和WT等常用的时频分析方法时频分辨率较低,而且对于多分量时变信号的匹配效果不佳;WVD对噪声的鲁棒性不足且对于多分量时变信号存在交叉干扰项;EMD及其改进方法缺乏数学理论支撑,并存在端点效应和模态混叠等问题。以上时频分析方法存在一些共性问题,例如它们在时频平面的变换系数分布比较离散,瞬时频率曲线幅值能量不够集中,因此时频谱会出现模糊的现象。为了实现理想的时频表示,在原始时频谱的基础上进行能量重排是当前的研究热点。基于同步压缩变换SST的时频分析方法实现了对时频系数的压缩和重排,能够对复杂多分量信号实现高分辨率表达。同步压缩变换的前身,即重排算法,具有坚实的理论基础。重排算法作为一种后处理的时频分析方法,主要用于提升时频表达的效果,但是它最大的缺陷是不支持信号重构。在此基础上,小波的创始人之一Daubechies等在2011年提出了同步压缩小波变换SST,SST通过同步压缩算子对时频系数进行重排,将信号在时频平面任一点处的时频分布移到能量的重心位置,增强瞬时频率的能量集中,较好地解决传统时频分析方法存在的时频模糊问题。
前置知识可参考如下文章
调制信号的连续小波变换 - 哥廷根数学学派的文章 - 知乎 https://zhuanlan.zhihu.com/p/539011866
再看连续小波变换 - 哥廷根数学学派的文章 - 知乎 https://zhuanlan.zhihu.com/p/539265735
同步压缩变换初探 - 哥廷根数学学派的文章 - 知乎 https://zhuanlan.zhihu.com/p/543569766
同步压缩变换在机械振动信号处理领域[1],在微震信号处理领域[2],辐射源个体识别领域[3],雷达辐射源分选识别领域[4],储层识别领域[5],通信信号调制识别领域[6],生理信号处理领域[7]等方面都有广泛应用。
[1]基于迭代广义同步压缩变换的时变工况行星齿轮箱故障诊断
[2]基于多重同步压缩变换的微震信号去噪方法研究
[3]基于同步压缩小波变换的主信号抑制技术
[4]基于多重同步压缩变换的雷达辐射源分选识别
[5]时间域和频率域二阶同步压缩变换及其在储层识别中的应用
[6]基于同步压缩小波变换的通信信号调制识别
[7]基于物联网和云平台的老人家庭远程监护系统研究
由于很多研究者在工作中使用python语言,因为这篇文章介绍在python环境下如何更好地使用SST方法。首先安装ssqueezepy模块:pip install ssqueezepy
ssqueezepy模块里是这么介绍SST的:Synchrosqueezing is a powerful reassignment method that focuses time-frequency representations, and allows extraction of instantaneous amplitudes and frequencies.
主要包括如下内容
1.首先,看一下基础模块Benchmarks
导入相关模块
import osimport numpy as npimport gcimport pandas as pdimport scipy.signal as sig#pip install librosaimport librosafrom pywt import cwt as pcwtfrom timeit import timeit as _timeitfrom ssqueezepy import cwt, stft, ssq_cwt, ssq_stft, Waveletfrom ssqueezepy.utils import process_scales, padsignalfrom ssqueezepy.ssqueezing import _compute_associated_frequenciesdef timeit(fn, number=10): return _timeit(fn, number=number) / number
定义一些基础函数,以后都会用到
#输出名称def print_report(header, times): print(("{}
" "CWT: {:.3f} sec
" "STFT: {:.3f} sec
" "SSQ_CWT: {:.3f} sec
" "SSQ_STFT: {:.3f} sec
" ).format(header, *list(times.values())[-4:]))#CWT同步压缩变换def time_ssq_cwt(x, dtype, scales, cache_wavelet, ssq_freqs): wavelet = Wavelet(dtype=dtype) kw = dict(wavelet=wavelet, scales=scales, ssq_freqs=ssq_freqs) if cache_wavelet: for _ in range(3): # warmup run _ = ssq_cwt(x, cache_wavelet=True, **kw) del _; gc.collect() return timeit(lambda: ssq_cwt(x, cache_wavelet=cache_wavelet, **kw))#STFT同步压缩变换def time_ssq_stft(x, dtype, n_fft): for _ in range(3): _ = ssq_stft(x, dtype=dtype, n_fft=n_fft) del _; gc.collect() return timeit(lambda: ssq_stft(x, dtype=dtype, n_fft=n_fft))#CWT变换def time_cwt(x, dtype, scales, cache_wavelet): wavelet = Wavelet(dtype=dtype) if cache_wavelet: for _ in range(3): # warmup run _ = cwt(x, wavelet, scales=scales, cache_wavelet=True) del _; gc.collect() return timeit(lambda: cwt(x, wavelet, scales=scales, cache_wavelet=cache_wavelet))#STFT变换def time_stft(x, dtype, n_fft): for _ in range(3): _ = stft(x, dtype=dtype, n_fft=n_fft) del _; gc.collect() return timeit(lambda: stft(x, dtype=dtype, n_fft=n_fft))def time_all(x, dtype, scales, cache_wavelet, ssq_freqs, n_fft): num = str(len(x))[:-3] + 'k' return {num: '', f'{num}-cwt': time_cwt(x, dtype, scales, cache_wavelet), f'{num}-stft': time_stft(x, dtype, n_fft), f'{num}-ssq_cwt': time_ssq_cwt(x, dtype, scales, cache_wavelet, ssq_freqs), f'{num}-ssq_stft': time_ssq_stft(x, dtype, n_fft) }x = np.random.randn(1000)for dtype in ('float32', 'float64'): wavelet = Wavelet(dtype=dtype) _ = ssq_cwt(x, wavelet, cache_wavelet=False) _ = ssq_stft(x, dtype=dtype)del _, wavelet
设置STFT 和 CWT的 输出参数
N0, N1 = 10000, 160000 # selected such that CWT pad length ratios are samen_rows = 300n_fft = n_rows * 2 - 2wavelet = Wavelet()scales = process_scales('log-piecewise', N1, wavelet=wavelet)[:n_rows]ssq_freqs = _compute_associated_frequencies( scales, N1, wavelet, 'log-piecewise', maprange='peak', was_padded=True, dt=1, transform='cwt')kw = dict(scales=scales, ssq_freqs=ssq_freqs, n_fft=n_fft)t_all = {}
使用一个很简单的数据x = np.random.randn(N)测试用时
#基础用时print("// BASELINE (dtype=float32, cache_wavelet=True)")os.environ['SSQ_PARALLEL'] = '0'os.environ['SSQ_GPU'] = '0't_all['base'] = {}dtype = 'float32'for N in (N0, N1): x = np.random.randn(N) t_all['base'].update(time_all(x, dtype=dtype, cache_wavelet=True, **kw)) print_report(f"/ N={N}", t_all['base'])#并行+小波Parallel + wavelet cache用时print("// PARALLEL + CACHE (dtype=float32, cache_wavelet=True)")os.environ['SSQ_PARALLEL'] = '1'os.environ['SSQ_GPU'] = '0't_all['parallel'] = {}for N in (N0, N1): x = np.random.randn(N) t_all['parallel'].update(time_all(x, dtype='float32', cache_wavelet=True, **kw)) print_report(f"/ N={N}", t_all['parallel'])#GPU+小波GPU + wavelet cache用时print("// GPU + CACHE (dtype=float32, cache_wavelet=True)")os.environ['SSQ_GPU'] = '1't_all['gpu'] = {}for N in (N0, N1): x = np.random.randn(N) t_all['gpu'].update(time_all(x, dtype='float32', cache_wavelet=True, **kw)) print_report(f"/ N={N}", t_all['gpu'])
然后测试一下python的其他模块执行小波变换的耗时
#PyWavelets模块,小波变换常用的一个python模块for N in (N0, N1): x = np.random.randn(N) xp = padsignal(x) t = timeit(lambda: pcwt(xp, wavelet='cmor1.5-1.0', scales=scales, method='fft')) print("pywt_cwt-%s:" % N, t)#Scipy模块for N in (N0, N1): x = np.random.randn(N) xp = padsignal(x) t = timeit(lambda: sig.cwt(xp, wavelet=sig.morlet, widths=np.arange(4, 4 + len(scales)))) print("scipy_cwt-%s:" % N, t)#%%for N in (N0, N1): x = np.random.randn(N) t = timeit(lambda: sig.stft(x, nperseg=n_fft, nfft=n_fft, noverlap=n_fft-1)) print("scipy_stft-%s:" % N, t)#Librosa模块,语音信号分析中常用的模块# NOTE: we bench here with float64 since float32 is slower for librosa as of 0.8.0for N in (N0, N1): x = np.random.randn(N) t = timeit(lambda: librosa.stft(x, n_fft=n_fft, hop_length=1, dtype='float64')) print("librosa_stft-%s:" % N, t)
最后看一下执行结果
2.讲完了benchmarks,接下来看一下test_transforms相关内容
导入相关模块
import numpy as npimport scipy.signal as sigfrom ssqueezepy import Wavelet, TestSignalsfrom ssqueezepy.utils import window_resolution
生成测试信号
tsigs = TestSignals(N=2048)# 设置 `dft` 模式dft = (None, 'rows', 'cols')[0]tsigs.demo(dft=dft)
看一下配备了多少测试信号
还有很多很多,就不一一列举了。
还可以指定测试信号类,例如
signals = [ 'am-cosine', ('hchirp', dict(fmin=.2)), ('sine:am-cosine', (dict(f=32, phi0=1), dict(amin=.3))),]tsigs.demo(signals, N=2048)
加上`dft` 参数,就展示了相应的频谱
tsigs.demo(signals, dft='rows')#tsigs.demo(signals, dft='cols')
下面开始进行CWT变换和SST变换,使用广义morse小波,但参数不同,只列出几个图片
tsigs = TestSignals(N=2048)wavelets = [ Wavelet(('gmw', {'beta': 60})), Wavelet(('gmw', {'beta': 5})),]tsigs.wavcomp(wavelets, signals='all')
再看一个chirp信号的例子
tsigs.wavcomp(wavelets, signals=[('#echirp', dict(fmin=.1))], N=2048)
再对比一下CWT和STFT以及相应的同步压缩变换的例子
再看一下含噪声信号的例子
再简要看一下时频谱图的脊线提取的例子,以后会慢慢讲
今天就讲到这吧,精华就是这些,其他的关于小波的知识以后再讲。
详细文章见
时间序列信号处理系列-基于Python的同步压缩变换 - 哥廷根数学学派的文章 - 知乎 https://zhuanlan.zhihu.com/p/554189692
留言与评论(共有 0 条评论) “” |