How to do feature selection and reduction on a LIBSVM file in Spark using Python?


I have a couple of LIBSVM files on which I have to implement clustering in Spark using Python. Each file is space-delimited: the first column is the label [1 or -1] and the remaining columns are features in the format [1:2.566]. There are a lot of columns, so I want to perform feature selection (preferably with the chi-squared test model) and then use PCA or SVD for feature reduction. But I could not find a decent tutorial for implementing these processes in Spark with Python.
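For example, a line in one of the files looks something like this (values made up for illustration):

-1 1:2.566 2:0.843 5:1.02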

I found a link online that had a sample script implementing the chi-squared test in Python. I used the same logic to implement the model, but could not get it done. Under the hypothesis-testing section of that link, the code parallelizes an RDD[LabeledPoint] before passing it to the ChiSqTest model. I tried the same logic in a couple of different ways and got different errors.

data = MLUtils.loadLibSVMFile(sc, "path/filename.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
obs = sc.parallelize(LabeledPoint(label, features))

This gave me an error stating TypeError: float() argument must be a string or a number.

Then I normalized the data using Normalizer() and did the same thing, but got the same error. So I wrote a function that returns a LabeledPoint:

def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return sc.parallelize(LabeledPoint(values[0], values[1:]))

parsedData = data.map(lambda x: parsePoint(x))
obs = sc.parallelize(parsedData)

This gave me an error stating that a PipelinedRDD is not iterable. I tried several other methods and they all ended in errors. Could you please tell me where I am going wrong? Also, for the feature reduction process using PCA or SVD, I could not find a sample script in Python. Any inputs would be helpful.

Stack trace:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-8d0164c0957d> in <module>()
     10 sct = SparkContext()
     11 data = MLUtils.loadLibSVMFile(sct, "path")
---> 12 print data.take(1)
     13 #label = data.map(lambda x: x.label)
     14 #features = data.map(lambda x: x.features)

SPARK_HOME\rdd.pyc in take(self, num)
   1263
   1264         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1265         res = self.context.runJob(self, takeUpToNumLeft, p, True)
   1266
   1267         items += res

SPARK_HOME\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    879         mappedRDD = rdd.mapPartitions(partitionFunc)
    880         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
--> 881                                           allowLocal)
    882         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    883

spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                                         self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:413)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:425)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:425)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:248)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

MLUtils.loadLibSVMFile returns an RDD[LabeledPoint], so you can pass its output directly to Statistics.chiSqTest. Using the example data:

from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics

data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
chiSqResults = Statistics.chiSqTest(data)

print chiSqResults[-1]
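If you want to go beyond inspecting the test results and actually keep only the top-scoring features, MLlib also has a ChiSqSelector (available in the Python API in the newer 1.x releases, if I remember correctly). A minimal sketch, assuming the features are categorical and where numTopFeatures=50 is just an arbitrary choice:

from pyspark.mllib.feature import ChiSqSelector
from pyspark.mllib.regression import LabeledPoint

# keep the 50 features with the highest chi-squared statistic against the label
selector = ChiSqSelector(numTopFeatures=50)
model = selector.fit(data)  # data is the RDD[LabeledPoint] from loadLibSVMFile

# rebuild LabeledPoints containing only the selected features
filteredData = data.map(lambda lp: LabeledPoint(lp.label, model.transform(lp.features)))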

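For the dimensionality-reduction part, newer Spark releases (1.5+, I believe) expose PCA in pyspark.mllib.feature; SVD on a RowMatrix only gained a Python API later, so PCA is probably the easier route from PySpark. A rough sketch, where k=10 is an arbitrary target dimensionality:

from pyspark.mllib.feature import PCA

# fit PCA on the feature vectors only (the labels are not needed here)
features = data.map(lambda lp: lp.features)
pcaModel = PCA(10).fit(features)

# project every feature vector down to 10 principal components
projected = pcaModel.transform(features)
print projected.take(1)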