2012-04-05 24 views
5

Tôi cần truyền các thông điệp tới các tiến trình PHP CLI thông qua stdin từ Java. Tôi muốn giữ lại khoảng 20 tiến trình PHP đang chạy trong một nhóm, như vậy khi tôi chuyển một thông điệp tới hồ bơi, nó sẽ gửi mỗi thông điệp đến một luồng riêng biệt, giữ một hàng đợi các thông điệp được gửi đi. Tôi muốn các quy trình PHP này tồn tại càng lâu càng tốt, đưa ra một quy trình mới nếu một cái chết. Tôi đã xem xét việc làm điều này với một hồ bơi thread tĩnh, nhưng nó có vẻ được thiết kế nhiều hơn cho các nhiệm vụ thực hiện và chỉ đơn giản là chết. Làm thế nào tôi có thể làm điều này, với một giao diện đơn giản để truyền thông điệp đến hồ bơi? Tôi có phải thực hiện "hồ bơi chủ đề" tùy chỉnh của riêng mình không?ThreadPool của CLI Processes

+0

Rất giống với câu hỏi này: http://stackoverflow.com/questions/2592093/php-thread-pool –

+1

tôi có bất kỳ đầu ra từ PHP như vậy mà bạn biết khi nào nó được xử lý xong? – Clint

+0

Nó sẽ không bao giờ được xử lý. Nếu một người chết, tôi cần phải sinh ra một cái mới để thay thế nó. Tôi sẽ chuyển dữ liệu cho họ theo kiểu vòng tròn thông qua stdin. – Will

Trả lời

4

Tôi đang cung cấp một số mã với điều này vì tôi nghĩ rằng nó sẽ làm cho mọi việc rõ ràng hơn. Về cơ bản bạn cần phải giữ một hồ bơi của các đối tượng quá trình xung quanh. Hãy thận trọng rằng mỗi quy trình này đều có đầu vào, đầu ra và luồng lỗi mà bạn cần quản lý theo một cách nào đó. Trong ví dụ của tôi, tôi chỉ chuyển hướng lỗi và đầu ra đến bảng điều khiển quy trình chính. Bạn có thể thiết lập callbacks và trình xử lý để có được kết quả đầu ra của chương trình PHP nếu cần thiết. Nếu bạn chỉ là nhiệm vụ xử lý và không quan tâm những gì PHP nói sau đó để nó như là hoặc chuyển hướng đến một tập tin.

Tôi đang sử dụng thư viện Apache Commons Pool cho ObjectPool. Không cần phải tái tạo lại.

Bạn sẽ có một nhóm gồm 20 quy trình chạy chương trình PHP của bạn. Điều này một mình sẽ không giúp bạn có được những gì bạn cần. Bạn có thể muốn xử lý các tác vụ đối với tất cả 20 quy trình này "cùng một lúc". Vì vậy, bạn cũng sẽ cần một ThreadPool mà sẽ kéo một Process từ ObjectPool của bạn.

Bạn cũng sẽ cần phải hiểu rằng nếu bạn giết, hoặc CTRL-C quá trình Java của bạn, quy trình init sẽ tiếp quản các quy trình php của bạn và họ sẽ chỉ ngồi đó. Bạn có thể muốn giữ một bản ghi tất cả các pid của các tiến trình PHP mà bạn sinh ra, và sau đó làm sạch chúng nếu bạn chạy lại chương trình Java của mình.

public class StackOverflow_10037379 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName()); 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectError(Redirect.INHERIT); 
      // I am being lazy, but really the InputStream is where 
      // you can get any output of the PHP Process. This setting 
      // will make it output to the current processes console. 
      builder.redirectOutput(Redirect.INHERIT); 
      builder.redirectInput(Redirect.PIPE); 
      builder.command(mProcessToRun); 
      return builder.start(); 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     // Also change mock_php.exe to /usr/bin/php or wherever. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5);   

     // This will only allow you to queue 100 work items at a time. I would suspect 
     // that if you only want 20 PHP processes running at a time and this queue 
     // filled up you'll need to implement some other strategy as you are doing 
     // more work than PHP can keep up with. You'll need to block at some point 
     // or throw work away. 
     BlockingQueue<Runnable> queue = 
      new ArrayBlockingQueue<>(100, true); 

     ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close();   
    } 
} 

Output Chương trình Run:

12172 - Message 2 
10568 - Message 1 
4804 - Message 3 
11916 - Message 4 
11116 - Message 5 
12172 - Message 6 
4804 - Message 7 
10568 - Message 8 
11916 - Message 9 
11116 - Message 10 
12172 - Message 11 

Mã C++ chương trình để chỉ ra là những gì đầu vào:

#include <windows.h> 
#include <iostream> 
#include <string> 

int main(int argc, char* argv[]) 
{ 
    DWORD pid = GetCurrentProcessId(); 
    std::string line; 
    while (true) {  
     std::getline (std::cin, line); 
     std::cout << pid << " - " << line << std::endl; 
    } 

    return 0; 
} 

Cập nhật

Xin lỗi vì sự chậm trễ. Đây là một phiên bản JDK 6 cho bất cứ ai quan tâm. Bạn sẽ phải chạy một chuỗi riêng biệt để đọc tất cả các đầu vào từ InputStream của quá trình. Tôi đã thiết lập mã này lên để sinh ra một luồng mới dọc theo mỗi tiến trình mới. Chủ đề đó luôn luôn đọc từ quá trình miễn là nó còn sống. Thay vì xuất trực tiếp vào một tệp, tôi đã thiết lập nó sao cho nó sử dụng khung công tác Ghi nhật ký. Bằng cách đó, bạn có thể thiết lập cấu hình ghi nhật ký để chuyển đến tệp, cuộn qua, chuyển tới bảng điều khiển, v.v. mà không bị mã hóa cứng để chuyển đến tệp.

Bạn sẽ nhận thấy rằng tôi chỉ bắt đầu một Gobbler duy nhất cho mỗi quá trình mặc dù Process có stdout và stderr. Tôi chuyển hướng stderr để stdout chỉ để làm cho mọi thứ dễ dàng hơn. Rõ ràng jdk6 chỉ hỗ trợ kiểu chuyển hướng này.

public class StackOverflow_10037379_jdk6 { 

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName()); 

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is. 
    public static class StreamGobbler extends Thread { 

     InputStream is; 
     Logger logger; 
     Level level; 

     StreamGobbler(String logName, Level level, InputStream is) { 
      this.is = is; 
      this.logger = Logger.getLogger(logName); 
      this.level = level; 
     } 

     public void run() { 
      try { 
       InputStreamReader isr = new InputStreamReader(is); 
       BufferedReader br = new BufferedReader(isr); 
       String line = null; 
       while ((line = br.readLine()) != null) { 
        logger.log(level, line); 
       } 
      } catch (IOException ex) { 
       logger.log(Level.SEVERE, "Failed to read from Process.", ex); 
      } 
      logger.log(
        Level.INFO, 
        String.format("Exiting Gobbler for %s.", logger.getName())); 
     } 
    } 

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> { 

     private String mProcessToRun; 

     public CLIPoolableObjectFactory(String processToRun) { 
      mProcessToRun = processToRun; 
     } 

     @Override 
     public Process makeObject() throws Exception { 
      ProcessBuilder builder = new ProcessBuilder(); 
      builder.redirectErrorStream(true); 
      builder.command(mProcessToRun); 
      Process process = builder.start(); 
      StreamGobbler loggingGobbler = 
        new StreamGobbler(
        String.format("process.%s", process.hashCode()), 
        Level.INFO, 
        process.getInputStream()); 
      loggingGobbler.start(); 
      return process; 
     } 

     @Override 
     public boolean validateObject(Process process) { 
      try { 
       process.exitValue(); 
       return false; 
      } catch (IllegalThreadStateException ex) { 
       return true; 
      } 
     } 

     @Override 
     public void destroyObject(Process process) throws Exception { 
      // If PHP has a way to stop it, do that instead of destroy 
      process.destroy(); 
     } 

     @Override 
     public void passivateObject(Process process) throws Exception { 
      // Should really try to read from the InputStream of the Process 
      // to prevent lock-ups if Rediret.INHERIT is not used. 
     } 
    } 

    public static class CLIWorkItem implements Runnable { 

     private ObjectPool<Process> mPool; 
     private String mWork; 

     public CLIWorkItem(ObjectPool<Process> pool, String work) { 
      mPool = pool; 
      mWork = work; 
     } 

     @Override 
     public void run() { 
      Process workProcess = null; 
      try { 
       workProcess = mPool.borrowObject(); 
       OutputStream os = workProcess.getOutputStream(); 
       os.write(mWork.getBytes(Charset.forName("UTF-8"))); 
       os.flush(); 
       // Because of the INHERIT rule with the output stream 
       // the console stream overwrites itself. REMOVE THIS in production. 
       Thread.sleep(100); 
      } catch (Exception ex) { 
       sLogger.log(Level.SEVERE, null, ex); 
      } finally { 
       if (workProcess != null) { 
        try { 
         // Seriously.. so many exceptions. 
         mPool.returnObject(workProcess); 
        } catch (Exception ex) { 
         sLogger.log(Level.SEVERE, null, ex); 
        } 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     // Change the 5 to 20 in your case. 
     ObjectPool<Process> pool = 
       new GenericObjectPool<Process>(
       new CLIPoolableObjectFactory("mock_php.exe"), 5); 

     BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true); 

     ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue); 

     // print some stuff out. 
     executor.execute(new CLIWorkItem(pool, "Message 1\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 2\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 3\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 4\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 5\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 6\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 7\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 8\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 9\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 10\r\n")); 
     executor.execute(new CLIWorkItem(pool, "Message 11\r\n")); 

     executor.shutdown(); 
     executor.awaitTermination(4000, TimeUnit.HOURS); 

     pool.close(); 
    } 
} 

Output

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 3 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 2 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 1 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 4 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 5 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8868 - Message 8 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 10 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 8776 - Message 9 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 10096 - Message 6 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 9440 - Message 7 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: 6100 - Message 11 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.295131993. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.756434719. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.332711452. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1981440623. 
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run 
INFO: Exiting Gobbler for process.1043636732. 
+0

Wow, cảm ơn câu trả lời kỹ lưỡng! Thực hiện thử nghiệm dựa trên điều này ngay bây giờ. Thực sự đánh giá cao nó. – Will

+0

Vì vậy, tôi đang sử dụng Java6 và không có Chuyển hướng. Làm cách nào để ngăn chặn quá trình stdout/stderr của quá trình chặn? Trong trường hợp sử dụng thông thường của tôi, tôi sẽ muốn viết một quá trình và chuyển hướng stdout/stderr để tách các logfiles (không bị chặn). – Will

+1

@Will Đã cập nhật với phiên bản jdk6. –

1

Đặt cược tốt nhất của bạn ở đây là sử dụng các chức năng pcntl để chia nhỏ quy trình, nhưng giao tiếp giữa các quá trình rất khó. Tôi sẽ khuyên bạn nên tạo một hàng đợi mà các tiến trình của bạn có thể đọc từ, thay vì cố chuyển các thông điệp tới dòng lệnh.

Beanstalk có một số ứng dụng khách PHP mà bạn có thể sử dụng để xử lý thông báo giữa các quy trình.

+0

Rất tiếc, có thể câu hỏi của tôi không rõ ràng - sẽ chỉnh sửa. Đây là một câu hỏi Java. Tôi muốn một threadpool java của quá trình cli chạy dài (/ usr/bin/php trong trường hợp này). Tôi cần để có thể gửi một cái gì đó đến hồ bơi, mà sau đó sẽ được ghi vào stdin trên một trong các quy trình CLI. – Will