From 5555adff2455a902d4716b5af89722a181fc864b Mon Sep 17 00:00:00 2001 From: cwshugg Date: Tue, 29 Mar 2022 08:45:31 -0400 Subject: [PATCH] moved tests between the 'extra' and 'malicious' category, and added a new check for whether or not malicious tests should be run --- tests/server_unit_test_pserv.py | 598 +++++++++++++++++--------------- 1 file changed, 309 insertions(+), 289 deletions(-) diff --git a/tests/server_unit_test_pserv.py b/tests/server_unit_test_pserv.py index ec4c185..0d2620d 100755 --- a/tests/server_unit_test_pserv.py +++ b/tests/server_unit_test_pserv.py @@ -468,6 +468,12 @@ class Single_Conn_Malicious_Case(Doc_Print_Test_Case): self.username = 'user0' self.password = 'thepassword' + # Prepare the a_string for query checks + self.a_string = "aaaaaaaaaaaaaaaa" + for x in range(0, 6): + self.a_string = self.a_string + self.a_string + + def setUp(self): """ Test Name: None -- setUp function\n\ Number Connections: N/A \n\ @@ -490,7 +496,49 @@ class Single_Conn_Malicious_Case(Doc_Print_Test_Case): # Close the requests session self.session.close() + def test_method_check_long(self): + """ Test Name: test_method_check_4\n\ + Number Connections: 1 \n\ + Procedure: Test a request using a long method:\n\ + aa....aaa /api/login HTTP/1.1 + """ + http_connection = HTTPConnection(hostname, port) + run_method_check(http_connection, self.a_string*2, self.hostname) + http_connection.close() + def test_method_check_4(self): + """ Test Name: test_method_check_4\n\ + Number Connections: 1 \n\ + Procedure: Test a request using a different method than GET:\n\ + ASD /api/login HTTP/1.1 + """ + http_connection = HTTPConnection(hostname, port) + run_method_check(http_connection, "ASD", self.hostname) + http_connection.close() + + def test_login_post_invalid_body(self): + """ Test Name: test_login_post_invalid_body\n + Number Connections: One \n\ + Procedure: Simple POST request:\n\ + POST /api/login HTTP/1.1 + + Run a check for login by providing an ill-formed body i.e. not JSON + for /api/login. Not checking for response text. + """ + http_connection = HTTPConnection(hostname, port) + # Ill-formed body for the request + data = '"username": "%s", "password": "%s"' % (self.username, self.password) + + # POST request for login + http_connection.request("POST", "/api/login", data) + + # Get the server response + server_response = http_connection.getresponse() + + # Check the response code (403 or 400) + self.assertTrue(server_response.status == FORBIDDEN or + server_response.status == BAD_REQUEST) + http_connection.close() def test_multi_connection_disconnect(self): """ Test Name: test_multi_connection_disconnect\n\ @@ -722,6 +770,253 @@ class Single_Conn_Malicious_Case(Doc_Print_Test_Case): sock.close() + def test_auth_browser_cookies(self): + """ Test Name: test_auth_browser_cookies + Number Connections: N/A + Procedure: Sends cookies that are actually taken from one a web browser + when connecting to courses.cs.vt.edu. They're valid cookies, but they + should not be recognized by the server as valid authentication. + Similar idea for test_auth_wrong_cookie. A failure here means the server + might have crashed or it served a private file despite without checking + for valid authentication. + """ + # set up a few different cookies to try (no, these don't actually work + # - don't try any session hijacking with these cookies ;), we made + # sure they are invalid) + cookies = [ + ["IDMSESSID", "9DD957C450BBCFE9D75022A05DC71D0E701FE23AF0DEE777090831C9FFD087FF0EE5704771BA11D02B3FA5CC13F20B4F8A6758A02768E160AE1E100A8D4BECCE"], + ["auth_token", "[\"hokiebird\"\054 \"Hokie Bird\"].Yhy5-g.HXxh5WxmTawBv_LHPaTLnXNkYiI|5b9df2848955b572910a6ff3d2c98d27febbe6a8949c18cde52c8c11c91ed5437f40accae8f8b77a41e335e83556a3670d5f5178d8ddd4f8eb83e1a82974ce4a"], + ["session", ".eJwlzsFKw0AQgOFXKXuuZXczm93psV4qFBEs2GAkzM7OJEVNIaG2IL67hV7_e_L9mk4nmQezVvqaZWm6YzFrQ947cJoLRslQAaDW3hlSql2JZKUUT5m9S1LFCEKRMaqVKFqSg0hellofmStWUATRANYGmzmRdRA4E6KU2musEFXZ5aBY2hiyoGRukPMs013z3hq-zMO571uzXLTm8TSOp2xxei8fq2ZowkO_2h6uQ3i7fu_plvnpdtsX2u_Gw_Nnc3zyf_8CpUfl.Yjo3NQ.m-n22sd9bMNXyvtXpIS6dZ85Cv4"] + ] + + # now, come up with various combinations of cookies to try + cookie_combos = [] + for c1 in cookies: + combo = [] + combo.append(c1) + cookie_combos.append([c1]) + for c2 in cookies: + if c1 != c2: + combo.append(c2) + cookie_combos.append(combo) + + # loop through each of the cookies + for combo in cookie_combos: + # clear the session cookies and set cookies + self.session.cookies.clear() + for cookie in combo: + self.session.cookies.set(cookie[0], cookie[1]) + + # try making a GET /api/login request + response = None + try: + response = self.session.get('http://%s:%s/api/login' % (self.hostname, self.port), timeout=2) + except requests.exception.RequestException: + raise AssertionError("The server did not respond within 2s") + + # make sure the correct response code was sent + if response.status_code != requests.codes.ok: + raise AssertionError("The server responded with %d instead of 200 OK for a GET /api/login request" % + response.status_code) + + # make sure the JSON data returned is empty + if response.text.strip() != "{}": + raise AssertionError("The server returned something other than an empty JSON object ({}) for a " + "GET /api/login request with invalid cookies. Received: '%s'" % response.text) + + # now, try making a request for a private file + response = None + try: + response = self.session.get('http://%s:%s/private/secure.html' % (self.hostname, self.port), timeout=2) + except requests.exception.RequestException: + raise AssertionError("The server did not respond within 2s") + + # make sure we didn't receive a 200 OK + if response.status_code == requests.codes.ok: + raise AssertionError("The server served a private file despite not being authenticated.") + + +############################################################################## +## Class: Single_Conn_Bad_Case +## Test cases that aim for various errors in well-formed queries. +############################################################################## + +class Single_Conn_Bad_Case(Doc_Print_Test_Case): + """ + Test case for a single connection, using bad requests that are + well formed. The tests are aptly named for describing their effects. + Each case should be handled gracefully and without the server crashing. + """ + + def __init__(self, testname, hostname, port): + """ + Prepare the test case for creating connections. + """ + super(Single_Conn_Bad_Case, self).__init__(testname) + self.hostname = hostname + self.port = port + + N = 10 + self.username = 'user0' + self.invalid_username = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase) for _ in range(N)) + self.password = 'thepassword' + self.invalid_password = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase) for _ in range(N)) + self.private_file = 'private/secure.html' + + def setUp(self): + """ Test Name: None -- setUp function\n\ + Number Connections: N/A \n\ + Procedure: Opens the HTTP connection to the server. An error here \ + means the script was unable to create a connection to the \ + server. + """ + # make a 'session' object (used by some of the tests that were moved + # here) + self.session = requests.Session() + + # Make HTTP connection for the server + self.http_connection = HTTPConnection(self.hostname, self.port) + + # Connect to the server + self.http_connection.auto_open = 0 + self.http_connection.connect() + + def tearDown(self): + """ Test Name: None -- tearDown function\n\ + Number Connections: N/A \n\ + Procedure: Closes the HTTP connection to the server. An error here \ + means the server crashed after servicing the request from \ + the previous test. + """ + # Close the HTTP connection + self.http_connection.close() + if server.poll() is not None: + # self.fail("The server has crashed. Please investigate.") + print("The server has crashed. Please investigate.") + + def test_404_not_found_1(self): + """ Test Name: test_404_not_found_1\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /junk HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/junk", self.hostname) + + def test_404_not_found_2(self): + """ Test Name: test_404_not_found_2\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /api/login/api/login HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/login/api/login", self.hostname) + + def test_404_not_found_3(self): + """ Test Name: test_404_not_found_3\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /api/logon HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/logon", self.hostname) + + def test_404_not_found_4(self): + """ Test Name: test_404_not_found_4\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /api/api/login HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/api/login", self.hostname) + + def test_404_not_found_5(self): + """ Test Name: test_404_not_found_5\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /api/loginjunk HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/api", self.hostname) + + def test_404_not_found_6(self): + """ Test Name: test_404_not_found_6\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /api/loginjunk HTTP/1.1 + """ + run_404_check(self.http_connection, "/api/loginjunk", self.hostname) + + def test_404_not_found_7(self): + """ Test Name: test_404_not_found_7\n\ + Number Connections: 1 \n\ + Procedure: Test a simple GET request for an illegal object URL:\n\ + GET /login/api HTTP/1.1 + """ + run_404_check(self.http_connection, "/login/api", self.hostname) + + def test_login_post_invalid_username(self): + """ Test Name: test_login_post_invalid_username\n + Number Connections: One \n\ + Procedure: Simple POST request:\n\ + POST /api/login HTTP/1.1 + + Run a check for login by providing an incorrect username using a + well-formed request for /api/login. Not checking for response text. + """ + # JSON body for the request + data = {"username": self.invalid_username, "password": self.password} + body = json.dumps(data) + + # POST request for login + self.http_connection.request("POST", "/api/login", body) + + # Get the server response + server_response = self.http_connection.getresponse() + + # Check the response code + self.assertEqual(server_response.status, FORBIDDEN) + + def test_login_post_invalid_password(self): + """ Test Name: test_login_post_invalid_password\n + Number Connections: One \n\ + Procedure: Simple POST request:\n\ + POST /api/login HTTP/1.1 + + Run a check for login by providing an incorrect password using a + well-formed request for /api/login. Not checking for response text. + """ + # JSON body for the request + data = {"username": self.username, "password": self.invalid_password} + body = json.dumps(data) + + # POST request for login + self.http_connection.request("POST", "/api/login", body) + + # Get the server response + server_response = self.http_connection.getresponse() + + # Check the response code + self.assertEqual(server_response.status, FORBIDDEN) + + def test_login_valid_body_extra_parameters(self): + """ Test Name: test_login_valid_body_extra_parameters\n + Number Connections: One \n\ + Procedure: Simple POST request:\n\ + POST /api/login HTTP/1.1 + + Run a check for login by providing a well-formed body with extra + parameters in the JSON body for /api/login. Not checking for + response text. + """ + # JSON body for the request + data = {"username": self.username, "password": self.password, "key": "value"} + body = json.dumps(data) + + # POST request for login + self.http_connection.request("POST", "/api/login", body) + + # Get the server response + server_response = self.http_connection.getresponse() + + # Check the response code + self.assertEqual(server_response.status, OK) + def test_auth_flipped_token(self): """ Test Name: test_auth_flipped_token Number Connections: N/A @@ -859,291 +1154,6 @@ class Single_Conn_Malicious_Case(Doc_Print_Test_Case): self.assertEqual(response.status_code, requests.codes.ok, "Server failed to respond with private file despite being authenticated.") - def test_auth_wrong_cookie2(self): - """ Test Name: test_auth_wrong_cookie2 - Number Connections: N/A - Procedure: Sends cookies that are actually taken from one a web browser - when connecting to courses.cs.vt.edu. They're valid cookies, but they - should not be recognized by the server as valid authentication. - Similar idea for test_auth_wrong_cookie. A failure here means the server - might have crashed or it served a private file despite without checking - for valid authentication. - """ - # set up a few different cookies to try (no, these don't actually work - # - don't try any session hijacking with these cookies ;), we made - # sure they are invalid) - cookies = [ - ["IDMSESSID", "9DD957C450BBCFE9D75022A05DC71D0E701FE23AF0DEE777090831C9FFD087FF0EE5704771BA11D02B3FA5CC13F20B4F8A6758A02768E160AE1E100A8D4BECCE"], - ["auth_token", "[\"hokiebird\"\054 \"Hokie Bird\"].Yhy5-g.HXxh5WxmTawBv_LHPaTLnXNkYiI|5b9df2848955b572910a6ff3d2c98d27febbe6a8949c18cde52c8c11c91ed5437f40accae8f8b77a41e335e83556a3670d5f5178d8ddd4f8eb83e1a82974ce4a"], - ["session", ".eJwlzsFKw0AQgOFXKXuuZXczm93psV4qFBEs2GAkzM7OJEVNIaG2IL67hV7_e_L9mk4nmQezVvqaZWm6YzFrQ947cJoLRslQAaDW3hlSql2JZKUUT5m9S1LFCEKRMaqVKFqSg0hellofmStWUATRANYGmzmRdRA4E6KU2musEFXZ5aBY2hiyoGRukPMs013z3hq-zMO571uzXLTm8TSOp2xxei8fq2ZowkO_2h6uQ3i7fu_plvnpdtsX2u_Gw_Nnc3zyf_8CpUfl.Yjo3NQ.m-n22sd9bMNXyvtXpIS6dZ85Cv4"] - ] - - # now, come up with various combinations of cookies to try - cookie_combos = [] - for c1 in cookies: - combo = [] - combo.append(c1) - cookie_combos.append([c1]) - for c2 in cookies: - if c1 != c2: - combo.append(c2) - cookie_combos.append(combo) - - # loop through each of the cookies - for combo in cookie_combos: - # clear the session cookies and set cookies - self.session.cookies.clear() - for cookie in combo: - self.session.cookies.set(cookie[0], cookie[1]) - - # try making a GET /api/login request - response = None - try: - response = self.session.get('http://%s:%s/api/login' % (self.hostname, self.port), timeout=2) - except requests.exception.RequestException: - raise AssertionError("The server did not respond within 2s") - - # make sure the correct response code was sent - if response.status_code != requests.codes.ok: - raise AssertionError("The server responded with %d instead of 200 OK for a GET /api/login request" % - response.status_code) - - # make sure the JSON data returned is empty - if response.text.strip() != "{}": - raise AssertionError("The server returned something other than an empty JSON object ({}) for a " - "GET /api/login request with invalid cookies. Received: '%s'" % response.text) - - # now, try making a request for a private file - response = None - try: - response = self.session.get('http://%s:%s/private/secure.html' % (self.hostname, self.port), timeout=2) - except requests.exception.RequestException: - raise AssertionError("The server did not respond within 2s") - - # make sure we didn't receive a 200 OK - if response.status_code == requests.codes.ok: - raise AssertionError("The server served a private file despite not being authenticated.") - - -############################################################################## -## Class: Single_Conn_Bad_Case -## Test cases that aim for various errors in well-formed queries. -############################################################################## - -class Single_Conn_Bad_Case(Doc_Print_Test_Case): - """ - Test case for a single connection, using bad requests that are - well formed. The tests are aptly named for describing their effects. - Each case should be handled gracefully and without the server crashing. - """ - - def __init__(self, testname, hostname, port): - """ - Prepare the test case for creating connections. - """ - super(Single_Conn_Bad_Case, self).__init__(testname) - self.hostname = hostname - self.port = port - - # Prepare the a_string for query checks - self.a_string = "aaaaaaaaaaaaaaaa" - for x in range(0, 6): - self.a_string = self.a_string + self.a_string - - N = 10 - self.username = 'user0' - self.invalid_username = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase) for _ in range(N)) - self.password = 'thepassword' - self.invalid_password = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase) for _ in range(N)) - - def setUp(self): - """ Test Name: None -- setUp function\n\ - Number Connections: N/A \n\ - Procedure: Opens the HTTP connection to the server. An error here \ - means the script was unable to create a connection to the \ - server. - """ - # Make HTTP connection for the server - self.http_connection = HTTPConnection(self.hostname, self.port) - - # Connect to the server - self.http_connection.auto_open = 0 - self.http_connection.connect() - - def tearDown(self): - """ Test Name: None -- tearDown function\n\ - Number Connections: N/A \n\ - Procedure: Closes the HTTP connection to the server. An error here \ - means the server crashed after servicing the request from \ - the previous test. - """ - # Close the HTTP connection - self.http_connection.close() - if server.poll() is not None: - # self.fail("The server has crashed. Please investigate.") - print("The server has crashed. Please investigate.") - - def test_404_not_found_1(self): - """ Test Name: test_404_not_found_1\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /junk HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/junk", self.hostname) - - def test_404_not_found_2(self): - """ Test Name: test_404_not_found_2\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /api/login/api/login HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/login/api/login", self.hostname) - - def test_404_not_found_3(self): - """ Test Name: test_404_not_found_3\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /api/logon HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/logon", self.hostname) - - def test_404_not_found_4(self): - """ Test Name: test_404_not_found_4\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /api/api/login HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/api/login", self.hostname) - - def test_404_not_found_5(self): - """ Test Name: test_404_not_found_5\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /api/loginjunk HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/api", self.hostname) - - def test_404_not_found_6(self): - """ Test Name: test_404_not_found_6\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /api/loginjunk HTTP/1.1 - """ - run_404_check(self.http_connection, "/api/loginjunk", self.hostname) - - def test_404_not_found_7(self): - """ Test Name: test_404_not_found_7\n\ - Number Connections: 1 \n\ - Procedure: Test a simple GET request for an illegal object URL:\n\ - GET /login/api HTTP/1.1 - """ - run_404_check(self.http_connection, "/login/api", self.hostname) - def test_method_check_long(self): - """ Test Name: test_method_check_4\n\ - Number Connections: 1 \n\ - Procedure: Test a request using a long method:\n\ - aa....aaa /api/login HTTP/1.1 - """ - run_method_check(self.http_connection, self.a_string*2, self.hostname) - - def test_method_check_4(self): - """ Test Name: test_method_check_4\n\ - Number Connections: 1 \n\ - Procedure: Test a request using a different method than GET:\n\ - ASD /api/login HTTP/1.1 - """ - run_method_check(self.http_connection, "ASD", self.hostname) - - def test_login_post_invalid_username(self): - """ Test Name: test_login_post_invalid_username\n - Number Connections: One \n\ - Procedure: Simple POST request:\n\ - POST /api/login HTTP/1.1 - - Run a check for login by providing an incorrect username using a - well-formed request for /api/login. Not checking for response text. - """ - # JSON body for the request - data = {"username": self.invalid_username, "password": self.password} - body = json.dumps(data) - - # POST request for login - self.http_connection.request("POST", "/api/login", body) - - # Get the server response - server_response = self.http_connection.getresponse() - - # Check the response code - self.assertEqual(server_response.status, FORBIDDEN) - - def test_login_post_invalid_password(self): - """ Test Name: test_login_post_invalid_password\n - Number Connections: One \n\ - Procedure: Simple POST request:\n\ - POST /api/login HTTP/1.1 - - Run a check for login by providing an incorrect password using a - well-formed request for /api/login. Not checking for response text. - """ - # JSON body for the request - data = {"username": self.username, "password": self.invalid_password} - body = json.dumps(data) - - # POST request for login - self.http_connection.request("POST", "/api/login", body) - - # Get the server response - server_response = self.http_connection.getresponse() - - # Check the response code - self.assertEqual(server_response.status, FORBIDDEN) - - - def test_login_post_invalid_body(self): - """ Test Name: test_login_post_invalid_body\n - Number Connections: One \n\ - Procedure: Simple POST request:\n\ - POST /api/login HTTP/1.1 - - Run a check for login by providing an ill-formed body i.e. not JSON - for /api/login. Not checking for response text. - """ - # Ill-formed body for the request - data = '"username": "%s", "password": "%s"' % (self.username, self.password) - - # POST request for login - self.http_connection.request("POST", "/api/login", data) - - # Get the server response - server_response = self.http_connection.getresponse() - - # Check the response code (403 or 400) - self.assertTrue(server_response.status == FORBIDDEN or - server_response.status == BAD_REQUEST) - - def test_login_valid_body_extra_parameters(self): - """ Test Name: test_login_valid_body_extra_parameters\n - Number Connections: One \n\ - Procedure: Simple POST request:\n\ - POST /api/login HTTP/1.1 - - Run a check for login by providing a well-formed body with extra - parameters in the JSON body for /api/login. Not checking for - response text. - """ - # JSON body for the request - data = {"username": self.username, "password": self.password, "key": "value"} - body = json.dumps(data) - - # POST request for login - self.http_connection.request("POST", "/api/login", body) - - # Get the server response - server_response = self.http_connection.getresponse() - - # Check the response code - self.assertEqual(server_response.status, OK) - class Multi_Conn_Sequential_Case(Doc_Print_Test_Case): """ @@ -2919,14 +2929,24 @@ process. # Check if the server passed the extra tests if test_results.wasSuccessful(): print("\nYou have passed the Extra Tests for this project!\n") - else: - print("\nYou have NOT passed the Extra Tests for this project.\n" + - "Please examine the above errors, the Malicious Tests\n" + - "will not be run until the above tests pass.\n") + + # decide whether or not we should run the malicious tests + do_run_malicious = minreq_score == 1.0 and \ + auth_score == 1.0 and \ + extra_score == 1.0 + if not do_run_malicious: + print("\nYou have NOT passed one of the following test categories:\n" + " - %s\n - %s\n - %s\n" + "Please examine the errors above. The Malicious tests will not\n" + "be run until the above tests pass.\n" % + (test_categories["minreq"]["name"], + test_categories["auth"]["name"], + test_categories["extra"]["name"])) print_points(minreq_score, extra_score, 0, ipv6_score, auth_score, fallback_score, video_score) sys.exit() + print("Now running the MALICIOUS Tests. WARNING: These tests will not necessarily run fast!") time.sleep(1) # Run the malicious tests