#!/usr/bin/env node /* vim: set ft=javascript: */ /* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /* * Copyright 2020 Joyent, Inc. */ /* * fastserve: command-line fast server for demo and testing */ var mod_assertplus = require('assert-plus'); var mod_artedi = require('artedi'); var mod_bunyan = require('bunyan'); var mod_cmdutil = require('cmdutil'); var mod_getopt = require('posix-getopt'); var mod_fastdemo = require('../lib/demo_server'); var mod_fast = require('../lib/fast'); var mod_jsprim = require('jsprim'); var mod_kang = require('kang'); var mod_net = require('net'); var mod_os = require('os'); var mod_restify = require('restify'); var mod_util = require('util'); var OPTS = { '-p,--port': 'fast RPC listening port(default: 2030)', '-m,--mon-port': 'monitoring server listening port (default: port+800)', '-q,--quiesce': 'enable quiesce (default: false)' }; /* construct the usage message */ var usageMessage = ' OPTIONS'; Object.keys(OPTS).forEach(function (arg) { usageMessage += mod_util.format('\n\t%s\t%s', arg, OPTS[arg]); }); function main() { var option; /* default configuration values - monitorPort is set later */ var config = { 'quiesce': false, 'fastPort': 2030 }; mod_cmdutil.configure({ 'synopses': [ '[OPTIONS]' ], 'usageMessage': usageMessage }); mod_cmdutil.exitOnEpipe(); var parser = new mod_getopt.BasicParser('p:(port)m:(mon-port)' + 'q(quiesce)', process.argv); while ((option = parser.getopt()) !== undefined) { switch (option.option) { case 'q': config.quiesce = true; break; case 'p': config.fastPort = mod_jsprim.parseInteger(option.optarg); break; case 'm': config.monitorPort = mod_jsprim.parseInteger(option.optarg); break; default: mod_assertplus.equal('?', option.option); mod_cmdutil.usage(); break; } } /* set default monitoring port if none provided */ if (!config.monitorPort) { config.monitorPort = config.fastPort + 800; } function isInvalidPort(port) { return (isNaN(port) || port < 1 || port > 65535); } /* validate port configuration */ if (isInvalidPort(config.fastPort)) { mod_cmdutil.usage('invalid fast TCP port: %s\n', config.fastPort); } if (isInvalidPort(config.monitorPort)) { mod_cmdutil.usage('invalid monitoring TCP port: %s\n', config.monitorPort); } if (config.fastPort === config.monitorPort) { mod_cmdutil.usage('fast and monitoring TCP ports must differ:' + ' %s\n', config.fastPort); } if (parser.optind() !== process.argv.length) { mod_cmdutil.usage( 'Positional arguments found when none were expected: %s', process.argv.slice(parser.optind()).join(' ')); } fastDemoServer(config); } function fastDemoServer(args) { var fastPort, log, sock, collector, fastserver; var monitorPort; mod_assertplus.object(args, 'args'); mod_assertplus.number(args.fastPort, 'args.fastPort'); mod_assertplus.number(args.monitorPort, 'args.monitorPort'); mod_assertplus.bool(args.quiesce, 'args.quiesce'); collector = mod_artedi.createCollector({ 'labels': { 'component': 'fastserve' } }); log = new mod_bunyan({ 'name': 'fastserve', 'level': process.env['LOG_LEVEL'] || 'trace' }); log.info('starting fast server'); sock = mod_net.createServer({ 'allowHalfOpen': true }); fastserver = new mod_fast.FastServer({ 'log': log, 'collector': collector, 'server': sock }); mod_fastdemo.demoRpcs().forEach(function (r) { fastserver.registerRpcMethod(r); }); fastPort = args.fastPort; monitorPort = args.monitorPort; sock.listen(fastPort, function () { var nsigs = 0; log.info({ 'fastPort': fastPort }, 'listening for fast requests'); var kangOpts = { 'uri_base': '/kang', 'service_name': 'fastserve', 'version': '1.0.0', 'ident': mod_os.hostname() + '/' + process.pid, 'list_types': fastserver.kangListTypes.bind(fastserver), 'list_objects': fastserver.kangListObjects.bind(fastserver), 'get': fastserver.kangGetObject.bind(fastserver), 'stats': fastserver.kangStats.bind(fastserver) }; var monitor_server = mod_restify.createServer({ name: 'monitor' }); monitor_server.get('/metrics', function (req, res, next) { req.on('end', function () { collector.collect(mod_artedi.FMT_PROM, function (err, metrics) { if (err) { next(err); return; } res.setHeader('Content-Type', 'text/plain; version=0.0.4'); res.send(metrics); next(); }); }); req.resume(); }); monitor_server.get('/kang/.*', mod_kang.knRestifyHandler(kangOpts)); monitor_server.listen(monitorPort, '0.0.0.0', function () { log.info({ 'monitorPort': monitorPort }, 'listening for kang and metric requests'); }); process.on('SIGINT', function () { if (++nsigs == 1) { sock.close(); function shutdown() { monitor_server.close(); fastserver.close(); } if (args.quiesce) { log.info('quiescing server'); fastserver.onConnsDestroyed(shutdown); } else { shutdown(); } } }); }); } main();