diff --git a/backend/test/integration/test_account.py b/backend/test/integration/test_account.py new file mode 100644 index 00000000..e1386ed9 --- /dev/null +++ b/backend/test/integration/test_account.py @@ -0,0 +1,58 @@ +import json, os +from uuid import uuid4 +from appointment.database.models import ExternalConnectionType +from defines import auth_headers, TEST_USER_ID + + +class TestAccount: + def test_account_get_external_connections(self, with_client, make_external_connections): + # add a couple of external connections to our test user + type_id = str(uuid4()) + zoom_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.zoom, type_id=type_id) + assert zoom_ec.type_id == type_id + google_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.google, type_id=type_id) + assert google_ec.type_id == type_id + + # now get the list of our external connections and verify + response = with_client.get( + '/account/external-connections', headers=auth_headers + ) + + assert response.status_code == 200, response.text + ext_connections = response.json() + zoom_connections = ext_connections.get('zoom', None) + assert len(zoom_connections) == 1 + assert zoom_connections[0]['owner_id'] == TEST_USER_ID + assert zoom_connections[0]['name'] == zoom_ec.name + google_connections = ext_connections.get('google', None) + assert len(google_connections) == 1 + assert google_connections[0]['owner_id'] == TEST_USER_ID + assert google_connections[0]['name'] == google_ec.name + + def test_account_available_emails(self, with_client, make_external_connections): + # currently we have one email available + test_user_email = os.environ.get('TEST_USER_EMAIL') + user_email_list = [ test_user_email ] + + # get available emails and confirm + response = with_client.get( + '/account/available-emails', headers=auth_headers + ) + + assert response.status_code == 200, response.text + email_list_ret = response.json() + assert email_list_ret == user_email_list + + # now add another email/name via a google connection + type_id = str(uuid4()) + google_ec = make_external_connections(TEST_USER_ID, type=ExternalConnectionType.google, type_id=type_id) + user_email_list.append(google_ec.name) + + # get available emails again and confirm new one was added + response = with_client.get( + '/account/available-emails', headers=auth_headers + ) + + assert response.status_code == 200, response.text + email_list_ret = response.json() + assert email_list_ret == user_email_list diff --git a/backend/test/integration/test_appointment.py b/backend/test/integration/test_appointment.py index b55de71d..bb4c126b 100644 --- a/backend/test/integration/test_appointment.py +++ b/backend/test/integration/test_appointment.py @@ -20,7 +20,6 @@ def list_events(self, start, end): end = dateutil.parser.parse(end) from appointment.database import schemas - print('list events!') return [ schemas.Event( title=generated_appointment.title, @@ -36,7 +35,6 @@ def list_events(self, start, end): monkeypatch.setattr(CalDavConnector, 'list_events', list_events) path = f'/rmt/cal/{generated_appointment.calendar_id}/' + DAY1 + '/' + DAY3 - print(f'>>> {path}') response = with_client.get(path, headers=auth_headers) assert response.status_code == 200, response.text data = response.json() @@ -45,6 +43,15 @@ def list_events(self, start, end): assert data[0]['start'] == generated_appointment.slots[0].start.isoformat() assert data[0]['end'] == dateutil.parser.parse(DAY3).isoformat() + def test_get_remote_caldav_events_inavlid_calendar(self, with_client, make_appointment): + generated_appointment = make_appointment() + + path = f'/rmt/cal/{generated_appointment.calendar_id + 999}/' + DAY1 + '/' + DAY3 + response = with_client.get(path, headers=auth_headers) + assert response.status_code == 404, response.text + data = response.json() + assert data['detail']['id'] == 'CALENDAR_NOT_FOUND' + def test_get_invitation_ics_file(self, with_client, make_appointment): generated_appointment = make_appointment() diff --git a/backend/test/integration/test_auth.py b/backend/test/integration/test_auth.py index dd807036..50084fb7 100644 --- a/backend/test/integration/test_auth.py +++ b/backend/test/integration/test_auth.py @@ -1,7 +1,6 @@ -import json -import os -import secrets +import os, json, secrets from datetime import timedelta +from uuid import uuid4 from unittest.mock import patch from appointment.dependencies import auth @@ -131,6 +130,23 @@ def test_token_creates_user(self, with_db, with_client): ) assert response.status_code == 403, response.text + def test_token_fails_due_to_invalid_auth_scheme(self, with_db, with_client, make_pro_subscriber): + """Test that our username/password authentication fails when auth scheme is fxa""" + saved_scheme = os.environ['AUTH_SCHEME'] + os.environ['AUTH_SCHEME'] = 'fxa' + password = 'test' + bad_password = 'test2' + + subscriber = make_pro_subscriber(password=password) + + # Test good credentials + response = with_client.post( + '/token', + data={'username': subscriber.email, 'password': password}, + ) + os.environ['AUTH_SCHEME'] = saved_scheme + assert response.status_code == 405, response.text + class TestFXA: def test_fxa_login(self, with_client): @@ -227,6 +243,18 @@ def test_fxa_with_allowlist_and_with_invite(self, with_client, with_l10n, make_i assert 'url' in data assert data.get('url') == FXA_CLIENT_PATCH.get('authorization_url') + def test_fxa_login_fail_with_invalid_auth_scheme(self, with_client): + saved_scheme = os.environ['AUTH_SCHEME'] + os.environ['AUTH_SCHEME'] = 'NOT-fxa' + response = with_client.get( + '/fxa_login', + params={ + 'email': FXA_CLIENT_PATCH.get('subscriber_email'), + }, + ) + os.environ['AUTH_SCHEME'] = saved_scheme + assert response.status_code == 405, response.text + def test_fxa_callback_with_invite(self, with_db, with_client, monkeypatch, make_invite): """Test that our callback function correctly handles the session states, and creates a new subscriber""" os.environ['AUTH_SCHEME'] = 'fxa' @@ -460,6 +488,29 @@ def test_fxa_token_failed_due_to_empty_auth(self, make_basic_subscriber, with_cl assert response.status_code == 401, response.text + def test_fxa_token_failed_due_to_invalid_auth_scheme(self, with_client, make_basic_subscriber): + saved_scheme = os.environ['AUTH_SCHEME'] + os.environ['AUTH_SCHEME'] = 'NOT-fxa' + + # Clear get_subscriber dep, so we can retrieve the real subscriber info later + del with_client.app.dependency_overrides[auth.get_subscriber] + + subscriber = make_basic_subscriber(email='apple@example.org') + access_token_expires = timedelta(minutes=float(10)) + one_time_access_token = create_access_token(data={ + 'sub': f'uid-{subscriber.id}', + 'jti': secrets.token_urlsafe(16) + }, expires_delta=access_token_expires) + + # Exchange the one-time token with a long-living token + response = with_client.post( + '/fxa-token', headers={ + 'Authorization': f'Bearer {one_time_access_token}' + } + ) + os.environ['AUTH_SCHEME'] = saved_scheme + assert response.status_code == 405, response.text + class TestCalDAV: def test_auth(self, with_db, with_client): @@ -510,3 +561,25 @@ def test_disconnect(self, with_db, with_client, make_external_connections, make_ calendar = repo.calendar.get(db, calendar.id) assert calendar is None + + +class TestGoogle: + def test_disconnect(self, with_db, with_client, make_external_connections, make_google_calendar): + """Ensure we remove the external google connection and any related calendars""" + type_id = str(uuid4()) + ec = make_external_connections(TEST_USER_ID, type=models.ExternalConnectionType.google, type_id=type_id) + calendar = make_google_calendar(subscriber_id=TEST_USER_ID) + + response = with_client.post( + '/google/disconnect', json={'type_id': ec.type_id}, + headers=auth_headers + ) + + assert response.status_code == 200, response.content + + with with_db() as db: + ecs = repo.external_connection.get_by_type(db, TEST_USER_ID, models.ExternalConnectionType.google, type_id=type_id) + assert len(ecs) == 0 + + calendar = repo.calendar.get(db, calendar.id) + assert calendar is None diff --git a/backend/test/integration/test_calendar.py b/backend/test/integration/test_calendar.py index 6e97470d..96ed6e01 100644 --- a/backend/test/integration/test_calendar.py +++ b/backend/test/integration/test_calendar.py @@ -197,6 +197,14 @@ def test_update_foreign_calendar(self, with_client, make_pro_subscriber, provide ) assert response.status_code == 403, response.text + def test_update_invalid_calendar_id(self, with_client, request): + response = with_client.put( + f'/cal/{9999}', + json={'title': 'b', 'url': 'b', 'user': 'b', 'password': 'b'}, + headers=auth_headers, + ) + assert response.status_code == 404, response.text + @pytest.mark.parametrize('provider,factory_name', get_calendar_factory()) def test_connect_calendar(self, with_client, provider, factory_name, request): generated_calendar = request.getfixturevalue(factory_name)() @@ -321,6 +329,34 @@ def test_connect_more_calendars_than_tier_allows( response = with_client.post(f'/cal/{cal[2].id}/connect', headers=auth_headers) assert response.status_code == 403, response.text + def test_create_connection_failure(self, with_client, make_google_calendar, request): + """Attempt to create google calendar connection without having external connection, expect failure""" + response = with_client.post( + '/cal', + json={ + 'title': 'A google calendar', + 'color': '#123456', + 'provider': CalendarProvider.google.value, + 'url': 'test', + 'user': 'test', + 'password': 'test', + }, + headers=auth_headers, + ) + assert response.status_code == 400, response.text + + @pytest.mark.parametrize('provider,factory_name', get_calendar_factory()) + def test_disconnect_calendar(self, with_client, provider, factory_name, request): + new_calendar = request.getfixturevalue(factory_name)(connected=True) + + response = with_client.post(f'/cal/{new_calendar.id}/disconnect', headers=auth_headers) + assert response.status_code == 200, response.text + data = response.json() + assert data['title'] == new_calendar.title + assert data['color'] == new_calendar.color + assert data['id'] == new_calendar.id + assert not data['connected'] + class TestCaldav: """Tests for caldav specific functionality""" @@ -414,3 +450,49 @@ def test_update_existing_caldav_calendar_without_password(self, with_client, wit assert cal.url == os.getenv('CALDAV_TEST_CALENDAR_URL') assert cal.user == os.getenv('CALDAV_TEST_USER') assert cal.password == '' + + +class TestGoogle: + """Tests for google specific functionality""" + def test_read_remote_google_calendar_connection_error( + self, + monkeypatch, + with_client, + make_pro_subscriber, + make_caldav_calendar, + make_schedule, + ): + """ Attempt to read remote google calendar without having external connection first; expect error """ + # Patch up the caldav constructor, and list_calendars (this test is for google only) + class MockGoogleConnector: + @staticmethod + def __init__( + self, + subscriber_id, + calendar_id, + redis_instance, + db, + remote_calendar_id, + google_client, + google_tkn: str = None, + ): + pass + + monkeypatch.setattr(GoogleConnector, '__init__', MockGoogleConnector.__init__) + + test_url = 'https://caldav.thunderbird.net/' + test_user = 'thunderbird' + + response = with_client.post( + '/rmt/calendars', + json={ + 'provider': CalendarProvider.google.value, + 'url': test_url, + 'user': test_user, + 'password': 'caw', + }, + headers=auth_headers, + ) + assert response.status_code == 400, response.text + data = response.json() + assert data['detail']['id'] == 'REMOTE_CALENDAR_CONNECTION_ERROR' diff --git a/backend/test/integration/test_general.py b/backend/test/integration/test_general.py index 447e923a..71e13e08 100644 --- a/backend/test/integration/test_general.py +++ b/backend/test/integration/test_general.py @@ -1,4 +1,4 @@ -import os +import os, pytest from defines import DAY1, DAY5, auth_headers @@ -29,40 +29,51 @@ def test_health_for_locale(self, with_client): assert response.status_code == 200 assert response.json() == 'Zustand in Ordnung' - def test_access_without_authentication_token(self, with_client): - # response = client.get("/login") - # assert response.status_code == 401 - response = with_client.put('/me') - assert response.status_code == 401 - response = with_client.get('/me/calendars') - assert response.status_code == 401 - response = with_client.get('/me/appointments') - assert response.status_code == 401 - response = with_client.get('/me/signature') - assert response.status_code == 401 - response = with_client.post('/me/signature') - assert response.status_code == 401 - response = with_client.post('/cal') - assert response.status_code == 401 - response = with_client.get('/cal/1') - assert response.status_code == 401 - response = with_client.put('/cal/1') - assert response.status_code == 401 - response = with_client.post('/cal/1/connect') - assert response.status_code == 401 - response = with_client.delete('/cal/1') - assert response.status_code == 401 - response = with_client.post('/rmt/calendars') - assert response.status_code == 401 - response = with_client.get('/rmt/cal/1/' + DAY1 + '/' + DAY5) - assert response.status_code == 401 - response = with_client.post('/rmt/sync') - assert response.status_code == 401 - response = with_client.get('/account/download') - assert response.status_code == 401 - response = with_client.delete('/account/delete') - assert response.status_code == 401 - response = with_client.get('/google/auth') + @pytest.mark.parametrize('api_method, api_route', [ + ('get', '/me'), + ('put', '/me'), + ('get', '/me/calendars'), + ('get', '/me/appointments'), + ('get', '/me/signature'), + ('post', '/me/signature'), + ('post', '/cal'), + ('get', '/cal/1'), + ('put', '/cal/1'), + ('post', '/cal/1/connect'), + ('delete', '/cal/1'), + ('post', '/caldav/auth'), + ('post', '/caldav/disconnect'), + ('post', '/rmt/calendars'), + ('get', '/rmt/cal/1/' + DAY1 + '/' + DAY5), + ('post', '/rmt/sync'), + ('get', '/account/available-emails'), + ('get', '/account/download'), + ('get', '/account/external-connections/'), + ('delete', '/account/delete'), + ('get', '/google/auth'), + ('post', '/google/disconnect'), + ('post', '/schedule'), + ('get', '/schedule'), + ('get', '/schedule/0'), + ('put', '/schedule/0'), + ('get', '/invite'), + ('post', '/invite/generate/1'), + ('put', '/invite/revoke/1'), + ('get', '/subscriber'), + ('put', '/subscriber/enable/someemail@email.com'), + ('put', '/subscriber/disable/someemail@email.com'), + ('post', '/subscriber/setup'), + ('post', '/waiting-list/invite'), + ]) + def test_access_without_authentication_token(self, with_client, api_method, api_route): + if api_method == 'post': + response = with_client.post(f'{api_route}') + elif api_method == 'get': + response = with_client.get(f'{api_route}') + elif api_method == 'put': + response = with_client.put(f'{api_route}') + else: + response = with_client.delete(f'{api_route}') assert response.status_code == 401 def test_send_feedback(self, with_client): @@ -70,3 +81,13 @@ def test_send_feedback(self, with_client): '/support', json={'topic': 'Hello World', 'details': 'Hello World but longer'}, headers=auth_headers ) assert response.status_code == 200 + + def test_send_feedback_no_email_configured(self, with_client): + """Attempt to send feedback with no support email configured; expect error""" + saved_email = os.environ['SUPPORT_EMAIL'] + os.environ['SUPPORT_EMAIL'] = '' + response = with_client.post( + '/support', json={'topic': 'Hello World', 'details': 'Hello World but longer'}, headers=auth_headers + ) + os.environ['SUPPORT_EMAIL'] = saved_email + assert response.status_code == 500 diff --git a/backend/test/integration/test_invite.py b/backend/test/integration/test_invite.py index 158b5eb5..6d6c0747 100644 --- a/backend/test/integration/test_invite.py +++ b/backend/test/integration/test_invite.py @@ -1,9 +1,12 @@ import os +from datetime import datetime from defines import auth_headers, TEST_USER_ID from appointment.database import repo - +from appointment.database.models import InviteStatus class TestInvite: + today = today = datetime.today().date() + def test_send_invite_email_requires_admin(self, with_db, with_client): """Ensures send_invite_email requires an admin user""" @@ -50,6 +53,140 @@ def test_send_invite_email(self, with_db, with_client): subscriber = repo.subscriber.get_by_email(db, invite_email) assert subscriber is not None + def test_send_invite_email_subscriber_already_exists(self, with_db, with_client, make_pro_subscriber): + """Ensures send_invite_email fails if subscriber already exists""" + + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + the_other_guy = make_pro_subscriber() + + response = with_client.post( + '/invite/send', + json={'email': the_other_guy.email}, + headers=auth_headers, + ) + assert response.status_code == 400, response.text + data = response.json() + assert data['detail']['id'] == 'CREATE_SUBSCRIBER_ALREADY_EXISTS' + + def test_send_invite_email_subscriber_fails(self, with_db, with_client): + """Ensures send_invite_email fails if invite email is invalid""" + + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + invite_email = 'hi' + + response = with_client.post( + '/invite/send', + json={'email': invite_email}, + headers=auth_headers, + ) + assert response.status_code == 422, response.text + data = response.json() + assert 'not a valid email address' in data['detail'][0]['msg'] + + def test_get_all_invites_requires_admin(self, with_db, with_client): + """Ensures getting all invites requires an admin user""" + + os.environ['APP_ADMIN_ALLOW_LIST'] = '@notexample.org' + + response = with_client.get( + '/invite', + headers=auth_headers, + ) + assert response.status_code == 401, response.text + data = response.json() + assert data['detail']['id'] == 'INVALID_PERMISSION_LEVEL' + + def test_get_all_invites(self, with_db, with_client, make_invite): + """Ensures we can get all invites""" + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + invites = [make_invite(owner_id=TEST_USER_ID) for _ in range(2)] + + response = with_client.get( + '/invite', + headers=auth_headers, + ) + assert response.status_code == 200, response.text + invite_list = response.json() + assert len(invite_list) == 2 + assert invite_list[0]['code'] != invite_list[1]['code'] + + for next_invite in invite_list: + assert next_invite['owner_id'] == TEST_USER_ID + assert next_invite['code'] is not None + date_created = datetime.fromisoformat(next_invite['time_created']).date() + assert date_created == self.today + + def test_generate_invites(self, with_client): + """Ensures we can generate new invites""" + response = with_client.post( + '/invite/generate/5', + headers=auth_headers, + ) + assert response.status_code == 200, response.text + invite_list = response.json() + assert len(invite_list) == 5 + + for next_invite in invite_list: + assert next_invite['status'] == InviteStatus.active.value + assert next_invite['code'] is not None + date_created = datetime.fromisoformat(next_invite['time_created']).date() + assert date_created == self.today + + response = with_client.get( + '/invite', + headers=auth_headers, + ) + assert response.status_code == 200, response.text + data = response.json() + assert len(data) == 5 + + def test_revoke_invite(self, with_db, with_client, make_invite): + """Ensures we can revoke an invite code""" + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + test_invite = make_invite(owner_id=TEST_USER_ID) + assert test_invite.status == InviteStatus.active + + response = with_client.put( + f'/invite/revoke/{test_invite.code}', + headers=auth_headers, + ) + assert response.status_code == 200, response.text + + # verify our invite now has a status of revoked + response = with_client.get( + '/invite', + headers=auth_headers, + ) + + assert response.status_code == 200, response.text + invite_list = response.json() + assert len(invite_list) == 1 + assert invite_list[0]['status'] == InviteStatus.revoked.value + + # attempt to revoke the same already-revoked invite code, expect fail + response = with_client.put( + f'/invite/revoke/{test_invite.code}', + headers=auth_headers, + ) + assert response.status_code == 403, response.text + data = response.json() + assert data['detail']['id'] == 'INVITE_CODE_NOT_AVAILABLE' + + + def test_revoke_invite_not_found(self, with_db, with_client, make_invite): + """Ensures revoking an invite code fails if code is invalid""" + response = with_client.put( + '/invite/revoke/99999', + headers=auth_headers, + ) + assert response.status_code == 404, response.text + data = response.json() + assert data['detail']['id'] == 'INVITE_CODE_NOT_FOUND' + class TestPublicInvites: def test_empty(self, with_client): diff --git a/backend/test/integration/test_profile.py b/backend/test/integration/test_profile.py index a823bafd..2fd3d61c 100644 --- a/backend/test/integration/test_profile.py +++ b/backend/test/integration/test_profile.py @@ -52,3 +52,18 @@ def test_signed_short_link_refresh(self, with_client): assert response.status_code == 200, response.text url_new = response.json()['url'] assert url_old != url_new + + def test_update_me_username_taken(self, with_db, with_client, make_pro_subscriber): + """Attempt to update current subscriber's profile with already existing username""" + other_subscriber = make_pro_subscriber(username='thunderbird1') + + response = with_client.put( + '/me', + json={ + 'username': 'thunderbird1', + 'name': 'Changed Name', + 'secondary_email': 'adifferentone@example.org', + }, + headers=auth_headers, + ) + assert response.status_code == 403, response.text diff --git a/backend/test/integration/test_schedule.py b/backend/test/integration/test_schedule.py index 1f67250d..b3912760 100644 --- a/backend/test/integration/test_schedule.py +++ b/backend/test/integration/test_schedule.py @@ -285,6 +285,21 @@ def test_update_foreign_schedule( ) assert response.status_code == 403, response.text + def test_update_schedule_unconnected_calendar( + self, with_client, make_caldav_calendar, make_schedule, schedule_input + ): + generated_calendar = make_caldav_calendar(connected=False) + generated_schedule = make_schedule(calendar_id=generated_calendar.id) + + response = with_client.put( + f'/schedule/{generated_schedule.id}', + json={'calendar_id': generated_schedule.calendar_id, **schedule_input}, + headers=auth_headers, + ) + assert response.status_code == 403, response.text + data = response.json() + assert data['detail']['id'] == 'CALENDAR_NOT_CONNECTED' + def test_public_availability( self, monkeypatch, with_client, make_pro_subscriber, make_caldav_calendar, make_schedule ): @@ -463,6 +478,20 @@ def get_busy_time(self, calendar_ids, start, end): else models.BookingStatus.booked.value ) + def test_public_availability_sched_not_active(self, with_client, make_pro_subscriber): + subscriber = make_pro_subscriber() + signed_url = signed_url_by_subscriber(subscriber) + + # Check availability at the start of the schedule + response = with_client.post( + '/schedule/public/availability', + json={'url': signed_url}, + headers=auth_headers, + ) + assert response.status_code == 404, response.text + data = response.json() + assert data['detail']['id'] == 'SCHEDULE_NOT_ACTIVE' + class TestRequestScheduleAvailability: @pytest.fixture diff --git a/backend/test/integration/test_subscriber.py b/backend/test/integration/test_subscriber.py new file mode 100644 index 00000000..c71a372f --- /dev/null +++ b/backend/test/integration/test_subscriber.py @@ -0,0 +1,195 @@ +import os +from datetime import datetime +from defines import auth_headers, TEST_USER_ID +from appointment.database.models import SubscriberLevel + + +class TestSubscriber: + def test_get_all_subscribers(self, with_client, make_basic_subscriber): + # make our current subscriber admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + response = with_client.get( + '/subscriber', headers=auth_headers + ) + + assert response.status_code == 200, response.text + data = response.json() + number_of_subscribers = len(data) + assert number_of_subscribers > 0 + + # verify values + test_subscriber = data[TEST_USER_ID -1] + assert test_subscriber['username'] == os.getenv('TEST_USER_EMAIL') + assert test_subscriber['email'] == os.getenv('TEST_USER_EMAIL') + assert test_subscriber['preferred_email'] == os.getenv('TEST_USER_EMAIL') + assert test_subscriber['name'] == 'Test Account' + assert test_subscriber['short_link_hash'] == 'abc1234' + assert test_subscriber['id'] == TEST_USER_ID + assert test_subscriber['language'] == 'en' + assert test_subscriber['timezone'] == None + assert test_subscriber['is_setup'] == False + assert test_subscriber['ftue_level'] == 0 + assert test_subscriber['level'] == SubscriberLevel.pro.value + + # now make a new basic subscriber + new_subscriber = make_basic_subscriber() + + # get subscribers again + response = with_client.get( + '/subscriber', headers=auth_headers + ) + + assert response.status_code == 200, response.text + data = response.json() + + # the last subscriber in the returned list is the latest one we created, verify values + assert len(data) == number_of_subscribers + 1 + subscriber_ret = data[len(data) -1] + assert subscriber_ret['username'] == new_subscriber.username + assert subscriber_ret['email'] == new_subscriber.email + assert subscriber_ret['preferred_email'] == new_subscriber.preferred_email + assert subscriber_ret['name'] == new_subscriber.name + assert subscriber_ret['short_link_hash'] == new_subscriber.short_link_hash + assert subscriber_ret['id'] == number_of_subscribers + 1 + assert subscriber_ret['language'] == new_subscriber.language + assert subscriber_ret['timezone'] == new_subscriber.timezone + assert subscriber_ret['is_setup'] == new_subscriber.is_setup + assert subscriber_ret['ftue_level'] == new_subscriber.ftue_level + assert subscriber_ret['level'] == new_subscriber.level.value + + def test_get_all_subscribers_no_admin(self, with_client): + # ensure our current subscriber is not admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@notexample.org' + + response = with_client.get( + '/subscriber', headers=auth_headers + ) + assert response.status_code == 401, response.text + data = response.json() + assert data['detail']['id'] == 'INVALID_PERMISSION_LEVEL' + + def test_disable_enable_subscriber(self, with_client, make_basic_subscriber): + # make our current subscriber admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + # make a new subscriber + new_subscriber = make_basic_subscriber() + assert new_subscriber.time_deleted is None + + # disable our new subscriber and verify + response = with_client.put( + f'/subscriber/disable/{new_subscriber.email}', headers=auth_headers + ) + + assert response.status_code == 200, response.text + response = with_client.get( + '/subscriber', headers=auth_headers + ) + + assert response.status_code == 200, response.text + data = response.json() + subscriber_ret = data[len(data) -1] + assert subscriber_ret['time_deleted'] is not None + + today = today = datetime.today().date() + date_deleted = datetime.fromisoformat(subscriber_ret['time_deleted']).date() + assert date_deleted == today + + # attempt to disable same subscriber again, expect fail + response = with_client.put( + f'/subscriber/disable/{new_subscriber.email}', headers=auth_headers + ) + + assert response.status_code == 400, response.text + data = response.json() + assert data['detail']['id'] == 'SUBSCRIBER_ALREADY_DELETED' + + # now enable the deleted subscriber and verify + response = with_client.put( + f'/subscriber/enable/{new_subscriber.email}', headers=auth_headers + ) + + assert response.status_code == 200, response.text + response = with_client.get( + '/subscriber', headers=auth_headers + ) + + assert response.status_code == 200, response.text + data = response.json() + subscriber_ret = data[len(data) -1] + assert subscriber_ret['time_deleted'] is None + + # attempt to enable the same subscriber again, expect fail + response = with_client.put( + f'/subscriber/enable/{new_subscriber.email}', headers=auth_headers + ) + + assert response.status_code == 400, response.text + data = response.json() + assert data['detail']['id'] == 'SUBSCRIBER_ALREADY_ENABLED' + + def test_disable_subscriber_self_delete_failure(self, with_client): + # make our current subscriber admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + # disable our current subscriber and verify + response = with_client.put( + f'/subscriber/disable/{os.getenv('TEST_USER_EMAIL')}', headers=auth_headers + ) + + assert response.status_code == 403, response.text + data = response.json() + assert data['detail']['id'] == 'SUBSCRIBER_SELF_DELETE' + + def test_disable_subscriber_not_found(self, with_client, make_basic_subscriber): + # make our current subscriber admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + # disable a subscriber that doesn't exist + response = with_client.put( + '/subscriber/disable/this-user-does-not-exist@someemail.com', headers=auth_headers + ) + + assert response.status_code == 404, response.text + data = response.json() + assert data['detail']['id'] == 'SUBSCRIBER_NOT_FOUND' + + def test_disable_subscriber_no_admin(self, with_client): + # ensure our current subscriber is not admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@notexample.org' + + response = with_client.put( + f'/subscriber/disable/{os.getenv('TEST_USER_EMAIL')}', headers=auth_headers + ) + + assert response.status_code == 401, response.text + data = response.json() + assert data['detail']['id'] == 'INVALID_PERMISSION_LEVEL' + + + def test_enable_subscriber_not_found(self, with_client, make_basic_subscriber): + # make our current subscriber admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@example.org' + + # disable a subscriber that doesn't exist + response = with_client.put( + '/subscriber/enable/this-user-does-not-exist@someemail.com', headers=auth_headers + ) + + assert response.status_code == 404, response.text + data = response.json() + assert data['detail']['id'] == 'SUBSCRIBER_NOT_FOUND' + + + def test_enable_subscriber_no_admin(self, with_client): + # ensure our current subscriber is not admin + os.environ['APP_ADMIN_ALLOW_LIST'] = '@notexample.org' + + response = with_client.put( + f'/subscriber/enable/{os.getenv('TEST_USER_EMAIL')}', headers=auth_headers + ) + + assert response.status_code == 401, response.text + data = response.json() + assert data['detail']['id'] == 'INVALID_PERMISSION_LEVEL' diff --git a/backend/test/integration/test_waiting_list.py b/backend/test/integration/test_waiting_list.py index e87c7516..da7c182e 100644 --- a/backend/test/integration/test_waiting_list.py +++ b/backend/test/integration/test_waiting_list.py @@ -192,6 +192,21 @@ def test_bad_token_data_email_not_in_list(self, with_db, with_client): with with_db() as db: assert not db.query(models.WaitingList).filter(models.WaitingList.email == email).first() + def test_action_failed(self, with_db, with_client, make_waiting_list): + email = 'hello@example.org' + + waiting_list = make_waiting_list(email='hellokitty@example.org') + + serializer = URLSafeSerializer(os.getenv('SIGNED_SECRET'), 'waiting-list') + confirm_token = serializer.dumps({'email': email, 'action': WaitingListAction.CONFIRM_EMAIL.value}) + + response = with_client.post('/waiting-list/action', json={'token': confirm_token}) + + # expect the waiting list confirm email action to fail as email not on the list + assert response.status_code == 400, response.json() + data = response.json() + assert data['detail']['id'] == 'WAITING_LIST_FAIL' + class TestWaitingListActionLeave: def assert_waiting_list_exists(self, db, waiting_list, success=True):