Sam & Max » concurrency http://sametmax.com Du code, du cul Sat, 07 Nov 2015 10:56:13 +0000 en-US hourly 1 http://wordpress.org/?v=4.1 Remplacer les threads avec le module multiprocessing en Python 12 http://sametmax.com/remplacer-les-threads-avec-le-module-multiprocessing-en-python/ http://sametmax.com/remplacer-les-threads-avec-le-module-multiprocessing-en-python/#comments Tue, 31 Jul 2012 12:24:03 +0000 http://sametmax.com/?p=1430 Liferea. Liferea a pendant bien longtemps freezé l'intégralité de l'UI pendant la mise à jour de la liste d'articles (ben oui le temps de charger une page Web, la main loop attend). On peut éviter cela en utilisant des threads ou, dans notre, cas, de multiples processus.]]> Les threads Python sont limités par le Global Interpreter Lock, et si ils permettent de s’affranchir des problèmes de concurrence d’accès IO, ils sont inefficaces pour profiter de nos merveilleux processeurs multi-coeurs. Les coroutines, une alternative élégante aux threads, ont la même limitation.

Heureusement Python vient avec le module multiprocessing, qui permet justement de créer plusieurs processus séparés, et les orchestrer pour qu’ils travaillent ensemble, et ainsi saturer la consommation de ressource de nos serveurs modernes si chers et si puissants.

Prenons un employé de banque que nous appellerons A, et un épagneul Breton, que nous appellerons Catherine.

Euh non…

Prenons plutôt une application qui poll des flux RSS comme Liferea. Liferea a pendant bien longtemps freezé l’intégralité de l’UI pendant la mise à jour de la liste d’articles (ben oui le temps de charger une page Web, la main loop attend). On peut éviter cela en utilisant des threads ou, dans notre, cas, de multiples processus.

Bon, il y a peu de chance que Lifera soit CPU bound, donc c’est vrai que dans ce cas les threads feraient aussi bien, mais c’est pour l’exemple, bande de tatillons.

Pour notre cas de figure, nous avons besoin:

  • d’un process qui demande aux autres de vérifier les derniers flux RSS (pour simuler une interaction utilisateur);
  • d’un process qui va faire la vérification des flux RSS sans bloquer les autres process;
  • de feedparser, une lib Python qui parse les flux RSS;
  • d’un process qui va lancer tout ça, récupérer le résultat et l’afficher.

Pour feedparser avec pip:

    pip install feedparser

Ça c’est fait.

Pour le reste, on se fait un petit fichier rssmania.py:

# -*- coding: utf-8 -*-
 
import time
from time import mktime
from datetime import datetime
from multiprocessing import Process, Queue, TimeoutError
 
import feedparser
 
# cette fonction va être utilisée comme worker
# elle va lancer un process qui tourne en boucle et vérifie de manière
# régulière si il y a des flux à mettre à jour
def mettre_a_jour_les_flux(queue_flux_a_mettre_a_jour, queue_de_mises_a_jour_des_flux):
 
    last_update = {}
 
    while True: # une bonne boucle infinie pour la main loop
        try:
            # on vérifie si il y a un message dans la queue pendant 0.1 secondes
            # si oui, on parse le flux (sinon, ça raise une TimeoutError)
            flux = queue_flux_a_mettre_a_jour.get(0.1)
 
            feed = feedparser.parse(flux)
            nouveaux_articles = []
            # pour chaque article, on vérifie si la date de parution est
            # antérieur au dernier check, et si oui, on le déclare
            # "nouvel article"
            for article in feed.entries:
                try:
                    dt = datetime.fromtimestamp(mktime(article.updated_parsed))
                    if dt > last_update[flux]:
                        nouveaux_articles.append(article.link)
                except KeyError:
                    nouveaux_articles.append(article.link)
 
            # on balance tous les nouveaux articles dans la queue
            if nouveaux_articles:
                queue_de_mises_a_jour_des_flux.put((feed.feed.title, nouveaux_articles))
 
            last_update[flux] = datetime.now()
 
        # en cas de time out on repart sur un tour de boucle
        # si l'utilisateur fait CTRL+C sur le worker principal, il sera
        # broadcasté ici, donc on le catch et on exit proprement
        except TimeoutError:
            pass
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# worker très basique qui demande la mise à jour de tous les flux
# c'est bourrin, mais c'est pour l'exemple on vous dit !
def demander_la_mise_a_jour_des_flux(queue_de_flux_a_mettre_a_jour, flux_rss):
    """
        Demande la mise à jour des flux toutes les 5 minutes
    """
 
    # pareil, petite boucle infinie, temporisation et gestion du CTRL + C
    # en gros on ne fait que remplir la queue toutes les 5 minutes
    # avec des urls
    while True:
 
        try:
            for flux in flux_rss:
                queue_de_flux_a_mettre_a_jour.put(flux)
 
            time.sleep(300)
 
        except KeyboardInterrupt:
            sys.exit(0)
 
 
# très important ce if, sinon sous windows le module sera importé plusieurs
# fois et lancera ce bloc plusieurs fois
if __name__ == '__main__':
 
    # les flux à mettre à jour, RAS
    flux_rss = (
        'http://sametmax.com/feed/',
        "http://sebsauvage.net/links/index.php?do=rss",
        "http://charlesleifer.com/blog/rss/",
        "http://xkcd.com/rss.xml"
    )
 
    # les queues. Ces objets sont comme des listes partageables entre
    # les workers, sur lesquelles on pourrait faire uniquement insert(0, elem)
    # (ici put(elem)) et pop() (ici get()). Des FIFO thread safe quoi.
    queue_de_flux_a_mettre_a_jour = Queue()
    queue_de_mises_a_jour_des_flux = Queue()
 
    # ici on créé nos workers: on dit quelle fonction lancer avec quels
    # arguments. Nos arguments ici sont essentiellement les queues,
    # puisque c'est ce qui va nous permettre de partager les infos
    # entre les process (qui sont sinon isolés les uns des autres)
    worker_qui_met_a_jour_les_flux = Process(target=mettre_a_jour_les_flux,
                                             args=(queue_de_flux_a_mettre_a_jour,
                                                   queue_de_mises_a_jour_des_flux))
 
    worker_qui_demande_la_mise_a_jour = Process(target=demander_la_mise_a_jour_des_flux,
                                                args=(queue_de_flux_a_mettre_a_jour,
                                                      flux_rss))
 
    # On démarre les workers, et à partir de là, 2 processus sont créés
    # et lançant chacun une fonction, les boucles infinies tournent joyeusement
    # et une personne est agressée toutes les 7 secondes à New York aussi,
    # mais on s'en fout dans notre cas présent.
    # Bien faire gaffe que les fonctions soient capables de tourner à vide :-)
    worker_qui_met_a_jour_les_flux.start()
    worker_qui_demande_la_mise_a_jour.start()
 
    # et voici notre worker principal, qui pop les nouveaux flux tout
    # frais, et les affiche à l'écran
    try:
        while True:
            try:
                feed, articles = queue_de_mises_a_jour_des_flux.get(0.2)
                print "Voici les derniers articles de %s :" % feed
                for article in articles:
                    print "- %s" % article
            except TimeoutError:
                pass
 
    except KeyboardInterrupt:
        pass
    finally:
        # si la boucle while s'arrête d'une manière ou d'une autre
        # on attend que les autres processus s'arrêtent avant de quitter
        # En vrai on mettrait beaucoup plus de code que ça, une file
        # de controle, peut être un handler de SIGTERM, etc
        # là on va à l'essentiel
        worker_qui_met_a_jour_les_flux.join()
        worker_qui_demande_la_mise_a_jour.join()
 
    print "Fin des haricots"

On lance le bouzin:

    python rssmania.py

Python se charge automatiquement de créer 2 subprocess, un qui lance la fonction mettre_a_jour_les_flux() et un pour demander_la_mise_a_jour_des_flux() puis il va faire tourner notre bouclinette principale avec amour.

Normalement, au premier lancement ça donne un truc comme ça:

Voici les derniers articles de Sam & Max: Python, Django, Git et du cul :
- http://sametmax.com/rassurez-vous-vous-netes-pas-bizarres/
- http://sametmax.com/fonctions-anonymes-en-python-ou-lambda/
- http://sametmax.com/deterer-le-cadavre-dun-troll-non-php-nest-pas-simple/
- http://sametmax.com/concurrence-sans-threads-en-python/
- http://sametmax.com/humour-reflexion-et-cul-la-formule-ne-date-pas-dhier/
- http://sametmax.com/state-machine-en-python-en-labsence-dalgos-recursifs-beneficiant-de-tail-call-optimisation/
- http://sametmax.com/appel-a-contributeurs-impertinents/
- http://sametmax.com/synchroniser-les-freeplugs-les-adaptateurs-reseaux-cpl-de-free/
- http://sametmax.com/incendie-en-espagne-un-megot-peut-se-tranformer-en-arme-mortelle/
- http://sametmax.com/jadore-les-context-managers-python/
Voici les derniers articles de Liens en vrac de sebsauvage :
- http://imgur.com/37R4c
- http://www.clubic.com/navigateur-internet-mobile/opera-mini/actualite-503834-opera-mini-depasse-200-utilisateurs.html
- http://www.lesnumeriques.com/jeux-video/impire-p14041/impire-creez-vos-donjons-comme-a-bonne-epoque-annees-bullfrog-n25461.html
- http://sebsauvage.net/links/index.php?Jr5VKg
- http://sebsauvage.net/links/index.php?PQUdwA
- http://imgur.com/A4xkr

Et 5 minutes plus tard (dans le cas improbable où un article a été publié entre temps), ça affiche les nouveaux articles.

Si vous appuyez sur CTRL + C, SIGINT va être envoyé à tous les workers, et ils vont tous s’arrêter gentiment. Normalement. En théorie. Souvent ça marche. Sur ma machine.

]]>
http://sametmax.com/remplacer-les-threads-avec-le-module-multiprocessing-en-python/feed/ 12