Spark Streaming:如何在Python中获取已处理文件的文件名

2022-03-29 00:00:00 python spark-streaming

问题描述

我是Spark的新手(老实说,我也是Python的新手),如果我错过了一些明显的东西,请原谅我。

我正在使用Spark和Python进行文件流传输。在我所做的第一个示例中,Spark正确地侦听给定的目录并计算文件中出现的单词,因此我知道一切都是在侦听该目录的情况下工作的。

现在,我正在尝试获取为进行审计而处理的文件的名称。我在这里读到 http://mail-archives.us.apache.org/mod_mbox/spark-user/201504.mbox/%3CCANvfmP8OC9jrpVgWsRWfqjMxeYd6sE6EojfdyFy_GaJ3BO43_A@mail.gmail.com%3E 这不是一项微不足道的任务。我这里有一个可能的解决方案 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAEgyCiZbnrd6Y_aG0cBRCVC1u37X8FERSEcHB=tR3A2VGrGrPQ@mail.gmail.com%3E 我试着按如下方式实现:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fileName(data):
    string = data.toDebugString

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingFileNamePrinter")
    ssc = StreamingContext(sc, 1)
    lines = ssc.textFileStream("file:///test/input/")
    files = lines.foreachRDD(fileName)
    print(files)
    ssc.start()
    ssc.awaitTermination()
不幸的是,现在它不是每秒监听文件夹,而是监听一次,输出‘NONE’,然后等待什么也不做。这与运行正常的代码之间的唯一区别是

files = lines.foreachRDD(fileName)

在我担心获取文件名(明天的问题)之前,有人能理解为什么这只检查目录一次吗?

提前感谢 M


解决方案

,因此这是一个新手错误。我将我的解决方案张贴出来,供我自己和其他人参考。

正如@user3689574所指出的,我没有在我的函数中返回调试字符串。这充分解释了为什么我得到的是‘无’。

接下来,我在函数外部打印调试,这意味着它从来不是ForeachRDD的一部分。将其移动到函数中,如下所示:

def fileName(data):
    debug = data.toDebugString()
    print(debug)

这会打印调试信息,并继续监听目录。改变这一点解决了我最初的问题。在获取文件名方面,这变得非常简单。

目录没有变化时的调试字符串如下:

(0) MapPartitionsRDD[1] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[0] at textFileStream at NativeMethodAccessorImpl.java:-2 []

这清楚地表明没有文件。将文件复制到目录中时,调试输出如下:

(1) MapPartitionsRDD[42] at textFileStream at NativeMethodAccessorImpl.java:-2 [] | UnionRDD[41] at testFileStream at NativeMethodAccessorImpl.java:-2 [] | file:/test/input/test.txt New HadoopRDD[40] at textFileStream at NativeMethodAccessorImpl.java:-2 []

它通过一个快速正则表达式很容易地为您提供了文件名。希望这对其他人有帮助。

相关文章