IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)

Tester du code généré par le framework Spark

Première partie : la théorie
Image non disponible

Spark est plus puissant et plus simple à utiliser que MapReduce, mais comment tester du code Spark ? Voici notre vision !

Spark est un framework de calcul distribué créé à Berkeley en 2010. Il connaît une adoption impressionnante. Plusieurs raisons à cela : des performances de très loin supérieures à MapReduce, et un framework à la fois beaucoup plus simple d'utilisation et multilangage (Scala, Python, Java ou R).

Chez Xebia, nous aimons le code bien fait. En tant qu'artisan de la Data, nous avons donc cherché la façon qui nous paraissait la meilleure pour tester notre code Spark.

Nous avons donc décidé d'écrire une série d'articles centrée sur les tests des programmes Spark. Plusieurs objectifs : présenter les problèmes liés à l'écriture de tests pour Spark, les outils et frameworks, et enfin, les manques de ces outils.
Dans ce premier article, nous allons nous concentrer sur la théorie : ce qu'il faut tester, comment le tester et les spécificités apportées à nos tests sur Spark.

Dans cette série d'articles, nous ne présenterons pas d'exemples en Java ni en R. Nous nous concentrerons sur les API Scala et Python qui sont à notre avis à favoriser, mais aussi les plus utilisées par la communauté. Les notions globales introduites restent cependant les mêmes quel que soit le langage.

Pour vos suggestions et commentaires, un espace est créé sur le forum.
Commentez Donner une note à l´article (5)

Article lu   fois.

L'auteur

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Mais que doit-on tester dans son code Spark ?

I-A. Un code plus utilisable, lisible et plus maintenable

Image non disponible

« Tester, ça prend trop de temps », « Tester, ce n'est pas simple, je ne sais pas ce que je vais écrire dans mon code, donc pas facile à tester », voire même « tester ? ». Autant de réactions que l'on obtient trop fréquemment lorsque l'on évoque le sujet, quel que soit le domaine. Alors répondons tout de suite à ces arguments :

  • « Tester, ça prend trop de temps » : il a été déjà démontré à maintes reprises le gain de temps à tester son code. Tester, c'est identifier les erreurs au plus tôt. Et plus une erreur est corrigée tôt, moins il faudra de temps pour la corriger. On récupère donc aisément l'augmentation de temps passer à tester dans la réduction de temps passé à la correction de bogues.
  • « Tester, ce n'est pas simple, je ne sais pas comment je vais écrire mon code à l'avance » : tester, c'est aussi se forcer à réfléchir à une manière élégante d'écrire son code. Puisque dans un test, on se place en tant qu'appelant des différentes méthodes, on est en mesure de mieux mesurer ce qu'elle doit ou ne doit pas faire. C'est en effet plus compliqué, mais on obtient facilement quelque chose de plus utilisable.
  • « Tester ? » : … nous ne nous abaisserons pas à répondre à cette remarque pourtant assez fréquente, qui relève ici plutôt de la provocation.

À cela, nous voyons de nombreux avantages à nous concentrer sur une bonne écriture des tests. Cela permet entre autres de mieux comprendre et structurer son code, de le rendre plus lisible et donc mieux maintenable. Il permet de se focaliser sur le besoin métier. Enfin, si avec tout ceci, vous ne convainquez pas les sponsors de votre projet, parlez réduction du stock d'anomalies et gain de satisfaction du client.

I-B. La programmation fonctionnelle nous vient en aide

Depuis quelques années maintenant, nous avons vu l'essor des langages de programmation fonctionnelle. Java 8, intégrant les lambda, et les streams et Scala, en sont des exemples. Mais on peut aussi bien faire du fonctionnel dans de nombreux langages.

Dans l'univers où nous travaillons, la donnée est au centre de toutes les attentions. La programmation fonctionnelle va grandement nous venir en aide ici puisque nous pourrons exprimer une grande partie de nos traitements à travers des transformations de données : map, flatmap, reduce, etc. Pour ceux qui auraient raté les bases, un petit rappel :

Map/filter/reduce in a tweet:
map([Image non disponible, Image non disponible, Image non disponible], cook)
=> [Image non disponible, Image non disponible, Image non disponible]
filter([Image non disponible, Image non disponible, Image non disponible], isVegetarian)
=> [Image non disponible, Image non disponible] reduce([Image non disponible, Image non disponible], eat)
=> Image non disponible
— Steven Luscher (@steveluscher) June 10, 2016

Maintenant que vous vous rappelez ces méthodes, nous allons explorer des exemples types d'application de ces concepts dans Spark 2.0. Nous allons ici séparer les descriptions pour Scala et Python.

En Scala, nous allons naturellement appliquer ce concept :

 
Sélectionnez
1.
2.
3.
4.
5.
val result = dataFrame.filter(positiveBalance(_))

def positiveBalance(input: Row): Boolean = {
  input.getInt(input.fieldIndex("income")) - input.getInt(input.fieldIndex("outcome")) > 0
}

Pour réaliser cela en Python, nous devrons en général passer par le concept des UDF (User Defined Functions). Le problème avec une UDF Python est que le framework va constamment effectuer des tâches de sérialisation/dé-sérialisation entre Python et la JVM (Java Virtual Machine), dégradant ainsi les performances globales du programme. Attention donc à limiter l'utilisation d'UDF.

De plus, le fait que le code Python n'est pas fortement typé statiquement simplifie souvent l'écriture, mais signifie en contrepartie que certaines erreurs ne seront visibles que lors de l'exécution. Nous allons donc préférer extraire le code d'une façon légèrement différente :

 
Sélectionnez
1.
2.
3.
4.
result = only_positive_balance(input_data_frame)

def only_positive_balance(data_frame, income_col='income', outcome_col='outcome'):
  return data_frame.filter(data_frame[income_col] - data_frame[outcome_col] > 0)

Avec cette syntaxe, nous nous assurons (un peu plus) que l'objet passé dans le paramètre data_frame se rapproche de la structure d'une DataFrame Spark. Nous nous assurons également de ne pas appeler d'UDF et donc de conserver des performances correctes.

Dans les deux cas, l'important sera d'extraire une partie testable de notre code. Nous pourrons donc nous concentrer au mieux (de façon moins explicite pour Python malheureusement) sur le fait de tester des fonctions métier, et non pas des paradigmes internes de Spark. Ce que nous remarquerons ici, c'est le fait que les objets passés en entrées et sorties de ces fonctions sont fréquemment spécifiques à Spark. Les outils que nous utiliserons devront donc pouvoir gérer la manipulation d'objets de Spark, ce qui ne pose généralement pas de problème.

I-C. Des chaînes de transformations

La définition même du test unitaire est de tester un petit morceau de code pour s'assurer qu'il remplit bien son rôle. Pour le traitement de nos données, nous allons chaîner des transformations et des fonctions simples à travers nos concepts fonctionnels. Comme vu dans le paragraphe précédent, tous ces éléments sont facilement testables. Mais qu'en est-il de l'appel à la chaîne de transformations ? Comment s'assurer que les appels successifs aux bonnes méthodes sont réalisés, et dans le bon ordre, pour répondre au besoin métier ?

Nous préférons ici nous poser la question : est-ce qu'un test unitaire doit tester le pipeline de données ? De manière générale, nous allons inclure ces tests dans un test d'intégration. Le test d'intégration est plus global et est autorisé à prendre plus de temps en raison de ses liens avec des éléments externes. Plusieurs éléments peuvent être vérifiés ici :

  • Les transformations peuvent-elles être appelées sur le jeu de données ? En Spark, dans une chaîne de transformation, nombreuses sont les étapes qui vont changer le schéma du jeu de données. La validation de la présence des colonnes requises est une étape réalisée à l'exécution. Ce type de test peut donc convenir à vérifier la bonne configuration de la chaîne de transformation ;
  • Le schéma de sortie de la transformation est-il celui attendu ?
  • Les colonnes ajoutées par le pipeline contiennent-elles les bonnes valeurs ?

I-D. Mais… et les modèles probabilistes ?

À ce stade, nous voyons arriver des Data Scientists avec un argument qui leur est propre. « Un test unitaire doit être déterministe ». Ce que je réalise est un modèle probabiliste et donc par essence il est non-déterministe. Dès lors, il faut vraiment chercher à comprendre ce que l'on souhaite tester.

Nous n'allons pas chercher à tester le fonctionnement de l'algorithme utilisé (à moins que vous implémentiez votre propre algorithme). Cela est (normalement, et on l'espère) déjà réalisé par l'auteur de l'algorithme. Ce que nous pouvons tester, c'est la performance du modèle avec les paramètres que l'on fournit, le tout sur un jeu de données bien défini. On cherchera donc par exemple à fixer un seuil minimal par rapport à une métrique bien identifiée.

L'enjeu le plus important ici sera la définition du jeu de données en entrée. Plusieurs méthodes existent pour définir ce jeu d'entrée :

  • Effectuer un échantillonnage des données de productions. Cette méthode nécessite de fréquemment mettre à jour le jeu de tests pour refléter tout changement de comportement des données. Elle peut également nécessiter une phase d'anonymisation ;
  • Générer des données aléatoirement sur la base d'un schéma donné. Cette méthode est plus simple mais répondra moins fréquemment au besoin, car il est difficile de refléter le contenu des vraies données.

II. Nos premiers points de blocages

Nous avons vu les bases de la théorie. Ceci était censé représenter un monde idéal où tout se passe bien et où le test est donc facile. Dans la réalité, nous nous sommes heurtés à différents problèmes auxquels nous allons tenter de répondre, ou au moins, énoncer les pièges à éviter.

II-A. La complexité d'un système distribué

Dans le monde de la Data, nous avons fréquemment affaire à des systèmes distribués et complexes. Par exemple, les données d'entrée sont dans un fichier sur disque, ou bien dans une base de données, ou encore dans un cluster HDFS (Hadoop Distributed File System ). Dans nos tests, nous allons essayer de nous abstraire au maximum de ces contraintes. Cela passe avant tout par un bon découpage de son code. Une fois cela fait, nous aurons deux solutions, en fonction du cas dans lequel on se trouve.

Le cas le plus simple est celui du test unitaire. Ici, on ne voudra pas accéder à un élément externe. Nous allons donc mocker (créer des objets factices) tout ce qu'il est nécessaire de mocker, c'est-à-dire utiliser une fausse implémentation de certains composants. Assurez-vous de bien tester le comportement et non l'implémentation. Demandez-vous si votre test doit échouer selon telle ou telle modification de l'implémentation. Ici, nous pourrons utiliser des bibliothèques telles que ScalaMock ou Mockito en Scala, ou bien les mocks de unittest en Python.

En ce qui concerne le test d'intégration, on tentera de limiter les mocks. Si possible, on souhaitera accéder à un environnement d'intégration. Nous préférerons en général l'utilisation d'un pseudo environnement local. Le plus connu autour de l'écosystème Hadoop est Hadoop minicluster. Il existe également Hadoop-unit, un wrapper de minicluster, qui permet de simplifier l'utilisation d'Hadoop minicluster. Il est toutefois à noter qu'il s'agit de bibliothèques Java. Nous ne pourrons donc les utiliser nativement que pour du test d'intégration en Java ou Scala. Heureusement, hadoop-unit fourni également un outil en ligne de commande qui permet de démarrer/arrêter le minicluster en ligne de commande. Ceci, inclus dans un build Jenkins, permettra de l'utiliser dans un test d'intégration en Python.

Dans la suite de cet article, nous allons nous focaliser sur Spark spécifiquement. Nous ne tiendrons donc pas compte des éléments externes tels que Impala par exemple.

II-B. Définir le bon jeu de tests

Pour définir un test, nous allons avoir besoin d'un jeu de données en entrée. Quelle granularité choisir ? Où mettre le jeu de données à disposition ? Un jeu de données trop gros rendrait la lecture des tests illisible. Un jeu de données trop petit ne comprendrait pas les cas minimaux que l'on souhaite tester.

Ici, nous allons revenir aux bases. Nous avons précédemment vu que nous pourrions profiter du paradigme fonctionnel. Eh bien, c'est l'occasion ou jamais ! En découpant correctement son code, on peut obtenir des transformations de données fonctionnelles. Nous allons donc tester en priorité les fonctions que nous avons définies. Ces fonctions devraient normalement être relativement simples et représenter des besoins métiers particuliers. Si on reprend l'exemple du filtre que nous avons cité précédemment, nous filtrions simplement les lignes ayant un revenu positif. Il est alors simple de définir les données du jeu d'entrée. En effet, il se limite à quelques cas simples (plus de gains que de dépenses, moins de gains que de dépenses, autant de gains que de dépenses, et les cas avec des valeurs manquantes).

Maintenant, un élément nous paraît important à préciser ici : CHAQUE cas simple identifié doit faire l'objet de son propre test. Nous ne devons en aucun cas tester plusieurs cas dans un unique test dans le but de simplifier l'analyse de problèmes. Maintenant que nous avons vu cela, la localisation des données devient triviale. Nous allons définir clairement dans le code de nos tests ces jeux de données.

Partant de ce concept, nous avons pu rédiger la majeure partie de nos tests. Seul le cas de la validation d'un modèle d'apprentissage automatique (Machine Learning) est difficilement applicable. Dans ce cas précis, le jeu de données est de taille importante. Nous allons donc, dans ce cas unique, déplacer le jeu de données dans un fichier externe. Attention toutefois au format de ce fichier. Nous préférerons en général un format de fichier non binaire afin de simplifier la lecture des modifications dans l'outil de gestion de configuration.

II-C. Le temps d'exécution des tests

Xebia a défini il y a quelques années de bonnes pratiques de développement à travers les Xebia Essentials. L'une d'elles est « Tests should be fast, reliable and independant ». Après avoir implémenté nos premiers tests Spark, nous nous sommes heurtés à un problème majeur : cette bonne pratique n'est pas respectée du point de vue du temps d'exécution. Même en local, le démarrage du contexte d'exécution Spark prend quelques secondes, voire dizaines de secondes. Ce surcoût, qui semble petit, est en fait très pénalisant dans une optique de développement « test first ». Heureusement, il n'intervient qu'une seule et unique fois au démarrage de la suite de tests.

À ce jour, nous n'avons pas identifié de solution et ce problème reste un problème majeur.

II-D. Le démarrage du contexte Spark

Lors de nos premiers développements Spark, alors en version 1.X, nous nous sommes également heurtés à des problèmes liés au lancement du HiveContext. En effet, si celui-ci est démarré en local, il ne peut pas l'être une seconde fois. Il est important que ce contexte soit unique dans l'application. Nous vous conseillons donc de créer un singleton de ce contexte afin de vous assurer de toujours appeler le même contexte.

Depuis Spark 2, ce problème est résolu par l'arrivée de la SparkSession. Cet objet est construit à l'aide d'un builder par une méthode getOrCreate qui s'occupe de gérer une instance unique. Nous vous conseillons tout de même de sortir l'utilisation de ce builder dans une classe externe à vos tests afin de ne pas dupliquer ce code.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
object SharedSparkSession {
  val sparkSession = SparkSession
    .builder()
    .appName("SparkSession for unit tests")
    .master("local[*]")
    .getOrCreate()
}


class SimpleSpec extends FlatSpec {
  "Some Test" should "tests something" in {
    // Given
    val rdd: RDD[SimpleCaseClass] = ???
    val dataFrame = SharedSparkSession.sparkSession.createDataFrame(rdd)
    ???
  }
}
 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
spark_context = SparkContext()
spark_session = SparkSession(spark_context).builder.master("local[*]").appName("Unit tests").getOrCreate()

class TestSimpleOperation(unittest2.TestCase):

    def test_with_balance(self):
        # Given
        data_frame = spark_session.createDataFrame([
            Row(some_column=42)
        ])

        ...

Une solution plutôt élégante en Scala consiste à placer cet objet partagé dans un trait.

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
trait SparkSessionProvider {
  val sparkSession = SparkSession
    .builder()
    .appName("SparkSession for unit tests")
    .master("local[*]")
    .getOrCreate()
}


class SimpleSpec extends FlatSpec with SparkSessionProvider {
  "Some Test" should "tests something" in {
    // Given
    val rdd: RDD[SimpleCaseClass] = ???
    val dataFrame = sparkSession.createDataFrame(rdd)
    ???
  }
}

II-E. Le streaming et ses besoins spécifiques

Un dernier souci que nous avons rencontré concerne la diffusion en continu (streaming). Spark permet de réaliser des traitements à faible latence à travers Spark Streaming. Il s'agit en fait de micro batchs exécutés à intervalles réguliers. Le principe d'un tel job est qu'il n'est pas censé se terminer. Comment alors le tester ?

Lors de nos premiers tests, nous étions arrivés à la même conclusion que pour les chaînes de traitement. Nous ne testions pas que le bon fonctionnement du streaming (ceci est censé être testé par Spark), mais plutôt que les traitements effectués lors de chaque micro batchs soient corrects. Nous extrayons donc une fonction correspondant au traitement, que nous allons fournir à Spark Streaming et que l'on pourra tester comme une fonction classique.

Cependant, nous sommes arrivés à un moment où nous avons dû utiliser la fonction updateStateByKey. Cette fonction impose l'utilisation du streaming. Pour effectuer des tests sur son retour, nous avons dû nous intéresser à d'autres solutions comme sscheck. Cette librairie se base sur le Property Based Testing afin de tester des jobs en streaming. Nous entrerons plus en détail sur des exemples d'implémentations dans un prochain article.

III. Conclusion

Dans cet article, nous avons exploré différents aspects du test dans Spark. Nous avons défini ce qu'il convenait de faire, les bonnes pratiques. Pour résumer :

  • Spark fait usage de la programmation fonctionnelle. Utilisez-la à votre avantage pour tester correctement les fonctions que vous fournirez à Spark ;
  • Réfléchissez à ce que vous voulez tester. Il ne s'agit pas de tester le framework mais bien vos comportements fonctionnels. Il est très simple de tomber dans ce piège, spécialement dans le cadre de Machine Learning et de streaming ;
  • Nous n'avons actuellement pas identifié de solution au temps de démarrage du contexte Spark. Il vous sera cependant nécessaire de le démarrer une seule et unique fois pour l'intégralité de vos tests.

Plus question donc de refuser de tester son code.

L'idée principale à retirer de cet article est que la plupart des points décrits ici ne sont pas spécifiques à Spark. Vous souhaitez sûrement une preuve que tout ceci est bien applicable ? Il s'agira de notre prochain article dans lequel nous traiterons des différents outils que nous avons à disposition et de leur utilisation. Nous aborderons aussi les manques que nous avons identifiés dans ces outils.

IV. Notes de la rédaction Developpez.com

Nous remercions Xebia pour l'autorisation à publier ce tutoriel sur Developpez.com.

Nos remerciements également à Winjerome pour la mise au gabarit et Maxy35 pour la relecture orthographique.

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

Copyright © 2017 Sylvain Lequeux. Aucune reproduction, même partielle, ne peut être faite de ce site ni de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.