Tôi đang cố truyền các tham số an toàn từ tasklet đến một bước trong cùng một công việc.Làm thế nào để chuyển an toàn các thông số từ Tasklet sang bước khi chạy các lệnh song song
Công việc của tôi bao gồm 3 tasklets (bước 1, bước 2, step3) cái khác và cuối cùng một step4 (bộ xử lý, đọc, viết)
công việc này đã được thực hiện nhiều lần song song.
Trong bước 1 bên trong tasklet Tôi đánh giá param (hashId) thông qua dịch vụ web) hơn là tôi đang đi qua nó khắp nơi trên chuỗi của tôi cho đến độc giả của tôi (mà trên bước 4)
Trong bước 3 tôi đang tạo ra param mới được gọi là: filePath dựa trên hashid và tôi gửi nó đến bước 4 (đầu đọc) như một vị trí tài nguyên tập tin
Tôi đang sử dụng stepExecution để truyền tham số này (hashId và filePath).
tôi đã cố gắng 3 cách làm việc đó thông qua tasklet:
để vượt qua param (hashId từ bước 1 vào bước 2 và từ bước 2 vào bước 3) Tôi đang làm điều này:
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("hashId", hashId);
Trong step4 tôi đang populating filepath dựa trên hashId và vượt qua nó theo cách này để bước cuối cùng của tôi (đó là bộ xử lý đọc và một nhà văn)
public class DownloadFileTasklet implements Tasklet, StepExecutionListener {
..
@Override
public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext
executionContext) throws IOException {
String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId");
...
filepath="...hashId.csv";
//I used here executionContextPromotionListener in order to promote those keys
chunkContext.getStepContext()
.getStepExecution()
.getExecutionContext()
.put("filePath", filePath);
}
logger.info("filePath + "for hashId=" + hashId);
}
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
Chú ý rằng tôi in hashId và filepath giá trị đúng được tôi đã hoàn thành bước đó (bước 3). bởi các bản ghi chúng nhất quán và được điền như mong đợi
Tôi cũng thêm nhật ký trong trình đọc của mình để xem nhật ký các thông số tôi nhận được.
@Bean
@StepScope
public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) {
logger.info("test filePath="+filePath+");
return itemReader;
}
Khi tôi thực hiện công việc này ~ 10 lần tôi có thể thấy rằng giá trị param filepath được điền bằng giá trị công việc khác filepath khi thực hiện song song
Đây là cách tôi thúc đẩy phím của công việc với executionContextPromotionListener:
định nghĩacông việc:
@Bean
public Job processFileJob() throws Exception {
return this.jobs.get("processFileJob").
start.(step1).
next(step2)
next(downloadFileTaskletStep()). //step3
next(processSnidFileStep()).build(); //step4
}
bước 3 định nghĩa
public Step downloadFileTaskletStep() {
return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build();
}
@Bean
public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener();
executionContextPromotionListener.setKeys(new String[]{"filePath"});
return executionContextPromotionListener;
}
kết quả đề Cùng rối tung các params
tôi có thể theo dõi kết quả qua bảng cơ sở dữ liệu hàng loạt mùa xuân: batch_job_execution_context.short_context:
đây bạn sẽ nhìn thấy các filePatch mà được xây dựng bởi các hashid là không giống nhau vào nguồn gốc hashId // bản ghi không chính xác ///
{"map": [{"entry": [{"string": "totalRecords", "int": 5}, {"string": " segmentId "," long ": 13}, {" string ": [" tệpPath ","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv "]}, {" chuỗi ": [" hashId "" 20df39d201fffc7444423cfdf2f43789 "]}]}]}
Bây giờ nếu chúng ta kiểm tra hồ sơ khác mà họ có vẻ tốt nhưng luôn luôn một hoặc hai điều sai lầm
.// hồ sơ đúng
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]}
{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]}
Bất cứ ý tưởng tại sao tôi có những vấn đề Threading
Làm thế nào chính xác là bạn đang chạy "công việc" của bạn đồng thời? Mỗi công việc sẽ có bối cảnh riêng biệt, do đó các thông số sẽ là duy nhất cho mỗi lần thực hiện công việc. – Palcente
Tôi chỉ thực hiện cùng một công việc nhiều lần cùng một lúc thông qua jobLauncher. Các tham số ban đầu là an toàn. nhưng tôi tạo mới param (filePath) bên trong một tasklet và tôi phải gửi nó cho người đọc của tôi mà trên bước tiếp theo. và giá trị đó không nhất quán khi tôi thực hiện nhiều công việc song song. giống như không an toàn thread – rayman
chỉ vì tò mò hãy thử '@Bean @Scope (" nguyên mẫu ")' – Palcente