-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
658 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
import json, os | ||
from appointment.database.models import ExternalConnectionType | ||
from defines import auth_headers, TEST_USER_ID | ||
|
||
|
||
class TestAccount: | ||
def test_account_get_external_connections(self, with_client, make_external_connections): | ||
# add a couple of external connections to our test user | ||
username = 'username' | ||
type_id = json.dumps(['url', username]) | ||
zoom_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.zoom, type_id=type_id) | ||
assert zoom_ec.type_id == type_id | ||
google_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.google, type_id=type_id) | ||
assert google_ec.type_id == type_id | ||
|
||
# now get the list of our external connections and verify | ||
response = with_client.get( | ||
'/account/external-connections', headers=auth_headers | ||
) | ||
|
||
assert response.status_code == 200, response.text | ||
ext_connections = response.json() | ||
zoom_connections = ext_connections.get('zoom', None) | ||
assert len(zoom_connections) == 1 | ||
assert zoom_connections[0]['owner_id'] == TEST_USER_ID | ||
assert zoom_connections[0]['name'] == zoom_ec.name | ||
google_connections = ext_connections.get('google', None) | ||
assert len(google_connections) == 1 | ||
assert google_connections[0]['owner_id'] == TEST_USER_ID | ||
assert google_connections[0]['name'] == google_ec.name | ||
|
||
def test_account_available_emails(self, with_client, make_external_connections): | ||
# currently we have one email available | ||
test_user_email = os.environ.get('TEST_USER_EMAIL') | ||
user_email_list = [ test_user_email ] | ||
|
||
# get available emails and confirm | ||
response = with_client.get( | ||
'/account/available-emails', headers=auth_headers | ||
) | ||
|
||
assert response.status_code == 200, response.text | ||
email_list_ret = response.json() | ||
assert email_list_ret == user_email_list | ||
|
||
# now add another email/name via a google connection | ||
type_id = json.dumps(['url', test_user_email]) | ||
google_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.google, type_id=type_id) | ||
user_email_list.append(google_ec.name) | ||
|
||
# get available emails again and confirm new one was added | ||
response = with_client.get( | ||
'/account/available-emails', headers=auth_headers | ||
) | ||
|
||
assert response.status_code == 200, response.text | ||
email_list_ret = response.json() | ||
assert email_list_ret == user_email_list |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,6 +131,23 @@ def test_token_creates_user(self, with_db, with_client): | |
) | ||
assert response.status_code == 403, response.text | ||
|
||
def test_token_fails_due_to_invalid_auth_scheme(self, with_db, with_client, make_pro_subscriber): | ||
"""Test that our username/password authentication fails when auth scheme is fxa""" | ||
saved_scheme = os.environ['AUTH_SCHEME'] | ||
os.environ['AUTH_SCHEME'] = 'fxa' | ||
password = 'test' | ||
bad_password = 'test2' | ||
|
||
subscriber = make_pro_subscriber(password=password) | ||
|
||
# Test good credentials | ||
response = with_client.post( | ||
'/token', | ||
data={'username': subscriber.email, 'password': password}, | ||
) | ||
os.environ['AUTH_SCHEME'] = saved_scheme | ||
assert response.status_code == 405, response.text | ||
|
||
|
||
class TestFXA: | ||
def test_fxa_login(self, with_client): | ||
|
@@ -227,6 +244,18 @@ def test_fxa_with_allowlist_and_with_invite(self, with_client, with_l10n, make_i | |
assert 'url' in data | ||
assert data.get('url') == FXA_CLIENT_PATCH.get('authorization_url') | ||
|
||
def test_fxa_login_fail_with_invalid_auth_scheme(self, with_client): | ||
saved_scheme = os.environ['AUTH_SCHEME'] | ||
os.environ['AUTH_SCHEME'] = 'NOT-fxa' | ||
response = with_client.get( | ||
'/fxa_login', | ||
params={ | ||
'email': FXA_CLIENT_PATCH.get('subscriber_email'), | ||
}, | ||
) | ||
os.environ['AUTH_SCHEME'] = saved_scheme | ||
assert response.status_code == 405, response.text | ||
|
||
def test_fxa_callback_with_invite(self, with_db, with_client, monkeypatch, make_invite): | ||
"""Test that our callback function correctly handles the session states, and creates a new subscriber""" | ||
os.environ['AUTH_SCHEME'] = 'fxa' | ||
|
@@ -460,6 +489,29 @@ def test_fxa_token_failed_due_to_empty_auth(self, make_basic_subscriber, with_cl | |
|
||
assert response.status_code == 401, response.text | ||
|
||
def test_fxa_token_failed_due_to_invalid_auth_scheme(self, with_client, make_basic_subscriber): | ||
saved_scheme = os.environ['AUTH_SCHEME'] | ||
os.environ['AUTH_SCHEME'] = 'NOT-fxa' | ||
|
||
# Clear get_subscriber dep, so we can retrieve the real subscriber info later | ||
del with_client.app.dependency_overrides[auth.get_subscriber] | ||
|
||
subscriber = make_basic_subscriber(email='[email protected]') | ||
access_token_expires = timedelta(minutes=float(10)) | ||
one_time_access_token = create_access_token(data={ | ||
'sub': f'uid-{subscriber.id}', | ||
'jti': secrets.token_urlsafe(16) | ||
}, expires_delta=access_token_expires) | ||
|
||
# Exchange the one-time token with a long-living token | ||
response = with_client.post( | ||
'/fxa-token', headers={ | ||
'Authorization': f'Bearer {one_time_access_token}' | ||
} | ||
) | ||
os.environ['AUTH_SCHEME'] = saved_scheme | ||
assert response.status_code == 405, response.text | ||
|
||
|
||
class TestCalDAV: | ||
def test_auth(self, with_db, with_client): | ||
|
@@ -510,3 +562,26 @@ def test_disconnect(self, with_db, with_client, make_external_connections, make_ | |
|
||
calendar = repo.calendar.get(db, calendar.id) | ||
assert calendar is None | ||
|
||
|
||
class TestGoogle: | ||
def test_disconnect(self, with_db, with_client, make_external_connections, make_google_calendar): | ||
"""Ensure we remove the external google connection and any related calendars""" | ||
username = 'username' | ||
type_id = json.dumps(['url', username]) | ||
ec = make_external_connections(TEST_USER_ID, type=models.ExternalConnectionType.google, type_id=type_id) | ||
calendar = make_google_calendar(subscriber_id=TEST_USER_ID) | ||
|
||
response = with_client.post( | ||
'/google/disconnect', json={'type_id': ec.type_id}, | ||
headers=auth_headers | ||
) | ||
|
||
assert response.status_code == 200, response.content | ||
|
||
with with_db() as db: | ||
ecs = repo.external_connection.get_by_type(db, TEST_USER_ID, models.ExternalConnectionType.google, type_id=type_id) | ||
assert len(ecs) == 0 | ||
|
||
calendar = repo.calendar.get(db, calendar.id) | ||
assert calendar is None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,8 @@ def test_health_for_locale(self, with_client): | |
def test_access_without_authentication_token(self, with_client): | ||
# response = client.get("/login") | ||
# assert response.status_code == 401 | ||
response = with_client.get('/me') | ||
assert response.status_code == 401 | ||
response = with_client.put('/me') | ||
assert response.status_code == 401 | ||
response = with_client.get('/me/calendars') | ||
|
@@ -52,21 +54,65 @@ def test_access_without_authentication_token(self, with_client): | |
assert response.status_code == 401 | ||
response = with_client.delete('/cal/1') | ||
assert response.status_code == 401 | ||
response = with_client.post('/caldav/auth') | ||
assert response.status_code == 401 | ||
response = with_client.post('/caldav/disconnect') | ||
assert response.status_code == 401 | ||
response = with_client.post('/rmt/calendars') | ||
assert response.status_code == 401 | ||
response = with_client.get('/rmt/cal/1/' + DAY1 + '/' + DAY5) | ||
assert response.status_code == 401 | ||
response = with_client.post('/rmt/sync') | ||
assert response.status_code == 401 | ||
response = with_client.get('/account/available-emails') | ||
assert response.status_code == 401 | ||
response = with_client.get('/account/download') | ||
assert response.status_code == 401 | ||
response = with_client.get('/account/external-connections/') | ||
assert response.status_code == 401 | ||
response = with_client.delete('/account/delete') | ||
assert response.status_code == 401 | ||
response = with_client.get('/google/auth') | ||
assert response.status_code == 401 | ||
response = with_client.post('/google/disconnect') | ||
assert response.status_code == 401 | ||
response = with_client.post('/schedule') | ||
assert response.status_code == 401 | ||
response = with_client.get('/schedule') | ||
assert response.status_code == 401 | ||
response = with_client.get('/schedule/0') | ||
assert response.status_code == 401 | ||
response = with_client.put('/schedule/0') | ||
assert response.status_code == 401 | ||
response = with_client.get('/invite') | ||
assert response.status_code == 401 | ||
response = with_client.post('/invite/generate/1') | ||
assert response.status_code == 401 | ||
response = with_client.put('/invite/revoke/1') | ||
assert response.status_code == 401 | ||
response = with_client.get('/subscriber') | ||
assert response.status_code == 401 | ||
response = with_client.put('/subscriber/enable/[email protected]') | ||
assert response.status_code == 401 | ||
response = with_client.put('/subscriber/disable/[email protected]') | ||
assert response.status_code == 401 | ||
response = with_client.post('/subscriber/setup') | ||
assert response.status_code == 401 | ||
response = with_client.post('/waiting-list/invite') | ||
assert response.status_code == 401 | ||
|
||
def test_send_feedback(self, with_client): | ||
response = with_client.post( | ||
'/support', json={'topic': 'Hello World', 'details': 'Hello World but longer'}, headers=auth_headers | ||
) | ||
assert response.status_code == 200 | ||
|
||
def test_send_feedback_no_email_configured(self, with_client): | ||
"""Attempt to send feedback with no support email configured; expect error""" | ||
saved_email = os.environ['SUPPORT_EMAIL'] | ||
os.environ['SUPPORT_EMAIL'] = '' | ||
response = with_client.post( | ||
'/support', json={'topic': 'Hello World', 'details': 'Hello World but longer'}, headers=auth_headers | ||
) | ||
os.environ['SUPPORT_EMAIL'] = saved_email | ||
assert response.status_code == 500 |
Oops, something went wrong.