使用pyflink从本地系统以批处理模式读取CSV文件

2022-03-15 00:00:00 python csv apache-flink pyflink

问题描述

我在编写pyflink作业时尝试读取已建立的CSV文件。我使用文件系统连接器来获取数据,但是在DDL上执行EXECUTE_SQL()并稍后在表上执行查询之后,我收到一个错误,说明它无法获取下一个结果。我无法解决此错误。我已经检查了CSV文件,它是完全正确的,并且可以与 pandas 一起工作,但是在这里我不明白为什么它不能获取下一行。请查找附加的代码以供参考。

from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os
## CREATE THE ENVIRONMENT

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
#connector for ingesting the data
source_ddl = """
                CREATE TABLE MyUserTable (
                        timestamp_info TIMESTAMP(3),
                        column_a FLOAT,
                        column_b FLOAT,
                        column_c INT,
                        ) WITH (
                          'connector' = 'filesystem',          
                          'path' = 'file:///Users/abc/Projects/temp.csv', 
                          'format' = 'csv'

                        )"""


#connector for data output/sink
sink_ddl = """
                CREATE TABLE results (
                            timestamp_info TIMESTAMP(3),
                            score FLOAT)
                            WITH (
                                'connector' = 'filesystem',
                                'path' = 'file:///Users/abc/Projects/results.csv',
                                'format' = 'csv'
                            )"""

#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")

# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT timestamp_info,column_a FROM MyUserTable")
table_result2.print()

即将出现的错误:-

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
+-------------------------+--------------------------------+
|          timestamp_info |                       column_a |
+-------------------------+--------------------------------+
Traceback (most recent call last):
  File "local_implementation.py", line 51, in <module>
    table_result2.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/table/table_result.py", line 219, in print
    self._j_table_result.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o46.print.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2086)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2158)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166)
    ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize CSV row.
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql/java.sql.Timestamp.valueOf(Timestamp.java:196)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.convertToTimestamp(CsvToRowDataConverters.java:250)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251)

解决方案

时间戳格式与csv文件中的时间索引不匹配。此外,当像这样读取CSV文件时,它将获取第一行并尝试解析它,如果您的CSV文件中有头,那么它将给出错误,因为它一开始需要时间戳格式,但得到的字符串无法解析为所需的格式。因此,请确保CSV文件中没有标头。

相关文章