Résolution des problèmes de connectivité Spark Utilisation du CLI hdfs
La commande pour démarer un worker est la suivante (depuis un container actif afin d’avoir directement le log) :
/usr/local/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077 --properties-file /spark-defaults.conf
Lors du lancement d’un worker en tant que service il n’arrive pas à se connecter au master :
WARN Worker: Failed to connect to master spark-master:7077
Par contre cela fonctionne sans problème si on lance cette commande depuis le même container que le master (spark arrive donc à résoudre le nom spark-master ainsi qu’à accéder au master par l’IP virtuelle)
Premièrement s’assurer que les noeuds sont connectés en mode swarm et qu’un réseau spark-net soit créé en overlay :
docker network create -d overlay spark-net
J’ai finalement utilisé une autre image Docker
Il est donc possible de démarer un Master avec :
docker service create --name spark-master --network spark-net gettyimages/spark /usr/spark-2.0.0/bin/spark-class org.apache.spark.deploy.master.Master
Et de démarer un Worker sur chaque noeud avec :
docker service create --replicas=20 --name spark-worker --network spark-net gettyimages/spark /usr/spark-2.0.0/bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
l’option replicas correspond au nombre de workers que l’on veut créer. Il est possible de la remplacer par --mode global
afin de créer un worker par noeud.
Une fois HDFS fonctionnel on pourrait voir comment cette image à été conçue afin de pouvoir en assurer la maintenance nous même en cas d’abandon du projet par ses créateurs
Le 8 septembre j’avais déjà tenté rapidement de mettre HDFS en place mais il y avais un problème : la configuration pointait au mauvais endroit.
docker service create --name hdfs-namenode -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CLUSTER_NAME=test -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net -p 50070:50070 bde2020/hadoop-namenode
docker service create --name hdfs-datanode --replicas 3 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net bde2020/hadoop-datanode
Un programme de test en Java m’a été fournit et consiste en un crossmatch de deux très grandes listes d’étoiles.
Un problème se pose directement, comment accéder a HDFS sachant que les noeuds sont connecté entre eux par un réseau privé. Plusieurs solutions sont possible :
La deuxieme solution me semble plus viable sur le long terme car une fois cette passerelle mise en place la suite sera beaucoup plus simple.
On peut déjà par commencer à tester simplement si on arrive à créer un dossier sur htfs. On crée un service avec hadoop :
docker service create --name hdfs-shell -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net bde2020/hadoop-base sleep 3000
le sleep 3000 permet d’avoir le temps de se connecter en ssh sur la machine qui a démaré le service. Il suffit ensuite de démarer un bash dessus :
docker exec -it hdfs-shell.1.4yo0kelhiewklle02476cmqhu /bin/bash
Le nom du container sera surement différent car il est généré en partie aléatoirement. Une fois dans le bash :
root@f97fb790dc35:/# hdfs dfs -mkdir /test
16/09/12 12:49:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root@f97fb790dc35:/# hdfs dfs -ls /
16/09/12 12:52:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
drwxr-xr-x - root supergroup 0 2016-09-12 12:49 /test
Lors de la création d’un service il est possible de forcer un noeud à le prendre en charge. Dans notre cas cela est utile pour lancer un container connecté au réseau spark-net et y accéder directement depuis notre machine.
docker service create --name hdfs-shell -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 -e CORE_CONF_hadoop_http_staticuser_user=root -e HDFS_CONF_dfs_webhdfs_enabled=true -e HDFS_CONF_dfs_permissions_enabled=false --network spark-net --constraint node.id==f308skpi0pxi41dujetjsot64 --mount type=bind,src=/data/spark-test,dst=/data,readonly bde2020/hadoop-base sleep 3000
docker service create --name spark-shell --constraint node.id==f308skpi0pxi41dujetjsot64 --mount type=bind,src=/data/spark-test,dst=/data,readonly --network spark-net gettyimages/spark sleep 3000
docker exec -it spark-shell.1.4m8vc58feiy190q3o8em73pk6 /bin/bash
root@a3aef1042121:/usr/spark-2.0.0# hdfs dfs -fs hdfs-namenode:8020 -ls /
16/09/12 14:08:51 WARN fs.FileSystem: "hdfs-namenode:8020" is a deprecated filesystem name. Use "hdfs://hdfs-namenode:8020/" instead.
Found 2 items
-rw-r--r-- 1 root supergroup 1539629271 2016-09-12 13:40 /sdss9.l2.i0.csv
-rw-r--r-- 1 root supergroup 284655961 2016-09-12 13:42 /tmass.l2.i0.csv
Il ne reste maintenant plus qu’a comprendre comment fonctionne le programme en Java et comment le lancer
Le script fournit ne permettait pas de spécifier le namenode, j’ai donc ajouté une variable hdfs_namenode=hdfs://hdfs-namenode:8020/
Il reste un problème par rapport à Scala, pour la suite j’utiliserais l’autre programme en Java uniquement
Signature invalide lors du lancement du JAR
spark-submit --master spark://spark-master:7077 \
--class cds.xmatch.spark.PreprocessData \
--name CDS_XMatch \
--executor-memory=13g \
--conf spark.local.dir=/tmp/spark \
--conf spark.storage.memoryFraction=0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--driver-memory 2G --driver-java-options "-ea" \
cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/sdss9.l2.i0.csv \
RAdeg 1 2 4096 10 hdfs://hdfs-namenode:8020/data/xmatch/sdss9.rdd
spark-submit --master spark://spark-master:7077 \
--class cds.xmatch.spark.PreprocessData \
--name CDS_XMatch \
--executor-memory=13g \
--conf spark.local.dir=/tmp/spark \
--conf spark.storage.memoryFraction=0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--driver-memory 2G --driver-java-options "-ea" \
cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/tmass.l2.i0.csv \
RAJ2000 1 2 4096 10 hdfs://hdfs-namenode:8020/data/xmatch/tmass.rdd
spark-submit --master spark://spark-master:7077 \
--class cds.xmatch.spark.CrossMatch \
--name CDS_XMatch \
--executor-memory=13g \
--conf spark.local.dir=/tmp/spark \
--conf spark.storage.memoryFraction=0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--driver-memory 2G --driver-java-options "-ea" \
cds.xmatch.spark.jar hdfs://hdfs-namenode:8020/data/xmatch/sdss9.rdd \
hdfs://hdfs-namenode:8020/data/xmatch/tmass.rdd 4096 5 \
hdfs://hdfs-namenode:8020/data/xmatch/xmatchRes.txt
Le preprocess à l’air de fonctionner correctement, par contre le crossmatch se termine rapidement et renvoie plusieurs erreur. Il est possible que j’ai fais une erreur pour le preprocess du fichier Tmass car il n’y avais aucun exemple avec ce type de données.
16/09/12 15:41:53 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 23, 10.0.0.10): java.lang.AbstractMethodError: cds.xmatch.spark.CrossMatch$2.call(Ljava/lang/Object;)Ljava/util/Iterator;
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:142)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:142)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/09/12 15:19:17 WARN scheduler.TaskSetManager: Lost task 4.2 in stage 1.0 (TID 57, 10.0.0.10): java.lang.AbstractMethodError: cds.xmatch.spark.CrossMatch$2.call(Ljava/lang/Object;)Ljava/util/Iterator;
16/09/12 15:19:18 INFO scheduler.DAGScheduler: Job 0 failed: saveAsTextFile at CrossMatch.java:138, took 12.562289 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 59, 10.0.0.10): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.