Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vault 2823 cc namespace #12393

Merged
merged 16 commits into from Sep 7, 2021
Merged
91 changes: 50 additions & 41 deletions vault/activity_log.go
Expand Up @@ -124,10 +124,6 @@ type ActivityLog struct {
// Channel to stop background processing
doneCh chan struct{}

// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}

// track metadata and contents of the most recent log segment
currentSegment segmentInfo

Expand All @@ -149,8 +145,14 @@ type ActivityLog struct {
// for testing: is config currently being invalidated. protected by l
configInvalidationInProgress bool

// All known active entity count by namespace ID this month; use fragmentLock read-locked
// entityTracker tracks active entities this month. Protected by fragmentLock.
entityTracker *EntityTracker
}

type EntityTracker struct {
// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}
entityCountByNamespaceID map[string]uint64
}

Expand All @@ -173,18 +175,20 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
}

a := &ActivityLog{
core: core,
configOverrides: &core.activityLogConfig,
logger: logger,
view: view,
metrics: metrics,
nodeID: hostname,
newFragmentCh: make(chan struct{}, 1),
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
activeEntities: make(map[string]struct{}),
entityCountByNamespaceID: make(map[string]uint64),
core: core,
configOverrides: &core.activityLogConfig,
logger: logger,
view: view,
metrics: metrics,
nodeID: hostname,
newFragmentCh: make(chan struct{}, 1),
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
entityTracker: &EntityTracker{
activeEntities: make(map[string]struct{}),
entityCountByNamespaceID: make(map[string]uint64),
},
currentSegment: segmentInfo{
startTimestamp: 0,
currentEntities: &activity.EntityActivityLog{
Expand Down Expand Up @@ -541,7 +545,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
// Or the feature has been disabled.
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
for _, ent := range out.Entities {
a.AddEntity(ent)
a.entityTracker.addEntity(ent)
}
}
a.fragmentLock.Unlock()
Expand Down Expand Up @@ -581,7 +585,7 @@ func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime ti
}

for _, ent := range out.Entities {
a.AddEntity(ent)
a.entityTracker.addEntity(ent)
}

return nil
Expand Down Expand Up @@ -693,8 +697,8 @@ func (a *ActivityLog) resetCurrentLog() {
a.currentSegment.entitySequenceNumber = 0

a.fragment = nil
a.activeEntities = make(map[string]struct{})
a.entityCountByNamespaceID = make(map[string]uint64)
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
}

Expand Down Expand Up @@ -1105,8 +1109,8 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {

// clear active entity set
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.entityCountByNamespaceID = make(map[string]uint64)
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.fragmentLock.Unlock()

// Set timer for next month.
Expand Down Expand Up @@ -1294,7 +1298,7 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t

a.fragmentLock.RLock()
if a.enabled {
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
} else {
present = true
}
Expand All @@ -1308,21 +1312,20 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t
defer a.fragmentLock.Unlock()

// Re-check entity ID after re-acquiring lock
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
if present {
return
}

a.createCurrentFragment()

a.fragment.Entities = append(a.fragment.Entities,
&activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
})
a.entityCountByNamespaceID[namespaceID] += 1
a.activeEntities[entityID] = struct{}{}
entityRecord := &activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
}
a.fragment.Entities = append(a.fragment.Entities, entityRecord)
a.entityTracker.addEntity(entityRecord)
}

func (a *ActivityLog) AddTokenToFragment(namespaceID string) {
Expand Down Expand Up @@ -1366,7 +1369,7 @@ func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
}

for _, e := range fragment.Entities {
a.AddEntity(e)
a.entityTracker.addEntity(e)
}

a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
Expand Down Expand Up @@ -1783,7 +1786,7 @@ func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.Ga
// Empty list
return []metricsutil.GaugeLabelValues{}, nil
}
count := len(a.activeEntities)
count := len(a.entityTracker.activeEntities)

return []metricsutil.GaugeLabelValues{
{
Expand Down Expand Up @@ -1818,7 +1821,7 @@ func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]i
totalEntities := 0
totalTokens := 0

clientCountTable := createClientCountTable(a.entityCountByNamespaceID, a.currentSegment.tokenCount.CountByNamespaceID)
clientCountTable := createClientCountTable(a.entityTracker.entityCountByNamespaceID, a.currentSegment.tokenCount.CountByNamespaceID)

queryNS, err := namespace.FromContext(ctx)
if err != nil {
Expand All @@ -1832,6 +1835,8 @@ func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]i
return responseData, err
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be return nil, err.

}

// Only include namespaces that are the queryNS or within it. If queryNS is the
// root namespace, include all namespaces, even those which have been deleted.
if a.includeInResponse(queryNS, ns) {
var displayPath string
if ns == nil {
Expand All @@ -1849,12 +1854,17 @@ func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]i
Clients: int(clients.distinctEntities + clients.nonEntityTokens),
},
})

totalEntities += int(clients.distinctEntities)
totalTokens += int(clients.nonEntityTokens)

}
}

sort.Slice(byNamespace, func(i, j int) bool {
return byNamespace[i].NamespaceID < byNamespace[j].NamespaceID
})

responseData["by_namespace"] = byNamespace
responseData["distinct_entities"] = totalEntities
responseData["non_entity_tokens"] = totalTokens
Expand Down Expand Up @@ -1886,10 +1896,9 @@ func createClientCountTable(entityMap map[string]uint64, tokenMap map[string]uin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

}

func (a *ActivityLog) AddEntity(e *activity.EntityRecord) {
if _, ok := a.activeEntities[e.EntityID]; !ok {
a.activeEntities[e.EntityID] = struct{}{}
a.entityCountByNamespaceID[e.NamespaceID] += 1
func (et *EntityTracker) addEntity(e *activity.EntityRecord) {
if _, ok := et.activeEntities[e.EntityID]; !ok {
et.activeEntities[e.EntityID] = struct{}{}
et.entityCountByNamespaceID[e.NamespaceID] += 1
}

}