2013-03-20 38 views
6

Tôi hiểu rằng câu hỏi này trùng lặp câu hỏi tại using rabbitmq to send a message not string but structGửi một đối tượng sử dụng RabbitMQ

nếu để làm điều này bằng cách đầu tiên

first way

tôi có dấu vết sau:

java.io.EOFException 
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2304) 
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2773) 
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:798) 
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:298) 
at com.mdnaRabbit.worker.data.Data.fromBytes(Data.java:78) 
at com.mdnaRabbit.worker.App.main(App.java:41) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:601) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120) 

Tôi đã kiểm tra và shure thư đó chuyển đổi thành byte hoàn toàn tốt trong lớp người gửi, nhưng người tiêu dùng không thể r đánh lừa nó.

đây là lớp sản xuất của tôi:

package com.mdnaRabbit.newt; 

import java.io.IOException; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.MessageProperties; 
import org.apache.commons.lang.SerializationUtils; 
import com.mdnaRabbit.worker.data.Data; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 

    public static void main(String[] argv) throws IOException{ 

     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 

     int i = 0; 

     do { 
      Data message = getMessage(); 
      byte [] byteMessage = message.getBytes(); 
      //System.out.println(byteMessage); 
      channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, byteMessage); 
      System.out.println(" [" + (i+1) + "] message Sent" + Data.fromBytes(byteMessage).getBody()); 
      i++; 
     } while (i<15); 

     channel.close(); 
     connection.close(); 
    } 

    private static Data getMessage(){ 
     Data data = new Data(); 
     data.setHeader("header"); 
     data.setDomainId("abc.com"); 
     data.setReceiver("me"); 
     data.setSender("he"); 
     data.setBody("body"); 
     return data; 
    } 

    private static String joinStrings(String[] strings, String delimiter){ 
     int length = strings.length; 
     if (length == 0) return ""; 
     StringBuilder words = new StringBuilder(strings[0]); 
     for (int i = 1; i < length; i++){ 
      words.append(delimiter).append(strings[i]); 
     } 
     return words.toString(); 
    } 
} 

đây là lớp người tiêu dùng của tôi:

package com.mdnaRabbit.worker; 

import java.io.IOException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import com.mdnaRabbit.worker.data.Data; 
import org.apache.commons.lang.SerializationUtils; 

public class App { 

    private static final String TASK_QUEUE_NAME = "task_queue"; 
    private static int i = 0; 
    public static void main(String[] argv) 
      throws IOException, 
      InterruptedException{ 

     ExecutorService threader = Executors.newFixedThreadPool(20); 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(threader); 
     final Channel channel = connection.createChannel(); 

     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     channel.basicQos(20); 

     final QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(TASK_QUEUE_NAME, false, consumer); 

     try { 

      while (true) { 

         try {QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
          Data message = Data.fromBytes(delivery.getBody()); 
          //Data message = (Data) SerializationUtils.deserialize(delivery.getBody()); 

          System.out.println(" [" + (i++) +"] Received" + message.getBody()); 

          channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
         }catch (Exception e){ 
         } 
        } 
     } catch (Exception e){ 
      e.printStackTrace(); 
     } 
     channel.close(); 
     connection.close(); 
    } 

} 

đây là lớp dữ liệu của tôi:

package com.mdnaRabbit.worker.data; 

import java.io.*; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Data implements Serializable{ 

    public String header; 
    public String body; 
    public String domainId; 
    public String sender; 
    public String receiver; 

    public void setHeader(String head){ 
     this.header = head; 
    } 

    public String getHeader(){ 
     return header; 
    } 

    public void setBody(String body){ 
     this.body = body; 
    } 

    public String getBody(){ 
     return body; 
    } 

    public void setDomainId(String domainId){ 
     this.domainId = domainId; 
    } 

    public String getDomainId(){ 
     return domainId; 
    } 

    public void setSender(String sender){ 
     this.sender = sender; 
    } 

    public String getSender(){ 
     return sender; 
    } 

    public String getReceiver(){ 
     return receiver; 
    } 

    public void setReceiver(String receiver){ 
     this.receiver = receiver; 
    } 


    public byte[] getBytes() { 
     byte[]bytes; 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     try{ 
      ObjectOutputStream oos = new ObjectOutputStream(baos); 
      oos.writeObject(this); 
      oos.flush(); 
      oos.reset(); 
      bytes = baos.toByteArray(); 
      oos.close(); 
      baos.close(); 
     } catch(IOException e){ 
      bytes = new byte[] {}; 
      Logger.getLogger("bsdlog").log(Level.ALL, "unable to write to output stream" + e); 
     } 
     return bytes; 
    } 

    public static Data fromBytes(byte[] body) { 
     Data obj = null; 
     try { 
      ByteArrayInputStream bis = new ByteArrayInputStream(body); 
      ObjectInputStream ois = new ObjectInputStream(bis); 
      obj = (Data) ois.readObject(); 
      ois.close(); 
      bis.close(); 
     } 
     catch (IOException e) { 
      e.printStackTrace(); 
     } 
     catch (ClassNotFoundException ex) { 
      ex.printStackTrace(); 
     } 
     return obj; 
    } 
} 

tôi luôn luôn dường như người tiêu dùng nhận được thông điệp , bởi vì khi tôi không cố biến đổi nó thành đối tượng và chỉ viết System.out.println(delivery.getBody) nó hiển thị byte

+0

Tôi đã cố định các lỗi trong câu trả lời khác của tôi mà dẫn đến sự nhầm lẫn với tin nhắn và DataMessage lớp – robthewolf

Trả lời

4

Dường như mảng byte bạn nhận được trống. Điều này xảy ra vì điều này:

} catch(IOException e){ 
     bytes = new byte[] {}; 
    } 

Khi không có ngoại lệ, mã không được thông báo và chỉ gửi một mảng trống thay thế. Bạn nên đăng nhập ít nhất là lỗi.

Trường hợp ngoại lệ được tạo ra có thể do bạn đang cố gắng sắp xếp một lớp không thể tuần tự hóa được. Để thực hiện một lớp serializable bạn phải thêm "thực hiện Serializable" để khai của nó:

public class Data implements Serializable { 
+0

làm serializable không gây ra bất kỳ cải tiến –

+0

Tôi đã tìm thấy một lỗi trong mã của mình: Tôi đã không đặt giá trị trong lớp Dữ liệu. Nhưng ngay cả sau này tôi có cùng một vấn đề –

+0

Tôi đã tìm thấy giải pháp ở đây: [link] (http://stackoverflow.com/a/13174951/2082631), nhưng nó không hoạt động anyway –

Các vấn đề liên quan