Fix user deletion #8700

Merged 2 commits into main on May 30, 2024
Conversation

seanh (Contributor) commented May 7, 2024

Replace UserDeleteService with a completely new service that marks users as deleted and then purges their data incrementally in the background. This should fix the user delete admin page timing out when deleting users with large amounts of data: the old UserDeleteService tried to delete all of the user's data at once during the web request, which takes too long.

The way the new service works is:

  1. The delete user admin page calls UserDeleteService.delete_user() which:

    1. Sets user.deleted = True
    2. Adds a "purge_user" job to the job table in the DB
    3. Adds a record of the deletion to the user_deletion table

    All three are done in the same DB transaction, so it's not possible to mark a user as deleted but fail to add the "purge_user" job. The "purge_user" job won't be deleted from the DB until all of the user's data has been purged. (A minimal sketch of delete_user() follows this list.)

  2. A periodic purge_deleted_users() task (h-periodic#441: Add purge_deleted_users() task to h) runs every hour and calls UserDeleteService.purge_deleted_users() which:

    1. Takes "purge_user" jobs from the table
    2. Deletes up to a fixed maximum amount of the user's data (authtickets, tokens, flags, feature cohort memberships, annotations, groups, group memberships, group creators, and finally the user row itself). If it has deleted all of the user's data then purge_deleted_users() deletes the "purge_user" job. Otherwise it leaves the job on the queue and it will pick up the same job again next time and continue working on it.
      1. purge_deleted_users() doesn't actually delete annotations from the DB; it just marks them as deleted by setting Annotation.deleted=True. The existing purge_deleted_annotations() task will come along later and actually delete them from the DB. This is the same way that annotations are deleted when deleting them one-by-one via the API, so it's good to be consistent. It's done this way because the WebSocket code can have issues if annotations are deleted immediately. Also, JobQueueService.add_by_id() can only add a "sync_annotation" job for an annotation if that annotation still exists in the DB.
      2. Whenever purge_deleted_users() marks an annotation as deleted it also adds a "sync_annotation" job to the job queue. This is done in the same DB transaction, so it's not possible to mark an annotation as deleted without adding a "sync_annotation" job to the queue.
      3. The already-existing periodic task sync_annotations() will then consume these "sync_annotation" jobs and delete the annotations from Elasticsearch, and won't delete the "sync_annotation" job from the queue until it has verified that the annotation has been deleted from Elasticsearch.
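
To make step 1's single-transaction guarantee concrete, here's a minimal sketch of the shape of delete_user() — not the real implementation. The Job and UserDeletion model names come from the description above; the exact columns and constructor arguments used below are assumptions for illustration:

    from datetime import datetime, timezone

    from h.models import Job, UserDeletion  # assumed import paths


    class UserDeleteService:
        def __init__(self, db_session):
            self._db = db_session

        def delete_user(self, user):
            """Mark `user` as deleted and enqueue a job to purge their data."""
            # 1. Mark the user as deleted so they disappear from the app right away.
            user.deleted = True

            # 2. Enqueue a "purge_user" job. The periodic purge_deleted_users() task
            #    will chip away at the user's data and only remove this job once
            #    everything has been purged. (The Job columns used here are assumptions.)
            self._db.add(Job(name="purge_user", kwargs={"userid": user.userid}))

            # 3. Record the deletion in the user_deletion table for auditing.
            #    (The UserDeletion columns used here are assumptions.)
            self._db.add(
                UserDeletion(userid=user.userid, requested_at=datetime.now(timezone.utc))
            )

            # All three writes share the caller's transaction: either the user is
            # marked deleted *and* a "purge_user" job exists, or neither does.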

Testing

  1. Check out h-periodic#441 (Add purge_deleted_users() task to h) and run h-periodic's make dev

  2. You might want to hack h-periodic to run the purge-deleted-annotations and purge-deleted-users tasks more frequently:

    diff --git a/h_periodic/h_beat.py b/h_periodic/h_beat.py
    index 3ddf9dc..46d07a0 100644
    --- a/h_periodic/h_beat.py
    +++ b/h_periodic/h_beat.py
    @@ -23,7 +23,7 @@ celery.conf.update(
             "purge-deleted-annotations": {
                 "options": {"expires": 1800},
                 "task": "h.tasks.cleanup.purge_deleted_annotations",
    -            "schedule": timedelta(hours=1),
    +            "schedule": timedelta(minutes=1),
             },
             "purge-expired-authtickets": {
                 "options": {"expires": 1800},
    @@ -48,7 +48,7 @@ celery.conf.update(
             "purge-deleted-users": {
                 "options": {"expires": 1800},
                 "task": "h.tasks.cleanup.purge_deleted_users",
    -            "schedule": timedelta(hours=1),
    +            "schedule": timedelta(minutes=1),
             },
             "sync-annotations": {
                 "options": {"expires": 30},
  3. You might want to hack h to delete fewer rows per run of the purge_deleted_users() task, to force it to take more than one task run to purge the test user:

    diff --git a/h/services/user_delete.py b/h/services/user_delete.py
    index 3f38b339f..3f965af39 100644
    --- a/h/services/user_delete.py
    +++ b/h/services/user_delete.py
    @@ -61,7 +61,7 @@ class UserDeleteService:
                 )
             )
     
    -    def purge_deleted_users(self, limit=1000):
    +    def purge_deleted_users(self, limit=2):
             """Incrementally purge data of users who've been marked as deleted."""
             jobs = self.job_queue.get(self.job_name, limit)
  4. Log in to http://localhost:5000/login as devdata_admin in one browser and as devdata_user in another browser (logging in will create two authtickets in the DB, one of which will be deleted when we delete devdata_user below)

  5. Create a developer token for devdata_user: go to http://localhost:5000/account/developer and click Generate your API token

  6. Go to http://localhost:5000/docs/help as devdata_user and log in to the client. This will create an OAuth token in the DB

  7. Create a couple of annotations on http://localhost:5000/docs/help as devdata_user

  8. Go to http://localhost:5000/docs/help as devdata_admin, log in, and flag one of devdata_user's annotations by clicking the flag icon on the annotation

  9. Create an annotation on http://localhost:5000/docs/help as devdata_admin

  10. Go to http://localhost:5000/docs/help as devdata_user and flag devdata_admin's annotation

  11. As devdata_admin go to http://localhost:5000/admin/features/cohorts, create a feature cohort, and add devdata_user to the feature cohort

  12. As devdata_user go to http://localhost:5000/groups/new and create three groups

  13. As devdata_user go to http://localhost:5000/docs/help and create some annotations in the first group

  14. Have devdata_admin join the second group (you need to copy the group's invite link as devdata_user and open it as devdata_admin)

  15. Have devdata_admin join the third group and create an annotation in it

  16. As devdata_admin go to http://localhost:5000/groups/new, create a group, copy the invite link, and have devdata_user join the group

  17. At this point you should be able to see lots of data belonging to devdata_user in the DB: annotations in the annotation table; an auth ticket (used for logging in to h's web pages) in authticket; a feature cohort membership in featurecohort_user; flags in flag; groups in group; group memberships in user_group; and an OAuth 2 token (used by the client) and a developer token in token. There are some other related tables where users may have data, e.g. annotation_slim, annotation_metadata, annotation_moderation and user_identity, but these have foreign keys to annotation or user with ON DELETE CASCADE, so any data in these tables will be deleted when the annotations or user are deleted.

  18. You should also be able to see devdata_user's annotations in Elasticsearch at http://localhost:9200/_search.

  19. Go to http://localhost:5000/admin/users?username=devdata_user&authority=localhost as devdata_admin and delete devdata_user

You should see output like this in the terminal:

Task h.tasks.cleanup.purge_deleted_users[...] received
...
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from authticket
h.tasks.cleanup.purge_deleted_users[...] Deleted 1 rows from token
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from flag
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from featurecohort_user
h.tasks.cleanup.purge_deleted_users[...] Deleted 1 rows from annotation
h.tasks.cleanup.purge_deleted_users[...] Enqueued job to delete annotation from Elasticsearch: ...
h.tasks.cleanup.purge_deleted_users[...] Task h.tasks.cleanup.purge_deleted_users[...] succeeded in 0....s: None
...
Task h.tasks.cleanup.purge_deleted_users[...] received
...
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from authticket
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from token
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from flag
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from featurecohort_user
h.tasks.cleanup.purge_deleted_users[...] Deleted 2 rows from annotation
h.tasks.cleanup.purge_deleted_users[...] Enqueued job to delete annotation from Elasticsearch: ...
h.tasks.cleanup.purge_deleted_users[...] Enqueued job to delete annotation from Elasticsearch: ...
h.tasks.cleanup.purge_deleted_users[...] Task h.tasks.cleanup.purge_deleted_users[...] succeeded in 0....s: None
...
Task h.tasks.cleanup.purge_deleted_users[...] received
...
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from authticket
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from token
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from flag
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from featurecohort_user
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from annotation
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from group
h.tasks.cleanup.purge_deleted_users[...] Deleted 0 rows from user_group
h.tasks.cleanup.purge_deleted_users[...] Updated 0 rows from group
h.tasks.cleanup.purge_deleted_users[...] Deleted 1 rows from user
h.tasks.cleanup.purge_deleted_users[...] Task h.tasks.cleanup.purge_deleted_users[...] succeeded in 0....s: None
...
Task h.tasks.cleanup.purge_deleted_users[...] received
...
h.tasks.cleanup.purge_deleted_users[...] There are no 'purge_user' jobs
Task h.tasks.cleanup.purge_deleted_users[...] succeeded in 0....s: None

You should also see messages from the sync_annotations task about syncing the deleted annotations. It can take a minute or two after deleting the annotations from the DB before these appear:

{'Synced/UserDeleteService.delete_annotations/Deleted_from_db': 1, 'Synced/UserDeleteService.delete_annotations/Total': 1, 'Synced/Total': 1}
...
{'Completed/UserDeleteService.delete_annotations/Deleted_from_db': 1, 'Completed/UserDeleteService.delete_annotations/Total': 1, 'Completed/Total': 1}

If you inspect the DB and Elasticsearch you should see that all the user's data and annotations are gone.

seanh requested a review from marcospri May 7, 2024 11:32
seanh changed the title from "Add user.deleted column" to "Fix user deletion" May 7, 2024
seanh force-pushed the fix-user-delete branch 3 times, most recently from 7bc00a5 to de7bf40 on May 9, 2024 17:06
seanh force-pushed the fix-user-delete branch 8 times, most recently from d1449b0 to 36825a2 on May 23, 2024 16:27
seanh force-pushed the fix-user-delete branch 4 times, most recently from c9669ef to a825e0f on May 23, 2024 18:42
seanh marked this pull request as ready for review May 23, 2024 18:42
):
self._db = db_session
self._annotation_delete_service = annotation_delete_service
job_name = "purge_user"
marcospri (Member):
I reckon this belongs to:

SYNC_ANNOTATION = "sync_annotation"

seanh (Contributor, author):

Hmm. I think this JobName enum is in the wrong place.

Having it on JobQueueService is a pain because JobQueueService is mocked in UserDeleteService's tests, so JobQueueService.JobName.PURGE_USER is a mock not a string, which causes SQLAlchemy to crash when UserDeleteService tries to write a mock to the DB.

It can be fixed by moving JobName onto models.Job instead, which I think is where it belongs anyway: JobQueueService may not be the only code reading and writing the job table (d8fcf1e).

I am slightly dubious about the value of these enums. The existing JobName.SYNC_ANNOTATION and ANNOTATION_SLIM aren't actually used anywhere! All the code just duplicates the strings.

continue

try:
purger.delete_authtickets(user)
marcospri (Member):

So this task will go as far down this list as the limit permits.

When finishing via the break, the transaction will be committed, and next time around it will continue down the list.
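
To illustrate the limit-and-break shape being described here, below is a rough sketch of how purge_deleted_users() might walk the purger methods. The real code differs; in particular LimitExceededError, _get_user(), _make_purger() and job_queue.delete() are assumed names used only for illustration:

    class LimitExceededError(Exception):
        """Assumed signal raised by LimitedWorker once the row budget is spent."""


    class UserDeleteService:  # loop-shape sketch only, not the real class
        def purge_deleted_users(self, limit=1000):
            """Incrementally purge data of users who've been marked as deleted."""
            jobs = self.job_queue.get(self.job_name, limit)

            for job in jobs:
                # Hypothetical helpers: look up the user for this job and build the
                # per-user "purger" object seen in the surrounding diff context.
                user = self._get_user(job)
                purger = self._make_purger(limit)

                try:
                    # Work down the list in order; each call deletes at most the
                    # remaining row budget for this run.
                    purger.delete_authtickets(user)
                    purger.delete_tokens(user)
                    purger.delete_flags(user)
                    purger.delete_featurecohort_memberships(user)
                    purger.delete_annotations(user)
                    purger.delete_groups(user)
                    purger.delete_group_memberships(user)
                    purger.delete_group_creators(user)
                    purger.delete_user(user)
                except LimitExceededError:
                    # Budget spent: leave the job on the queue. Whatever was deleted
                    # so far is committed, and the next run carries on from here.
                    break
                else:
                    # Everything belonging to this user is gone, so the job can go too.
                    self.job_queue.delete([job])  # assumed API for removing finished jobs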


    def delete_user(self, user):
        """Delete `user`."""
        self.worker.delete(User, select(User.id).where(User.id == user.id))
marcospri (Member):

A nice-to-have would be a UserDeletion.completed_at column, updated here.

seanh (Contributor, author):

I think it would just be misleading unfortunately. At the time when the user row is deleted the user may actually still have annotations (and annotation slims etc) in the DB waiting to be purged by the purge-deleted-annotations task, and they may still have annotations in Elasticsearch waiting to be purged by the sync-annotation task.

marcospri (Member) left a comment:

All good, thanks for the detailed testing instructions.

Comment on lines +52 to +55
class JobName(str, Enum):
    SYNC_ANNOTATION = "sync_annotation"
    ANNOTATION_SLIM = "annotation_slim"
    PURGE_USER = "purge_user"
seanh (Contributor, author):

Moved this enum from the service onto the model and added PURGE_USER.

Enums on services get mocked in the unit tests of other services which is a pain (e.g. SQLAlchemy crashes when trying to write mocks to the DB). I think the model is probably a better place for this enum anyway: JobQueueService may not be the only code that needs to work directly with the job table.

Note that the existing SYNC_ANNOTATION and ANNOTATION_SLIM in this enum aren't used anywhere; everywhere just uses the strings directly!
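
As an aside on why the str/Enum mixin helps here: its members are real strings, so code (and SQLAlchemy) that still deals in the bare strings keeps working. A small illustration, not taken from the codebase:

    from enum import Enum


    class JobName(str, Enum):
        SYNC_ANNOTATION = "sync_annotation"
        ANNOTATION_SLIM = "annotation_slim"
        PURGE_USER = "purge_user"


    # Members are interchangeable with the bare strings that the rest of the
    # codebase still uses, and they can be written to a String column because
    # each member really is a str (unlike the Mock you get when the enum lives
    # on a service that's mocked out in another service's tests).
    assert JobName.PURGE_USER == "purge_user"
    assert isinstance(JobName.PURGE_USER, str)
    assert JobName.PURGE_USER.value == "purge_user"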

sa.func.count(Annotation.id) # pylint:disable=not-callable
num_annotations=self.db.scalar(
select(
func.count(Annotation.id) # pylint:disable=not-callable
seanh (Contributor, author):

Counting the annotation table here. Will this be fast enough? May want to change this to AnnotationSlim once back-filling it is finished.
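
If the count does turn out to be too slow, the AnnotationSlim variant being floated would presumably look something like this (hypothetical; it assumes AnnotationSlim has a user_id column to filter on):

    from sqlalchemy import func, select

    from h.models import AnnotationSlim  # assumed import path

    # Hypothetical swap: count the user's rows in annotation_slim instead of annotation.
    num_annotations = db.scalar(
        select(
            func.count(AnnotationSlim.id)  # pylint:disable=not-callable
        ).where(AnnotationSlim.user_id == user.id)
    )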

Comment on lines +118 to +137
    def delete_authtickets(self, user):
        """Delete all AuthTicket's belonging to `user`."""
        self.worker.delete(
            AuthTicket, select(AuthTicket.id).where(AuthTicket.user == user)
        )

    def delete_tokens(self, user):
        """Delete all tokens belonging to `user`."""
        self.worker.delete(Token, select(Token.id).where(Token.user == user))

    def delete_flags(self, user):
        """Delete all flags created by `user`."""
        self.worker.delete(Flag, select(Flag.id).where(Flag.user_id == user.id))

    def delete_featurecohort_memberships(self, user):
        """Remove `user` from all feature cohorts."""
        self.worker.delete(
            FeatureCohortUser,
            select(FeatureCohortUser.id).where(FeatureCohortUser.user_id == user.id),
        )
seanh (Contributor, author):

I think all these methods may belong in model-specific services (AuthTicketService etc) where they can potentially be re-used by other code. But that's a refactoring that I don't want to bite off right now.

Comment on lines 139 to 187
    def delete_annotations(self, user):
        """Delete all of `user`'s annotations from both Postgres and Elasticsearch."""
        now = datetime.utcnow()

        deleted_annotation_ids = self.worker.update(
            Annotation,
            select(Annotation.id).where(Annotation.userid == user.userid),
            {
                # We don't actually delete the user's annotations from the DB,
                # we only mark them as deleted.
                # The marked-as-deleted annotations will later be purged by the
                # periodic purge_deleted_annotations() task.
                #
                # This is because there are parts of h that don't work if
                # annotations are deleted immediately, including the WebSocket
                # and the call to JobQueueService.add_by_id() below.
                #
                # This is the same as what happens when annotations are deleted
                # individually by the API.
                "deleted": True,
                # Bump the annotation's updated time when marking it as deleted.
                # This is to prevent the purge_deleted_annotations() task from
                # purging the annotation too soon: that task only purges
                # deleted annotations whose updated time is more than ten mins
                # ago.
                "updated": now,
            },
        )

        # Whenever we update annotations we also need to update the corresponding annotation_slims.
        num_deleted_annotation_slims = self.db.execute(
            update(AnnotationSlim)
            .where(AnnotationSlim.pubid.in_(deleted_annotation_ids))
            .values({"deleted": True, "updated": now})
        ).rowcount
        log.info("Updated %d rows from annotation_slim", num_deleted_annotation_slims)

        # Add jobs to the queue so the annotations will eventually be deleted from Elasticsearch.
        for annotation_id in deleted_annotation_ids:
            self.job_queue.add_by_id(
                name="sync_annotation",
                annotation_id=annotation_id,
                tag="UserDeleteService.delete_annotations",
                schedule_in=60,
            )
            log.info(
                "Enqueued job to delete annotation from Elasticsearch: %s",
                annotation_id,
            )
seanh (Contributor, author) commented May 29, 2024:

This duplicates code from AnnotationDeleteService.delete() (set annotation.deleted = True, update annotation.updated, update the corresponding annotation_slim) but does it in a more efficient way and with support for a limit (via LimitedWorker).

This duplication should probably be removed: AnnotationDeleteService.delete() should be refactored to work more efficiently like this delete_annotations() method, with support for deleting multiple annotations and using a limited worker. But I don't want to bite that off right now.

AnnotationDeleteService.delete() also sends an AnnotationEvent for the deleted annotation, but I don't think I want to do that here: I don't think we want bulk-deleting annotations to cause an explosion of AnnotationEvents to be sent.

seanh (Contributor, author):

The AnnotationEvents would:

  1. Cause the annotations to be deleted from Elasticsearch:

    h/subscribers.py, lines 41 to 50 in f1669fd:

        @subscriber(AnnotationEvent)
        def annotation_sync(event):
            """Ensure an annotation is synchronised to Elasticsearch."""
            # Checking feature flags opens a connection to the database. As this event
            # is processed after the main transaction has closed, we must open a new
            # transaction to ensure we don't leave an un-closed transaction
            with event.request.tm:
                search_index = event.request.find_service(name="search_index")
                search_index.handle_annotation_event(event)

    Deleting annotations from Elasticsearch would already be achieved by the jobs that're added to the queue by both AnnotationDeleteService.delete() and UserDeleteService. But the AnnotationEvent causes an attempt to be made to delete the annotation from Elasticsearch synchronously, meaning the job queue only needs to delete it if this attempt fails. This is part of why the current method for deleting users doesn't work: it attempts to synchronously delete all the user's annotations from Elasticsearch during the admin page's form-submission request, and times out. I don't think we want this synchronous attempt when bulk-deleting all the user's annotations: we actually just want to let the job queue handle it in time.

  2. Publish an annotation event to the message queue (for the WebSocket):

    h/subscribers.py, lines 53 to 65 in f1669fd:

        @subscriber(AnnotationEvent)
        def publish_annotation_event(event):
            """Publish an annotation event to the message queue."""
            data = {
                "action": event.action,
                "annotation_id": event.annotation_id,
                "src_client_id": event.request.headers.get("X-Client-Id"),
            }
            try:
                event.request.realtime.publish_annotation(data)
            except RealtimeMessageQueueError as err:
                report_exception(err)

    I guess this may make the deleted annotations disappear from connected clients without a reload? But I don't think we want this when bulk-deleting all a user's annotations: I don't think we want to cause an explosion in WebSocket messages.

  3. Cause reply notifications to be sent:

    h/subscribers.py, lines 68 to 89 in f1669fd:

        @subscriber(AnnotationEvent)
        def send_reply_notifications(event):
            """Queue any reply notification emails triggered by an annotation event."""
            request = event.request
            with request.tm:
                annotation = request.find_service(AnnotationReadService).get_annotation_by_id(
                    event.annotation_id
                )
                notification = reply.get_notification(request, annotation, event.action)
                if notification is None:
                    return
                send_params = emails.reply_notification.generate(request, notification)
                try:
                    mailer.send.delay(*send_params)
                except OperationalError as err:  # pragma: no cover
                    # We could not connect to rabbit! So carry on
                    report_exception(err)

    Presumably no notification emails are actually sent when annotations are deleted, but some code would be executed to process the event and figure out that there's no notification to be sent. Again we don't want this: executing this extra code again and again for lots of annotations is pointless and could cause a timeout.

I think that's everything the AnnotationEvents do.

Comment on lines +189 to +264
    def delete_groups(self, user):
        """
        Delete groups created by `user` that have no annotations.

        Delete groups that were created by `user` and that don't contain any
        non-deleted annotations.

        If delete_annotations() (above) is called first then all of `user`'s
        own annotations will already have been deleted, so ultimately any
        groups created by `user` that don't contain any annotations by *other*
        users will get deleted.

        Known issue: if this method does not delete a group because it contains
        annotations by other users, and those other users' annotations are later
        deleted, then the group will no longer contain any annotations but will
        never be deleted.

        Known issue: this will also delete all of the group's memberships due to
        a foreign key constraint with ondelete="cascade". If a group had a
        really large number of members this could take too long and cause a
        timeout.
        """
        # pylint:disable=not-callable,use-implicit-booleaness-not-comparison-to-zero
        self.worker.delete(
            Group,
            select(Group.id)
            .where(Group.creator_id == user.id)
            .outerjoin(
                Annotation,
                and_(
                    Annotation.groupid == Group.pubid,
                    Annotation.deleted.is_(False),
                ),
            )
            .group_by(Group.id)
            .having(func.count(Annotation.id) == 0),
        )

    def delete_group_memberships(self, user):
        """
        Delete `user`'s group memberships.

        Known issue: this can leave groups that were created by `user` in an
        odd state - `user` will no longer be a member of the group but will
        still be its creator. But this state can occur in other ways as well,
        for example the web interface currently allows a group's creator to
        leave the group.

        If `delete_group_creators()` (below) is called after this method then
        the situation will be only temporary: `user` will soon be removed as
        the group's creator as well.
        """
        self.worker.delete(
            GroupMembership,
            select(GroupMembership.id)
            .where(GroupMembership.user_id == user.id)
            .join(Group, GroupMembership.group_id == Group.id),
        )

    def delete_group_creators(self, user):
        """
        Delete `user` as the creator of any groups they created.

        Known issue: this will leave groups in an odd state - with no creator
        (group.creator = None).
        """
        self.worker.update(
            Group, select(Group.id).where(Group.creator == user), {"creator_id": None}
        )
seanh (Contributor, author) commented May 29, 2024:

These methods related to deleting groups should probably be moved to GroupDeleteService.

GroupDeleteService already has its own method for deleting a group (it's only used by the admin pages: users can't delete groups), but it deletes all the group's annotations (which is not what we want here) and isn't scalable (it tries to delete everything all at once, which will time out for large groups).

seanh requested a review from marcospri May 29, 2024 12:11
marcospri (Member) left a comment:

I've done another round of (lighter) testing and all looks good.

seanh added 2 commits May 30, 2024 09:45
Not sure how this was passing coverage before but it's not now.
seanh merged commit 41e805b into main May 30, 2024
9 checks passed
seanh deleted the fix-user-delete branch May 30, 2024 08:52