Skip to content

Commit

Permalink
Vault 2823 cc namespace (#12393)
Browse files Browse the repository at this point in the history
* vault-2823 adding changes

* VAULT-2823 adding alias

* Vault-2823 addressing comments

* Vault-2823 removing comments

* Vault-2823 removing comments

* vault-2823 removing q debug

* adding changelog

* Vault-2823 updating external test

* adding approved changes

* fixing returns

* fixing returns
  • Loading branch information
akshya96 committed Sep 7, 2021
1 parent 1a2f420 commit 650cf8a
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 90 deletions.
3 changes: 3 additions & 0 deletions changelog/12393.txt
@@ -0,0 +1,3 @@
```release-note: improvement
core: observe the client counts broken down by namespace for partial month client count
```
148 changes: 116 additions & 32 deletions vault/activity_log.go
Expand Up @@ -67,6 +67,11 @@ type segmentInfo struct {
entitySequenceNumber uint64
}

type clients struct {
distinctEntities uint64
nonEntityTokens uint64
}

// ActivityLog tracks unique entity counts and non-entity token counts.
// It handles assembling log fragments (and sending them to the active
// node), writing log segments, and precomputing queries.
Expand Down Expand Up @@ -119,10 +124,6 @@ type ActivityLog struct {
// Channel to stop background processing
doneCh chan struct{}

// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}

// track metadata and contents of the most recent log segment
currentSegment segmentInfo

Expand All @@ -143,6 +144,16 @@ type ActivityLog struct {

// for testing: is config currently being invalidated. protected by l
configInvalidationInProgress bool

// entityTracker tracks active entities this month. Protected by fragmentLock.
entityTracker *EntityTracker
}

type EntityTracker struct {
// All known active entities this month; use fragmentLock read-locked
// to check whether it already exists.
activeEntities map[string]struct{}
entityCountByNamespaceID map[string]uint64
}

// These non-persistent configuration options allow us to disable
Expand Down Expand Up @@ -174,7 +185,10 @@ func NewActivityLog(core *Core, logger log.Logger, view *BarrierView, metrics me
sendCh: make(chan struct{}, 1), // buffered so it can be triggered by fragment size
writeCh: make(chan struct{}, 1), // same for full segment
doneCh: make(chan struct{}, 1),
activeEntities: make(map[string]struct{}),
entityTracker: &EntityTracker{
activeEntities: make(map[string]struct{}),
entityCountByNamespaceID: make(map[string]uint64),
},
currentSegment: segmentInfo{
startTimestamp: 0,
currentEntities: &activity.EntityActivityLog{
Expand Down Expand Up @@ -531,7 +545,7 @@ func (a *ActivityLog) loadPriorEntitySegment(ctx context.Context, startTime time
// Or the feature has been disabled.
if a.enabled && startTime.Unix() == a.currentSegment.startTimestamp {
for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}
}
a.fragmentLock.Unlock()
Expand Down Expand Up @@ -571,7 +585,7 @@ func (a *ActivityLog) loadCurrentEntitySegment(ctx context.Context, startTime ti
}

for _, ent := range out.Entities {
a.activeEntities[ent.EntityID] = struct{}{}
a.entityTracker.addEntity(ent)
}

return nil
Expand Down Expand Up @@ -683,7 +697,8 @@ func (a *ActivityLog) resetCurrentLog() {
a.currentSegment.entitySequenceNumber = 0

a.fragment = nil
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.standbyFragmentsReceived = make([]*activity.LogFragment, 0)
}

Expand Down Expand Up @@ -1094,7 +1109,8 @@ func (a *ActivityLog) perfStandbyFragmentWorker() {

// clear active entity set
a.fragmentLock.Lock()
a.activeEntities = make(map[string]struct{})
a.entityTracker.activeEntities = make(map[string]struct{})
a.entityTracker.entityCountByNamespaceID = make(map[string]uint64)
a.fragmentLock.Unlock()

// Set timer for next month.
Expand Down Expand Up @@ -1282,7 +1298,7 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t

a.fragmentLock.RLock()
if a.enabled {
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
} else {
present = true
}
Expand All @@ -1296,20 +1312,20 @@ func (a *ActivityLog) AddEntityToFragment(entityID string, namespaceID string, t
defer a.fragmentLock.Unlock()

// Re-check entity ID after re-acquiring lock
_, present = a.activeEntities[entityID]
_, present = a.entityTracker.activeEntities[entityID]
if present {
return
}

a.createCurrentFragment()

a.fragment.Entities = append(a.fragment.Entities,
&activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
})
a.activeEntities[entityID] = struct{}{}
entityRecord := &activity.EntityRecord{
EntityID: entityID,
NamespaceID: namespaceID,
Timestamp: timestamp,
}
a.fragment.Entities = append(a.fragment.Entities, entityRecord)
a.entityTracker.addEntity(entityRecord)
}

func (a *ActivityLog) AddTokenToFragment(namespaceID string) {
Expand Down Expand Up @@ -1353,7 +1369,7 @@ func (a *ActivityLog) receivedFragment(fragment *activity.LogFragment) {
}

for _, e := range fragment.Entities {
a.activeEntities[e.EntityID] = struct{}{}
a.entityTracker.addEntity(e)
}

a.standbyFragmentsReceived = append(a.standbyFragmentsReceived, fragment)
Expand Down Expand Up @@ -1770,7 +1786,7 @@ func (a *ActivityLog) PartialMonthMetrics(ctx context.Context) ([]metricsutil.Ga
// Empty list
return []metricsutil.GaugeLabelValues{}, nil
}
count := len(a.activeEntities)
count := len(a.entityTracker.activeEntities)

return []metricsutil.GaugeLabelValues{
{
Expand All @@ -1792,26 +1808,94 @@ func (c *Core) activeEntityGaugeCollector(ctx context.Context) ([]metricsutil.Ga

// partialMonthClientCount returns the number of clients used so far this month.
// If activity log is not enabled, the response will be nil
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) map[string]interface{} {
func (a *ActivityLog) partialMonthClientCount(ctx context.Context) (map[string]interface{}, error) {
a.fragmentLock.RLock()
defer a.fragmentLock.RUnlock()

if !a.enabled {
// nothing to count
return nil
return nil, nil
}
byNamespace := make([]*ClientCountInNamespace, 0)
responseData := make(map[string]interface{})
totalEntities := 0
totalTokens := 0

clientCountTable := createClientCountTable(a.entityTracker.entityCountByNamespaceID, a.currentSegment.tokenCount.CountByNamespaceID)

entityCount := len(a.activeEntities)
var tokenCount int
for _, countByNS := range a.currentSegment.tokenCount.CountByNamespaceID {
tokenCount += int(countByNS)
queryNS, err := namespace.FromContext(ctx)
if err != nil {
return nil, err
}
clientCount := entityCount + tokenCount

responseData := make(map[string]interface{})
responseData["distinct_entities"] = entityCount
responseData["non_entity_tokens"] = tokenCount
responseData["clients"] = clientCount
for nsID, clients := range clientCountTable {
ns, err := NamespaceByID(ctx, nsID, a.core)
if err != nil {
return nil, err
}

// Only include namespaces that are the queryNS or within it. If queryNS is the
// root namespace, include all namespaces, even those which have been deleted.
if a.includeInResponse(queryNS, ns) {
var displayPath string
if ns == nil {
displayPath = fmt.Sprintf("deleted namespace %q", nsID)
} else {
displayPath = ns.Path
}

byNamespace = append(byNamespace, &ClientCountInNamespace{
NamespaceID: nsID,
NamespacePath: displayPath,
Counts: ClientCountResponse{
DistinctEntities: int(clients.distinctEntities),
NonEntityTokens: int(clients.nonEntityTokens),
Clients: int(clients.distinctEntities + clients.nonEntityTokens),
},
})

totalEntities += int(clients.distinctEntities)
totalTokens += int(clients.nonEntityTokens)

}
}

sort.Slice(byNamespace, func(i, j int) bool {
return byNamespace[i].NamespaceID < byNamespace[j].NamespaceID
})

return responseData
responseData["by_namespace"] = byNamespace
responseData["distinct_entities"] = totalEntities
responseData["non_entity_tokens"] = totalTokens
responseData["clients"] = totalEntities + totalTokens

return responseData, nil
}

//createClientCountTable maps the entitycount and token count to the namespace id
func createClientCountTable(entityMap map[string]uint64, tokenMap map[string]uint64) map[string]*clients {
clientCountTable := make(map[string]*clients)
for nsID, count := range entityMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].distinctEntities += count

}

for nsID, count := range tokenMap {
if _, ok := clientCountTable[nsID]; !ok {
clientCountTable[nsID] = &clients{distinctEntities: 0, nonEntityTokens: 0}
}
clientCountTable[nsID].nonEntityTokens += count

}
return clientCountTable
}

func (et *EntityTracker) addEntity(e *activity.EntityRecord) {
if _, ok := et.activeEntities[e.EntityID]; !ok {
et.activeEntities[e.EntityID] = struct{}{}
et.entityCountByNamespaceID[e.NamespaceID] += 1
}
}

0 comments on commit 650cf8a

Please sign in to comment.