/**
 * Builds the topology via {@code buildTopology} and runs it either in a local cluster
 * or on a remote cluster, depending on the command-line arguments.
 *
 * <p>With no arguments, the topology is run once per window configuration
 * (sliding count, tumbling count, sliding duration, tumbling duration). Each run gets
 * its own {@link LocalCluster}, is left running for 60 seconds and is then shut down;
 * afterwards the JVM exits with status 0.
 *
 * <p>With at least one argument, {@code args[0]} is used as the topology name and a
 * single topology using {@code SlidingCountWindow.of(1000, 100)} is submitted through
 * {@link StormSubmitter} with three workers.
 *
 * @param args optional; {@code args[0]} is the topology name for a remote submission
 * @throws Exception if building or submitting a topology fails
 */
public static void main(String[] args) throws Exception {
    Config conf = new Config();
    // NOTE(review): the same store factory instance is passed to every topology built below.
    WindowsStoreFactory mapState = new InMemoryWindowsStoreFactory();
    if (args.length == 0) {
        List<? extends WindowConfig> list = Arrays.asList(
            SlidingCountWindow.of(1000, 100),
            TumblingCountWindow.of(1000),
            SlidingDurationWindow.of(
                new BaseWindowedBolt.Duration(6, TimeUnit.SECONDS),
                new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS)),
            TumblingDurationWindow.of(new BaseWindowedBolt.Duration(3, TimeUnit.SECONDS))
        );
        for (WindowConfig windowConfig : list) {
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("wordCounter", conf, buildTopology(mapState, windowConfig));
                // Give the topology a minute to run before tearing the cluster down.
                Utils.sleep(60 * 1000);
            } finally {
                // Always shut the cluster down, even when submission or the wait fails,
                // so a failed run does not leave a local cluster behind.
                cluster.shutdown();
            }
        }
        // Explicit exit after all local runs have completed.
        System.exit(0);
    } else {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(
            args[0], conf, buildTopology(mapState, SlidingCountWindow.of(1000, 100)));
    }
}
|