Tôi đã nghiên cứu khả năng cho Websphere MQ như một nguồn dữ liệu cho phát trực tiếp vì nó cần thiết trong một trong các trường hợp sử dụng của chúng tôi. Tôi đã biết rằng MQTT là giao thức hỗ trợ giao tiếp từ cấu trúc dữ liệu MQ nhưng vì tôi là người mới bắt đầu phát trực tuyến, tôi cần một số ví dụ làm việc cho cùng. Có ai cố gắng kết nối MQ với phát trực tiếp tia lửa không. Hãy nghĩ ra cách tốt nhất để làm như vậy.Websphere MQ là nguồn dữ liệu cho Apache Spark Streaming
13
A
Trả lời
3
Vì vậy, tôi gửi bài ở đây mã làm việc cho CustomMQReceiver nối các Websphere MQ và đọc dữ liệu:
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements())
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
1
Tôi tin rằng bạn có thể sử dụng JMS để kết nối để kết nối Websphere MQ, và Apache Camel thể được sử dụng để kết nối với Websphere MQ. Bạn có thể tạo một Receiver tùy chỉnh như vậy (lưu ý rằng mô hình này cũng có thể được sử dụng mà không JMS):
class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
//Transient as this will get passed to the Workers from the Driver
@transient
var camelContextOption: Option[DefaultCamelContext] = None
def onStart() = {
camelContextOption = Some(new DefaultCamelContext())
val camelContext = camelContextOption.get
val env = new Properties()
env.setProperty("java.naming.factory.initial", "???")
env.setProperty("java.naming.provider.url", jndiProviderURL)
env.setProperty("com.webmethods.jms.clientIDSharing", "true")
val namingContext = new InitialContext(env); //using the properties file to create context
//Lookup Connection Factory
val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))
val builder = new RouteBuilder() {
def configure() = {
from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
.process(new Processor() {
def process(exchange: Exchange) = {
exchange.getIn.getBody match {
case s: String => store(s)
}
}
})
}
}
}
builders.foreach(camelContext.addRoutes)
camelContext.start()
}
def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop()
}
Sau đó bạn có thể tạo một DStream sự kiện của bạn như sau:
val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))
Các vấn đề liên quan
- 1. Apache Spark streaming vs Spring XD Streams
- 2. Apache Spark Streaming, Cách xử lý lỗi phụ thuộc Downstream
- 3. WebSphere MQ .NET API
- 4. Websphere MQ v8 - MQRC_NOT_AUTHORIZED - 2035
- 5. Kafka Streaming + Spark Streaming + Học máy
- 6. Phiên bản WebSphere MQ Client mới nhất là gì?
- 7. Các nhóm tin nhắn trong WebSphere MQ
- 8. Kiên trì Spark streaming ra
- 9. Cách dữ liệu nhị phân 'Ống' trong Apache Spark
- 10. Spark Streaming với bảng tra cứu động
- 11. Cách truyền dữ liệu qua nhiều khoảng thời gian theo đợt trong Spark Streaming
- 12. Kết hợp các kết quả từ hàng loạt RDD với streaming RDD trong Apache Spark
- 13. dữ liệu streaming qua ajax
- 14. Apache Camel với IBM MQ
- 15. Apache Spark vs Apache Spark 2
- 16. Kết nối với Websphere MQ bằng Java với SSL/Keystore
- 17. WebSphere MQ .NET - Sự khác biệt giữa IBM.XMS.dll và amqmdnet.dll
- 18. Kịch bản yêu cầu/trả lời của IBM WebSphere MQ
- 19. Apache POI Streaming (SXSSF) để đọc
- 20. Điều gì có nghĩa là bởi chế độ ràng buộc trong WebSphere MQ?
- 21. Cấu trúc Lambda với Apache Spark
- 22. Web dữ liệu streaming trong Java EE
- 23. Làm thế nào để chạy Nguồn Apache Spark trong C#
- 24. socket.io streaming dữ liệu nhị phân
- 25. Spark Streaming với phụ thuộc Kafka 2.0.0
- 26. Apache Spark và Apache Storm
- 27. Rails 3.1.1 HTTP streaming với Apache + Passenger
- 28. Apache-Spark: Bản đồ (_._ 2) viết tắt là gì?
- 29. Apache Spark Stderr và Stdout
- 30. Apache Spark vs Akka
Bỏ phiếu để đóng như off-topic vì nó không phù hợp với nguyên tắc câu hỏi của Stack Overflow. Tôi khuyên bạn nên hỏi những câu hỏi về kiến trúc và tính khả thi rộng rãi này tại http://mqseries.net hoặc một trong những diễn đàn MQ trực tuyến khác. –
Tôi nghĩ rằng nó có thể chỉ là một vấn đề ngữ âm. Thay vì mơ hồ _ "Tôi đã nhìn vào điều này. Giải pháp tốt nhất là gì?" _ Bạn có thể hỏi một câu hỏi trực tiếp. _ "Làm thế nào để đọc dữ liệu từ Websphere MQ thông qua Apache Spark?" _ Nếu bạn biết thêm về phía Websphere MQ của câu hỏi, bạn có thể thêm nhiều thông tin hơn về điều đó. Nó có hỗ trợ SQL không? Làm thế nào để bạn thường truy vấn nó? Những gì khách hàng tồn tại cho nó? Sau đó, một người biết Spark có thể giúp bạn. –