diff --git a/README.md b/README.md
index 1fe0d4b..83c16b3 100644
--- a/README.md
+++ b/README.md
@@ -6,10 +6,19 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr
## Requirements
-- Java 11
+- Java 21
- Maven
-- Trino 362
-- [Marquez](https://github.com/MarquezProject/marquez)
+- Trino 442
+
+
+## Mapping of properties
+
+| OpenLineage | Trino |
+|--------------------|-----------------------------------------------------|
+| JobFacet Name | QueryID |
+| JobFacet Namespace | `trino-` prefix + Query Environment (or `openlineage.namespace` if configured in event listener) |
+| RunFacet ID | Name-based UUID derived from QueryID |
+
## Installation
@@ -17,7 +26,7 @@ Build and copy trino-openlineage plugin:
```sh
mvn clean install -DskipTests
-unzip ./target/trino-openlineage-362.zip -d $TRINO_HOME/plugin
+unzip ./target/trino-openlineage-442.zip -d $TRINO_HOME/plugin
```
Add the following line to `$TRINO_HOME/etc/event-listener.properties`:
@@ -25,4 +34,38 @@ Add the following line to `$TRINO_HOME/etc/event-listener.properties`:
```properties
event-listener.name=openlineage
openlineage.url=http://localhost:5000
+#openlineage.facets.trinoMetadata.enabled=false
+#openlineage.facets.trinoQueryContext.enabled=true
+#openlineage.facets.trinoQueryStatistics.enabled=false
+#openlineage.namespace=default
+#openlineage.apikey=xxxx
+```
+
+## Local testing
+
+1. Build plugin:
+
+```shell
+mvn clean install -DskipTests
+```
+
+2. Run docker compose:
+
+```shell
+docker compose up -d
+```
+
+- The freshly built plugin is automatically mounted into the Trino container.
+- The plugin configuration is read from the `event-listener.properties` file - adjust it as needed and restart the Trino container for changes to take effect.
+
+3. Run a query that creates a new table:
+
+```shell
+docker exec -it oltrino trino --execute 'create table memory.default.test_table as select * from tpch.sf1.nation limit 1;'
+```
+
+4. Check logs of mock api:
+
+```shell
+docker logs olapi
```
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..68be347
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,25 @@
+version: '3'
+
+services:
+ trino:
+ image: trinodb/trino
+ container_name: oltrino
+ networks:
+ - trino-ol
+ ports:
+ - 8080:8080
+ volumes:
+ - ${PWD}/event-listener.properties:/etc/trino/event-listener.properties
+ - ${PWD}/target/trino-openlineage-442:/usr/lib/trino/plugin/openlineage
+ olapi:
+ image: mockserver/mockserver
+ container_name: olapi
+ environment:
+ MOCKSERVER_SERVER_PORT: 5000
+ networks:
+ - trino-ol
+ ports:
+ - 5000:5000
+
+networks:
+ trino-ol:
diff --git a/event-listener.properties b/event-listener.properties
index aae83df..fdee234 100644
--- a/event-listener.properties
+++ b/event-listener.properties
@@ -1,3 +1,7 @@
event-listener.name=openlineage
-openlineage.url=http://localhost:5000
+openlineage.url=http://olapi:5000
+#openlineage.facets.trinoMetadata.enabled=false
+#openlineage.facets.trinoQueryContext.enabled=true
+#openlineage.facets.trinoQueryStatistics.enabled=false
+#openlineage.namespace=default
#openlineage.apikey=xxxx
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 215ff75..1131cca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,32 +1,25 @@
-
+
4.0.0
io.trino
trino-root
- 362
+ 442
trino-openlineage
+
trino-plugin
+ Trino OpenLineage
+
- 11
- 11
+ 21
+ 21
-
- io.airlift
- http-client
-
-
- io.airlift
- log
-
com.fasterxml.jackson.core
jackson-databind
@@ -39,14 +32,22 @@
com.google.guava
guava
+
+ io.airlift
+ http-client
+
+
+ io.airlift
+ log
+
io.openlineage
openlineage-java
- 0.2.2
+ 1.10.2
- io.trino
- trino-spi
+ com.fasterxml.jackson.core
+ jackson-annotations
provided
@@ -55,8 +56,18 @@
provided
- com.fasterxml.jackson.core
- jackson-annotations
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+ io.trino
+ trino-spi
provided
@@ -66,4 +77,21 @@
-
\ No newline at end of file
+
+
+
+ com.github.ekryd.sortpom
+ sortpom-maven-plugin
+ 3.4.0
+
+
+
+ sort
+
+
+
+
+
+
+
+
diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java
index 24f80ea..32ca863 100644
--- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java
+++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageClient.java
@@ -51,17 +51,20 @@ public void emit(OpenLineage.RunEvent event)
String json = objectMapper.writeValueAsString(event);
logger.info(json);
- Request.Builder requestBuilder = Request.builder()
- .setMethod("POST")
- .setUri(URI.create(url + "/api/v1/lineage"))
- .addHeader("Content-Type", "application/json")
- .setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8)));
+ Request.Builder requestBuilder =
+ Request.builder()
+ .setMethod("POST")
+ .setUri(URI.create(url + "/api/v1/lineage"))
+ .addHeader("Content-Type", "application/json")
+ .setBodyGenerator(
+ StaticBodyGenerator.createStaticBodyGenerator(
+ json.getBytes(StandardCharsets.UTF_8)));
- if (apiKey.isPresent()) {
- requestBuilder.addHeader("Authorization", "Bearer " + apiKey.get());
- }
+ apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s));
- StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
+ StatusResponseHandler.StatusResponse status =
+ jettyClient.execute(
+ requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
logger.info("Response status: " + status.getStatusCode());
}
catch (Exception e) {
diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java
index 47ec518..6af0690 100644
--- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java
+++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListener.java
@@ -16,13 +16,20 @@
import com.google.common.collect.ImmutableList;
import io.openlineage.client.OpenLineage;
import io.trino.spi.eventlistener.EventListener;
+import io.trino.spi.eventlistener.OutputColumnMetadata;
import io.trino.spi.eventlistener.QueryCompletedEvent;
+import io.trino.spi.eventlistener.QueryContext;
import io.trino.spi.eventlistener.QueryCreatedEvent;
import io.trino.spi.eventlistener.QueryIOMetadata;
+import io.trino.spi.eventlistener.QueryMetadata;
import io.trino.spi.eventlistener.QueryOutputMetadata;
+import io.trino.spi.eventlistener.QueryStatistics;
+import java.lang.reflect.Field;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -34,66 +41,172 @@ public class OpenLineageListener
{
private final OpenLineage ol = new OpenLineage(URI.create("https://github.com/takezoe/trino-openlineage"));
private final OpenLineageClient client;
+ private final Optional namespace;
+ private final Boolean trinoMetadataFacetEnabled;
+ private final Boolean trinoQueryContextFacetEnabled;
+ private final Boolean queryStatisticsFacetEnabled;
- public OpenLineageListener(String url, Optional apiKey)
+ public OpenLineageListener(String url, Optional namespace, Optional apiKey, Boolean trinoMetadataFacetEnabled, Boolean trinoQueryContextFacetEnabled, Boolean queryStatisticsFacetEnabled)
{
this.client = new OpenLineageClient(url, apiKey);
+ this.namespace = namespace;
+ this.trinoMetadataFacetEnabled = trinoMetadataFacetEnabled;
+ this.trinoQueryContextFacetEnabled = trinoQueryContextFacetEnabled;
+ this.queryStatisticsFacetEnabled = queryStatisticsFacetEnabled;
+ }
+
+ private UUID getQueryId(QueryMetadata queryMetadata)
+ {
+ return UUID.nameUUIDFromBytes(queryMetadata.getQueryId().getBytes(StandardCharsets.UTF_8));
}
@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent)
{
- // Do nothing here
+ UUID runID = getQueryId(queryCreatedEvent.getMetadata());
+
+ try {
+ sendStartEvent(runID, queryCreatedEvent);
+ }
+ catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent)
{
- UUID runID = UUID.randomUUID();
- sendStartEvent(runID, queryCompletedEvent);
- sendCompletedEvent(runID, queryCompletedEvent);
+ UUID runID = getQueryId(queryCompletedEvent.getMetadata());
+
+ try {
+ sendCompletedEvent(runID, queryCompletedEvent);
+ }
+ catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
}
- private void sendStartEvent(UUID runId, QueryCompletedEvent queryCompletedEvent)
+ private Optional getTrinoQueryContextFacet(QueryContext queryContext)
{
- OpenLineage.RunEvent startEvent = ol.newRunEventBuilder()
- .eventType("START")
- .eventTime(queryCompletedEvent.getExecutionStartTime().atZone(ZoneId.of("UTC")))
- .run(ol.newRunBuilder()
- .runId(runId)
- .build())
- .job(ol.newJobBuilder()
- .namespace(queryCompletedEvent.getContext().getUser())
- .name(queryCompletedEvent.getMetadata().getQueryId())
- .facets(ol.newJobFacetsBuilder()
- .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
- .build())
- .build())
- .inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
- .outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
- .build();
+ if (this.trinoQueryContextFacetEnabled) {
+ OpenLineage.RunFacet queryContextFacet = ol.newRunFacet();
+
+ queryContextFacet
+ .getAdditionalProperties()
+ .put("serverVersion", queryContext.getServerVersion());
+ queryContextFacet
+ .getAdditionalProperties()
+ .put("environment", queryContext.getEnvironment());
+ queryContext.getQueryType().ifPresent(queryType ->
+ queryContextFacet
+ .getAdditionalProperties()
+ .put("queryType", queryType));
+
+ return Optional.of(queryContextFacet);
+ }
+ return Optional.empty();
+ }
+
+ private Optional getTrinoMetadataFacet(QueryMetadata queryMetadata)
+ {
+ if (this.trinoMetadataFacetEnabled) {
+ OpenLineage.RunFacet trinoMetadataFacet = ol.newRunFacet();
+
+ queryMetadata.getPlan().ifPresent(
+ queryPlan -> trinoMetadataFacet
+ .getAdditionalProperties()
+ .put("queryPlan", queryPlan));
+
+ queryMetadata.getTransactionId().ifPresent(
+ transactionId -> trinoMetadataFacet
+ .getAdditionalProperties()
+ .put("transactionId", transactionId));
+
+ return Optional.of(trinoMetadataFacet);
+ }
+ return Optional.empty();
+ }
+
+ private Optional getTrinoQueryStatisticsFacet(QueryStatistics queryStatistics)
+ throws IllegalAccessException
+ {
+ if (this.queryStatisticsFacetEnabled) {
+ OpenLineage.RunFacet trinoQueryStatisticsFacet = ol.newRunFacet();
+
+ for (Field field : queryStatistics.getClass().getDeclaredFields()) {
+ field.setAccessible(true);
+ trinoQueryStatisticsFacet
+ .getAdditionalProperties()
+ .put(field.getName(), String.valueOf(field.get(queryStatistics)));
+ }
+ return Optional.of(trinoQueryStatisticsFacet);
+ }
+ return Optional.empty();
+ }
+
+ private void sendStartEvent(UUID runID, QueryCreatedEvent queryCreatedEvent)
+ throws IllegalAccessException
+ {
+ OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
+ Optional trinoMetadata = getTrinoMetadataFacet(queryCreatedEvent.getMetadata());
+ Optional trinoQueryContext = getTrinoQueryContextFacet(queryCreatedEvent.getContext());
+
+ trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet));
+ trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet));
+
+ OpenLineage.RunEvent startEvent =
+ ol.newRunEventBuilder()
+ .eventType(OpenLineage.RunEvent.EventType.START)
+ .eventTime(queryCreatedEvent.getCreateTime().atZone(ZoneId.of("UTC")))
+ .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
+ .job(
+ ol.newJobBuilder()
+ .namespace(getJobNamespace(queryCreatedEvent.getContext()))
+ .name(queryCreatedEvent.getMetadata().getQueryId())
+ .facets(
+ ol.newJobFacetsBuilder()
+ .sql(ol.newSQLJobFacet(queryCreatedEvent.getMetadata().getQuery()))
+ .build())
+ .build())
+ .build();
client.emit(startEvent);
}
private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEvent)
+ throws IllegalAccessException
{
boolean failed = queryCompletedEvent.getMetadata().getQueryState().equals("FAILED");
- OpenLineage.RunEvent completedEvent = ol.newRunEventBuilder()
- .eventType(failed ? "FAIL" : "COMPLETE")
- .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC")))
- .run(ol.newRunBuilder().runId(runID).build())
- .job(ol.newJobBuilder()
- .namespace(queryCompletedEvent.getContext().getUser())
- .name(queryCompletedEvent.getMetadata().getQueryId())
- .facets(ol.newJobFacetsBuilder()
- .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
- .build())
- .build())
- .inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
- .outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
- .build();
+ OpenLineage.RunFacetsBuilder runFacetsBuilder = ol.newRunFacetsBuilder();
+ Optional trinoMetadata = getTrinoMetadataFacet(queryCompletedEvent.getMetadata());
+ Optional trinoQueryStatistics = getTrinoQueryStatisticsFacet(queryCompletedEvent.getStatistics());
+ Optional trinoQueryContext = getTrinoQueryContextFacet(queryCompletedEvent.getContext());
+
+ trinoMetadata.ifPresent(runFacet -> runFacetsBuilder.put("trino.metadata", runFacet));
+ trinoQueryStatistics.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryStatistics", runFacet));
+ trinoQueryContext.ifPresent(runFacet -> runFacetsBuilder.put("trino.queryContext", runFacet));
+
+ OpenLineage.RunEvent completedEvent =
+ ol.newRunEventBuilder()
+ .eventType(
+ failed
+ ? OpenLineage.RunEvent.EventType.FAIL
+ : OpenLineage.RunEvent.EventType.COMPLETE)
+ .eventTime(queryCompletedEvent.getEndTime().atZone(ZoneId.of("UTC")))
+ .run(ol.newRunBuilder().runId(runID).facets(runFacetsBuilder.build()).build())
+ .job(
+ ol.newJobBuilder()
+ .namespace(getJobNamespace(queryCompletedEvent.getContext()))
+ .name(queryCompletedEvent.getMetadata().getQueryId())
+ .facets(
+ ol.newJobFacetsBuilder()
+ .sql(ol.newSQLJobFacet(queryCompletedEvent.getMetadata().getQuery()))
+ .build())
+ .build())
+ .inputs(buildInputs(queryCompletedEvent.getIoMetadata()))
+ .outputs(buildOutputs(queryCompletedEvent.getIoMetadata()))
+ .build();
client.emit(completedEvent);
}
@@ -101,10 +214,22 @@ private void sendCompletedEvent(UUID runID, QueryCompletedEvent queryCompletedEv
private List buildInputs(QueryIOMetadata ioMetadata)
{
return ioMetadata.getInputs().stream().map(inputMetadata ->
- ol.newInputDatasetBuilder()
- .namespace(getDatasetNamespace(inputMetadata.getCatalogName()))
- .name(inputMetadata.getSchema() + "." + inputMetadata.getTable())
- .build()
+ ol.newInputDatasetBuilder()
+ .namespace(getDatasetNamespace(inputMetadata.getCatalogName()))
+ .name(inputMetadata.getSchema() + "." + inputMetadata.getTable())
+ .facets(ol.newDatasetFacetsBuilder()
+ .schema(ol.newSchemaDatasetFacetBuilder()
+ .fields(
+ inputMetadata
+ .getColumns()
+ .stream()
+ .map(field -> ol.newSchemaDatasetFacetFieldsBuilder()
+ .name(field)
+ .build()
+ ).toList())
+ .build()
+ ).build())
+ .build()
).collect(toImmutableList());
}
@@ -113,10 +238,41 @@ private List buildOutputs(QueryIOMetadata ioMetadata)
Optional outputs = ioMetadata.getOutput();
if (outputs.isPresent()) {
QueryOutputMetadata outputMetadata = outputs.get();
- return ImmutableList.of(ol.newOutputDatasetBuilder()
- .namespace(getDatasetNamespace(outputMetadata.getCatalogName()))
- .name(outputMetadata.getSchema() + "." + outputMetadata.getTable())
- .build());
+ List outputColumns = outputMetadata.getColumns().orElse(new ArrayList<>());
+
+ OpenLineage.ColumnLineageDatasetFacetBuilder columnLineageBuilder = ol.newColumnLineageDatasetFacetBuilder();
+
+ outputColumns.forEach(column ->
+ columnLineageBuilder.put(column.getColumnName(),
+ ol.newColumnLineageDatasetFacetFieldsAdditionalBuilder()
+ .inputFields(column
+ .getSourceColumns()
+ .stream()
+ .map(inputColumn -> ol.newColumnLineageDatasetFacetFieldsAdditionalInputFieldsBuilder()
+ .field(inputColumn.getColumnName())
+ .namespace(getDatasetNamespace(inputColumn.getCatalog()))
+ .name(inputColumn.getSchema() + "." + inputColumn.getTable())
+ .build())
+ .toList()
+ ).build()));
+
+ return ImmutableList.of(
+ ol.newOutputDatasetBuilder()
+ .namespace(getDatasetNamespace(outputMetadata.getCatalogName()))
+ .name(outputMetadata.getSchema() + "." + outputMetadata.getTable())
+ .facets(ol.newDatasetFacetsBuilder()
+ .columnLineage(columnLineageBuilder.build())
+ .schema(ol.newSchemaDatasetFacetBuilder()
+ .fields(
+ outputColumns.stream()
+ .map(column -> ol.newSchemaDatasetFacetFieldsBuilder()
+ .name(column.getColumnName())
+ .type(column.getColumnType())
+ .build())
+ .toList()
+ ).build()
+ ).build()
+ ).build());
}
else {
return ImmutableList.of();
@@ -133,4 +289,9 @@ private String getDatasetNamespace(String catalogName)
return catalogName;
}
}
+
+ private String getJobNamespace(QueryContext queryContext)
+ {
+ return "trino-" + this.namespace.orElse(queryContext.getEnvironment());
+ }
}
diff --git a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java
index 9d37be1..3686285 100644
--- a/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java
+++ b/src/main/java/io/github/takezoe/trino/openlineage/OpenLineageListenerFactory.java
@@ -38,9 +38,13 @@ public EventListener create(Map config)
{
String url = requireNonNull(config.get("openlineage.url"));
String apiKey = config.get("openlineage.apikey");
+ Boolean trinoMetadataFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoMetadata.enabled")).orElse("true").equalsIgnoreCase("true");
+ Boolean trinoQueryContextFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryContext.enabled")).orElse("true").equalsIgnoreCase("true");
+ Boolean trinoQueryStatisticsFacetEnabled = Optional.ofNullable(config.get("openlineage.facets.trinoQueryStatistics.enabled")).orElse("true").equalsIgnoreCase("true");
+ String namespace = config.get("openlineage.namespace");
logger.info("openlineage.url: " + url);
- return new OpenLineageListener(url, Optional.ofNullable(apiKey));
+ return new OpenLineageListener(url, Optional.ofNullable(namespace), Optional.ofNullable(apiKey), trinoMetadataFacetEnabled, trinoQueryContextFacetEnabled, trinoQueryStatisticsFacetEnabled);
}
}