1、需求
系統每日從某個固定的目錄中讀取
csv
檔案,並在控制檯上列印。
2、解決方案
要解決上述需求,可以使用的方法有很多,此處選擇使用
Spring Batch
來實現。
3、注意事項
1、檔案路徑的獲取
此處簡單處理,讀取
JobParameters
中的日期,然後構建一個檔案路徑,並將檔案路徑放入到
ExecutionContext
中。此處為了簡單,檔案路徑會在程式中寫死,但是同時也會將檔案路徑存入到
ExecutionContext
中,並且在具體的某個
Step
中從
ExecutionContext
中獲取路徑。
注意:
ExecutionContext
中存入的資料雖然在各個
Step
中都可以獲取到,但是
不推薦
存入
比較大
的資料到
ExecutionContext
中,因為這個物件的資料需要存入到資料庫中。
2、各個Step如果獲取到ExecutionContext中的值
類上加入
@StepScope
註解
透過
@Value(“#{jobExecutionContext[‘importPath’]}”)
來獲取
eg:
@Bean@StepScopepublic FlatFileItemReader
解釋:在程式例項化
FlatFileItemReader
的時候,此時是沒有jobExecutionContext的,那麼就會報錯,如果加上
@StepScope
,此時就沒有問題了。
@StepScope
表示到達Step階段才例項化這個Bean
3、FlatFileItemReader使用注意
當我們使用
FlatFileItemReader
來讀取我們的csv檔案時,此處需要返回
FlatFileItemReader
型別,而不能直接返回
ItemReader
,否則可能出現如下錯誤
Reader must be open before it can be read
4、實現步驟
1、匯入依賴,配置
1、匯入依賴
2、初始化SpringBatch資料庫
spring。datasource。username=rootspring。datasource。password=root@1993spring。datasource。url=jdbc:mysql://127。0。0。1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=falsespring。datasource。driver-class-name=com。mysql。cj。jdbc。Driver# 程式啟動時,預設不執行jobspring。batch。job。enabled=falsespring。batch。jdbc。initialize-schema=always# 初始化spring-batch資料庫指令碼spring。batch。jdbc。schema=classpath:org/springframework/batch/core/schema-mysql。sql
2、構建檔案讀取路徑
此處我的想法是,在
JobExecutionListener
中完成檔案路徑的獲取,並將之放入到
ExecutionContext
,然後在各個
Step
中就可以獲取到檔案路徑的值了。
/** * 在此監聽器中,獲取到具體的需要讀取的檔案路徑,並儲存到 ExecutionContext * * @author huan。fu * @date 2022/8/30 - 22:22 */@Slf4jpublic class AssemblyReadCsvPathListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { ExecutionContext executionContext = jobExecution。getExecutionContext(); JobParameters jobParameters = jobExecution。getJobParameters(); String importDate = jobParameters。getString(“importDate”); log。info(“從 job parameter 中獲取的 importDate 引數的值為:[{}]”, importDate); String readCsvPath = “data/person。csv”; log。info(“根據日期組裝需要讀取的csv路徑為:[{}],此處排除日期,直接寫一個死的路徑”, readCsvPath); executionContext。putString(“importPath”, readCsvPath); } @Override public void afterJob(JobExecution jobExecution) { }}
3、構建Tasklet,輸出檔案路徑
@Slf4j@Component@StepScopepublic class PrintImportFilePathTaskLet implements Tasklet { @Value(“#{jobExecutionContext[‘importPath’]}”) private String importFilePath; @Value(“#{jobParameters[‘importDate’]}”) private String importDate; @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { log。info(“從job parameter 中獲取到的 importDate:[{}],從 jobExecutionContext 中獲取的 importPath:[{}]”, importDate, importFilePath); return RepeatStatus。FINISHED; }}
需要注意的是,此類上加入了
@StepScope
註解
4、編寫實體類
@AllArgsConstructor@Getter@ToStringpublic class Person { /** * 使用者名稱 */ private String username; /** * 年齡 */ private Integer age; /** * 性別 */ private String sex;}
5、編寫Job配置
@Configuration@AllArgsConstructor@Slf4jpublic class ImportPersonJobConfig { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; private final PrintImportFilePathTaskLet printImportFilePathTaskLet; private final ItemReader * TaskletStep是一個非常簡單的介面,僅有一個方法——execute。 * TaskletStep會反覆的呼叫這個方法直到獲取一個RepeatStatus。FINISHED返回或者丟擲一個異常。 * 所有的Tasklet呼叫都會包裝在一個事物中。 * * @return Step */ private Step printParametersAndContextVariables() { return stepBuilderFactory。get(“print-context-params”) 。tasklet(printImportFilePathTaskLet) // 當job重啟時,如果達到了3此,則該step不在執行 。startLimit(3) // 當job重啟時,如果該step的是已經處理完成即COMPLETED狀態時,下方給false表示該step不在重啟,即不在執行 。allowStartIfComplete(false) // 新增 step 監聽 。listener(new CustomStepExecutionListener()) 。build(); }} 6、編寫Job啟動類 @Component@Slf4jpublic class StartImportPersonJob { @Autowired private Job importPersonJob; @Autowired private JobLauncher jobLauncher; @PostConstruct public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException { JobParameters jobParameters = new JobParametersBuilder() 。addString(“importDate”, LocalDate。of(2022, 08, 31)。format(DateTimeFormatter。ofPattern(“yyyyMMdd”))) 。toJobParameters(); JobExecution execution = jobLauncher。run(importPersonJob, jobParameters); log。info(“job invoked”); }} 7、自動配置SpringBatch @SpringBootApplication@EnableBatchProcessingpublic class SpringBatchReadCsvApplication { public static void main(String[] args) { SpringApplication。run(SpringBatchReadCsvApplication。class, args); }} 主要是 @EnableBatchProcessing 註解 5、執行結果 執行結果 6、完整程式碼 https://gitee。com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv