Problem/error reading list values from a Spark dataframe when plotting spike detection with scipy find_peaks
Problem description
Suppose I have the following pandas dataframe containing a value for each date:
import pandas as pd
pdf = pd.DataFrame(data={'date':['2020-10-16','2020-10-17','2020-10-18','2020-10-19','2020-10-20','2020-10-21','2020-10-22','2020-10-23','2020-10-24','2020-10-25','2020-10-26','2020-10-27','2020-10-28','2020-10-29','2020-10-30','2020-10-31','2020-11-01','2020-11-02','2020-11-03','2020-11-04','2020-11-05','2020-11-06','2020-11-07','2020-11-08','2020-11-09','2020-11-10','2020-11-11','2020-11-12','2020-11-13','2020-11-14','2020-11-15'],
'value':[161967, 161270, 148508, 152442, 157504, 157118, 155674, 134522, 213384, 163242, 217415, 221502, 146267, 143621, 145875, 139488, 104466, 94825, 143686, 151952, 161074, 161417, 135042, 148768, 131428, 127816, 151905, 180498, 177899, 193950, 12]})
pdf
Or, alternatively, I have the following Spark dataframe with similar data:
import pyspark.sql.types
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType
data = [ ('2020-10-16', 161967),
('2020-10-17', 161270),
('2020-10-18', 148508),
('2020-10-19', 152442),
('2020-10-20', 157504),
('2020-10-21', 157118),
('2020-10-22', 155674),
('2020-10-23', 134522),
('2020-10-24', 213384),
('2020-10-25', 163242),
('2020-10-26', 217415),
('2020-10-27', 221502),
('2020-10-28', 146267),
('2020-10-29', 143621),
('2020-10-30', 145875),
('2020-10-31', 139488),
('2020-11-01', 104466),
('2020-11-02', 94825),
('2020-11-03', 143686),
('2020-11-04', 151952),
('2020-11-05', 161074),
('2020-11-06', 161417),
('2020-11-07', 135042),
('2020-11-08', 148768),
('2020-11-09', 131428),
('2020-11-10', 127816),
('2020-11-11', 151905),
('2020-11-12', 180498),
('2020-11-13', 177899),
('2020-11-14', 193950),
('2020-11-15', 12),
]
schema = StructType([
    StructField("date", StringType(), True),
    StructField("value", IntegerType(), True),
])
# create a Spark dataframe
sc = SparkContext()
sqlContext = SQLContext(sc)
sdf = sqlContext.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.sort('date').show(truncate = False)
Inspired by this answer, I detect the peaks and valleys with the following code:
from scipy.signal import find_peaks
import numpy as np
import matplotlib.pyplot as plt
# Input signal from Pandas dataframe
t = pdf.date
x = pdf.value
# Set thresholds at median +/- 1 standard deviation
thresh_top = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)
# Find indices of peaks & of valleys (from inverting the signal)
peak_idx, _ = find_peaks(x, height = thresh_top)
valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
# Plot signal
plt.figure(figsize=(14,12))
plt.plot(t, x , color='b', label='data')
plt.scatter(t, x, s=10,c='b',label='value')
# Plot threshold
plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
# Plot peaks (red) and valleys (blue)
plt.plot(t[peak_idx], x[peak_idx], "x", color='r', label='peaks')
plt.plot(t[valley_idx], x[valley_idx], "x", color='g', label='valleys')
plt.xticks(rotation=45)
plt.ylabel('value')
plt.xlabel('timestamp')
plt.title('data over time')
plt.legend( loc='lower left')
plt.gcf().autofmt_xdate()
plt.show()
It works and plots successfully when I read the data from the pandas dataframe pdf, before creating the Spark dataframe. But once I have created the Spark dataframe sdf, the very same cell that reads from the pandas pdf stops working, even when run in the same notebook, and gives the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-12-1c79b34272c5> in <module>()
23
24 # Plot threshold
---> 25 plt.plot([min(t), max(t)], [thresh_top, thresh_top], '--', color='r', label='peaks-threshold')
26 plt.plot([min(t), max(t)], [thresh_bottom, thresh_bottom], '--', color='g', label='valleys-threshold')
27
2 frames
/content/spark-3.1.2-bin-hadoop2.7/python/pyspark/sql/column.py in _to_java_column(col)
47 "{0} of type {1}. "
48 "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
---> 49 "function.".format(col, type(col)))
50 return jcol
51
TypeError: Invalid argument, not a string or column: 0 2020-10-16
1 2020-10-17
2 2020-10-18
3 2020-10-19
4 2020-10-20
5 2020-10-21
6 2020-10-22
7 2020-10-23
8 2020-10-24
9 2020-10-25
10 2020-10-26
11 2020-10-27
12 2020-10-28
13 2020-10-29
14 2020-10-30
15 2020-10-31
16 2020-11-01
17 2020-11-02
18 2020-11-03
19 2020-11-04
20 2020-11-05
21 2020-11-06
22 2020-11-07
23 2020-11-08
24 2020-11-09
25 2020-11-10
26 2020-11-11
27 2020-11-12
28 2020-11-13
29 2020-11-14
30 2020-11-15
Name: date, dtype: object of type <class 'pandas.core.series.Series'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
And when I try to read the data directly from the Spark dataframe with:
# Input signal from Spark dataframe
t = [val.date for val in sdf.select('date').collect()]
x = [val.value for val in sdf.select('value').collect()]
Unfortunately, the plotting code then fails and throws the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-14-9cfede77d0fb> in <module>()
31 # Find indices of peaks & of valleys (from inverting the signal)
32 peak_idx, _ = find_peaks(x, height = thresh_top)
---> 33 valley_idx, _ = find_peaks(-x, height = -thresh_bottom)
34
35
TypeError: bad operand type for unary -: 'list'
I have spent a lot of time on this, but I cannot fix the error. I am also fine with other, non-find_peaks() solutions for plotting the spike detection, such as this answer, if I can adapt the code and apply it to the Spark dataframe. I have tried many things, which you can check in this Google Colab Notebook; feel free to run/test/edit it for quick debugging.
Solution
The problem is that you are not working with the same kind of object.

When you work with pandas, x = pdf.value actually gives you a Series object. You can put a unary - in front of it, and it knows it has to negate the values it contains. But when you use PySpark and collect the values, you get a plain Python list, and putting - in front of it raises:

TypeError: bad operand type for unary -: 'list'

which tells you it does not know what to do with that.

(Your first traceback, by the way, has a different cause: from pyspark.sql.functions import * shadows the built-in min and max with PySpark's column functions, so plt.plot([min(t), max(t)], ...) ends up calling PySpark's min on a pandas Series, which is exactly the _to_java_column error you see. Importing the functions only through an alias, as in from pyspark.sql import functions as F, avoids this.)
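To see the Series-versus-list difference concretely, here is a minimal standalone illustration (not from the original answer; it just puts the three cases side by side):

import numpy as np
import pandas as pd

s = pd.Series([1, 2, 3])
print(-s)           # pandas Series: element-wise negation works

a = np.array([1, 2, 3])
print(-a)           # NumPy ndarray: element-wise negation works

try:
    -[1, 2, 3]      # plain Python list: unary minus is not defined
except TypeError as e:
    print(e)        # bad operand type for unary -: 'list'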
So the first thing to do: instead of
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)
you have to negate the values yourself, for example:
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
Next: find_peaks returns an ndarray of indices, which also cannot be used to index a Python list, as in:
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")
so you have to do it manually, for example:
plt.plot(
    [t[i] for i in peak_idx],
    [x[i] for i in peak_idx],
    "x",
    color="r",
    label="peaks",
)
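An alternative to the per-element list comprehensions, sketched here as a variation on the above (not what the answer itself does): convert the collected lists to NumPy arrays once, and both the unary minus and the index-array lookups then behave just like they did with the pandas Series:

t = np.asarray([val.date for val in sdf.select('date').collect()])
x = np.asarray([val.value for val in sdf.select('value').collect()])

peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)  # unary minus works again

# indexing an ndarray with an ndarray of indices also works directly
plt.plot(t[peak_idx], x[peak_idx], "x", color="r", label="peaks")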
I reproduced your plot with the following code (plus, as an example, the computation of the median and std_dev in PySpark):
# data is the same
# ...
# create a Spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
std_dev = sdf.select(F.stddev(F.col("value")).alias("std")).collect()[0]["std"]
median = (
    sdf.select(F.expr("percentile_approx(value, 0.5)").alias("med"))
    .collect()[0]["med"]
)
thresh_top = median + 1 * std_dev
thresh_bottom = median - 1 * std_dev
t = sdf.select("date").rdd.flatMap(lambda x: x).collect()
x = sdf.select("value").rdd.flatMap(lambda x: x).collect()
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks([-i for i in x], height=-thresh_bottom)
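# Optional sanity check: compare the Spark aggregates with NumPy on the
# collected values. Note that F.stddev computes the sample standard
# deviation (ddof=1), while np.std defaults to the population one (ddof=0),
# so the thresholds can differ slightly from the pure-pandas version;
# percentile_approx is likewise approximate by design.
print(median, np.median(x))
print(std_dev, np.std(x, ddof=1))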
plt.figure(figsize=(14, 12))
plt.plot(t, x, color="b", label="data")
plt.scatter(t, x, s=10, c="b", label="value")
# Plot threshold
plt.plot(
    [min(t), max(t)],
    [thresh_top, thresh_top],
    "--",
    color="r",
    label="peaks-threshold",
)
plt.plot(
    [min(t), max(t)],
    [thresh_bottom, thresh_bottom],
    "--",
    color="g",
    label="valleys-threshold",
)
# Plot peaks (red) and valleys (blue)
plt.plot(
    [t[i] for i in peak_idx],
    [x[i] for i in peak_idx],
    "x",
    color="r",
    label="peaks",
)
plt.plot(
    [t[i] for i in valley_idx],
    [x[i] for i in valley_idx],
    "x",
    color="g",
    label="valleys",
)
# ...
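Finally, if the data fits in driver memory anyway (it does here, since it is collected for plotting), a simpler route is to convert the Spark dataframe back to pandas and reuse the original pandas-based code unchanged. A minimal sketch, assuming the same sdf as above (pdf2 is just an illustrative name):

# bring the Spark dataframe back to the driver as a pandas dataframe
pdf2 = sdf.sort('date').toPandas()
t = pdf2.date
x = pdf2.value
thresh_top = np.median(x) + 1 * np.std(x)
thresh_bottom = np.median(x) - 1 * np.std(x)
peak_idx, _ = find_peaks(x, height=thresh_top)
valley_idx, _ = find_peaks(-x, height=-thresh_bottom)  # Series negation works here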