2012-09-11 28 views
5

Tôi muốn biết liệu Camel có thể điều chỉnh dựa trên nội dung của trao đổi hay không.Điều chỉnh dựa trên nội dung

Tình huống như sau: Tôi phải gọi dịch vụ web qua xà phòng. Trong số đó, các tham số được gửi tới dịch vụ web đó có một customerId. Vấn đề là webservice gửi lại một lỗi nếu có nhiều hơn 1 yêu cầu mỗi phút cho một customerId nhất định.

Tôi tự hỏi liệu có thể thực hiện điều chỉnh trên mỗi customerId với Camel không. Vì vậy, điều chỉnh không nên được thực hiện cho tất cả các tin nhắn nhưng chỉ cho các tin nhắn với cùng một customerId.

Hãy cho tôi biết cách tôi có thể triển khai điều này hoặc nếu tôi cần làm rõ câu hỏi của mình.

+2

của nó thực sự là một ý tưởng tốt để hỗ trợ thêm cho các nhóm trong hiện Throttler EIP. Điều đó nói rằng mặc dù các tin nhắn sẽ được giữ trong bộ nhớ trong khi họ chờ đợi để được phát hành. Hãy để tôi đăng nhập một JIRA để tăng cường này. –

+0

Chỉ cần cho các hồ sơ ở đây là vé: https://issues.apache.org/jira/browse/CAMEL-5599 –

+0

@ClausIbsen - nó được một thời gian, nhưng tôi muốn ném hỗ trợ đằng sau tính năng đó! – Denise

Trả lời

1

ActiveMQ Message Groups được thiết kế để xử lý trường hợp này. Vì vậy, nếu bạn có thể giới thiệu một hàng đợi JMS hop trong tuyến đường của bạn, sau đó chỉ cần đặt tiêu đề JMSXGroupId cho customerId. Sau đó, trong một tuyến đường khác, bạn có thể tiêu thụ từ hàng đợi này và gửi đến dịch vụ web của bạn để có được hành vi mà bạn mô tả.

cũng thấy http://camel.apache.org/parallel-processing-and-ordering.html để biết thêm thông tin ...

1

Trong khi ActiveMQ nhắn nhóm chắc chắn sẽ giải quyết xử lý song song của độc đáo ID khách hàng, đánh giá của tôi Claus là đúng đó giới thiệu một ga đối với từng nhóm duy nhất thể hiện một tính năng chưa thực hiện cho Camel/ActiveMQ.

Nhóm thư sẽ không đáp ứng SLA được mô tả. Mặc dù mỗi nhóm thông điệp (tương ứng với ID khách hàng) sẽ được xử lý theo một chuỗi cho mỗi nhóm, miễn là yêu cầu mất ít hơn một phút để nhận phản hồi, yêu cầu của một yêu cầu mỗi phút cho mỗi khách hàng sẽ không được thực thi .

Điều đó nói rằng, tôi rất muốn biết liệu có thể kết hợp các Nhóm thông báo và chiến lược ga theo cách mô phỏng yêu cầu tính năng trong JIRA hay không. Nỗ lực của tôi cho đến nay đã thất bại. Tôi đã suy nghĩ một cái gì đó dọc theo những dòng:

<route> 
    <from uri="activemq:pending?maxConcurrentConsumers=10"/> 
    <throttle timePeriodMillis="60000"> 
    <constant>1</constant> 
    <to uri="mock:endpoint"/> 
    </throttle> 
</route> 

Tuy nhiên, ga dường như được áp dụng cho toàn bộ các yêu cầu di chuyển đến thiết bị đầu cuối, và không phải đến từng người tiêu dùng cá nhân. Tôi phải thừa nhận, tôi hơi ngạc nhiên khi thấy hành vi đó. Kỳ vọng của tôi là ga sẽ áp dụng riêng cho từng người tiêu dùng, điều này sẽ đáp ứng SLA trong câu hỏi ban đầu, miễn là các thông báo bao gồm ID khách hàng trong tiêu đề JMSXGroupId.

0

Tôi đã gặp một vấn đề tương tự và cuối cùng đã đưa ra giải pháp được mô tả ở đây.

giả định của tôi là:

  • thứ tự của thông điệp là không quan trọng (mặc dù nó có thể được giải quyết bằng cách tái sequencer)
  • Tổng số lượng tin nhắn mỗi ID của khách hàng là không lớn nên thời gian chạy không bão hòa .

Cách tiếp cận giải pháp:

  • Run aggregator trong 1 phút trong khi sử dụng ID khách hàng để lắp ráp thông điệp với ID khách hàng tương tự vào một danh sách
  • Sử dụng Splitter để phân chia các danh sách thành thông điệp cá nhân
  • Gửi tin nhắn đầu tiên từ bộ chia đến dịch vụ thực tế
  • Định tuyến lại phần còn lại của danh sách trở lại bộ tổng hợp.
phiên bản

Java DSL là một chút dễ dàng hơn để hiểu: phiên bản XML

final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class) 
     .accumulateInCollection(ArrayList.class); 

from("direct:start") 
    .log("Receiving ${body}") 
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000) 
     .log("Aggregate: releasing ${body}") 
     .split(body()) 
     .choice() 
      .when(header(Exchange.SPLIT_INDEX).isEqualTo(0)) 
       .log("*** Processing: ${body}") 
       .to("mock:result") 
      .otherwise() 
       .to("seda:delay") 
     .endChoice(); 

from("seda:delay") 
    .delay(0) 
    .to("direct:start"); 

mùa xuân trông giống như sau:

<!-- this is our aggregation strategy defined as a spring bean --> 
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr --> 
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/> 
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection"> 
    <constructor-arg value="java.util.ArrayList" /> 
</bean> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
     <route> 
      <from uri="direct:start"/> 
      <log message="Receiving ${body}"/> 
      <aggregate strategyRef="_flexible2" completionTimeout="60000" > 
       <correlationExpression> 
        <xpath>/order/@customerID</xpath> 
       </correlationExpression> 
       <log message="Aggregate: releasing ${body}"/> 
       <split> 
        <simple>${body}</simple> 
        <choice> 
         <when> 
          <simple>${header.CamelSplitIndex} == 0</simple> 
          <log message="*** Processing: ${body}"/> 
          <to uri="mock:result"/> 
         </when> 
         <otherwise> 
          <log message="--- Delaying: ${body}"/> 
          <to uri="seda:delay" /> 
         </otherwise> 
        </choice> 
       </split> 
      </aggregate> 
     </route> 

     <route> 
      <from uri="seda:delay"/> 
      <to uri="direct:start"/> 
     </route> 
</camelContext> 
Các vấn đề liên quan