Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vault 2823 cc namespace #12393

Merged
merged 16 commits into from Sep 7, 2021
Merged
3 changes: 3 additions & 0 deletions changelog/12393.txt
@@ -0,0 +1,3 @@
```release-note: improvement
core: observe the client counts broken down by namespace for partial month client count
```
148 changes: 116 additions & 32 deletions vault/activity_log.go
Expand Up @@ -67,6 +67,11 @@ type segmentInfo struct {
entitySequenceNumber uint64
}

type clients struct {
distinctEntities uint64
nonEntityTokens uint64
}

// ActivityLog tracks unique entity counts and non-entity token counts.
// It handles assembling log fragments (and sending them to the active
// node), writing log segments, and precomputing queries.
Expand Down Expand Up @@ -119,10 +124,6 @@ type ActivityLog struct {
// Channel to stop background processing
doneCh chan struct{}

// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}

// track metadata and contents of the most recent log segment
currentSegment segmentInfo

Expand All @@ -143,6 +144,16 @@ type ActivityLog struct {

// for testing: is config currently being invalidated. protected by l
configInvalidationInProgress bool

// entityTracker tracks active entities this month. Protected by fragmentLock.
entityTracker *EntityTracker
}

type EntityTracker struct {
// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}
entityCountByNamespaceID map[string]uint64
}

// These non-persistent configuration options allow us to disable
Expand Down Expand Up @@ -174,7 +185,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
activeEntities: make(map[string]struct{}),
entityTracker: &EntityTracker{
activeEntities: make(map[string]struct{}),
entityCountByNamespaceID: make(map[string]uint64),
},
currentSegment: segmentInfo{
startTimestamp: 0,
currentEntities: &activity.EntityActivityLog{
Expand Down Expand Up @@ -531,7 +545,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
// Or the feature has been disabled.
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}
}
a.fragmentLock.Unlock()
Expand Down Expand Up @@ -571,7 +585,7 @@ func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime ti
}

for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}

return nil
Expand Down Expand Up @@ -683,7 +697,8 @@ func (a *ActivityLog) resetCurrentLog() {
a.currentSegment.entitySequenceNumber = 0

a.fragment = nil
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
}

Expand Down Expand Up @@ -1094,7 +1109,8 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {

// clear active entity set
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.fragmentLock.Unlock()

// Set timer for next month.
Expand Down Expand Up @@ -1282,7 +1298,7 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t

a.fragmentLock.RLock()
if a.enabled {
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
} else {
present = true
}
Expand All @@ -1296,20 +1312,20 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t
defer a.fragmentLock.Unlock()

// Re-check entity ID after re-acquiring lock
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
if present {
return
}

a.createCurrentFragment()

a.fragment.Entities = append(a.fragment.Entities,
&activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
})
a.activeEntities[entityID] = struct{}{}
entityRecord := &activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
}
a.fragment.Entities = append(a.fragment.Entities, entityRecord)
a.entityTracker.addEntity(entityRecord)
}

func (a *ActivityLog) AddTokenToFragment(namespaceID string) {
Expand Down Expand Up @@ -1353,7 +1369,7 @@ func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
}

for _, e := range fragment.Entities {
a.activeEntities[e.EntityID] = struct{}{}
a.entityTracker.addEntity(e)
}

a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
Expand Down Expand Up @@ -1770,7 +1786,7 @@ func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.Ga
// Empty list
return []metricsutil.GaugeLabelValues{}, nil
}
count := len(a.activeEntities)
count := len(a.entityTracker.activeEntities)

return []metricsutil.GaugeLabelValues{
{
Expand All @@ -1792,26 +1808,94 @@ func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.Ga

// partialMonthClientCount returns the number of clients used so far this month.
// If activity log is not enabled, the response will be nil
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) map[string]interface{} {
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) {
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()

if !a.enabled {
// nothing to count
return nil
return nil, nil
}
byNamespace := make([]*ClientCountInNamespace, 0)
responseData := make(map[string]interface{})
totalEntities := 0
totalTokens := 0

clientCountTable := createClientCountTable(a.entityTracker.entityCountByNamespaceID, a.currentSegment.tokenCount.CountByNamespaceID)

entityCount := len(a.activeEntities)
var tokenCount int
for _, countByNS := range a.currentSegment.tokenCount.CountByNamespaceID {
tokenCount += int(countByNS)
queryNS, err := namespace.FromContext(ctx)
if err != nil {
return nil, err
}
clientCount := entityCount + tokenCount

responseData := make(map[string]interface{})
responseData["distinct_entities"] = entityCount
responseData["non_entity_tokens"] = tokenCount
responseData["clients"] = clientCount
for nsID, clients := range clientCountTable {
ns, err := NamespaceByID(ctx, nsID, a.core)
if err != nil {
return nil, err
}

// Only include namespaces that are the queryNS or within it. If queryNS is the
// root namespace, include all namespaces, even those which have been deleted.
if a.includeInResponse(queryNS, ns) {
var displayPath string
if ns == nil {
displayPath = fmt.Sprintf("deleted namespace %q", nsID)
} else {
displayPath = ns.Path
}

byNamespace = append(byNamespace, &ClientCountInNamespace{
NamespaceID: nsID,
NamespacePath: displayPath,
Counts: ClientCountResponse{
DistinctEntities: int(clients.distinctEntities),
NonEntityTokens: int(clients.nonEntityTokens),
Clients: int(clients.distinctEntities + clients.nonEntityTokens),
},
})

totalEntities += int(clients.distinctEntities)
totalTokens += int(clients.nonEntityTokens)

}
}

sort.Slice(byNamespace, func(i, j int) bool {
return byNamespace[i].NamespaceID < byNamespace[j].NamespaceID
})

return responseData
responseData["by_namespace"] = byNamespace
responseData["distinct_entities"] = totalEntities
responseData["non_entity_tokens"] = totalTokens
responseData["clients"] = totalEntities + totalTokens

return responseData, nil
}

//createClientCountTable maps the entitycount and token count to the namespace id
func createClientCountTable(entityMap map[string]uint64, tokenMap map[string]uint64) map[string]*clients {
clientCountTable := make(map[string]*clients)
for nsID, count := range entityMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].distinctEntities += count

}

for nsID, count := range tokenMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].nonEntityTokens += count

}
return clientCountTable
}

func (et *EntityTracker) addEntity(e *activity.EntityRecord) {
if _, ok := et.activeEntities[e.EntityID]; !ok {
et.activeEntities[e.EntityID] = struct{}{}
et.entityCountByNamespaceID[e.NamespaceID] += 1
}
}