Skip to content

Commit

Permalink
Optionally run Dekaf UI; describe command
Browse files Browse the repository at this point in the history
  • Loading branch information
visortelle committed Apr 24, 2024
1 parent de1973b commit a6e545f
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ serde_yaml = "0.9.34"
strum = { version = "0.26.2", features = ["derive"] }
tokio = { version = "1.37.0", features = ["full", "test-util"] }
uuid = { version = "1.8.0", features = ["v4", "fast-rng"] }
webbrowser = "1.0.0"
196 changes: 179 additions & 17 deletions src/docker_compose/docker_compose.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,72 @@
use anyhow::Result;
use serde::Serialize;

use crate::InstanceConfig;
use std::cmp::min;

pub fn generate_template(instance_name: String, instance_config: InstanceConfig) -> String {
pub trait PrintInfo {
fn print_info(&self);
}

#[derive(Debug, Clone, Serialize)]
pub struct PulsarProxyOutput {
pub web_service_url: Option<String>,
pub web_service_host_url: Option<String>,
pub broker_service_url: Option<String>,
pub broker_service_host_url: Option<String>,
pub docker_compose_template: String,
}

#[derive(Debug, Clone, Serialize)]
pub struct ClusterOutput {
pub cluster_index: u32,
pub web_service_host_url: Option<String>,
pub broker_service_host_url: Option<String>,
pub dekaf_host_url: Option<String>,
pub docker_compose_template: String,
}

impl PrintInfo for ClusterOutput {
fn print_info(&self) {
println!("Cluster index: {}", self.cluster_index);
println!("Pulsar broker service URL: {}", self.broker_service_host_url.clone().unwrap_or("-".to_string()));
println!("Pulsar web service URL: {}", self.web_service_host_url.clone().unwrap_or("-".to_string()));

if self.dekaf_host_url.is_some() {
println!("Dekaf management UI is available URL: {:?}", self.dekaf_host_url);
}
}
}

#[derive(Debug, Clone, Serialize)]
pub struct InstanceOutput {
pub instance_name: String,
pub clusters: Vec<ClusterOutput>,
pub docker_compose_template: String,
}

impl PrintInfo for InstanceOutput {
fn print_info(&self) {
println!("Instance name: {}", self.instance_name);

let num_clusters = self.clusters.len();
println!("Clusters ({num_clusters}):");
for cluster in &self.clusters {
cluster.print_info();
}
}
}

#[derive(Debug, Clone, Serialize)]
pub struct DekafOutput {
pub dekaf_host_url: String,
pub docker_compose_template: String,
}

pub fn generate_instance(
instance_name: String,
instance_config: InstanceConfig,
) -> Result<InstanceOutput> {
let zookeepers_per_cluster = instance_config.num_zookeepers;
let zookeeper_templates = (0..zookeepers_per_cluster)
.map(|zookeeper_index| {
Expand All @@ -16,17 +81,24 @@ pub fn generate_template(instance_name: String, instance_config: InstanceConfig)

let num_clusters = instance_config.num_clusters;
let cluster_names = (0..num_clusters).map(|i| format!("cluster-{}", i));
let cluster_templates = cluster_names

let clusters: Vec<ClusterOutput> = cluster_names
.clone()
.enumerate()
.map(|(cluster_index, cluster_name)| {
generate_cluster_template(
generate_cluster(
instance_name.clone(),
instance_config.clone(),
cluster_name,
u32::try_from(cluster_index).unwrap(),
)
.unwrap()
})
.collect();

let clusters_template = clusters
.iter()
.map(|cluster| cluster.docker_compose_template.clone())
.collect::<Vec<String>>()
.join("\n\n");

Expand All @@ -48,9 +120,11 @@ pub fn generate_template(instance_name: String, instance_config: InstanceConfig)
let instance_config_str = serde_yaml::to_string(&instance_config)
.unwrap()
.lines()
.map(|l| format!("████# {}", l)).collect::<Vec<String>>().join("\n");
.map(|l| format!("████# {}", l))
.collect::<Vec<String>>()
.join("\n");

format! {"
let docker_compose_template = format! {"
# This file is autogenerated by puls, the tool to manage local Pulsar instances
# More info https://github.com/tealtools/puls
#
Expand All @@ -59,7 +133,7 @@ pub fn generate_template(instance_name: String, instance_config: InstanceConfig)
services:
{zookeeper_templates}
{cluster_templates}
{clusters_template}
volumes:
{bookie_volumes_template}
Expand All @@ -70,15 +144,23 @@ networks:
████████driver: bridge
"}
.trim()
.replace('█', " ")
.replace('█', " ");

let instance_output = InstanceOutput {
instance_name: instance_name.clone(),
clusters,
docker_compose_template: docker_compose_template.clone(),
};

Ok(instance_output)
}

pub fn generate_cluster_template(
pub fn generate_cluster(
instance_name: String,
instance_config: InstanceConfig,
cluster_name: String,
cluster_index: u32,
) -> String {
) -> Result<ClusterOutput> {
let pulsar_init_job_template = generate_pulsar_init_job_template(
instance_name.clone(),
instance_config.clone(),
Expand Down Expand Up @@ -110,12 +192,14 @@ pub fn generate_cluster_template(
.collect::<Vec<String>>()
.join("\n");

let pulsar_proxy_template = generate_pulsar_proxy_template(
let pulsar_proxy_output = generate_pulsar_proxy(
instance_name.clone(),
instance_config.clone(),
cluster_name.clone(),
cluster_index,
);
)?;

let pulsar_proxy_template = pulsar_proxy_output.docker_compose_template;

let post_cluster_create_job_template = generate_post_cluster_create_job_template(
instance_name.clone(),
Expand All @@ -124,7 +208,17 @@ pub fn generate_cluster_template(
cluster_index,
);

format! {"
let dekaf_output: Option<DekafOutput> = if instance_config.with_dekaf {
Some(generate_dekaf(instance_name.clone(), cluster_index)?)
} else {
None
};

let dekaf_template = dekaf_output.clone()
.map(|dekaf| dekaf.docker_compose_template)
.unwrap_or("".to_string());

let docker_compose_template = format! {"
████# BEGIN Pulsar cluster {cluster_name} definition
{pulsar_init_job_template}
Expand All @@ -136,18 +230,30 @@ pub fn generate_cluster_template(
{pulsar_proxy_template}
{post_cluster_create_job_template}
{dekaf_template}
████# END Pulsar cluster {cluster_name} definition
"}
.trim()
.to_string()
.to_string();

let cluster_output = ClusterOutput {
cluster_index,
web_service_host_url: pulsar_proxy_output.web_service_host_url,
broker_service_host_url: pulsar_proxy_output.broker_service_host_url,
dekaf_host_url: dekaf_output.map(|dekaf| dekaf.dekaf_host_url),
docker_compose_template,
};

Ok(cluster_output)
}

pub fn generate_pulsar_proxy_template(
pub fn generate_pulsar_proxy(
instance_name: String,
instance_config: InstanceConfig,
cluster_name: String,
cluster_index: u32,
) -> String {
) -> Result<PulsarProxyOutput> {
let pulsar_version = instance_config.pulsar_version;
let zookeepers_per_cluster = instance_config.num_zookeepers;
let depends_on_zookeeper_template = (0..zookeepers_per_cluster)
Expand All @@ -165,9 +271,14 @@ pub fn generate_pulsar_proxy_template(
.join("\n");

let web_service_port = (cluster_index.to_string() + "8080").parse::<u32>().unwrap();
let web_service_url = format!("http://pulsar-proxy-{cluster_name}:8080");
let web_service_host_url = format!("http://localhost:{web_service_port}");

let broker_service_port = (cluster_index.to_string() + "6650").parse::<u32>().unwrap();
let broker_service_url = format!("pulsar://pulsar-proxy-{cluster_name}:6650");
let broker_service_host_url = format!("pulsar://localhost:{broker_service_port}");

format! {"
let docker_compose_template = format! {"
████# Pulsar proxy for cluster {cluster_name}
████pulsar-proxy-{cluster_name}:
████████image: apachepulsar/pulsar:{pulsar_version}
Expand All @@ -192,7 +303,17 @@ pub fn generate_pulsar_proxy_template(
{depends_on_brokers_template}
████████networks:
████████████- pulsar-net-{instance_name}
"}
"};

let pulsar_proxy_output = PulsarProxyOutput {
web_service_url: Some(web_service_url.clone()),
web_service_host_url: Some(web_service_host_url.clone()),
broker_service_url: Some(broker_service_url.clone()),
broker_service_host_url: Some(broker_service_host_url.clone()),
docker_compose_template: docker_compose_template.clone(),
};

Ok(pulsar_proxy_output)
}

pub fn generate_zookeeper_template(
Expand Down Expand Up @@ -347,6 +468,47 @@ pub fn generate_post_cluster_create_job_template(
.to_string()
}

pub fn generate_dekaf(instance_name: String, cluster_index: u32) -> Result<DekafOutput> {
let depends_on_pulsar_proxy_template = format!("████████████pulsar-proxy-cluster-{cluster_index}:\n████████████████condition: service_healthy");

let port = (cluster_index.to_string() + "8090").parse::<u32>().unwrap();

let dekaf_host_url = format!("http://localhost:{port}");

let docker_compose_template = format! {"
████# Dekaf UI for cluster-{cluster_index}
████# More info: https://dekaf.io
████dekaf-ui-cluster-{cluster_index}:
████████image: tealtools/dekaf:latest
████████restart: on-failure
████████environment:
████████████- DEKAF_PULSAR_NAME={instance_name}-cluster-{cluster_index}
████████████- DEKAF_PULSAR_WEB_URL=http://pulsar-proxy-cluster-{cluster_index}:8080
████████████- DEKAF_PULSAR_BROKER_URL=pulsar://pulsar-proxy-cluster-{cluster_index}:6650
████████████- DEKAF_PUBLIC_BASE_URL={dekaf_host_url}
████████ports:
████████████- {port}:8090
████████healthcheck:
████████████test: [\"CMD\", \"curl\", \"--fail\", \"http://127.0.0.1:8090/admin/v2/brokers/health\"]
████████████interval: 10s
████████████timeout: 5s
████████████retries: 20
████████depends_on:
{depends_on_pulsar_proxy_template}
████████networks:
████████████- pulsar-net-{instance_name}
"}
.trim()
.to_string();

let dekaf_output = DekafOutput {
dekaf_host_url,
docker_compose_template,
};

Ok(dekaf_output)
}

pub fn generate_broker_template(
instance_name: String,
instance_config: InstanceConfig,
Expand Down
5 changes: 5 additions & 0 deletions src/instance_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub struct InstanceConfig {

#[arg(long, default_value = DEFAULT_NUM_ZOOKEEPERS)]
pub num_zookeepers: u32,

/// Enable Pulsar management UI for the instance
#[arg(long, default_value_t = false)]
pub with_dekaf: bool,
}

impl Default for InstanceConfig {
Expand All @@ -34,6 +38,7 @@ impl Default for InstanceConfig {
num_brokers: DEFAULT_NUM_BROKERS.parse().unwrap(),
num_bookies: DEFAULT_NUM_BOOKIES.parse().unwrap(),
num_zookeepers: DEFAULT_NUM_ZOOKEEPERS.parse().unwrap(),
with_dekaf: true,
}
}
}

0 comments on commit a6e545f

Please sign in to comment.