首頁/ 汽車/ 正文

使用SpringBatch讀取csv檔案

1、需求

系統每日從某個固定的目錄中讀取

csv

檔案,並在控制檯上列印。

2、解決方案

要解決上述需求,可以使用的方法有很多,此處選擇使用

Spring Batch

來實現。

3、注意事項

1、檔案路徑的獲取

此處簡單處理,讀取

JobParameters

中的日期,然後構建一個檔案路徑,並將檔案路徑放入到

ExecutionContext

中。此處為了簡單,檔案路徑會在程式中寫死,但是同時也會將檔案路徑存入到

ExecutionContext

中,並且在具體的某個

Step

中從

ExecutionContext

中獲取路徑。

注意:

ExecutionContext

中存入的資料雖然在各個

Step

中都可以獲取到,但是

不推薦

存入

比較大

的資料到

ExecutionContext

中,因為這個物件的資料需要存入到資料庫中。

2、各個Step如果獲取到ExecutionContext中的值

類上加入

@StepScope

註解

透過

@Value(“#{jobExecutionContext[‘importPath’]}”)

來獲取

eg:

@Bean@StepScopepublic FlatFileItemReader readCsvItemReader(@Value(“#{jobExecutionContext[‘importPath’]}”) String importPath) { // 讀取資料 return new FlatFileItemReaderBuilder() 。name(“read-csv-file”) 。resource(new ClassPathResource(importPath)) 。delimited()。delimiter(“,”) 。names(“username”, “age”, “sex”) 。fieldSetMapper(new RecordFieldSetMapper<>(Person。class)) 。build();}

解釋:在程式例項化

FlatFileItemReader

的時候,此時是沒有jobExecutionContext的,那麼就會報錯,如果加上

@StepScope

,此時就沒有問題了。

@StepScope

表示到達Step階段才例項化這個Bean

3、FlatFileItemReader使用注意

當我們使用

FlatFileItemReader

來讀取我們的csv檔案時,此處需要返回

FlatFileItemReader

型別,而不能直接返回

ItemReader

,否則可能出現如下錯誤

Reader must be open before it can be read

4、實現步驟

1、匯入依賴,配置

1、匯入依賴

org。springframework。boot spring-boot-starter-batch

2、初始化SpringBatch資料庫

spring。datasource。username=rootspring。datasource。password=root@1993spring。datasource。url=jdbc:mysql://127。0。0。1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=falsespring。datasource。driver-class-name=com。mysql。cj。jdbc。Driver# 程式啟動時,預設不執行jobspring。batch。job。enabled=falsespring。batch。jdbc。initialize-schema=always# 初始化spring-batch資料庫指令碼spring。batch。jdbc。schema=classpath:org/springframework/batch/core/schema-mysql。sql

2、構建檔案讀取路徑

此處我的想法是,在

JobExecutionListener

中完成檔案路徑的獲取,並將之放入到

ExecutionContext

,然後在各個

Step

中就可以獲取到檔案路徑的值了。

/** * 在此監聽器中,獲取到具體的需要讀取的檔案路徑,並儲存到 ExecutionContext * * @author huan。fu * @date 2022/8/30 - 22:22 */@Slf4jpublic class AssemblyReadCsvPathListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { ExecutionContext executionContext = jobExecution。getExecutionContext(); JobParameters jobParameters = jobExecution。getJobParameters(); String importDate = jobParameters。getString(“importDate”); log。info(“從 job parameter 中獲取的 importDate 引數的值為:[{}]”, importDate); String readCsvPath = “data/person。csv”; log。info(“根據日期組裝需要讀取的csv路徑為:[{}],此處排除日期,直接寫一個死的路徑”, readCsvPath); executionContext。putString(“importPath”, readCsvPath); } @Override public void afterJob(JobExecution jobExecution) { }}

3、構建Tasklet,輸出檔案路徑

@Slf4j@Component@StepScopepublic class PrintImportFilePathTaskLet implements Tasklet { @Value(“#{jobExecutionContext[‘importPath’]}”) private String importFilePath; @Value(“#{jobParameters[‘importDate’]}”) private String importDate; @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { log。info(“從job parameter 中獲取到的 importDate:[{}],從 jobExecutionContext 中獲取的 importPath:[{}]”, importDate, importFilePath); return RepeatStatus。FINISHED; }}

需要注意的是,此類上加入了

@StepScope

註解

4、編寫實體類

@AllArgsConstructor@Getter@ToStringpublic class Person { /** * 使用者名稱 */ private String username; /** * 年齡 */ private Integer age; /** * 性別 */ private String sex;}

5、編寫Job配置

@Configuration@AllArgsConstructor@Slf4jpublic class ImportPersonJobConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final PrintImportFilePathTaskLet printImportFilePathTaskLet; private final ItemReader readCsvItemReader; @Bean public Job importPersonJob() { // 獲取一個job builder, jobName可以是不存在的 return jobBuilderFactory。get(“import-person-job”) // 新增job execution 監聽器 。listener(new AssemblyReadCsvPathListener()) // 列印 job parameters 和 ExecutionContext 中的值 。start(printParametersAndContextVariables()) // 讀取csv的資料並處理 。next(handleCsvFileStep()) 。build(); } /** * 讀取資料 * 注意:此處需要返回 FlatFileItemReader型別,而不要返回ItemReader * 否則可能報如下異常 Reader must be open before it can be read * * @param importPath 檔案路徑 * @return reader */ @Bean @StepScope public FlatFileItemReader readCsvItemReader(@Value(“#{jobExecutionContext[‘importPath’]}”) String importPath) { // 讀取資料 return new FlatFileItemReaderBuilder() 。name(“read-csv-file”) 。resource(new ClassPathResource(importPath)) 。delimited()。delimiter(“,”) 。names(“username”, “age”, “sex”) 。fieldSetMapper(new RecordFieldSetMapper<>(Person。class)) 。build(); } @Bean public Step handleCsvFileStep() { // 每讀取一條資料,交給這個處理 ItemProcessor processor = item -> { if (item。getAge() > 25) { log。info(“使用者[{}]的年齡:[{}>25]不處理”, item。getUsername(), item。getAge()); return null; } return item; }; // 讀取到了 chunk 大小的資料後,開始執行寫入 ItemWriter itemWriter = items -> { log。info(“開始寫入資料”); for (Person item : items) { log。info(“{}”, item); } }; return stepBuilderFactory。get(“handle-csv-file”) // 每讀取2條資料,執行一次write,當每read一條資料後,都會執行process 。chunk(2) // 讀取資料 。reader(readCsvItemReader) // 讀取一條資料就開始處理 。processor(processor) // 當讀取的資料的數量到達 chunk 時,呼叫該方法進行處理 。writer(itemWriter) 。build(); } /** * 列印 job parameters 和 ExecutionContext 中的值 *

* TaskletStep是一個非常簡單的介面,僅有一個方法——execute。 * TaskletStep會反覆的呼叫這個方法直到獲取一個RepeatStatus。FINISHED返回或者丟擲一個異常。 * 所有的Tasklet呼叫都會包裝在一個事物中。 * * @return Step */ private Step printParametersAndContextVariables() { return stepBuilderFactory。get(“print-context-params”) 。tasklet(printImportFilePathTaskLet) // 當job重啟時,如果達到了3此,則該step不在執行 。startLimit(3) // 當job重啟時,如果該step的是已經處理完成即COMPLETED狀態時,下方給false表示該step不在重啟,即不在執行 。allowStartIfComplete(false) // 新增 step 監聽 。listener(new CustomStepExecutionListener()) 。build(); }}

6、編寫Job啟動類

@Component@Slf4jpublic class StartImportPersonJob { @Autowired private Job importPersonJob; @Autowired private JobLauncher jobLauncher; @PostConstruct public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { JobParameters jobParameters = new JobParametersBuilder() 。addString(“importDate”, LocalDate。of(2022, 08, 31)。format(DateTimeFormatter。ofPattern(“yyyyMMdd”))) 。toJobParameters(); JobExecution execution = jobLauncher。run(importPersonJob, jobParameters); log。info(“job invoked”); }}

7、自動配置SpringBatch

@SpringBootApplication@EnableBatchProcessingpublic class SpringBatchReadCsvApplication { public static void main(String[] args) { SpringApplication。run(SpringBatchReadCsvApplication。class, args); }}

主要是

@EnableBatchProcessing

註解

5、執行結果

使用SpringBatch讀取csv檔案

執行結果

6、完整程式碼

https://gitee。com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv

相關文章

頂部