public StormTopology buildProducerTopology(Properties prop) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 2);
KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
return builder.createTopology();
}
|