// Build a Hadoop Job seeded from conf. Within this fragment the job is only used
// to hold output settings (format class, key/value classes); it is not submitted here.
Job job = new Job(conf);
if (t instanceof PathTarget) {
  // Single named output used for every lookup and setting below.
  final String outputName = "out0";
  PathTarget pathTarget = (PathTarget) t;

  // Let the target configure the job for its own path under the named output,
  // then read that named output's configuration back from the job.
  // NOTE(review): assumes configureForMapReduce always registers an entry for
  // outputName; a missing entry would surface as a NullPointerException below.
  pathTarget.configureForMapReduce(job, ptype, pathTarget.getPath(), outputName);
  CrunchOutputs.OutputConfig namedOutput =
      CrunchOutputs.getNamedOutputs(job.getConfiguration()).get(outputName);

  // Copy the named output's format and key/value classes onto the job itself,
  // apply the bundle's own configuration, and record which named output is in use.
  job.setOutputFormatClass(namedOutput.bundle.getFormatClass());
  job.setOutputKeyClass(namedOutput.keyClass);
  job.setOutputValueClass(namedOutput.valueClass);
  namedOutput.bundle.configure(job.getConfiguration());
  job.getConfiguration().set("crunch.namedoutput", outputName);

  // Write the RDD to a temporary path first, using the converter's key/value
  // classes and the output format configured above.
  Path stagingPath = pipeline.createTempPath();
  outRDD.saveAsNewAPIHadoopFile(
      stagingPath.toString(),
      c.getKeyClass(),
      c.getValueClass(),
      job.getOutputFormatClass(),
      job.getConfiguration());

  // Hand the temporary output back to the target for final handling.
  // NOTE(review): the meaning of the -1 argument is not visible here; confirm
  // against PathTarget.handleOutputs.
  pathTarget.handleOutputs(job.getConfiguration(), stagingPath, -1);
}
|