Remplacer les threads avec le module multiprocessing en Python

Les threads Python sont limités par le Global Interpreter Lock, et si ils permettent de s’affranchir des problèmes de concurrence d’accès IO, ils sont inefficaces pour profiter de nos merveilleux processeurs multi-coeurs. Les coroutines, une alternative élégante aux threads, ont la même limitation.

Heureusement Python vient avec le module multiprocessing, qui permet justement de créer plusieurs processus séparés, et les orchestrer pour qu’ils travaillent ensemble, et ainsi saturer la consommation de ressource de nos serveurs modernes si chers et si puissants.

Prenons un employé de banque que nous appellerons A, et un épagneul Breton, que nous appellerons Catherine.

Euh non…

Prenons plutôt une application qui poll des flux RSS comme Liferea. Liferea a pendant bien longtemps freezé l’intégralité de l’UI pendant la mise à jour de la liste d’articles (ben oui le temps de charger une page Web, la main loop attend). On peut éviter cela en utilisant des threads ou, dans notre, cas, de multiples processus.

Bon, il y a peu de chance que Lifera soit CPU bound, donc c’est vrai que dans ce cas les threads feraient aussi bien, mais c’est pour l’exemple, bande de tatillons.

Pour notre cas de figure, nous avons besoin:

  • d’un process qui demande aux autres de vérifier les derniers flux RSS (pour simuler une interaction utilisateur);
  • d’un process qui va faire la vérification des flux RSS sans bloquer les autres process;
  • de feedparser, une lib Python qui parse les flux RSS;
  • d’un process qui va lancer tout ça, récupérer le résultat et l’afficher.

Pour feedparser avec pip:

    pip install feedparser

Ça c’est fait.

Pour le reste, on se fait un petit fichier rssmania.py:

# -*- coding: utf-8 -*-
 
import time
from time import mktime
from datetime import datetime
from multiprocessing import Process, Queue, TimeoutError
 
import feedparser
 
# cette fonction va être utilisée comme worker
# elle va lancer un process qui tourne en boucle et vérifie de manière
# régulière si il y a des flux à mettre à jour
def mettre_a_jour_les_flux(queue_flux_a_mettre_a_jour, queue_de_mises_a_jour_des_flux):
 
    last_update = {}
 
    while True: # une bonne boucle infinie pour la main loop
        try:
            # on vérifie si il y a un message dans la queue pendant 0.1 secondes
            # si oui, on parse le flux (sinon, ça raise une TimeoutError)
            flux = queue_flux_a_mettre_a_jour.get(0.1)
 
            feed = feedparser.parse(flux)
            nouveaux_articles = []
            # pour chaque article, on vérifie si la date de parution est
            # antérieur au dernier check, et si oui, on le déclare
            # "nouvel article"
            for article in feed.entries:
                try:
                    dt = datetime.fromtimestamp(mktime(article.updated_parsed))
                    if dt > last_update[flux]:
                        nouveaux_articles.append(article.link)
                except KeyError:
                    nouveaux_articles.append(article.link)
 
            # on balance tous les nouveaux articles dans la queue
            if nouveaux_articles:
                queue_de_mises_a_jour_des_flux.put((feed.feed.title, nouveaux_articles))
 
            last_update[flux] = datetime.now()
 
        # en cas de time out on repart sur un tour de boucle
        # si l'utilisateur fait CTRL+C sur le worker principal, il sera
        # broadcasté ici, donc on le catch et on exit proprement
        except TimeoutError:
            pass
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# worker très basique qui demande la mise à jour de tous les flux
# c'est bourrin, mais c'est pour l'exemple on vous dit !
def demander_la_mise_a_jour_des_flux(queue_de_flux_a_mettre_a_jour, flux_rss):
    """
        Demande la mise à jour des flux toutes les 5 minutes
    """
 
    # pareil, petite boucle infinie, temporisation et gestion du CTRL + C
    # en gros on ne fait que remplir la queue toutes les 5 minutes
    # avec des urls
    while True:
 
        try:
            for flux in flux_rss:
                queue_de_flux_a_mettre_a_jour.put(flux)
 
            time.sleep(300)
 
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# très important ce if, sinon sous windows le module sera importé plusieurs
# fois et lancera ce bloc plusieurs fois
if __name__ == '__main__':
 
    # les flux à mettre à jour, RAS
    flux_rss = (
        'http://sametmax.com/feed/',
        "http://sebsauvage.net/links/index.php?do=rss",
        "http://charlesleifer.com/blog/rss/",
        "http://xkcd.com/rss.xml"
    )
 
    # les queues. Ces objets sont comme des listes partageables entre
    # les workers, sur lesquelles on pourrait faire uniquement insert(0, elem)
    # (ici put(elem)) et pop() (ici get()). Des FIFO thread safe quoi.
    queue_de_flux_a_mettre_a_jour = Queue()
    queue_de_mises_a_jour_des_flux = Queue()
 
    # ici on créé nos workers: on dit quelle fonction lancer avec quels
    # arguments. Nos arguments ici sont essentiellement les queues,
    # puisque c'est ce qui va nous permettre de partager les infos
    # entre les process (qui sont sinon isolés les uns des autres)
    worker_qui_met_a_jour_les_flux = Process(target=mettre_a_jour_les_flux,
                                             args=(queue_de_flux_a_mettre_a_jour,
                                                   queue_de_mises_a_jour_des_flux))
 
    worker_qui_demande_la_mise_a_jour = Process(target=demander_la_mise_a_jour_des_flux,
                                                args=(queue_de_flux_a_mettre_a_jour,
                                                      flux_rss))
 
    # On démarre les workers, et à partir de là, 2 processus sont créés
    # et lançant chacun une fonction, les boucles infinies tournent joyeusement
    # et une personne est agressée toutes les 7 secondes à New York aussi,
    # mais on s'en fout dans notre cas présent.
    # Bien faire gaffe que les fonctions soient capables de tourner à vide :-)
    worker_qui_met_a_jour_les_flux.start()
    worker_qui_demande_la_mise_a_jour.start()
 
    # et voici notre worker principal, qui pop les nouveaux flux tout
    # frais, et les affiche à l'écran
    try:
        while True:
            try:
                feed, articles = queue_de_mises_a_jour_des_flux.get(0.2)
                print "Voici les derniers articles de %s :" % feed
                for article in articles:
                    print "- %s" % article
            except TimeoutError:
                pass
 
    except KeyboardInterrupt:
        pass
    finally:
        # si la boucle while s'arrête d'une manière ou d'une autre
        # on attend que les autres processus s'arrêtent avant de quitter
        # En vrai on mettrait beaucoup plus de code que ça, une file
        # de controle, peut être un handler de SIGTERM, etc
        # là on va à l'essentiel
        worker_qui_met_a_jour_les_flux.join()
        worker_qui_demande_la_mise_a_jour.join()
 
    print "Fin des haricots"

On lance le bouzin:

    python rssmania.py

Python se charge automatiquement de créer 2 subprocess, un qui lance la fonction mettre_a_jour_les_flux() et un pour demander_la_mise_a_jour_des_flux() puis il va faire tourner notre bouclinette principale avec amour.

Normalement, au premier lancement ça donne un truc comme ça:

Voici les derniers articles de Sam & Max: Python, Django, Git et du cul :
- http://sametmax.com/rassurez-vous-vous-netes-pas-bizarres/
- http://sametmax.com/fonctions-anonymes-en-python-ou-lambda/
- http://sametmax.com/deterer-le-cadavre-dun-troll-non-php-nest-pas-simple/
- http://sametmax.com/concurrence-sans-threads-en-python/
- http://sametmax.com/humour-reflexion-et-cul-la-formule-ne-date-pas-dhier/
- http://sametmax.com/state-machine-en-python-en-labsence-dalgos-recursifs-beneficiant-de-tail-call-optimisation/
- http://sametmax.com/appel-a-contributeurs-impertinents/
- http://sametmax.com/synchroniser-les-freeplugs-les-adaptateurs-reseaux-cpl-de-free/
- http://sametmax.com/incendie-en-espagne-un-megot-peut-se-tranformer-en-arme-mortelle/
- http://sametmax.com/jadore-les-context-managers-python/
Voici les derniers articles de Liens en vrac de sebsauvage :
- http://imgur.com/37R4c
- http://www.clubic.com/navigateur-internet-mobile/opera-mini/actualite-503834-opera-mini-depasse-200-utilisateurs.html
- http://www.lesnumeriques.com/jeux-video/impire-p14041/impire-creez-vos-donjons-comme-a-bonne-epoque-annees-bullfrog-n25461.html
- http://sebsauvage.net/links/index.php?Jr5VKg
- http://sebsauvage.net/links/index.php?PQUdwA
- http://imgur.com/A4xkr

Et 5 minutes plus tard (dans le cas improbable où un article a été publié entre temps), ça affiche les nouveaux articles.

Si vous appuyez sur CTRL + C, SIGINT va être envoyé à tous les workers, et ils vont tous s’arrêter gentiment. Normalement. En théorie. Souvent ça marche. Sur ma machine.

Articles similaires:

  1. Concurrence sans threads en python
  2. Diminuer la charge cpu d’un process avec renice
  3. Vérifier la RAM que consomme un process sous Linux avec pmap
  4. Executer une commande sur un serveur distant via ssh avec Python
  5. Récupérer le load average d’un serveur avec python + fonction sleep

flattr this!

8 comments

  1. Et à partir de 3.2 on pensera à utiliser concurrent.futures et ici ProcessPool pour gérer facilement des groupes de processus (ou de threads avec la même interface).

  2. Pour celui qui a cherché:

    difference entre start et run multiprocessing python:

    start() est la méthode qu’on appel pour demander à Python de lancer un processus séparer.

    run()
    est la méthode que Python va appeler quand il démarrera le processus séparé.

    run() est appelé après start() automatiquement, et on peut overrider run() quand on souhaite contrôler la création du worker.

    En général, on ne fait que lancer start(), et on laisse Python s’occuper du reste.

  3. Au fait, la réplique demandée dans le title vient du “Péril jeune” :)

  4. pour checker la date il s’agit plutot d’un article.updated_parsed
    plutôt que article.published_parsed

    Mais sinon nickel la démo, merci!

  5. Corrigé

  6. Désanuseur

    Et là bim un flux mal formé et le script ne fonctionne plus !

  7. Désanuseur

    feed = feedparser.parse(flux) mais où est le bozo de higs ;)

  8. C’est un exemple sur le multiprocessing, par sur le parsing de flux, spéce de désanuseur.

Flux RSS des commentaires

Leave a Reply

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre lang="" line="" escaped="" highlight="">

Jouer à mario en attendant que les autres répondent