26 Octobre 2016

Résolution de bugs

Les deux fichiers m’indiquent sistématiquement la même erreur lors du pré-process :

16/10/26 09:03:30 WARN scheduler.TaskSetManager: Lost task 113.0 in stage 3.0 (TID 229, b54a482d57df): java.lang.NullPointerException
    at java.text.DecimalFormat.parse(DecimalFormat.java:1997)
    at java.text.NumberFormat.parse(NumberFormat.java:383)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply$mcD$sp(CSVInferSchema.scala:259)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply(CSVInferSchema.scala:259)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$4.apply(CSVInferSchema.scala:259)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:259)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    

Au début j’ai pensé que cela pouvait venir des valeurs nulles présentes dans le csv pour certaines valeurs. En essayant d’isoler le bug en faisant fonctionner le programme sur les 1000 premières lignes en éliminant les lignes comprenant des valeurs nulles j’ai pu remarquer que le programme ce terminait correctement, et contre mes attentes en introduisant une valeur nulle le programme fonctionne aussi. Le problème peut venir de la taille du fichier, nous allons essayer de générer des fichiers de 2, 5 et 10 Go puis essayer de trouver à partir de quelle taille le programme plante.

Premier diagnostic

La mémoire allouée par défaut par la JVM est 500 Mo je relance le programme en allouant 7 Go par exécuteur. On obtient exactement la même erreur.

Il est possible que le CSV contienne une erreur, comme un charactère mal placé. Essayons de parser le CSV avec un simple script python et de caster les deux collones qui nous intéressent dans des Doubles.

>>> with open('spm4.csv', newline='') as csvfile:
...     i = 0
...     reader = csv.reader(csvfile, delimiter=',')
...     for row in reader:
...         print(i)
...         if row[0] != "RAdeg" :
...             12.3214 + float(row[0])
...         i += 1
... 

L’exécution semble prendre beaucoup de temps. Essayons de trouver comment obtenir quelle ligne pose problème avec Spark, avec un try catch peut-être ? Mais cela me semble complexe car le fichier crossmatch.scala n’aparait pas dans la stacktrace (cela est logique car au moment de l’exécution le driver génère un DAG des opéartions à réaliser et génère un programme envoyé à chaque exécuteur.

Test en prenant un fichier fonctionnel

Je vais générer un fichier d’environ 15 Go en utilisant le fichier fonctionnel de départ et en le dupliquant.

/app/spark-2.0.1/bin/spark-submit \
    --class PreProcess \
    --master yarn \
    --deploy-mode cluster \
    --num-executors 6 \
    --executor-memory 7G \
    /mnt/jar/xmatch.scala-1.0.jar \
    hdfs://hdfs-namenode:8020/sdss9.test.csv RAdeg DEdeg 4096 240 hdfs://hdfs-namenode:8020/sdss9.test.parquet
    

Ce fichier pourtant valide, n’a pas fonctionné :

16/10/26 13:40:57 WARN scheduler.TaskSetManager: Lost task 16.0 in stage 4.0 (TID 248, 4cc166db2092): java.io.IOException: Stream is corrupted
        at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
        at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
        at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 12244 of input buffer
        at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
        at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
        ... 24 more

Gestion de la mémoire dans Spark

Explication rapide Documentation

Essayons de modifier l’algorithme de compression car au vu de la stacktrace c’est celui ci qui renvoie un exception :

–conf spark.io.compression.codec=snappy
On obtient au final la même erreur

Cette erreur est d’ailleur référencée sur le bug tracker Apache comme non résolue ici

Une suggestion est de désactiver la compression :

–conf spark.shuffle.compress=false –conf spark.shuffle.spill.compress=false
On obtient toujours une erreur au moment d’une lecture :

16/10/26 14:19:51 WARN scheduler.TaskSetManager: Lost task 95.0 in stage 4.0 (TID 327, 282f6afc53cc): java.io.EOFException: reached end of stream after reading 233940 bytes; 3801400 bytes expected
        at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        
        

Test cache sur le disque

J’ai testé en rajoutant ceci au moment du calcul de l’indice healpix :

.persist(StorageLevel.MEMORY_AND_DISK)

On peut voir que les RDD sont effectivement placés sur le disque au vu des nombreux :

Added rdd_12_194 on disk on 282f6afc53cc:34816 (size: 65.2 MB)

Et cela fonctionne !

16/10/26 14:42:35 INFO yarn.Client: 
        client token: N/A
        diagnostics: N/A
        ApplicationMaster host: 10.0.0.28
        ApplicationMaster RPC port: 0
        queue: default
        start time: 1477492494982
        final status: SUCCEEDED
        tracking URL: http://fa8ac9f3e651:8088/proxy/application_1477467777659_0012/
        user: root

Le problème était donc qu’au moment du calcul des indices healpix les machines ne possédaient pas assez de mémoire pour pouvoir stocker le Dataframe complet, il en résultait donc des erreurs au moment où le programme voulait relire le Dataframe.

Les parties du Dataframe ne rentrant pas en mémoire au moment du calcul sont donc à ce moment stockées sur le disque et réutilisées lors de l’écriture du résultat, ce qui ne semblait pas être fait automatiquement.

Reprises des tests

Je vais donc lancer les tests sur les fichiers de 60 Go qui ne devraient normalement plus poser de soucis.