Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- Spring
- vaadin
- 공정능력
- NPM
- SQL
- plugin
- table
- xPlatform
- SPC
- react
- R
- GIT
- 보조정렬
- IntelliJ
- mybatis
- tomcat
- Kotlin
- Java
- Android
- JavaScript
- es6
- Sqoop
- Express
- hadoop
- Python
- MSSQL
- mapreduce
- window
- SSL
- Eclipse
Archives
- Today
- Total
DBILITY
hadoop secondary sort and multiple outputs exercise 본문
반응형
항공지연통계 보조정렬+다중출력 적용 실습을 그냥 해 보았습니다.
-
package com.dbility.hadoop.sort.secondary; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * * Description * * * @author hyperrookie@gmail.com * * @version 1.0.0 * @date 2017. 2. 8. */ public class AsaDelaySortMultiouputsDriver extends Configured implements Tool { public static class WindowsLocalFileSystem extends LocalFileSystem { public WindowsLocalFileSystem() { super(); } public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { final boolean result = super.mkdirs(f); this.setPermission(f, permission); return result; } public void setPermission(final Path p, final FsPermission permission) throws IOException { try { super.setPermission(p, permission); } catch (IOException ioe) { System.err.println(ioe.getMessage()); } } } public static class PerformParser { private String year; private int month; private int arrivalDelayTime = 0; private int departureDelayTime = 0; private boolean arrivalDelayAvailable = false; private boolean departureDelayAvailable = false; public PerformParser(Text value) { String[] columns = value.toString().split(","); year = columns[0]; month = Integer.parseInt(columns[1]); if ( !"NA".equals(columns[14]) ) { arrivalDelayAvailable = true; arrivalDelayTime = Integer.parseInt(columns[14]); } if ( !"NA".equals(columns[15]) ) { departureDelayAvailable = true; departureDelayTime = Integer.parseInt(columns[15]); } } public String getYear() { return year; } public int getMonth() { return month; } public int getArrivalDelayTime() { return arrivalDelayTime; } public int getDepartureDelayTime() { return departureDelayTime; } public boolean isArrivalDelayAvailable() { return arrivalDelayAvailable; } public boolean isDepartureDelayAvailable() { return departureDelayAvailable; } } public static class CompositeKey implements WritableComparable<CompositeKey> { private String year; private Integer month; public CompositeKey() { } public String getYear() { return year; } public void setYear(String year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public void readFields(DataInput in) throws IOException { year = WritableUtils.readString(in); month = in.readInt(); } public void write(DataOutput out) throws IOException { WritableUtils.writeString(out, year); out.writeInt(month); } public int compareTo(CompositeKey obj) { int result = year.compareTo(obj.getYear()); if ( result == 0) result = month == obj.month ? 0 : ( month < obj.month ? -1 : 1 ); return result; } @Override public String toString() { return (new StringBuilder()).append(year).append("\t").append(month).toString(); } } public static class AsaSortPartitioner extends Partitioner<CompositeKey, IntWritable> { @Override public int getPartition(CompositeKey key, IntWritable value, int numReduceTasks) { return key.getYear().split(",")[0].hashCode() % numReduceTasks; } } public static class AsaSortGroupingComparator extends WritableComparator { public AsaSortGroupingComparator() { super(CompositeKey.class,true); } @SuppressWarnings("rawtypes") @Override public int compare(WritableComparable a, WritableComparable b) { CompositeKey k1 = (CompositeKey)a; CompositeKey k2 = (CompositeKey)b; return k1.getYear().compareTo(k2.getYear()); } } public static class AsaSortComparator extends WritableComparator { public AsaSortComparator() { super(CompositeKey.class,true); } @Override public int compare(Object a, Object b) { CompositeKey k1 = (CompositeKey)a; CompositeKey k2 = (CompositeKey)b; int cmp = k1.getYear().compareTo(k2.getYear()); if ( cmp != 0) return cmp; return k1.getMonth() == k2.getMonth() ? 0 :( k1.getMonth() < k2.getMonth() ? -1 : 1 ); } } public enum AsaCounter { ArrivalDelayThirtyUnder,ArrivalDelaySixtyUnder,ArrivalDelaySixtyAndOver, DepartureDelayThirtyUnder,DepartureDelaySixtyUnder,DepartureDelaySixtyAndOver } public static class AsaSortMapper extends Mapper<LongWritable, Text, CompositeKey, IntWritable> { private CompositeKey outputKey = new CompositeKey(); private final static IntWritable outputValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { PerformParser parser = new PerformParser(value); if ( parser.isArrivalDelayAvailable() && parser.getArrivalDelayTime() > 0 ) { if ( parser.getArrivalDelayTime() < 30 ) context.getCounter(AsaCounter.ArrivalDelayThirtyUnder).increment(1); else if ( parser.getArrivalDelayTime() < 60 ) context.getCounter(AsaCounter.ArrivalDelaySixtyUnder).increment(1); else context.getCounter(AsaCounter.ArrivalDelaySixtyAndOver).increment(1); outputKey.setYear("A,"+parser.getYear()); outputKey.setMonth(parser.getMonth()); context.write(outputKey, outputValue); } if ( parser.isDepartureDelayAvailable() && parser.getDepartureDelayTime() > 0 ) { if ( parser.getDepartureDelayTime() < 30 ) context.getCounter(AsaCounter.DepartureDelayThirtyUnder).increment(1); else if ( parser.getDepartureDelayTime() < 60 ) context.getCounter(AsaCounter.DepartureDelaySixtyUnder).increment(1); else context.getCounter(AsaCounter.DepartureDelaySixtyAndOver).increment(1); outputKey.setYear("D,"+parser.getYear()); outputKey.setMonth(parser.getMonth()); context.write(outputKey, outputValue); } } } public static class AsaSortCombiner extends Reducer<CompositeKey, IntWritable, CompositeKey, IntWritable> { private IntWritable outputValue = new IntWritable(); private int sum; @Override protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { sum = 0; for (IntWritable value : values) { sum+=value.get(); } outputValue.set(sum); context.write(key, outputValue); } } public static class AsaSortReducer extends Reducer<CompositeKey, IntWritable, Text, IntWritable> { private Text outputKey = new Text(); private IntWritable outputValue = new IntWritable(); private int sum = 0; private MultipleOutputs<Text, IntWritable> mos; @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, IntWritable>(context); } @Override protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { String[] columns = key.getYear().split(","); int preMonth = key.getMonth(); if ( columns[0].equals("A") ) { for (IntWritable value : values) { if ( preMonth != key.getMonth() ){ outputKey.set(columns[1]+"\t"+preMonth); outputValue.set(sum); mos.write("arrival", outputKey, outputValue); sum = 0; } sum+=value.get(); preMonth = key.getMonth(); } if ( preMonth == key.getMonth() ){ outputKey.set(columns[1]+"\t"+preMonth); outputValue.set(sum); mos.write("arrival", outputKey, outputValue); } } if ( columns[0].equals("D") ) { for (IntWritable value : values) { if ( preMonth != key.getMonth() ){ outputKey.set(columns[1]+"\t"+preMonth); outputValue.set(sum); mos.write("departure", outputKey, outputValue); sum = 0; } sum+=value.get(); preMonth = key.getMonth(); } if ( preMonth == key.getMonth() ){ outputKey.set(columns[1]+"\t"+preMonth); outputValue.set(sum); mos.write("departure", outputKey, outputValue); } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); } } public int run(String[] args) throws Exception { String[] remainArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); if ( remainArgs.length != 2 ) { System.out.println("Usage : hadoop jar jarName.jar mainClass <input path> <ouput path>"); return -1; } Job job = new Job(getConf(), "WordCountSort"); job.setJarByClass(AsaDelaySortMultiouputsDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(AsaSortMapper.class); job.setMapOutputKeyClass(CompositeKey.class); job.setMapOutputValueClass(IntWritable.class); job.setCombinerClass(AsaSortCombiner.class); job.setNumReduceTasks(2); job.setPartitionerClass(AsaSortPartitioner.class); job.setGroupingComparatorClass(AsaSortGroupingComparator.class); job.setSortComparatorClass(AsaSortComparator.class); job.setReducerClass(AsaSortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //job.setOutputFormatClass(TextOutputFormat.class); MultipleOutputs.addNamedOutput(job, "arrival", TextOutputFormat.class, Text.class, IntWritable.class); MultipleOutputs.addNamedOutput(job, "departure", TextOutputFormat.class, Text.class, IntWritable.class); FileInputFormat.addInputPath(job, new Path(remainArgs[0])); FileOutputFormat.setOutputPath(job, new Path(remainArgs[1])); FileSystem hdfs = FileSystem.get(getConf()); Path path = new Path(remainArgs[1]); if ( hdfs.exists(path) ) hdfs.delete(path, true); return job.waitForCompletion(true) ? 0 : -2; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /*conf.set("fs.default.name", "file:///"); conf.set("mapred.job.tracker", "local"); conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,org.apache.hadoop.io.serializer.WritableSerialization"); conf.set("fs.file.impl", "com.dbility.hadoop.sort.secondary.AsaDelaySortMultiouputsDriver$WindowsLocalFileSystem"); args = new String[] {"d:/hadoop_test/*","d:/hadoop_test/output/"};*/ Runtime.getRuntime().exit(ToolRunner.run(conf, new AsaDelaySortMultiouputsDriver(), args)); } }
- arrival-r-00000
2007 1 286334 2007 2 284152 2007 3 293360 2007 4 273055 2007 5 275332 2007 6 326446 2007 7 326559 2007 8 317197 2007 9 225751 2007 10 270098 2007 11 242722 2007 12 332449 2008 1 611876 2008 2 278902 2008 3 294556 2008 4 256142 2008 5 254673 2008 6 295897 2008 7 264630 2008 8 239737 2008 9 169959 2008 10 183582 2008 11 181506 2008 12 280493
- departure-r-00000
2007 1 536270 2007 2 259288 2007 3 276261 2007 4 249097 2007 5 241699 2007 6 307986 2007 7 307864 2007 8 298530 2007 9 195615 2007 10 231129 2007 11 217557 2007 12 304011 2008 1 551959 2008 2 252765 2008 3 271969 2008 4 220864 2008 5 220614 2008 6 271014 2008 7 253632 2008 8 231349 2008 9 147061 2008 10 162531 2008 11 157278 2008 12 263949
참고 서적 : 시작하세요! 하둡프로그래밍 개정 2판(위키북스) - 정재화 지음
반응형
'bigdata > hadoop' 카테고리의 다른 글
hadoop secondary sort exercise 2 ( 보조 정렬 실습 2 ) (0) | 2017.02.21 |
---|---|
hadoop partial sort exercise ( 부분 정렬 실습 ) (0) | 2017.02.16 |
hadoop secondary sort exercise ( 보조 정렬 실습 ) (0) | 2017.02.06 |
hadoop Secondary Sort ( 보조 정렬 ) (0) | 2016.12.06 |
입력한 공항을 도착지로 년도별 최대 지연 도착 항공편 구하기 (0) | 2016.11.30 |
Comments