Tôi đang làm việc trên API luồng nhân bản sau khi tải. Trong khi làm việc trên nó đi qua hành vi bất thường.Tại sao Postgres Replication Stream không hoạt động khi được sử dụng trong chức năng riêng biệt?
Khi tôi sử dụng khe nhân bản viết toàn bộ mã bên trong khối chính, mọi thứ đều hoạt động tốt.
public class Server implements Config {
public static void main(String[] args) {
Properties prop = new Properties();
prop.load(new FileInputStream(System.getProperty("prop")));
String user = prop.getProperty("user");
String password = prop.getProperty("password");
String url = prop.getProperty("url");
Properties props = new Properties();
PGProperty.USER.set(props, user);
PGProperty.PASSWORD.set(props, password);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
Connection conn= null;
PGConnection replicationConnection= null;
PGReplicationStream stream = null;
conn = DriverManager.getConnection(url, props);
replicationConnection = conn.unwrap(PGConnection.class);
stream = replicationConnection.getReplicationAPI().replicationStream().logical()
.withSlotName("replication_slot")
.withSlotOption("include-xids", true)
.withSlotOption("include-timestamp", "on")
.withSlotOption("skip-empty-xacts", true)
.withStatusInterval(20, TimeUnit.SECONDS).start();
while (true) {
ByteBuffer msg;
try {
msg = stream.readPending();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
// convert byte buffer into string
String data = new String(source, offset, length);
// then convert it into bufferedreader
BufferedReader reader = new BufferedReader(new StringReader(data));
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
} catch (SQLException | IOException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Nhưng khi tôi cố gắng để tách dòng Logic sử dụng phương pháp riêng biệt như thế này
public class Server implements Config {
public static void main(String[] args) {
PGReplicationStream stream = getReplicationStream();
while (true) {
ByteBuffer msg;
try {
msg = stream.readPending();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep(10L);
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
String data = new String(source, offset, length);
BufferedReader reader = new BufferedReader(new StringReader(data));
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
stream.setAppliedLSN(stream.getLastReceiveLSN());
stream.setFlushedLSN(stream.getLastReceiveLSN());
} catch (SQLException | IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
public static PGReplicationStream getReplicationStream() {
Properties prop = new Properties();
try {
prop.load(new FileInputStream(System.getProperty("prop")));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
String user = prop.getProperty("user");
String password = prop.getProperty("password");
String url = prop.getProperty("url");
Properties props = new Properties();
PGProperty.USER.set(props, user);
PGProperty.PASSWORD.set(props, password);
PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
PGProperty.REPLICATION.set(props, "database");
PGProperty.PREFER_QUERY_MODE.set(props, "simple");
Connection conn= null;
PGConnection replicationConnection= null;
PGReplicationStream stream = null;
try {
conn = DriverManager.getConnection(url, props);
replicationConnection = conn.unwrap(PGConnection.class);
stream = replicationConnection.getReplicationAPI().replicationStream().logical()
.withSlotName("replication_slot")
.withSlotOption("include-xids", true)
.withSlotOption("include-timestamp", "on")
.withSlotOption("skip-empty-xacts", true)
.withStatusInterval(20, TimeUnit.SECONDS).start();
} catch (SQLException e) {
e.printStackTrace();
}
return stream;
}
}
Sau khi đọc một số dữ liệu, chương trình được đưa ra lỗi
org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
at org.postgresql.core.PGStream.receive(PGStream.java:495)
at org.postgresql.core.PGStream.receive(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
... 5 more
tôi don 't thấy bất kỳ sự khác biệt giữa cả hai cách tiếp cận. Nhưng chương trình hoạt động khác nhau. Ai đó có thể giải thích, có gì sai với cách tiếp cận thứ hai.
Ứng dụng thứ nhất không bao giờ gặp sự cố? bạn đã khởi chạy nó nhiều lần với cùng thời lượng như ứng dụng thứ 2? Ứng dụng thứ hai vụ tai nạn ngẫu nhiên? về sau khi chạy trong ~ 5 phút? hoặc hoàn toàn ngẫu nhiên thời gian? – wargre
Có. Ứng dụng thứ nhất không bao giờ gặp sự cố, ngay cả khi tôi chạy ứng dụng trong 1 giờ rưỡi. Ứng dụng thứ 2 luôn bị treo trong vòng 5 phút. – Dipesh
Cùng một thông số? cùng một tài xế? (Bạn luôn không may mắn?: D) – wargre