Merge branch 'master' of git.cs.vt.edu:cs3214-staff/pserv

This commit is contained in:
Godmar Back 2022-11-17 01:11:40 -05:00
commit ad249906f4

View File

@ -74,7 +74,9 @@ def run_connection_check_empty_login(http_conn, hostname):
server_response = http_conn.getresponse()
# Check the response status code
assert server_response.status == OK, "Server failed to respond"
assert server_response.status == OK, "Server failed to respond. " \
"This test will fail until persistent connections are implemented (i.e. HTTP/1.1 support). " \
"We recommend you implement this before moving forward."
# Check the data included in the server's response
assert check_empty_login_respnse(server_response.read().decode('utf-8')), \
@ -1068,7 +1070,8 @@ class Multi_Conn_Sequential_Case(Doc_Print_Test_Case):
""" Test Name: test_two_connections\n\
Number Connections: 2 \n\
Procedure: Run 2 connections simultaneously for simple GET requests:\n\
GET /api/login HTTP/1.1
GET /api/login HTTP/1.1
NOTE: this test requires HTTP/1.1 persistent connection support.
"""
# Append two connections to the list
@ -1092,10 +1095,11 @@ class Multi_Conn_Sequential_Case(Doc_Print_Test_Case):
def test_four_connections(self):
""" Test Name: test_two_connections\n\
""" Test Name: test_four_connections\n\
Number Connections: 4 \n\
Procedure: Run 4 connections simultaneously for simple GET requests:\n\
GET /api/login HTTP/1.1
GET /api/login HTTP/1.1
NOTE: this test requires HTTP/1.1 persistent connection support.
"""
# Append four connections to the list
@ -1115,10 +1119,11 @@ class Multi_Conn_Sequential_Case(Doc_Print_Test_Case):
run_connection_check_empty_login(http_conn, self.hostname)
def test_eight_connections(self):
""" Test Name: test_two_connections\n\
""" Test Name: test_eight_connections\n\
Number Connections: 8 \n\
Procedure: Run 8 connections simultaneously for simple GET requests:\n\
GET /api/login HTTP/1.1
GET /api/login HTTP/1.1
NOTE: this test requires HTTP/1.1 persistent connection support.
"""
# Append eight connections to the list
@ -1247,6 +1252,17 @@ class Access_Control(Doc_Print_Test_Case):
# Close the HTTP connection
self.session.close()
# =============================== Helpers ================================ #
# Does a lower-case search for headers within a response's headers. If
# found, the first ocurrence is returned (the header's value is returned).
def find_header(self, response, name):
for header in response.headers:
if header.lower() == name.lower():
return response.headers[header]
return None
# ================================ Tests ================================= #
def test_access_control_private_valid_token(self):
""" Test Name: test_access_control_private_valid_token
Number Connections: N/A
@ -1562,6 +1578,56 @@ class Access_Control(Doc_Print_Test_Case):
self.assertEqual(response.status_code, requests.codes.not_found,
"Server did not respond with 404 when it should have, possible IDOR?")
def test_login_content_type(self):
""" Test Name: test_login_content_type
Number Connections: N/A
Procedure: Checks to ensure the Content-Type header is being sent in
responses to GETs and POSTs to /api/login (both with AND
without Cookie headers). A failure here means either:
- 'Content-Type' is not a part of some or all of your /api/login responses, OR
- The value of your 'Content-Type' header is not what it should be.
"""
# inner helper function that takes a response and checks for the correct
# content-type header
def check_content_type(response):
# search for the content-type header and ensure we see "application/json"
content_type = self.find_header(response, "Content-Type")
content_expect = "application/json"
if content_type == None:
raise AssertionError("Server didn't respond with the Content-Type header when sent a request to /api/login")
if content_type.lower() != content_expect:
raise AssertionError("Server didn't respond with the correct Content-Type value when sent a request to /api/login. "
"Expected: '%s', received: '%s'" % (content_expect, content_type))
# first, we'll build the /api/login url
login_url = "http://%s:%s/api/login" % (self.hostname, self.port)
# TEST 1: send a simple GET /api/login with NO COOKIE
try:
response = self.session.get(login_url, timeout=2)
check_content_type(response)
except requests.exceptions.RequestException:
raise AssertionError("The server did not respond within 2s")
# TEST 2: try sending a POST /api/login with the correct credentials
try:
response = self.session.post(login_url,
json={'username': self.username, 'password': self.password},
timeout=2)
check_content_type(response)
# Ensure that the user is authenticated
self.assertEqual(response.status_code, requests.codes.ok, "Authentication failed.")
except requests.exceptions.RequestException:
raise AssertionError("The server did not respond within 2s")
# TEST 3: send one more GET /api/login with the cookie we just received
try:
response = self.session.get(login_url, timeout=2)
check_content_type(response)
except requests.exceptions.RequestException:
raise AssertionError("The server did not respond within 2s")
class Fallback(Doc_Print_Test_Case):
"""
Test cases for HTML 5 fallback, using good requests that expect a
@ -1805,7 +1871,7 @@ class Authentication(Doc_Print_Test_Case):
pool = ThreadPool(30)
pool.map(test_expiry_authentication, range(30))
pool.terminate()
def test_jwt_claims_json(self):
""" Test Name: test_jwt_claims_json
Number Connections: N/A
@ -1818,6 +1884,7 @@ class Authentication(Doc_Print_Test_Case):
self.sessions.append(requests.Session())
for i in range(30):
# ----------------------- Login JSON Check ----------------------- #
# Login using the default credentials
try:
response = self.sessions[i].post('http://%s:%s/api/login' % (self.hostname, self.port),
@ -1833,19 +1900,13 @@ class Authentication(Doc_Print_Test_Case):
# Convert the response to JSON
data = response.json()
# Verify that the JWT contains 'iat'
# ensure all expected fields are present
assert 'iat' in data, "Could not find the claim 'iat' in the JSON object."
# Verify that the JWT contains 'iat'
assert 'exp' in data, "Could not find the claim 'exp' in the JSON object."
# Verify that the JWT contains 'sub'
assert 'sub' in data, "Could not find the claim 'sub' in the JSON object."
# Verify that the 'iat' claim to is a valid date from self.current_year
# verify that the two timestamps are valid dates
assert datetime.fromtimestamp(data['iat']).year == self.current_year, "'iat' returned is not a valid date"
# Verify that the 'exp' claim to is a valid date from self.current_year
assert datetime.fromtimestamp(data['exp']).year == self.current_year, "'exp' returned is not a valid date"
# Verify that the subject claim to is set to the right username
@ -1854,6 +1915,34 @@ class Authentication(Doc_Print_Test_Case):
except ValueError:
raise AssertionError('The login API did not return a valid JSON object')
# --------------------- Login GET JSON Check --------------------- #
# send a GET request to retrieve the same claims as above
try:
response = self.sessions[i].get('http://%s:%s/api/login' % (self.hostname, self.port),
timeout=2)
except requests.exceptions.RequestException:
raise AssertionError("The server did not respond within 2s")
try:
# Convert the response to JSON
data = response.json()
# ensure all expected fields are present
assert 'iat' in data, "Could not find the claim 'iat' in the JSON object."
assert 'exp' in data, "Could not find the claim 'exp' in the JSON object."
assert 'sub' in data, "Could not find the claim 'sub' in the JSON object."
# Verify that the two timestamps are valid dates from self.current_year
assert datetime.fromtimestamp(data['iat']).year == self.current_year, "'iat' returned is not a valid date"
assert datetime.fromtimestamp(data['exp']).year == self.current_year, "'exp' returned is not a valid date"
# Verify that the subject claim to is set to the right username
assert data['sub'] == self.username, "The subject claim 'sub' should be set to %s" % self.username
except ValueError:
raise AssertionError('The login GET API did not return a valid JSON object')
# Sleep for a short duration before testing again
time.sleep(random.random() / 10.0)
@ -2112,7 +2201,7 @@ class VideoStreaming(Doc_Print_Test_Case):
"""
# build a collection of URLs to try
url_prefix = "http://%s:%s" % (self.hostname, self.port)
resources = ["/", "/index.html", "/public/index.html", "/api/login", "/api/video", "/v1.mp4"]
resources = ["/index.html", "/public/index.html", "/v1.mp4"]
# do the following for each URL
occurrences = 0
@ -2131,7 +2220,7 @@ class VideoStreaming(Doc_Print_Test_Case):
# make sure the correct status code was received
if response.status_code != requests.codes.ok:
raise AssertionError("Server responded with %d instead of 200 OK when requested with %s" %
response.status_code, resource)
(response.status_code, resource))
# search the header dictionary (lowercase comparison) for Accept-Ranges
accept_ranges_expect = "bytes"
@ -2141,11 +2230,12 @@ class VideoStreaming(Doc_Print_Test_Case):
# make sure the correct value is given ("bytes")
if "bytes" not in accept_ranges:
raise AssertionError("Server responded with an unexpected Accept-Ranges values. "
"Expected: %s, received: %s" % (accept_ranges_expect, response.headers[header]))
"Expected: %s, received: %s" % (accept_ranges_expect, response.headers["Accept-Ranges"]))
# if no occurrences were found, throw an error
if occurrences == 0:
raise AssertionError("Failed to find the Accept-Ranges header in the server's responses.")
raise AssertionError("Failed to find the Accept-Ranges header in the server's responses. "
"Your server must send 'Accept-Ranges: bytes' in its HTTP responses when serving static files.")
def test_video_get(self):
""" Test Name: test_video_get
@ -2208,7 +2298,7 @@ class VideoStreaming(Doc_Print_Test_Case):
vidsize = os.path.getsize(self.vids[0])
url = "http://%s:%s/%s" % (self.hostname, self.port, vid)
# set up a few range request values to test with the video
ranges = [[0, 1], [0, 100], [300, 500], [1000, -1], [-1, 1000]]
ranges = [[0, 1], [0, 100], [300, 500], [1000, -1]]#, [-1, 1000]]
# iterate across each range array to test each one
for rg in ranges:
@ -2226,21 +2316,22 @@ class VideoStreaming(Doc_Print_Test_Case):
prepared_req.url = url
response = self.session.send(prepared_req, timeout=2)
except requests.exceptions.RequestException:
raise AssertionError("The server did not respond within 2s")
raise AssertionError("The server did not respond within 2s\nRange request sent: '%s'" % rgheader)
# make sure the correct status code was received
if response.status_code != requests.codes.partial_content:
raise AssertionError("Server responded with %d instead of 206 PARTIAL CONTENT when range-requested with a valid video" %
response.status_code)
raise AssertionError("Server responded with %d instead of 206 PARTIAL CONTENT when range-requested with a valid video"
"\nRange request sent: '%s'" % (response.status_code, rgheader))
# check for the content-type header
content_type = self.find_header(response, "Content-Type")
content_expect = "video/mp4"
if content_type == None:
raise AssertionError("Server didn't respond with the Content-Type header when requested with a valid video")
raise AssertionError("Server didn't respond with the Content-Type header when requested with a valid video"
"\nRange request sent: '%s'" % rgheader)
if content_type.lower() != content_expect:
raise AssertionError("Server didn't respond with the correct Content-Type value when requested with a valid video. "
"Expected: %s, received: %s" % (content_expect, content_type))
"Expected: %s, received: %s\nRange request sent: '%s'" % (content_expect, content_type, rgheader))
# check for the content-length header and make sure it's the correct
# value based on the current range value we're trying
@ -2251,27 +2342,29 @@ class VideoStreaming(Doc_Print_Test_Case):
elif rg[1] == -1:
content_length_expect = vidsize - rg[0]
if content_length == None:
raise AssertionError("Server didn't respond with the Content-Length header when requested with a valid video")
raise AssertionError("Server didn't respond with the Content-Length header when requested with a valid video"
"\nRange request sent: '%s'" % rgheader)
if content_length != str(content_length_expect):
raise AssertionError("Server didn't respond with the correct Content-Length value when requested with a valid video. "
"Expected: %s, received: %s" % (content_length_expect, content_length))
"Expected: %s, received: %s\nRange request sent: '%s'" % (content_length_expect, content_length, rgheader))
# check for the Content-Range header and make sure it's the correct
# value
content_range = self.find_header(response, "Content-Range")
byte_start = rg[0] if rg[0] != -1 else vidsize - rg[1]
content_range_expect = "bytes %d-%d/%d" % (byte_start, byte_start + content_length_expect, vidsize)
content_range_expect = "bytes %d-%d/%d" % (byte_start, byte_start + content_length_expect - 1, vidsize)
if content_range == None:
raise AssertionError("Server didn't respond with the Content-Range header when requested with a valid video")
if content_type.lower() != content_expect:
raise AssertionError("Server didn't respond with the Content-Range header when requested with a valid video"
"\nRange request sent: '%s'" % rgheader)
if content_range.lower() != content_range_expect:
raise AssertionError("Server didn't respond with the correct Content-Range value when requested with a valid video. "
"Expected: '%s', received: '%s'" % (content_range_expect, content_range))
"Expected: '%s', received: '%s'\nRange request sent: '%s'" % (content_range_expect, content_range, rgheader))
# finally, we'll compare the actual bytes that were received. They
# must match the exact bytes found in the original file
if not self.compare_file_bytes(self.vids[0], response, byte_start, content_length_expect):
raise AssertionError("Server didn't send the correct bytes. Should have been bytes %d-%d" %
(byte_start, byte_start + content_length_expect - 1))
raise AssertionError("Server didn't send the correct bytes. Should have been bytes %d-%d"
"\nRange request sent: '%s'" % (byte_start, byte_start + content_length_expect - 1, rgheader))
###############################################################################
@ -2641,6 +2734,12 @@ process.
if individual_test is not None:
single_test_suite = unittest.TestSuite()
testclass = findtest(individual_test)
# make sure the class was found
if testclass == None:
print("Couldn't find a test with the name '%s'" % individual_test)
sys.exit(1)
if testclass == Authentication:
killserver(server)
server.wait()