2015-05-25 18 views
13

Tôi đã nghiên cứu khả năng cho Websphere MQ như một nguồn dữ liệu cho phát trực tiếp vì nó cần thiết trong một trong các trường hợp sử dụng của chúng tôi. Tôi đã biết rằng MQTT là giao thức hỗ trợ giao tiếp từ cấu trúc dữ liệu MQ nhưng vì tôi là người mới bắt đầu phát trực tuyến, tôi cần một số ví dụ làm việc cho cùng. Có ai cố gắng kết nối MQ với phát trực tiếp tia lửa không. Hãy nghĩ ra cách tốt nhất để làm như vậy.Websphere MQ là nguồn dữ liệu cho Apache Spark Streaming

+1

Bỏ phiếu để đóng như off-topic vì nó không phù hợp với nguyên tắc câu hỏi của Stack Overflow. Tôi khuyên bạn nên hỏi những câu hỏi về kiến ​​trúc và tính khả thi rộng rãi này tại http://mqseries.net hoặc một trong những diễn đàn MQ trực tuyến khác. –

+0

Tôi nghĩ rằng nó có thể chỉ là một vấn đề ngữ âm. Thay vì mơ hồ _ "Tôi đã nhìn vào điều này. Giải pháp tốt nhất là gì?" _ Bạn có thể hỏi một câu hỏi trực tiếp. _ "Làm thế nào để đọc dữ liệu từ Websphere MQ thông qua Apache Spark?" _ Nếu bạn biết thêm về phía Websphere MQ của câu hỏi, bạn có thể thêm nhiều thông tin hơn về điều đó. Nó có hỗ trợ SQL không? Làm thế nào để bạn thường truy vấn nó? Những gì khách hàng tồn tại cho nó? Sau đó, một người biết Spark có thể giúp bạn. –

Trả lời

3

Vì vậy, tôi gửi bài ở đây mã làm việc cho CustomMQReceiver nối các Websphere MQ và đọc dữ liệu:

public class CustomMQReciever extends Receiver<String> { String host = null; 
int port = -1; 
String qm=null; 
String qn=null; 
String channel=null; 
transient Gson gson=new Gson(); 
transient MQQueueConnection qCon= null; 

Enumeration enumeration =null; 

public CustomMQReciever(String host , int port, String qm, String channel, String qn) { 
    super(StorageLevel.MEMORY_ONLY_2()); 
    this.host = host; 
    this.port = port; 
    this.qm=qm; 
    this.qn=qn; 
    this.channel=channel; 

} 

public void onStart() { 
    // Start the thread that receives data over a connection 
    new Thread() { 
     @Override public void run() { 
      try { 
       initConnection(); 
       receive(); 
      } 
      catch (JMSException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    }.start(); 
} 
public void onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 
} 

/** Create a MQ connection and receive data until receiver is stopped */ 
private void receive() { 
    System.out.print("Started receiving messages from MQ"); 

    try { 

    JMSMessage receivedMessage= null; 

     while (!isStopped() && enumeration.hasMoreElements()) 
     { 

      receivedMessage= (JMSMessage) enumeration.nextElement(); 
      String userInput = convertStreamToString(receivedMessage); 
      //System.out.println("Received data :'" + userInput + "'"); 
      store(userInput); 
     } 

     // Restart in an attempt to connect again when server is active again 
     //restart("Trying to connect again"); 

     stop("No More Messages To read !"); 
     qCon.close(); 
     System.out.println("Queue Connection is Closed"); 

    } 
    catch(Exception e) 
    { 
     e.printStackTrace(); 
     restart("Trying to connect again"); 
    } 
    catch(Throwable t) { 
     // restart if there is any other error 
     restart("Error receiving data", t); 
    } 
    } 

    public void initConnection() throws JMSException 
{ 
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); 
    conFactory.setHostName(host); 
    conFactory.setPort(port); 
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); 
    conFactory.setQueueManager(qm); 
    conFactory.setChannel(channel); 


    qCon= (MQQueueConnection) conFactory.createQueueConnection(); 
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); 
    MQQueue queue=(MQQueue) qSession.createQueue(qn); 
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); 
    qCon.start(); 

    enumeration= browser.getEnumeration(); 
    } 

@Override 
public StorageLevel storageLevel() { 
    return StorageLevel.MEMORY_ONLY_2(); 
} 
} 
1

Tôi tin rằng bạn có thể sử dụng JMS để kết nối để kết nối Websphere MQ, và Apache Camel thể được sử dụng để kết nối với Websphere MQ. Bạn có thể tạo một Receiver tùy chỉnh như vậy (lưu ý rằng mô hình này cũng có thể được sử dụng mà không JMS):

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable { 
    //Transient as this will get passed to the Workers from the Driver 
    @transient 
    var camelContextOption: Option[DefaultCamelContext] = None 

    def onStart() = { 
    camelContextOption = Some(new DefaultCamelContext()) 
    val camelContext = camelContextOption.get 
    val env = new Properties() 
    env.setProperty("java.naming.factory.initial", "???") 
    env.setProperty("java.naming.provider.url", jndiProviderURL) 
    env.setProperty("com.webmethods.jms.clientIDSharing", "true") 
    val namingContext = new InitialContext(env); //using the properties file to create context 

    //Lookup Connection Factory 
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory] 
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) 

    val builder = new RouteBuilder() { 
     def configure() = { 
      from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10") 
      .process(new Processor() { 
      def process(exchange: Exchange) = { 
       exchange.getIn.getBody match { 
       case s: String => store(s) 
       } 
      } 
      }) 
     } 
     } 
    } 
    builders.foreach(camelContext.addRoutes) 
    camelContext.start() 
    } 

    def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop() 
} 

Sau đó bạn có thể tạo một DStream sự kiện của bạn như sau:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI")) 
Các vấn đề liên quan