Hadoop-自定义FileInputFormat-Demo


以 Excel 文件为例

pom.xml

<!-- JExcelApi (jxl): presumably the library providing the Workbook / Sheet / Cell /
     BiffException types used by ExcelRecordReader below (confirm against its imports). -->
<dependency>
    <groupId>net.sourceforge.jexcelapi</groupId>
    <artifactId>jxl</artifactId>
    <version>2.6.12</version>
</dependency>

ExcelFileInputFormat.java

/**
 * Input format that hands each input file to an {@link ExcelRecordReader}.
 *
 * <p>Records are keyed by an {@link IntWritable} and carry a {@link Text} value,
 * as produced by {@link ExcelRecordReader}.
 */
public class ExcelFileInputFormat extends FileInputFormat<IntWritable, Text> {

    /**
     * Disables splitting so that every file is processed as one split.
     *
     * @param context  the job context (unused)
     * @param filename the file being considered (unused)
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Creates the reader for a split. The reader is returned uninitialized;
     * it receives the split through its own {@code initialize} method.
     *
     * @param split   the split to be read (unused here)
     * @param context the task attempt context (unused here)
     * @return a new {@link ExcelRecordReader}
     */
    @Override
    public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new ExcelRecordReader();
    }
}

ExcelRecordReader.java

/**
 * Record reader that exposes the rows of the first sheet of a workbook.
 *
 * <p>The key is the zero-based row index; the value is the row's cell contents,
 * each followed by a single space.
 */
public class ExcelRecordReader extends RecordReader<IntWritable, Text> {
    /** Number of rows in the sheet being read. */
    private int rows;
    /** Index of the current row; -1 until the first call to {@link #nextKeyValue()}. */
    private int current = -1;
    private Sheet sheet;
    private Workbook workbook;

    /**
     * Opens the file behind the split and loads the first sheet of its workbook.
     *
     * @param split   the split to read; must be a {@link FileSplit}
     * @param context supplies the configuration used to resolve the file system
     * @throws IOException if the file cannot be opened, cannot be parsed as a
     *                     workbook, or contains no sheets
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        Configuration conf = context.getConfiguration();
        Path filePath = fileSplit.getPath();
        FileSystem fs = filePath.getFileSystem(conf);
        FSDataInputStream inputStream = fs.open(filePath);
        try {
            workbook = Workbook.getWorkbook(inputStream);
        } catch (BiffException e) {
            // Fail the task with the cause attached instead of continuing with a
            // null sheet and hitting a NullPointerException later.
            throw new IOException("Cannot parse Excel workbook: " + filePath, e);
        } finally {
            // NOTE(review): assumes getWorkbook has fully consumed the stream by the
            // time it returns, so it is safe to release here - confirm against jxl docs.
            inputStream.close();
        }

        Sheet[] sheets = workbook.getSheets();
        if (sheets.length == 0) {
            throw new IOException("Excel workbook has no sheets: " + filePath);
        }
        sheet = sheets[0];
        rows = sheet.getRows();
    }

    /**
     * Advances to the next row.
     *
     * @return {@code true} if a row is available, {@code false} once all rows are consumed
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (current < rows - 1) {
            current++;
            return true;
        }
        return false;
    }

    /**
     * @return the zero-based index of the current row
     */
    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return new IntWritable(current);
    }

    /**
     * @return the contents of every cell in the current row, each followed by one space
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        StringBuilder line = new StringBuilder();
        int columns = sheet.getColumns();
        for (int col = 0; col < columns; col++) {
            Cell cell = sheet.getCell(col, current);
            line.append(cell.getContents()).append(' ');
        }
        return new Text(line.toString());
    }

    /**
     * @return the fraction of rows consumed so far, in the range [0, 1];
     *         0 when the sheet has no rows
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (rows <= 0) {
            return 0.0f;
        }
        // Floating-point division: integer division would always yield 0.
        return Math.min(1.0f, (current + 1) / (float) rows);
    }

    /**
     * Closes the workbook if one was opened.
     */
    @Override
    public void close() throws IOException {
        // workbook stays null when initialize failed or was never called.
        if (workbook != null) {
            workbook.close();
        }
    }
}

ExcelMapper.java

/**
 * Pass-through mapper: every incoming (row index, row text) pair is emitted unchanged.
 */
public class ExcelMapper extends Mapper<IntWritable, Text, IntWritable, Text> {

    /**
     * Emits the input pair as-is.
     *
     * @param key     the input key
     * @param value   the input value
     * @param context the sink the pair is written to
     */
    @Override
    protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Same effect as delegating to Mapper's default identity map.
        context.write(key, value);
    }
}

ExcelJob.java

/**
 * Driver for a map-only job that reads input through {@link ExcelFileInputFormat}
 * and runs {@link ExcelMapper} over it.
 */
public class ExcelJob {
    /** Input location used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "/wc/excel";
    /** Output location used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "/wc/excelout";

    /**
     * Configures and runs the job, then exits with 0 on success or -1 on failure.
     *
     * @param args optional: {@code args[0]} is the input path, {@code args[1]} the
     *             output path; each falls back to its default when absent
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ExcelJob.class);
        job.setMapperClass(ExcelMapper.class);
        job.setInputFormatClass(ExcelFileInputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        // With no reduce phase the mapper's output is the job's output, so declare
        // the final output types to match what the mapper emits.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // No reduce phase is needed.
        job.setNumReduceTasks(0);

        // Where the input files are read from.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        // Where the job output is written.
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        System.exit(job.waitForCompletion(true) ? 0 : -1);
    }
}

文章作者: 钱不寒
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录