
Could not load job definition error "TypeError: unhashable type: 'list'" #21815

Closed
kamegg13 opened this issue May 13, 2024 · 5 comments · Fixed by #21819
Labels
area: metadata Related to metadata type: bug Something isn't working

Comments

@kamegg13

Dagster version

dagster, version 1.7.5

What's the issue?

Hi there, I am using Dagster with Docker Compose and a gRPC server. When I try to materialize Airbyte assets with Dagster, I get the following error in the asset materialization logs:

Could not load job definition.
TypeError: unhashable type: 'list'

Stack Trace:
  File "/usr/local/lib/python3.10/site-packages/dagster/_grpc/impl.py", line 136, in core_execute_run
    recon_job.get_definition()
  File "/usr/local/lib/python3.10/site-packages/dagster/_core/definitions/reconstruct.py", line 338, in __hash__
    self._hash = hash_collection(self)
  File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 270, in hash_collection
    return hash(make_hashable(collection))
  File "/usr/local/lib/python3.10/site-packages/pydantic/_internal/_model_construction.py", line 451, in hash_func
    return hash(getter(self.__dict__))
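
For context, this `TypeError` is standard Python behavior: mutable containers such as lists cannot be hashed, so any hashing helper that reaches a raw list will fail. A minimal, Dagster-free illustration:

```python
# Lists are mutable, so Python raises TypeError when asked to hash one.
try:
    hash(["a", "b"])
except TypeError as exc:
    print(exc)  # unhashable type: 'list'

# The usual remedy is converting to an immutable tuple first.
print(hash(tuple(["a", "b"])) == hash(("a", "b")))  # True
```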


When I run Dagster locally with Python, on version 1.7.3, it works perfectly fine. However, with Dagster 1.7.5 in Docker I always get this error, even for basic assets; it occurs as soon as I declare my Airbyte assets in the code. Below is the smallest code that reproduces the error:


from dagster import (
    Definitions,
    EnvVar,
    load_assets_from_current_module,
    asset
)
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

#* ----- Airbyte instance ----
airbyte_instance = AirbyteResource(
    host=EnvVar('AIRBYTE_HOST').get_value(),
    port=EnvVar('AIRBYTE_PORT').get_value(),
    username=EnvVar('AIRBYTE_USERNAME').get_value(),
    password=EnvVar('AIRBYTE_PASSWORD').get_value(),
)

#* ----- Load assets from Airbyte instance -----

airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

#*--- Test asset ---
@asset
def testasset(context):
    return "Hello World"

#* ----- Definitions -----
all_assets = load_assets_from_current_module()
defs = Definitions(assets=all_assets)

Thanks for your help!

What did you expect to happen?

I expect it to work as it does on local Dagster core. My EnvVars are set up correctly, and importing and materializing these Airbyte assets worked on Dagster 1.7.3 with the exact same code. Something must have changed in 1.7.5.

How to reproduce?

  1. Launch Airbyte locally on localhost:8000 with the default credentials.
  2. Create a connection between a source and a destination in Airbyte (you can use Faker).
  3. Run Dagster with Docker in a multi-container configuration (you can use the docker-dagster example repository).
  4. On Dagster 1.7.5, load the code from "What's the issue?" above and try to materialize an asset (an Airbyte asset or any other).

Deployment type

Docker Compose

Deployment details

Docker Compose file version 3.7
python:3.10-slim base image

Using two Dockerfiles: Dockerfile_user_code and Dockerfile_dagster.
You need to add your Airbyte credentials to the environment variables in docker-compose.yml, specifically in the docker_daemon and docker_user_code services.

main.py

from dagster import (
    Definitions,
    EnvVar,
    load_assets_from_current_module,
    asset
)
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance
#* ----- Airbyte instance ----
airbyte_instance = AirbyteResource(
    host=EnvVar('AIRBYTE_HOST').get_value(),
    port=EnvVar('AIRBYTE_PORT').get_value(),
    username=EnvVar('AIRBYTE_USERNAME').get_value(),
    password=EnvVar('AIRBYTE_PASSWORD').get_value(),
)
#* ----- Load assets from Airbyte instance -----
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
#*--- Test asset ---
@asset
def testasset(context):
    return "Hello World"
#* ----- Definitions -----
all_assets = load_assets_from_current_module()
defs = Definitions(assets=all_assets)

dagster.yaml

telemetry:
  enabled: false

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_launcher:
  module: dagster_docker
  class: DockerRunLauncher
  config:
    env_vars:
      - DAGSTER_POSTGRES_USER
      - DAGSTER_POSTGRES_PASSWORD
      - DAGSTER_POSTGRES_DB
      - AIRBYTE_HOST
      - AIRBYTE_PORT
      - AIRBYTE_USERNAME
      - AIRBYTE_PASSWORD
      - DAGSTER_LOG_LEVEL
      
    network: docker_network
    container_kwargs:
      volumes: # Make docker client accessible to any launched containers as well
        - /var/run/docker.sock:/var/run/docker.sock
        - /tmp/io_manager_storage:/tmp/io_manager_storage

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

# Configuration for the event log storage in Dagster.
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    # Configuration for the PostgreSQL database.
    postgres_db:
      hostname: docker_postgresql
      username:
        env: DAGSTER_POSTGRES_USER  # The username for connecting to the PostgreSQL database.
      password:
        env: DAGSTER_POSTGRES_PASSWORD  # The password for connecting to the PostgreSQL database.
      db_name:  # The name of the PostgreSQL database.
        env: DAGSTER_POSTGRES_DB
      port: 5432
  

Dockerfile_dagster

# Dagster libraries to run both dagster-webserver and the dagster-daemon. Does not
# need to have access to any pipeline code.

FROM python:3.10-slim

RUN pip install \
    dagster \
    dagster-graphql \
    dagster-webserver \
    dagster-postgres \
    dagster-docker


# Set $DAGSTER_HOME and copy dagster instance and workspace YAML there
ENV DAGSTER_HOME=/opt/dagster/dagster_home/

RUN mkdir -p $DAGSTER_HOME

COPY dagster.yaml workspace.yaml $DAGSTER_HOME

WORKDIR $DAGSTER_HOME

Dockerfile_user_code

FROM python:3.10-slim

RUN pip install \
    dagster \
    dagster-postgres \
    dagster-docker \
    dagster-airbyte

RUN apt-get update && apt-get install -y curl
WORKDIR /opt/dagster/app
COPY main.py /opt/dagster/app

EXPOSE 4000
# CMD allows this to be overridden from run launchers or executors that want
# to run other commands against your repository
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-f","main.py"]

workspace.yaml

load_from:
  # Each entry here corresponds to a service in the docker-compose file that exposes user code.
  - grpc_server:
      host: docker_user_code
      port: 4000
      location_name: "pipeline"

docker-compose.yml

version: "3.7"

services:

  docker_postgresql:
    image: postgres:11
    container_name: docker_postgresql
    environment:
      POSTGRES_USER: "postgres_user"
      POSTGRES_PASSWORD: "postgres_password"
      POSTGRES_DB: "postgres_db"
      TEST_VAR: "test_valuepostgres"
    networks:
      - docker_network

  docker_user_code:
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      DAGSTER_CURRENT_IMAGE: "docker_user_code_image"

      DAGSTER_LOG_LEVEL: DEBUG
      AIRBYTE_HOST: 
      AIRBYTE_PORT: "8000"
      AIRBYTE_USERNAME: 
      AIRBYTE_PASSWORD: 

    build:
      context: .
      dockerfile: ./Dockerfile_user_code
    container_name: docker_user_code
    image: docker_user_code_image
    restart: always
    networks:
      - docker_network

  docker_webserver:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-webserver
      - -h
      - "0.0.0.0"
      - -p
      - "3000"
      - -w
      - workspace.yaml
    container_name: docker_webserver
    expose:
      - "3000"
    ports:
      - "3000:3000"
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"

    volumes: # Make docker client accessible so we can terminate containers from the webserver
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_network
    depends_on:
      - docker_postgresql
      - docker_user_code

  # This service runs the dagster-daemon process, which is responsible for taking runs
  # off of the queue and launching them, as well as creating runs from schedules or sensors.
  docker_daemon:
    build:
      context: .
      dockerfile: ./Dockerfile_dagster
    entrypoint:
      - dagster-daemon
      - run
    container_name: docker_daemon
    restart: on-failure
    environment:
      DAGSTER_POSTGRES_USER: "postgres_user"
      DAGSTER_POSTGRES_PASSWORD: "postgres_password"
      DAGSTER_POSTGRES_DB: "postgres_db"
      AIRBYTE_HOST: "" # ADD ENV VAR HERE 
      AIRBYTE_PORT: "8000"
      AIRBYTE_USERNAME: 
      AIRBYTE_PASSWORD:   
      DAGSTER_LOG_LEVEL: DEBUG
    env_file:
      - .env
    volumes: # Make docker client accessible so we can launch containers using host docker
      - /var/run/docker.sock:/var/run/docker.sock
      - /tmp/io_manager_storage:/tmp/io_manager_storage
    networks:
      - docker_network
    depends_on:
      - docker_postgresql
      - docker_user_code

networks:
  docker_network:
    driver: bridge
    name: docker_network

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@kamegg13 kamegg13 added the type: bug Something isn't working label May 13, 2024
@lamalex
Contributor

lamalex commented May 13, 2024

I am having the same problem after upgrading to 1.7.5, but my assets are dbt assets, not Airbyte.

@garethbrickman garethbrickman changed the title Could not load Job Definitions Error (Airbyte / Docker / gRPC) Could not load job definition error "TypeError: unhashable type: 'list'" May 13, 2024
@garethbrickman
Contributor

Could you post full stack traces for the error?

@lamalex
Contributor

lamalex commented May 13, 2024

dagster._core.errors.DagsterExecutionInterruptedError

  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_plan.py", line 282, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 523, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 202, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/execute_step.py", line 100, in _process_asset_results_to_events
    for user_event in user_event_sequence:
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 208, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute.py", line 177, in _yield_compute_results
    for event in iterate_with_context(
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 465, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/Users/alauni/Code/dbt/orchestration_dagster/usercode/assets/dbt.py", line 122, in _databricks_nonincremental_assets
    yield from dbt_databricks.cli(
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 675, in stream
    for event in self.stream_raw_events():
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 692, in stream_raw_events
    for log in self._stdout or self._stream_stdout():
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 794, in _stream_stdout
    self.process.wait(timeout=self.termination_timeout_seconds)
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/subprocess.py", line 1264, in wait
    return self._wait(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/subprocess.py", line 2040, in _wait
    time.sleep(delay)
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_utils/interrupts.py", line 82, in _new_signal_handler
    raise error_cls()

The above exception occurred during handling of the following exception:
dagster._core.errors.DagsterExecutionInterruptedError

  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster_dbt/core/resources_v2.py", line 786, in _stream_stdout
    for raw_line in self.process.stdout or []:
  File "/Users/alauni/Code/dbt/.venv/lib/python3.11/site-packages/dagster/_utils/interrupts.py", line 82, in _new_signal_handler
    raise error_cls()

@lamalex
Contributor

lamalex commented May 13, 2024

Downgrading to 1.7.4 restores normal behavior.

@OwenKephart
Contributor

Hi @kamegg13, thank you for the detailed report. I've opened up a PR to resolve this issue, which we should be able to get into this week's 1.7.6 release.

@garethbrickman garethbrickman added the area: metadata Related to metadata label May 13, 2024
sryza pushed a commit that referenced this issue May 14, 2024
…of metadata to error (#21819)

## Summary & Motivation

Resolves: #21815

Note that while the test involved here references
`CacheableAssetsDefinition`, the same error would occur with regular
assets. Any asset using a `MetadataValue` that contains a mutable
object (e.g. table schema and table column lineage, which I believe
contain lists) would encounter a similar error.

After the core `MetadataValue` class was switched from a `NamedTuple` to a
pydantic model, the code we use to generate hashes for our serializable
objects stopped handling these values. This PR fixes that.
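
As a rough sketch of what such a hashing helper has to do (hypothetical code, not Dagster's actual `make_hashable` implementation), mutable containers must be recursively converted into hashable equivalents before `hash` is called:

```python
# Hypothetical sketch: recursively convert mutable containers into
# hashable tuples (assumes dict keys are mutually sortable; set
# iteration order is not normalized here).
def make_hashable(value):
    if isinstance(value, dict):
        return tuple(sorted((k, make_hashable(v)) for k, v in value.items()))
    if isinstance(value, (list, tuple, set)):
        return tuple(make_hashable(v) for v in value)
    return value

# A metadata-like structure containing lists now hashes without error.
print(hash(make_hashable({"columns": ["id", "name"], "row_count": 2})))
```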

## How I Tested These Changes

After updating the test to include metadata of this type, observed the
mentioned error. These changes resolved it.