Skip to content

Commit

Permalink
feat: added RPCs StopAirflowCommand, ExecuteAirflowCommand, PollAirfl…
Browse files Browse the repository at this point in the history
…owCommand, DatabaseFailover, FetchDatabaseProperties

PiperOrigin-RevId: 540051332
  • Loading branch information
Google APIs authored and Copybara-Service committed Jun 13, 2023
1 parent 50a39ef commit b84c697
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ apis:

types:
- name: google.cloud.orchestration.airflow.service.v1.CheckUpgradeResponse
- name: google.cloud.orchestration.airflow.service.v1.DatabaseFailoverResponse
- name: google.cloud.orchestration.airflow.service.v1.ExecuteAirflowCommandResponse
- name: google.cloud.orchestration.airflow.service.v1.FetchDatabasePropertiesResponse
- name: google.cloud.orchestration.airflow.service.v1.LoadSnapshotResponse
- name: google.cloud.orchestration.airflow.service.v1.OperationMetadata
- name: google.cloud.orchestration.airflow.service.v1.PollAirflowCommandResponse
- name: google.cloud.orchestration.airflow.service.v1.SaveSnapshotResponse
- name: google.cloud.orchestration.airflow.service.v1.StopAirflowCommandResponse

documentation:
summary: Manages Apache Airflow environments on Google Cloud Platform.
Expand Down
214 changes: 214 additions & 0 deletions google/cloud/orchestration/airflow/service/v1/environments.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ service Environments {
};
}

// Executes Airflow CLI command.
rpc ExecuteAirflowCommand(ExecuteAirflowCommandRequest)
returns (ExecuteAirflowCommandResponse) {
option (google.api.http) = {
post: "/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand"
body: "*"
};
}

// Stops Airflow CLI command execution.
rpc StopAirflowCommand(StopAirflowCommandRequest)
returns (StopAirflowCommandResponse) {
option (google.api.http) = {
post: "/v1/{environment=projects/*/locations/*/environments/*}:stopAirflowCommand"
body: "*"
};
}

// Polls Airflow CLI command execution and fetches logs.
rpc PollAirflowCommand(PollAirflowCommandRequest)
returns (PollAirflowCommandResponse) {
option (google.api.http) = {
post: "/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand"
body: "*"
};
}

// Creates a snapshots of a Cloud Composer environment.
//
// As a result of this operation, snapshot of environment's state is stored
Expand Down Expand Up @@ -123,6 +150,27 @@ service Environments {
metadata_type: "google.cloud.orchestration.airflow.service.v1.OperationMetadata"
};
}

// Triggers database failover (only for highly resilient environments).
rpc DatabaseFailover(DatabaseFailoverRequest)
returns (google.longrunning.Operation) {
option (google.api.http) = {
post: "/v1/{environment=projects/*/locations/*/environments/*}:databaseFailover"
body: "*"
};
option (google.longrunning.operation_info) = {
response_type: "google.cloud.orchestration.airflow.service.v1.DatabaseFailoverResponse"
metadata_type: "google.cloud.orchestration.airflow.service.v1.OperationMetadata"
};
}

// Fetches database properties.
rpc FetchDatabaseProperties(FetchDatabasePropertiesRequest)
returns (FetchDatabasePropertiesResponse) {
option (google.api.http) = {
get: "/v1/{environment=projects/*/locations/*/environments/*}:fetchDatabaseProperties"
};
}
}

// Create a new environment.
Expand Down Expand Up @@ -307,6 +355,119 @@ message UpdateEnvironmentRequest {
google.protobuf.FieldMask update_mask = 3;
}

// Execute Airflow Command request.
message ExecuteAirflowCommandRequest {
// The resource name of the environment in the form:
// "projects/{projectId}/locations/{locationId}/environments/{environmentId}".
string environment = 1;

// Airflow command.
string command = 2;

// Airflow subcommand.
string subcommand = 3;

// Parameters for the Airflow command/subcommand as an array of arguments.
// It may contain positional arguments like `["my-dag-id"]`, key-value
// parameters like `["--foo=bar"]` or `["--foo","bar"]`,
// or other flags like `["-f"]`.
repeated string parameters = 4;
}

// Response to ExecuteAirflowCommandRequest.
message ExecuteAirflowCommandResponse {
// The unique ID of the command execution for polling.
string execution_id = 1;

// The name of the pod where the command is executed.
string pod = 2;

// The namespace of the pod where the command is executed.
string pod_namespace = 3;

// Error message. Empty if there was no error.
string error = 4;
}

// Stop Airflow Command request.
message StopAirflowCommandRequest {
// The resource name of the environment in the form:
// "projects/{projectId}/locations/{locationId}/environments/{environmentId}".
string environment = 1;

// The unique ID of the command execution.
string execution_id = 2;

// The name of the pod where the command is executed.
string pod = 3;

// The namespace of the pod where the command is executed.
string pod_namespace = 4;

// If true, the execution is terminated forcefully (SIGKILL). If false, the
// execution is stopped gracefully, giving it time for cleanup.
bool force = 5;
}

// Response to StopAirflowCommandRequest.
message StopAirflowCommandResponse {
// Whether the execution is still running.
bool is_done = 1;

// Output message from stopping execution request.
repeated string output = 2;
}

// Poll Airflow Command request.
message PollAirflowCommandRequest {
// The resource name of the environment in the form:
// "projects/{projectId}/locations/{locationId}/environments/{environmentId}"
string environment = 1;

// The unique ID of the command execution.
string execution_id = 2;

// The name of the pod where the command is executed.
string pod = 3;

// The namespace of the pod where the command is executed.
string pod_namespace = 4;

// Line number from which new logs should be fetched.
int32 next_line_number = 5;
}

// Response to PollAirflowCommandRequest.
message PollAirflowCommandResponse {
// Contains information about a single line from logs.
message Line {
// Number of the line.
int32 line_number = 1;

// Text content of the log line.
string content = 2;
}

// Information about how a command ended.
message ExitInfo {
// The exit code from the command execution.
int32 exit_code = 1;

// Error message. Empty if there was no error.
string error = 2;
}

// Output from the command execution. It may not contain the full output
// and the caller may need to poll for more lines.
repeated Line output = 1;

// Whether the command execution has finished and there is no more output.
bool output_end = 2;

// The result exit status of the command.
ExitInfo exit_info = 3;
}

// Request to create a snapshot of a Cloud Composer environment.
message SaveSnapshotRequest {
// The resource name of the source environment in the form:
Expand Down Expand Up @@ -357,6 +518,44 @@ message LoadSnapshotRequest {
// Response to LoadSnapshotRequest.
message LoadSnapshotResponse {}

// Request to trigger database failover (only for highly resilient
// environments).
message DatabaseFailoverRequest {
// Target environment:
// "projects/{projectId}/locations/{locationId}/environments/{environmentId}"
string environment = 1;
}

// Response for DatabaseFailoverRequest.
message DatabaseFailoverResponse {}

// Request to fetch properties of environment's database.
message FetchDatabasePropertiesRequest {
// Required. The resource name of the environment, in the form:
// "projects/{projectId}/locations/{locationId}/environments/{environmentId}"
string environment = 1 [
(google.api.field_behavior) = REQUIRED,
(google.api.resource_reference) = {
type: "composer.googleapis.com/Environment"
}
];
}

// Response for FetchDatabasePropertiesRequest.
message FetchDatabasePropertiesResponse {
// The Compute Engine zone that the instance is currently serving from.
string primary_gce_zone = 1;

// The Compute Engine zone that the failover instance is currently serving
// from for a regional Cloud SQL instance.
string secondary_gce_zone = 2;

// The availability status of the failover replica. A false status indicates
// that the failover replica is out of sync. The primary instance can only
// fail over to the failover replica when the status is true.
bool is_failover_replica_available = 3;
}

// Configuration information for an environment.
message EnvironmentConfig {
// The size of the Cloud Composer environment.
Expand All @@ -374,6 +573,15 @@ message EnvironmentConfig {
ENVIRONMENT_SIZE_LARGE = 3;
}

// Resilience mode of the Cloud Composer Environment.
enum ResilienceMode {
// Default mode doesn't change environment parameters.
RESILIENCE_MODE_UNSPECIFIED = 0;

// Enabled High Resilience mode, including Cloud SQL HA.
HIGH_RESILIENCE = 1;
}

// Output only. The Kubernetes Engine cluster used to run this environment.
string gke_cluster = 1;

Expand Down Expand Up @@ -478,6 +686,12 @@ message EnvironmentConfig {
// This field is supported for Cloud Composer environments in versions
// composer-2.*.*-airflow-*.*.* and newer.
RecoveryConfig recovery_config = 18 [(google.api.field_behavior) = OPTIONAL];

// Optional. Resilience mode of the Cloud Composer Environment.
//
// This field is supported for Cloud Composer environments in versions
// composer-2.2.0-airflow-*.*.* and newer.
ResilienceMode resilience_mode = 19 [(google.api.field_behavior) = OPTIONAL];
}

// Network-level access control policy for the Airflow web server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ message OperationMetadata {

// Loads snapshot of the resource operation.
LOAD_SNAPSHOT = 6;

// Triggers failover of environment's Cloud SQL instance (only for highly
// resilient environments).
DATABASE_FAILOVER = 7;
}

// Output only. The current operation state.
Expand Down

0 comments on commit b84c697

Please sign in to comment.