
Druid Kafka jobs are going to Pending state #295

Open
nithinkanil opened this issue Jun 30, 2022 · 3 comments
@nithinkanil

We deployed Druid using the Kubernetes druid-operator. It is a small cluster with a single instance each of the broker, coordinator, historical, and router. Using the Druid UI we created a Kafka ingestion job with 3 tasks for parallel ingestion. One task is running, but the other two go to a pending state. There are no errors in the log files.
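
For anyone debugging the same symptom, the Overlord API (reachable through the router when the management proxy is enabled) reports task and worker state; a minimal check, with <router-service> as a placeholder for your router address:

    # Tasks stuck in pending vs. actually running:
    curl http://<router-service>:8088/druid/indexer/v1/pendingTasks
    curl http://<router-service>:8088/druid/indexer/v1/runningTasks
    # Workers registered with the overlord and their capacity; an empty
    # list means no MiddleManagers have registered to take tasks:
    curl http://<router-service>:8088/druid/indexer/v1/workers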

@AdheipSingh
Contributor

This seems like a Druid-configuration-specific issue. Please provide sufficient evidence of an operator/k8s-side problem to debug.

@nithinkanil
Author

@AdheipSingh Please find below the configuration for my Druid cluster:

# This spec only works on a single-node Kubernetes cluster (e.g., a typical dev setup using kind/minikube, a single-node AWS EKS cluster, etc.)
# as it uses local disk as "deep storage".
#
apiVersion: "druid.apache.org/v1alpha1"
kind: "Druid"
metadata:
  name: tiny-cluster
spec:
  image: apache/druid:0.22.1
  # Optionally specify an image for all nodes. Can also be specified per node.
  # imagePullSecrets:
  # - name: tutu
  startScript: /druid.sh
  podLabels:
    environment: stage
    release: alpha
  podAnnotations:
    dummykey: dummyval
  readinessProbe:
    httpGet:
      path: /status/health
      port: 8088
  securityContext:
    fsGroup: 1000
    runAsUser: 1000
    runAsGroup: 1000
  services:
    - spec:
        type: ClusterIP
        clusterIP: None
  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
  jvm.options: |-
    -server
    -XX:MaxDirectMemorySize=10240g
    -Duser.timezone=UTC
    -Dfile.encoding=UTF-8
    -Dlog4j.debug
    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
    -Djava.io.tmpdir=/druid/data
  log4j.config: |-
    <?xml version="1.0" encoding="UTF-8" ?>
    <Configuration status="WARN">
        <Appenders>
            <Console name="Console" target="SYSTEM_OUT">
                <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
            </Console>
        </Appenders>
        <Loggers>
            <Root level="info">
                <AppenderRef ref="Console"/>
            </Root>
        </Loggers>
    </Configuration>
  common.runtime.properties: |

    # Zookeeper
    druid.zk.service.host=tiny-cluster-zk-0.tiny-cluster-zk
    druid.zk.paths.base=/druid
    druid.zk.service.compress=false

    # Metadata Store
    druid.metadata.storage.type=derby
    druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/druid/data/derbydb/metadata.db;create=true
    druid.metadata.storage.connector.host=localhost
    druid.metadata.storage.connector.port=1527
    druid.metadata.storage.connector.createTables=true

    # Deep Storage
    druid.storage.type=local
    druid.storage.storageDirectory=/druid/deepstorage
    #
    # Extensions
    #
    druid.extensions.loadList=["druid-kafka-indexing-service"]

    #
    # Service discovery
    #
    druid.selectors.indexing.serviceName=druid/overlord
    druid.selectors.coordinator.serviceName=druid/coordinator

    druid.indexer.logs.type=file
    druid.indexer.logs.directory=/druid/data/indexing-logs
    druid.lookup.enableLookupSyncOnStartup=false

  metricDimensions.json: |-
    {
      "query/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer"},
      "query/bytes" : { "dimensions" : ["dataSource", "type"], "type" : "count"},
      "query/node/time" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer"},
      "query/node/bytes" : { "dimensions" : ["server"], "type" : "count"},
      "query/node/backpressure": { "dimensions" : ["server"], "type" : "timer"},
      "query/intervalChunk/time" : { "dimensions" : [], "type" : "timer"},

      "query/segment/time" : { "dimensions" : [], "type" : "timer"},
      "query/wait/time" : { "dimensions" : [], "type" : "timer"},
      "segment/scan/pending" : { "dimensions" : [], "type" : "gauge"},
      "query/segmentAndCache/time" : { "dimensions" : [], "type" : "timer" },
      "query/cpu/time" : { "dimensions" : ["dataSource", "type"], "type" : "timer" },

      "query/count" : { "dimensions" : [], "type" : "count" },
      "query/success/count" : { "dimensions" : [], "type" : "count" },
      "query/failed/count" : { "dimensions" : [], "type" : "count" },
      "query/interrupted/count" : { "dimensions" : [], "type" : "count" },
      "query/timeout/count" : { "dimensions" : [], "type" : "count" },

      "query/cache/delta/numEntries" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/sizeBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hits" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/misses" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/evictions" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/hitRate" : { "dimensions" : [], "type" : "count", "convertRange" : true },
      "query/cache/delta/averageBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/timeouts" : { "dimensions" : [], "type" : "count" },
      "query/cache/delta/errors" : { "dimensions" : [], "type" : "count" },

      "query/cache/total/numEntries" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/sizeBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hits" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/misses" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/evictions" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/hitRate" : { "dimensions" : [], "type" : "gauge", "convertRange" : true },
      "query/cache/total/averageBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/timeouts" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/total/errors" : { "dimensions" : [], "type" : "gauge" },

      "ingest/events/thrownAway" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/unparseable" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/duplicate" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/events/processed" : { "dimensions" : ["dataSource", "taskType", "taskId"], "type" : "count" },
      "ingest/events/messageGap" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/rows/output" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/persists/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/persists/backPressure" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/persists/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/handoff/failed" : { "dimensions" : ["dataSource"], "type" : "count" },
      "ingest/merge/time" : { "dimensions" : ["dataSource"], "type" : "timer" },
      "ingest/merge/cpu" : { "dimensions" : ["dataSource"], "type" : "timer" },

      "ingest/kafka/lag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/maxLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "ingest/kafka/avgLag" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
      "task/running/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },

      "taskSlot/total/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/idle/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/busy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/lazy/count" : { "dimensions" : [], "type" : "gauge" },
      "taskSlot/blacklisted/count" : { "dimensions" : [], "type" : "gauge" },

      "task/run/time" : { "dimensions" : ["dataSource", "taskType"], "type" : "timer" },
      "segment/added/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/moved/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },
      "segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count" },

      "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/moved/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/dropped/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/deleted/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unneeded/count" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/unavailable/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/underReplicated/count" : { "dimensions" : ["dataSource", "tier"], "type" : "gauge" },
      "segment/cost/raw" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalization" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/cost/normalized" : { "dimensions" : ["tier"], "type" : "count" },
      "segment/loadQueue/size" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/failed" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/loadQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/dropQueue/count" : { "dimensions" : ["server"], "type" : "gauge" },
      "segment/size" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "segment/overShadowed/count" : { "dimensions" : [], "type" : "gauge" },

      "segment/max" : { "dimensions" : [], "type" : "gauge"},
      "segment/used" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },
      "segment/usedPercent" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge", "convertRange" : true },
      "segment/pendingDelete" : { "dimensions" : [], "type" : "gauge"},

      "jvm/pool/committed" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/init" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/max" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/pool/used" : { "dimensions" : ["poolKind", "poolName"], "type" : "gauge" },
      "jvm/bufferpool/count" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/used" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/bufferpool/capacity" : { "dimensions" : ["bufferpoolName"], "type" : "gauge" },
      "jvm/mem/init" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/max" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/used" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/mem/committed" : { "dimensions" : ["memKind"], "type" : "gauge" },
      "jvm/gc/count" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },
      "jvm/gc/cpu" : { "dimensions" : ["gcName", "gcGen"], "type" : "count" },

      "ingest/events/buffered" : { "dimensions" : ["serviceName", "bufferCapacity"], "type" : "gauge"},

      "sys/swap/free" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge"},
      "sys/swap/pageOut" : { "dimensions" : [], "type" : "gauge"},
      "sys/disk/write/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/count" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/write/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/disk/read/size" : { "dimensions" : ["fsDevName"], "type" : "count"},
      "sys/net/write/size" : { "dimensions" : [], "type" : "count"},
      "sys/net/read/size" : { "dimensions" : [], "type" : "count"},
      "sys/fs/used" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/fs/max" : { "dimensions" : ["fsDevName", "fsDirName", "fsTypeName", "fsSysTypeName", "fsOptions"], "type" : "gauge"},
      "sys/mem/used" : { "dimensions" : [], "type" : "gauge"},
      "sys/mem/max" : { "dimensions" : [], "type" : "gauge"},
      "sys/storage/used" : { "dimensions" : ["fsDirName"], "type" : "gauge"},
      "sys/cpu" : { "dimensions" : ["cpuName", "cpuTime"], "type" : "gauge"},

      "coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
      "historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" },

      "jetty/numOpenConnections" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/requests" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/loadTime" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/caffeine/total/evictionBytes" : { "dimensions" : [], "type" : "gauge" },
      "query/cache/memcached/total" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "gauge" },
      "query/cache/caffeine/delta/requests" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/loadTime" : { "dimensions" : [], "type" : "count" },
      "query/cache/caffeine/delta/evictionBytes" : { "dimensions" : [], "type" : "count" },
      "query/cache/memcached/delta" : { "dimensions" : ["[MEM] Reconnecting Nodes (ReconnectQueue)",
        "[MEM] Request Rate: All",
        "[MEM] Average Bytes written to OS per write",
        "[MEM] Average Bytes read from OS per read",
        "[MEM] Response Rate: All (Failure + Success + Retry)",
        "[MEM] Response Rate: Retry",
        "[MEM] Response Rate: Failure",
        "[MEM] Response Rate: Success"],
        "type" : "count" }
    }

  volumeMounts:
    - mountPath: /druid/data
      name: data-volume
    - mountPath: /druid/deepstorage
      name: deepstorage-volume
  volumes:
    - name: data-volume
      emptyDir: {}
    - name: deepstorage-volume
      hostPath:
        path: /tmp/druid/deepstorage
        type: DirectoryOrCreate
  env:
    - name: POD_NAME
      valueFrom:
        fieldRef:
          fieldPath: metadata.name
    - name: POD_NAMESPACE
      valueFrom:
        fieldRef:
          fieldPath: metadata.namespace

  nodes:
    brokers:
      # Optionally specify for running broker as Deployment
      # kind: Deployment
      nodeType: "broker"
      # Optionally specify for broker nodes
      # imagePullSecrets:
      # - name: tutu
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/broker"
      replicas: 1
      runtime.properties: |
        druid.service=druid/broker
        # HTTP server threads
        druid.broker.http.numConnections=5
        druid.server.http.numThreads=10
        # Processing threads and buffers
        druid.processing.buffer.sizeBytes=1
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1
        druid.sql.enable=true
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    coordinators:
      # Optionally specify for running coordinator as Deployment
      # kind: Deployment
      nodeType: "coordinator"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/master/coordinator-overlord"
      replicas: 1
      runtime.properties: |
        druid.service=druid/coordinator

        # HTTP server threads
        druid.coordinator.startDelay=PT30S
        druid.coordinator.period=PT30S

        # Configure this coordinator to also run as Overlord
        druid.coordinator.asOverlord.enabled=true
        druid.coordinator.asOverlord.overlordService=druid/overlord
        druid.indexer.queue.startDelay=PT30S
        druid.indexer.runner.type=local
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

    historicals:
      nodeType: "historical"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/data/historical"
      replicas: 1
      runtime.properties: |
        druid.service=druid/historical
        druid.server.http.numThreads=5
        druid.processing.buffer.sizeBytes=536870912
        druid.processing.numMergeBuffers=1
        druid.processing.numThreads=1

        # Segment storage
        druid.segmentCache.locations=[{\"path\":\"/druid/data/segments\",\"maxSize\":10737418240}]
        druid.server.maxSize=10737418240
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M
    middlemanagers:
      druid.port: 8080
      kind: StatefulSet
      nodeType: middleManager
      nodeConfigMountPath: /opt/druid/conf/druid/cluster/data/middleManager
      env:
        - name: DRUID_XMX
          value: 4096m
        - name: DRUID_XMS
          value: 4096m
        - name: AWS_REGION
          value: ap-southeast-1
      podDisruptionBudgetSpec:
        maxUnavailable: 1
      replicas: 2
      # resources:
      #   limits:
      #     cpu: 8
      #     memory: 8Gi
      #   requests:
      #     cpu: 8
      #     memory: 8Gi
      livenessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      readinessProbe:
        initialDelaySeconds: 60
        periodSeconds: 5
        failureThreshold: 3
        httpGet:
          path: /status/health
          port: 8080
      runtime.properties: |
          druid.service=druid/middleManager
          druid.worker.capacity=4
          druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
          druid.indexer.task.baseTaskDir=var/druid/task
          # HTTP server threads
          druid.server.http.numThreads=500
          # Processing threads and buffers on Peons
          druid.indexer.fork.property.druid.processing.numMergeBuffers=2
          druid.indexer.fork.property.druid.processing.buffer.sizeBytes=32000000
          druid.indexer.fork.property.druid.processing.numThreads=2
      volumeClaimTemplates:
        - metadata:
            name: data-volume
          spec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 15Gi
      volumeMounts:
        - mountPath: /var/druid
          name: data-volume

    routers:
      nodeType: "router"
      druid.port: 8088
      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/query/router"
      replicas: 1
      runtime.properties: |
        druid.service=druid/router

        # HTTP proxy
        druid.router.http.numConnections=10
        druid.router.http.readTimeout=PT5M
        druid.router.http.numMaxThreads=10
        druid.server.http.numThreads=10

        # Service discovery
        druid.router.defaultBrokerServiceName=druid/broker
        druid.router.coordinatorServiceName=druid/coordinator

        # Management proxy to coordinator / overlord: required for unified web console.
        druid.router.managementProxy.enabled=true
      extra.jvm.options: |-
        -Xmx512M
        -Xms512M

@acherla

acherla commented Jul 11, 2022

Your druid.indexer.runner.type is set to local; it should be set to druid.indexer.runner.type=remote to take advantage of the MiddleManager peons you configured. That ensures your tasks are scheduled onto the MiddleManager workers, which have a capacity of 4 each, so tasks don't sit in a pending state (see the sketch below).
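
A minimal sketch of the fix in the coordinators' runtime.properties from the spec above (abridged; only the runner type changes):

    runtime.properties: |
      druid.service=druid/coordinator
      druid.coordinator.asOverlord.enabled=true
      druid.coordinator.asOverlord.overlordService=druid/overlord
      druid.indexer.queue.startDelay=PT30S
      # Dispatch tasks to the MiddleManager workers instead of running
      # them inside the coordinator/overlord process:
      druid.indexer.runner.type=remote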

FYI, the coordinator/overlord logs will tell you why tasks aren't being scheduled.
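
For example (pod and namespace names are placeholders; use whatever kubectl get pods shows for your cluster):

    # List the pods the operator created for this spec:
    kubectl get pods -n <namespace>
    # Tail the combined coordinator/overlord log and watch for task
    # scheduling decisions:
    kubectl logs -f <coordinator-pod> -n <namespace> | grep -iE "pending|worker|taskSlot"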
