#!/usr/bin/env python3
# FemtoSIP -- A minimal SIP client
# Copyright (C) 2017-2023 Andreas Stöckel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import collections
import hashlib
import socket
import select
import random
import re
import time
import logging
# Create the logger
logger = logging.getLogger('femtosip')
###############################################################################
# Helper functions #
###############################################################################
def format_sip_header_field(key):
    """
    Return the canonical spelling of a SIP header field name.

    The name is lower-cased first; then the first character and every
    character following a '-' are upper-cased, so 'content-length' turns into
    'Content-Length'. Three names have an irregular canonical form and are
    handled explicitly: 'Call-ID', 'CSeq' and 'WWW-Authenticate'.

    key: the header name as str, bytes or bytearray (bytes-like input is
    decoded as ASCII).
    """
    if isinstance(key, (bytes, bytearray)):
        key = str(key, 'ascii')
    lowered = key.lower()

    # Names whose canonical form is not "capitalise every dash-separated part"
    irregular = {
        'call-id': 'Call-ID',
        'cseq': 'CSeq',
        'www-authenticate': 'WWW-Authenticate',
    }
    if lowered in irregular:
        return irregular[lowered]

    # Generic case: upper-case the first character of each part
    return '-'.join(
        part[:1].upper() + part[1:] for part in lowered.split('-'))
###############################################################################
# Digest handling (see RFC 7616) #
###############################################################################
class DigestTokenizer:
    """
    Splits a digest challenge string into a flat list of tokens.

    Every token is a tuple (is_special, text). The separators '=' and ','
    are emitted as special tokens (is_special is True); everything else --
    bare words and the contents of double-quoted strings -- is emitted as a
    text token (is_special is False). Inside quotes a backslash escapes the
    following character, and an empty quoted string still yields an (empty)
    text token.
    """

    def __init__(self):
        self.buf = ""      # Characters collected for the current text token
        self.tokens = []   # Tokens emitted so far
        self.status = 0    # 0: outside quotes, 1: in quotes, 2: after '\'

    def emit(self, special=None):
        """
        Flushes the buffer as a text token and optionally appends the given
        special token. An empty buffer is only flushed while inside quotes.
        """
        inside_quotes = self.status != 0
        if inside_quotes or self.buf:
            self.tokens.append((False, self.buf))
        if special is not None:
            self.tokens.append((True, special))
        self.buf = ""

    def feed(self, data, last=True):
        """
        Consumes the characters in data and returns the list of all tokens
        emitted so far. If last is true, the remaining buffer content is
        flushed once the end of data is reached.
        """
        OUTSIDE, QUOTED, ESCAPED = 0, 1, 2

        for char in data:
            whitespace = char.isspace()
            if self.status == ESCAPED:
                # Take the escaped character literally
                self.buf += char
                self.status = QUOTED
            elif self.status == QUOTED:
                if char == "\"":
                    self.emit()
                    self.status = OUTSIDE
                elif char == "\\":
                    self.status = ESCAPED
                else:
                    self.buf += char
            elif self.status == OUTSIDE:
                if whitespace:
                    self.emit()
                elif char in ('=', ","):
                    self.emit(char)
                elif char == "\"":
                    self.emit()
                    self.status = QUOTED
                else:
                    self.buf += char

        if last:
            self.emit()
        return self.tokens

    @staticmethod
    def tokenize(data):
        """Tokenizes a complete string with a fresh tokenizer instance."""
        tokenizer = DigestTokenizer()
        return tokenizer.feed(data)
def parse_digest(data):
    """
    Parses a digest challenge of the form 'Digest key=value, key=value, ...'.

    Returns a dict mapping the lower-cased keys onto their values, or None if
    the string does not start with the word 'Digest' or the tokens do not
    follow the key '=' value ',' pattern. Input that ends in the middle of a
    pair yields the pairs completed up to that point.
    """
    ST_INITIAL, ST_KEY, ST_EQ, ST_VALUE, ST_COMMA = range(5)

    fields = {}
    key = None
    status = ST_INITIAL
    for is_special, text in DigestTokenizer.tokenize(data):
        if status == ST_INITIAL:
            # The very first token must be the scheme name
            if is_special or text.lower() != "digest":
                return None
        elif status == ST_KEY:
            if is_special:
                return None
            key = text.lower()
        elif status == ST_EQ:
            if not (is_special and text == "="):
                return None
        elif status == ST_VALUE:
            if is_special:
                return None
            fields[key] = text
        elif status == ST_COMMA:
            if not (is_special and text == ","):
                return None

        # After a comma the next key follows; the scheme name is never
        # expected again
        status = ST_KEY if status == ST_COMMA else status + 1
    return fields
def digest_response(user, password, realm, nonce, method, uri):
    """
    Computes the MD5 digest authentication response without "qop", i.e.

        MD5(MD5(user:realm:password) : nonce : MD5(METHOD:uri))

    All arguments are strings. The method is upper-cased before hashing; the
    result is returned as a lower-case hexadecimal string.
    """
    def md5_hex(payload):
        return hashlib.md5(payload).hexdigest().lower()

    # HA1 covers the credentials, HA2 the request line
    ha1 = md5_hex(b':'.join([
        user.encode('utf-8'),
        realm.encode('utf-8'),
        password.encode('utf-8'),
    ]))
    ha2 = md5_hex(b':'.join([
        method.upper().encode('utf-8'),
        uri.encode('utf-8'),
    ]))

    return md5_hex(b':'.join([
        ha1.encode('ascii'),
        nonce.encode('utf-8'),
        ha2.encode('ascii'),
    ]))
##############################################################################
# SIP response (i.e., HTTP-like header) parser #
##############################################################################
class ResponseParser:
    """
    Incremental parser for HTTP-like responses (such as responses from a SIP
    server).

    Data is passed to feed() in arbitrary chunks. Whenever a response is
    complete, the callback is invoked with the parser instance; the parsed
    content is then available in the following member variables:

    protocol: protocol identifier from the status line (str)
    code: numerical status code (int, -1 if it could not be parsed)
    message: reason phrase from the status line (str)
    fields: OrderedDict mapping canonical header names onto bytes values
    body: message body (bytes)
    """

    # Parser states
    _ST_INITIAL = 0
    _ST_PROTOCOL = 1
    _ST_CODE = 2
    _ST_MESSAGE = 3
    _ST_HEADER_KEY = 4
    _ST_HEADER_VALUE = 5
    _ST_BODY = 6

    def __init__(self):
        self.reset()

    def reset(self):
        """Discards all parser state and any partially parsed response."""
        # NOTE: despite its name this flag records that a carriage return
        # ('\r') has been seen; the name is kept for compatibility
        self._had_lf = False
        self._n_linebreaks = 0
        self._status = 0
        self._key = bytearray()
        self._value = bytearray()
        self._content_length = 0
        self._skip_ws = True

        self.protocol = bytearray()
        self.code = bytearray()
        self.message = bytearray()
        self.fields = collections.OrderedDict()
        self.body = bytearray()

    def _store_header(self):
        """
        Stores the header line assembled in _key/_value in the fields dict
        and clears both buffers. Lines without a key are dropped.
        """
        if len(self._key) > 0:
            try:
                key = format_sip_header_field(self._key)
            except UnicodeDecodeError:
                logger.error('Invalid header key')
                raise
            if key == 'Content-Length':
                try:
                    self._content_length = int(self._value)
                    if self._content_length < 0:
                        # A negative length could never be consumed and
                        # would stall the parser forever
                        raise ValueError('negative Content-Length')
                except ValueError:
                    logger.error('Received invalid Content-Length')
                    self._content_length = 0
            self.fields[key] = bytes(self._value.strip())
        self._key = bytearray()
        self._value = bytearray()

    def feed(self, data, callback):
        """
        Parses the given bytes/bytearray and calls the given callback with each
        parsed response. First and single parameter to the callback function is
        this parser instance. Access the protocol, code, message, fields and
        body member variables for the parsed content.

        A response is complete once its header has ended and as many body
        bytes as announced by Content-Length have been read. If the response
        carries an explicit Content-Length, parsing continues with the next
        response right after the body, so several responses may be contained
        in a single chunk. If there is no Content-Length header, the rest of
        the chunk is skipped and the callback is invoked at the end of the
        chunk.

        Returns True if the callback has been called at least once.
        """
        dispatched = False

        def dispatch():
            nonlocal dispatched

            # Convert the message code to an integer
            try:
                self.code = int(str(self.code, 'ascii'))
            except ValueError:
                logger.error('Received invalid response code')
                self.code = -1

            # Convert the protocol and the message to a string; fall back to
            # a lossy conversion so that both are always of type str
            protocol, message = bytes(self.protocol), bytes(self.message)
            try:
                self.protocol = protocol.decode('ascii')
                self.message = message.decode('ascii')
            except UnicodeDecodeError:
                logger.error('Invalid protocol or message')
                self.protocol = protocol.decode('ascii', 'replace')
                self.message = message.decode('ascii', 'replace')

            # Convert the body to "bytes"
            self.body = bytes(self.body)

            # Call the callback; the next byte starts a new response
            dispatched = True
            self._status = self._ST_INITIAL
            callback(self)

        SPACE, TAB, CR, LF, COLON = b' '[0], b'\t'[0], b'\r'[0], b'\n'[0], b':'[0]

        i = 0
        while i < len(data):
            byte = data[i]

            # Skip whitespace if requested
            is_ws = (byte == SPACE or byte == TAB)
            if self._skip_ws and is_ws:
                i += 1
                continue
            self._skip_ws = False

            # Re-initialise
            if self._status == self._ST_INITIAL:
                # Line breaks in front of the status line must be ignored
                # (RFC 3261, section 7.5)
                if byte == CR or byte == LF:
                    i += 1
                    continue
                self.reset()
                self._status = self._ST_PROTOCOL
                continue  # Look at the same byte again in the new state

            # Handle linebreaks
            if self._status != self._ST_BODY:
                # Handle '\r'
                if byte == CR:
                    self._had_lf = True
                    i += 1
                    continue

                # Handle '\n' following a '\r'
                if byte == LF and self._had_lf:
                    if self._status < self._ST_HEADER_KEY:
                        # End of the status line
                        self._status = self._ST_HEADER_KEY
                    else:
                        # End of a header line
                        self._store_header()
                        self._status = self._ST_HEADER_KEY

                    # We've handled this character
                    i += 1

                    # Count linebreaks, body starts with the second linebreak
                    self._n_linebreaks += 1
                    if self._n_linebreaks == 2:
                        self._status = self._ST_BODY
                        # An explicitly empty body completes the response
                        # right here; whatever follows belongs to the next
                        # response
                        if (self._content_length == 0 and
                                'Content-Length' in self.fields):
                            dispatch()
                    continue

                # This is not a linebreak, reset the linebreak data
                self._had_lf = False
                self._n_linebreaks = 0

            # Switch between fields in the first response line
            if self._status < self._ST_MESSAGE and is_ws:
                self._skip_ws = True
                self._status += 1
                continue

            # Append the byte to whatever is currently being read
            if self._status == self._ST_PROTOCOL:
                self.protocol.append(byte)
            elif self._status == self._ST_CODE:
                self.code.append(byte)
            elif self._status == self._ST_MESSAGE:
                self.message.append(byte)
            elif self._status == self._ST_HEADER_KEY:
                if byte == COLON:
                    i += 1
                    self._skip_ws = True
                    self._status = self._ST_HEADER_VALUE
                    continue
                self._key.append(byte)
            elif self._status == self._ST_HEADER_VALUE:
                self._value.append(byte)
            elif self._status == self._ST_BODY:
                if self._content_length > 0:
                    self.body.append(byte)
                    self._content_length -= 1
                    if self._content_length == 0:
                        i += 1
                        dispatch()
                        continue
                # Otherwise there is no Content-Length header: skip the
                # remainder of the chunk

            # Increase the read pointer by one
            i += 1

        # Explicitly call the callback if this is the end of the data
        if self._status == self._ST_BODY and self._content_length == 0:
            dispatch()

        # Return true if the callback has been called
        return dispatched
###############################################################################
# Main SIP state-machine #
###############################################################################
class SIP:
    """
    The SIP class implements a minimal, incomplete, and likely broken SIP
    endpoint that is capable of initiating a phone call to a third party
    (without any NAT traversal).

    user: user name used for authentication and in the From/Contact headers.
    password: password used for digest authentication.
    gateway: host name or IP address of the SIP server.
    port: port of the SIP server.
    display_name: optional display name placed in front of the From URI.
    local_ip: optional local IP address to advertise; if not given, the
        address of the socket connected to the gateway is used.
    protocol: one of 'udp', 'tcp', 'udp6', 'tcp6'.
    """

    ALLOW = 'INVITE, ACK, CANCEL, OPTIONS, BYE, REFER, NOTIFY, MESSAGE, SUBSCRIBE, INFO'

    def __init__(self, user, password, gateway, port, display_name=None, local_ip=None, protocol="tcp"):
        # Copy the parameters
        self.user = user
        self.password = password
        self.local_ip, self.local_port = local_ip, None  # Allow IP to be specified. Enables use on WAN.
        self.gateway = gateway
        self.port = port
        self.display_name = display_name
        self.protocol = protocol

        # Create the response parser instance
        self.response_parser = ResponseParser()

        # Initialise the session parameters
        self.seq = 0
        self.session_id = self.make_random_digits(4)
        self.session_version = self.make_random_digits(4)

    @staticmethod
    def make_sip_packet(method, uri, fields, data=b''):
        """
        The sip_packet method assembles a single sip_packet consisting of a
        request method, request uri, header fields, and some payload data.
        The Content-Length header field is automatically added to the header
        depending on the given payload data. Returns a bytes instance
        containing the bytes that should be sent to the server.

        method: is the request method. Will be converted to upper-case and
                encoded as ascii.
        uri: is the request URI that will be appended to the method. Will be
             converted to ascii.
        fields: is a dict containing the header fields as key-value pairs.
        data: is optional payload data. Must be a bytes object.
        """
        lines = [method.upper().encode('ascii') + b' ' +
                 uri.encode('ascii') + b' SIP/2.0']
        for key, value in fields.items():
            lines.append(format_sip_header_field(key).encode('ascii') +
                         b': ' + value.encode('utf-8'))
        lines.append(b'Content-Length: ' + str(len(data)).encode('ascii'))
        return b'\r\n'.join(lines) + b'\r\n\r\n' + data

    @staticmethod
    def make_random_digits(len=10):
        """
        Returns a string of random decimal digits that never starts with a
        zero. The parameter is called "len" for backward compatibility; it
        shadows the builtin of the same name inside this function.
        """
        return ''.join(
            str(random.randint(1 if i == 0 else 0, 9)) for i in range(len))

    @staticmethod
    def _bracket_ipv6(host):
        """Encloses IPv6 literals (anything containing ':') in brackets."""
        return '[' + host + ']' if ':' in host else host

    @staticmethod
    def _extract_tag(value):
        """
        Returns the "tag" parameter of a From/To header value (bytes) as a
        string, or None if the value is None or carries no tag.
        """
        if value is None:
            return None
        # Only look at the header parameters following the <...> part so
        # that parameters of the URI itself are not picked up
        params = bytes(value).rpartition(b'>')[2]
        match = re.search(rb';\s*tag\s*=\s*([^;,\s>]+)', params, re.IGNORECASE)
        if match is None:
            return None
        return str(match.group(1), 'ascii')

    def _transport(self):
        """Returns the transport name without the IPv6 suffix, e.g. 'tcp'."""
        return self.protocol[:3]

    def _make_via_field(self, branch):
        """Assembles the Via header value for the given branch."""
        return ('SIP/2.0/' + self._transport().upper() + ' ' +
                self.local_ip_header + ':' + str(self.port) +
                ';rport;branch=' + branch)

    def make_branch(self):
        """Returns a new branch id starting with the RFC 3261 magic cookie."""
        return 'z9hG4bK' + self.make_random_digits(10)

    def make_from_field(self, remote_host, tag):
        """
        Assembles the From header value: the optional quoted display name,
        the URI of the local user at remote_host, and the given tag.
        """
        from_field = '<sip:' + self.user + '@' + remote_host + '>;tag=' + tag
        if self.display_name:
            from_field = '"' + self.display_name + '" ' + from_field
        return from_field

    def make_invite_sip_packet(self,
                               remote_id, remote_host,
                               branch, tag, call_id, seq, realm=None, nonce=None):
        """
        Assembles an INVITE request for sip:remote_id@remote_host. If both
        realm and nonce are given, a digest Authorization header is added.
        Requires local_ip_header and local_port to be set, which happens in
        call(). Returns the request as bytes.
        """
        # Assemble the request uri
        uri = 'sip:' + remote_id + '@' + remote_host

        # Assemble the header fields
        fields = collections.OrderedDict()
        fields['Via'] = self._make_via_field(branch)
        fields['From'] = self.make_from_field(remote_host, tag)
        fields['To'] = '<' + uri + '>'
        fields['Call-ID'] = str(call_id)
        fields['CSeq'] = str(seq) + ' INVITE'
        fields['Contact'] = (
            '<sip:' + self.user + '@' + self.local_ip_header + ':' +
            str(self.local_port) + ';transport=' +
            self._transport().lower() + '>')
        fields['Content-Type'] = 'application/sdp'
        fields['Allow'] = self.ALLOW
        fields['Max-Forwards'] = '70'
        if (realm is not None) and (nonce is not None):
            fields['Authorization'] = (
                'Digest username=\"' + self.user + "\", " +
                "realm=\"" + realm + "\", " +
                "nonce=\"" + nonce + "\", " +
                "uri=\"" + uri + "\", " +
                "response=\"" + digest_response(
                    self.user, self.password,
                    realm, nonce, 'INVITE', uri) + "\", " +
                "algorithm=\"MD5\"")
        return self.make_sip_packet('INVITE', uri, fields)

    def make_cancel_sip_packet(self, remote_id, remote_host, branch, tag, call_id, seq):
        """
        Assembles a CANCEL request for a pending INVITE. Branch, tag, call_id
        and seq must be those of the INVITE that is being cancelled. Returns
        the request as bytes.
        """
        # Assemble the request uri
        uri = 'sip:' + remote_id + '@' + remote_host

        # Assemble the header fields
        fields = collections.OrderedDict()
        fields['Via'] = self._make_via_field(branch)
        fields['From'] = self.make_from_field(remote_host, tag)
        fields['To'] = '<' + uri + '>'
        fields['Call-ID'] = str(call_id)
        fields['CSeq'] = str(seq) + ' CANCEL'
        fields['Max-Forwards'] = '70'
        return self.make_sip_packet('CANCEL', uri, fields)

    def make_bye_sip_packet(self, remote_id, remote_host, branch, tag, remote_tag, call_id, seq):
        """
        Assembles a BYE request terminating an established call. remote_tag
        is the tag the remote side placed in its To header. Returns the
        request as bytes.
        """
        # Assemble the request uri
        uri = 'sip:' + remote_id + '@' + remote_host

        # Assemble the header fields
        fields = collections.OrderedDict()
        fields['Via'] = self._make_via_field(branch)
        fields['From'] = self.make_from_field(remote_host, tag)
        fields['To'] = '<' + uri + '>;tag=' + remote_tag
        fields['Call-ID'] = str(call_id)
        fields['CSeq'] = str(seq) + ' BYE'
        fields['Max-Forwards'] = '70'
        return self.make_sip_packet('BYE', uri, fields)

    def make_socket(self):
        """
        Creates a non-blocking socket connected to the gateway and records
        the local port (and the local IP, unless one was specified).

        Raises ValueError for an unsupported protocol; errors raised while
        connecting are passed on after the socket has been closed.
        """
        socket_kinds = {
            'udp': (socket.AF_INET, socket.SOCK_DGRAM),
            'tcp': (socket.AF_INET, socket.SOCK_STREAM),
            'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
            'tcp6': (socket.AF_INET6, socket.SOCK_STREAM),
        }
        try:
            family, kind = socket_kinds[self.protocol.lower()]
        except KeyError:
            raise ValueError('Unsupported protocol.') from None

        sock = socket.socket(family, kind)
        try:
            sock.connect((self.gateway, self.port))
            if self.local_ip:
                self.local_port = sock.getsockname()[1]
            else:
                self.local_ip, self.local_port = sock.getsockname()[0:2]
            sock.setblocking(0)
        except Exception:
            # Do not leak the socket if the connection cannot be set up
            sock.close()
            raise
        return sock

    def call(self, remote_id, delay=15.0, timeout=1.0):
        """
        Rings the endpoint with the given id. An INVITE is sent (and repeated
        once with credentials if the server asks for authentication); as soon
        as the server reports that the remote phone is ringing, the call is
        kept pending for "delay" seconds and then cancelled. If the call is
        answered in the meantime, it is terminated with a BYE.

        remote_id: user part of the SIP URI that should be called.
        delay: time in seconds the phone should ring.
        timeout: time in seconds the server may take to respond after a
            request has been sent.

        Errors are logged and end the call; nothing is returned.
        """
        # Generate a call_id and increase the sequence number
        self.seq += 1
        tag = self.make_random_digits()
        call_id = self.make_random_digits()

        # Object containing the state of the state machine
        state = {
            'done': False,
            'status': 'send_invite',
            'tries': 0,
            'realm': None,
            'nonce': None,
            'last_request': time.time(),
            'delay_start': 0,
            'remote_tag': None,
        }

        def error(msg):
            logger.error(msg)
            state['done'] = True

        # Function advancing the state machine
        def handle_response(res):
            logger.info('response: %s %s %s',
                        res.protocol, res.code, res.message)

            # Handle the individual response codes
            if res.code == 401:
                # Increment the number of tries
                state['tries'] += 1

                # Abort if we get more than one authentication error in a row
                if state['tries'] > 1:
                    error('Authentication failed. Check password and username.')
                    return

                # Read realm and nonce
                if 'WWW-Authenticate' not in res.fields:
                    error('Did not find "WWW-Authenticate" field')
                    return
                auth = str(res.fields['WWW-Authenticate'], 'ascii')
                auth_fields = parse_digest(auth)
                if (auth_fields is None) or ('realm' not in auth_fields) or ('nonce' not in auth_fields):
                    error('Could not parse "WWW-Authenticate" header, authentication methods other than digest are not supported.')
                    return
                state['realm'] = auth_fields['realm']
                state['nonce'] = auth_fields['nonce']

                # Try again
                self.seq += 1
                if state['status'].startswith('done_'):
                    state['status'] = state['status'][5:]
            elif res.code == 100 or res.code == 101:
                # Ignore this response, everything is fine
                pass
            elif res.code == 183 or res.code == 180:
                if 'From' not in res.fields:
                    error('Did not find "From" field')
                    return
                state['status'] = 'delay'  # Phones are ringing, wait
                state['delay_start'] = time.time()
            elif res.code == 603:  # Decline
                state['done'] = True
            elif res.code == 200:
                if state['status'] == 'delay':
                    # The call has been answered, hang up. The BYE must carry
                    # the tag chosen by the remote side.
                    remote_tag = self._extract_tag(res.fields.get('To'))
                    if remote_tag is None:
                        error('Did not find a tag in the "To" field')
                        return
                    self.seq += 1
                    state['remote_tag'] = remote_tag
                    state['status'] = 'send_bye'
                if state['status'] == 'done_send_bye':
                    state['done'] = True
            elif res.code == 487 and state['status'] == 'done_send_cancel':
                state['done'] = True
            elif res.code >= 400:
                error('Unhandled error.')
                state['done'] = True

        writebuf = bytearray()
        branch = None

        # A new connection starts with a clean parser; do not carry over a
        # partially parsed response from a previous call
        self.response_parser.reset()

        with self.make_socket() as sock:
            # IPv6 heuristics. self.protocol is left untouched so that
            # make_socket() picks the same address family on the next call.
            self.local_ip_header = self._bracket_ipv6(self.local_ip)
            self.gateway_header = self._bracket_ipv6(self.gateway)
            is_stream = self._transport().lower() == 'tcp'

            while not state['done']:
                now = time.time()
                try:
                    if state['status'] == 'send_invite':
                        logger.info('request: INVITE sip:'
                            + remote_id + '@' + self.gateway_header)
                        branch = self.make_branch()
                        writebuf += self.make_invite_sip_packet(
                            remote_id, self.gateway_header,
                            branch, tag, call_id, self.seq,
                            state['realm'], state['nonce'])
                        state['status'] = 'done_send_invite'
                    elif state['status'] == 'send_cancel':
                        logger.info('request: CANCEL sip:'
                            + remote_id + '@' + self.gateway_header)
                        # CANCEL re-uses the branch of the pending INVITE
                        writebuf += self.make_cancel_sip_packet(
                            remote_id, self.gateway_header,
                            branch, tag, call_id, self.seq)
                        state['status'] = 'done_send_cancel'
                    elif state['status'] == 'send_bye':
                        logger.info('request: BYE sip:'
                            + remote_id + '@' + self.gateway_header)
                        branch = self.make_branch()
                        writebuf += self.make_bye_sip_packet(
                            remote_id, self.gateway_header,
                            branch, tag, state['remote_tag'],
                            call_id, self.seq)
                        state['status'] = 'done_send_bye'
                    elif state['status'] == 'delay':
                        if now - state['delay_start'] > delay:
                            state['status'] = 'send_cancel'
                    elif now - state['last_request'] > timeout:
                        error('Timeout while waiting for server response')

                    # Check whether we can read or write from the socket
                    can_read, can_write, in_error = \
                        select.select([sock], [sock], [sock], 10e-3)
                    if len(in_error) > 0:
                        error('Socket error')
                        continue

                    if len(can_read) > 0:
                        readbuf = sock.recv(4096)
                        if is_stream and len(readbuf) == 0:
                            # An empty read on a stream socket means that
                            # the peer closed the connection; without this
                            # check the loop would spin on the readable
                            # socket
                            error('Connection closed by server')
                            continue
                        self.response_parser.feed(readbuf, handle_response)
                    if len(can_write) > 0 and len(writebuf) > 0:
                        state['last_request'] = time.time()
                        sent = sock.send(writebuf)
                        if sent == 0:
                            error('Error while writing to socket')
                        writebuf = writebuf[sent:]
                except KeyboardInterrupt:
                    # Cancel a ringing call gracefully, abort otherwise
                    if state['status'] == 'delay':
                        state['status'] = 'send_cancel'
                    else:
                        state['done'] = True
#
# Main program
#
# Command line interface: parse the arguments, set up logging, and ring the
# requested endpoint once.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(
        description='A microscopic SIP client that can be used to ring a ' +
        'phone.')
    parser.add_argument('--gateway', required=True,
        help='Hostname or IP address of the SIP server')
    parser.add_argument('--port', default=5060, type=int,
        help='Port of the SIP server (default 5060)')
    parser.add_argument('--user', required=True,
        help='Username used for authentication at the SIP server')
    # The second literal is double-quoted on purpose: writing '' inside a
    # single-quoted literal silently concatenates two strings and drops the
    # quotes from the help text
    parser.add_argument('--password', default='',
        help='Password used in conjunction with the user for authentication ' +
        "at the SIP server. (default '')")
    parser.add_argument('--displayname', default=None,
        help='Alter the displayed caller name. (defaults to SIP configuration)')
    parser.add_argument('--localip', default=None,
        help='Local IP address to advertise in the SIP headers (defaults to ' +
        'the address of the socket connected to the gateway)')
    parser.add_argument('--protocol', default="tcp",
        help='supported protocols: IPv4: udp/tcp, IPv6: udp6/tcp6')
    parser.add_argument('--call', required=True,
        help='Phone number of the endpoint that will be called')
    parser.add_argument('--delay', default=15.0, type=float,
        help='Pause in seconds until the call is canceled (default 15.0)')
    parser.add_argument('--timeout', default=1.0, type=float,
        help='Period in seconds the SIP server must respond within (default 1.0)')
    args = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

    sip = SIP(args.user, args.password, args.gateway, args.port,
              args.displayname, args.localip, args.protocol)
    sip.call(args.call, args.delay, args.timeout)