
telegraf mqtt output plugin. Missing information #15121

Closed
alexmc1510 opened this issue Apr 8, 2024 · 13 comments
Labels
bug unexpected problem or unintended behavior

Comments

@alexmc1510

alexmc1510 commented Apr 8, 2024

Relevant telegraf.conf

###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################


# Configuration for MQTT server to send metrics to
[[outputs.mqtt]]
  ## MQTT Brokers
  ## The list of brokers should only include the hostname or IP address and the
  ## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For
  ## example, `localhost:1883` or `mqtt://localhost:1883`.
  ## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts://
  ## non-TLS and TLS servers cannot be mixed and matched.
  servers = ["${TELEGRAF_BROKER}"]

   ## Protocol can be `3.1.1` or `5`. Default is `3.1.1`
   # protocol = "3.1.1"

   ## MQTT Topic for Producer Messages
   ## MQTT outputs send metrics to this topic format:
   ## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }}
   ## (e.g. prefix/web01.example.com/mem/some_tag_value)
   ## Each path segment accepts either a template placeholder, an environment variable, or a tag key
   ## of the form `{{.Tag "tag_key_name"}}`. Empty path elements as well as special MQTT characters
   ## (such as `+` or `#`) are invalid to form the topic name and will lead to an error.
   ## In case a tag is missing in the metric, that path segment is omitted from the final topic.
   topic = "${TELEGRAF_TOPIC}/{{ .Tag \"subtopic1\" }}/{{ .Tag \"subtopic2\" }}"
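   ## Illustration (hypothetical values, not from the actual setup): with
   ## TELEGRAF_TOPIC=factory and a metric carrying subtopic1=lineA and
   ## subtopic2=press, the message is published to "factory/lineA/press"; if
   ## subtopic2 is missing on a metric, the topic collapses to "factory/lineA".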

   ## QoS policy for messages
   ## The mqtt QoS policy for sending messages.
   ## See https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q029090_.htm
   ##   0 = at most once
   ##   1 = at least once
   ##   2 = exactly once
   qos = 1

   ## Keep Alive
   ## Defines the maximum length of time that the broker and client may not
   ## communicate. Defaults to 0 which turns the feature off.
   ##
   ## For mosquitto version v2.0.12 and later there is a bug
   ## (see https://github.com/eclipse/mosquitto/issues/2117), which requires
   ## this to be non-zero. As a reference eclipse/paho.mqtt.golang defaults to 30.
   # keep_alive = 0

   ## username and password to connect MQTT server.
   username = "${TELEGRAF_USER}"
   password = "${TELEGRAF_PASS}"

   ## client ID
   ## The unique client id to connect MQTT server. If this parameter is not set
   ## then a random ID is generated.
   client_id = "${COMPUTERNAME}"

   ## Timeout for write operations. default: 5s
   # timeout = "5s"

   ## Optional TLS Config
   tls_ca = "${TELEGRAF_HOME}//certs//CA.cer"
   # tls_cert = "/etc/telegraf/cert.pem"
   # tls_key = "/etc/telegraf/key.pem"

   ## Use TLS but skip chain & host verification
   # insecure_skip_verify = false

   ## When true, metrics will be sent in one MQTT message per flush. Otherwise,
   ## metrics are written one metric per MQTT message.
   ## DEPRECATED: Use layout option instead
   # batch = false

   ## When true, metrics will have the RETAIN flag set, making the broker cache entries
   ## until someone actually reads them
   # retain = false

   ## Layout of the topics published.
   ## The following choices are available:
   ##   non-batch -- send individual messages, one for each metric
   ##   batch     -- send all metrics as a single message per MQTT topic
   ## NOTE: The following options will ignore the 'data_format' option and send single values
   ##   field     -- send individual messages for each field, appending its name to the metric topic
   ##   homie-v4  -- send metrics with fields and tags according to the 4.0.0 specs
   ##                see https://homieiot.github.io/specification/
   layout = "batch"

   ## HOMIE specific settings
   ## The following options provide templates for setting the device name
   ## and the node-ID for the topics. Both options are MANDATORY and can contain
   ## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key')
   ## or constant strings. The templates MAY NOT contain slashes!
   # homie_device_name = ""
   # homie_node_id = ""

   ## Each data format has its own unique set of configuration options, read
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
   data_format = "influx"

   ## Optional MQTT 5 publish properties
   ## These settings only apply if the "protocol" property is set to 5. This must
   ## be defined at the end of the plugin settings, otherwise TOML will assume
   ## anything else is part of this table. For more details on publish properties
   ## see the spec:
   ## https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
   # [outputs.mqtt.v5]
   #   content_type = ""
   #   response_topic = ""
   #   message_expiry = "0s"
   #   topic_alias = 0
   # [outputs.mqtt.v5.user_properties]
   #   "key1" = "value 1"
   #   "key2" = "value 2"


###############################################################################
#                                  INPUTS                                     #
###############################################################################

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  servers = ["tcp://127.0.0.1:1884"]

  ## Topics that will be subscribed to.
  topics = [
    "Diziscop/Data",
    "Dizimonitor/IsAlive",
    "Dizimonitor/Project",
    "Dizimonitor/Equipments"
  ]

  ## The message topic will be stored in a tag specified by this value.  If set
  ## to the empty string no topic tag will be created.
  # topic_tag = "topic"

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  qos = 1

  ## Connection timeout for initial connection in seconds
  # connection_timeout = "30s"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Persistent session disables clearing of the client session on connection.
  ## In order for this option to work you must also set client_id to identify
  ## the client.  To receive messages that arrived while the client is offline,
  ## also set the qos option to 1 or 2 and don't forget to also set the QoS when
  ## publishing. Finally, using a persistent session will use the initial
  ## connection topics and not subscribe to any new topics even after
  ## reconnecting or restarting without a change in client ID.
  # persistent_session = false

  ## If unset, a random client ID will be generated.
  # client_id = ""

  ## Username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Client trace messages
  ## When set to true, and debug mode enabled in the agent settings, the MQTT
  ## client's messages are included in telegraf logs. These messages are very
  ## noisy, but essential for debugging issues.
  # client_trace = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ## Enable extracting tag values from MQTT topics
  ## _ denotes an ignored entry in the topic path
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+"
  #   measurement = ""
    tags = "subtopic1/subtopic2"
  #   fields = ""
  ## Value supported is int, float, unit
  #   [inputs.mqtt_consumer.topic_parsing.types]
  #      key = type
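    ## Illustration (assumption based on the settings above): a message arriving
    ## on topic "Dizimonitor/IsAlive" matches "+/+" and gets the tags
    ## subtopic1="Dizimonitor" and subtopic2="IsAlive", which the mqtt output's
    ## topic template then appends as path segments.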


  # Filter metrics with repeating field values
[[processors.dedup]]
  ## Maximum time to suppress output
  dedup_interval = "600s"

Logs from Telegraf

2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     done putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:58418
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound priority msg to write, type*packets.PubackPacket
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [store]   memorystore del: message58417was deleted
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     done putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [store]   memorystore del: message58418was deleted
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound priority msg to write, type*packets.PubackPacket
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     done putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound priority msg to write, type*packets.PubackPacket
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [client]  enter Publish
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [client]  sending publish message, topic:enterprise/country/plant/devicetype/deviceid/Data
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound msg to write2121
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound wrote msg, id:2121
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:58419
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [store]   memorystore del: message58419was deleted
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     done putting puback msg on obound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     obound priority msg to write, type*packets.PubackPacket
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [store]   memorystore del: message2121was deleted
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: received puback, id:2121
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-03-26T14:08:10+01:00 D! [outputs.mqtt] Wrote batch of 505 metrics in 21.7669ms
2024-03-26T14:08:10+01:00 D! [outputs.mqtt] Buffer fullness: 1 / 10000 metrics
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncoming Received Message
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:58420
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
2024-03-26T14:08:10+01:00 D! [inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:58421

System info

Telegraf 1.29.5 Windows 10

Docker

No response

Steps to reproduce

  1. Start to publish "single line" very high frequency data to a topic
  2. Configure telegraf mqtt output for batch generation as per attached file
  3. Detect missing packets
    ...

Expected behavior

An error in debug mode, or no missing information

Actual behavior

Random missing packets of data

Additional info

[image]

[image]

Both signals are missing the same packet, meaning the issue is related to packet loss. The time window is exactly 4 s, the size of one packet.

Full log:

telegraf.2024-03-26-1711458535.log

@alexmc1510 alexmc1510 added the bug unexpected problem or unintended behavior label Apr 8, 2024
@alexmc1510 alexmc1510 changed the title telegraf output plugin. Missing information telegraf mqtt output plugin. Missing information Apr 8, 2024
@srebhan
Contributor

srebhan commented Apr 9, 2024

@alexmc1510 I do have a few questions... You are saying

Start to publish "single line" very high frequency data to a topic

What order of magnitude are we talking about here?
Are you using any additional processors or aggregators? What are your settings for the output batch size and flush-interval?

@powersj powersj added the waiting for response waiting for response from contributor label Apr 10, 2024
@alexmc1510
Author

alexmc1510 commented Apr 16, 2024

Hello, sorry for the late response.

Can you reformulate your doubt? What I mean with "Start to publish "single line" very high frequency data to a topic" is that the device publishes data to a specific topic at a very high frequency, and the goal of the config is to create batches of lines (multiline messages) in the output in order to reduce network load and missing packets.
Regarding additional processors or aggregators, as you can see in the config file I have dedup; there is no more config than the one I have mentioned. The settings for the output are in the config file, and if you are referring to the general telegraf configuration, here it is:

# Global tags can be specified here in key="value" format.
[global_tags]
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  quiet = false
  hostname = ""
  omit_hostname = false

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label Apr 16, 2024
@powersj
Contributor

powersj commented May 10, 2024

@alexmc1510,

Can you reformulate your doubt?

@srebhan is asking how many metrics you are sending when you see this occur.

Based on what you have provided so far, I have no insight into what is missing, how much data you think is missing, or even why you think data is missing.

[[processors.dedup]]

You are using a dedup processor, which means that if a metric is considered a duplicate it will be dropped.
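As a rough sketch of that interaction (my reading of the dedup processor, not a confirmed diagnosis of your missing data):

[[processors.dedup]]
  ## Suppress repeats for up to 10 minutes: if a field keeps reporting the same
  ## value (e.g. 42 at t=0s, t=10s, t=20s), only the first point passes; a
  ## change in value passes immediately and restarts the suppression window.
  dedup_interval = "600s"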

@powersj powersj added the waiting for response waiting for response from contributor label May 10, 2024
@alexmc1510
Author

Hello. Sorry for the late reply. I have debugged the issue in depth, but without a more detailed log I don't know how to continue. At least I can see that the error is constrained to telegraf. Let me explain the test environment:
IoT device with the following configuration:

  1. Scenario: MQTT Broker - Bridge - MQTT Broker - Telegraf - InfluxDB
  2. Scenario: MQTT Broker - Telegraf (MQTT IN - MQTT OUT) - MQTT Broker - Telegraf - InfluxDB

In both scenarios I have detected missing data, and I have debugged each stage one by one:

  • No missing data in Edge broker

  • No missing data in bridge or edge Telegraf

  • I forwarded the output to local file in order to discard Influx as a potential candidate.

  • I see missing data in the Telegraf instance in charge of collecting data from MQTT and sending it to InfluxDB, in both scenarios at exactly the same time.
    [image]

  • I see the following error in the telegraf log:

2024-05-13T07:30:10Z E! [inputs.mqtt_consumer::inputs_mqtt] Error in plugin: connection lost: pingresp not received, disconnecting
2024-05-13T07:30:10Z D! [inputs.mqtt_consumer::inputs_mqtt] Disconnected [tcp://xxxxxxxxxx:1883]
  • I see the following "CPU" glitch in the telegraf container:
    [image]
  • I don't see any network glitch in telegraf container.
  • Important to mention that telegraf and mosquitto are running as containers in the same computer so...there should not be any network error.

Could someone give me an idea of what I could debug in order to find the root cause?

Thanks in advance

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label May 13, 2024
@powersj
Contributor

powersj commented May 13, 2024

I forwarded the output to local file in order to discard Influx as a potential candidate.

Forwarded the output of what? Telegraf? If not, have you used outputs.file to check that?

I see the following error in the telegraf log:
I don't see any network glitch in telegraf container.

These two statements disagree with each other. That error looks like one where you could miss some metrics. Did you track this down further? Have you enabled client_trace = true in your mqtt_consumer config to get all debug statements from the MQTT client itself?
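For reference, a minimal sketch of how that would look (the servers value is just the one from your config above):

[agent]
  debug = true                         ## client_trace output is only logged with debug enabled

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1884"]
  client_trace = true                  ## include the MQTT client's internal messages in the log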

Important to mention that telegraf and mosquitto are running as containers in the same computer so...there should not be any network error.

That is not a safe assumption. You could absolutely have some sort of misconfigured item, some sort of limiting, DNS interruption, etc. that occurs while running in docker.

Nothing so far points at an issue that is actionable or actually in Telegraf. Without any additional information this issue will be closed.

@powersj powersj added the waiting for response waiting for response from contributor label May 13, 2024
@alexmc1510
Author

Hello,

I have dug deeper into the details of the error by doing the following:

  1. I detected random disconnections of the telegraf client in mosquitto.
  2. I enabled client_trace in telegraf in order to catch the error.
  3. I saw the first occurrence within a few minutes and captured the following error in the telegraf log:
2024-05-14T12:38:54Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startIncoming Received Message
2024-05-14T12:38:54Z E! [outputs.influxdb::outputs_influxdb_prod] When writing to [https://myinfluxserver:8086]: received error write failed: partial write: field type conflict: input field "value" on measurement "Err Servo Y" is type integer, already exists as type float dropped=4; discarding points
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [pinger]  ping check14.997487436
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [pinger]  pingresp not received, disconnecting
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  internalConnLost called
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  stopCommsWorkers called
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  internalConnLost waiting on workers
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  stopCommsWorkers waiting for workers
2024-05-14T12:38:55Z D! [outputs.influxdb::outputs_influxdb_prod] Wrote batch of 1000 metrics in 315.35891ms
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     logic waiting for msg on ibound
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startIncomingComms: got msg on ibound
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startIncomingComms: received publish, msgId:0
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     logic waiting for msg on ibound
2024-05-14T12:38:55Z D! [outputs.influxdb::outputs_influxdb_prod] Buffer fullness: 1490 / 10000 metrics
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     incoming complete
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startIncomingComms: ibound complete
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startIncomingComms goroutine complete
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     outgoing waiting for an outbound message
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     outgoing waiting for an outbound message
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     outgoing waiting for an outbound message
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     outgoing comms stopping
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [net]     startComms closing outError
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [router]  matchAndDispatch exiting
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  incoming comms goroutine done
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  startCommsWorkers output redirector finished
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  stopCommsWorkers waiting for comms
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  stopCommsWorkers done
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  internalConnLost workers stopped
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  BUG BUG BUG reconnection function is nil<nil>
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [msgids]  cleaned up
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  internalConnLost complete
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] [client]  status is already disconnected
2024-05-14T12:38:55Z E! [inputs.mqtt_consumer::inputs_mqtt] Error in plugin: connection lost: pingresp not received, disconnecting
2024-05-14T12:38:55Z D! [inputs.mqtt_consumer::inputs_mqtt] Disconnected [tcp://172.17.0.4:1883]
2024-05-14T12:38:56Z D! [outputs.file::outputs_file_mydevice1] Wrote batch of 203 metrics in 2.439502ms
2024-05-14T12:38:56Z D! [outputs.file::outputs_file_mydevice1] Buffer fullness: 0 / 10000 metrics
2024-05-14T12:38:56Z D! [outputs.file::outputs_file_mydevice2] Wrote batch of 133 metrics in 6.940891ms
2024-05-14T12:38:56Z D! [outputs.file::outputs_file_mydevice2] Buffer fullness: 0 / 10000 metrics
2024-05-14T12:38:56Z D! [outputs.file::outputs_file] Wrote batch of 665 metrics in 8.032494ms
2024-05-14T12:38:56Z D! [outputs.file::outputs_file] Buffer fullness: 0 / 10000 metrics
2024-05-14T12:38:56Z E! [outputs.influxdb::outputs_influxdb_prod] When writing to [https://myinfluxserver:8086]: received error write failed: partial write: field type conflict: input field "value" on measurement "Err Servo X" is type float, already exists as type integer dropped=3; discarding points
2024-05-14T12:38:57Z D! [outputs.influxdb::outputs_influxdb_prod] Wrote batch of 1000 metrics in 396.769104ms
2024-05-14T12:38:57Z E! [outputs.influxdb::outputs_influxdb_prod] When writing to [https://myinfluxserver:8086]: received error write failed: partial write: field type conflict: input field "value" on measurement "Err Servo X" is type float, already exists as type integer dropped=1; discarding points
2024-05-14T12:38:57Z D! [outputs.influxdb::outputs_influxdb_prod] Wrote batch of 493 metrics in 291.718099ms
2024-05-14T12:38:57Z D! [outputs.influxdb::outputs_influxdb_prod] Buffer fullness: 0 / 10000 metrics
2024-05-14T12:39:00Z D! [inputs.mqtt_consumer::inputs_mqtt] Connecting [tcp://172.17.0.4:1883]

And answering to your question:

Forwarded the output of what? Telegraf? If not, have you used outputs.file to check that?

By forwarding I mean sending the same data to an outputs.file output.

Now I am sure that the error is constrained to telegraf, and I don't really understand why a ping check time shorter than the others ends with the error "pingresp not received, disconnecting".

Could you suggest how to continue the debugging?

Thanks in advance

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label May 14, 2024
@powersj
Contributor

powersj commented May 14, 2024

[inputs.mqtt_consumer::inputs_mqtt] [pinger] pingresp not received, disconnecting

As this looks like a networking issue, my suggestion is to simplify your setup first. Remove things out of the containers first and make sure your networking config in this setup is not the source of your issues.

What ports do you have open? Do you have a very strict firewall set up between these containers? Can you reproduce this same behavior if both run in the same container, or if they are both outside containers?

@powersj powersj added the waiting for response waiting for response from contributor label May 14, 2024
@alexmc1510
Author

Hello, thanks for your quick response.
Forgive me, but what do you mean by "Remove things out of the containers first" and by "same container or outside"? My configuration is telegraf running in one container (latest image) and mosquitto (latest image) in a different one. The port between the containers is open, and I have tried configuring the MQTT server URL first with an external URL and then with the internal IP of Docker's internal bridge network. Same behavior in both configurations.

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label May 14, 2024
@powersj
Contributor

powersj commented May 14, 2024

I'm suggesting running this outside containers. Remove networking or any configuration between containers as a potential cause. I say this because this is a common setup for users.

@powersj powersj added the waiting for response waiting for response from contributor label May 14, 2024
@alexmc1510
Author

I will try to run telegraf outside a container; nevertheless, it will not help answer the question:

  • Why does a pingresp shorter than some others crash the connection chain?

Regards

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label May 14, 2024
@powersj
Contributor

powersj commented May 14, 2024

Why does a pingresp shorter than some others crash the connection chain?

I am not an expert on mosquitto and can't provide any insight into that.

@powersj powersj added the waiting for response waiting for response from contributor label May 14, 2024
@alexmc1510
Author

Hello, sorry for the late reply. Over the weekend I have done some tests and made progress on the problem:

  1. Installation of mosquitto and telegraf outside docker: Same problem and missing information.
  2. Change of mqtt broker from mosquitto to EMQX cluster (2 nodes): Same problem and missing information.
  3. Change of QoS in telegraf from 0 to 1: No data in the output. Nothing.
  4. I found the following topic in influxdata community with both of you participating:

InfluxData Community
I have configured my telegraf with the following information, based on @srebhan's message:

Telegraf will collect messages until either flush_interval (±jitter) is reached or metric_batch_size metrics have arrived. So in your case, you will receive 5k messages (due to your max_undelivered_messages setting) and then Telegraf waits for the 30 seconds (flush_interval) to pass by.
So in your case, you are filling in the 5k messages in the first 5 seconds and then wait 25 seconds to flush the metrics, as the metric_batch_size is never reached.

As a solution I would increase your max_undelivered_messages to say twice the metric_batch_size. Make sure that metric_buffer_limit is still greater than the batch size by margin (say e.g. factor 2 or more). You can additionally reduce the flush_interval to control the maximum latency for your metrics if the rate drops for some reason.

interval = "10s"
metric_batch_size = 2500
metric_buffer_limit = 10000
qos = 1
max_undelivered_messages = 5000
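Split between the agent section and the consumer plugin, that corresponds roughly to the following sketch (the consumer settings sit in the existing [[inputs.mqtt_consumer]] block):

[agent]
  interval = "10s"
  metric_batch_size = 2500
  metric_buffer_limit = 10000

[[inputs.mqtt_consumer]]
  qos = 1
  max_undelivered_messages = 5000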

Now it is working like a charm. Nevertheless, I have modified the parameters based on that message without really understanding their meaning. Could you clarify a bit how they impact the data capture? Why were the default ones not working properly?

Thanks in advance

@telegraf-tiger telegraf-tiger bot removed the waiting for response waiting for response from contributor label May 22, 2024
@powersj
Contributor

powersj commented May 22, 2024

max_undelivered_messages = 5000

The MQTT consumer input plugin will consume messages as fast as it can. The max_undelivered_messages setting sets the upper limit on the number of messages it will consume at one time. By default this is 1000; you have increased it to 5000. This allows the input to consume more messages at any given time.

interval = 10s

This setting is essentially ignored by the mqtt consumer, as it will go and read messages as it needs to. We do have some connection checking at each interval, but the plugin does not read or generate metrics on this interval.

metric_buffer_limit = 10000
metric_batch_size = 2500

These are the buffer limit, i.e. how many metrics Telegraf will buffer at any given time, and the batch size, i.e. how many metrics Telegraf will send at each flush interval (default 10s).
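Putting those together, a rough sizing sketch (using the numbers from your comment; these are guidelines rather than hard rules):

[agent]
  metric_batch_size = 2500        ## metrics sent per flush
  metric_buffer_limit = 10000     ## keep well above the batch size (e.g. 2x or more)
  flush_interval = "10s"

[[inputs.mqtt_consumer]]
  ## roughly 2x metric_batch_size, per the earlier suggestion, so the consumer
  ## can keep feeding the output instead of stalling at the default of 1000
  max_undelivered_messages = 5000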

Why were the default ones not working properly?

This is still not clear to me either as your charts don't really explain what data you were capturing and why it might not get captured.

Glad you got it working so I'll close this.

@powersj powersj closed this as not planned May 22, 2024