Ingest Source Research

The purpose of this page is to specify the requirements for Data Source Ingestion in VDK, outlining the functionalities needed to serve Plugin Developers, Data Users, and Operators. It serves as a guide for generating ideas and for development.

Motivation

Data platforms like HuggingFace offer a plethora of ready-to-use datasets, but there is a glaring gap in facilitating user-generated datasets. Users encounter multiple hurdles in creating and managing datasets that can come from diverse sources like traditional databases, streaming platforms, cloud storage, or REST APIs. VDK aims to simplify and streamline the ingestion process with a Data Source Ingestion feature.

Versatile Data Kit (VDK) provides a robust framework for managing data pipelines and includes "destination" ingester plugins (IIngesterPlugin) for data ingestion. However, it currently lacks the concept of "source" plugins, limiting its ability to fully manage the data lifecycle from source to destination.

High-level summary of what is needed:

  • For Plugin Developers: The Data Sources Plugin API should allow developers to create versatile data source plugins with built-in features for secure connections and data filtering. The API should be designed for easy extensibility, with a focus on reusability.

  • For Data Users: The VDK Data Sources User API should simplify data ingestion with minimal configuration and optional data transformation features. Metadata and metrics are integrated, providing transparency and control over data quality.

  • For Operators: VDK should support or enable features like data partitioning, rate limiting, and backpressure for scalable operations. Monitoring, logging, and alerting should be comprehensive, while auto-recovery and rollback add resilience.

Personas

  • Extension (Plugin) Developer - the developers building different data source plugins
  • (Data) User - the user (data practitioner, engineer, scientist) using the plugins to create their own datasets
  • Operator - the ingestion runs on some infrastructure and needs to be operated by IT, data, or DevOps teams. It needs to scale and be robust and stable.

Requirements

Data Source Extension API

Target Persona: Plugin Developer

As a plugin developer, I want to be able to build a plugin for a new type of data source.

Connection Management

The API should allow plugin developers to handle connectivity to data sources in a secure, efficient way with minimal user intervention. The API should be able to handle a variety of data source extensions such as relational databases, NoSQL databases, file systems, cloud storage, web APIs, and so on.
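As an illustration only (the names below are hypothetical, not an actual VDK API), a plugin could describe its connection settings declaratively and let the framework manage the connection lifecycle and secrets:

from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class PostgresSourceConfig:
    # Hypothetical declarative configuration; in practice the password would
    # come from the framework's secrets mechanism rather than plain text.
    host: str
    port: int = 5432
    database: str = "postgres"
    user: str = "vdk"
    password: str = field(default="", repr=False)  # kept out of repr/logs


@contextmanager
def managed_connection(config: PostgresSourceConfig):
    # Sketch of framework-managed connectivity: open lazily, always close.
    import psycopg2  # dependency of this particular (hypothetical) plugin

    connection = psycopg2.connect(
        host=config.host,
        port=config.port,
        dbname=config.database,
        user=config.user,
        password=config.password,
    )
    try:
        yield connection
    finally:
        connection.close()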

Incremental Extraction

To avoid extracting all data every time, the API should support incremental extraction, often based on a timestamp or a specific ID field. This can be done by offering the user parameters to specify the field and the last value extracted.

Importantly, the API should also provide abstractions that let plugin developers implement such a mechanism themselves; this is the plugin developer's requirement for incremental extraction.
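A minimal sketch of how this could look (the parameter and function names are illustrative, not the actual API):

from dataclasses import dataclass
from typing import Any, Dict, Iterator


@dataclass
class IncrementalConfig:
    cursor_field: str       # e.g. an "updated_time" column or an ID field
    last_value: Any = None  # last extracted value, restored from saved state


def extract_incrementally(
    rows: Iterator[Dict[str, Any]], config: IncrementalConfig
) -> Iterator[Dict[str, Any]]:
    # Yield only rows newer than the last extracted cursor value and track
    # the new high-water mark so the framework can persist it as state.
    high_water_mark = config.last_value
    for row in rows:
        value = row[config.cursor_field]
        if config.last_value is None or value > config.last_value:
            yield row
            if high_water_mark is None or value > high_water_mark:
                high_water_mark = value
    # On the next run the framework would pass high_water_mark back
    # as config.last_value to continue from where it left off.
    config.last_value = high_water_mark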

Data Source Plugin Chaining

The Data Source Plugin API should allow developers to build plugins with reusable and extendable components. A developer should be able to create a plugin (e.g., Plugin A) that can be extended by another plugin (e.g., Plugin B).

Use Case Example:

  • Plugin httpExtractor: Responsible for making HTTP requests and fetching data.
  • Plugin Github API Extractor: Extends httpExtractor to tailor the request and response specifically for GitHub APIs.
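In code, such chaining could look roughly like this (the class names are illustrative, not the actual plugin base classes):

import json
import urllib.request
from typing import Any, Dict, Iterator


class HttpExtractor:
    # Generic, reusable plugin: fetches JSON documents over HTTP.

    def build_url(self, path: str) -> str:
        return path  # subclasses refine how URLs are built

    def fetch(self, path: str) -> Any:
        with urllib.request.urlopen(self.build_url(path)) as response:
            return json.loads(response.read().decode("utf-8"))


class GithubApiExtractor(HttpExtractor):
    # Extends HttpExtractor, tailoring requests and responses to the GitHub API.

    BASE_URL = "https://api.github.com"

    def build_url(self, path: str) -> str:
        return f"{self.BASE_URL}/{path.lstrip('/')}"

    def repo_issues(self, owner: str, repo: str) -> Iterator[Dict[str, Any]]:
        yield from self.fetch(f"repos/{owner}/{repo}/issues")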

Easy to integrate with existing open source data ingestion solutions

Airbyte and singer.io provide a plethora of data sources for BI use cases, so people should be able to continue using them.

Data Source User-Facing APIs

Target Persona: Data user

As a data user who needs to ingest (extract) data from different sources, I want to use the plugins built for those sources so that extracting and ingesting the data is very easy (just configuration).

Transformation

Filtering

The API should allow data users to specify criteria to extract only certain data, such as a specific date range or records where certain conditions are met.

Re-mapping and other transformation

The API should provide an extension point for users to apply simple transformations, but only if needed. It should be optional, and there should be a default mapping.
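For example (illustrative only, not the actual API), a user could pass an optional filter predicate and an optional mapping function, with an identity mapping as the default:

from typing import Any, Callable, Dict, Iterable, Iterator

Record = Dict[str, Any]


def apply_user_transformations(
    records: Iterable[Record],
    keep: Callable[[Record], bool] = lambda r: True,  # optional filter criteria
    remap: Callable[[Record], Record] = lambda r: r,  # default mapping: identity
) -> Iterator[Record]:
    for record in records:
        if keep(record):
            yield remap(record)


# Usage: keep only 2023 orders and rename a column on the way in.
transformed = apply_user_transformations(
    records=[
        {"order_date": "2023-05-01", "amt": 10},
        {"order_date": "2022-01-01", "amt": 5},
    ],
    keep=lambda r: r["order_date"] >= "2023-01-01",
    remap=lambda r: {"order_date": r["order_date"], "amount": r["amt"]},
)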

Metadata and Metrics

There should be a Metrics API that allows extensions to define source-associated standard metrics and that calculates these metrics for the corresponding data source.
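As an illustration only (not an existing API), a standard per-source metrics structure could look like this:

from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SourceMetrics:
    # Illustrative set of standard, source-associated metrics.
    records_read: int = 0
    bytes_read: int = 0
    errors: int = 0
    # Extensions could register additional source-specific metrics here.
    custom: Dict[str, Any] = field(default_factory=dict)

    def record_payload(self, size_in_bytes: int) -> None:
        self.records_read += 1
        self.bytes_read += size_in_bytes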

Error Handling

The API should handle errors gracefully and provide informative error messages. It is also critical to handle failures with capabilities for error detection, retrying, and recovery.
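A minimal sketch of the retrying part (generic Python, not a VDK API):

import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)


def with_retries(operation: Callable[[], T], attempts: int = 3, backoff_seconds: float = 2.0) -> T:
    # Run an extraction step, retrying transient failures with exponential backoff.
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:
            if attempt == attempts:
                log.error("Giving up after %d attempts: %s", attempts, error)
                raise
            log.warning("Attempt %d failed (%s); retrying...", attempt, error)
            time.sleep(backoff_seconds * 2 ** (attempt - 1))
    raise AssertionError("unreachable")  # the loop always returns or raises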

Operation

Target Persona: Operator

Parallelism

The data to be extracted can be divided into logical partitions that can be processed independently, such as different tables in a database, different time ranges of data, or different geographical regions. The API could include parameters to specify the partitioning scheme for a given extraction job.
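As an illustration (the names are hypothetical), a partitioning scheme plus an operator-controlled worker count could look like this:

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Partition:
    # One partition per table, time range, or region - processed independently.
    name: str


def run_partitioned(
    partitions: List[Partition],
    extract: Callable[[Partition], None],
    max_workers: int = 4,  # operator-controlled degree of parallelism
) -> None:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # list() forces completion and surfaces any exceptions from workers.
        list(executor.map(extract, partitions))


# Usage: each database table is its own partition.
run_partitioned(
    partitions=[Partition("public.users"), Partition("public.orders")],
    extract=lambda partition: print(f"extracting {partition.name}"),
)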

Monitoring and Logging

The API should expose essential metrics and logs for real-time and post-hoc analysis for each data source.

Audit Trails

The API should keep logs of activities like data extraction, errors, and access records.

Documentation

Provide concise operator-specific documentation for troubleshooting and best practices.

Design

Concepts

Data Source

A Data Source is a central component responsible for establishing and managing a connection to a specific set of data. It interacts with a given configuration and maintains a stateful relationship with the data it accesses. This stateful relationship can include information such as authentication tokens, data markers, or any other form of metadata that helps manage the data connection. The Data Source exposes various data streams through which data can be read.
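A rough sketch of the concept (illustrative interface; the actual VDK class names and signatures may differ; DataSourceStream and DataSourcePayload are sketched in the following sections):

from abc import ABC, abstractmethod
from typing import Any, Dict, List


class DataSource(ABC):
    # Owns the connection and the stateful relationship with the data.

    @abstractmethod
    def configure(self, config: Dict[str, Any]) -> None:
        """Accept the user-supplied configuration (credentials, endpoints, ...)."""

    @abstractmethod
    def connect(self, state: Dict[str, Any]) -> None:
        """Open the connection; state may hold auth tokens or data markers."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release the connection and any associated resources."""

    @abstractmethod
    def streams(self) -> List["DataSourceStream"]:
        """Expose the data streams through which data can be read."""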

Data Source Stream

A Data Source Stream is an abstraction over a subset of data in the Data Source. It can be thought of as a channel through which data flows. Each Data Source Stream has a unique name to identify it and includes methods to read data from the stream. For example, for a database-based data source, each table could be a separate stream. Streams can potentially be ingested in parallel.

Reading from the stream yields a sequence of Data Source Payloads.
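Continuing the illustrative sketch from above:

from abc import ABC, abstractmethod
from typing import Iterator


class DataSourceStream(ABC):
    # A named channel over a subset of the source's data (e.g. one table).

    @abstractmethod
    def name(self) -> str:
        """Unique stream name, e.g. the table name."""

    @abstractmethod
    def read(self) -> Iterator["DataSourcePayload"]:
        """Yield payloads; independent streams can potentially be read in parallel."""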

Data Source Payload

The Data Source Payload is a data structure that encapsulates the actual data along with its metadata. The payload consists of the following main parts:

  • Data: the core data that needs to be ingested (e.g., for a database, the table content)
  • Metadata: a dictionary containing additional contextual information about the data (for example timestamps, environment-specific metadata, etc.)
  • State: the state of the data source stream as of this payload. For example, in the case of incremental ingestion from a database table, it would contain the value of the incremental key column (e.g., the updated_time column in the table), which can be used to restart/continue the ingestion later.
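Expressed as a simple (illustrative) data structure:

from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class DataSourcePayload:
    # Core data to ingest, e.g. a batch of table rows.
    data: Any
    # Contextual information: timestamps, environment-specific metadata, etc.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Stream state as of this payload, e.g. the latest value of the incremental
    # key column (such as updated_time), used to restart/continue the ingestion.
    state: Dict[str, Any] = field(default_factory=dict)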

Singer integration (vdk-singer plugin)

https://www.singer.io/ is an open source framework providing over 100 different data source implementations out of the box. In Singer, the data source is called a "tap".

The vdk-singer plugin enables the integration of Singer taps as data sources within the Versatile Data Kit (VDK) framework. It allows users to read data streams from Singer-compatible sources directly into VDK.

Configuration

tap_name is the name of the tap. The corresponding tap library (tap-xxx) needs to be installed, for example pip install tap-gitlab.

Here is an example of how a Singer data source configuration would look:

config = SingerDataSourceConfiguration(
    tap_name="tap-gitlab",
    tap_config={
        "api_url": "https://gitlab.com/api/v4",
        "private_token": "",  # TODO
        "groups": "vmware-analytics",
        "projects": "vmware-analytics/versatile-data-kit",
        "start_date": "2018-01-01T00:00:00Z",
    },
    tap_auto_discover_schema=True,
)

That's all that is needed from the user's point of view. The Data Source framework would read the tap and forward the data to the configured target.

Constraints and Limitations

  • Not all Singer taps may be compatible with VDK, as this has not been extensively tested.
  • Automatic schema discovery is dependent on the Singer tap supporting this feature.
  • Incremental ingestion is dependent on the Singer tap supporting it.

Data Flow Input API

Using the Data Sources API directly is not very user friendly and is quite verbose, so a higher-level Flow API is introduced.

The API supports configurations both programmatically within the pipeline code and externally via a TOML file.

Using Python Code

In the data job, users define source and destination specifications:

source = SourceDefinition(id="auto", name="source-name", config={})
destination = DestinationDefinition(id="auto-dest", method="method-name")
with DataFlowInput(job_input) as flow_input:
    flow_input.start(source, destination)

Using TOML File

# Data Source Configuration
[sources.yourSourceId]
## Data Source Name. Installed data sources can be seen using vdk data-sources --list
name = "<data-source-name>"
## Set the configuration for the data source.
## You can see what config options are supported with vdk data-sources --config <data-source-name>
config = {}

# repeat this for as many sources as you want


# Data Destination Configuration. 
## Ingestion methods and targets are the same one as those accepted by send_object_for_ingestion
## See https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/api/job_input.py#L183
[destinations.yourDestinationId]
## the only required field is method
method = "<method-name>"
## Optionally specify target 
## target =


# repeat this for as many destinations as you want

# Data Flows from Source to Destination
[[flows]]
from = "yourSourceId"

Gaps and TODOs

  • Install Singer taps in separate environments. Singer taps' dependencies conflict a lot, and each needs to be in its own environment to work reliably.
  • Simplify the data source creation APIs. Create IPython magics that make it easier to create new datasets.
  • Test and simplify data source reusability. Extending another data source is a bit clumsy. Something with vdk-ipython, perhaps.
  • Add the ability for IIngester plugins to be more configurable. Currently only one configuration is possible per plugin, but we may need multiple instances of the same type (e.g., multiple Postgres instances).
  • The IIngester framework doesn't work with non-JSON-serializable data, making it hard to send bytes/binary data.
