Les threads Python sont limités par le Global Interpreter Lock, et si ils permettent de s’affranchir des problèmes de concurrence d’accès IO, ils sont inefficaces pour profiter de nos merveilleux processeurs multi-coeurs. Les coroutines, une alternative élégante aux threads, ont la même limitation.
Heureusement Python vient avec le module multiprocessing, qui permet justement de créer plusieurs processus séparés, et les orchestrer pour qu’ils travaillent ensemble, et ainsi saturer la consommation de ressource de nos serveurs modernes si chers et si puissants.
Prenons un employé de banque que nous appellerons A, et un épagneul Breton, que nous appellerons Catherine.
Euh non…
Prenons plutôt une application qui poll des flux RSS comme Liferea. Liferea a pendant bien longtemps freezé l’intégralité de l’UI pendant la mise à jour de la liste d’articles (ben oui le temps de charger une page Web, la main loop attend). On peut éviter cela en utilisant des threads ou, dans notre, cas, de multiples processus.
Bon, il y a peu de chance que Lifera soit CPU bound, donc c’est vrai que dans ce cas les threads feraient aussi bien, mais c’est pour l’exemple, bande de tatillons.
Pour notre cas de figure, nous avons besoin:
- d’un process qui demande aux autres de vérifier les derniers flux RSS (pour simuler une interaction utilisateur);
- d’un process qui va faire la vérification des flux RSS sans bloquer les autres process;
- de
feedparser
, une lib Python qui parse les flux RSS; - d’un process qui va lancer tout ça, récupérer le résultat et l’afficher.
Pour feedparser avec pip:
pip install feedparser |
Ça c’est fait.
Pour le reste, on se fait un petit fichier rssmania.py:
# -*- coding: utf-8 -*- import time from time import mktime from datetime import datetime from multiprocessing import Process, Queue, TimeoutError import feedparser # cette fonction va être utilisée comme worker # elle va lancer un process qui tourne en boucle et vérifie de manière # régulière si il y a des flux à mettre à jour def mettre_a_jour_les_flux(queue_flux_a_mettre_a_jour, queue_de_mises_a_jour_des_flux): last_update = {} while True: # une bonne boucle infinie pour la main loop try: # on vérifie si il y a un message dans la queue pendant 0.1 secondes # si oui, on parse le flux (sinon, ça raise une TimeoutError) flux = queue_flux_a_mettre_a_jour.get(0.1) feed = feedparser.parse(flux) nouveaux_articles = [] # pour chaque article, on vérifie si la date de parution est # antérieur au dernier check, et si oui, on le déclare # "nouvel article" for article in feed.entries: try: dt = datetime.fromtimestamp(mktime(article.updated_parsed)) if dt > last_update[flux]: nouveaux_articles.append(article.link) except KeyError: nouveaux_articles.append(article.link) # on balance tous les nouveaux articles dans la queue if nouveaux_articles: queue_de_mises_a_jour_des_flux.put((feed.feed.title, nouveaux_articles)) last_update[flux] = datetime.now() # en cas de time out on repart sur un tour de boucle # si l'utilisateur fait CTRL+C sur le worker principal, il sera # broadcasté ici, donc on le catch et on exit proprement except TimeoutError: pass except KeyboardInterrupt: sys.exit(0) # worker très basique qui demande la mise à jour de tous les flux # c'est bourrin, mais c'est pour l'exemple on vous dit ! def demander_la_mise_a_jour_des_flux(queue_de_flux_a_mettre_a_jour, flux_rss): """ Demande la mise à jour des flux toutes les 5 minutes """ # pareil, petite boucle infinie, temporisation et gestion du CTRL + C # en gros on ne fait que remplir la queue toutes les 5 minutes # avec des urls while True: try: for flux in flux_rss: queue_de_flux_a_mettre_a_jour.put(flux) time.sleep(300) except KeyboardInterrupt: sys.exit(0) # très important ce if, sinon sous windows le module sera importé plusieurs # fois et lancera ce bloc plusieurs fois if __name__ == '__main__': # les flux à mettre à jour, RAS flux_rss = ( 'http://sametmax.com/feed/', "http://sebsauvage.net/links/index.php?do=rss", "http://charlesleifer.com/blog/rss/", "http://xkcd.com/rss.xml" ) # les queues. Ces objets sont comme des listes partageables entre # les workers, sur lesquelles on pourrait faire uniquement insert(0, elem) # (ici put(elem)) et pop() (ici get()). Des FIFO thread safe quoi. queue_de_flux_a_mettre_a_jour = Queue() queue_de_mises_a_jour_des_flux = Queue() # ici on créé nos workers: on dit quelle fonction lancer avec quels # arguments. Nos arguments ici sont essentiellement les queues, # puisque c'est ce qui va nous permettre de partager les infos # entre les process (qui sont sinon isolés les uns des autres) worker_qui_met_a_jour_les_flux = Process(target=mettre_a_jour_les_flux, args=(queue_de_flux_a_mettre_a_jour, queue_de_mises_a_jour_des_flux)) worker_qui_demande_la_mise_a_jour = Process(target=demander_la_mise_a_jour_des_flux, args=(queue_de_flux_a_mettre_a_jour, flux_rss)) # On démarre les workers, et à partir de là, 2 processus sont créés # et lançant chacun une fonction, les boucles infinies tournent joyeusement # et une personne est agressée toutes les 7 secondes à New York aussi, # mais on s'en fout dans notre cas présent. # Bien faire gaffe que les fonctions soient capables de tourner à vide :-) worker_qui_met_a_jour_les_flux.start() worker_qui_demande_la_mise_a_jour.start() # et voici notre worker principal, qui pop les nouveaux flux tout # frais, et les affiche à l'écran try: while True: try: feed, articles = queue_de_mises_a_jour_des_flux.get(0.2) print "Voici les derniers articles de %s :" % feed for article in articles: print "- %s" % article except TimeoutError: pass except KeyboardInterrupt: pass finally: # si la boucle while s'arrête d'une manière ou d'une autre # on attend que les autres processus s'arrêtent avant de quitter # En vrai on mettrait beaucoup plus de code que ça, une file # de controle, peut être un handler de SIGTERM, etc # là on va à l'essentiel worker_qui_met_a_jour_les_flux.join() worker_qui_demande_la_mise_a_jour.join() print "Fin des haricots" |
On lance le bouzin:
python rssmania.py |
Python se charge automatiquement de créer 2 subprocess, un qui lance la fonction mettre_a_jour_les_flux()
et un pour demander_la_mise_a_jour_des_flux()
puis il va faire tourner notre bouclinette principale avec amour.
Normalement, au premier lancement ça donne un truc comme ça:
Voici les derniers articles de Sam & Max: Python, Django, Git et du cul : - http://sametmax.com/rassurez-vous-vous-netes-pas-bizarres/ - http://sametmax.com/fonctions-anonymes-en-python-ou-lambda/ - http://sametmax.com/deterer-le-cadavre-dun-troll-non-php-nest-pas-simple/ - http://sametmax.com/concurrence-sans-threads-en-python/ - http://sametmax.com/humour-reflexion-et-cul-la-formule-ne-date-pas-dhier/ - http://sametmax.com/state-machine-en-python-en-labsence-dalgos-recursifs-beneficiant-de-tail-call-optimisation/ - http://sametmax.com/appel-a-contributeurs-impertinents/ - http://sametmax.com/synchroniser-les-freeplugs-les-adaptateurs-reseaux-cpl-de-free/ - http://sametmax.com/incendie-en-espagne-un-megot-peut-se-tranformer-en-arme-mortelle/ - http://sametmax.com/jadore-les-context-managers-python/ Voici les derniers articles de Liens en vrac de sebsauvage : - http://imgur.com/37R4c - http://www.clubic.com/navigateur-internet-mobile/opera-mini/actualite-503834-opera-mini-depasse-200-utilisateurs.html - http://www.lesnumeriques.com/jeux-video/impire-p14041/impire-creez-vos-donjons-comme-a-bonne-epoque-annees-bullfrog-n25461.html - http://sebsauvage.net/links/index.php?Jr5VKg - http://sebsauvage.net/links/index.php?PQUdwA - http://imgur.com/A4xkr
Et 5 minutes plus tard (dans le cas improbable où un article a été publié entre temps), ça affiche les nouveaux articles.
Si vous appuyez sur CTRL + C, SIGINT
va être envoyé à tous les workers, et ils vont tous s’arrêter gentiment. Normalement. En théorie. Souvent ça marche. Sur ma machine.
Et à partir de 3.2 on pensera à utiliser
concurrent.futures
et ici ProcessPool pour gérer facilement des groupes de processus (ou de threads avec la même interface).Pour celui qui a cherché:
difference entre start et run multiprocessing python:
start()
est la méthode qu’on appel pour demander à Python de lancer un processus séparer.
est la méthode que Python va appeler quand il démarrera le processus séparé.run()
run()
est appelé aprèsstart()
automatiquement, et on peut overriderrun()
quand on souhaite contrôler la création du worker.En général, on ne fait que lancer
start()
, et on laisse Python s’occuper du reste.Au fait, la réplique demandée dans le title vient du “Péril jeune” :)
pour checker la date il s’agit plutot d’un article.updated_parsed
plutôt que article.published_parsed
Mais sinon nickel la démo, merci!
Corrigé
Et là bim un flux mal formé et le script ne fonctionne plus !
feed = feedparser.parse(flux) mais où est le bozo de higs ;)
C’est un exemple sur le multiprocessing, par sur le parsing de flux, spéce de désanuseur.
Bonjour,
Une question con mais je suis bloqué là. Comment je fais si j’utilise cette méthode et que je veux tout tuer, arrêter tous les workers, et terminer mon script ?
Je cherche une méthode pas trop bourrin bien sûr, car j’utilise plus ou moins votre exemple de code au sein d’une GUI.
Non je me suis gouré, c’est pas ici que je voulais poser la question, mais plutôt là ….
Désolé
Je réponds quand même ici sinon c’est zarb. Ca va dépendre de l’événement qui va déclencher l’arrêt : saisie clavier, événement réseau, signal d’interruption, etc.
La fermeture de la fenêtre principale, qui n’a à peu près aucun moyen de communiquer avec les workers.
Pour l’instant je fais ça, dans le fichier principal, qui lance la fenêtre:
Mais c’est un peu bourrin, je suis obligé de fermer la fenêtre.