/**
 * Assembles a Trident topology that reads two-field tuples
 * ({@code "device-id"}, {@code "count"}) from a
 * {@link RandomNumberGeneratorSpout} and attaches three {@link Debug}
 * stages: one on the raw stream, one after {@code minBy("device-id")},
 * and one after {@code maxBy("count")}.
 *
 * @return the topology produced by {@link TridentTopology#build()}
 */
public static StormTopology buildDevicesTopology() {
    final String idField = "device-id";
    final String countField = "count";
    final Fields deviceFields = new Fields(idField, countField);

    // NOTE(review): the meaning of the numeric arguments (10, 1000) is defined
    // by RandomNumberGeneratorSpout, which is not visible here; confirm there.
    final RandomNumberGeneratorSpout generator =
            new RandomNumberGeneratorSpout(deviceFields, 10, 1000);

    final TridentTopology trident = new TridentTopology();

    // Source stream, passed through a Debug stage labelled "##### devices".
    final Stream source = trident.newStream("devicegen-spout", generator);
    final Stream devices = source.each(deviceFields, new Debug("##### devices"));

    // Branch 1: minBy on the device-id field, followed by its own Debug stage.
    final Stream lowestId = devices.minBy(idField);
    lowestId.each(deviceFields, new Debug("#### device with min id"));

    // Branch 2: maxBy on the count field, followed by its own Debug stage.
    final Stream highestCount = devices.maxBy(countField);
    highestCount.each(deviceFields, new Debug("#### device with max count"));

    return trident.build();
}
|