From 56eff15f2901382571670840e83fb38c8be32289 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Wed, 20 Mar 2024 16:35:27 +0100 Subject: [PATCH 01/11] :tada: Init --- README.md | 6 +- docker-compose.yaml | 25 +++++++ event-listener.properties | 2 +- pom.xml | 68 +++++++++++++------ .../trino/openlineage/OpenLineageClient.java | 4 +- .../openlineage/OpenLineageListener.java | 4 +- 6 files changed, 80 insertions(+), 29 deletions(-) create mode 100644 docker-compose.yaml diff --git a/README.md b/README.md index 1fe0d4b..fbdf65b 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,9 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr ## Requirements -- Java 11 +- Java 21 - Maven -- Trino 362 +- Trino 422 - [Marquez](https://github.com/MarquezProject/marquez) ## Installation @@ -17,7 +17,7 @@ Build and copy trino-openlineage plugin: ```sh mvn clean install -DskipTests -unzip ./target/trino-openlineage-362.zip -d $TRINO_HOME/plugin +unzip ./target/trino-openlineage-422.zip -d $TRINO_HOME/plugin ``` Add the following line to `$TRINO_HOME/etc/event-listener.properties`: diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..68be347 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,25 @@ +version: '3' + +services: + trino: + image: trinodb/trino + container_name: oltrino + networks: + - trino-ol + ports: + - 8080:8080 + volumes: + - ${PWD}/event-listener.properties:/etc/trino/event-listener.properties + - ${PWD}/target/trino-openlineage-442:/usr/lib/trino/plugin/openlineage + olapi: + image: mockserver/mockserver + container_name: olapi + environment: + MOCKSERVER_SERVER_PORT: 5000 + networks: + - trino-ol + ports: + - 5000:5000 + +networks: + trino-ol: diff --git a/event-listener.properties b/event-listener.properties index aae83df..8b5770e 100644 --- a/event-listener.properties +++ b/event-listener.properties @@ -1,3 +1,3 @@ event-listener.name=openlineage -openlineage.url=http://localhost:5000 +openlineage.url=http://olapi:5000 #openlineage.apikey=xxxx \ No newline at end of file diff --git a/pom.xml b/pom.xml index 215ff75..1131cca 100644 --- a/pom.xml +++ b/pom.xml @@ -1,32 +1,25 @@ - + 4.0.0 io.trino trino-root - 362 + 442 trino-openlineage + trino-plugin + Trino OpenLineage + - 11 - 11 + 21 + 21 - - io.airlift - http-client - - - io.airlift - log - com.fasterxml.jackson.core jackson-databind @@ -39,14 +32,22 @@ com.google.guava guava + + io.airlift + http-client + + + io.airlift + log + io.openlineage openlineage-java - 0.2.2 + 1.10.2 - io.trino - trino-spi + com.fasterxml.jackson.core + jackson-annotations provided @@ -55,8 +56,18 @@ provided - com.fasterxml.jackson.core - jackson-annotations + io.opentelemetry + opentelemetry-api + provided + + + io.opentelemetry + opentelemetry-context + provided + + + io.trino + trino-spi provided @@ -66,4 +77,21 @@ - \ No newline at end of file + + + + com.github.ekryd.sortpom + sortpom-maven-plugin + 3.4.0 + + + + sort + + + + + + + + diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java index 24f80ea..adaafb3 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java @@ -57,9 +57,7 @@ public void emit(OpenLineage.RunEvent event) .addHeader("Content-Type", "application/json") .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8))); - if (apiKey.isPresent()) { - requestBuilder.addHeader("Authorization", "Bearer " + apiKey.get()); - } + apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s)); StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler()); logger.info("Response status: " + status.getStatusCode()); diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 47ec518..d6115e5 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -57,7 +57,7 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent) private void sendStartEvent(UUID runId, QueryCompletedEvent queryCompletedEvent) { OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() - .eventType("START") + .eventType(OpenLineage.RunEvent.EventType.START) .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC"))) .run(ol.newRunBuilder() .runId(runId) @@ -81,7 +81,7 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder() - .eventType(failed ? "FAIL" : "COMPLETE") + .eventType(failed ? OpenLineage.RunEvent.EventType.FAIL : OpenLineage.RunEvent.EventType.COMPLETE) .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) .run(ol.newRunBuilder().runId(runID).build()) .job(ol.newJobBuilder() From 95d4c4c1d54c733391cf15a817561e8b4386daac Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Thu, 21 Mar 2024 11:26:00 +0100 Subject: [PATCH 02/11] :tada: Init --- event-listener.properties | 2 + .../trino/openlineage/OpenLineageClient.java | 17 +- .../openlineage/OpenLineageListener.java | 163 +++++++++++++----- .../OpenLineageListenerFactory.java | 4 +- 4 files changed, 137 insertions(+), 49 deletions(-) diff --git a/event-listener.properties b/event-listener.properties index 8b5770e..22ced6e 100644 --- a/event-listener.properties +++ b/event-listener.properties @@ -1,3 +1,5 @@ event-listener.name=openlineage openlineage.url=http://olapi:5000 +openlineage.facets.trinoMetadata.enabled=true +openlineage.facets.trinoQueryStatistics.enabled=false #openlineage.apikey=xxxx \ No newline at end of file diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java index adaafb3..32ca863 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java @@ -51,15 +51,20 @@ public void emit(OpenLineage.RunEvent event) String json = objectMapper.writeValueAsString(event); logger.info(json); - Request.Builder requestBuilder = Request.builder() - .setMethod("POST") - .setUri(URI.create(url + "/api/v1/lineage")) - .addHeader("Content-Type", "application/json") - .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8))); + Request.Builder requestBuilder = + Request.builder() + .setMethod("POST") + .setUri(URI.create(url + "/api/v1/lineage")) + .addHeader("Content-Type", "application/json") + .setBodyGenerator( + StaticBodyGenerator.createStaticBodyGenerator( + json.getBytes(StandardCharsets.UTF_8))); apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s)); - StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler()); + StatusResponseHandler.StatusResponse status = + jettyClient.execute( + requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler()); logger.info("Response status: " + status.getStatusCode()); } catch (Exception e) { diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index d6115e5..6f23bd8 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -19,8 +19,11 @@ import io.trino.spi.eventlistener.QueryCompletedEvent; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.QueryIOMetadata; +import io.trino.spi.eventlistener.QueryMetadata; import io.trino.spi.eventlistener.QueryOutputMetadata; +import io.trino.spi.eventlistener.QueryStatistics; +import java.lang.reflect.Field; import java.net.URI; import java.time.ZoneId; import java.util.List; @@ -34,10 +37,14 @@ public class OpenLineageListener { private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage")); private final OpenLineageClient client; + private final Boolean trinoMetadataFacetEnabled; + private final Boolean queryStatisticsFacetEnabled; - public OpenLineageListener(String url, Optional apiKey) + public OpenLineageListener(String url, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean queryStatisticsFacetEnabled) { this.client = new OpenLineageClient(url, apiKey); + this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled; + this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled; } @Override @@ -50,50 +57,121 @@ public void queryCreated(QueryCreatedEvent queryCreatedEvent) public void queryCompleted(QueryCompletedEvent queryCompletedEvent) { UUID runID = UUID.randomUUID(); - sendStartEvent(runID, queryCompletedEvent); - sendCompletedEvent(runID, queryCompletedEvent); + try { + sendStartEvent(runID, queryCompletedEvent); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + try { + sendCompletedEvent(runID, queryCompletedEvent); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } } - private void sendStartEvent(UUID runId, QueryCompletedEvent queryCompletedEvent) + private void sendStartEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + throws IllegalAccessException { - OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() - .eventType(OpenLineage.RunEvent.EventType.START) - .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC"))) - .run(ol.newRunBuilder() - .runId(runId) - .build()) - .job(ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) - .name(queryCompletedEvent.getMetadata().getQueryId()) - .facets(ol.newJobFacetsBuilder() - .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) - .build()) - .build()) - .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) - .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) - .build(); + OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + + if (this.trinoMetadataFacetEnabled) { + OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); + QueryMetadata trinoMetadata = queryCompletedEvent.getMetadata(); + + trinoMetadataFacet.getAdditionalProperties().put("queryPlan", String.valueOf(trinoMetadata.getPlan())); + trinoMetadataFacet.getAdditionalProperties().put("transactionId", String.valueOf(trinoMetadata.getTransactionId())); + + runFacetsBuilder.put("trino.metadata", trinoMetadataFacet); + } + + if (this.queryStatisticsFacetEnabled) { + OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet(); + QueryStatistics trinoQueryStatistics = queryCompletedEvent.getStatistics(); + + for (Field field : trinoQueryStatistics.getClass().getDeclaredFields()) { + field.setAccessible(true); + trinoQueryStatisticsFacet + .getAdditionalProperties() + .put(field.getName(), String.valueOf(field.get(trinoQueryStatistics))); + } + + runFacetsBuilder.put("trino.queryStatistics", trinoQueryStatisticsFacet); + } + + OpenLineage.RunEvent startEvent = + ol.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.START) + .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC"))) + .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job( + ol.newJobBuilder() + .namespace(queryCompletedEvent.getContext().getUser()) + .name(queryCompletedEvent.getMetadata().getQueryId()) + .facets( + ol.newJobFacetsBuilder() + .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) + .build()) + .build()) + .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) + .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) + .build(); client.emit(startEvent); } private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + throws IllegalAccessException { boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); - OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder() - .eventType(failed ? OpenLineage.RunEvent.EventType.FAIL : OpenLineage.RunEvent.EventType.COMPLETE) - .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) - .run(ol.newRunBuilder().runId(runID).build()) - .job(ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) - .name(queryCompletedEvent.getMetadata().getQueryId()) - .facets(ol.newJobFacetsBuilder() - .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) - .build()) - .build()) - .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) - .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) - .build(); + OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + + if (this.trinoMetadataFacetEnabled) { + OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); + QueryMetadata trinoMetadata = queryCompletedEvent.getMetadata(); + + trinoMetadataFacet.getAdditionalProperties().put("queryPlan", String.valueOf(trinoMetadata.getPlan())); + trinoMetadataFacet.getAdditionalProperties().put("transactionId", String.valueOf(trinoMetadata.getTransactionId())); + + runFacetsBuilder.put("trino.metadata", trinoMetadataFacet); + } + + if (this.queryStatisticsFacetEnabled) { + OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet(); + QueryStatistics trinoQueryStatistics = queryCompletedEvent.getStatistics(); + + for (Field field : trinoQueryStatistics.getClass().getDeclaredFields()) { + field.setAccessible(true); + trinoQueryStatisticsFacet + .getAdditionalProperties() + .put(field.getName(), String.valueOf(field.get(trinoQueryStatistics))); + } + + runFacetsBuilder.put("trino.queryStatistics", trinoQueryStatisticsFacet); + } + + OpenLineage.RunEvent completedEvent = + ol.newRunEventBuilder() + .eventType( + failed + ? OpenLineage.RunEvent.EventType.FAIL + : OpenLineage.RunEvent.EventType.COMPLETE) + .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC"))) + .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) + .job( + ol.newJobBuilder() + .namespace(queryCompletedEvent.getContext().getUser()) + .name(queryCompletedEvent.getMetadata().getQueryId()) + .facets( + ol.newJobFacetsBuilder() + .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) + .build()) + .build()) + .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) + .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) + .build(); client.emit(completedEvent); } @@ -101,10 +179,10 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv private List buildInputs(QueryIOMetadata ioMetadata) { return ioMetadata.getInputs().stream().map(inputMetadata -> - ol.newInputDatasetBuilder() - .namespace(getDatasetNamespace(inputMetadata.getCatalogName())) - .name(inputMetadata.getSchema() + "." + inputMetadata.getTable()) - .build() + ol.newInputDatasetBuilder() + .namespace(getDatasetNamespace(inputMetadata.getCatalogName())) + .name(inputMetadata.getSchema() + "." + inputMetadata.getTable()) + .build() ).collect(toImmutableList()); } @@ -113,10 +191,11 @@ private List buildOutputs(QueryIOMetadata ioMetadata) Optional outputs = ioMetadata.getOutput(); if (outputs.isPresent()) { QueryOutputMetadata outputMetadata = outputs.get(); - return ImmutableList.of(ol.newOutputDatasetBuilder() - .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) - .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) - .build()); + return ImmutableList.of( + ol.newOutputDatasetBuilder() + .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) + .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) + .build()); } else { return ImmutableList.of(); diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java index 9d37be1..5f82f9b 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java @@ -38,9 +38,11 @@ public EventListener create(Map config) { String url = requireNonNull(config.get("openlineage.url")); String apiKey = config.get("openlineage.apikey"); + Boolean trinoMetadataFacetEnabled = config.get("openlineage.facets.trinoMetadata.enabled").equalsIgnoreCase("true"); + Boolean trinoQueryStatisticsFacetEnabled = config.get("openlineage.facets.trinoQueryStatistics.enabled").equalsIgnoreCase("true"); logger.info("openlineage.url: " + url); - return new OpenLineageListener(url, Optional.ofNullable(apiKey)); + return new OpenLineageListener(url, Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryStatisticsFacetEnabled); } } From be5a5690c00e4889b31a3645f623d6ba84f45c1d Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Thu, 21 Mar 2024 12:25:58 +0100 Subject: [PATCH 03/11] refactor --- event-listener.properties | 2 +- .../openlineage/OpenLineageListener.java | 67 +++++++++---------- 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/event-listener.properties b/event-listener.properties index 22ced6e..38a568b 100644 --- a/event-listener.properties +++ b/event-listener.properties @@ -1,5 +1,5 @@ event-listener.name=openlineage openlineage.url=http://olapi:5000 openlineage.facets.trinoMetadata.enabled=true -openlineage.facets.trinoQueryStatistics.enabled=false +openlineage.facets.trinoQueryStatistics.enabled=true #openlineage.apikey=xxxx \ No newline at end of file diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 6f23bd8..eb7ac37 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -71,34 +71,46 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent) } } - private void sendStartEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) - throws IllegalAccessException + private Optional getTrinoMetadataFacet(QueryMetadata queryMetadata) { - OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); - if (this.trinoMetadataFacetEnabled) { OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); - QueryMetadata trinoMetadata = queryCompletedEvent.getMetadata(); - - trinoMetadataFacet.getAdditionalProperties().put("queryPlan", String.valueOf(trinoMetadata.getPlan())); - trinoMetadataFacet.getAdditionalProperties().put("transactionId", String.valueOf(trinoMetadata.getTransactionId())); - runFacetsBuilder.put("trino.metadata", trinoMetadataFacet); + if (queryMetadata.getPlan().isPresent()) { + trinoMetadataFacet.getAdditionalProperties().put("queryPlan", queryMetadata.getPlan().orElse("")); + } + if (queryMetadata.getTransactionId().isPresent()) { + trinoMetadataFacet.getAdditionalProperties().put("transactionId", queryMetadata.getTransactionId().orElse("")); + } + return Optional.of(trinoMetadataFacet); } + return Optional.empty(); + } + private Optional getTrinoQueryStatisticsFacet(QueryStatistics queryStatistics) + throws IllegalAccessException + { if (this.queryStatisticsFacetEnabled) { OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet(); - QueryStatistics trinoQueryStatistics = queryCompletedEvent.getStatistics(); - for (Field field : trinoQueryStatistics.getClass().getDeclaredFields()) { + for (Field field : queryStatistics.getClass().getDeclaredFields()) { field.setAccessible(true); trinoQueryStatisticsFacet .getAdditionalProperties() - .put(field.getName(), String.valueOf(field.get(trinoQueryStatistics))); + .put(field.getName(), String.valueOf(field.get(queryStatistics))); } - - runFacetsBuilder.put("trino.queryStatistics", trinoQueryStatisticsFacet); + return Optional.of(trinoQueryStatisticsFacet); } + return Optional.empty(); + } + + private void sendStartEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + throws IllegalAccessException + { + OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata()); + + trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() @@ -127,30 +139,11 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED"); OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); + Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata()); + Optional trinoQueryStatistics = getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics()); - if (this.trinoMetadataFacetEnabled) { - OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); - QueryMetadata trinoMetadata = queryCompletedEvent.getMetadata(); - - trinoMetadataFacet.getAdditionalProperties().put("queryPlan", String.valueOf(trinoMetadata.getPlan())); - trinoMetadataFacet.getAdditionalProperties().put("transactionId", String.valueOf(trinoMetadata.getTransactionId())); - - runFacetsBuilder.put("trino.metadata", trinoMetadataFacet); - } - - if (this.queryStatisticsFacetEnabled) { - OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet(); - QueryStatistics trinoQueryStatistics = queryCompletedEvent.getStatistics(); - - for (Field field : trinoQueryStatistics.getClass().getDeclaredFields()) { - field.setAccessible(true); - trinoQueryStatisticsFacet - .getAdditionalProperties() - .put(field.getName(), String.valueOf(field.get(trinoQueryStatistics))); - } - - runFacetsBuilder.put("trino.queryStatistics", trinoQueryStatisticsFacet); - } + trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); + trinoQueryStatistics.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryStatistics", runFacet)); OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder() From ec8606baa2a9dfa40fae1d9797fa71ea265b0df4 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 12:22:43 +0100 Subject: [PATCH 04/11] :tada: refactor --- .../openlineage/OpenLineageListener.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index eb7ac37..dbddc6b 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -25,6 +25,7 @@ import java.lang.reflect.Field; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.util.List; import java.util.Optional; @@ -47,22 +48,29 @@ public OpenLineageListener(String url, Optional apiKey, Boolean trinoMet this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled; } - @Override - public void queryCreated(QueryCreatedEvent queryCreatedEvent) + private UUID getQueryId(QueryMetadata queryMetadata) { - // Do nothing here + return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(StandardCharsets.UTF_8)); } @Override - public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + public void queryCreated(QueryCreatedEvent queryCreatedEvent) { - UUID runID = UUID.randomUUID(); + UUID runID = getQueryId(queryCreatedEvent.getMetadata()); + try { - sendStartEvent(runID, queryCompletedEvent); + sendStartEvent(runID, queryCreatedEvent); } catch (IllegalAccessException e) { throw new RuntimeException(e); } + } + + @Override + public void queryCompleted(QueryCompletedEvent queryCompletedEvent) + { + UUID runID = getQueryId(queryCompletedEvent.getMetadata()); + try { sendCompletedEvent(runID, queryCompletedEvent); } @@ -104,30 +112,28 @@ private Optional getTrinoQueryStatisticsFacet(QueryStatist return Optional.empty(); } - private void sendStartEvent(UUID runID, QueryCompletedEvent queryCompletedEvent) + private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) throws IllegalAccessException { OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); - Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata()); + Optional trinoMetadata = getTrinoMetadataFacet(queryCreatedEvent.getMetadata()); trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() .eventType(OpenLineage.RunEvent.EventType.START) - .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC"))) + .eventTime(queryCreatedEvent.getCreateTime().atZone(ZoneId.of("UTC"))) .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) .job( ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) - .name(queryCompletedEvent.getMetadata().getQueryId()) + .namespace(queryCreatedEvent.getContext().getUser()) + .name(queryCreatedEvent.getMetadata().getQueryId()) .facets( ol.newJobFacetsBuilder() - .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery())) + .sql(ol.newSQLJobFacet(queryCreatedEvent.getMetadata().getQuery())) .build()) .build()) - .inputs(buildInputs(queryCompletedEvent.getIoMetadata())) - .outputs(buildOutputs(queryCompletedEvent.getIoMetadata())) .build(); client.emit(startEvent); From bd473f1e9c37ae54c88d3158037cde6b18ff7609 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 15:47:56 +0100 Subject: [PATCH 05/11] :tada: Init --- .../openlineage/OpenLineageListener.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index dbddc6b..e4c4829 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -27,6 +27,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -181,6 +182,18 @@ private List buildInputs(QueryIOMetadata ioMetadata) ol.newInputDatasetBuilder() .namespace(getDatasetNamespace(inputMetadata.getCatalogName())) .name(inputMetadata.getSchema() + "." + inputMetadata.getTable()) + .facets(ol.newDatasetFacetsBuilder() + .schema(ol.newSchemaDatasetFacetBuilder() + .fields( + inputMetadata + .getColumns() + .stream() + .map(field -> ol.newSchemaDatasetFacetFieldsBuilder() + .name(field) + .build() + ).toList()) + .build() + ).build()) .build() ).collect(toImmutableList()); } @@ -194,7 +207,21 @@ private List buildOutputs(QueryIOMetadata ioMetadata) ol.newOutputDatasetBuilder() .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) - .build()); + .facets(ol.newDatasetFacetsBuilder() + .schema(ol.newSchemaDatasetFacetBuilder() + .fields( + outputMetadata + .getColumns() + .orElse(new ArrayList<>()) + .stream() + .map(column -> ol.newSchemaDatasetFacetFieldsBuilder() + .name(column.getColumnName()) + .type(column.getColumnType()) + .build()) + .toList() + ).build() + ).build() + ).build()); } else { return ImmutableList.of(); From 8a7b4c51e48c43cf6ac9b674273595f3d576c67e Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 16:54:18 +0100 Subject: [PATCH 06/11] :tada: add column lineage --- .../openlineage/OpenLineageListener.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index e4c4829..e172e19 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.openlineage.client.OpenLineage; import io.trino.spi.eventlistener.EventListener; +import io.trino.spi.eventlistener.OutputColumnMetadata; import io.trino.spi.eventlistener.QueryCompletedEvent; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.QueryIOMetadata; @@ -203,17 +204,33 @@ private List buildOutputs(QueryIOMetadata ioMetadata) Optional outputs = ioMetadata.getOutput(); if (outputs.isPresent()) { QueryOutputMetadata outputMetadata = outputs.get(); + List outputColumns = outputMetadata.getColumns().orElse(new ArrayList<>()); + + OpenLineage.ColumnLineageDatasetFacetBuilder columnLineageBuilder = ol.newColumnLineageDatasetFacetBuilder(); + + outputColumns.forEach(column -> + columnLineageBuilder.put(column.getColumnName(), + ol.newColumnLineageDatasetFacetFieldsAdditionalBuilder() + .inputFields(column + .getSourceColumns() + .stream() + .map(inputColumn -> ol.newColumnLineageDatasetFacetFieldsAdditionalInputFieldsBuilder() + .field(inputColumn.getColumnName()) + .namespace(inputColumn.getCatalog()) + .name(inputColumn.getSchema() + "." + inputColumn.getTable()) + .build()) + .toList() + ).build())); + return ImmutableList.of( ol.newOutputDatasetBuilder() .namespace(getDatasetNamespace(outputMetadata.getCatalogName())) .name(outputMetadata.getSchema() + "." + outputMetadata.getTable()) .facets(ol.newDatasetFacetsBuilder() + .columnLineage(columnLineageBuilder.build()) .schema(ol.newSchemaDatasetFacetBuilder() .fields( - outputMetadata - .getColumns() - .orElse(new ArrayList<>()) - .stream() + outputColumns.stream() .map(column -> ol.newSchemaDatasetFacetFieldsBuilder() .name(column.getColumnName()) .type(column.getColumnType()) From 587eebd340ac9783455c062e2159482d971b2c23 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 16:56:27 +0100 Subject: [PATCH 07/11] indent --- .../takezoe/trino/openlineage/OpenLineageListener.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index e172e19..71146bf 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -232,9 +232,9 @@ private List buildOutputs(QueryIOMetadata ioMetadata) .fields( outputColumns.stream() .map(column -> ol.newSchemaDatasetFacetFieldsBuilder() - .name(column.getColumnName()) - .type(column.getColumnType()) - .build()) + .name(column.getColumnName()) + .type(column.getColumnType()) + .build()) .toList() ).build() ).build() From 1e4c7b89914148bda6e3f5707f0ffa7a1135a0b4 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 17:10:23 +0100 Subject: [PATCH 08/11] refactor job/dataset namespaces --- .../trino/openlineage/OpenLineageListener.java | 15 +++++++++++---- .../openlineage/OpenLineageListenerFactory.java | 3 ++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 71146bf..3be45ba 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -40,12 +40,14 @@ public class OpenLineageListener { private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage")); private final OpenLineageClient client; + private final String namespace; private final Boolean trinoMetadataFacetEnabled; private final Boolean queryStatisticsFacetEnabled; - public OpenLineageListener(String url, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean queryStatisticsFacetEnabled) + public OpenLineageListener(String url, Optional namespace, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean queryStatisticsFacetEnabled) { this.client = new OpenLineageClient(url, apiKey); + this.namespace = namespace.orElse("default"); this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled; this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled; } @@ -129,7 +131,7 @@ private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) .job( ol.newJobBuilder() - .namespace(queryCreatedEvent.getContext().getUser()) + .namespace(getJobNamespace()) .name(queryCreatedEvent.getMetadata().getQueryId()) .facets( ol.newJobFacetsBuilder() @@ -163,7 +165,7 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) .job( ol.newJobBuilder() - .namespace(queryCompletedEvent.getContext().getUser()) + .namespace(getJobNamespace()) .name(queryCompletedEvent.getMetadata().getQueryId()) .facets( ol.newJobFacetsBuilder() @@ -216,7 +218,7 @@ private List buildOutputs(QueryIOMetadata ioMetadata) .stream() .map(inputColumn -> ol.newColumnLineageDatasetFacetFieldsAdditionalInputFieldsBuilder() .field(inputColumn.getColumnName()) - .namespace(inputColumn.getCatalog()) + .namespace(getDatasetNamespace(inputColumn.getCatalog())) .name(inputColumn.getSchema() + "." + inputColumn.getTable()) .build()) .toList() @@ -255,4 +257,9 @@ private String getDatasetNamespace(String catalogName) return catalogName; } } + + private String getJobNamespace() + { + return this.namespace; + } } diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java index 5f82f9b..8452a1f 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java @@ -40,9 +40,10 @@ public EventListener create(Map config) String apiKey = config.get("openlineage.apikey"); Boolean trinoMetadataFacetEnabled = config.get("openlineage.facets.trinoMetadata.enabled").equalsIgnoreCase("true"); Boolean trinoQueryStatisticsFacetEnabled = config.get("openlineage.facets.trinoQueryStatistics.enabled").equalsIgnoreCase("true"); + String namespace = config.get("openlineage.namespace"); logger.info("openlineage.url: " + url); - return new OpenLineageListener(url, Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryStatisticsFacetEnabled); + return new OpenLineageListener(url, Optional.ofNullable(namespace), Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryStatisticsFacetEnabled); } } From 550107a4706a5f6d264730ee701911356bf21ad4 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 18:06:30 +0100 Subject: [PATCH 09/11] improve job namespace --- .../github/takezoe/trino/openlineage/OpenLineageListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 3be45ba..9c19eec 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -260,6 +260,6 @@ private String getDatasetNamespace(String catalogName) private String getJobNamespace() { - return this.namespace; + return "trino-" + this.namespace; } } From aeee62d5988656bda38ab9003fec9fe8b3d7f2a2 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 22 Mar 2024 18:33:48 +0100 Subject: [PATCH 10/11] :tada: Init --- event-listener.properties | 6 +- .../openlineage/OpenLineageListener.java | 58 ++++++++++++++----- .../OpenLineageListenerFactory.java | 7 ++- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/event-listener.properties b/event-listener.properties index 38a568b..fdee234 100644 --- a/event-listener.properties +++ b/event-listener.properties @@ -1,5 +1,7 @@ event-listener.name=openlineage openlineage.url=http://olapi:5000 -openlineage.facets.trinoMetadata.enabled=true -openlineage.facets.trinoQueryStatistics.enabled=true +#openlineage.facets.trinoMetadata.enabled=false +#openlineage.facets.trinoQueryContext.enabled=true +#openlineage.facets.trinoQueryStatistics.enabled=false +#openlineage.namespace=default #openlineage.apikey=xxxx \ No newline at end of file diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java index 9c19eec..6af0690 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java @@ -18,6 +18,7 @@ import io.trino.spi.eventlistener.EventListener; import io.trino.spi.eventlistener.OutputColumnMetadata; import io.trino.spi.eventlistener.QueryCompletedEvent; +import io.trino.spi.eventlistener.QueryContext; import io.trino.spi.eventlistener.QueryCreatedEvent; import io.trino.spi.eventlistener.QueryIOMetadata; import io.trino.spi.eventlistener.QueryMetadata; @@ -40,15 +41,17 @@ public class OpenLineageListener { private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage")); private final OpenLineageClient client; - private final String namespace; + private final Optional namespace; private final Boolean trinoMetadataFacetEnabled; + private final Boolean trinoQueryContextFacetEnabled; private final Boolean queryStatisticsFacetEnabled; - public OpenLineageListener(String url, Optional namespace, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean queryStatisticsFacetEnabled) + public OpenLineageListener(String url, Optional namespace, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean trinoQueryContextFacetEnabled, Boolean queryStatisticsFacetEnabled) { this.client = new OpenLineageClient(url, apiKey); - this.namespace = namespace.orElse("default"); + this.namespace = namespace; this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled; + this.trinoQueryContextFacetEnabled = trinoQueryContextFacetEnabled; this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled; } @@ -83,17 +86,42 @@ public void queryCompleted(QueryCompletedEvent queryCompletedEvent) } } + private Optional getTrinoQueryContextFacet(QueryContext queryContext) + { + if (this.trinoQueryContextFacetEnabled) { + OpenLineage.RunFacet queryContextFacet = ol.newRunFacet(); + + queryContextFacet + .getAdditionalProperties() + .put("serverVersion", queryContext.getServerVersion()); + queryContextFacet + .getAdditionalProperties() + .put("environment", queryContext.getEnvironment()); + queryContext.getQueryType().ifPresent(queryType -> + queryContextFacet + .getAdditionalProperties() + .put("queryType", queryType)); + + return Optional.of(queryContextFacet); + } + return Optional.empty(); + } + private Optional getTrinoMetadataFacet(QueryMetadata queryMetadata) { if (this.trinoMetadataFacetEnabled) { OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet(); - if (queryMetadata.getPlan().isPresent()) { - trinoMetadataFacet.getAdditionalProperties().put("queryPlan", queryMetadata.getPlan().orElse("")); - } - if (queryMetadata.getTransactionId().isPresent()) { - trinoMetadataFacet.getAdditionalProperties().put("transactionId", queryMetadata.getTransactionId().orElse("")); - } + queryMetadata.getPlan().ifPresent( + queryPlan -> trinoMetadataFacet + .getAdditionalProperties() + .put("queryPlan", queryPlan)); + + queryMetadata.getTransactionId().ifPresent( + transactionId -> trinoMetadataFacet + .getAdditionalProperties() + .put("transactionId", transactionId)); + return Optional.of(trinoMetadataFacet); } return Optional.empty(); @@ -121,8 +149,10 @@ private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) { OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); Optional trinoMetadata = getTrinoMetadataFacet(queryCreatedEvent.getMetadata()); + Optional trinoQueryContext = getTrinoQueryContextFacet(queryCreatedEvent.getContext()); trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); + trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet)); OpenLineage.RunEvent startEvent = ol.newRunEventBuilder() @@ -131,7 +161,7 @@ private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent) .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) .job( ol.newJobBuilder() - .namespace(getJobNamespace()) + .namespace(getJobNamespace(queryCreatedEvent.getContext())) .name(queryCreatedEvent.getMetadata().getQueryId()) .facets( ol.newJobFacetsBuilder() @@ -151,9 +181,11 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder(); Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata()); Optional trinoQueryStatistics = getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics()); + Optional trinoQueryContext = getTrinoQueryContextFacet(queryCompletedEvent.getContext()); trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet)); trinoQueryStatistics.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryStatistics", runFacet)); + trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet)); OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder() @@ -165,7 +197,7 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build()) .job( ol.newJobBuilder() - .namespace(getJobNamespace()) + .namespace(getJobNamespace(queryCompletedEvent.getContext())) .name(queryCompletedEvent.getMetadata().getQueryId()) .facets( ol.newJobFacetsBuilder() @@ -258,8 +290,8 @@ private String getDatasetNamespace(String catalogName) } } - private String getJobNamespace() + private String getJobNamespace(QueryContext queryContext) { - return "trino-" + this.namespace; + return "trino-" + this.namespace.orElse(queryContext.getEnvironment()); } } diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java index 8452a1f..3686285 100644 --- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java +++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java @@ -38,12 +38,13 @@ public EventListener create(Map config) { String url = requireNonNull(config.get("openlineage.url")); String apiKey = config.get("openlineage.apikey"); - Boolean trinoMetadataFacetEnabled = config.get("openlineage.facets.trinoMetadata.enabled").equalsIgnoreCase("true"); - Boolean trinoQueryStatisticsFacetEnabled = config.get("openlineage.facets.trinoQueryStatistics.enabled").equalsIgnoreCase("true"); + Boolean trinoMetadataFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoMetadata.enabled")).orElse("true").equalsIgnoreCase("true"); + Boolean trinoQueryContextFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryContext.enabled")).orElse("true").equalsIgnoreCase("true"); + Boolean trinoQueryStatisticsFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryStatistics.enabled")).orElse("true").equalsIgnoreCase("true"); String namespace = config.get("openlineage.namespace"); logger.info("openlineage.url: " + url); - return new OpenLineageListener(url, Optional.ofNullable(namespace), Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryStatisticsFacetEnabled); + return new OpenLineageListener(url, Optional.ofNullable(namespace), Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryContextFacetEnabled, trinoQueryStatisticsFacetEnabled); } } From 03f2b8cd3756610d5aab32098750c7b42aeeec72 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Sat, 23 Mar 2024 09:28:21 +0100 Subject: [PATCH 11/11] :tada: Init --- README.md | 45 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fbdf65b..83c16b3 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,16 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr - Java 21 - Maven - Trino 422 -- [Marquez](https://github.com/MarquezProject/marquez) + + +## Mapping of properties + +| OpenLineage | Trino | +|--------------------|-----------------------------------------------------| +| JobFacet Name | QueryID | +| JobFacet Namespace | Query Environment (or configured in event listener) | +| RunFacet ID | QueryID | + ## Installation @@ -25,4 +34,38 @@ Add the following line to `$TRINO_HOME/etc/event-listener.properties`: ```properties event-listener.name=openlineage openlineage.url=http://localhost:5000 +#openlineage.facets.trinoMetadata.enabled=false +#openlineage.facets.trinoQueryContext.enabled=true +#openlineage.facets.trinoQueryStatistics.enabled=false +#openlineage.namespace=default +#openlineage.apikey=xxxx +``` + +## Local testing + +1. Build plugin: + +```shell +mvn clean install -DskipTests +``` + +2. Run docker compose: + +```shell +docker compose up -d +``` + +- Freshly built plugin will be automatically mounted to your trino pod. +- Configuration of the plugin will be taken from `event-listener.properties` file - adjust it to your will and restart trino pod for changes to take effect. + +3. Run query creating new table: + +```shell +docker exec -it oltrino trino --execute 'create table memory.default.test_table as select * from tpch.sf1.nation limit 1;' +``` + +4. Check logs of mock api: + +```shell +docker logs olapi ```