Remplacer les threads avec le module multiprocessing en Python


Les threads Python sont limités par le Global Interpreter Lock, et si ils permettent de s’affranchir des problèmes de concurrence d’accès IO, ils sont inefficaces pour profiter de nos merveilleux processeurs multi-coeurs. Les coroutines, une alternative élégante aux threads, ont la même limitation.

Heureusement Python vient avec le module multiprocessing, qui permet justement de créer plusieurs processus séparés, et les orchestrer pour qu’ils travaillent ensemble, et ainsi saturer la consommation de ressource de nos serveurs modernes si chers et si puissants.

Prenons un employé de banque que nous appellerons A, et un épagneul Breton, que nous appellerons Catherine.

Euh non…

Prenons plutôt une application qui poll des flux RSS comme Liferea. Liferea a pendant bien longtemps freezé l’intégralité de l’UI pendant la mise à jour de la liste d’articles (ben oui le temps de charger une page Web, la main loop attend). On peut éviter cela en utilisant des threads ou, dans notre, cas, de multiples processus.

Bon, il y a peu de chance que Lifera soit CPU bound, donc c’est vrai que dans ce cas les threads feraient aussi bien, mais c’est pour l’exemple, bande de tatillons.

Pour notre cas de figure, nous avons besoin:

  • d’un process qui demande aux autres de vérifier les derniers flux RSS (pour simuler une interaction utilisateur);
  • d’un process qui va faire la vérification des flux RSS sans bloquer les autres process;
  • de feedparser, une lib Python qui parse les flux RSS;
  • d’un process qui va lancer tout ça, récupérer le résultat et l’afficher.

Pour feedparser avec pip:

    pip install feedparser

Ça c’est fait.

Pour le reste, on se fait un petit fichier rssmania.py:

# -*- coding: utf-8 -*-
 
import time
from time import mktime
from datetime import datetime
from multiprocessing import Process, Queue, TimeoutError
 
import feedparser
 
# cette fonction va être utilisée comme worker
# elle va lancer un process qui tourne en boucle et vérifie de manière
# régulière si il y a des flux à mettre à jour
def mettre_a_jour_les_flux(queue_flux_a_mettre_a_jour, queue_de_mises_a_jour_des_flux):
 
    last_update = {}
 
    while True: # une bonne boucle infinie pour la main loop
        try:
            # on vérifie si il y a un message dans la queue pendant 0.1 secondes
            # si oui, on parse le flux (sinon, ça raise une TimeoutError)
            flux = queue_flux_a_mettre_a_jour.get(0.1)
 
            feed = feedparser.parse(flux)
            nouveaux_articles = []
            # pour chaque article, on vérifie si la date de parution est
            # antérieur au dernier check, et si oui, on le déclare
            # "nouvel article"
            for article in feed.entries:
                try:
                    dt = datetime.fromtimestamp(mktime(article.updated_parsed))
                    if dt > last_update[flux]:
                        nouveaux_articles.append(article.link)
                except KeyError:
                    nouveaux_articles.append(article.link)
 
            # on balance tous les nouveaux articles dans la queue
            if nouveaux_articles:
                queue_de_mises_a_jour_des_flux.put((feed.feed.title, nouveaux_articles))
 
            last_update[flux] = datetime.now()
 
        # en cas de time out on repart sur un tour de boucle
        # si l'utilisateur fait CTRL+C sur le worker principal, il sera
        # broadcasté ici, donc on le catch et on exit proprement
        except TimeoutError:
            pass
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# worker très basique qui demande la mise à jour de tous les flux
# c'est bourrin, mais c'est pour l'exemple on vous dit !
def demander_la_mise_a_jour_des_flux(queue_de_flux_a_mettre_a_jour, flux_rss):
    """
        Demande la mise à jour des flux toutes les 5 minutes
    """
 
    # pareil, petite boucle infinie, temporisation et gestion du CTRL + C
    # en gros on ne fait que remplir la queue toutes les 5 minutes
    # avec des urls
    while True:
 
        try:
            for flux in flux_rss:
                queue_de_flux_a_mettre_a_jour.put(flux)
 
            time.sleep(300)
 
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# très important ce if, sinon sous windows le module sera importé plusieurs
# fois et lancera ce bloc plusieurs fois
if __name__ == '__main__':
 
    # les flux à mettre à jour, RAS
    flux_rss = (
        'http://sametmax.com/feed/',
        "http://sebsauvage.net/links/index.php?do=rss",
        "http://charlesleifer.com/blog/rss/",
        "http://xkcd.com/rss.xml"
    )
 
    # les queues. Ces objets sont comme des listes partageables entre
    # les workers, sur lesquelles on pourrait faire uniquement insert(0, elem)
    # (ici put(elem)) et pop() (ici get()). Des FIFO thread safe quoi.
    queue_de_flux_a_mettre_a_jour = Queue()
    queue_de_mises_a_jour_des_flux = Queue()
 
    # ici on créé nos workers: on dit quelle fonction lancer avec quels
    # arguments. Nos arguments ici sont essentiellement les queues,
    # puisque c'est ce qui va nous permettre de partager les infos
    # entre les process (qui sont sinon isolés les uns des autres)
    worker_qui_met_a_jour_les_flux = Process(target=mettre_a_jour_les_flux,
                                             args=(queue_de_flux_a_mettre_a_jour,
                                                   queue_de_mises_a_jour_des_flux))
 
    worker_qui_demande_la_mise_a_jour = Process(target=demander_la_mise_a_jour_des_flux,
                                                args=(queue_de_flux_a_mettre_a_jour,
                                                      flux_rss))
 
    # On démarre les workers, et à partir de là, 2 processus sont créés
    # et lançant chacun une fonction, les boucles infinies tournent joyeusement
    # et une personne est agressée toutes les 7 secondes à New York aussi,
    # mais on s'en fout dans notre cas présent.
    # Bien faire gaffe que les fonctions soient capables de tourner à vide :-)
    worker_qui_met_a_jour_les_flux.start()
    worker_qui_demande_la_mise_a_jour.start()
 
    # et voici notre worker principal, qui pop les nouveaux flux tout
    # frais, et les affiche à l'écran
    try:
        while True:
            try:
                feed, articles = queue_de_mises_a_jour_des_flux.get(0.2)
                print "Voici les derniers articles de %s :" % feed
                for article in articles:
                    print "- %s" % article
            except TimeoutError:
                pass
 
    except KeyboardInterrupt:
        pass
    finally:
        # si la boucle while s'arrête d'une manière ou d'une autre
        # on attend que les autres processus s'arrêtent avant de quitter
        # En vrai on mettrait beaucoup plus de code que ça, une file
        # de controle, peut être un handler de SIGTERM, etc
        # là on va à l'essentiel
        worker_qui_met_a_jour_les_flux.join()
        worker_qui_demande_la_mise_a_jour.join()
 
    print "Fin des haricots"

On lance le bouzin:

    python rssmania.py

Python se charge automatiquement de créer 2 subprocess, un qui lance la fonction mettre_a_jour_les_flux() et un pour demander_la_mise_a_jour_des_flux() puis il va faire tourner notre bouclinette principale avec amour.

Normalement, au premier lancement ça donne un truc comme ça:

Voici les derniers articles de Sam & Max: Python, Django, Git et du cul :
- http://sametmax.com/rassurez-vous-vous-netes-pas-bizarres/
- http://sametmax.com/fonctions-anonymes-en-python-ou-lambda/
- http://sametmax.com/deterer-le-cadavre-dun-troll-non-php-nest-pas-simple/
- http://sametmax.com/concurrence-sans-threads-en-python/
- http://sametmax.com/humour-reflexion-et-cul-la-formule-ne-date-pas-dhier/
- http://sametmax.com/state-machine-en-python-en-labsence-dalgos-recursifs-beneficiant-de-tail-call-optimisation/
- http://sametmax.com/appel-a-contributeurs-impertinents/
- http://sametmax.com/synchroniser-les-freeplugs-les-adaptateurs-reseaux-cpl-de-free/
- http://sametmax.com/incendie-en-espagne-un-megot-peut-se-tranformer-en-arme-mortelle/
- http://sametmax.com/jadore-les-context-managers-python/
Voici les derniers articles de Liens en vrac de sebsauvage :
- http://imgur.com/37R4c
- http://www.clubic.com/navigateur-internet-mobile/opera-mini/actualite-503834-opera-mini-depasse-200-utilisateurs.html
- http://www.lesnumeriques.com/jeux-video/impire-p14041/impire-creez-vos-donjons-comme-a-bonne-epoque-annees-bullfrog-n25461.html
- http://sebsauvage.net/links/index.php?Jr5VKg
- http://sebsauvage.net/links/index.php?PQUdwA
- http://imgur.com/A4xkr

Et 5 minutes plus tard (dans le cas improbable où un article a été publié entre temps), ça affiche les nouveaux articles.

Si vous appuyez sur CTRL + C, SIGINT va être envoyé à tous les workers, et ils vont tous s’arrêter gentiment. Normalement. En théorie. Souvent ça marche. Sur ma machine.

15 thoughts on “Remplacer les threads avec le module multiprocessing en Python

  • Soli

    Et à partir de 3.2 on pensera à utiliser concurrent.futures et ici ProcessPool pour gérer facilement des groupes de processus (ou de threads avec la même interface).

  • Sam Post author

    Pour celui qui a cherché:

    difference entre start et run multiprocessing python:

    start() est la méthode qu’on appel pour demander à Python de lancer un processus séparer.

    run()
    est la méthode que Python va appeler quand il démarrera le processus séparé.

    run() est appelé après start() automatiquement, et on peut overrider run() quand on souhaite contrôler la création du worker.

    En général, on ne fait que lancer start(), et on laisse Python s’occuper du reste.

  • c2_4b

    pour checker la date il s’agit plutot d’un article.updated_parsed
    plutôt que article.published_parsed

    Mais sinon nickel la démo, merci!

  • Désanuseur

    Et là bim un flux mal formé et le script ne fonctionne plus !

  • Désanuseur

    feed = feedparser.parse(flux) mais où est le bozo de higs ;)

  • Sam Post author

    C’est un exemple sur le multiprocessing, par sur le parsing de flux, spéce de désanuseur.

  • Rifif

    Bonjour,

    Une question con mais je suis bloqué là. Comment je fais si j’utilise cette méthode et que je veux tout tuer, arrêter tous les workers, et terminer mon script ?

    Je cherche une méthode pas trop bourrin bien sûr, car j’utilise plus ou moins votre exemple de code au sein d’une GUI.

  • Sam Post author

    Je réponds quand même ici sinon c’est zarb. Ca va dépendre de l’événement qui va déclencher l’arrêt : saisie clavier, événement réseau, signal d’interruption, etc.

  • Rififi

    La fermeture de la fenêtre principale, qui n’a à peu près aucun moyen de communiquer avec les workers.

    Pour l’instant je fais ça, dans le fichier principal, qui lance la fenêtre:

    if __name__ == '__main__':
        os.setpgrp() # create new process group, become its leader
        try:
            main()
        finally:
            os.killpg(0, signal.SIGKILL) # kill all processes in my group

    Mais c’est un peu bourrin, je suis obligé de fermer la fenêtre.

  • Axen

    Hello, je voulais savoir comment on traitait une erreur dans un processus enfant ?

    Dans mon cas j’utilise inotify d’un coté, et j’ai lancé un processus enfant qui va analyser un fichier (de sorte que si l’ordi s’éteint l’analyse puisse reprendre) qui se remplit en fonction de ce que relève inotify.

    Or, par le fruit du hasard il m’est apparu une erreur bête, peu gênante mais qui m’a amené à me poser cette question, en effet le répertoire était mal orthographié pour le fichier, le processus enfant a pris une erreur mais le script lui en revanche ne s’est pas arrêté.

  • Axen

    Re,

    Vu que c’était votre code je pensais avoir une réponse, mais merci ceci dit.

    Je suis revenu que récemment sur mon code, vu qu’il ne se produite que lorsque je sors du programme via ctrl+C

    Apparemment si je ne me plante pas pour la solution, cela viendrait du positionnement des .join(), que j’ai mis comme sur votre exemple après finally. Cela me renvoie une erreur atexit. En les mettant à la fin du try, il semblerait que l’on sorte correctement du programme, avec un keyboardinterrupt

    Cordialement.

Comments are closed.

Des questions Python sans rapport avec l'article ? Posez-les sur IndexError.