2013-08-27 23 views
7

Tôi đang làm việc thông qua các quickstart Kafka:Kafka Quickstart: Tôi cần những phụ thuộc nào?

http://kafka.apache.org/07/quickstart.html

và cơ bản ví dụ Nhóm Tiêu Dùng:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Tôi đã được mã hóa lên người tiêu dùng và ConsumerThreadPool như trên:

import kafka.consumer.KafkaStream; 
import kafka.consumer.ConsumerIterator; 

public class Consumer implements Runnable { 

    private KafkaStream m_stream; 
    private Integer m_threadNumber; 

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) { 
     m_threadNumber = a_threadNumber; 
     m_stream = a_stream; 
    } 

    public void run() { 
     ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
     while (it.hasNext()) { 
      System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); 

     } 
     System.out.println("Shutting down Thread: " + m_threadNumber); 
    } 
} 

Một vài khía cạnh khác: Tôi là người dùng ng mùa xuân để quản lý sở thú của tôi:

import javax.inject.Named; 
import java.util.Properties; 
import kafka.consumer.ConsumerConfig; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.ComponentScan; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
@ComponentScan("com.truecar.inventory.worker.core") 
public class AppConfig { 

    @Bean 
    @Named("consumerConfig") 
    private static ConsumerConfig createConsumerConfig() { 
     String zookeeperAddress = "127.0.0.1:2181"; 
     String groupId = "inventory"; 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeperAddress); 
     props.put("group.id", groupId); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "200"); 
     props.put("auto.commit.interval.ms", "1000"); 
     return new ConsumerConfig(props); 
    } 
} 

Và tôi đang biên dịch với Maven và plugin OneJar Maven. Tuy nhiên, tôi biên dịch và sau đó chạy kết quả một bình tôi nhận được lỗi sau:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters 
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning 
Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:792) 
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803) 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
at java.lang.Class.getDeclaredMethods0(Native Method) 
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521) 
at java.lang.Class.getDeclaredMethods(Class.java:1845) 
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180) 
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222) 
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165) 
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282) 
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223) 
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630) 
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461) 
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73) 
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31) 
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20) 
... 6 more 
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject 
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
... 27 more 

Bây giờ, tôi biết rất ít về Kafka, và không có gì về Scala. Làm thế nào để sửa lỗi này? Tôi nên thử gì tiếp theo? Đây có phải là một vấn đề được biết đến? Tôi có cần phụ thuộc khác không? Dưới đây là phiên bản Kafka trong pom.xml của tôi:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.9.2</artifactId> 
    <version>0.8.0-beta1</version> 
</dependency> 

Cập nhật: Tôi đã liên lạc với mailing list Kafka dev, và họ cho tôi biết một số yêu cầu phiên bản cụ thể cho các phụ thuộc scala. Tuy nhiên, cũng có một phụ thuộc log4j không có giấy tờ, kết quả trong một thời gian chạy khác, không biên dịch thời gian, ngoại lệ.

Exception in thread "main" java.lang.reflect.InvocationTargetException 
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V 
at org.apache.log4j.Category.log(Category.java:333) 
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177) 

Một Cập nhật:

Tôi tìm thấy sự phụ thuộc log4j đúng:

<dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

Nhưng bây giờ tôi đang gặp một thậm chí khó hiểu hơn thời gian chạy ngoại lệ:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

Tại đây tôi nhận được loại cảm giác WTF. Vì vậy, tôi đã thêm một sự phụ thuộc:

<dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 

Nhưng tiếp xúc thêm một ngoại lệ runtime này:

Exception in thread "main" java.lang.reflect.InvocationTargetException 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.simontuffs.onejar.Boot.run(Boot.java:340) 
at com.simontuffs.onejar.Boot.main(Boot.java:166) 
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge 
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146) 
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64) 
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100) 
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 

Tôi hy vọng sẽ có thể nhận được ví dụ bé này lên và chạy, nhưng có lẽ đây là mức giá để trả tiền để sử dụng các sản phẩm beta? Có lẽ tôi nên chuyển sang Apache Active MQ. Nhưng điều đó nghe có vẻ ít vui hơn. Tui bỏ lỡ điều gì vậy?

Trả lời

9

Vấn đề là kafka beta was built in a way that pom generated with a jar isn't valid and maven could not recognize it and parse properly, do đó tìm nạp phụ thuộc transitive. Chúng tôi đã quản lý để giảm thiểu vấn đề này bằng cách tranh thủ tất cả các phụ thuộc từ đó pom (scala, zk, vv) trong định nghĩa pom của chúng tôi. Chúng tôi đang chờ bản beta tiếp theo của kafka, trong đó vấn đề sẽ được khắc phục.

Danh sách phụ thuộc đầy đủ bên dưới. Lưu ý rằng bạn phải thay đổi phụ thuộc phiên bản scala cho phù hợp với postfix của tạo phẩm kafka của bạn.

<dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>log4j</groupId> 
      <artifactId>log4j</artifactId> 
      <version>1.2.15</version> 
      <exclusions> 
       <exclusion> 
        <groupId>com.sun.jmx</groupId> 
        <artifactId>jmxri</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>com.sun.jdmk</groupId> 
        <artifactId>jmxtools</artifactId> 
       </exclusion> 
       <exclusion> 
        <groupId>javax.jms</groupId> 
        <artifactId>jms</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>net.sf.jopt-simple</groupId> 
      <artifactId>jopt-simple</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.6.4</version> 
     </dependency> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-compiler</artifactId> 
      <version>2.8.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.101tec</groupId> 
      <artifactId>zkclient</artifactId> 
      <version>0.3</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-core</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.yammer.metrics</groupId> 
      <artifactId>metrics-annotation</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.easymock</groupId> 
      <artifactId>easymock</artifactId> 
      <version>3.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.scalatest</groupId> 
      <artifactId>scalatest</artifactId> 
      <version>1.2</version> 
      <scope>test</scope> 
     </dependency> 

Đối với

Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?

Vâng, bạn không quên rằng đây là beta phát hành? Một số điều xấu đang xảy ra, thực sự, nhưng hiện tại chúng tôi đang chạy kafka 0.7 mà không cần bất kỳ nỗ lực nào.

+0

Awesome, cảm ơn bạn đã trả lời. Tôi muốn thử 0.7, nhưng trên Maven chỉ có 0.8 lọ có sẵn. Bạn sẽ đề xuất điều gì về truy cập có lập trình? –

+0

@DavidWilliams chúng tôi sử dụng [kafka được xây dựng bởi twitter] (http://search.maven.org/#artifactdetails%7Ccom.twitter%7Ckafka_2.9.2%7C0.7.0%7Cjar) cho 0.7. Bạn có ý nghĩa gì khi truy cập * lập trình *? –

+0

Ah, phải cụ thể hơn trong Java. Các hiện vật Maven duy nhất cho Kafka là 0,8 –

3

tôi thấy cấu hình này phụ thuộc để được chức năng:

<dependencies> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-core</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-context</artifactId> 
     <version>3.2.4.RELEASE</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.8.0-beta1</version> 
    </dependency> 
    <dependency> 
     <groupId>javax.inject</groupId> 
     <artifactId>javax.inject</artifactId> 
     <version>1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>com.101tec</groupId> 
     <artifactId>zkclient</artifactId> 
     <version>0.3</version> 
    </dependency> 
    <dependency> 
     <groupId>com.yammer.metrics</groupId> 
     <artifactId>metrics-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
</dependencies> 
Các vấn đề liên quan