14 Octobre 2016

Monitoring des machines

On installe pcp sur chaque noeud (service qui va rapporter différentes statistiques : réseau, disques) puis on installe Graphana en local (interface web pour regrouper toutes ces données.

Exemple

Exemple

Sur cette capture d’écran on peut observer que durant un calcul avec Spark (ici un préprocessing les disques dur de tous les noeuds sont bloqués à 100% d’utilisation alors que le réseau ne semble pas limiter le temps de calcul. Il est donc possible d’améliorer les performances en utilisant des RAID de disques ou des SSD.

Résolution du bug lors du préprocessing avec des gros fichiers

Pour le moment si je teste le préprocessing avec des fichier plus importants (> 50 Go) le calcul s’arète au moment de l’enregistrement du parquet ou du RDD

Erreur

16/10/14 12:32:29 WARN scheduler.TaskSetManager: Lost task 20.0 in stage 4.0 (TID 961, 10.0.0.12): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Stream is corrupted
        at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
        at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
        at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
        at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
        at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
        at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:254)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
        ... 8 more
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 11942 of input buffer
        at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
        at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
        ... 25 more

Résolution possible

Comme on peut le voir la cause originale de l’erreur vient de net.jpountz.lz4, une recherche nous apprend que lz4 est un algorithme de compression et d’après at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress le problème est survenu durant la décompression de données. Je pense qu’il pourrait s’agire de données mise en cache (par exemple lors du parsing des CSV) et ces données ont pu être corrompues. Une possibilité serait que des données aient été perdues suite à une mauvaise configuration (par exemple : le cache disque qui n’est pas utilisé et donc la RAM ne suffit pas).

Nouvelle erreur

16/10/14 14:34:05 ERROR executor.Executor: Exception in task 145.0 in stage 3.0 (TID 638)
java.lang.IllegalArgumentException
        at java.nio.Buffer.position(Buffer.java:244)
        at org.apache.spark.sql.execution.columnar.DirectCopyColumnType$class.extract(ColumnType.scala:373)
        at org.apache.spark.sql.execution.columnar.STRING$.extract(ColumnType.scala:391)
        at org.apache.spark.sql.execution.columnar.compression.PassThrough$Decoder.next(compressionSchemes.scala:60)
        at org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
        at org.apache.spark.sql.execution.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:70)
        at org.apache.spark.sql.execution.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:56)
        at org.apache.spark.sql.execution.columnar.NativeColumnAccessor.org$apache$spark$sql$execution$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:70)
        at org.apache.spark.sql.execution.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
        at org.apache.spark.sql.execution.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:70)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.next(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.next(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        

L’erreur précédente n’est plus survenue, cependant une autre erreur IllegalArgumentException est apparue