How to do feature selection and reduction on a LIBSVM file in Spark using Python? -
i have couple of libsvm files have implement clustering in spark using python. file has space delimiter , first column represents type [ 1 or -1] , rest features in format [1:2.566]. there lot of columns , perform feature selection on [preferably implement chisquaretest model] , use pca or svd perform feature reduction process. but, not find decent tutorial python in spark implement these processes.
i found link online had sample script implement chisqtest in python. used same logic implement model , not done. under hypothesis testing division in link, code parallelizes rdd[labeledpoint] before passing chisqtest model. tried same logic in different manner , got different errors.
data = mlutils.loadlibsvmfile(sc, "path/filename.txt") label = data.map(lambda x: x.label) features = data.map(lambda x: x.features) obs = sc.parallelize(labeledpoint(label,features))
this gave me error stating typeerror: float() argument must string or number
then, normalized data using normalizer() , did same thing , got same error. so, wrote function returns labeledpoint
def parsepoint(line): values = [float(x) x in line.split(' ')] return sc.parallelize(labeledpoint(values[0],values[1:])) parseddata = data.map(lambda x: parsepoint(x)) obs = sc.parallelize(parseddata)
this gave me error stating pipeline rdd not iterable. tried several other methods , ended in error. please tell me going wrong? and, feature reduction process using pca or svd, not find sample script in python. inputs helpful me.
stack trace:
py4jjavaerror traceback (most recent call last) <ipython-input-1-8d0164c0957d> in <module>() 10 sct = sparkcontext() 11 data = mlutils.loadlibsvmfile(sct, "path") ---> 12 print data.take(1) 13 #label = data.map(lambda x: x.label) 14 #features = data.map(lambda x: x.features) spark_home\rdd.pyc in take(self, num) 1263 1264 p = range(partsscanned, min(partsscanned + numpartstotry, totalparts)) -> 1265 res = self.context.runjob(self, takeuptonumleft, p, true) 1266 1267 items += res spark_home\context.pyc in runjob(self, rdd, partitionfunc, partitions, allowlocal) 879 mappedrdd = rdd.mappartitions(partitionfunc) 880 port = self._jvm.pythonrdd.runjob(self._jsc.sc(), mappedrdd._jrdd, partitions, --> 881 allowlocal) 882 return list(_load_from_socket(port, mappedrdd._jrdd_deserializer)) 883 spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, --> 538 self.target_id, self.name) 539 540 temp_arg in temp_args: spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise py4jjavaerror( 299 'an error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise py4jerror( py4jjavaerror: error occurred while calling z:org.apache.spark.api.python.pythonrdd.runjob. : org.apache.spark.sparkexception: job aborted due stage failure: task 0 in stage 1.0 failed 1 times, recent failure: lost task 0.0 in stage 1.0 (tid 2, localhost): java.net.socketexception: connection reset peer: socket write error @ java.net.socketoutputstream.socketwrite0(native method) @ java.net.socketoutputstream.socketwrite(unknown source) @ java.net.socketoutputstream.write(unknown source) @ java.io.bufferedoutputstream.write(unknown source) @ java.io.dataoutputstream.write(unknown source) @ java.io.filteroutputstream.write(unknown source) @ org.apache.spark.api.python.pythonrdd$.org$apache$spark$api$python$pythonrdd$$wr ite$1(pythonrdd.scala:413) @ org.apache.spark.api.python.pythonrdd$$anonfun$writeiteratortostream$1.apply(pythonrdd.scala:425) @ org.apache.spark.api.python.pythonrdd$$anonfun$writeiteratortostream$1.apply(pythonrdd.scala:425) @ scala.collection.iterator$class.foreach(iterator.scala:727) @ org.apache.spark.interruptibleiterator.foreach(interruptibleiterator.scala:28) @ org.apache.spark.api.python.pythonrdd$.writeiteratortostream(pythonrdd.scala:425) @ org.apache.spark.api.python.pythonrdd$writerthread$$anonfun$run$3.apply(pythonrd d.scala:248) @ org.apache.spark.util.utils$.loguncaughtexceptions(utils.scala:1772) @ org.apache.spark.api.python.pythonrdd$writerthread.run(pythonrdd.scala:208) driver stacktrace: @ org.apache.spark.scheduler.dagscheduler.org$apache$spark$scheduler$dagscheduler$ $failjobandindependentstages(dagscheduler.scala:1266) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler .scala:1257) @ org.apache.spark.scheduler.dagscheduler$$anonfun$abortstage$1.apply(dagscheduler.scala:1256) @ scala.collection.mutable.resizablearray$class.foreach(resizablearray.scala:59) @ scala.collection.mutable.arraybuffer.foreach(arraybuffer.scala:47) @ org.apache.spark.scheduler.dagscheduler.abortstage(dagscheduler.scala:1256) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dag scheduler.scala:730) @ org.apache.spark.scheduler.dagscheduler$$anonfun$handletasksetfailed$1.apply(dagscheduler.scala:730) @ scala.option.foreach(option.scala:236) @ org.apache.spark.scheduler.dagscheduler.handletasksetfailed(dagscheduler.scala:730) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala:1450) @ org.apache.spark.scheduler.dagschedulereventprocessloop.onreceive(dagscheduler.scala :1411) @ org.apache.spark.util.eventloop$$anon$1.run(eventloop.scala:48)
mlutils.loadlibsvmfile
returns rdd[labeledpoint]
can pass output directly statistics.chisqtest
. using example data:
from pyspark.mllib.util import mlutils pyspark.mllib.stat import statistics data = mlutils.loadlibsvmfile(sc, 'data/mllib/sample_libsvm_data.txt') chisqresults = statistics.chisqtest(data) print chisqresults[-1]
Comments
Post a Comment