본 포스팅은
Advanced Analytics with Spark, 한빛미디어
스칼라와 기계학습, PACKT
참조.
스파크와 스칼라의 조합을 쓰는 이유
- 보통은 R에서 프로토타입을 만들고 (REPL) 파이썬으로 래핑되어있는 라이브러리를 이용하거나 C / Java계열로 포팅해서 쓰는데 반해, 스칼라는 그 자체로 운영에 적용할수 있는 JVM기반의 언어이며 스파크 쉘로 탐색적 분석을 가능케 한다.
- 스칼라 언어에서 오는 반복적인 모델링이나 전처리등의 깔끔함.
- 하둡생태계와 통합하기에 좋은 점.
- 맵 이후 리듀스 단계를 지키지 않아도 되며 바로 다음 단계로 임시 결과를 넘길 수 있음
1) UC어바인 기계학습저장소로 표본 데이터 발췌 (curl은 data transfer tool로, https://www.lesstif.com/pages/viewpage.action?pageId=14745703 참조)
사용하려는 데이터는 독일의 병원에서 나온 레코드 링크 연구. 확실한 규칙없이 들어가있는 수백만건의 환자 기록 묶음이 들어있음
$ curl -L -o donation.zip http://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'
2) 스파크 다운로드
http://spark.apache.org/downloads.html
자바런타임이 설치되어있어야함.
3) 텍스트파일 로딩 및 몇가지 테스트
scala> val rawblocks = sc.textFile("linkage")
rawblocks: org.apache.spark.rdd.RDD[String] = linkage MapPartitionsRDD[3] at textFile at <console>:24
scala> rawblocks.first
res3: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
scala> val head = rawblocks.take(10)
head: Array[String] = Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match", 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE, 39086,47614,1,?,1,?,1,1,1,1,1,TRUE, 70031,70237,1,?,1,?,1,1,1,1,1,TRUE, 84795,97439,1,?,1,?,1,1,1,1,1,TRUE, 36950,42116,1,?,1,1,1,1,1,1,1,TRUE, 42413,48491,1,?,1,?,1,1,1,1,1,TRUE, 25965,64753,1,?,1,?,1,1,1,1,1,TRUE, 49451,90407,1,?,1,?,1,1,1,1,0,TRUE, 39932,40902,1,?,1,?,1,1,1,1,1,TRUE)
scala> head.length
res4: Int = 10
scala> rawblocks.count()
res5: Long = 6552407
scala> head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
scala> def isHeader(line: String): Boolean = {
| line.contains("id_1")
| }
isHeader: (line: String)Boolean
scala> head.filter(isHeader).foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
scala> head.filterNot(isHeader).length
res9: Int = 9
여기까지 보면 상당히 편리한점을 발견할수 있는데, 스파크가 RDD (Resilient Distributed Dataset)를 이용함에 있어서 보통의 컬렉션과 같이 람다함수와 축약표현을 이용해서 쉽게 데이터를 핸들링 할 수 있다는 것이다. 이는 스칼라의 장점에서 나오는 것으로 여러 축약 표현을 가능케 한다.
다만 현재 파싱된 데이터가 모두 문자열이므로 적당히 타입을 변경해야 하는데, 다음과 같이 이용가능하다
scala> val line = head(5)
line: String = 36950,42116,1,?,1,1,1,1,1,1,1,TRUE
scala> val pieces = line.split(',')
pieces: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE)
scala> val id1 = pieces(0).toInt
id1: Int = 36950
scala> val id2 = pieces(1).toInt
id2: Int = 42116
scala> val matched = pieces(11).toBoolean
matched: Boolean = true
암묵적 형변환을 통해 쉽게 타입변환이 가능하며, 잘 모르겠으면 [누구나 쉽게 스칼라 + 플레이 , 한빛미디어]를 참조하도록 한다. 이글은 끄적거림이기 때문에 별다른 내용은 없지만 이걸 보고있는 사람이 안쓰러우므로 필요한 사람 책을 하나 드리겠다
scala> case class MatchData(id1: Int, id2: Int, scores:Array[Double], matched:Boolean)
scala> def parse(line: String) = {
| val pieces = line.split(',')
| val id1 = pieces(0).toInt
| val id2 = pieces(1).toInt
| val scores = pieces.slice(2,11).map(toDouble)
| val matched = pieces(11).toBoolean
| MatchData(id1, id2, scores, matched)
| }
scala> val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
scala> val parsed = noheader.map(line => parse(line))
케이스클래스는 자바의 클래스보다 강력한 면이 있는게 귀찮게 오버라이딩하거나 게터세터를 설정해주지 않아도 된다. 마찬가지로 추가적인 이해는 저 강력한 책을 보면 된다.
하다보니 데이터셋 자체에 문제가 있는것 같아 일단 맛뵈기는 여기서 1단계 마무리
'소프트웨어 개발 > Scala - Functional' 카테고리의 다른 글
Spark 멀티노드 슬레이브 활성화 및 에러노트 (0) | 2017.01.08 |
---|---|
라즈베리파이와 똥컴으로 하둡 구성 (작업중) (0) | 2017.01.05 |
순회 출력 (0) | 2016.09.16 |
스칼라 loop break (0) | 2016.09.14 |
Scala 다차원 배열 및 초기화 (0) | 2016.09.14 |