# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

from twisted.trial import unittest

from txws import (is_hybi00, complete_hybi00, make_hybi00_frame,
                  parse_hybi00_frames, http_headers, make_accept, mask, CLOSE,
                  NORMAL, PING, PONG, parse_hybi07_frames)


class TestHTTPHeaders(unittest.TestCase):
    """
    Tests for ``http_headers()``, which turns raw header text into a mapping.
    """

    def test_single_header(self):
        """
        A lone header line yields a single key/value pair.
        """

        raw = "Connection: Upgrade"
        headers = http_headers(raw)
        # assertIn reports the container contents on failure, unlike
        # assertTrue(x in y).
        self.assertIn("Connection", headers)
        self.assertEqual(headers["Connection"], "Upgrade")

    def test_single_header_newline(self):
        """
        A trailing CRLF does not leak into the parsed value.
        """

        raw = "Connection: Upgrade\r\n"
        headers = http_headers(raw)
        self.assertEqual(headers["Connection"], "Upgrade")

    def test_multiple_headers(self):
        """
        CRLF-separated headers are each parsed into their own entry.
        """

        raw = "Connection: Upgrade\r\nUpgrade: WebSocket"
        headers = http_headers(raw)
        self.assertEqual(headers["Connection"], "Upgrade")
        self.assertEqual(headers["Upgrade"], "WebSocket")

    def test_origin_colon(self):
        """
        Some headers have multiple colons in them.
        """

        raw = "Origin: http://example.com:8080"
        headers = http_headers(raw)
        self.assertEqual(headers["Origin"], "http://example.com:8080")


class TestKeys(unittest.TestCase):
    """
    Tests for ``make_accept()`` against published key/accept pairs.
    """

    def test_make_accept_rfc(self):
        """
        Test ``make_accept()`` using the keys listed in the RFC for HyBi-07
        through HyBi-10.
        """

        key = "dGhlIHNhbXBsZSBub25jZQ=="

        self.assertEqual(make_accept(key), "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=")

    def test_make_accept_wikipedia(self):
        """
        Test ``make_accept()`` using the keys listed on Wikipedia.
        """

        key = "x3JJHMbDL1EzLkh9GBhXDw=="

        self.assertEqual(make_accept(key), "HSmrc0sMlYUkAGmm5OPpG2HaGWk=")


class TestHyBi00(unittest.TestCase):
    """
    Tests for HyBi-00 detection, handshake completion, and framing helpers.
    """

    def test_is_hybi00(self):
        """
        Headers carrying both Key1 and Key2 are recognized as HyBi-00.
        """

        headers = {
            "Sec-WebSocket-Key1": "hurp",
            "Sec-WebSocket-Key2": "derp",
        }

        self.assertTrue(is_hybi00(headers))

    def test_is_hybi00_false(self):
        """
        Headers carrying only Key1 are not recognized as HyBi-00.
        """

        headers = {
            "Sec-WebSocket-Key1": "hurp",
        }

        self.assertFalse(is_hybi00(headers))

    def test_complete_hybi00_wikipedia(self):
        """
        Test complete_hybi00() using the keys listed on Wikipedia's
        WebSockets page.
        """

        headers = {
            "Sec-WebSocket-Key1": "4 @1 46546xW%0l 1 5",
            "Sec-WebSocket-Key2": "12998 5 Y3 1 .P00",
        }
        challenge = "^n:ds[4U"

        self.assertEqual(complete_hybi00(headers, challenge),
                         b"8jKS'y:G*Co,Wxa-")

    def test_make_hybi00(self):
        """
        HyBi-00 frames are really, *really* simple.
        """

        self.assertEqual(make_hybi00_frame("Test!"), b"\x00Test!\xff")

    def test_parse_hybi00_single(self):
        """
        One complete frame parses to one NORMAL frame with nothing left over.
        """

        frame = b"\x00Test\xff"

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, b"Test"))
        self.assertEqual(buf, b"")

    def test_parse_hybi00_multiple(self):
        """
        Back-to-back frames are returned in order with nothing left over.
        """

        frame = b"\x00Test\xff\x00Again\xff"

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 2)
        self.assertEqual(frames[0], (NORMAL, b"Test"))
        self.assertEqual(frames[1], (NORMAL, b"Again"))
        self.assertEqual(buf, b"")

    def test_parse_hybi00_incomplete(self):
        """
        A frame without its terminator yields no frames and stays buffered.
        """

        frame = b"\x00Test"

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 0)
        self.assertEqual(buf, b"\x00Test")

    def test_parse_hybi00_garbage(self):
        """
        Bytes preceding the first frame marker are discarded.
        """

        frame = b"trash\x00Test\xff"

        frames, buf = parse_hybi00_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, b"Test"))
        self.assertEqual(buf, b"")

    def test_socketio_crashers(self):
        """
        A series of snippets which crash other WebSockets implementations
        (specifically, Socket.IO) are harmless to this implementation.
        """

        # NOTE(review): the three identical "hello" entries look redundant;
        # confirm whether they were meant to be distinct payloads.
        crashers = [
            """[{"length":1}]""",
            """{"messages":[{"length":1}]}""",
            "hello",
            "hello",
            "hello",
            "{",
            "~m~EVJLFDJP~",
        ]

        # Use names distinct from the list being iterated: the parse result
        # must not rebind the sequence the loop is walking.
        for crasher in crashers:
            prepared = make_hybi00_frame(crasher)
            parsed, buf = parse_hybi00_frames(prepared)

            self.assertEqual(len(parsed), 1)
            self.assertEqual(parsed[0], (NORMAL, crasher.encode('utf-8')))
            self.assertEqual(buf, b"")


class TestHyBi07Helpers(unittest.TestCase):
    """
    HyBi-07 is best understood as a large family of helper functions which
    work together, somewhat dysfunctionally, to produce a mediocre
    Thanksgiving every other year.
    """

    def test_mask_noop(self):
        """
        Masking with an all-zero key returns the data unchanged.
        """

        key = b"\x00\x00\x00\x00"
        self.assertEqual(mask(b"Test", key), b"Test")

    def test_mask_noop_long(self):
        """
        An all-zero key is also a no-op for data longer than the key.
        """

        key = b"\x00\x00\x00\x00"
        self.assertEqual(mask(b"LongTest", key), b"LongTest")

    def test_parse_hybi07_unmasked_text(self):
        """
        From HyBi-10, 4.7.
        """

        frame = b"\x81\x05Hello"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, b"Hello"))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_masked_text(self):
        """
        From HyBi-10, 4.7.
        """

        frame = b"\x81\x857\xfa!=\x7f\x9fMQX"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (NORMAL, b"Hello"))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_unmasked_text_fragments(self):
        """
        We don't care about fragments. We are totally unfazed.

        From HyBi-10, 4.7.
        """

        frame = b"\x01\x03Hel\x80\x02lo"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 2)
        self.assertEqual(frames[0], (NORMAL, b"Hel"))
        self.assertEqual(frames[1], (NORMAL, b"lo"))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_ping(self):
        """
        From HyBi-10, 4.7.
        """

        frame = b"\x89\x05Hello"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (PING, b"Hello"))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_pong(self):
        """
        From HyBi-10, 4.7.
        """

        frame = b"\x8a\x05Hello"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (PONG, b"Hello"))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_close_empty(self):
        """
        A HyBi-07 close packet may have no body. In that case, it should use
        the generic error code 1000, and report the placeholder reason
        "No reason given".
        """

        frame = b"\x88\x00"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (CLOSE, (1000, b"No reason given")))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_close_reason(self):
        """
        A HyBi-07 close packet must have its first two bytes be a numeric
        error code, and may optionally include trailing text explaining why
        the connection was closed.
        """

        frame = b"\x88\x0b\x03\xe8No reason"

        frames, buf = parse_hybi07_frames(frame)

        self.assertEqual(len(frames), 1)
        self.assertEqual(frames[0], (CLOSE, (1000, b"No reason")))
        self.assertEqual(buf, b"")

    def test_parse_hybi07_partial_no_length(self):
        """
        A lone first header byte yields no frames and stays buffered.
        """

        frame = b"\x81"

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, b"\x81")

    def test_parse_hybi07_partial_truncated_length_int(self):
        """
        A header whose second byte is 0xfe but which ends there yields no
        frames and stays buffered.
        """

        frame = b"\x81\xfe"

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, b"\x81\xfe")

    def test_parse_hybi07_partial_truncated_length_double(self):
        """
        A header whose second byte is 0xff but which ends there yields no
        frames and stays buffered.
        """

        frame = b"\x81\xff"

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, b"\x81\xff")

    def test_parse_hybi07_partial_no_data(self):
        """
        A complete header with none of its payload yields no frames and stays
        buffered.
        """

        frame = b"\x81\x05"

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, b"\x81\x05")

    def test_parse_hybi07_partial_truncated_data(self):
        """
        A header with only part of its payload yields no frames and stays
        buffered.
        """

        frame = b"\x81\x05Hel"

        frames, buf = parse_hybi07_frames(frame)

        self.assertFalse(frames)
        self.assertEqual(buf, b"\x81\x05Hel")