Skip to content

Commit

Permalink
Merge pull request #1 from mgorsk1/feat/revive
Browse files Browse the repository at this point in the history
revive this project
  • Loading branch information
takezoe committed Mar 23, 2024
2 parents a08a3f1 + 9022fa7 commit 3004a42
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 79 deletions.
51 changes: 47 additions & 4 deletions README.md
Expand Up @@ -6,23 +6,66 @@ An experimental [OpenLineage](https://github.com/OpenLineage/OpenLineage) integr

## Requirements

- Java 11
- Java 21
- Maven
- Trino 362
- [Marquez](https://github.com/MarquezProject/marquez)
- Trino 422


## Mapping of properties

| OpenLineage | Trino |
|--------------------|-----------------------------------------------------|
| JobFacet Name | QueryID |
| JobFacet Namespace | Query Environment (or configured in event listener) |
| RunFacet ID | QueryID |


## Installation

Build and copy trino-openlineage plugin:

```sh
mvn clean install -DskipTests
unzip ./target/trino-openlineage-362.zip -d $TRINO_HOME/plugin
unzip ./target/trino-openlineage-422.zip -d $TRINO_HOME/plugin
```

Add the following line to `$TRINO_HOME/etc/event-listener.properties`:

```properties
event-listener.name=openlineage
openlineage.url=http://localhost:5000
#openlineage.facets.trinoMetadata.enabled=false
#openlineage.facets.trinoQueryContext.enabled=true
#openlineage.facets.trinoQueryStatistics.enabled=false
#openlineage.namespace=default
#openlineage.apikey=xxxx
```

## Local testing

1. Build plugin:

```shell
mvn clean install -DskipTests
```

2. Run docker compose:

```shell
docker compose up -d
```

- Freshly built plugin will be automatically mounted to your trino pod.
- Configuration of the plugin will be taken from `event-listener.properties` file - adjust it to your will and restart trino pod for changes to take effect.

3. Run query creating new table:

```shell
docker exec -it oltrino trino --execute 'create table memory.default.test_table as select * from tpch.sf1.nation limit 1;'
```

4. Check logs of mock api:

```shell
docker logs olapi
```
25 changes: 25 additions & 0 deletions docker-compose.yaml
@@ -0,0 +1,25 @@
version: '3'

services:
trino:
image: trinodb/trino
container_name: oltrino
networks:
- trino-ol
ports:
- 8080:8080
volumes:
- ${PWD}/event-listener.properties:/etc/trino/event-listener.properties
- ${PWD}/target/trino-openlineage-442:/usr/lib/trino/plugin/openlineage
olapi:
image: mockserver/mockserver
container_name: olapi
environment:
MOCKSERVER_SERVER_PORT: 5000
networks:
- trino-ol
ports:
- 5000:5000

networks:
trino-ol:
6 changes: 5 additions & 1 deletion event-listener.properties
@@ -1,3 +1,7 @@
event-listener.name=openlineage
openlineage.url=http://localhost:5000
openlineage.url=http://olapi:5000
#openlineage.facets.trinoMetadata.enabled=false
#openlineage.facets.trinoQueryContext.enabled=true
#openlineage.facets.trinoQueryStatistics.enabled=false
#openlineage.namespace=default
#openlineage.apikey=xxxx
68 changes: 48 additions & 20 deletions pom.xml
@@ -1,32 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.trino</groupId>
<artifactId>trino-root</artifactId>
<version>362</version>
<version>442</version>
</parent>

<artifactId>trino-openlineage</artifactId>

<packaging>trino-plugin</packaging>

<description>Trino OpenLineage</description>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand All @@ -39,14 +32,22 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-java</artifactId>
<version>0.2.2</version>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
Expand All @@ -55,8 +56,18 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
Expand All @@ -66,4 +77,21 @@
</dependency>
</dependencies>

</project>
<build>
<plugins>
<plugin>
<groupId>com.github.ekryd.sortpom</groupId>
<artifactId>sortpom-maven-plugin</artifactId>
<version>3.4.0</version>
<executions>
<execution>
<goals>
<goal>sort</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Expand Up @@ -51,17 +51,20 @@ public void emit(OpenLineage.RunEvent event)
String json = objectMapper.writeValueAsString(event);
logger.info(json);

Request.Builder requestBuilder = Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(StaticBodyGenerator.createStaticBodyGenerator(json.getBytes(StandardCharsets.UTF_8)));
Request.Builder requestBuilder =
Request.builder()
.setMethod("POST")
.setUri(URI.create(url + "/api/v1/lineage"))
.addHeader("Content-Type", "application/json")
.setBodyGenerator(
StaticBodyGenerator.createStaticBodyGenerator(
json.getBytes(StandardCharsets.UTF_8)));

if (apiKey.isPresent()) {
requestBuilder.addHeader("Authorization", "Bearer " + apiKey.get());
}
apiKey.ifPresent(s -> requestBuilder.addHeader("Authorization", "Bearer " + s));

StatusResponseHandler.StatusResponse status = jettyClient.execute(requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
StatusResponseHandler.StatusResponse status =
jettyClient.execute(
requestBuilder.build(), StatusResponseHandler.createStatusResponseHandler());
logger.info("Response status: " + status.getStatusCode());
}
catch (Exception e) {
Expand Down

0 comments on commit 3004a42

Please sign in to comment.