2012-02-29 38 views
29

Tôi đang cố gắng khám phá Protocol Buffer (PB) trong nền tảng Linux và ngôn ngữ mã hóa của tôi là C++. Tôi tìm thấy các ví dụ trong tài liệu giao thức trực tuyến đệm nhưng không có gì cụ thể để gửi và nhận socket (Hoặc tôi đã bỏ qua nó hoàn toàn :)). Vì vậy, tôi quyết định thêm tin nhắn Length trước tin nhắn thực tế và gửi nó qua socket. Tôi sẽ đánh giá cao nếu bất cứ ai có thể đề xuất một giải pháp tốt hơn so với những gì tôi đang lập kế hoạch để làm và cũng là có bất cứ điều gì đã sẵn sàng thực hiện trong PB để tạo các gói như vậy.Protocol Buffer over socket trong C++

Nhưng tôi vẫn kết thúc với sự cố ở phía máy chủ nơi tôi phải giải mã gói. Nói nếu khách hàng gửi một gói 10 byte trong đó 4 byte đầu tiên là độ dài của gói; Nhưng không thể biết được độ dài trước khi giải mã gói tin. Vì vậy, ngay cả khi tôi đọc 4 byte đầu tiên làm thế nào để tôi suy ra giá trị với một nửa gói đọc bằng cách sử dụng Protocol Buffer.

Trả lời

25

Cuối cùng tôi có thể làm cho nó hoạt động. Tôi đang đăng các mã ở đây để người ta có thể xem xét và bình luận về nó cũng như nếu một số người muốn thực hiện nó trong c + +, đoạn mã này có thể giúp đỡ. Đó là một đoạn mã tồi tàn, ý định của tôi là làm cho Protobuf hoạt động theo chiều dài tiền tố. Tôi đã lấy mã của máy chủ khách hàng từ một số trang web mà tôi không nhớ và tôi đã sửa đổi nó để chứa protobuf. Ở đây máy chủ đầu tiên nhìn vào ổ cắm và nhận được chiều dài của tổng số gói và sau đó đọc socket thực tế được thực hiện để đọc toàn bộ gói. Có thể có hàng tỷ cách để làm điều đó nhưng đối với giải pháp nhanh chóng tôi đã làm nó theo cách này. Nhưng tôi cần phải tìm một cách tốt hơn để tránh 2 recv cho mỗi gói, nhưng trong tình trạng của tôi tất cả các tin nhắn có kích thước khác nhau, vì vậy đây là cách duy nhất tôi đoán.

Proto tập tin

message log_packet { 
    required fixed64 log_time =1; 
    required fixed32 log_micro_sec =2; 
    required fixed32 sequence_no =3; 
    required fixed32 shm_app_id =4; 
    required string packet_id =5; 
    required string log_level=6; 
    required string log_msg=7; 
    } 

Nghị định thư đệm Khách hàng Mã

#include <unistd.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/message.h> 
#include <google/protobuf/descriptor.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 


using namespace google::protobuf::io; 

using namespace std; 
int main(int argv, char** argc){ 

/* Coded output stram */ 

log_packet payload ; 

payload.set_log_time(10); 
payload.set_log_micro_sec(10); 
payload.set_sequence_no(1); 
payload.set_shm_app_id(101); 
payload.set_packet_id("TST"); 
payload.set_log_level("DEBUG"); 
payload.set_log_msg("What shall we say then"); 

cout<<"size after serilizing is "<<payload.ByteSize()<<endl; 
int siz = payload.ByteSize()+4; 
char *pkt = new char [siz]; 
google::protobuf::io::ArrayOutputStream aos(pkt,siz); 
CodedOutputStream *coded_output = new CodedOutputStream(&aos); 
coded_output->WriteVarint32(payload.ByteSize()); 
payload.SerializeToCodedStream(coded_output); 

     int host_port= 1101; 
     char* host_name="127.0.0.1"; 

     struct sockaddr_in my_addr; 

     char buffer[1024]; 
     int bytecount; 
     int buffer_len=0; 

     int hsock; 
     int * p_int; 
     int err; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n",errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n",errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = inet_addr(host_name); 
     if(connect(hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       if((err = errno) != EINPROGRESS){ 
         fprintf(stderr, "Error connecting socket %d\n", errno); 
         goto FINISH; 
       } 
     } 




     for (int i =0;i<10000;i++){ 
      for (int j = 0 ;j<10;j++) { 

       if((bytecount=send(hsock, (void *) pkt,siz,0))== -1) { 
         fprintf(stderr, "Error sending data %d\n", errno); 
         goto FINISH; 
       } 
       printf("Sent bytes %d\n", bytecount); 
       usleep(1); 
     } 
     } 
     delete pkt; 

FINISH: 
     close(hsock); 

} 

Nghị định thư đệm máy chủ Mã

#include <fcntl.h> 
#include <string.h> 
#include <stdlib.h> 
#include <errno.h> 
#include <stdio.h> 
#include <netinet/in.h> 
#include <resolv.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "message.pb.h" 
#include <iostream> 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream_impl.h> 

using namespace std; 
using namespace google::protobuf::io; 



void* SocketHandler(void*); 

int main(int argv, char** argc){ 

     int host_port= 1101; 

     struct sockaddr_in my_addr; 

     int hsock; 
     int * p_int ; 
     int err; 

     socklen_t addr_size = 0; 
     int* csock; 
     sockaddr_in sadr; 
     pthread_t thread_id=0; 

     hsock = socket(AF_INET, SOCK_STREAM, 0); 
     if(hsock == -1){ 
       printf("Error initializing socket %d\n", errno); 
       goto FINISH; 
     } 

     p_int = (int*)malloc(sizeof(int)); 
     *p_int = 1; 

     if((setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1)|| 
       (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1)){ 
       printf("Error setting options %d\n", errno); 
       free(p_int); 
       goto FINISH; 
     } 
     free(p_int); 

     my_addr.sin_family = AF_INET ; 
     my_addr.sin_port = htons(host_port); 

     memset(&(my_addr.sin_zero), 0, 8); 
     my_addr.sin_addr.s_addr = INADDR_ANY ; 

     if(bind(hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1){ 
       fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); 
       goto FINISH; 
     } 
     if(listen(hsock, 10) == -1){ 
       fprintf(stderr, "Error listening %d\n",errno); 
       goto FINISH; 
     } 

     //Now lets do the server stuff 

     addr_size = sizeof(sockaddr_in); 

     while(true){ 
       printf("waiting for a connection\n"); 
       csock = (int*)malloc(sizeof(int)); 
       if((*csock = accept(hsock, (sockaddr*)&sadr, &addr_size))!= -1){ 
         printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); 
         pthread_create(&thread_id,0,&SocketHandler, (void*)csock); 
         pthread_detach(thread_id); 
       } 
       else{ 
         fprintf(stderr, "Error accepting %d\n", errno); 
       } 
     } 

FINISH: 
;//oops 
} 

google::protobuf::uint32 readHdr(char *buf) 
{ 
    google::protobuf::uint32 size; 
    google::protobuf::io::ArrayInputStream ais(buf,4); 
    CodedInputStream coded_input(&ais); 
    coded_input.ReadVarint32(&size);//Decode the HDR and get the size 
    cout<<"size of payload is "<<size<<endl; 
    return size; 
} 

void readBody(int csock,google::protobuf::uint32 siz) 
{ 
    int bytecount; 
    log_packet payload; 
    char buffer [siz+4];//size of the payload and hdr 
    //Read the entire buffer including the hdr 
    if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     } 
    cout<<"Second read byte count is "<<bytecount<<endl; 
    //Assign ArrayInputStream with enough memory 
    google::protobuf::io::ArrayInputStream ais(buffer,siz+4); 
    CodedInputStream coded_input(&ais); 
    //Read an unsigned integer with Varint encoding, truncating to 32 bits. 
    coded_input.ReadVarint32(&siz); 
    //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
    //from reading beyond that length.Limits are used when parsing length-delimited 
    //embedded messages 
    google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); 
    //De-Serialize 
    payload.ParseFromCodedStream(&coded_input); 
    //Once the embedded message has been parsed, PopLimit() is called to undo the limit 
    coded_input.PopLimit(msgLimit); 
    //Print the message 
    cout<<"Message is "<<payload.DebugString(); 

} 

void* SocketHandler(void* lp){ 
    int *csock = (int*)lp; 

     char buffer[4]; 
     int bytecount=0; 
     string output,pl; 
     log_packet logp; 

     memset(buffer, '\0', 4); 

     while (1) { 
     //Peek into the socket and get the packet size 
     if((bytecount = recv(*csock, 
         buffer, 
           4, MSG_PEEK))== -1){ 
       fprintf(stderr, "Error receiving data %d\n", errno); 
     }else if (bytecount == 0) 
       break; 
     cout<<"First read byte count is "<<bytecount<<endl; 
     readBody(*csock,readHdr(buffer)); 
     } 

FINISH: 
     free(csock); 
    return 0; 
} 
+0

Cảm ơn bạn rất nhiều. –

+0

+1 cho câu hỏi và +1 cho câu trả lời, điều này hữu ích. –

+1

Cảm ơn! Điều này rất hữu ích với một vấn đề mà tôi đã có –

10

Thật không may, protobuf không cung cấp một cách để "đóng gói" (khoanh vùng) tin nhắn của bạn:

Nếu bạn muốn viết nhiều thông điệp vào một tập tin duy nhất hoặc suối, nó là tùy thuộc vào bạn để giữ theo dõi nơi một tin nhắn kết thúc và bắt đầu tiếp theo. Định dạng dây Protocol Buffer không tự định hướng, do đó, trình phân tích cú pháp bộ đệm giao thức không thể xác định nơi thư kết thúc trên số của riêng chúng. Cách dễ nhất để giải quyết vấn đề này là viết kích thước mỗi thư trước khi bạn tự viết thư.

(từ documentation của họ)

Vì vậy, họ về cơ bản đề nghị giải pháp cùng bạn đến.

+0

Cảm ơn Tamas vì vậy tôi sẽ đi với cách tiếp cận của tôi – punith