2015-04-21 15 views
12

Tôi đang cố truyền các tham số an toàn từ tasklet đến một bước trong cùng một công việc.Làm thế nào để chuyển an toàn các thông số từ Tasklet sang bước khi chạy các lệnh song song

Công việc của tôi bao gồm 3 tasklets (bước 1, bước 2, step3) cái khác và cuối cùng một step4 (bộ xử lý, đọc, viết)

công việc này đã được thực hiện nhiều lần song song.

Trong bước 1 bên trong tasklet Tôi đánh giá param (hashId) thông qua dịch vụ web) hơn là tôi đang đi qua nó khắp nơi trên chuỗi của tôi cho đến độc giả của tôi (mà trên bước 4)

Trong bước 3 tôi đang tạo ra param mới được gọi là: filePath dựa trên hashid và tôi gửi nó đến bước 4 (đầu đọc) như một vị trí tài nguyên tập tin

Tôi đang sử dụng stepExecution để truyền tham số này (hashId và filePath).

tôi đã cố gắng 3 cách làm việc đó thông qua tasklet:

để vượt qua param (hashId từ bước 1 vào bước 2 và từ bước 2 vào bước 3) Tôi đang làm điều này:

chunkContext.getStepContext() 
     .getStepExecution() 
     .getExecutionContext() 
     .put("hashId", hashId); 

Trong step4 tôi đang populating filepath dựa trên hashId và vượt qua nó theo cách này để bước cuối cùng của tôi (đó là bộ xử lý đọc và một nhà văn)

public class DownloadFileTasklet implements Tasklet, StepExecutionListener { 
.. 

    @Override 
    public RepeatStatus execute(ChunkContext chunkContext, ExecutionContext  
    executionContext) throws IOException { 

    String hashId = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext().get("hashId"); 

      ... 

filepath="...hashId.csv"; 
//I used here executionContextPromotionListener in order to promote those keys 

     chunkContext.getStepContext() 
     .getStepExecution() 
     .getExecutionContext() 
     .put("filePath", filePath); 
    } 

logger.info("filePath + "for hashId=" + hashId); 

} 
@Override 
public void beforeStep(StepExecution stepExecution) { 
    this.stepExecution = stepExecution; 
} 

Chú ý rằng tôi in hashId và filepath giá trị đúng được tôi đã hoàn thành bước đó (bước 3). bởi các bản ghi chúng nhất quán và được điền như mong đợi

Tôi cũng thêm nhật ký trong trình đọc của mình để xem nhật ký các thông số tôi nhận được.

@Bean 
    @StepScope 
    public ItemStreamReader<MyDTO> reader(@Value("#{jobExecutionContext[filePath]}") String filePath) { 
       logger.info("test filePath="+filePath+"); 

     return itemReader; 
    } 

Khi tôi thực hiện công việc này ~ 10 lần tôi có thể thấy rằng giá trị param filepath được điền bằng giá trị công việc khác filepath khi thực hiện song song

Đây là cách tôi thúc đẩy phím của công việc với executionContextPromotionListener:

định nghĩa

công việc:

@Bean 
    public Job processFileJob() throws Exception { 
     return this.jobs.get("processFileJob"). 
       start.(step1). 
       next(step2) 
       next(downloadFileTaskletStep()). //step3 
       next(processSnidFileStep()).build(); //step4 

    } 

bước 3 định nghĩa

public Step downloadFileTaskletStep() { 
     return this.steps.get("downloadFileTaskletStep").tasklet(downloadFileTasklet()).listener(executionContextPromotionListener()).build(); 
    } 


    @Bean 
    public org.springframework.batch.core.listener.ExecutionContextPromotionListener executionContextPromotionListener() { 
     ExecutionContextPromotionListener executionContextPromotionListener = new ExecutionContextPromotionListener(); 
     executionContextPromotionListener.setKeys(new String[]{"filePath"}); 
     return executionContextPromotionListener; 
    } 

kết quả đề Cùng rối tung các params

tôi có thể theo dõi kết quả qua bảng cơ sở dữ liệu hàng loạt mùa xuân: batch_job_execution_context.short_context:

đây bạn sẽ nhìn thấy các filePatch mà được xây dựng bởi các hashid là không giống nhau vào nguồn gốc hashId // bản ghi không chính xác ///

{"map": [{"entry": [{"string": "totalRecords", "int": 5}, {"string": " segmentId "," long ": 13}, {" string ": [" tệpPath ","/etc/mydir/services/notification_processor/files/2015_04_22/f1c7b0f2180b7e266d36f87fcf6fb7aa.csv "]}, {" chuỗi ": [" hashId "" 20df39d201fffc7444423cfdf2f43789 "]}]}]}

Bây giờ nếu chúng ta kiểm tra hồ sơ khác mà họ có vẻ tốt nhưng luôn luôn một hoặc hai điều sai lầm

.

// hồ sơ đúng

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**c490c8282628b894727fc2a4d6fc0cb5**.csv"]},{"string":["hashId","**c490c8282628b894727fc2a4d6fc0cb5**"]}]}]} 

{"map":[{"entry":[{"string":"totalRecords","int":5},{"string":"segmentId","long":13},{"string":["filePath","\/etc\/mydir\/services\/notification_processor\/files\/2015_04_22\/**2b21d3047208729192b87e90e4a868e4**.csv"]},{"string":["hashId","**2b21d3047208729192b87e90e4a868e4**"]}]}]} 

Bất cứ ý tưởng tại sao tôi có những vấn đề Threading

+0

Làm thế nào chính xác là bạn đang chạy "công việc" của bạn đồng thời? Mỗi công việc sẽ có bối cảnh riêng biệt, do đó các thông số sẽ là duy nhất cho mỗi lần thực hiện công việc. – Palcente

+0

Tôi chỉ thực hiện cùng một công việc nhiều lần cùng một lúc thông qua jobLauncher. Các tham số ban đầu là an toàn. nhưng tôi tạo mới param (filePath) bên trong một tasklet và tôi phải gửi nó cho người đọc của tôi mà trên bước tiếp theo. và giá trị đó không nhất quán khi tôi thực hiện nhiều công việc song song. giống như không an toàn thread – rayman

+0

chỉ vì tò mò hãy thử '@Bean @Scope (" nguyên mẫu ")' – Palcente

Trả lời

2

Rà soát phương pháp cố gắng của bạn:

  • Phương pháp 1 - Chỉnh sửa JobParameters JobParameters không thay đổi trong công việc nên cố gắng sửa đổi chúng trong khi thực hiện công việc sẽ không được thực hiện.
  • Phương pháp 2 - Chỉnh sửa JobParameters v2 Phương pháp 2 thực sự giống như phương pháp 1, bạn sẽ chỉ nhận được tham chiếu đến JobParameters một cách khác.
  • Phương pháp 3 - Sử dụng ExecutionContextPromotionListener. Đây là cách chính xác, nhưng bạn đang làm những việc không chính xác. ExecutionContextPromotionListener xem xét bước củaExecutionContext và sao chép các khóa bạn chỉ định cho công việc ExecutionContext. Bạn đang thêm các khóa trực tiếp vào Job ExecutionContext, đây là một ý tưởng tồi.

Vì vậy, trong ngắn hạn, Phương pháp 3 là gần nhất để sửa chữa, nhưng bạn nên có thêm các thuộc tính bạn muốn chia sẻ đến bước của ExecutionContext và sau đó cấu hình ExecutionContextPromotionListener để thúc đẩy các phím phù hợp với của Job ExecutionContext.

Mã này sẽ được cập nhật như sau:

chunkContext.getStepContext() 
      .getStepExecution() 
      .getExecutionContext() 
      .put("filePath", filePath); 
+0

@ Cảm ơn đã cho tôi một dẫn! chỉ cần một làm rõ: "bạn nên thêm các thuộc tính mà bạn muốn chia sẻ với ExecutionContext của bước .." Không chắc tôi đã hiểu cách thực hiện đề xuất của bạn .. Tôi cấu hình ExecutionContextPromotionListener bằng các khóa và thêm nó làm trình lắng nghe cho tệp downloadFileTaskletStep. Bạn sẽ sửa đổi triển khai đó như thế nào? cảm ơn bạn!! – rayman

+0

Vì vậy, tôi nên sử dụng chunkContext và không phải là bướcExecution mà tôi đã khởi tạo tại beforeStep – rayman

+0

Xin chào Michael, tôi đã thử nó. Tôi đã thực hiện công việc đó 14 lần cùng một lúc và tôi có thể thấy một bản ghi không phù hợp. có nghĩa là bên trong tasklet tôi đang in hashid = x và khi tôi đến bước (trong cùng một công việc) tôi đang in hashid = y (của việc thực hiện công việc khác). Tôi có thể theo dõi điều đó thông qua bảng cơ sở dữ liệu: batch_job_execution_context và coulmn short_context – rayman

Các vấn đề liên quan