소프트웨어 개발/Scala - Functional

Spark를 이용한 데이터분석 (1) - 준비하기

늘근이 2016. 12. 25. 10:47

본 포스팅은 

Advanced Analytics with Spark, 한빛미디어

스칼라와 기계학습, PACKT

참조.


스파크와 스칼라의 조합을 쓰는 이유 

- 보통은 R에서 프로토타입을 만들고 (REPL) 파이썬으로 래핑되어있는 라이브러리를 이용하거나 C / Java계열로 포팅해서 쓰는데 반해, 스칼라는 그 자체로 운영에 적용할수 있는 JVM기반의 언어이며 스파크 쉘로 탐색적 분석을 가능케 한다.

- 스칼라 언어에서 오는 반복적인 모델링이나 전처리등의 깔끔함.

- 하둡생태계와 통합하기에 좋은 점.

- 맵 이후 리듀스 단계를 지키지 않아도 되며 바로 다음 단계로 임시 결과를 넘길 수 있음


1) UC어바인 기계학습저장소로 표본 데이터 발췌 (curl은 data transfer tool로, https://www.lesstif.com/pages/viewpage.action?pageId=14745703 참조)


사용하려는 데이터는 독일의 병원에서 나온 레코드 링크 연구. 확실한 규칙없이 들어가있는 수백만건의 환자 기록 묶음이 들어있음


$ curl -L -o donation.zip http://bit.ly/1Aoywaq

$ unzip donation.zip

$ unzip 'block_*.zip'


2) 스파크 다운로드

http://spark.apache.org/downloads.html

자바런타임이 설치되어있어야함.


3) 텍스트파일 로딩 및 몇가지 테스트


scala> val rawblocks = sc.textFile("linkage")

rawblocks: org.apache.spark.rdd.RDD[String] = linkage MapPartitionsRDD[3] at textFile at <console>:24


scala> rawblocks.first

res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"


scala> val head = rawblocks.take(10)

head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)


scala> head.length

res4: Int = 10


scala> rawblocks.count()

res5: Long = 6552407  


scala> head.foreach(println)

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE

39086,47614,1,?,1,?,1,1,1,1,1,TRUE

70031,70237,1,?,1,?,1,1,1,1,1,TRUE

84795,97439,1,?,1,?,1,1,1,1,1,TRUE

36950,42116,1,?,1,1,1,1,1,1,1,TRUE

42413,48491,1,?,1,?,1,1,1,1,1,TRUE

25965,64753,1,?,1,?,1,1,1,1,1,TRUE

49451,90407,1,?,1,?,1,1,1,1,0,TRUE

39932,40902,1,?,1,?,1,1,1,1,1,TRUE


scala> def isHeader(line: String): Boolean = {

     | line.contains("id_1")

     | }

isHeader: (line: String)Boolean


scala> head.filter(isHeader).foreach(println)

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"


scala> head.filterNot(isHeader).length

res9: Int = 9



여기까지 보면 상당히 편리한점을 발견할수 있는데, 스파크가 RDD (Resilient Distributed Dataset)를 이용함에 있어서 보통의 컬렉션과 같이 람다함수와 축약표현을 이용해서 쉽게 데이터를 핸들링 할 수 있다는 것이다. 이는 스칼라의 장점에서 나오는 것으로 여러 축약 표현을 가능케 한다.


다만 현재 파싱된 데이터가 모두 문자열이므로 적당히 타입을 변경해야 하는데, 다음과 같이 이용가능하다


scala> val line = head(5)

line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE


scala> val pieces = line.split(',')

pieces: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE)


scala> val id1 = pieces(0).toInt

id1: Int = 36950


scala> val id2 = pieces(1).toInt

id2: Int = 42116


scala> val matched = pieces(11).toBoolean

matched: Boolean = true


암묵적 형변환을 통해 쉽게 타입변환이 가능하며, 잘 모르겠으면 [누구나 쉽게 스칼라 + 플레이 , 한빛미디어]를 참조하도록 한다. 이글은 끄적거림이기 때문에 별다른 내용은 없지만 이걸 보고있는 사람이 안쓰러우므로 필요한 사람 책을 하나 드리겠다



scala> case class MatchData(id1: Int, id2: Int, scores:Array[Double], matched:Boolean)


scala> def parse(line: String) = {

     | val pieces = line.split(',')

     | val id1 = pieces(0).toInt

     | val id2 = pieces(1).toInt

     | val scores = pieces.slice(2,11).map(toDouble)

     | val matched = pieces(11).toBoolean

     | MatchData(id1, id2, scores, matched)

     | }


scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))


scala> val parsed = noheader.map(line => parse(line))


케이스클래스는 자바의 클래스보다 강력한 면이 있는게 귀찮게 오버라이딩하거나 게터세터를 설정해주지 않아도 된다. 마찬가지로 추가적인 이해는 저 강력한 책을 보면 된다.


하다보니 데이터셋 자체에 문제가 있는것 같아 일단 맛뵈기는 여기서 1단계 마무리