소프트웨어 개발/Scala - Functional

스파크 DataFrame 전치 (Breeze DenseMatrix 활용)

늘근이 2018. 6. 20. 01:42
 import breeze.linalg._

// Sample 2x3 input: two rows, three Double columns (_1, _2, _3).
// NOTE(review): `toDF()` needs `spark.implicits._` in scope (spark-shell imports it
// automatically) — confirm when running outside the shell.
val df = spark.sparkContext.parallelize(List((1.0, 3.0, 5.0), (2.0, 4.0, 6.0))).toDF()

// Pull every row to the driver and turn each Row into an Array[Double].
// All columns of the row are read (not a hard-coded three), so the same code
// works for a DataFrame with any number of Double columns.
val darr = df.collect()
val da: Array[Array[Double]] =
  darr.map(row => Array.tabulate(row.length)(i => row.getAs[Double](i)))

// Matrix dimensions. The column count is the width of the first row (0 when there
// are no rows); this avoids materialising a full transpose just to measure it.
val ncol = da.headOption.fold(0)(_.length)
val nrow = da.length

// Breeze's DenseMatrix stores its data column-major, so flatten the transpose
// (i.e. column by column) before handing the buffer to the constructor.
val dd = da.transpose.flatten

val dv = new DenseMatrix(nrow, ncol, dd)

// val rdd1 = spark.sparkContext.parallelize(da).toDF()

// val dv = DenseMatrix(da)

// Two flat views of the matrix, each shown as a single-column DataFrame.
// `dv.t.toArray` lists the elements row by row (1,3,5,2,4,6 for the sample data);
// `dv.toArray` lists them column by column (1,2,3,4,5,6).
val transposed: Array[Double] = dv.t.toArray
val transposed2: Array[Double] = dv.toArray

val rdd = spark.sparkContext.parallelize(transposed)
val rdd2 = spark.sparkContext.parallelize(transposed2)

val dfresult = rdd.toDF()
val dfresult2 = rdd2.toDF()

// Print the row-by-row view first, then the column-by-column view.
Seq(dfresult, dfresult2).foreach(_.show())

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Rebuild the transpose as a Spark DataFrame: each column of the original matrix
// becomes one row. `transposed` lists the elements row by row, so grouping it by
// `ncol` recovers the original rows and `.transpose` turns them into columns.
// `grouped` rejects a group size of 0, so an input with no columns (e.g. an empty
// DataFrame) yields an empty list instead of throwing.
val aaa: List[List[Double]] =
  if (ncol == 0) Nil
  else transposed.grouped(ncol).toList.transpose

println(aaa.length)

// One Row per column of the original matrix.
val bbb = aaa.map(values => Row(values: _*))

val ccc = spark.sparkContext.parallelize(bbb)

// One nullable Double field per original row, named "0", "1", ...
val schema = StructType(
  (0 until nrow).map(i => StructField(i.toString, DoubleType, nullable = true)).toArray
)

// SparkSession.createDataFrame replaces the legacy sqlContext entry point.
val ddd = spark.createDataFrame(ccc, schema)

ddd.show()