#!/usr/bin/env python
# zmqc: a small but powerful command-line interface to ZMQ.
## Usage:
# zmqc [-0] [-r | -w] (-b | -c) SOCK_TYPE [-o SOCK_OPT=VALUE...] address [address ...]
## Examples:
# zmqc -rc SUB 'tcp://127.0.0.1:5000'
#
# Subscribe to 'tcp://127.0.0.1:5000', reading messages from it and printing
# them to the console. This will subscribe to all messages by default.
#
# ls | zmqc -wb PUSH 'tcp://*:4000'
#
# Send the name of every file in the current directory as a message from a
# PUSH socket bound to port 4000 on all interfaces. Don't forget to quote the
# address to avoid glob expansion.
#
# zmqc -rc PULL 'tcp://127.0.0.1:5202' | tee $TTY | zmqc -wc PUSH 'tcp://127.0.0.1:5404'
#
# Read messages coming from a PUSH socket bound to port 5202 (note that we're
# connecting with a PULL socket), echo them to the active console, and
# forward them to a PULL socket bound to port 5404 (so we're connecting with
# a PUSH).
#
# zmqc -n 10 -0rb PULL 'tcp://*:4123' | xargs -0 grep 'pattern'
#
# Bind to a PULL socket on port 4123, receive 10 messages from the socket
# (with each message representing a filename), and grep the files for
# `'pattern'`. The `-0` option means messages will be NULL-delimited rather
# than separated by newlines, so that filenames with spaces in them are not
# considered two separate arguments by xargs.
#
# echo "hello" | zmqc -c REQ 'tcp://127.0.0.1:4000'
#
# Send the string "hello" through a REQ socket connected to localhost port
# 4000, print whatever you get back and finish. In this way, REQ sockets can
# be used for a rudimentary form of RPC in shell scripts.
#
# coproc zmqc -b REP 'tcp://*:4000'
# tr -u '[a-z]' '[A-Z]' <&p >&p &
# echo "hello" | zmqc -c REQ 'tcp://127.0.0.1:4000'
#
# First, start a ZeroMQ REP socket listening on port 4000. The 'coproc' shell
# command runs this as a shell coprocess, which allows us to run the next
# line, tr. This will read its input from the REP socket's output, translate
# all lowercase characters to uppercase, and send them back to the REP
# socket's input. This, again, is run in the background. Finally, connect a
# REQ socket to that REP socket and send the string "hello" through it: you
# should just see the string "HELLO" printed on stdout.
## License:
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
# For more information, please refer to <http://unlicense.org/>
import argparse
import array
import errno
import itertools
import re
import sys
import zmq
import pkg_resources
# Version string reported by `--version`. It is taken from the installed
# "zmqc" distribution's metadata. When the script is run straight from a
# source checkout (i.e. the distribution is not installed) there is no
# metadata to query, so fall back to a placeholder instead of crashing at
# import time.
try:
    __version__ = pkg_resources.require("zmqc")[0].version
except pkg_resources.DistributionNotFound:
    __version__ = 'unknown'
class ParserError(Exception):
    """Signals that a command-line argument value could not be interpreted."""
# Command-line interface definition. The parser is built at import time and
# consumed by main(); the `sock_type`, `sock_opts` and `addresses` names are
# kept at module level exactly as before.
parser = argparse.ArgumentParser(
    prog='zmqc',
    # Hand-written usage: keep it in sync with the arguments declared below.
    usage=
    "%(prog)s [-h] [-v] [-0] [-n NUM] [-r | -w] (-b | -c)\n "
    "SOCK_TYPE [-o SOCK_OPT=VALUE...]\n "
    "address [address ...]",
    description="zmqc is a small but powerful command-line interface to "
    "ZeroMQ. It allows you to create a socket of a given type, bind or "
    "connect it to multiple addresses, set options on it, and receive or send "
    "messages over it using standard I/O, in the shell or in scripts.",
    epilog="This is free and unencumbered software released into the public "
    "domain. For more information, please refer to <http://unlicense.org>.",
)

# `-v` is registered alongside `--version` because the usage string above
# advertises it.
parser.add_argument('-v', '--version', action='version', version=__version__)

parser.add_argument('-0',
                    dest='delimiter', action='store_const',
                    const='\x00', default='\n',
                    help="Delimit separate messages on input/output with "
                    "NULL characters (instead of newlines). Use this if "
                    "your messages may contain newlines, and you want to "
                    "avoid ambiguous message borders.")

parser.add_argument('-n', metavar='NUM',
                    dest='number', type=int, default=None,
                    help="Receive/send only NUM messages. By default, zmqc "
                    "lives forever in 'read' mode, or until the end of input "
                    "in 'write' mode.")

mode_group = parser.add_argument_group(
    title='Mode',
    description="Whether to read from or write to the socket. For PUB/SUB "
    "sockets, this option is invalid since the behavior will always be write "
    "and read respectively. For REQ/REP sockets, zmqc will alternate between "
    "reading and writing as part of the request/response cycle.")
# Not marked required here: whether a mode is needed depends on the socket
# type, which main() validates after parsing.
mode = mode_group.add_mutually_exclusive_group(required=False)
mode.add_argument('-r', '--read',
                  dest='mode', action='store_const', const='r',
                  help="Read messages from the socket onto stdout.")
mode.add_argument('-w', '--write',
                  dest='mode', action='store_const', const='w',
                  help="Write messages from stdin to the socket.")

behavior_group = parser.add_argument_group(title='Behavior')
behavior = behavior_group.add_mutually_exclusive_group(required=True)
behavior.add_argument('-b', '--bind',
                      dest='behavior', action='store_const', const='bind',
                      help="Bind to the specified address(es).")
behavior.add_argument('-c', '--connect',
                      dest='behavior', action='store_const', const='connect',
                      help="Connect to the specified address(es).")

sock_params = parser.add_argument_group(title='Socket parameters')
# The value is upper-cased before being checked against `choices`, so the
# socket type may be given in any case.
sock_type = sock_params.add_argument('sock_type', metavar='SOCK_TYPE',
    choices=('PUSH', 'PULL', 'PUB', 'SUB', 'REQ', 'REP', 'PAIR'), type=str.upper,
    help="Which type of socket to create. Must be one of 'PUSH', 'PULL', "
    "'PUB', 'SUB', 'REQ', 'REP' or 'PAIR'. See `man zmq_socket` for an "
    "explanation of the different types. 'DEALER' and 'ROUTER' sockets are "
    "currently unsupported.")

sock_opts = sock_params.add_argument('-o', '--option',
    metavar='SOCK_OPT=VALUE', dest='sock_opts', action='append', default=[],
    help="Socket option names and values to set on the created socket. "
    "Consult `man zmq_setsockopt` for a comprehensive list of options. Note "
    "that you can safely omit the 'ZMQ_' prefix from the option name. If the "
    "created socket is of type 'SUB', and no 'SUBSCRIBE' options are given, "
    "the socket will automatically be subscribed to everything.")

addresses = sock_params.add_argument('addresses', nargs='+', metavar='address',
    help="One or more addresses to bind/connect to. Must be in full ZMQ "
    "format (e.g. 'tcp://<host>:<port>')")
def read_until_delimiter(stream, delimiter):
    """
    Read one delimited message from a stream.

    The stream is consumed one character (or byte) at a time until either
    `delimiter` or end of input is reached. The delimiter is consumed but is
    not part of the returned message. The last message of a stream does not
    need a trailing delimiter.

    :param stream: object whose ``read(1)`` returns text or bytes.
    :param delimiter: single-character separator, of the same type as the
        data returned by ``stream.read``.
    :returns: the message, of the same type as the data read from `stream`.
    :raises EOFError: if the stream was already exhausted, i.e. neither data
        nor a delimiter could be read.

    >>> from io import StringIO
    >>> io = StringIO(u"abcXdefgXfoo")
    >>> print(read_until_delimiter(io, u"X"))
    abc
    >>> print(read_until_delimiter(io, u"X"))
    defg
    >>> print(read_until_delimiter(io, u"X"))
    foo
    >>> read_until_delimiter(io, u"X")
    Traceback (most recent call last):
    ...
    EOFError
    """
    chunks = []
    first = stream.read(1)
    c = first
    while c and c != delimiter:
        chunks.append(c)
        c = stream.read(1)
    # `c` is empty only at end of input; with nothing collected either, the
    # stream had no further message at all.
    if not (c or chunks):
        raise EOFError
    # Past the check above `first` is always a non-empty read, so slicing it
    # yields an empty value of the stream's own type (text or bytes) to join
    # the pieces with.
    return first[:0].join(chunks)
def get_sockopts(sock_opts):
    """
    Turn a list of 'OPT=VALUE' strings into a list of (opt_code, value).

    Option names are matched case-insensitively, may contain digits, and may
    carry an optional 'ZMQ_' prefix. Values of options listed by the zmq
    constants module as integer options are converted with ``int``; values
    of options listed as byte-string options are passed through ``str``;
    values of any other option are left as the raw string.

    :param sock_opts: iterable of 'OPT=VALUE' strings.
    :returns: list of ``(opt_code, value)`` tuples, in input order.
    :raises ParserError: if an entry is not of the form 'OPT=VALUE', names
        an unknown option, or has a value that cannot be converted.

    Work on byte string options:

        >>> get_sockopts(['SUBSCRIBE=', 'SUBSCRIBE=abc'])
        [(6, ''), (6, 'abc')]

    Automatically convert integer options to integers:

        >>> get_sockopts(['LINGER=0', 'LINGER=-1', 'LINGER=50'])
        [(17, 0), (17, -1), (17, 50)]

    Spew on invalid input:

        >>> get_sockopts(['LINGER=foo'])
        Traceback (most recent call last):
        ...
        ParserError: Invalid value for option LINGER: 'foo'
        >>> get_sockopts(['NONEXISTENTOPTION=blah'])
        Traceback (most recent call last):
        ...
        ParserError: Unrecognised socket option: 'NONEXISTENTOPTION'
    """
    # Only a missing module should trigger the fallback; anything else is a
    # real error and must not be hidden.
    try:
        import zmq.sugar as optslib
    except ImportError:
        import zmq.core.constants as optslib

    # The fallback import binds the constants module itself, which may not
    # expose a nested `constants` attribute; look option names up on it
    # directly in that case.
    constants = getattr(optslib, 'constants', optslib)

    option_coerce = {
        int: set(optslib.int_sockopts).union(optslib.int64_sockopts),
        str: set(optslib.bytes_sockopts),
    }

    options = []
    for option in sock_opts:
        match = re.match(r'^([A-Za-z0-9_]+)=(.*)$', option)
        if not match:
            # Report the offending argument (the match object is None here).
            raise ParserError("Invalid option spec: %r" % (option,))

        opt_name = match.group(1).upper()
        if opt_name.startswith('ZMQ_'):
            opt_name = opt_name[4:]

        try:
            opt_code = getattr(constants, opt_name)
        except AttributeError:
            raise ParserError("Unrecognised socket option: %r" % (
                match.group(1),))

        opt_value = match.group(2)
        for converter, opt_codes in option_coerce.items():
            if opt_code in opt_codes:
                try:
                    opt_value = converter(opt_value)
                except (TypeError, ValueError):
                    raise ParserError("Invalid value for option %s: %r" % (
                        opt_name, opt_value))
                break
        options.append((opt_code, opt_value))
    return options
def main():
    """
    Run the zmqc command-line tool.

    Parses the command line, validates the combination of socket type and
    read/write mode, creates the socket, applies socket options, binds or
    connects to every given address and then runs the message loop that
    matches the socket type/mode. Invalid arguments are reported through
    ``parser.error``. The socket is always closed before returning, also
    when setting options, binding or connecting fails.
    """
    args = parser.parse_args()

    # Do some initial validation which is more complex than what can be
    # specified in the argument parser alone.
    if args.sock_type == 'SUB' and args.mode == 'w':
        parser.error("Cannot write to a SUB socket")
    elif args.sock_type == 'PUB' and args.mode == 'r':
        parser.error("Cannot read from a PUB socket")
    elif args.mode is not None and args.sock_type in ('REQ', 'REP'):
        parser.error("Cannot choose a read/write mode with a %s socket" %
                     args.sock_type)
    elif args.mode is None and args.sock_type not in ('REQ', 'REP'):
        # PUB can only be written to and SUB only read from, so the mode is
        # implied for them (as the 'Mode' help text states); every other
        # socket type needs an explicit choice.
        implied_mode = {'PUB': 'w', 'SUB': 'r'}.get(args.sock_type)
        if implied_mode is None:
            parser.error(
                "one of the arguments -r/--read -w/--write is required")
        args.mode = implied_mode

    # We also have to work around the fact that 'required' mutually exclusive
    # groups are not enforced when you put them in an argument group other
    # than the top-level parser.
    if args.behavior is None:
        parser.error("one of the arguments -b/--bind -c/--connect is required")

    # Parse the socket options before any socket exists, so that a bad
    # option exits (parser.error does not return) without leaving an open
    # socket behind.
    try:
        sock_opts = get_sockopts(args.sock_opts)
    except ParserError as exc:
        parser.error(str(exc))

    context = zmq.Context.instance()
    sock = context.socket(getattr(zmq, args.sock_type))
    try:
        # Set any specified socket options.
        for opt_code, opt_value in sock_opts:
            sock.setsockopt(opt_code, opt_value)

        # If we have a 'SUB' socket that's not explicitly subscribed to
        # anything, subscribe it to everything.
        if (sock.socket_type == zmq.SUB and
                not any(opt_code == zmq.SUBSCRIBE
                        for (opt_code, _) in sock_opts)):
            sock.setsockopt(zmq.SUBSCRIBE, b'')

        # Bind or connect to the provided addresses.
        for address in args.addresses:
            getattr(sock, args.behavior)(address)

        # Live forever if no `-n` argument was given, otherwise die after a
        # fixed number of messages.
        if args.number is None:
            iterator = itertools.repeat(None)
        else:
            iterator = itertools.repeat(None, args.number)

        try:
            if args.sock_type == 'REQ':
                req_loop(iterator, sock, args.delimiter, sys.stdin, sys.stdout)
            elif args.sock_type == 'REP':
                rep_loop(iterator, sock, args.delimiter, sys.stdin, sys.stdout)
            elif args.mode == 'r':
                read_loop(iterator, sock, args.delimiter, sys.stdout)
            elif args.mode == 'w':
                write_loop(iterator, sock, args.delimiter, sys.stdin)
        except StopIteration:
            # read()/write() raise StopIteration as a sentinel for end of
            # input, a closed output pipe or Ctrl-C. All are handled the
            # same way as running out of messages to process: stop quietly.
            return
    finally:
        sock.close()
def req_loop(iterator, sock, delimiter, input, output):
    """Drive a REQ socket: per iteration, send one request, then print the reply."""
    for _round in iterator:
        write(sock, delimiter, input)
        read(sock, delimiter, output)
def rep_loop(iterator, sock, delimiter, input, output):
    """Drive a REP socket: per iteration, print one request, then send a reply."""
    for _round in iterator:
        read(sock, delimiter, output)
        write(sock, delimiter, input)
def read_loop(iterator, sock, delimiter, output):
    """Continuously receive messages from the socket and print each on output."""
    for _round in iterator:
        read(sock, delimiter, output)
def write_loop(iterator, sock, delimiter, input):
    """Continuously take messages from input and send each through the socket."""
    for _round in iterator:
        write(sock, delimiter, input)
def read(sock, delimiter, output):
    """
    Read one message from a socket onto an output stream.

    The message is written followed by `delimiter`, and the stream is
    flushed immediately so that downstream consumers see each message as
    soon as it arrives.

    When the received message is a byte string, the delimiter is converted
    to bytes if necessary and the data is written to the stream's
    underlying binary buffer (``output.buffer``) when it has one, so that
    raw message bytes are never mixed with a text delimiter or pushed into
    a text-mode stream.

    :param sock: object whose ``recv()`` returns the next message.
    :param delimiter: separator appended after the message.
    :param output: writable stream with ``write`` and ``flush``.
    :raises StopIteration: on Ctrl-C, or when the output pipe has been
        closed by the reader (EPIPE); callers treat this as "stop looping".
    :raises IOError: for any other I/O failure.
    """
    try:
        message = sock.recv()
        sink = output
        if isinstance(message, bytes):
            # Text wrappers expose their byte stream as `.buffer`; plain
            # byte streams (and Python 2 files) are used as they are.
            sink = getattr(output, 'buffer', output)
            if not isinstance(delimiter, bytes):
                # latin-1 maps each code point below 256 to the same single
                # byte, so '\n' and '\x00' become b'\n' and b'\x00'.
                delimiter = delimiter.encode('latin-1')
        sink.write(message + delimiter)
        sink.flush()
    except KeyboardInterrupt:
        raise StopIteration
    except IOError as exc:
        if exc.errno == errno.EPIPE:
            raise StopIteration
        raise
def write(sock, delimiter, input):
    """
    Write one message from an input stream into a socket.

    One delimiter-terminated message is read from `input` and sent through
    `sock`. When `input` is a text wrapper exposing its underlying binary
    stream as ``input.buffer``, the message is read from that buffer (with
    the delimiter converted to bytes) so that the socket is handed a byte
    string rather than decoded text.

    :param sock: object whose ``send(message)`` transmits one message.
    :param delimiter: separator marking the end of each message on `input`.
    :param input: readable stream to take the message from.
    :raises StopIteration: on Ctrl-C or when `input` is exhausted; callers
        treat this as "stop looping".
    """
    try:
        # Text wrappers expose their byte stream as `.buffer`; plain byte
        # streams (and Python 2 files) are used as they are.
        source = getattr(input, 'buffer', input)
        if source is not input and not isinstance(delimiter, bytes):
            # latin-1 maps each code point below 256 to the same single
            # byte, so '\n' and '\x00' become b'\n' and b'\x00'.
            delimiter = delimiter.encode('latin-1')
        message = read_until_delimiter(source, delimiter)
        sock.send(message)
    except (KeyboardInterrupt, EOFError):
        raise StopIteration
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()