java.nio.file.FileSystemException进程无法访问该文件,因为其他进程正在使用该文件

2022-02-28 00:00:00 file spring java spring-batch

我在spring-boot应用程序中使用spring batch。Spring Boot版本为2.3.3.RELEASE

我有一个multi-step job,在第一步中validates xml file header然后readtransactionchunk oriented step中,对每个事务做一些business logic,然后write将其返回到XML文件。在third也是最后一步中,当我尝试delete the input file时,它抛出FileSystemException

更新:即使在作业完成后,我也无法删除输入文件。

Spring批量配置

package com.trax.europeangateway.config;

import java.util.Arrays;
import java.util.List;

import javax.xml.bind.JAXBException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.batch.item.xml.builder.StaxEventItemWriterBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.util.ClassUtils;

import com.trax.europeangateway.factory.XMLSchemaValidatorFactory;
import com.trax.europeangateway.itemprocessor.omegaxml.PIExtractorItemProcessor;
import com.trax.europeangateway.itemprocessor.omegaxml.PIRemoverItemProcessor;
import com.trax.europeangateway.itemwriter.omegaxml.EdsClientItemWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFooterCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlHeaderCallBack;
import com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller;
import com.trax.europeangateway.listener.JobResultListener;
import com.trax.europeangateway.listener.StepResultListener;
import com.trax.europeangateway.model.dto.FileInformationHeaderDto;
import com.trax.europeangateway.model.dto.ProcessorWriterDto;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
import com.trax.europeangateway.model.enums.Directory;
import com.trax.europeangateway.service.AfterJobService;
import com.trax.europeangateway.service.ExtractHeaderOmegaXml;
import com.trax.europeangateway.util.FileUtils;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class OmegaXmlBatchConfig {
    
    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;
    
    @Autowired
    StepResultListener stepResultListener;
    
    @Autowired
    JobBuilderFactory jobBuilderFactory;
     
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    
    @Autowired
    private FileUtils fileUtils;
    
    @Value( "${eugateway.batch.chunk.size}" )
    private int chunkSize;
    
    @JobScope
    @Bean (name = "extractOmegaXmlHeaderStep")
    public Step extractOmegaXmlHeaderStep(StepBuilderFactory steps , XMLSchemaValidatorFactory factory,
            @Value("#{jobParameters['file.path']}") String filePath,
            @Value("#{jobParameters['submission.account']}") String submitter) {
        return steps.get("omegaXmlExtractHeaderStep")
                .tasklet((contribution, chunkContext) -> {
                    //factory.validatePayload(filePath, submitter);
                    FileInformationHeaderDto fileInformation = new ExtractHeaderOmegaXml().readHeader(filePath);
                    ExecutionContext jobExecutionContext =  chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
                    jobExecutionContext.put("file.information", fileInformation);
                    return RepeatStatus.FINISHED;
                }).build();
    }
    
    @Bean (name = "OmegaXmlExtractAndReplacePersonalDataStep")
    public Step jobStep(ItemStreamReader<TransactionPositionReport> omegaXmlItemReader,
            CompositeItemProcessor<TransactionPositionReport, 
            ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor,
            CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter,
            StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("OmegaXmlExtractAndReplacePersonalDataStep")
                .<TransactionPositionReport, ProcessorWriterDto>chunk(chunkSize)
                .reader(omegaXmlItemReader)
                .processor(omegaXmlExtractAndRemoveItemProcessor)
                .writer(omegaXmlCompositeItemWriter)
                .faultTolerant()
                .listener(stepResultListener)
                .build();
    }
    
    @JobScope
    @Bean (name = "moveFileStep")
    public Step moveFileStep(StepBuilderFactory steps , AfterJobService afterJobService, 
            @Value("#{jobParameters['file.path']}") String filePath,
            @Value("#{jobParameters['submission.account']}") String submitter) {
        return steps.get("moveFileStep")
                .tasklet((contribution, chunkContext) -> {
                    Path inputFile = new File(filePath).toPath();
                Files.delete(inputFile);
                    log.info("Submitter {}: File {} moved with status {}",submitter, filePath, status);
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Primary
    @Bean("omegaXmlJob")
    public Job extractPersonalDataJob(Step extractOmegaXmlHeaderStep, Step OmegaXmlExtractAndReplacePersonalDataStep,
            Step moveFileStep, JobResultListener jobListener,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("omegaXmlExtractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(extractOmegaXmlHeaderStep)
                .next(OmegaXmlExtractAndReplacePersonalDataStep)
                .next(moveFileStep)
                .listener(jobListener)
                .build();
    }

    @StepScope
    @Bean(destroyMethod="doClose")
    public ItemStreamReader<TransactionPositionReport> omegaXmlItemReader(@Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setMappedClass(TransactionPositionReport.class);
        transactionMarshaller.setPackagesToScan(ClassUtils.getPackageName(TransactionPositionReport.class));
        transactionMarshaller.setSupportJaxbElementClass(true);
        transactionMarshaller.setSupportDtd(true);
        log.debug("Generating StaxEventItemReader");
 
        return new StaxEventItemReaderBuilder<TransactionPositionReport>()
                .name("transactionPositionReport")
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }
    
    @Bean
    @JobScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['file.information']}") FileInformationHeaderDto fileInformation){
        return new OmegaXmlHeaderCallBack(fileInformation);
    }
    
    @Bean(destroyMethod="")
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack(){
        return new OmegaXmlFooterCallBack();
    }
    
    @JobScope
    @Bean(name = "staxTransactionWriter", destroyMethod="")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, 
            @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
 
        OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
        marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setCheckForXmlRootElement(false);
        
        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
                .headerCallback(omegaXmlHeaderCallBack)
                .footerCallback(getOmegaXmlFooterCallBack())
                .build();
    }
    
    @StepScope
    @Bean
    public PIExtractorItemProcessor extractItemProcessor() {
        log.debug("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }
    
    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.debug("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }
    
    @Bean
    @StepScope
    CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> omegaXmlExtractAndRemoveItemProcessor() {
        log.debug("Generating CompositeItemProcessor");
        CompositeItemProcessor<TransactionPositionReport, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }
    
    @Bean
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.debug("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }
    
    @StepScope
    @Bean(destroyMethod="")
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        log.debug("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter(staxTransactionItemWriter(omegaXmlHeaderCallBack, path, submissionAccount));
    }
    
    
    @StepScope
    @Bean(destroyMethod="")
    public CompositeItemWriter<ProcessorWriterDto> omegaXmlCompositeItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) throws JAXBException {
        log.debug("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(omegaXmlHeaderCallBack, path, submissionAccount)));
        return compositeItemWriter;
    }
    
}

尝试删除文件时的堆栈跟踪

2021-06-27 10:14:09,270 DEBUG c.t.e.l.StepResultListener [europeanGateway-1] StepResultListener called afterStep for submitter 12345
2021-06-27 10:14:09,278 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [OmegaXmlExtractAndReplacePersonalDataStep] executed in 1s873ms
2021-06-27 10:14:09,355 INFO o.s.b.c.j.SimpleStepHandler [europeanGateway-1] Executing step: [moveFileStep]
2021-06-27 10:14:09,387 ERROR o.s.b.c.s.AbstractStep [europeanGateway-1] Encountered an error executing step moveFileStep in job omegaXmlExtractAndReplacePersonalDataJob
java.nio.file.FileSystemException: xbuseuropeangatewayprocessingPRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML: The process cannot access the file because it is being used by another process.

    at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) ~[?:1.8.0_291]
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) ~[?:1.8.0_291]
    at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) ~[?:1.8.0_291]
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269) ~[?:1.8.0_291]
    at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) ~[?:1.8.0_291]
    at java.nio.file.Files.delete(Files.java:1126) ~[?:1.8.0_291]
    at com.trax.europeangateway.config.OmegaXmlBatchConfig.lambda$1(OmegaXmlBatchConfig.java:119) ~[classes/:?]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_291]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_291]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_291]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_291]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) [spring-aop-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at com.sun.proxy.$Proxy142.execute(Unknown Source) [?:?]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) [spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_291]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_291]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_291]
2021-06-27 10:14:09,406 INFO o.s.b.c.s.AbstractStep [europeanGateway-1] Step: [moveFileStep] executed in 50ms
2021-06-27 10:14:09,437 ERROR c.t.e.l.JobResultListener [europeanGateway-1] ExtractPersonalData failed for submission account 12345
2021-06-27 10:14:09,451 INFO o.s.b.c.l.s.SimpleJobLauncher$1 [europeanGateway-1] Job: [SimpleJob: [name=omegaXmlExtractAndReplacePersonalDataJob]] completed with the following parameters: [{currentTime=1624785246755, submission.account=12345, file.path=xbuseuropeangatewayprocessingPRO_529900G3SW56SHYNPR95_20180403T145725003_MIXXXXXXT.XML}] and the following status: [FAILED] in 2s508ms


解决方案

您的问题是将CompositeItemWriter与两个委托一起使用,但您没有在步骤中将委托注册为流。本例中发生的情况是未调用ItemStream#close方法,因此资源未正确释放。

如reference documentation中所述,您需要手动将委派编写器(在您的情况下为edsClientItemWriteromegaXmlFileWriter)注册为步骤中的项流。

相关文章