Skip to content

Commit

Permalink
Initial implementation of alerting framework. (#2631)
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette committed Apr 18, 2023
1 parent 38332f7 commit e8eadcc
Show file tree
Hide file tree
Showing 23 changed files with 480 additions and 29 deletions.
9 changes: 9 additions & 0 deletions artifacts/definitions/Server/Internal/Alerts.yaml
@@ -0,0 +1,9 @@
name: Server.Internal.Alerts
description: |
An internal event queue for alerts. All alerts sent from clients are
collected in this event queue.
Alerts are expected to be low frequency and high value and may be
generated client or server side.
type: SERVER_EVENT
11 changes: 5 additions & 6 deletions artifacts/definitions/Server/Utils/DeleteManyFlows.yaml
Expand Up @@ -20,7 +20,7 @@ parameters:
description: If specified only target these hosts
type: regex
- name: DateBefore
default: "2022-01-01"
description: Only select flows created before this date. If not set we choose all flows.
type: timestamp
- name: CreatorRegex
default: "H\\..+"
Expand All @@ -33,6 +33,7 @@ parameters:

sources:
- query: |
LET DateBefore <= DateBefore || now()
LET hits = SELECT * FROM foreach(row={
SELECT client_id,
os_info.hostname AS hostname
Expand All @@ -55,11 +56,9 @@ sources:
SELECT * FROM foreach(row=hits,
query={
SELECT client_id, hostname, creator,
session_id, artifacts, created, Type, deleted
FROM Artifact.Server.Utils.DeleteFlow(
ClientId=client_id,
FlowId=session_id,
ReallyDoIt=ReallyDoIt)
session_id, artifacts, created, Type, Data, Error
FROM delete_flow(client_id=client_id,
flow_id=session_id, really_do_it=ReallyDoIt)
WHERE log(message=format(format="Deleting flow %v from %v",
args=[session_id, hostname]))
}, workers=10)
Expand Down
85 changes: 83 additions & 2 deletions flows/client_flow_runner.go
Expand Up @@ -152,6 +152,7 @@ func (self *ClientFlowRunner) MonitoringLogMessage(
if err != nil {
return err
}
log_path_manager.Clock = utils.GetTime()

// Write the logs asynchronously
file_store_factory := file_store.GetFileStore(self.config_obj)
Expand All @@ -168,9 +169,47 @@ func (self *ClientFlowRunner) MonitoringLogMessage(

rs_writer.WriteJSONL([]byte(payload), int(response.NumberOfRows))

if response.Level == logging.ALERT {
return self.processMonitoringAlert(ctx, client_id, artifact_name, response)
}

return nil
}

func (self *ClientFlowRunner) processMonitoringAlert(
ctx context.Context, client_id, artifact string,
msg *crypto_proto.LogMessage) error {

tmp := &log_message{}
err := json.Unmarshal([]byte(msg.Jsonl), tmp)
if err != nil {
return err
}

alert := &services.AlertMessage{}
err = json.Unmarshal([]byte(tmp.Message), alert)
if err != nil {
return err
}

alert.ClientId = client_id
alert.Artifact = artifact
alert.ArtifactType = "CLIENT_EVENT"

serialized, err := json.Marshal(alert)
if err != nil {
return err
}
serialized = append(serialized, '\n')

journal, err := services.GetJournal(self.config_obj)
if err != nil {
return err
}
return journal.PushJsonlToArtifact(ctx, self.config_obj,
serialized, 1, "Server.Internal.Alerts", "server", "")
}

func (self *ClientFlowRunner) MonitoringVQLResponse(
ctx context.Context, client_id, flow_id string,
response *actions_proto.VQLResponse) error {
Expand Down Expand Up @@ -224,7 +263,7 @@ func (self *ClientFlowRunner) ProcessSingleMessage(
}

if msg.LogMessage != nil {
err := self.LogMessage(client_id, flow_id, msg.LogMessage)
err := self.LogMessage(ctx, client_id, flow_id, msg.LogMessage)
if err != nil {
return fmt.Errorf("LogMessage: %w", err)
}
Expand Down Expand Up @@ -469,6 +508,7 @@ func (self *ClientFlowRunner) VQLResponse(
if err != nil {
return err
}
path_manager.Clock = utils.GetTime()

file_store_factory := file_store.GetFileStore(self.config_obj)
rs_writer, err := result_sets.NewResultSetWriter(
Expand All @@ -486,8 +526,45 @@ func (self *ClientFlowRunner) VQLResponse(
return nil
}

type log_message struct {
Message string `json:"message"`
}

func (self *ClientFlowRunner) processAlert(
ctx context.Context, client_id, flow_id string,
msg *crypto_proto.LogMessage) error {

tmp := &log_message{}
err := json.Unmarshal([]byte(msg.Jsonl), tmp)
if err != nil {
return err
}

alert := &services.AlertMessage{}
err = json.Unmarshal([]byte(tmp.Message), alert)
if err != nil {
return err
}

alert.ClientId = client_id
alert.FlowId = flow_id

serialized, err := json.Marshal(alert)
if err != nil {
return err
}
serialized = append(serialized, '\n')

journal, err := services.GetJournal(self.config_obj)
if err != nil {
return err
}
return journal.PushJsonlToArtifact(ctx, self.config_obj,
serialized, 1, "Server.Internal.Alerts", "server", "")
}

func (self *ClientFlowRunner) LogMessage(
client_id, flow_id string,
ctx context.Context, client_id, flow_id string,
msg *crypto_proto.LogMessage) error {

flow_path_manager := paths.NewFlowPathManager(client_id, flow_id).Log()
Expand All @@ -510,6 +587,10 @@ func (self *ClientFlowRunner) LogMessage(

rs_writer.WriteJSONL([]byte(payload), uint64(msg.NumberOfRows))

if msg.Level == logging.ALERT {
return self.processAlert(ctx, client_id, flow_id, msg)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions gui/velociraptor/src/components/utils/log_level.jsx
Expand Up @@ -15,6 +15,7 @@ export default class LogLevel extends Component {
case "ERROR":
case "INFO":
case "DEBUG":
case "ALERT":
icon = T(this.props.type);
break;
default:
Expand Down
4 changes: 4 additions & 0 deletions logging/levels.go
Expand Up @@ -7,4 +7,8 @@ const (
INFO = "INFO"
WARNING = "WARN"
DEBUG = "DEBUG"

// An alert is a special type of log message which is routed by
// the server into the alert queue.
ALERT = "ALERT"
)
2 changes: 1 addition & 1 deletion logging/logging.go
Expand Up @@ -420,7 +420,7 @@ func SplitIntoLevelAndLog(b []byte) (level, message string) {
if len(parts) == 2 {
level := strings.ToUpper(parts[0])
switch level {
case DEFAULT, ERROR, INFO, WARNING, DEBUG:
case DEFAULT, ERROR, INFO, WARNING, DEBUG, ALERT:
return level, parts[1]
}
}
Expand Down
33 changes: 32 additions & 1 deletion responder/flow_context.go
Expand Up @@ -281,7 +281,38 @@ func (self *FlowContext) flushLogMessages(ctx context.Context) {
}
}

func (self *FlowContext) AddLogMessage(level string, msg string) {
// Alert messages are sent in their own packet because the server will
// redirect them into the alert queue.
func (self *FlowContext) sendAlertMessage(
ctx context.Context, level string,
// msg containes serialized services.AlertMessage
msg string) {

self.mu.Lock()
id := self.log_messages_id
self.log_messages_id++
self.mu.Unlock()

self.output <- &crypto_proto.VeloMessage{
SessionId: self.flow_id,
RequestId: constants.LOG_SINK,
LogMessage: &crypto_proto.LogMessage{
Id: int64(id),
NumberOfRows: 1,
Jsonl: json.Format(
"{\"client_time\":%d,\"level\":%q,\"message\":%q}\n",
int(utils.GetTime().Now().Unix()), level, msg),
Level: logging.ALERT,
}}
}

func (self *FlowContext) AddLogMessage(
ctx context.Context, level string, msg string) {
if level == logging.ALERT {
self.sendAlertMessage(ctx, level, msg)
return
}

self.mu.Lock()
defer self.mu.Unlock()

Expand Down
38 changes: 36 additions & 2 deletions responder/monitoring.go
Expand Up @@ -95,14 +95,48 @@ func (self *MonitoringContext) NextUploadId() int64 {
return result
}

func (self *MonitoringContext) AddLogMessage(level string, msg string) {
// Alert messages are sent in their own packet because the server will
// redirect them into the alert queue.
func (self *MonitoringContext) sendAlertMessage(
ctx context.Context, level string,

// msg containes serialized services.AlertMessage
msg string) {

self.mu.Lock()
id := self.log_messages_id
self.log_messages_id++
self.mu.Unlock()

self.output <- &crypto_proto.VeloMessage{
SessionId: "F.Monitoring",
RequestId: constants.LOG_SINK,
LogMessage: &crypto_proto.LogMessage{
Id: int64(id),
NumberOfRows: 1,
Jsonl: json.Format(
"{\"client_time\":%d,\"level\":%q,\"message\":%q}\n",
int(utils.GetTime().Now().Unix()), level, msg),
Level: logging.ALERT,
Artifact: self.artifact,
}}
}

func (self *MonitoringContext) AddLogMessage(
ctx context.Context, level string, msg string) {
if level == logging.ALERT {
self.sendAlertMessage(ctx, level, msg)
return
}

self.mu.Lock()
defer self.mu.Unlock()

self.log_message_count++
self.log_messages = append(self.log_messages, json.Format(
"{\"client_time\":%d,\"level\":%q,\"message\":%q}\n",
int(utils.GetTime().Now().Unix()), level, msg)...)

}

func (self *MonitoringContext) getLogMessages() (
Expand Down Expand Up @@ -205,7 +239,7 @@ func (self *MonitoringResponder) Return(ctx context.Context) {}

// Logs will be batched.
func (self *MonitoringResponder) Log(ctx context.Context, level string, msg string) {
self.monitoring_context.AddLogMessage(level, msg)
self.monitoring_context.AddLogMessage(ctx, level, msg)
}

func (self *MonitoringResponder) NextUploadId() int64 {
Expand Down
2 changes: 1 addition & 1 deletion responder/responder.go
Expand Up @@ -206,7 +206,7 @@ func (self *FlowResponder) Return(ctx context.Context) {
func (self *FlowResponder) Log(ctx context.Context, level string, msg string) {
// We dont need to hold the lock because we are just delegating to
// the flow context.
self.flow_context.AddLogMessage(level, msg)
self.flow_context.AddLogMessage(ctx, level, msg)

// Capture the first message at error level.
// FIXME: Support server provided error regex patterns
Expand Down
6 changes: 6 additions & 0 deletions server/fixtures/TestMonitoringAlerts.golden
@@ -0,0 +1,6 @@
{
"/server_artifacts/Server.Internal.Alerts/2020-10-07.json": [
"{\"client_id\":\"C.5170dd5921551ec7\",\"name\":\"\",\"timestamp\":\"0001-01-01T00:00:00Z\",\"event_data\":{},\"artifact\":\"Generic.Client.Stats\",\"artifact_type\":\"CLIENT_EVENT\",\"_ts\":1602103388}",
""
]
}

0 comments on commit e8eadcc

Please sign in to comment.