/**
 * Translates a change in an integer value into bucket delta tuples.
 *
 * <p>Reads the transaction attempt from field 0, the current value from
 * field 2 and the (possibly {@code null}) previous value from field 3.
 * Each value is mapped to a bucket by integer division by
 * {@code BUCKET_SIZE}.
 *
 * <ul>
 *   <li>No previous value: emits {@code (attempt, currentBucket, 1)}.</li>
 *   <li>Previous value in a different bucket: emits
 *       {@code (attempt, currentBucket, 1)} followed by
 *       {@code (attempt, previousBucket, -1)}.</li>
 *   <li>Previous value in the same bucket: emits nothing.</li>
 * </ul>
 *
 * @param tuple     input tuple carrying the attempt and the two values
 * @param collector collector the delta tuples are emitted to
 */
public void execute(Tuple tuple, BasicOutputCollector collector) {
    final TransactionAttempt txAttempt = (TransactionAttempt) tuple.getValue(0);
    final int current = tuple.getInteger(2);
    final Integer previous = tuple.getInteger(3);
    final int newBucket = current / BUCKET_SIZE;

    // First observation: there is no old bucket to decrement.
    if (previous == null) {
        collector.emit(new Values(txAttempt, newBucket, 1));
        return;
    }

    final int oldBucket = previous / BUCKET_SIZE;

    // Value stayed inside the same bucket: nothing to report.
    if (newBucket == oldBucket) {
        return;
    }

    // Value moved between buckets: +1 for the new one, then -1 for the old one.
    collector.emit(new Values(txAttempt, newBucket, 1));
    collector.emit(new Values(txAttempt, oldBucket, -1));
}
|