소프트웨어 개발/Hadoop Ecosystem

Hadoop 빠르게 설치하고 쉽게 이용하기

늘근이 2015. 5. 31. 10:15

HADOOP 빠르게 설치하고 이용해보기.

 

하둡 빠르게 짚고 넘어가기

하둡은 파일 시스템으로써, RDBMS(관계형 데이터베이스)를 대체해버릴수 있는 강력한 요술봉이 아닙니다. 데이터 무결성과 정합성이 중요한 은행거래와 같은 경우는 RDBMS를 이용하고, 다량의 로그나 데이터 처리를 할때 빠르게 처리할 경우에는 HDFS를 이용합니다.

) New York Times 130년치 신문 -> PDF 변환 작업을 하루만에 끝냄. (기존 방식 10년 예상)

 

하둡 파일 시스템은 오픈소스이고 여러사람에 의해 발전되어왔기 때문에 사람을 힘들게 하는 SAP 설치보다 훨씬 간단합니다. 다만 windows 설치는 여의치 않기 때문에 linux로 진행합니다.

시스템 설치환경 (1)

Ubuntu Linux 14.04 Server

듀얼코어 1.6Ghz / 4.0GB / 500GB

 * 기본적으로 4대 정도는 있어야 분산환경 테스트가 유효하지만 자금문제로 타협.

 

다음의 페이지(http://www.apache.org/dyn/closer.cgi/hadoop/common)에서 다운로드를 받아서 서버에 올려서 압축을 해제하면 됩니다.



서버에 자바는 설치되어있을테니, 다음과 같은 명령어로 path를 확인하고 설정파일에 자바가 어디있는지 알려주기만 하면 됩니다.

$ which java

 

이제 설정파일을 수정만 하면 됩니다.

/conf $ vim hadoop-env.sh

바로 보이듯이, 필수 조건은 JAVA_HOME 만입니다. # 주석표시를 지워주는것을 잊지 말아주세요



 

그리고, 서버간 통신을 해야하기 때문에 SSH 키를 하나 생성해주어야 합니다.

$ ssh-keygen –t rsa

 

마지막으로, bin에 있는 start-all.sh 를 실행시키면 됩니다.

bin $ ./start-all.sh

 



* 서버의 설정에 따라, root로그인이 되지 않을경우 /etc/ssh/sshd_config 파일에서 PermitRootLogin yes 로 설정해주고 다시 restart해주어야 할 필요가 있을 수 있습니다.

 

예제 프로그램 wordcount 이용해보기

 

모든 세팅이 완료되었다면, 간단한 기능을 확인하기 위해, 하둡에서 제공하는 예제 프로그램인 wordcount를 이용해 삼성SDS 에 관련된 인터넷 정보들에서 얻을수 있는 단어들을 분석해 봅니다. 재밌겠죠?

먼저 무작위로 가져온 정보들을 txt파일로 저장한후(저장시 UTF-8 인코딩 필수), 다음과 같은 명령어로 wordcount프로그램을 실행합니다.

$ ./bin/hadoop jar hadoop-examples-*.jar wordcount test.txt output1

 

Jar로 되어있는 자바파일안 wordcount 프로그램을 실행시키되, sds.txt 파일을 읽어서 output1이라는 이름으로 출력하라는 것입니다. 다음과 같은 실행 결과 창을 볼수 있습니다.



실행로그가 위와같이 뜨는것을 확인했으니 실제 결과도 한번 들여다 봅니다. 결과는 현재 디렉터리에서 output1아래, 파일이름은 part-r-00000으로 생성이 되어있습니다.

$ ./bin/hadoop fs –cat output1/part-r-00000

 

 



갖가지 정보들이 count 되어서 정보를 출력해주는것을 확인할수있습니다.

 

사실, 단어의 갯수를 세주는것은 그다지 대단한 기능도 아닙니다. 그냥 ABAP COLLECT 기능이나 다를게 없습니다. 다만, 하둡은 밑단에서 여러 데이터 노드들이 병렬처리로 돌아가고 있기 때문에, 저급한 20만원도 안되는 넷북을 잔뜩 사서 필요한만큼 데이터노드로 연결해도 데이터 동기화나 멀티쓰레딩 프로그래밍의 문제로 골머리를 썩을 필요 없이 상당한 효과를 볼수 있다는 것입니다.

 

예제 프로그램 말고 실제로 프로그램을 짜기 위해서는 Eclipse JAVA 에서 hadoop을 라이브러리에 추가하고 여러가지 클래스를 import해서 짜게 됩니다.

 

이제 실제로 빅데이터 JAVA를 가지고 프로그램을 만들어보겠습니다.

먼저 빅데이터에 해당되는 샘플 데이터가 필요한데, stat-computing.org에는 여러 통계학적인 데이터를 제공하고 있습니다. 여기에는 SAP 샘플데이터 테이블에서 자주보던 항공표를 1987년부터 2008년까지의 자료를 다운로드 해볼수 있고 데이터는 총 12천 건이며, 순수 CSV파일로 약 11기가 정도 됩니다. 우리는 여기서 DELAY된 건수를 월별로 요약해서 보고 싶습니다.

 


 

하둡 프로그래밍은 크게 네가지 파일로 나눠져있다고 할수 있겠습니다.

드라이버(Driver) / 매퍼(Mapper) / 리듀서(Reducer) / 기타

드라이버는 복잡하게 생각할 필요 없이 그냥 시동을 거는 운전자와 같은 클래스로 보면 됩니다. 총 관리클래스로써 매퍼와 리듀서, 기타 클래스를 불러와 기본적인 세팅을 하고 요이땅!하는 동작을 실행합니다. 실제적인 main 메서드가 존재하고 있기 때문에 하둡 프로그램을 시동할때 부르는 클래스입니다.

매퍼는 해당하는 텍스트를 읽어서, 데이터를 매핑하면서 처리하는 클래스 입니다. 우리가 받는 비행기 정보를 읽으면서 delay된 정보가 나올때마다 ‘2015.01 , 1’ ‘2014.12 , 1’ ‘2013.12, 1’ 이러한 식으로 COLLECT하기 좋은 구조로 변환합니다. (네 아밥의 그 COLLECT와 비슷한 개념인듯 합니다.)

리듀서는 실제로 COLLECT를 시작해서 계산을 합하는 과정입니다. 실제로 매퍼에서 더해지기 좋은 구조로 변환했기 때문에 이제 분류별로 합해버리면 됩니다.

기타에서는 그밖에 다른 공통적인 로직이나, 필요한 부분을 자유롭게 쓰면 됩니다.

 

매퍼와 리듀스에 대해서는 다음과 같은 그림으로 이해할수 있습니다.



*출처 : http://chimera.labs.oreilly.com/books/1234000001811/ch01.html#runs_on_hadoop

Map 단계에서 텍스트를 읽어서 계산하기 편한 구조로 만들어지고 있고, Reduce단계에서 이를 하나의 정보로 묶는 작업을 하고 있습니다. 하둡에서는 자바를 통해 프로그래밍을 구현하고 있습니다. (소스코드 부록참조) 자바로 구현하기 위해서는 Build path hadoop-core.jar를 라이브러리로 등록을 하면 됩니다.





 

컴퓨팅 성능에 따라서 어느정도의 시간이 흐른 후에 다음과 같은 로그를 볼수있게 됩니다.



 

이제 지정해준 output폴더에 가보면 결과를 볼수 있습니다. (MS워드로 결과파일 열기결과)



실제로 그래프를 한번 그려보면,



데이터의 분석은 월별로도 분석할수 있고 년도의 흐름에 따라 분석할수도 있겠으나, 대체로 연말(11~12) 에 갈수록 딜레이가 심해지는 경향이 있습니다.

 

11기가가 넘는 텍스트 데이터를 분석하는데 있어(12천만건) 얼마나 시간이 걸리는지도 굉장히 중요한 사항입니다. 앞서 스펙을 언급드렸듯이, 실제로 테스팅한 기계는 굉장히 낙후하고 성능이 떨어져서 윈도우조차 깔기 어려웠던 넷북으로써, 빅데이터를 처리하는데에 있어 굉장히 시간이 많이 걸렸던것이 사실입니다. Map Reduce의 시간 비율만 참고하면 될것 같습니다. 시간은 다음과 같습니다.

 

테스트 기종

Map

Reduce

듀얼코어 1.6Ghz (넷북용)

43

8

듀얼코어 3Ghz X 4

3 (참고서적 주장)

 

결과로 알수있는것은, 몇가지 실험이 더 필요할수 있겠고 병렬적으로 계산을 하기 때문에 당연한 결과인지도 모르겠지만, scale-out / scale-up 에 따른 모든 컴퓨팅 자원의 업그레이드에 대해 계산속도가 상당히 선형적으로 개선될 수 있다는 것입니다.

이렇게 하둡은 어느정도의 코딩에 대한 기본 개념이 있으면 기초적인 지식을 가지고도 접근할수 있었습니다. 하둡은 하둡 뿐만아니라, 굉장히 광범위한 생태계를 가지고 있고, 이러한 수많은 기술들을 잘 이용을 하고 때에 맞는 조합을 할수 있으면 큰 힘이 될것같다는 생각이 듭니다.

삼성전자에서는 기존에 쌓여있는 어마어마한 데이터들을 어떻게 잘 분석해서 이용해볼까라는 생각을 가지고만 있다면, sqoop(http://sqoop.apache.org)와 같이 신속하게 데이터를 RDBMS에서 추출해 내어 비싸지 않은 장비들을 병렬로 연결하고, 하둡을 이용하여 데이터를 분석하고 ( 인메모리기반 spark(http://spark.apache.org) 가 굉장히 빠르다고 하네요 ) 유의미한 결과를 추출해낼 수 있을것 같습니다.

 

참고자료

l  시작하세요 하둡프로그래밍, 정재화, 위키북스, 2015

부록

(책에 있는 소스에서 변수 네이밍에서 조금 변형을 가했지만, 프로그램 구조는 개선점이 있는 것 같아 부록으로 뺍니다.)

드라이버 코드 – DelayDriver.java

package sflight;

 

 

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 

public class DelayDriver {

        public static void main(String[] args) throws Exception {

              

               Configuration conf = new Configuration();

              

               //인자 확인

               if (args.length != 2) {

                       System.err.println("Need 2 arguments [Input] [Output]");

                       System.exit(2);

               }

              

               Job job = new Job(conf, "DelayCount");

              

               FileInputFormat.addInputPath(job, new Path(args[0]));

               FileOutputFormat.setOutputPath(job, new Path(args[1]));

              

               job.setJarByClass(DelayDriver.class);

               job.setMapperClass(DelayMapper.class);

               job.setReducerClass(DelayReducer.class);

               job.setInputFormatClass(TextInputFormat.class);

               job.setOutputFormatClass(TextOutputFormat.class);

              

               job.setOutputKeyClass(Text.class);

               job.setOutputValueClass(IntWritable.class);

               job.waitForCompletion(true);

              

              

        }//end main

}//end public class

 

 

매퍼 코드 – DelayMapper.java

package sflight;

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

public class DelayMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

       

        // 출력값

        private final static IntWritable outputValue = new IntWritable(1);

       

        // 출력키

        private Text outputKey = new Text();

       

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

               FlightParser parser = new FlightParser(value);

               outputKey.set(parser.getYear() + "," + parser.getMonth());

              

               if (parser.getDepartureDelayTime() > 0) {

                       context.write(outputKey, outputValue);

               }

        }

}

 

 

리듀서 코드 – DelayReducer.java

package sflight;

 

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class DelayReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

       

        private IntWritable result = new IntWritable();

       

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

               int sum = 0;

               for (IntWritable value : values) sum += value.get();

               result.set(sum);

               context.write(key, result);

        }

       

}

 

 

기타 공통코드 – FlightParser.java

package sflight;

 

import org.apache.hadoop.io.Text;

 

public class FlightParser {

 

        int year;

        int month;

        int arriveDelayTime = 0;

        int departureDelayTime = 0;

        int distance = 0;

        boolean arriveDelayAvailable = true;

        boolean departureDelayAvailable = true;

        boolean distanceAvailable = true;

        String scarr;

 

        public FlightParser(Text text) {

               try {

                       String[] columns = text.toString().split(",");

                       year = Integer.parseInt(columns[0]);

                       month = Integer.parseInt(columns[1]);

                       scarr = columns[8];

                       if (!columns[15].equals("NA")) {

                              departureDelayTime = Integer.parseInt(columns[15]);

                       } else {

                              departureDelayAvailable = false;

                       }

                       if (!columns[14].equals("NA")) {

                              arriveDelayTime = Integer.parseInt(columns[14]);

                       } else {

                              arriveDelayAvailable = false;

                       }

                       if (!columns[18].equals("NA")) {

                              distance = Integer.parseInt(columns[18]);

                       } else {

                              distanceAvailable = false;

                       }

               } catch (Exception e) {

                       System.out.println("Error parsing a record : " + e.getMessage());

               }

        }

        // 부분에는 getter메서드를 구현합니다

}