12 Septembre 2016

Articles intéressants

Résolution des problèmes de connectivité Spark Utilisation du CLI hdfs

Problème à résoudre

La commande pour démarer un worker est la suivante (depuis un container actif afin d’avoir directement le log) :

    /usr/local/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077 --properties-file /spark-defaults.conf

Lors du lancement d’un worker en tant que service il n’arrive pas à se connecter au master :

    WARN Worker: Failed to connect to master spark-master:7077

Par contre cela fonctionne sans problème si on lance cette commande depuis le même container que le master (spark arrive donc à résoudre le nom spark-master ainsi qu’à accéder au master par l’IP virtuelle)

Solution fonctionelle Spark en mode swarm

Premièrement s’assurer que les noeuds sont connectés en mode swarm et qu’un réseau spark-net soit créé en overlay :

docker network create -d overlay spark-net

J’ai finalement utilisé une autre image Docker

Il est donc possible de démarer un Master avec :

docker service create --name spark-master --network spark-net gettyimages/spark /usr/spark-2.0.0/bin/spark-class org.apache.spark.deploy.master.Master

Et de démarer un Worker sur chaque noeud avec :

docker service create --replicas=20 --name spark-worker --network spark-net gettyimages/spark /usr/spark-2.0.0/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077

l’option replicas correspond au nombre de workers que l’on veut créer. Il est possible de la remplacer par --mode global afin de créer un worker par noeud.

Une fois HDFS fonctionnel on pourrait voir comment cette image à été conçue afin de pouvoir en assurer la maintenance nous même en cas d’abandon du projet par ses créateurs

Installation de HDFS en mode swarm

Emplacement par défaut HDFS

Le 8 septembre j’avais déjà tenté rapidement de mettre HDFS en place mais il y avais un problème : la configuration pointait au mauvais endroit.

Commandes fonctionelles

docker service create --name hdfs-namenode -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CLUSTER_NAME=test -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net -p 50070:50070 bde2020/hadoop-namenode

docker service create --name hdfs-datanode --replicas 3 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net bde2020/hadoop-datanode

Test du système

Un programme de test en Java m’a été fournit et consiste en un crossmatch de deux très grandes listes d’étoiles.

Fonctionnement

  • Les deux fichiers serons mis sur le cluster HDFS afin que chaque worker puisse y accéder.
  • On modifie le code source Java afin d’indiquer les bonnes IP (notement pour HDFS).
  • On envoie un JAR au master node Spark en lui indiquant quelle classe démarer.

1. Envoi des fichiers sur le HDFS

Un problème se pose directement, comment accéder a HDFS sachant que les noeuds sont connecté entre eux par un réseau privé. Plusieurs solutions sont possible :

  • Créer une image Docker contenant les fichiers et les instructions
  • Trouver le moyen de faire une passerelle

La deuxieme solution me semble plus viable sur le long terme car une fois cette passerelle mise en place la suite sera beaucoup plus simple.

Test préliminaire

On peut déjà par commencer à tester simplement si on arrive à créer un dossier sur htfs. On crée un service avec hadoop :

docker service create --name hdfs-shell -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net bde2020/hadoop-base sleep 3000

le sleep 3000 permet d’avoir le temps de se connecter en ssh sur la machine qui a démaré le service. Il suffit ensuite de démarer un bash dessus :

docker exec -it hdfs-shell.1.4yo0kelhiewklle02476cmqhu /bin/bash

Le nom du container sera surement différent car il est généré en partie aléatoirement. Une fois dans le bash :

root@f97fb790dc35:/# hdfs dfs -mkdir /test
16/09/12 12:49:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

root@f97fb790dc35:/# hdfs dfs -ls /
16/09/12 12:52:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
drwxr-xr-x   - root supergroup          0 2016-09-12 12:49 /test

Meilleure solution

Lors de la création d’un service il est possible de forcer un noeud à le prendre en charge. Dans notre cas cela est utile pour lancer un container connecté au réseau spark-net et y accéder directement depuis notre machine.

docker service create --name hdfs-shell -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net --constraint node.id==f308skpi0pxi41dujetjsot64 --mount type=bind,src=/data/spark-test,dst=/data,readonly bde2020/hadoop-base sleep 3000

2. Test depuis un container Spark

docker service create --name spark-shell --constraint node.id==f308skpi0pxi41dujetjsot64 --mount type=bind,src=/data/spark-test,dst=/data,readonly --network spark-net gettyimages/spark sleep 3000

docker exec -it spark-shell.1.4m8vc58feiy190q3o8em73pk6 /bin/bash

root@a3aef1042121:/usr/spark-2.0.0# hdfs dfs -fs hdfs-namenode:8020 -ls /          
16/09/12 14:08:51 WARN fs.FileSystem: "hdfs-namenode:8020" is a deprecated filesystem name. Use "hdfs://hdfs-namenode:8020/" instead.
Found 2 items
-rw-r--r--   1 root supergroup 1539629271 2016-09-12 13:40 /sdss9.l2.i0.csv
-rw-r--r--   1 root supergroup  284655961 2016-09-12 13:42 /tmass.l2.i0.csv

Il ne reste maintenant plus qu’a comprendre comment fonctionne le programme en Java et comment le lancer

-. Modification du script run.sh

Le script fournit ne permettait pas de spécifier le namenode, j’ai donc ajouté une variable hdfs_namenode=hdfs://hdfs-namenode:8020/

Il reste un problème par rapport à Scala, pour la suite j’utiliserais l’autre programme en Java uniquement

3. Lancement des commandes indiquées dans le code source

Signature invalide lors du lancement du JAR

Preprocess

spark-submit --master spark://spark-master:7077 \
            --class cds.xmatch.spark.PreprocessData \
            --name CDS_XMatch \
            --executor-memory=13g \
            --conf spark.local.dir=/tmp/spark \
            --conf spark.storage.memoryFraction=0 \
            --conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
            --driver-memory 2G --driver-java-options "-ea" \
            cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/sdss9.l2.i0.csv \
            RAdeg 1 2 4096 10 hdfs://hdfs-namenode:8020/data/xmatch/sdss9.rdd
            
            
spark-submit --master spark://spark-master:7077 \
            --class cds.xmatch.spark.PreprocessData \
            --name CDS_XMatch \
            --executor-memory=13g \
            --conf spark.local.dir=/tmp/spark \
            --conf spark.storage.memoryFraction=0 \
            --conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
            --driver-memory 2G --driver-java-options "-ea" \
            cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/tmass.l2.i0.csv \
            RAJ2000 1 2 4096 10 hdfs://hdfs-namenode:8020/data/xmatch/tmass.rdd
            
            

Crossmatch

spark-submit --master spark://spark-master:7077 \
     --class cds.xmatch.spark.CrossMatch \
     --name CDS_XMatch \
     --executor-memory=13g \
     --conf spark.local.dir=/tmp/spark \
     --conf spark.storage.memoryFraction=0 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
     --driver-memory 2G --driver-java-options "-ea" \
     cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/sdss9.rdd \
     hdfs://hdfs-namenode:8020/data/xmatch/tmass.rdd 4096 5 \
     hdfs://hdfs-namenode:8020/data/xmatch/xmatchRes.txt

4. Résultats obtenus

Le preprocess à l’air de fonctionner correctement, par contre le crossmatch se termine rapidement et renvoie plusieurs erreur. Il est possible que j’ai fais une erreur pour le preprocess du fichier Tmass car il n’y avais aucun exemple avec ce type de données.

16/09/12 15:41:53 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 23, 10.0.0.10): java.lang.AbstractMethodError: cds.xmatch.spark.CrossMatch$2.call(Ljava/lang/Object;)Ljava/util/Iterator;
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:142)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:142)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Exemples

16/09/12 15:19:17 WARN scheduler.TaskSetManager: Lost task 4.2 in stage 1.0 (TID 57, 10.0.0.10): java.lang.AbstractMethodError: cds.xmatch.spark.CrossMatch$2.call(Ljava/lang/Object;)Ljava/util/Iterator;

16/09/12 15:19:18 INFO scheduler.DAGScheduler: Job 0 failed: saveAsTextFile at CrossMatch.java:138, took 12.562289 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 59, 10.0.0.10): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

A faire

  • Faire passer l’interface de hdfs par le proxy nginx
  • Configurer les service hdfs pour que le stockage soit éffectué sur une partition avec plus de place
  • Compiler le code java avec le jar de Spark-2.0 dans le classpath pour voir si ça fait des erreurs