java.nio.file.FileSystemException:进程无法访问该文件,因为其他进程正在使用该文件
我在 spring-boot 应用程序中使用 spring batch。Spring Boot 版本为 2.3.3.RELEASE。
这是一个 multi-step job:在第一步中 validates xml file header,然后在 chunk oriented step 中 read transaction,对每个事务做一些 business logic,然后将其 write 回 XML 文件。在 third(也是最后一个)步骤中,当我尝试 delete the input file 时,它抛出 FileSystemException。
更新:即使在作业完成后,我也无法删除输入文件。
Spring Batch 配置
package com.trax.europeangateway.config;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.JAXBException;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.batch.item.xml.builder.StaxEventItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.util.ClassUtils;
import com.trax.europeangateway.factory.XMLSchemaValidatorFactory;
import com.trax.europeangateway.itemprocessor.omegaxml.PIExtractorItemProcessor;
import com.trax.europeangateway.itemprocessor.omegaxml.PIRemoverItemProcessor;
import com.trax.europeangateway.itemwriter.omegaxml.EdsClientItemWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFooterCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlHeaderCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller;
import com.trax.europeangateway.listener.JobResultListener;
import com.trax.europeangateway.listener.StepResultListener;
import com.trax.europeangateway.model.dto.FileInformationHeaderDto;
import com.trax.europeangateway.model.dto.ProcessorWriterDto;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
import com.trax.europeangateway.model.enums.Directory;
import com.trax.europeangateway.service.AfterJobService;
import com.trax.europeangateway.service.ExtractHeaderOmegaXml;
import com.trax.europeangateway.util.FileUtils;
import lombok.extern.slf4j.Slf4j;
/**
 * Spring Batch configuration of the {@code omegaXmlExtractAndReplacePersonalDataJob} job.
 *
 * <p>The job runs three steps in sequence: a tasklet that reads the header of the input XML
 * file and stores it in the job execution context, a chunk-oriented step that reads
 * {@code transaction} fragments, processes them and hands them to a composite writer, and a
 * final tasklet that deletes the input file.
 */
@Slf4j
@Configuration
public class OmegaXmlBatchConfig {

    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;

    /** Listener attached to the chunk-oriented step. */
    @Autowired
    StepResultListener stepResultListener;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    /** Used to compute the location of the exported XML file. */
    @Autowired
    private FileUtils fileUtils;

    /** Chunk size of the chunk-oriented step, read from {@code eugateway.batch.chunk.size}. */
    @Value("${eugateway.batch.chunk.size}")
    private int chunkSize;

    /**
     * Builds the first step: a tasklet that reads the header of the input file and publishes it
     * in the job execution context under the key {@code file.information}.
     *
     * @param steps     factory used to create the step
     * @param factory   schema validator factory; currently unused because validation is disabled
     * @param filePath  value of the {@code file.path} job parameter
     * @param submitter value of the {@code submission.account} job parameter; currently unused
     * @return the header extraction step
     */
    @JobScope
    @Bean(name = "extractOmegaXmlHeaderStep")
    public Step extractOmegaXmlHeaderStep(StepBuilderFactory steps, XMLSchemaValidatorFactory factory,
            @Value("#{jobParameters['file.path']}") String filePath,
            @Value("#{jobParameters['submission.account']}") String submitter) {
        return steps.get("omegaXmlExtractHeaderStep")
                .tasklet((contribution, chunkContext) -> {
                    //factory.validatePayload(filePath, submitter);
                    // NOTE(review): readHeader is not visible here; confirm it closes the input
                    // file, because a handle left open would make the later delete fail.
                    FileInformationHeaderDto fileInformation = new ExtractHeaderOmegaXml().readHeader(filePath);
                    ExecutionContext jobExecutionContext = chunkContext.getStepContext().getStepExecution()
                            .getJobExecution().getExecutionContext();
                    jobExecutionContext.put("file.information", fileInformation);
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /**
     * Builds the chunk-oriented step that reads {@link TransactionPositionReport} items, runs
     * them through the composite processor and passes the results to the composite writer.
     *
     * @param omegaXmlItemReader                    reader of the input file
     * @param omegaXmlExtractAndRemoveItemProcessor composite processor applied to each item
     * @param omegaXmlCompositeItemWriter           composite writer receiving the processed items
     * @param stepBuilderFactory                    factory used to create the step
     * @return the fault-tolerant chunk-oriented step
     */
    @Bean(name = "OmegaXmlExtractAndReplacePersonalDataStep")
    public Step jobStep(ItemStreamReader<TransactionPositionReport> omegaXmlItemReader,
            CompositeItemProcessor<TransactionPositionReport,
            ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor,
            CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter,
            StepBuilderFactory stepBuilderFactory) {
        // NOTE(review): only the reader, processor and writer handed to the builder are picked
        // up as item streams automatically. Confirm that the delegates of the composite writer
        // (and any writer they wrap) get open/close called; if not, register them explicitly
        // with .stream(...) so their file handles are released when the step ends.
        return stepBuilderFactory.get("OmegaXmlExtractAndReplacePersonalDataStep")
                .<TransactionPositionReport, ProcessorWriterDto>chunk(chunkSize)
                .reader(omegaXmlItemReader)
                .processor(omegaXmlExtractAndRemoveItemProcessor)
                .writer(omegaXmlCompositeItemWriter)
                .faultTolerant()
                .listener(stepResultListener)
                .build();
    }

    /**
     * Builds the last step: a tasklet that deletes the input file.
     *
     * <p>{@link Files#delete(Path)} throws when the file cannot be removed, which fails the
     * step instead of silently leaving the file behind.
     *
     * @param steps           factory used to create the step
     * @param afterJobService currently unused; kept so the bean signature stays unchanged
     * @param filePath        value of the {@code file.path} job parameter
     * @param submitter       value of the {@code submission.account} job parameter, used for logging
     * @return the step that deletes the input file
     */
    @JobScope
    @Bean(name = "moveFileStep")
    public Step moveFileStep(StepBuilderFactory steps, AfterJobService afterJobService,
            @Value("#{jobParameters['file.path']}") String filePath,
            @Value("#{jobParameters['submission.account']}") String submitter) {
        return steps.get("moveFileStep")
                .tasklet((contribution, chunkContext) -> {
                    Path inputFile = Paths.get(filePath);
                    Files.delete(inputFile);
                    // Reached only when the delete succeeded; a failure propagates as an exception.
                    log.info("Submitter {}: File {} deleted", submitter, filePath);
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /**
     * Assembles the job from its three steps and attaches the job listener.
     *
     * <p>The step parameter names are identical to the step bean names; keep them as they are,
     * since several {@link Step} beans are defined in this configuration.
     *
     * @param extractOmegaXmlHeaderStep                 first step
     * @param OmegaXmlExtractAndReplacePersonalDataStep second, chunk-oriented step
     * @param moveFileStep                              last step
     * @param jobListener                               listener attached to the job
     * @param jobBuilderFactory                         factory used to create the job
     * @return the configured job
     */
    @Primary
    @Bean("omegaXmlJob")
    public Job extractPersonalDataJob(Step extractOmegaXmlHeaderStep, Step OmegaXmlExtractAndReplacePersonalDataStep,
            Step moveFileStep, JobResultListener jobListener,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("omegaXmlExtractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(extractOmegaXmlHeaderStep)
                .next(OmegaXmlExtractAndReplacePersonalDataStep)
                .next(moveFileStep)
                .listener(jobListener)
                .build();
    }

    /**
     * Creates the step-scoped StAX reader that turns each {@code transaction} fragment of the
     * input file into a {@link TransactionPositionReport}.
     *
     * @param path value of the {@code file.path} job parameter
     * @return the reader bound to the input file
     */
    @StepScope
    @Bean(destroyMethod = "doClose")
    public ItemStreamReader<TransactionPositionReport> omegaXmlItemReader(@Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setMappedClass(TransactionPositionReport.class);
        transactionMarshaller.setPackagesToScan(ClassUtils.getPackageName(TransactionPositionReport.class));
        transactionMarshaller.setSupportJaxbElementClass(true);
        // NOTE(review): DTD support is enabled here; if the input files can come from untrusted
        // submitters, confirm this does not expose the parser to external entity attacks.
        transactionMarshaller.setSupportDtd(true);
        log.debug("Generating StaxEventItemReader");
        return new StaxEventItemReaderBuilder<TransactionPositionReport>()
                .name("transactionPositionReport")
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }

    /**
     * Creates the job-scoped header callback from the header stored by the first step.
     *
     * @param fileInformation value stored under {@code file.information} in the job execution context
     * @return the header callback
     */
    @Bean
    @JobScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['file.information']}") FileInformationHeaderDto fileInformation) {
        return new OmegaXmlHeaderCallBack(fileInformation);
    }

    /**
     * Creates the footer callback used by the StAX writer.
     *
     * @return a new footer callback
     */
    @Bean(destroyMethod = "")
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack() {
        return new OmegaXmlFooterCallBack();
    }

    /**
     * Creates the job-scoped StAX writer for the exported XML file.
     *
     * <p>The output location is whatever {@code fileUtils.getFilePath(path, submissionAccount,
     * Directory.TEMP, true)} returns.
     *
     * @param omegaXmlHeaderCallBack callback writing the file header
     * @param path                   value of the {@code file.path} job parameter
     * @param submissionAccount      value of the {@code submission.account} job parameter
     * @return the configured StAX writer
     * @throws JAXBException declared by the original signature
     */
    @JobScope
    @Bean(name = "staxTransactionWriter", destroyMethod = "")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
            @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
        OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
        marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setCheckForXmlRootElement(false);
        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
                .headerCallback(omegaXmlHeaderCallBack)
                .footerCallback(getOmegaXmlFooterCallBack())
                .build();
    }

    /**
     * Creates the step-scoped processor that is first in the composite processor chain.
     *
     * @return a new {@link PIExtractorItemProcessor}
     */
    @StepScope
    @Bean
    public PIExtractorItemProcessor extractItemProcessor() {
        log.debug("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }

    /**
     * Creates the processor that is second in the composite processor chain.
     *
     * @return a new {@link PIRemoverItemProcessor}
     */
    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.debug("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }

    /**
     * Creates the step-scoped composite processor that applies {@link #extractItemProcessor()}
     * followed by {@link #removeItemProcessor()}.
     *
     * @return the composite processor
     */
    @Bean
    @StepScope
    CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor() {
        log.debug("Generating CompositeItemProcessor");
        CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }

    /**
     * Creates the writer that is the first delegate of the composite writer.
     *
     * @return a new {@link EdsClientItemWriter}
     */
    @Bean
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.debug("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }

    /**
     * Creates the step-scoped writer that wraps the StAX writer and is the second delegate of
     * the composite writer.
     *
     * @param omegaXmlHeaderCallBack callback passed on to the StAX writer
     * @param path                   value of the {@code file.path} job parameter
     * @param submissionAccount      value of the {@code submission.account} job parameter
     * @return the file writer wrapping {@code staxTransactionItemWriter}
     * @throws JAXBException declared by {@code staxTransactionItemWriter}
     */
    @StepScope
    @Bean(destroyMethod = "")
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        log.debug("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter(staxTransactionItemWriter(omegaXmlHeaderCallBack, path, submissionAccount));
    }

    /**
     * Creates the step-scoped composite writer that delegates to {@link #edsClientItemWriter()}
     * and then to the Omega XML file writer.
     *
     * @param omegaXmlHeaderCallBack callback passed on to the file writer
     * @param path                   value of the {@code file.path} job parameter
     * @param submissionAccount      value of the {@code submission.account} job parameter
     * @return the composite writer
     * @throws JAXBException declared by {@code omegaXmlFileWriter}
     */
    @StepScope
    @Bean(destroyMethod = "")
    public CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        log.debug("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(omegaXmlHeaderCallBack, path, submissionAccount)));
        return compositeItemWriter;
    }
}
尝试删除文件时的堆栈跟踪
2021-06-27 10:14:09,270 DEBUG c.t.e.l.StepResultListener [europeanGateway-1] StepResultListener called afterStep for submitter 12345
2021-06-27 10:14:09,278 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [OmegaXmlExtractAndReplacePersonalDataStep] executed in 1s873ms
2021-06-27 10:14:09,355 INFO o.s.b.c.j.SimpleStepHandler [europeanGateway-1] Executing step: [moveFileStep]
2021-06-27 10:14:09,387 ERROR o.s.b.c.s.AbstractStep [europeanGateway-1] Encountered an error executing step moveFileStep in job omegaXmlExtractAndReplacePersonalDataJob
java.nio.file.FileSystemException: xbuseuropeangatewayprocessingPRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) ~[?:1.8.0_291]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) ~[?:1.8.0_291]
at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) ~[?:1.8.0_291]
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269) ~[?:1.8.0_291]
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) ~[?:1.8.0_291]
at java.nio.file.Files.delete(Files.java:1126) ~[?:1.8.0_291]
at com.trax.europeangateway.config.OmegaXmlBatchConfig.lambda$1(OmegaXmlBatchConfig.java:119) ~[classes/:?]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_291]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_291]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_291]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
at com.sun.proxy.$Proxy142.execute(Unknown Source) [?:?]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_291]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_291]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_291]
2021-06-27 10:14:09,406 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [moveFileStep] executed in 50ms
2021-06-27 10:14:09,437 ERROR c.t.e.l.JobResultListener [europeanGateway-1] ExtractPersonalData failed for submission account 12345
2021-06-27 10:14:09,451 INFO o.s.b.c.l.s.SimpleJobLauncher$1 [europeanGateway-1] Job: [SimpleJob: [name=omegaXmlExtractAndReplacePersonalDataJob]] completed with the following parameters: [{currentTime=1624785246755, submission.account=12345, file.path=xbuseuropeangatewayprocessingPRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML}] and the following status: [FAILED] in 2s508ms
解决方案
您的问题在于:您使用了带有两个委托的 CompositeItemWriter,但没有在步骤中将这些委托注册为流。在这种情况下,ItemStream#close 方法不会被调用,因此资源没有被正确释放。
如 reference documentation 中所述,您需要手动将委托写入器(在您的情况下为 edsClientItemWriter 和 omegaXmlFileWriter)在步骤中注册为项流(item stream)。
相关文章