Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revive this project #1

Merged
merged 16 commits into from Mar 23, 2024
Merged
51 changes: 47 additions & 4 deletions README.md
Expand Up @@ -6,23 +6,66 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr

## Requirements

- Java 11
- Java 21
- Maven
- Trino 362
- [Marquez](https://github.com/MarquezProject/marquez)
- Trino 422


## Mapping of properties

| OpenLineage | Trino |
|--------------------|-----------------------------------------------------|
| JobFacet Name | QueryID |
| JobFacet Namespace | Query Environment (or configured in event listener) |
| RunFacet ID | QueryID |


## Installation

Build and copy trino-openlineage plugin:

```sh
mvn clean install -DskipTests
unzip ./target/trino-openlineage-362.zip -d $TRINO_HOME/plugin
unzip ./target/trino-openlineage-422.zip -d $TRINO_HOME/plugin
```

Add the following line to `$TRINO_HOME/etc/event-listener.properties`:

```properties
event-listener.name=openlineage
openlineage.url=http://localhost:5000
#openlineage.facets.trinoMetadata.enabled=false
#openlineage.facets.trinoQueryContext.enabled=true
#openlineage.facets.trinoQueryStatistics.enabled=false
#openlineage.namespace=default
#openlineage.apikey=xxxx
```

## Local testing

1. Build plugin:

```shell
mvn clean install -DskipTests
```

2. Run docker compose:

```shell
docker compose up -d
```

- Freshly built plugin will be automatically mounted to your trino pod.
- Configuration of the plugin will be taken from `event-listener.properties` file - adjust it to your will and restart trino pod for changes to take effect.

3. Run query creating new table:

```shell
docker exec -it oltrino trino --execute 'create table memory.default.test_table as select * from tpch.sf1.nation limit 1;'
```

4. Check logs of mock api:

```shell
docker logs olapi
```
25 changes: 25 additions & 0 deletions docker-compose.yaml
mgorsk1 marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,25 @@
version: '3'

services:
trino:
image: trinodb/trino
container_name: oltrino
networks:
- trino-ol
ports:
- 8080:8080
volumes:
- ${PWD}/event-listener.properties:/etc/trino/event-listener.properties
- ${PWD}/target/trino-openlineage-442:/usr/lib/trino/plugin/openlineage
olapi:
image: mockserver/mockserver
container_name: olapi
environment:
MOCKSERVER_SERVER_PORT: 5000
networks:
- trino-ol
ports:
- 5000:5000

networks:
trino-ol:
6 changes: 5 additions & 1 deletion event-listener.properties
@@ -1,3 +1,7 @@
event-listener.name=openlineage
openlineage.url=http://localhost:5000
openlineage.url=http://olapi:5000
#openlineage.facets.trinoMetadata.enabled=false
#openlineage.facets.trinoQueryContext.enabled=true
#openlineage.facets.trinoQueryStatistics.enabled=false
#openlineage.namespace=default
#openlineage.apikey=xxxx
68 changes: 48 additions & 20 deletions pom.xml
@@ -1,32 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>362</version>
<version>442</version>
</parent>

<artifactId>trino-openlineage</artifactId>

<packaging>trino-plugin</packaging>

<description>Trino OpenLineage</description>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -39,14 +32,22 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>0.2.2</version>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
Expand All @@ -55,8 +56,18 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
Expand All @@ -66,4 +77,21 @@
</dependency>
</dependencies>

</project>
<build>
<plugins>
<plugin>
<groupId>com.github.ekryd.sortpom</groupId>
<artifactId>sortpom-maven-plugin</artifactId>
<version>3.4.0</version>
<executions>
<execution>
<goals>
<goal>sort</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Expand Up @@ -51,17 +51,20 @@ public void emit(OpenLineage.RunEvent event)
String json = objectMapper.writeValueAsString(event);
logger.info(json);

Request.Builder requestBuilder = Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8)));
Request.Builder requestBuilder =
Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(
StaticBodyGenerator.createStaticBodyGenerator(
json.getBytes(StandardCharsets.UTF_8)));

if (apiKey.isPresent()) {
requestBuilder.addHeader("Authorization", "Bearer " + apiKey.get());
}
apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s));

StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
StatusResponseHandler.StatusResponse status =
jettyClient.execute(
requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
logger.info("Response status: " + status.getStatusCode());
}
catch (Exception e) {
Expand Down