Tags:
create new tag
, view all tags

Stage de Noémie Wali - UTBM - [7/09/15 au 19/02/16]

Important : cette page est réservée au suivi du stage, merci de ne pas la modifier

Informations générales pour les stagiaires

Pour toute information concernant ce stage : contacter André ou François-Xavier

Sujet

Documents de travail

  • ...

Stage (septembre - février 2016)

Septembre

  • 7:
    • accueil
    • recherches Big Data, Hadoop, MapReduce et autres technologies
    • début de l'installation et configuration de Hadoop avec la distribution Cloudera
  • 8:
    • problèmes rencontrés avec l'installation de la distribution Cloudera CDH 5
    • passage vers l'installation d'une autre distribution : Hortonworks Sandbox avec la VM
  • 9:
    • lectures sur le fonctionnement de HDP, Ambari, du système de fichiers HDFS, de Hive, Pig
    • prise en main de Hortonworks Data Platform avec Ambari
  • 10:
    • suite du tutoriel sur la prise en main de HDP :
      • chargement de données vers le système HDFS
      • manipulation de données avec Hive (création de tables, chargement de données vers la table, exécution de quelques requêtes SQL...)
  • 11:
    • séminaire : "The evolution of massive galaxies in different environments"
    • manipulation de données avec Hive : essais avec un extrait de données d'un catalogue de l'observatoire
      • problèmes rencontrés dans le remplissage de la table avec les données du fichier : solution trouvée avec le remplacement du délimitateur de champs initial ';' par ','
      • autre problème dans la requête d'affichage des données
  • 14:
    • résolution de l'affichage des données avec Hive
    • manipulation de données avec Apache Pig : chargement de tables de Hive vers Pig, utilisation des scripts en Pig Latin
  • 15:
    • manipulation de données avec Apache Spark
      • introduction à Spark
      • utilisation de commandes SQL depuis le terminal avec hiveContext.sql
      • création de RDD - Resilient Distributed Dataset
      • RDD transformations and actions
      • création et chargement de tables Hive depuis Spark
  • 16:
    • quelques autres exemples de manipulations de données avec Apache Spark
      • utilisation d'expressions en Python
      • introduction et utilisation du langage Scala associé à Spark
  • 17:
    • manipulation de quelques outils avec Spark (IPython Notebook, SparkSQL...)
  • 21:
    • Apache Spark avec Java
    • installation de Maven
    • création de fichiers test à partir de Vizier
  • 22:
    • installation d'Apache Spark et de Scala sur la machine hôte
      • premier essai depuis le terminal avec quelques problèmes rencontrés (installation de Scala réussie mais sans succès pour Spark)
      • deuxième essai directement depuis le site d'Apache Spark réussi (version de Spark pour Hadoop 2.6)
    • discussion avec André et François-Xavier autour de Spark et Hadoop
    • début du déploiement de Spark vers un cluster (installation de Spark en mode standalone sur chaque noeud du cluster)
  • 23-25:
    • déploiement de Spark : choix du mode Spark Standalone Cluster
      • un master et deux workers : les workers sont connectés au master
      • configuration des PCs pour permettre un accès SSH sans mot de passe : le master peut contrôler les workers
        • lancement et arrêt des workers depuis le master grâce aux scripts de Spark : start-slaves et stop-slaves ; les adresses IP ou noms des workers sont renseignés dans le fichier conf/slaves
      • lancement d'applications : elles apparaissent dans le cluster et sont bien exécutées par les workers
        • les pages web du master et des workers (localhost:8080 ou localhost:8081) affichent les applications en train d'être exécutées et celles qui sont terminées
  • 28:
    • configuration d'un troisième worker sur la machine maître : il faut ajouter localhost dans le fichier conf/slaves pour faire démarrer un worker sur le pc mais également penser à configurer l'accès SSH vers la machine elle-même
    • comparaison des temps d'exécution d'une application - exemple avec un programme qui calcule pi : le temps augmente significativement lorsque le programme est lancé dans le cluster (2 machines ou plus) ce qui est dû à la taille du fichier jar généré avec Eclipse (toutes les librairies étaient incluses dans le jar) ; temps de transfert du fichier vers les machines du cluster réduit en plaçant les librairies dans un fichier séparé
  • 29-30:
    • recherches et installation de Hadoop : version 2.6 compatible avec la version de Spark installée
    • but recherché : travailler avec Spark et Hadoop en même temps, utiliser Hadoop pour stocker les fichiers dans le cluster grâce au système de fichiers HDFS et lancer les applications avec Spark

Octobre

  • 01:
    • installation de Hadoop sur un simple noeud: téléchargement du paquet depuis le site d'Apache et modification des fichiers de configuration en vue notamment de l'utilisation du système de fichier hdfs
  • 02-06:
    • extension vers un cluster Hadoop : installation de Hadoop sur une 2e machine: on obtient un pc avec un namenode (maître) et un datanode (esclave) et un 2e pc avec seulement un datanode
  • 07-09:
    • chargement de fichiers vers hdfs
    • observations du fonctionnement de hdfs et de ses paramètres
    • on essaie maintenant d'adapter le fonctionnement du système hdfs à nos fichiers : lorsque l'on charge un fichier vers hdfs, en fonction de la taille du fichier, celui-ci est découpé en plusieurs blocs qui sont envoyés aux datanodes en fonction du nombre de réplications ; ce que l'on souhaite c'est que les éléments du fichier ayant la même clé se retrouvent sur la même machine
      • 2 stratégies à voir : politique de placement des blocs et politique de découpage des fichiers
      • concernant le placement des blocs : l'interface BlockPlacementPolicy peut être implémentée d'une autre manière que la classe BlockPlacementPolicyDefault utilisée par défaut pour le placement des blocs ; il faut ensuite préciser la nouvelle classe utilisée dans le fichier de configuration hdfs-site.xml
  • 12-13:
    • recherches sur la classe BlockPlacementPolicyDefault, lecture du code : dans les fichiers source de HDFS téléchargés lors de l'installation de Hadoop, il y a l'interface BlockPlacementPolicy qui est implémentée par la classe BlockPlacementPolicyDefault. J'ai cherché une autre implémentation de l'interface, autre que celle par défaut pour voir comment modifier la classe par défaut afin d'avoir notre propre système de placement des blocs mais les exemples trouvés datent un peu et ne sont pas vraiment comparables aux fichiers de Hadoop 2.6.
    • recherches du côté du découpage des fichiers : pas beaucoup de résultats...je n'ai pas trouvé de moyen de modifier le découpage par défaut des fichiers (à noter: la classe FileInputFormat concerne les blocs d'entrée pour les jobs MapReduce - par défaut l'application MapReduce est exécutée sur chaque bloc mais on peut demander à ce qu'elle soit exécutée sur de plus petites entités, dans ce cas le bloc (qui est une partie du fichier initial) est découpé en plusieurs parties.
  • 14-15:
    • installation de Hadoop sur une 3e machine avec quelques tests de chargement de fichiers
    • j'ai cherché des exemples de jointure de fichiers dans spark écrits en Java pour comprendre le fonctionnement des jointures et l'écriture d'un code Java avec spark
  • 16:
    • on laisse de côté pour l'instant le découpage des fichiers et le placement des blocs dans hdfs
    • problème de connexion dans spark entre le master et un des workers (130.79.128.183) : le pc est passé en 130.79.128.184 en wifi ce qui a empêché le master d'atteindre la machine (même en changeant l'adresse dans le fichier conf/slaves), on a finalement forcé la machine à se remettre en 183 et ça a résolu le problème
  • 19-23:
    • nouveau problème de connexion mais avec hdfs : la même machine n'est plus connectée au namenode de hdfs (l'installation de hadoop avait été faite quand le pc était passé à 184...?), j'ai désinstallé et réinstallé hadoop sur la machine sans succès..
      • problème résolu en observant les processus actifs sur la machine (ps -edf ou ps -edf | grep hdfs), hdfs tournait déjà sur le pc, il a suffi de tuer le processus et on a pu redémarrer un datanode connecté au namenode
    • recherches du côté des RDDs dans spark : une alternative à hdfs ? il y a en effet également un partitionnement et un placement des RDDs dans les différentes machines
    • écriture d'un premier code en java - jointure de fichiers :
      • chargement de fichiers hdfs dans des RDDs
      • affichage des partitions et du nombre d'enregistrements des RDDs
      • transformation des JavaRDD en JavaPairRDD grâce à une méthode permettant de séparer les clés des valeurs
      • 2 types de jointure : join et cogroup
        • join: le résultat retourne un RDD contenant les éléments dont la clé est commune aux deux fichiers. Ne retourne donc pas les clés qui ne se trouvent que dans un seul RDD. Performs a hash join across the cluster.
        • cogroup: retourne un RDD contenant, pour chaque clé des deux RDDs, la liste des valeurs du RDD 1 et du RDD 2.
      • enregistrement des RDDs vers hdfs
    • quelques problèmes rencontrés dans le programme:
      • java.lang.ClassNotFoundException: lors de l'exécution du programme, Eclipse ne retrouve pas la classe JoinCsv$1 (le problème vient peut-être du moment où l'on sort de eclipse - par ex sur les workers qui n'ont pas accès aux classes...)
        • une solution trouvée (à ajouter dans le programme) - indiquer l'emplacement du jar au programme :
          • String[] jars = {"/home/hduser/workspace/JoinCsv.jar"};
          • SparkConf sparkConf = new SparkConf().setAppName("JoinCsv").setMaster("spark://cds-stage-mv2:7077").setJars(jars);
      • test avec un fichier plus grand (4 000 000 de lignes): temps anormalement long, plus de 2 jours sans finir ! bloque au niveau des enregistrements vers hdfs (saveAsTextFile) après les opérations de join et cogroup - à voir
        • le fichier hdfs résultant de la méthode join a 1947 blocs (après avoir arrêté l'application de force) !
  • 26-27:
    • remarque sur la méthode join : le résultat ne retourne que des paires associées à une clé, au final on n'a pas toutes les valeurs associées à une même clé sur une même ligne (pb de répétition)
    • exemple de résultat obtenu :
      • (52,(kzajhghazghiazhbgiaoibgibaklnfgnakngiapngiagagbnaigfjgiozhgyganqo_1,kzajhghazghiazhbgiaoibgibaklnfgnakngiapngiagagbnaigfjgiozhgyganqo_37))
      • (52,(kzajhghazghiazhbgiaoibgibaklnfgnakngiapngiagagbnaigfjgiozhgyganqo_1,kzajhghazghiazhbgiaoibgibaklnfgnakngiapngiagagbnaigfjgiozhgyganqo_43))
    • le problème du programme vient peut-être de là, l'exemple avec la clé 52 a été fait avec un petit fichier, si on en prend un plus important avec 4000000 de lignes, le résultat peut vite devenir très volumineux
    • en ne gardant que la méthode cogroup (sans join) le programme met 10min à s'exécuter : le problème doit venir de la méthode join
    • en modifiant le code et en changeant les méthodes utilisées, on obtient le message suivant : java.lang.OutOfMemoryError: Java heap space
      • pas de changement même en ajoutant l'option -Xmx1024m (ou plus) pour allouer plus de mémoire à la JVM
      • quelques autres tests sans succès..
  • 28-29:
    • problème de mémoire (java heap space) résolu :
      • lancement de l'application depuis le terminal avec la commande spark-submit ~/workspace/JoinCsv.jar
      • sans changer la taille de Java heap space (par défaut à 1g)
      • mais avec Memory per Node à 2.0GB (mémoire utilisée par noeud à 1GB jusque là sur les 14.6 disponibles)
        • la mémoire utilisée peut être augmentée dans le code par : SparkConf sparkConf = new SparkConf().setAppName("JoinCsv")....set("spark.executor.memory", "8g");
    • il y a encore des pbs de connexion avec la machine cds-stage-ms1 (130.79.128.183)
      • exemple avec l'application précédente :
        • java.io.IOException: Bad connect ack with firstBadLink as 130.79.128.184:50010
        • le résultat final (fichier hdfs) n'est sauvegardé que sur 2 pcs (mv2 et mv1) alors que la réplication est à 3 : pb avec la machine ms1
      • il y a toujours le pb de l'adresse ip (183/184)
      • problème parfois résolu en redémarrant hdfs
  • 30:
    • tests avec différentes valeurs pour la memory per node et pour la taille du heap space - comparaison des temps d'exécution
    • avec le passage en Ethernet des 3 machines, le temps d'exécution du code de jointure est passé de ~5/10min à ~23s
    • finalement on garde la configuration suivante (~23s) :
      • pour 3 machines : 2G/noeud suffisent ; 1G pour le driver memory (ou heap space)
      • pour 2 machines : 3G/noeud suffisent ; 1G pour le driver memory

Novembre

  • 02-06:
    • recherches et travail sur le partitionnement des RDDs
    • méthode de partitionnement : partitionBy(Partitioner partitioner)
      • 2 classes existantes implémentant la classe Partitioner : HashPartitioner et RangePartitioner
      • on peut implémenter notre propre méthode de partitionnement grâce à un custom partitioner (on implémente la classe Partitioner)
  • 09-13:
    • rédaction d'un rapport intermédiaire
  • 16-20:
    • configuration de la machine ms2 (.184) pour Spark et Hadoop avec l'accès SSH sans mot de passe
    • recherches sur la jointure à +/- 1 : une clé x est associée à la clé x, à la clé x+1 et à la clé x-1 (par la suite, on pourra élargir la jointure à +/- delta)
      • pour écrire notre propre méthode de jointure, on passe par un autre type de variable, les DataFrames, des collections de données distribuées organisées en colonnes, sur lesquelles on exécute une requête SQL
  • 23-27:
    • configuration de la machine ms4 (.186) pour Spark et Hadoop avec l'accès SSH sans mot de passe
    • le nouveau code de jointure à +/- 1 fonctionne sur de petits fichiers mais prend énormément de temps sur des fichiers plus volumineux (306Mo)
      • sur un fichier de taille moyenne (6Mo), l'application s'exécute en 2.2h la première fois, puis après quelques modifications des paramètres du cluster et du code, le temps d'exécution varie entre 11min et 50min...
    • le cluster est maintenant constitué de 5 machines mais l'application Spark n'en utilise que 4..

Décembre

  • 01-11:
    • discussion et écriture d'un code de cross-match en 2 temps:
      • programme 1: préparation des fichiers HDFS
        • les fichiers HDFS sont chargés en JavaRDD
        • ceux-ci sont ensuite transformés en JavaPairRDD <Long, String> : la clé est calculée à partir des positions RA et DEC, la fonction retourne le n° de pixel dans lequel se trouve la source
        • les données (JavaPairRDD) sont ensuite réparties sur les machines en suivant un HashPartitioning sur les clés
        • les données réparties sont enregistrées dans HDFS avec le maintien de la structure <clé, valeur>
      • programme 2: traitement des données
        • on reprend les fichiers enregistrés précédemment dans HDFS et on les charge dans des PairRDDs avec la structure <clé, valeur>
        • on applique la méthode join sur les 2 RDDs puis on filtre les éléments résultant de la jointure, dont la distance entre les 2 sources est inférieure à un seuil défini
        • le résultat final est enregistré dans HDFS
    • problèmes rencontrés avec la 1ère partie du code (prog 1) :
      • à la fin du programme pour garder la structure <clé, valeur> lors de l'enregistrement vers HDFS, la méthode saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass) est utilisée, les classes de la clé et de la valeur doivent implémenter la classe Writable de Hadoop, or nos données sont de type <Long, Row>, il faut donc les convertir en des classes sérialisables par hadoop
      • il existe la classe LongWritable de hadoop pour les Long mais pour le type Row il faut écrire notre propre classe en implémentant Writable
    • Remarques:
      • pour les fichiers stockés dans HDFS, ils n'ont pas besoin d'être répliqués sur toutes les machines lorsque l'on lance un code Spark sur ces fichiers (ce qui n'est pas le cas lorsqu'ils sont sauvegardés en local sur les machines)
      • le répertoire utilisé par défaut pour le paramètre spark.local.dir est /tmp (répertoire destiné en particulier pour le transfert des données sur le disque..) ; ce répertoire ayant une capacité de stockage limitée et non suffisante pour les grands fichiers, spark.local.dir a été changé en /data/spark/tmp : cela a résolu des pb rencontrés lors de l'exécution du code qui s'arrêtait avant la fin en raison de l'espace insuffisant.
    • Réunion:
      • discussion du code de XMatch
      • écriture d'un code générique
      • installer YARN ?
      • configurer la 185
    • Tests du CrossMatch sur les fichiers allwise et sdss9
  • 14-23:
    • recherches sur une alternative à la classe Row compatible avec saveAsHadoopFile car pour l'instant le code n'est pas séparé en 2 parties
    • Tests du CrossMatch avec les machines (187 - 186 - 184 - 183) : échec - plusieurs erreurs apparaissent dans la console et dans les logs des machines, de différents types (CorruptedException: invalid type code: A4 - ExecutorLostFailure - ArrayIndexOutOfBoundsException: -1509949247 - Failed to write core dumps...)
      • j'ai essayé plusieurs solutions mais ça ne changeait rien aux erreurs
    • finalement le code s'est exécuté sans problème au bout de 4h en enlevant la machine 183 du cluster spark (avec seulement 186-187-184), les fichiers utilisés étaient sdss7.csv (54GB) et 2mass.csv (58GB)
    • le même code s'est terminé au bout de 3h avec un 4e nœud (188)

Janvier

  • 04-15:
    • séparation du code de XMatch en 2 parties (Data et CrossMatch) grâce à l'écriture de la classe MyRow implémentant la classe Writable de hadoop et remplaçant le type Row
      • dans le code, Long est remplacé par LongWritable et Row par MyRow, ce qui permet d'utiliser la méthode saveAsHadoopFile et donc de garder la structure du RDD lors de l'enregistrement vers HDFS
      • erreur survenue avec l'instruction jsc.sequenceFile(path, LongWritable.class, MyRow.class) au début de la 2e partie du code (CrossMatch) pour récupérer les fichiers binaires enregistrés dans HDFS dans la 1e partie : lorsque l'on vérifie le contenu des fichiers récupérés, on remarque qu'une même ligne est répétée du début à la fin du fichier ; il y avait une note à ce sujet sur le site de spark : "Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function" - c'est pourquoi il a fallu ajouter un mapToPair après l'appel de sequenceFile pour récupérer les valeurs avant d'enregistrer les données.
    • Tests sur les fichiers allwise (448 Mo) et sdss9 (463 Mo) pour déterminer quelle valeur de Nside donne le meilleur temps d'exécution : il s'agit de Nside = 2048
    • Observation: le HashPartitioning permet que les clés identiques d'un même RDD se retrouvent sur les mêmes nœuds. 2 RDDs ayant le même partitionnement (par ex: HashPartitioning avec le même nombre de partitions) sont des "co-partitioned RDDs" : chez l'un et chez l'autre les mêmes clés sont placées sur les mêmes n° de partitions, ce qui n'implique pas obligatoirement que les RDDs soient "co-located", c'est-à-dire que les mêmes n° de partitions soient sur les mêmes nœuds. Au final, des clés communes aux deux RDDs ne se retrouvent pas systématiquement sur les mêmes machines, ce qui implique des transferts de données lors de la jointure des deux RDDs.
      • on a posé la question sur StackOverflow et sur la mailing list de spark pour savoir s'il y a un moyen de forcer spark à placer les mêmes n° de partitions sur les mêmes noeuds pour 2 RDDs différents mais on n'a eu aucune réponse...
      • recherches sur la question des co-located RDDs: d'autres personnes se posent la même question mais jusque là pas de réponse claire pour savoir si c'est paramétrable ou pas
    • Comme les machines 183 et 185 provoquaient souvent des erreurs, elles ont été réinstallées et remises à jour mais lorsqu'on les a à nouveau incluses dans le cluster spark, les mêmes types d'erreurs s'affichaient à nouveau sur la 183 (UTFDataFormatException - IllegalArgumentException: requirement failed: File segment length cannot be negative (got -21208804) mais la 185 semble fonctionner
    • tests de performance avec le code Data2 / CrossMatch2 sur les machines 187, 186, 184, 185

Février

  • 19: fin du stage

Sauvegardes

  • à définir au cas par cas suivant le sujet du stage

Liens et informations diverses

Liens (Noémie)
Liens (Joris)

Tutoriel d'installation (Joris)

Versions testables

Testé sur ...

Documentation

  • Rapport de stage: pdf
  • Présentation: pdf

Document divers

Compte Rendu hebdomadaire

Informations/travaux divers

  • ...

Travail post stage éventuel

Liste des améliorations à envisager

Bugs connus

Topic attachments
I Attachment Action Size Date Who Comment
PDFpdf RAPPORT_A15_INFO_ST40_WALI_NOEMIE.pdf manage 4778.9 K 2016-03-31 - 14:02 AndreSchaaff  
PDFpdf ST40_SOUTENANCE_A15_INFO_WALI_NOEMIE.pdf manage 1135.2 K 2016-03-31 - 14:01 AndreSchaaff  
Unknown file formattsv asu.tsv manage 17.2 K 2015-09-15 - 07:28 UnknownUser Premier extrait d'une base de données de l'observatoire manipulé
Topic revision: r36 - 2016-03-31 - AndreSchaaff
 
This site is powered by the TWiki collaboration platform Powered by PerlCopyright © 2008-2024 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
Ideas, requests, problems regarding TWiki? Send feedback