Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(messenger): don't dispatch useless member updates #3837

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 13 additions & 7 deletions go/internal/messengerdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (d *DBWrapper) AddMember(memberPK, groupPK, displayName, avatarCID string,
}
member.DisplayName = displayName

return tx.db.Create(&member).Error
return tx.db.Create(member).Error
}); errors.Is(err, errcode.ErrDBEntryAlreadyExists) {
return member, err
} else if err != nil {
Expand Down Expand Up @@ -1140,6 +1140,9 @@ func (d *DBWrapper) UpsertMember(memberPK, groupPK string, m messengertypes.Memb
if err != nil {
return nil, false, errcode.ErrDBRead.Wrap(err)
}
if um.Equals(em) {
um = nil
}

commonDetails := []tyber.StepMutator{tyber.WithJSONDetail("FinalMember", um)}
if isNew {
Expand Down Expand Up @@ -1741,16 +1744,19 @@ func (d *DBWrapper) CreateOrUpdateReaction(reaction *messengertypes.Reaction) (b
return updated, nil
}

func (d *DBWrapper) MarkMemberAsConversationCreator(memberPK, conversationPK string) error {
func (d *DBWrapper) MarkMemberAsConversationCreator(memberPK, conversationPK string) (*messengertypes.Member, error) {
member, err := d.GetMemberByPK(memberPK, conversationPK)
if err != nil {
return errcode.ErrDBRead.Wrap(err)
return nil, errcode.ErrDBRead.Wrap(err)
}

member.IsCreator = true
if err := d.db.Save(member).Error; err != nil {
return errcode.ErrDBWrite.Wrap(err)
if !member.IsCreator {
member.IsCreator = true
if err := d.db.Save(member).Error; err != nil {
return nil, errcode.ErrDBWrite.Wrap(err)
}
return member, nil
}

return nil
return nil, nil
}
179 changes: 94 additions & 85 deletions go/internal/messengerpayloads/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (h *EventHandler) contactRequestAccepted(contact *mt.Contact, memberPK []by
func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes.GroupMetadataEvent) error {
var ev protocoltypes.MultiMemberInitialMember
if err := proto.Unmarshal(gme.GetEvent(), &ev); err != nil {
return err
return errcode.ErrInvalidInput.Wrap(err)
}

mpkb := ev.GetMemberPK()
Expand All @@ -642,6 +642,9 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes
if err := h.db.TX(h.ctx, func(tx *messengerdb.DBWrapper) error {
// create or update member

var update *mt.Member
isNew := false

member, err := tx.GetMemberByPK(mpk, gpk)
if err != gorm.ErrRecordNotFound && err != nil {
return errcode.ErrDBRead.Wrap(err)
Expand All @@ -655,31 +658,27 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes

isMe := bytes.Equal(ownMemberPK, mpkb)

if _, err := tx.AddMember(mpk, gpk, "", "", isMe, true); err != nil {
if update, err = tx.AddMember(mpk, gpk, "", "", isMe, true); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}
} else if err := tx.MarkMemberAsConversationCreator(member.PublicKey, gpk); err != nil {

isNew = true
} else if update, err = tx.MarkMemberAsConversationCreator(member.PublicKey, gpk); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

// dispatch update
{
member, err := h.db.GetMemberByPK(mpk, gpk)
if err != nil {
return errcode.ErrDBRead.Wrap(err)
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, true)
if err != nil {
return err
h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", true))
return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
Expand All @@ -692,7 +691,7 @@ func (h *EventHandler) multiMemberGroupInitialMemberAnnounced(gme *protocoltypes
func (h *EventHandler) groupMemberDeviceAdded(gme *protocoltypes.GroupMetadataEvent) error {
var ev protocoltypes.GroupAddMemberDevice
if err := proto.Unmarshal(gme.GetEvent(), &ev); err != nil {
return err
return errcode.ErrInvalidInput.Wrap(err)
}

mpkb := ev.GetMemberPK()
Expand All @@ -715,86 +714,94 @@ func (h *EventHandler) groupMemberDeviceAdded(gme *protocoltypes.GroupMetadataEv

isMe := bytes.Equal(ownMemberPK, mpkb)

// Register device if not already known
if _, err := h.db.GetDeviceByPK(dpk); errors.Is(err, errcode.ErrNotFound) || errors.Is(err, gorm.ErrRecordNotFound) {
device, err := h.db.AddDevice(dpk, mpk)
if err != nil {
return err
}
if err := h.db.TX(h.ctx, func(d *messengerdb.DBWrapper) error {
// Register device if not already known
if _, err := d.GetDeviceByPK(dpk); errors.Is(err, errcode.ErrNotFound) || errors.Is(err, gorm.ErrRecordNotFound) {
device, err := d.AddDevice(dpk, mpk)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeDeviceUpdated, &mt.StreamEvent_DeviceUpdated{Device: device}, true)
if err != nil {
h.logger.Error("error dispatching device updated", zap.Error(err))
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeDeviceUpdated, &mt.StreamEvent_DeviceUpdated{Device: device}, true)
if err != nil {
h.logger.Error("error dispatching device updated", zap.Error(err))
}
}
}

// Check whether a contact request has been accepted (a device from the contact has been added to the group)
if contact, err := h.db.GetContactByPK(mpk); err == nil && contact.GetState() == mt.Contact_OutgoingRequestSent {
if err := h.contactRequestAccepted(contact, mpkb); err != nil {
return err
// Check whether a contact request has been accepted (a device from the contact has been added to the group)
if contact, err := d.GetContactByPK(mpk); err == nil && contact.GetState() == mt.Contact_OutgoingRequestSent {
if err := h.contactRequestAccepted(contact, mpkb); err != nil {
return err
}
}
}

// check backlogs
userInfo := (*mt.AppMessage_SetUserInfo)(nil)
{
backlog, err := h.db.AttributeBacklogInteractions(dpk, gpk, mpk)
if err != nil {
return err
}
// check backlogs
userInfo := (*mt.AppMessage_SetUserInfo)(nil)
{
backlog, err := d.AttributeBacklogInteractions(dpk, gpk, mpk)
if err != nil {
return err
}

for _, elem := range backlog {
h.logger.Info("found elem in backlog", zap.String("type", elem.GetType().String()), logutil.PrivateString("device-pk", elem.GetDevicePublicKey()), logutil.PrivateString("conv", elem.GetConversationPublicKey()))
for _, elem := range backlog {
h.logger.Info("found elem in backlog", zap.String("type", elem.GetType().String()), logutil.PrivateString("device-pk", elem.GetDevicePublicKey()), logutil.PrivateString("conv", elem.GetConversationPublicKey()))

elem.MemberPublicKey = mpk
elem.MemberPublicKey = mpk

switch elem.GetType() {
case mt.AppMessage_TypeSetUserInfo:
var payload mt.AppMessage_SetUserInfo
switch elem.GetType() {
case mt.AppMessage_TypeSetUserInfo:
var payload mt.AppMessage_SetUserInfo

if err := proto.Unmarshal(elem.GetPayload(), &payload); err != nil {
return err
}
if err := proto.Unmarshal(elem.GetPayload(), &payload); err != nil {
return err
}

userInfo = &payload
userInfo = &payload

if err := h.db.DeleteInteractions([]string{elem.CID}); err != nil {
return err
}
if err := d.DeleteInteractions([]string{elem.CID}); err != nil {
return err
}

if err := h.dispatcher.StreamEvent(mt.StreamEvent_TypeInteractionDeleted, &mt.StreamEvent_InteractionDeleted{CID: elem.GetCID(), ConversationPublicKey: gpk}, false); err != nil {
return err
}
if err := h.dispatcher.StreamEvent(mt.StreamEvent_TypeInteractionDeleted, &mt.StreamEvent_InteractionDeleted{CID: elem.GetCID(), ConversationPublicKey: gpk}, false); err != nil {
return err
}

default:
if err := messengerutil.StreamInteraction(h.dispatcher, h.db, elem.CID, false); err != nil {
return err
default:
if err := messengerutil.StreamInteraction(h.dispatcher, d, elem.CID, false); err != nil {
return err
}
}
}
}
}

member := &mt.Member{
PublicKey: mpk,
ConversationPublicKey: gpk,
IsMe: isMe,
}
if userInfo != nil {
member.DisplayName = userInfo.GetDisplayName()
member.AvatarCID = userInfo.GetAvatarCID()
}
member := &mt.Member{
PublicKey: mpk,
ConversationPublicKey: gpk,
IsMe: isMe,
}
if userInfo != nil {
member.DisplayName = userInfo.GetDisplayName()
member.AvatarCID = userInfo.GetAvatarCID()
}

member, isNew, err := h.db.UpsertMember(mpk, gpk, *member)
if err != nil {
return err
}
update, isNew, err := d.UpsertMember(mpk, gpk, *member)
if err != nil {
return err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, isNew)
if err != nil {
return err
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return err
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
}

return nil
}); err != nil {
return errcode.ErrDBWrite.Wrap(err)
}

return nil
}
Expand Down Expand Up @@ -973,7 +980,7 @@ func (h *EventHandler) handleAppMessageSetUserInfo(tx *messengerdb.DBWrapper, i
}
h.logger.Debug("interesting member SetUserInfo")

member, isNew, err := tx.UpsertMember(
update, isNew, err := tx.UpsertMember(
i.MemberPublicKey,
i.ConversationPublicKey,
mt.Member{DisplayName: payload.GetDisplayName(), AvatarCID: payload.GetAvatarCID(), InfoDate: i.GetSentDate()},
Expand All @@ -982,12 +989,14 @@ func (h *EventHandler) handleAppMessageSetUserInfo(tx *messengerdb.DBWrapper, i
return nil, false, err
}

err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: member}, isNew)
if err != nil {
return nil, false, err
}
if update != nil {
err = h.dispatcher.StreamEvent(mt.StreamEvent_TypeMemberUpdated, &mt.StreamEvent_MemberUpdated{Member: update}, isNew)
if err != nil {
return nil, false, err
}

h.logger.Info("dispatched member update", zap.Any("member", member), zap.Bool("isNew", isNew))
h.logger.Info("dispatched member update", zap.Any("member", update), zap.Bool("isNew", isNew))
}

return i, false, nil
}
Expand Down
3 changes: 3 additions & 0 deletions go/pkg/bertymessenger/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestingInfra(ctx context.Context, t *testing.T, amount int, logger *zap.Log
protocols, cleanup := bertyprotocol.NewTestingProtocolWithMockedPeers(ctx, t, &bertyprotocol.TestingOpts{Logger: logger, Mocknet: mocknet}, nil, amount)
clients := make([]messengertypes.MessengerServiceClient, amount)

// wait for protocol warmup
time.Sleep(1 * time.Second)

// setup client
for i, p := range protocols {
// new messenger service
Expand Down
16 changes: 16 additions & 0 deletions go/pkg/messengertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,19 @@ func (am *AppMessage) TextRepresentation() (string, error) {
func (m *AppMessage_UserMessage) TextRepresentation() (string, error) {
return m.GetBody(), nil
}

func (m *Member) Equals(other *Member) bool {
if m == nil && other == nil {
return true
}
if !(m != nil && other != nil) {
return false
}
return m.PublicKey == other.PublicKey &&
m.DisplayName == other.DisplayName &&
m.AvatarCID == other.AvatarCID &&
m.ConversationPublicKey == other.ConversationPublicKey &&
m.IsMe == other.IsMe &&
m.IsCreator == other.IsCreator &&
m.InfoDate == other.InfoDate
}