
Introduce a container_parser operator for container/k8s logs parsing #31959

Closed
ChrsMark opened this issue Mar 26, 2024 · 13 comments · Fixed by #32594
Labels: enhancement (New feature or request), receiver/filelog

@ChrsMark
Member

ChrsMark commented Mar 26, 2024

Component(s)

receiver/filelog

Is your feature request related to a problem? Please describe.

At the moment the filelog receiver is capable of parsing container logs from Kubernetes Pods, but it requires a lot of specific configuration in order to support multiple container runtime formats.

As mentioned in #25251, the recommended configuration is the following:

```yaml
receivers:
  filelog:
    exclude: []
    include:
      - /var/log/pods/*/*/*.log
    include_file_name: false
    include_file_path: true
    operators:
      - id: get-format
        routes:
          - expr: body matches "^\\{"
            output: parser-docker
          - expr: body matches "^[^ Z]+ "
            output: parser-crio
          - expr: body matches "^[^ Z]+Z"
            output: parser-containerd
        type: router
      - id: parser-crio
        regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: 2006-01-02T15:04:05.999999999Z07:00
          layout_type: gotime
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: crio-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-containerd
        regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
        timestamp:
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
          parse_from: attributes.time
        type: regex_parser
      - combine_field: attributes.log
        combine_with: ""
        id: containerd-recombine
        is_last_entry: attributes.logtag == 'F'
        max_log_size: 102400
        output: extract_metadata_from_filepath
        source_identifier: attributes["log.file.path"]
        type: recombine
      - id: parser-docker
        output: extract_metadata_from_filepath
        timestamp:
          layout: '%Y-%m-%dT%H:%M:%S.%LZ'
          parse_from: attributes.time
        type: json_parser
      - id: extract_metadata_from_filepath
        parse_from: attributes["log.file.path"]
        regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
        type: regex_parser
      - from: attributes.stream
        to: attributes["log.iostream"]
        type: move
      - from: attributes.container_name
        to: resource["k8s.container.name"]
        type: move
      - from: attributes.namespace
        to: resource["k8s.namespace.name"]
        type: move
      - from: attributes.pod_name
        to: resource["k8s.pod.name"]
        type: move
      - from: attributes.restart_count
        to: resource["k8s.container.restart_count"]
        type: move
      - from: attributes.uid
        to: resource["k8s.pod.uid"]
        type: move
      - from: attributes.log
        to: body
        type: move
    start_at: beginning
```

Even though we have #24439 and #23339 for improving the k8s logs parsing experience, I wonder if we should already solve the parsing part at the filelog receiver level by encoding the parsing patterns within the implementation.

That means essentially the following part:

```yaml
- id: get-format
  routes:
  - expr: body matches "^\\{"
    output: parser-docker
  - expr: body matches "^[^ Z]+ "
    output: parser-crio
  - expr: body matches "^[^ Z]+Z"
    output: parser-containerd
  type: router
- id: parser-crio
  regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
  timestamp:
    layout: 2006-01-02T15:04:05.999999999Z07:00
    layout_type: gotime
    parse_from: attributes.time
  type: regex_parser
- combine_field: attributes.log
  combine_with: ""
  id: crio-recombine
  is_last_entry: attributes.logtag == 'F'
  max_log_size: 102400
  output: extract_metadata_from_filepath
  source_identifier: attributes["log.file.path"]
  type: recombine
- id: parser-containerd
  regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
  timestamp:
    layout: '%Y-%m-%dT%H:%M:%S.%LZ'
    parse_from: attributes.time
  type: regex_parser
- combine_field: attributes.log
  combine_with: ""
  id: containerd-recombine
  is_last_entry: attributes.logtag == 'F'
  max_log_size: 102400
  output: extract_metadata_from_filepath
  source_identifier: attributes["log.file.path"]
  type: recombine
- id: parser-docker
  output: extract_metadata_from_filepath
  timestamp:
    layout: '%Y-%m-%dT%H:%M:%S.%LZ'
    parse_from: attributes.time
  type: json_parser
```

Describe the solution you'd like

By implementing the format parsing as an operator, we can ensure that nothing is missing from users' configs and that the parsing implementation is easily covered by specific unit tests.

The configuration will then be simplified to something like the following:

```yaml
receivers:
  filelog:
    exclude: []
    include:
      - /var/log/pods/*/*/*.log
    include_file_name: false
    include_file_path: true
    operators:
      - type: container_parser
        format: auto
```

The format can also be set explicitly to docker, containerd, or crio for those users who want to define it themselves.
Additionally, we could support restricting parsing to a specific stream (stdout/stderr), similar to what Filebeat used to do.
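For instance, a minimal config sketch with an explicit format (the `stream` option below is purely hypothetical and only illustrates the Filebeat-like idea; final option names may differ):

```yaml
receivers:
  filelog:
    include:
      - /var/log/pods/*/*/*.log
    include_file_path: true
    operators:
      - type: container_parser
        format: crio      # skip auto-detection when the runtime is known
        # stream: stdout  # hypothetical: only parse stdout lines
```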

cc: @djaglowski @TylerHelmuth

Describe alternatives you've considered

No response

Additional context

No response

ChrsMark added the enhancement (New feature or request) and needs triage (New item requiring triage) labels on Mar 26, 2024
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@TylerHelmuth
Member

I am definitely in favor

@djaglowski
Member

I am in favor of the idea. Generally we try not to write in code what can be done with config, but this is a very specific use case which is also very common. This would make it easier to configure while also giving us the opportunity to optimize performance (e.g. by performing fewer, more targeted actions) and to add resiliency (e.g. implement a next-best behavior if X assumption fails).

@OverOrion
Contributor

This is currently a huge pain for everyone using the Collector for k8s parsing, so huge thanks for the idea! 🚀

What do you think about incorporating the moves as well?
E.g.,

```yaml
- from: attributes.stream
  to: attributes["log.iostream"]
  type: move
- from: attributes.container_name
  to: resource["k8s.container.name"]
  type: move
<all the other moves>
```

@ChrsMark
Member Author

ChrsMark commented Apr 2, 2024

Sure @OverOrion, I think it makes sense to include the extract_metadata_from_filepath and move steps within the implementation as well.

There are several things within the implementation that we will need to discuss, such as how much existing code to re-use.
I'm drafting something these days and will open a draft PR soon so we can discuss it.

@ChrsMark
Member Author

ChrsMark commented Apr 3, 2024

I have a draft which implements the parser. It's not widely tested, but the basic scenarios seem to work. I will deal with tests and improvements once we settle the implementation details.

The main thing here is that there is a fundamental question to answer regarding the implementation: how much should we re-use from the existing operators' implementations? Some operations, like the routing and the base parsing, are easier and make sense to implement within the new parser. This gives us the flexibility to easily extend, customize, and handle errors in the flow. However, the recombine part for the crio logs is already implemented by the recombine operator in a robust way, so I thought it would make sense to re-use that operator directly.

Based on this, I implemented the parser following the hybrid approach below:

  1. Detect the log format within the code and pick the proper parsing function accordingly (a rough sketch of this detection step is included below).
  2. If the format is crio, we need to attach a recombine operator right after the container parser.
  3. Else, if the format is docker or containerd, we don't want the recombine in the pipeline, so we should ensure that it's not there and remove it if needed (we could avoid the inject/remove overhead if we had a way to mute errors for operators that are considered to follow a best-effort approach: [pkg/stanza] Add mute option for operators' error Handler #32145).
  4. This means that after the parsing, the recombine takes place, followed by the final move of attributes.log to the body.
  5. We also handle the addMetadataFromFilePath part by adding a callback function after the basic parsing.

I spent some time thinking about what we could do differently, but I concluded that the hybrid approach is a fair one. Another approach would be to re-implement everything from scratch, or to implement the parser as a wrapper which creates a group of operators instead, but I'm not sure how much flexibility that would give us in the parsing and metadata part. That's why I chose to only include recombine as is. You can get an idea of how this would look at ChrsMark@0c6cdaa. I'm not sure, though, whether this would violate any implementation rules/guidelines of the stanza package or how it would be affected by #32058 :) but we already do it for the syslog one (also here).
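For illustration, here is a rough, self-contained sketch of how the format-detection step could work, reusing the same heuristics as the router expressions from the recommended configuration; it is not the actual implementation:

```go
package main

import (
	"fmt"
	"regexp"
)

// The same heuristics the router uses: docker lines are JSON objects,
// crio timestamps carry a timezone offset, containerd timestamps end in "Z".
var (
	dockerRe     = regexp.MustCompile(`^\{`)
	crioRe       = regexp.MustCompile(`^[^ Z]+ `)
	containerdRe = regexp.MustCompile(`^[^ Z]+Z`)
)

func detectFormat(line string) string {
	switch {
	case dockerRe.MatchString(line):
		return "docker"
	case crioRe.MatchString(line):
		return "crio"
	case containerdRe.MatchString(line):
		return "containerd"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(detectFormat(`{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`))
	fmt.Println(detectFormat(`2024-04-13T07:59:37.505201169-05:00 stdout F a crio line`))
	fmt.Println(detectFormat(`2021-06-22T10:27:25.813799277Z stdout F a containerd line`))
}
```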

@djaglowski and all, what do you think about this?

@djaglowski
Member

I agree with your assessment that we should reuse the recombine operator and rewrite the routing and parsing. However, I differ on a few other points.

I think it's probably not necessary to use move or other simple operators internally. You can just perform these steps directly with entry.Entry.Get/Set/Delete.
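A minimal sketch of that idea, assuming the `pkg/stanza/entry` API (`Get`/`Set`/`Delete` with attribute and resource fields); this only illustrates replacing a single `move` operator and is not the parser's actual code:

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

func main() {
	e := entry.New()
	e.Attributes = map[string]any{"container_name": "busybox", "stream": "stdout"}

	// Equivalent of the operator:
	//   {type: move, from: attributes.container_name, to: resource["k8s.container.name"]}
	if val, ok := e.Get(entry.NewAttributeField("container_name")); ok {
		_ = e.Set(entry.NewResourceField("k8s.container.name"), val)
		e.Delete(entry.NewAttributeField("container_name"))
	}

	fmt.Println(e.Resource, e.Attributes)
}
```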

  1. if the format is crio then we need to attach a recombine operator as well right after the container parser
  2. else if the format is docker or containerd we don't want to have the recombine in the pipeline

This looks really complicated and it's not clear to me that we need to manage a pipeline internally, especially one that requires dynamic injection & removal of operators. Not only does this seem more complicated than necessary, but I suspect we would be leaving easy performance gains on the table.

Can we just instantiate an internal instance of the recombine operator, only pass logs to it if the format is crio, and figure out how to consume its output asynchronously without a pipeline being involved?

@ChrsMark
Member Author

ChrsMark commented Apr 22, 2024

Thanks @djaglowski for checking this!
All the move operations can happen programmatically without the use of an internal move operator, except for the final one, which moves the log to the body and needs to happen after the recombine part.
I'm looking into how we can avoid the internal pipeline handling for the recombine part that you mentioned and will update here.

UPDATE: It looks like even the last move operator can be avoided if we set the combine_field of the recombine operator to body and set it programmatically in the main flow.
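Conceptually, the internal recombine step would then be configured along these lines (a sketch of the equivalent standalone stanza config, assuming the parser has already written each partial line's content to the body; not the operator's internal code):

```yaml
- type: recombine
  combine_field: body
  combine_with: ""
  is_last_entry: attributes.logtag == 'F'
  source_identifier: attributes["log.file.path"]
```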

@ChrsMark
Member Author

So I tried to remove the pipeline logic and consume messages asynchronously from the internal recombine operator.

I have a draft of this version that seems to work for the basic scenarios at 7dc6949. I just want to verify that we are OK with extending the recombine operator with an additional Entry channel in order to be able to send messages asynchronously back to the original parser. @djaglowski is this close to what you had in mind?

Note: we need to ensure that the stopping process happens sequentially to avoid losing any messages, as with other issues we are facing with the filelog receiver.

@djaglowski
Member

Rather than adding complexity to the recombine operator's internals, can we just use SetOutputs to inject a handler? We could reuse adapter.LogEmitter, or define a similar one, to pull entries when they are emitted from the recombine operator.
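To illustrate the pattern only (every name below is a hypothetical stand-in, not the stanza API; in the real code something like `SetOutputs` would point the recombine operator at an emitter such as `adapter.LogEmitter`):

```go
package main

import "fmt"

// entryStub is a hypothetical stand-in for *entry.Entry.
type entryStub struct{ Body string }

// emitterStub is a hypothetical stand-in for something like adapter.LogEmitter:
// it buffers whatever the upstream (recombine) operator emits on a channel so
// the container parser can consume it asynchronously, without a full pipeline.
type emitterStub struct{ out chan *entryStub }

func newEmitterStub() *emitterStub { return &emitterStub{out: make(chan *entryStub, 100)} }

// Process is what the upstream operator would invoke on its configured output.
func (e *emitterStub) Process(ent *entryStub) { e.out <- ent }

func main() {
	emitter := newEmitterStub()

	// In the real parser the recombine operator would call this after recombining
	// partial crio lines; here we call it directly to show the flow.
	emitter.Process(&entryStub{Body: "recombined crio line"})

	// The parser drains the emitter and forwards entries downstream.
	fmt.Println((<-emitter.out).Body)
}
```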

@ChrsMark
Member Author

ChrsMark commented Apr 23, 2024

That sounds reasonable. However, in order to reuse the adapter.LogEmitter we would need to move its implementation to a different place to avoid circular imports when we try to import it in the container operator. Would that refactoring be a breaking change for the package?
Otherwise, we would need to go with a similar new "emitter".

UPDATE: I filed a draft to illustrate this refactoring. Let me know what you think.

djaglowski pushed a commit that referenced this issue Apr 23, 2024
…2629)

**Description:** This PR moves `LogEmitter` to a common place in `pkg/stanza/operator/helper` so as to be reusable by operators without hitting circular imports. This is explained at #31959 (comment).

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
@jsirianni
Member

Love the idea, and am happy to help with testing. We use a fairly complicated operator pipeline for our container logging here; this would help simplify it.

djaglowski pushed a commit that referenced this issue May 14, 2024
**Description:** This PR implements the new container logs parser as it was proposed in #31959.

**Link to tracking Issue:** #31959

**Testing:** Added unit tests. Providing manual testing steps as well:

### How to test this manually

1. Using the following config file:
```yaml
receivers:
  filelog:
    start_at: end
    include_file_name: false
    include_file_path: true
    include:
    - /var/log/pods/*/*/*.log
    operators:
      - id: container-parser
        type: container
        output: m1
      - type: move
        id: m1
        from: attributes.k8s.pod.name
        to: attributes.val
      - id: some
        type: add
        field: attributes.key2.key_in
        value: val2

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [debug]
      processors: []
```
2. Start the collector: `./bin/otelcontribcol_linux_amd64 --config ~/otelcol/container_parser/config.yaml`
3. Use the following bash script to create some logs:
```bash
#! /bin/bash

echo '2024-04-13T07:59:37.505201169-05:00 stdout P This is a very very long crio line th' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log
echo '{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}' >> /var/log/pods/kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log
echo '2024-04-13T07:59:37.505201169-05:00 stdout F at is awesome! crio is awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log
echo '2021-06-22T10:27:25.813799277Z stdout P some containerd log th' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log
echo '{"log":"INFO: another log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}' >> /var/log/pods/kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log
echo '2021-06-22T10:27:25.813799277Z stdout F at is super awesome! Containerd is awesome' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log



echo '2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler43/1.log
echo '2021-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome!' >> /var/log/pods/kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log
```
4. Run the above as a bash script to verify any parallel processing.
Verify that the output is correct.


### Test manually on k8s

1. `make docker-otelcontribcol && docker tag otelcontribcol otelcontribcol-dev:0.0.1 && kind load docker-image otelcontribcol-dev:0.0.1`
2. Install using the following helm values file:
```yaml
mode: daemonset
presets:
  logsCollection:
    enabled: true

image:
  repository: otelcontribcol-dev
  tag: "0.0.1"
  pullPolicy: IfNotPresent

command:
  name: otelcontribcol

config:
  exporters:
    debug:
      verbosity: detailed
  receivers:
    filelog:
      start_at: end
      include_file_name: false
      include_file_path: true
      exclude:
        - /var/log/pods/default_daemonset-opentelemetry-collector*_*/opentelemetry-collector/*.log
      include:
        - /var/log/pods/*/*/*.log
      operators:
        - id: container-parser
          type: container
          output: some
        - id: some
          type: add
          field: attributes.key2.key_in
          value: val2


  service:
    pipelines:
      logs:
        receivers: [filelog]
        processors: [batch]
        exporters: [debug]
```
3. Check collector's output to verify the logs are parsed properly:
```console
2024-05-10T07:52:02.307Z	info	LogsExporter	{"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 2}
2024-05-10T07:52:02.307Z	info	ResourceLog #0
Resource SchemaURL: 
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2024-05-10 07:52:02.046236071 +0000 UTC
Timestamp: 2024-05-10 07:52:01.92533954 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 07:52:01)
Attributes:
     -> log: Map({"iostream":"stdout"})
     -> time: Str(2024-05-10T07:52:01.92533954Z)
     -> k8s: Map({"container":{"name":"busybox","restart_count":"0"},"namespace":{"name":"default"},"pod":{"name":"daemonset-logs-6f6mn","uid":"1069e46b-03b2-4532-a71f-aaec06c0197b"}})
     -> logtag: Str(F)
     -> key2: Map({"key_in":"val2"})
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-6f6mn_1069e46b-03b2-4532-a71f-aaec06c0197b/busybox/0.log)
Trace ID: 
Span ID: 
Flags: 0
LogRecord #1
ObservedTimestamp: 2024-05-10 07:52:02.046411602 +0000 UTC
Timestamp: 2024-05-10 07:52:02.027386192 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Str(otel logs at 07:52:02)
Attributes:
     -> log.file.path: Str(/var/log/pods/default_daemonset-logs-6f6mn_1069e46b-03b2-4532-a71f-aaec06c0197b/busybox/0.log)
     -> time: Str(2024-05-10T07:52:02.027386192Z)
     -> log: Map({"iostream":"stdout"})
     -> logtag: Str(F)
     -> k8s: Map({"container":{"name":"busybox","restart_count":"0"},"namespace":{"name":"default"},"pod":{"name":"daemonset-logs-6f6mn","uid":"1069e46b-03b2-4532-a71f-aaec06c0197b"}})
     -> key2: Map({"key_in":"val2"})
Trace ID: 
Span ID: 
Flags: 0
...
```


**Documentation:** Added

Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
@ChrsMark
Member Author

Completed via #32594
