{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# Console related imports.\n", "from subprocess import Popen, PIPE\n", "import os\n", "from IPython.utils.py3compat import bytes_to_str, string_types\n", "\n", "# Widget related imports.\n", "from IPython.html import widgets\n", "from IPython.display import display" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define function to run a process without blocking the input." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def read_process(process, append_output):\n", " \"\"\" Try to read the stdout and stderr of a process and render it using \n", " the append_output method provided\n", " \n", " Parameters\n", " ----------\n", " process: Popen handle\n", " append_output: method handle\n", " Callback to render output. Signature of\n", " append_output(output, [prefix=])\"\"\"\n", " \n", " try:\n", " stdout = process.stdout.read()\n", " if stdout is not None and len(stdout) > 0:\n", " append_output(stdout, prefix=' ')\n", " except:\n", " pass\n", " \n", " try:\n", " stderr = process.stderr.read()\n", " if stderr is not None and len(stderr) > 0:\n", " append_output(stderr, prefix='ERR ')\n", " except:\n", " pass\n", "\n", "\n", "def set_pipe_nonblocking(pipe):\n", " \"\"\"Set a pipe as non-blocking\"\"\"\n", " try:\n", " import fcntl\n", " fl = fcntl.fcntl(pipe, fcntl.F_GETFL)\n", " fcntl.fcntl(pipe, fcntl.F_SETFL, fl | os.O_NONBLOCK)\n", " except:\n", " pass\n", "\n", "kernel = get_ipython().kernel\n", "def run_command(command, append_output, has_user_exited=None):\n", " \"\"\"Run a command asyncronously\n", " \n", " Parameters\n", " ----------\n", " command: str\n", " Shell command to launch a process with.\n", " append_output: method handle\n", " Callback to render output. Signature of\n", " append_output(output, [prefix=])\n", " has_user_exited: method handle\n", " Check to see if the user wants to stop the command.\n", " Must return a boolean.\"\"\"\n", " \n", " # Echo input.\n", " append_output(command, prefix='>>> ')\n", " \n", " # Create the process. Make sure the pipes are set as non-blocking.\n", " process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE)\n", " set_pipe_nonblocking(process.stdout)\n", " set_pipe_nonblocking(process.stderr)\n", " \n", " # Only continue to read from the command \n", " while (has_user_exited is None or not has_user_exited()) and process.poll() is None:\n", " read_process(process, append_output)\n", " kernel.do_one_iteration() # Run IPython iteration. This is the code that\n", " # makes this operation non-blocking. This will\n", " # allow widget messages and callbacks to be \n", " # processed.\n", " \n", " # If the process is still running, the user must have exited.\n", " if process.poll() is None:\n", " process.kill()\n", " else:\n", " read_process(process, append_output) # Read remainer\n", " \n", " \n", " \n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the console widgets without displaying them." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "console_container = widgets.VBox(visible=False)\n", "console_container.padding = '10px'\n", "\n", "output_box = widgets.Textarea()\n", "output_box.height = '400px'\n", "output_box.font_family = 'monospace'\n", "output_box.color = '#AAAAAA'\n", "output_box.background_color = 'black'\n", "output_box.width = '800px'\n", "\n", "input_box = widgets.Text()\n", "input_box.font_family = 'monospace'\n", "input_box.color = '#AAAAAA'\n", "input_box.background_color = 'black'\n", "input_box.width = '800px'\n", "\n", "console_container.children = [output_box, input_box]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Hook the process execution methods up to our console widgets." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "\n", "def append_output(output, prefix):\n", " if isinstance(output, string_types):\n", " output_str = output\n", " else:\n", " output_str = bytes_to_str(output)\n", " output_lines = output_str.split('\\n')\n", " formatted_output = '\\n'.join([prefix + line for line in output_lines if len(line) > 0]) + '\\n'\n", " output_box.value += formatted_output\n", " output_box.scroll_to_bottom()\n", " \n", "def has_user_exited():\n", " return not console_container.visible\n", "\n", "def handle_input(sender):\n", " sender.disabled = True\n", " try:\n", " command = sender.value\n", " sender.value = ''\n", " run_command(command, append_output=append_output, has_user_exited=has_user_exited)\n", " finally:\n", " sender.disabled = False\n", " \n", "input_box.on_submit(handle_input)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the button that will be used to display and hide the console. Display both the console container and the new button used to toggle it." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "toggle_button = widgets.Button(description=\"Start Console\")\n", "def toggle_console(sender):\n", " console_container.visible = not console_container.visible\n", " if console_container.visible:\n", " toggle_button.description=\"Stop Console\"\n", " input_box.disabled = False\n", " else:\n", " toggle_button.description=\"Start Console\"\n", "toggle_button.on_click(toggle_console)\n", "\n", "display(toggle_button)\n", "display(console_container)" ] } ], "metadata": { "kernelspec": { "codemirror_mode": { "name": "python", "version": 2 }, "display_name": "Python 2", "language": "python", "name": "python2" } }, "nbformat": 4, "nbformat_minor": 0 }