2017-08-08 19 views
8

Tôi đang làm việc trên API luồng nhân bản sau khi tải. Trong khi làm việc trên nó đi qua hành vi bất thường.Tại sao Postgres Replication Stream không hoạt động khi được sử dụng trong chức năng riêng biệt?

Khi tôi sử dụng khe nhân bản viết toàn bộ mã bên trong khối chính, mọi thứ đều hoạt động tốt.

public class Server implements Config { 

public static void main(String[] args) { 
    Properties prop = new Properties(); 
    prop.load(new FileInputStream(System.getProperty("prop"))); 

    String user = prop.getProperty("user"); 
    String password = prop.getProperty("password"); 
    String url = prop.getProperty("url"); 

    Properties props = new Properties(); 
    PGProperty.USER.set(props, user); 
    PGProperty.PASSWORD.set(props, password); 
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); 
    PGProperty.REPLICATION.set(props, "database"); 
    PGProperty.PREFER_QUERY_MODE.set(props, "simple"); 

    Connection conn= null; 
    PGConnection replicationConnection= null; 
    PGReplicationStream stream = null; 

     conn = DriverManager.getConnection(url, props); 
     replicationConnection = conn.unwrap(PGConnection.class); 
     stream = replicationConnection.getReplicationAPI().replicationStream().logical() 
       .withSlotName("replication_slot") 
       .withSlotOption("include-xids", true) 
       .withSlotOption("include-timestamp", "on") 
       .withSlotOption("skip-empty-xacts", true) 
       .withStatusInterval(20, TimeUnit.SECONDS).start(); 

    while (true) { 
      ByteBuffer msg; 
      try { 
       msg = stream.readPending();   

      if (msg == null) { 
       TimeUnit.MILLISECONDS.sleep(10L); 
       continue; 
      } 

      int offset = msg.arrayOffset(); 
      byte[] source = msg.array(); 
      int length = source.length - offset; 
      // convert byte buffer into string 
      String data = new String(source, offset, length); 

      // then convert it into bufferedreader 
      BufferedReader reader = new BufferedReader(new StringReader(data)); 
      String line = reader.readLine(); 

      while (line != null) { 
       System.out.println(line); 
       line = reader.readLine(); 
      } 
      stream.setAppliedLSN(stream.getLastReceiveLSN()); 
      stream.setFlushedLSN(stream.getLastReceiveLSN()); 
     } catch (SQLException | IOException | InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     } 
} 

Nhưng khi tôi cố gắng để tách dòng Logic sử dụng phương pháp riêng biệt như thế này

public class Server implements Config { 

public static void main(String[] args) { 
    PGReplicationStream stream = getReplicationStream(); 
     while (true) { 
      ByteBuffer msg; 
      try { 
       msg = stream.readPending(); 
    if (msg == null) { 
       TimeUnit.MILLISECONDS.sleep(10L); 
       continue; 
      } 

      int offset = msg.arrayOffset(); 
      byte[] source = msg.array(); 
      int length = source.length - offset; 
      String data = new String(source, offset, length); 

      BufferedReader reader = new BufferedReader(new StringReader(data)); 
      String line = reader.readLine(); 

      while (line != null) { 
       System.out.println(line); 
       line = reader.readLine(); 
      } 
      stream.setAppliedLSN(stream.getLastReceiveLSN()); 
      stream.setFlushedLSN(stream.getLastReceiveLSN()); 
     } catch (SQLException | IOException | InterruptedException e) { 
      e.printStackTrace(); 
     } 
     } 

} 
public static PGReplicationStream getReplicationStream() { 
    Properties prop = new Properties(); 
    try { 
     prop.load(new FileInputStream(System.getProperty("prop"))); 
    } catch (IOException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 

    String user = prop.getProperty("user"); 
    String password = prop.getProperty("password"); 
    String url = prop.getProperty("url"); 

    Properties props = new Properties(); 
    PGProperty.USER.set(props, user); 
    PGProperty.PASSWORD.set(props, password); 
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); 
    PGProperty.REPLICATION.set(props, "database"); 
    PGProperty.PREFER_QUERY_MODE.set(props, "simple"); 

    Connection conn= null; 
    PGConnection replicationConnection= null; 
    PGReplicationStream stream = null; 
    try { 
     conn = DriverManager.getConnection(url, props); 
     replicationConnection = conn.unwrap(PGConnection.class); 
     stream = replicationConnection.getReplicationAPI().replicationStream().logical() 
       .withSlotName("replication_slot") 
       .withSlotOption("include-xids", true) 
       .withSlotOption("include-timestamp", "on") 
       .withSlotOption("skip-empty-xacts", true) 
       .withStatusInterval(20, TimeUnit.SECONDS).start(); 
    } catch (SQLException e) { 
     e.printStackTrace(); 
    } 
    return stream; 
} 

}

Sau khi đọc một số dữ liệu, chương trình được đưa ra lỗi

org.postgresql.util.PSQLException: Database connection failed when reading from copy 
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028) 
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78) 
at Server.main(Server.java:47) 
Caused by: java.net.SocketException: Socket closed 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:171) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140) 
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109) 
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191) 
at org.postgresql.core.PGStream.receive(PGStream.java:495) 
at org.postgresql.core.PGStream.receive(PGStream.java:479) 
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161) 
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026) 
... 5 more 

tôi don 't thấy bất kỳ sự khác biệt giữa cả hai cách tiếp cận. Nhưng chương trình hoạt động khác nhau. Ai đó có thể giải thích, có gì sai với cách tiếp cận thứ hai.

+0

Ứng dụng thứ nhất không bao giờ gặp sự cố? bạn đã khởi chạy nó nhiều lần với cùng thời lượng như ứng dụng thứ 2? Ứng dụng thứ hai vụ tai nạn ngẫu nhiên? về sau khi chạy trong ~ 5 phút? hoặc hoàn toàn ngẫu nhiên thời gian? – wargre

+0

Có. Ứng dụng thứ nhất không bao giờ gặp sự cố, ngay cả khi tôi chạy ứng dụng trong 1 giờ rưỡi. Ứng dụng thứ 2 luôn bị treo trong vòng 5 phút. – Dipesh

+0

Cùng một thông số? cùng một tài xế? (Bạn luôn không may mắn?: D) – wargre

Trả lời

1

Tôi tin rằng sự cố của bạn là với Connection. Nó được ra khỏi phạm vi một khi chức năng của bạn trở lại và nó là tối đa thu gom rác để thu thập nó và hoàn thành. trong hoàn thành, kết nối được đóng lại và có lẽ sau đó chương trình của bạn không thành công. Hãy thử di chuyển kết nối và các biến trung gian bắt buộc khác trong phạm vi có sẵn trong phương thức chính của bạn và thử lại.

1

Có một số hành vi không chính đáng vừa phải đối với việc đóng kết nối trong khi thực hiện COPY. Tôi tự hỏi liệu điều đó có liên quan không?

Nếu bộ nhớ phục vụ, vấn đề này là một thông điệp Chấm dứt (gửi vào connection.close() để chỉ ra rằng khách hàng muốn để ngắt kết nối protocol) là không tương đối đồng bộ để COPY hoạt động giao thức. Điều này có nghĩa là về mặt lý thuyết, thông báo Terminate của bạn có thể được chỉnh sửa ngay vào giữa thông báo CopyData. Trong thực tế, tôi đã từng thấy lỗi từ máy chủ phàn nàn về loại thư không mong muốn (Chấm dứt) trong COPY, vì ứng dụng khách là được yêu cầu để gửi CopyDone hoặc CopyFail trước khi chấm dứt trong khi COPY có hiệu lực. Tuy nhiên, tôi nghĩ rằng nó có thể là có thể nhận được nội dung tin nhắn rên rỉ.

Tôi nghĩ mã của bạn có thể dễ bị điều đó vì bạn khởi động một loạt các hoạt động COPY trong một chức năng riêng biệt và có khả năng kết nối có thể bị thu gom rác.

Như tôi thấy trong nhật ký kết nối socket được đóng trong khi giao thức sao chép vẫn đang đọc dữ liệu.

org.postgresql.util.PSQLException: Database connection failed when reading from copy 
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028) 
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124) 
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78) 
at Server.main(Server.java:47) 
Caused by: java.net.SocketException: Socket closed 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:171) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140) 
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109) 
Các vấn đề liên quan