diff --git a/README.md b/README.md index 1fe0d4b..83c16b3 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,19 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr ## Requirements -- Java 11 +- Java 21 - Maven -- Trino 362 -- [Marquez](https://github.com/MarquezProject/marquez) +- Trino 422 + + +## Mapping of properties + +| OpenLineage | Trino | +|--------------------|-----------------------------------------------------| +| JobFacet Name | QueryID | +| JobFacet Namespace | Query Environment (or configured in event listener) | +| RunFacet ID | QueryID | + ## Installation @@ -17,7 +26,7 @@ Build and copy trino-openlineage plugin: ```sh mvn clean install -DskipTests -unzip ./target/trino-openlineage-362.zip -d $TRINO_HOME/plugin +unzip ./target/trino-openlineage-422.zip -d $TRINO_HOME/plugin ``` Add the following line to `$TRINO_HOME/etc/event-listener.properties`: @@ -25,4 +34,38 @@ Add the following line to `$TRINO_HOME/etc/event-listener.properties`: ```properties event-listener.name=openlineage openlineage.url=http://localhost:5000 +#openlineage.facets.trinoMetadata.enabled=false +#openlineage.facets.trinoQueryContext.enabled=true +#openlineage.facets.trinoQueryStatistics.enabled=false +#openlineage.namespace=default +#openlineage.apikey=xxxx +``` + +## Local testing + +1. Build plugin: + +```shell +mvn clean install -DskipTests +``` + +2. Run docker compose: + +```shell +docker compose up -d +``` + +- Freshly built plugin will be automatically mounted to your trino pod. +- Configuration of the plugin will be taken from `event-listener.properties` file - adjust it to your will and restart trino pod for changes to take effect. + +3. Run query creating new table: + +```shell +docker exec -it oltrino trino --execute 'create table memory.default.test_table as select * from tpch.sf1.nation limit 1;' +``` + +4. Check logs of mock api: + +```shell +docker logs olapi ``` diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..68be347 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,25 @@ +version: '3' + +services: + trino: + image: trinodb/trino + container_name: oltrino + networks: + - trino-ol + ports: + - 8080:8080 + volumes: + - ${PWD}/event-listener.properties:/etc/trino/event-listener.properties + - ${PWD}/target/trino-openlineage-442:/usr/lib/trino/plugin/openlineage + olapi: + image: mockserver/mockserver + container_name: olapi + environment: + MOCKSERVER_SERVER_PORT: 5000 + networks: + - trino-ol + ports: + - 5000:5000 + +networks: + trino-ol: diff --git a/event-listener.properties b/event-listener.properties index aae83df..fdee234 100644 --- a/event-listener.properties +++ b/event-listener.properties @@ -1,3 +1,7 @@ event-listener.name=openlineage -openlineage.url=http://localhost:5000 +openlineage.url=http://olapi:5000 +#openlineage.facets.trinoMetadata.enabled=false +#openlineage.facets.trinoQueryContext.enabled=true +#openlineage.facets.trinoQueryStatistics.enabled=false +#openlineage.namespace=default #openlineage.apikey=xxxx \ No newline at end of file diff --git a/pom.xml b/pom.xml index 215ff75..1131cca 100644 --- a/pom.xml +++ b/pom.xml @@ -1,32 +1,25 @@ - + 4.0.0 io.trino trino-root - 362 + 442 trino-openlineage + trino-plugin + Trino OpenLineage + - 11 - 11 + 21 + 21 - - io.airlift - http-client - - - io.airlift - log - com.fasterxml.jackson.core jackson-databind @@ -39,14 +32,22 @@ com.google.guava guava + + io.airlift + http-client + + + io.airlift + log + io.openlineage openlineage-java - 0.2.2 + 1.10.2 - io.trino - trino-spi + com.fasterxml.jackson.core + jackson-annotations provided @@ -55,8 +56,18 @@ provided - com.fasterxml.jackson.core - jackson-annotations + io.opentelemetry + opentelemetry-api + provided + + + io.opentelemetry + opentelemetry-context + provided + + + io.trino + trino-spi provided @@ -66,4 +77,21 @@ - \ No newline at end of file + + + + com.github.ekryd.sortpom + sortpom-maven-plugin + 3.4.0 + + + + sort + + + + + + + + diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java index 24f80ea..32ca863 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java @@ -51,17 +51,20 @@ public void emit(OpenLineage.RunEvent event) String json = objectMapper.writeValueAsString(event); logger.info(json); - Request.Builder requestBuilder = Request.builder() - .setMethod("POST") - .setUri(URI.create(url + "/api/v1/lineage")) - .addHeader("Content-Type", "application/json") - .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8))); + Request.Builder requestBuilder = + Request.builder() + .setMethod("POST") + .setUri(URI.create(url + "/api/v1/lineage")) + .addHeader("Content-Type", "application/json") + .setBodyGenerator( + StaticBodyGenerator.createStaticBodyGenerator( + json.getBytes(StandardCharsets.UTF_8))); - if (apiKey.isPresent()) { - requestBuilder.addHeader("Authorization", "Bearer " + apiKey.get()); - } + apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s)); - StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler()); + StatusResponseHandler.StatusResponse status = + jettyClient.execute( + requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler()); logger.info("Response status: " + status.getStatusCode()); } catch (Exception e) { diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 47ec518..6af0690 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -16,13 +16,20 @@ import com.google.common.collect.ImmutableList; import io.openlineage.client.OpenLineage; import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.OutputColumnMetadata; import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; import io.trino.spi.eventlistener.QueryOutputMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import java.lang.reflect.Field; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -34,66 +41,172 @@ public class OpenLineageListener { private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage")); private final OpenLineageClient client; + private final Optional namespace; + private final Boolean trinoMetadataFacetEnabled; + private final Boolean trinoQueryContextFacetEnabled; + private final Boolean queryStatisticsFacetEnabled; - public OpenLineageListener(String url, Optional apiKey) + public OpenLineageListener(String url, Optional namespace, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean trinoQueryContextFacetEnabled, Boolean queryStatisticsFacetEnabled) { this.client = new OpenLineageClient(url, apiKey); + this.namespace = namespace; + this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled; + this.trinoQueryContextFacetEnabled = trinoQueryContextFacetEnabled; + this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled; + } + + private UUID getQueryId(QueryMetadata queryMetadata) + { + return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(StandardCharsets.UTF_8)); } @Override public void queryCreated(QueryCreatedEvent queryCreatedEvent) { - // Do nothing here + UUID runID = getQueryId(queryCreatedEvent.getMetadata()); + + try { + sendStartEvent(runID, queryCreatedEvent); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } } @Override public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { - UUID runID = UUID.randomUUID(); - sendStartEvent(runID, queryCompletedEvent); - sendCompletedEvent(runID, queryCompletedEvent); + UUID runID = getQueryId(queryCompletedEvent.getMetadata()); + + try { + sendCompletedEvent(runID, queryCompletedEvent); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } } - private void sendStartEvent(UUID runId, QueryCompletedEvent queryCompletedEvent) + private Optional getTrinoQueryContextFacet(QueryContext queryContext) { - OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() - .eventType("START") - .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC"))) - .run(ol.newRunBuilder() - .runId(runId) - .build()) - .job(ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) - .name(queryCompletedEvent.getMetadata().getQueryId()) - .facets(ol.newJobFacetsBuilder() - .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) - .build()) - .build()) - .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) - .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) - .build(); + if (this.trinoQueryContextFacetEnabled) { + OpenLineage.RunFacet queryContextFacet = ol.newRunFacet(); + + queryContextFacet + .getAdditionalProperties() + .put("serverVersion", queryContext.getServerVersion()); + queryContextFacet + .getAdditionalProperties() + .put("environment", queryContext.getEnvironment()); + queryContext.getQueryType().ifPresent(queryType -> + queryContextFacet + .getAdditionalProperties() + .put("queryType", queryType)); + + return Optional.of(queryContextFacet); + } + return Optional.empty(); + } + + private Optional getTrinoMetadataFacet(QueryMetadata queryMetadata) + { + if (this.trinoMetadataFacetEnabled) { + OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); + + queryMetadata.getPlan().ifPresent( + queryPlan -> trinoMetadataFacet + .getAdditionalProperties() + .put("queryPlan", queryPlan)); + + queryMetadata.getTransactionId().ifPresent( + transactionId -> trinoMetadataFacet + .getAdditionalProperties() + .put("transactionId", transactionId)); + + return Optional.of(trinoMetadataFacet); + } + return Optional.empty(); + } + + private Optional getTrinoQueryStatisticsFacet(QueryStatistics queryStatistics) + throws IllegalAccessException + { + if (this.queryStatisticsFacetEnabled) { + OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet(); + + for (Field field : queryStatistics.getClass().getDeclaredFields()) { + field.setAccessible(true); + trinoQueryStatisticsFacet + .getAdditionalProperties() + .put(field.getName(), String.valueOf(field.get(queryStatistics))); + } + return Optional.of(trinoQueryStatisticsFacet); + } + return Optional.empty(); + } + + private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) + throws IllegalAccessException + { + OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + Optional trinoMetadata = getTrinoMetadataFacet(queryCreatedEvent.getMetadata()); + Optional trinoQueryContext = getTrinoQueryContextFacet(queryCreatedEvent.getContext()); + + trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); + trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet)); + + OpenLineage.RunEvent startEvent = + ol.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.START) + .eventTime(queryCreatedEvent.getCreateTime().atZone(ZoneId.of("UTC"))) + .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job( + ol.newJobBuilder() + .namespace(getJobNamespace(queryCreatedEvent.getContext())) + .name(queryCreatedEvent.getMetadata().getQueryId()) + .facets( + ol.newJobFacetsBuilder() + .sql(ol.newSQLJobFacet(queryCreatedEvent.getMetadata().getQuery())) + .build()) + .build()) + .build(); client.emit(startEvent); } private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + throws IllegalAccessException { boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); - OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder() - .eventType(failed ? "FAIL" : "COMPLETE") - .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) - .run(ol.newRunBuilder().runId(runID).build()) - .job(ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) - .name(queryCompletedEvent.getMetadata().getQueryId()) - .facets(ol.newJobFacetsBuilder() - .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) - .build()) - .build()) - .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) - .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) - .build(); + OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata()); + Optional trinoQueryStatistics = getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics()); + Optional trinoQueryContext = getTrinoQueryContextFacet(queryCompletedEvent.getContext()); + + trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); + trinoQueryStatistics.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryStatistics", runFacet)); + trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet)); + + OpenLineage.RunEvent completedEvent = + ol.newRunEventBuilder() + .eventType( + failed + ? OpenLineage.RunEvent.EventType.FAIL + : OpenLineage.RunEvent.EventType.COMPLETE) + .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) + .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job( + ol.newJobBuilder() + .namespace(getJobNamespace(queryCompletedEvent.getContext())) + .name(queryCompletedEvent.getMetadata().getQueryId()) + .facets( + ol.newJobFacetsBuilder() + .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) + .build()) + .build()) + .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) + .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) + .build(); client.emit(completedEvent); } @@ -101,10 +214,22 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv private List buildInputs(QueryIOMetadata ioMetadata) { return ioMetadata.getInputs().stream().map(inputMetadata -> - ol.newInputDatasetBuilder() - .namespace(getDatasetNamespace(inputMetadata.getCatalogName())) - .name(inputMetadata.getSchema() + "." + inputMetadata.getTable()) - .build() + ol.newInputDatasetBuilder() + .namespace(getDatasetNamespace(inputMetadata.getCatalogName())) + .name(inputMetadata.getSchema() + "." + inputMetadata.getTable()) + .facets(ol.newDatasetFacetsBuilder() + .schema(ol.newSchemaDatasetFacetBuilder() + .fields( + inputMetadata + .getColumns() + .stream() + .map(field -> ol.newSchemaDatasetFacetFieldsBuilder() + .name(field) + .build() + ).toList()) + .build() + ).build()) + .build() ).collect(toImmutableList()); } @@ -113,10 +238,41 @@ private List buildOutputs(QueryIOMetadata ioMetadata) Optional outputs = ioMetadata.getOutput(); if (outputs.isPresent()) { QueryOutputMetadata outputMetadata = outputs.get(); - return ImmutableList.of(ol.newOutputDatasetBuilder() - .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) - .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) - .build()); + List outputColumns = outputMetadata.getColumns().orElse(new ArrayList<>()); + + OpenLineage.ColumnLineageDatasetFacetBuilder columnLineageBuilder = ol.newColumnLineageDatasetFacetBuilder(); + + outputColumns.forEach(column -> + columnLineageBuilder.put(column.getColumnName(), + ol.newColumnLineageDatasetFacetFieldsAdditionalBuilder() + .inputFields(column + .getSourceColumns() + .stream() + .map(inputColumn -> ol.newColumnLineageDatasetFacetFieldsAdditionalInputFieldsBuilder() + .field(inputColumn.getColumnName()) + .namespace(getDatasetNamespace(inputColumn.getCatalog())) + .name(inputColumn.getSchema() + "." + inputColumn.getTable()) + .build()) + .toList() + ).build())); + + return ImmutableList.of( + ol.newOutputDatasetBuilder() + .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) + .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) + .facets(ol.newDatasetFacetsBuilder() + .columnLineage(columnLineageBuilder.build()) + .schema(ol.newSchemaDatasetFacetBuilder() + .fields( + outputColumns.stream() + .map(column -> ol.newSchemaDatasetFacetFieldsBuilder() + .name(column.getColumnName()) + .type(column.getColumnType()) + .build()) + .toList() + ).build() + ).build() + ).build()); } else { return ImmutableList.of(); @@ -133,4 +289,9 @@ private String getDatasetNamespace(String catalogName) return catalogName; } } + + private String getJobNamespace(QueryContext queryContext) + { + return "trino-" + this.namespace.orElse(queryContext.getEnvironment()); + } } diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java index 9d37be1..3686285 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java @@ -38,9 +38,13 @@ public EventListener create(Map config) { String url = requireNonNull(config.get("openlineage.url")); String apiKey = config.get("openlineage.apikey"); + Boolean trinoMetadataFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoMetadata.enabled")).orElse("true").equalsIgnoreCase("true"); + Boolean trinoQueryContextFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryContext.enabled")).orElse("true").equalsIgnoreCase("true"); + Boolean trinoQueryStatisticsFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryStatistics.enabled")).orElse("true").equalsIgnoreCase("true"); + String namespace = config.get("openlineage.namespace"); logger.info("openlineage.url: " + url); - return new OpenLineageListener(url, Optional.ofNullable(apiKey)); + return new OpenLineageListener(url, Optional.ofNullable(namespace), Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryContextFacetEnabled, trinoQueryStatisticsFacetEnabled); } }