为什么当我发送两个输入流时,Spark Streaming停止工作?
我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。
我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream
没有接收数据时)才开始重新工作。
以下是我的代码:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
请注意,没有错误消息,Spark Simple在启动上下文后冻结,虽然我从端口收到JSON消息,但它没有显示任何内容。
非常感谢。
解决方案
查看Spark Streaming documentation中的这些警告,并查看它们是否适用:
要记住的要点
- 在本地运行Spark Streaming程序时,请勿使用local或local1为主URL。这两种情况都意味着只有一个线程将用于本地运行任务。如果您使用的是基于接收器的输入DStream(例如Sockets、Kafka、Flume等),则将使用单个线程来运行接收器,而不会留下处理接收到的数据的线程。因此,在本地运行时,请始终使用"local[n]"作为主URL,其中n>要运行的接收器的数量(有关如何设置主URL的信息,请参阅Spark属性)。
- 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须多于接收器数。否则,系统将接收数据,但无法处理数据。
相关文章