5

Tôi có công việc phát ra từ Cassandra, xử lý/biến đổi/lọc dữ liệu và ghi kết quả vào Elasticsearch. Tôi sử dụng docker cho các bài kiểm tra tích hợp của mình, và tôi đang gặp rắc rối khi viết từ tia lửa tới Elasticsearch.Thư viện Elasticsearch-Hadoop không thể kết nối với bộ chứa docker

Dependencies:

"joda-time"    % "joda-time"   % "2.9.4", 
"javax.servlet"   % "javax.servlet-api" % "3.1.0", 
"org.elasticsearch"  % "elasticsearch"  % "2.3.2", 
"org.scalatest"   %% "scalatest"   % "2.2.1", 
"com.github.nscala-time" %% "nscala-time"  % "2.10.0", 
"cascading"    % "cascading-hadoop" % "2.6.3", 
"cascading"    % "cascading-local" % "2.6.3", 
"com.datastax.spark"  %% "spark-cassandra-connector" % "1.4.2", 
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5", 
"org.elasticsearch"  % "elasticsearch-hadoop"  % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")), 
"org.apache.spark"  %% "spark-catalyst"   % "1.4.0" % "provided" 

Trong các thử nghiệm đơn vị của tôi, tôi có thể kết nối với elasticsearch sử dụng một TransportClient để thiết lập mẫu và chỉ số của tôi

aka. Này hoạt động

val conf = new SparkConf().setAppName("test_reindex").setMaster("local") 
    .set("spark.cassandra.input.split.size_in_mb", "67108864") 
    .set("spark.cassandra.connection.host", cassandraHostString) 
    .set("es.nodes", elasticsearchHostString) 
    .set("es.port", "9200") 
    .set("http.publish_host", "") 
sc = new SparkContext(conf) 
esClient = TransportClient.builder().build() 
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300)) 
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet() 
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet() 
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet() 

Tuy nhiên khi tôi cố gắng chạy

EsSpark.saveToEs(
    myRDD, 
    "hot/mytype", 
    Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id") 
) 

tôi nhận được đống này dấu vết

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) 
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) 
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) 
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) 
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) 
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) 
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) 
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) 
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

tôi có thể xác minh sử dụng 'mạng Docker kiểm tra cầu rằng nó đang cố gắng để kết nối với địa chỉ ip chính xác.

docker network inspect bridge 
[ 
{ 
    "Name": "bridge", 
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1", 
    "Scope": "local", 
    "Driver": "bridge", 
    "EnableIPv6": false, 
    "IPAM": { 
     "Driver": "default", 
     "Options": null, 
     "Config": [ 
      { 
       "Subnet": "172.17.0.0/16", 
       "Gateway": "172.17.0.1" 
      } 
     ] 
    }, 
    "Internal": false, 
    "Containers": { 
     "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": { 
      "Name": "analytics_rabbitmq_1", 
      "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4", 
      "MacAddress": "02:42:ac:11:00:04", 
      "IPv4Address": "172.17.0.4/16", 
      "IPv6Address": "" 
     }, 
     "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": { 
      "Name": "analytics_elasticsearch_1", 
      "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56", 
      "MacAddress": "02:42:ac:11:00:02", 
      "IPv4Address": "172.17.0.2/16", 
      "IPv6Address": "" 
     }, 
     "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": { 
      "Name": "analytics_cassandra_1", 
      "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6", 
      "MacAddress": "02:42:ac:11:00:03", 
      "IPv4Address": "172.17.0.3/16", 
      "IPv6Address": "" 
     } 
    }, 
    "Options": { 
     "com.docker.network.bridge.default_bridge": "true", 
     "com.docker.network.bridge.enable_icc": "true", 
     "com.docker.network.bridge.enable_ip_masquerade": "true", 
     "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0", 
     "com.docker.network.bridge.name": "docker0", 
     "com.docker.network.driver.mtu": "1500" 
    }, 
    "Labels": {} 
} 
] 

Tôi đang chạy mọi thứ cục bộ trên macbook/osx. Tôi đang thua lỗ vì sao tôi có thể kết nối với vùng chứa docker bằng TransportClient và thông qua trình duyệt của tôi, nhưng hàm EsSpark.saveToES (...) luôn thất bại.

+0

Ứng dụng của bạn có giống nhau không? – alpert

+1

Bạn có thể thử đặt tham số 'es.nodes.wan.only' thành true không? –

Trả lời

0

Bằng cách đặt

.config("es.nodes.wan.only", "true") 

có thể giải quyết vấn đề này

es.nodes.ingest.only

(mặc định false) Dù chỉ sử dụng Elasticsearch nút ingest. Khi được bật, elasticsearch-hadoop sẽ định tuyến tất cả các yêu cầu của nó (sau khi khám phá các nút, nếu được bật) thông qua nhập các nút trong cụm. Mục đích của cài đặt cấu hình này là để tránh làm phát sinh chi phí chuyển tiếp dữ liệu có nghĩa là đối với đường ống từ các nút không nhập; Thực sự chỉ hữu ích khi ghi dữ liệu vào đường ống dẫn nhập (xem es.ingest.pipeline ở trên).

Các vấn đề liên quan