Reading from BigQuery and storing data in Google Storage (special characters issue)
Reference: Can Google Data flow use existent VM and not temporary created ones?
The code works, but when it saves the BigQuery response to Google Storage, all the Japanese characters are corrupted.
PCollectionTuple QVCollections = rows
    .apply("FilterEmptyRows", ParDo.of(new FilterEmptyRowDoFn("TransactionId", "TransactionDateTime")))
    .apply("CreateQVFiles", ParDo.of(new TransactionToQVFilesDoFnJP())
        .withOutputTags(BobShare.QVHeaders, TupleTagList.of(BobShare.QVEvents).and(BobShare.QVPayments)));
QVCollections.get(BobShare.QVEvents).apply("WriteQVEvents",
    TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "events_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_EVENTS).withSuffix(".csv"));
QVCollections.get(BobShare.QVPayments).apply("WriteQVPayments",
    TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "payments_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_PAYMENTS).withSuffix(".csv"));
QVCollections.get(BobShare.QVHeaders).apply("WriteQVHeaders",
    TextIO.write().to(storagePath + CSV_OUTPUT_FOLDER + "header_" + timeSuffix)
        .withoutSharding().withHeader(CSV_HEADER_TRANSACTION).withSuffix(".csv"));
Based on what I have found, I need to use .withCoder(StringUtf8Coder.of()).
In addition, this is what I have tried (it works only locally, with the DirectRunner):
private static void uploadBlob(String project, String bucket, String filename, String localfile) {
    // Read the locally written CSV, then upload its bytes to the bucket
    String listFromCsv = readCsvFromLocalStorage(localfile);
    Storage storage = StorageOptions.newBuilder().setProjectId(project).build().getService();
    BlobId blobId = BlobId.of(bucket, filename);
    BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("application/json").setContentEncoding(UTF_8).build();
    try {
        storage.create(blobInfo, listFromCsv.getBytes(UTF_8));
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}
private static String readCsvFromLocalStorage(String fileName) {
    StringBuilder builder = new StringBuilder();
    Path pathToFile = Paths.get(fileName);
    try (BufferedReader br = Files.newBufferedReader(pathToFile, StandardCharsets.UTF_8)) {
        // read the first line from the text file
        String line = br.readLine();
        // loop until all lines are read
        while (line != null) {
            builder.append(line).append("\n");
            line = br.readLine();
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
    return builder.toString();
}
private static void deleteLocalFile(String fileName) {
    try {
        if (new File(fileName).delete()) {
            System.out.println(fileName + " deleted.");
        } else {
            System.out.println(fileName + " could not be deleted.");
        }
    } catch (Exception e) {
        System.out.println(fileName + " could not be deleted.");
        e.printStackTrace();
    }
}
This is what the data looks like (corrupted): JAPANESE CHARACTERS
Any suggestions?
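For illustration only: corruption like this usually appears when text is encoded with one charset and decoded with another, or encoded with a charset that cannot represent Japanese at all. The sketch below reproduces both effects with plain Java strings. The class EncodingDemo and the method transcode are hypothetical names, not part of the pipeline above, and whether this is the actual cause in the Dataflow job is an assumption.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original pipeline.
final class EncodingDemo {

    // Encode text with one charset and decode the bytes with another,
    // mimicking a writer and a reader that disagree on the encoding.
    static String transcode(String text, Charset writeAs, Charset readAs) {
        byte[] bytes = text.getBytes(writeAs);
        return new String(bytes, readAs);
    }

    public static void main(String[] args) {
        // "\u65E5\u672C\u8A9E" is the word for "Japanese (language)",
        // written with escapes so the source file encoding does not matter.
        String original = "\u65E5\u672C\u8A9E";

        // UTF-8 on both sides: the text survives unchanged.
        String ok = transcode(original, StandardCharsets.UTF_8, StandardCharsets.UTF_8);
        System.out.println("round trip intact: " + ok.equals(original));

        // ISO-8859-1 cannot represent these characters, so each becomes '?'.
        String lossy = transcode(original, StandardCharsets.ISO_8859_1, StandardCharsets.ISO_8859_1);
        System.out.println("lossy result: " + lossy);

        // Written as UTF-8 but read as ISO-8859-1: each 3-byte character
        // turns into three unrelated characters.
        String garbled = transcode(original, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1);
        System.out.println("garbled length: " + garbled.length());
    }
}
```

If the output files show question marks, the data was probably encoded with a charset that lacks Japanese characters. If they show runs of accented Latin characters, the bytes may be valid UTF-8 that a viewer is decoding with the wrong charset.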
Solution
You need to replace
BufferedReader br = Files.newBufferedReader(pathToFile, StandardCharsets.UTF_8)
with
BufferedReader br = Files.newBufferedReader(pathToFile, Charset.forName("UTF-8"))
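A minimal sketch of reading a file with the charset named explicitly, then converting the content to UTF-8 bytes (the form an upload call would take). The class Utf8ReadDemo and its methods are hypothetical names used for illustration. Note that in standard Java, Charset.forName("UTF-8") and StandardCharsets.UTF_8 should denote the same charset, so the point is mainly that every read and every byte conversion names UTF-8 instead of relying on the platform default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class, not part of the original code.
final class Utf8ReadDemo {

    // Read a whole text file, decoding it explicitly as UTF-8.
    static String readUtf8(Path file) throws IOException {
        StringBuilder builder = new StringBuilder();
        try (BufferedReader br = Files.newBufferedReader(file, Charset.forName("UTF-8"))) {
            String line;
            while ((line = br.readLine()) != null) {
                builder.append(line).append("\n");
            }
        }
        return builder.toString();
    }

    // Write text to a temporary file as UTF-8, read it back, and return it.
    static String roundTripThroughFile(String text) {
        try {
            Path tmp = Files.createTempFile("utf8-demo", ".csv");
            try {
                Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
                return readUtf8(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Japanese sample text written with escapes to avoid source-encoding issues.
        String csv = "id,name\n1,\u65E5\u672C\u8A9E\n";
        String content = roundTripThroughFile(csv);

        // Explicit UTF-8 here as well; these are the bytes an upload would send.
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        System.out.println("content intact: " + content.equals(csv));
        System.out.println("payload size in bytes: " + payload.length);
    }
}
```

If the characters are still corrupted after this change, it may be worth checking any other place where strings are turned into bytes without an explicit charset, since the workers' default charset can differ from the local machine's.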