2012-11-13 28 views
6

Tôi đang viết một ứng dụng liên quan đến việc viết các khối dữ liệu lớn đáng kể vào một OutputStream (thuộc một Socket). Điều làm cho điều này phức tạp hơn một chút là thường có nhiều luồng cố gắng ghi vào cùng một OutputStream. Hiện tại, tôi có nó được thiết kế sao cho OutputStream mà dữ liệu đang được ghi vào trong chủ đề của nó. Luồng chứa một hàng đợi (LinkedList), nó thăm dò các mảng byte và viết chúng càng sớm càng tốt.Viết đồng thời theo Chuẩn OutputStream

private class OutputStreamWriter implements Runnable { 

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>(); 

    public void run() { 
     OutputStream outputStream = User.this.outputStream; 
     while (true) { 
      try { 
       if (chunkQueue.isEmpty()) { 
        Thread.sleep(100); 
        continue; 
       } 
       outputStream.write(chunkQueue.poll()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

Vấn đề với thiết kế này là càng ngày càng nhiều dữ liệu xảy ra, càng ngày càng có nhiều dữ liệu xếp hàng và không được ghi nhanh hơn. Ban đầu, khi dữ liệu được đưa vào hàng đợi, nó được viết thực tế ngay lập tức. Sau khoảng 15 giây, dữ liệu bắt đầu tụt lại phía sau; sự chậm trễ phát triển từ thời điểm dữ liệu được xếp hàng đợi đến thời điểm dữ liệu thực sự được ghi. Khi thời gian trôi đi, sự chậm trễ này trở nên lâu hơn và lâu hơn. Đó là rất đáng chú ý.

Một cách để khắc phục điều này sẽ là một loại triển khai ConcurrentOutputStream cho phép dữ liệu được gửi mà không bị chặn để ghi không bắt đầu được sao lưu (heck, hàng đợi sẽ không cần thiết). Tôi không biết nếu có một sự thực hiện như vậy - tôi đã không thể tìm thấy một - và cá nhân tôi không nghĩ rằng nó thậm chí có thể viết một.

Vì vậy, không ai có bất kỳ đề xuất nào về cách tôi có thể thiết kế lại điều này?

+4

Điều đó không mang tính xây dựng. Có gì sai với nó? –

+0

BufferedOutputStream? –

+0

Ngoài ra, bạn có đang đồng bộ hóa các sửa đổi đối với danh sách liên kết của mình không? Bởi vì nó không phải là chủ đề an toàn bởi thiết kế. Ngoài ra, loại luồng đầu ra nào bạn đang xếp chồng lên trên đầu ra của ổ cắm và bạn đang đẩy bao nhiêu dữ liệu vào nó? – Perception

Trả lời

4

Thông lượng của ổ cắm bị hạn chế; nếu nó chậm hơn thông lượng tạo dữ liệu của bạn, dữ liệu phải được đệm lên, không có cách nào xung quanh điều đó. Viết "đồng thời" sẽ không giúp gì cả.

Bạn có thể xem xét tạm dừng tạo dữ liệu khi dữ liệu được xếp hàng vượt quá giới hạn nhất định, để giảm mức tiêu thụ bộ nhớ.

+0

Tôi chỉ đang ném đồ đạc vào tường ở đây, nhưng, về SocketChannel thì sao? –

+0

Tôi không nghĩ rằng nó sẽ giúp.Nút cổ chai là băng thông mạng. – irreputable

0

Tôi đồng ý với @irreputable rằng việc viết đồng thời sẽ không giúp ích gì một chút. Thay vào đó, bạn nên nhìn vào phía sản xuất, tức là ở những gì bạn đã có.

  1. Sử dụng BlockingQueue thay vì một LinkedList.

  2. Sử dụng hoạt động thăm dò chặn của hàng đợi, thay vì chỉ là một giấc ngủ mù cho 100msl, theo định nghĩa sẽ lãng phí 50% thời gian trung bình. Trong một thời gian dài mà thực sự có thể tăng lên.

0

tôi cần một bộ lọc để chặn các kết nối chậm mà tôi cần phải đóng các kết nối DB càng sớm càng tốt vì vậy tôi ban đầu sử dụng ống Java nhưng khi nhìn kỹ hơn thực hiện của họ, đó là tất cả đồng bộ vì vậy tôi kết thúc việc tạo QueueInputStream riêng tôi sử dụng một bộ đệm nhỏ và hàng đợi Chặn để đặt bộ đệm trong hàng đợi một lần đã đầy, nó được khóa tự do trừ khi các điều kiện khóa được sử dụng tại LinkedBlockingQueue với sự trợ giúp của bộ đệm nhỏ nó phải rẻ, lớp này chỉ được dự định được sử dụng cho một nhà sản xuất và người tiêu dùng duy nhất trên mỗi trường hợp và bạn nên chuyển một ExecutorService để bắt đầu phát trực tuyến các byte đã xếp hàng của bạn đến OutputStream cuối cùng:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
Các vấn đề liên quan