Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pingora tutorial start added #461

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions pingora_tutorial/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use async_trait::async_trait;
use log::info;
use std::sync::atomic::{AtomicUsize, Ordering};

use pingora_core::server::configuration::Opt;
use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::RequestHeader;
use pingora_proxy::{ProxyHttp, Session};

/// Global request counter using `AtomicUsize` for thread-safe atomic operations
static REQ_COUNTER: AtomicUsize = AtomicUsize::new(0);

pub struct MyProxy {
/// Counter for beta users
beta_counter: AtomicUsize,
}

pub struct MyCtx {
beta_user: bool,
}

fn check_beta_user(req: &RequestHeader) -> bool {
// Simple logic to check if user is beta
req.headers.get("beta-flag").is_some()
}

#[async_trait]
impl ProxyHttp for MyProxy {
type CTX = MyCtx;

fn new_ctx(&self) -> Self::CTX {
MyCtx { beta_user: false }
}

async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
ctx.beta_user = check_beta_user(session.req_header());
Ok(false) // Continue processing the request
}

async fn upstream_peer(
&self,
_session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
// Increment the global request counter atomically
let req_counter = REQ_COUNTER.fetch_add(1, Ordering::Relaxed) + 1;

let addr = if ctx.beta_user {
// Increment the beta user counter atomically
let beta_count = self.beta_counter.fetch_add(1, Ordering::Relaxed) + 1;
info!("I'm a beta user #{beta_count}");
("1.0.0.1", 443)
} else {
info!("I'm a user #{req_counter}");
("1.1.1.1", 443)
};

let peer = Box::new(HttpPeer::new(
addr,
true,
"one.one.one.one".to_string(),
));
Ok(peer)
}
}

// To run the example:
// RUST_LOG=INFO cargo run --example ctx
// curl 127.0.0.1:6190 -H "Host: one.one.one.one"
// curl 127.0.0.1:6190 -H "Host: one.one.one.one" -H "beta-flag: 1"
fn main() {
env_logger::init();

// Read command line arguments
let opt = Opt::parse_args();
let mut my_server = Server::new(Some(opt)).unwrap();
my_server.bootstrap();

let mut my_proxy = pingora_proxy::http_proxy_service(
&my_server.configuration,
MyProxy {
beta_counter: AtomicUsize::new(0),
},
);
my_proxy.add_tcp("0.0.0.0:6190");

my_server.add_service(my_proxy);
my_server.run_forever();
}
121 changes: 121 additions & 0 deletions pingora_tutorial/src/gateway.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use async_trait::async_trait;
use log::info;
use prometheus::{IntCounter, register_int_counter};

use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_proxy::{ProxyHttp, Session};

fn check_login(req: &RequestHeader) -> bool {
// Implement your login check logic here
req.headers.get("Authorization")
.map(|v| v.as_bytes() == b"password")
.unwrap_or(false)
}

pub struct MyGateway {
req_metric: IntCounter,
}

#[async_trait]
impl ProxyHttp for MyGateway {
type CTX = ();

fn new_ctx(&self) -> Self::CTX {
()
}

async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
if session.req_header().uri.path().starts_with("/login")
&& !check_login(session.req_header())
{
let _ = session.respond_error(403).await;
// Return true to indicate early response
return Ok(true);
}
Ok(false)
}

async fn upstream_peer(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let addr = if session.req_header().uri.path().starts_with("/family") {
("1.0.0.1", 443)
} else {
("1.1.1.1", 443)
};

info!("Connecting to {:?}", addr);

let peer = Box::new(HttpPeer::new(
addr,
true,
"one.one.one.one".to_string(),
));
Ok(peer)
}

async fn response_filter(
&self,
_session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
// Replace existing header if any
upstream_response
.insert_header("Server", "MyGateway")
.unwrap();
// Remove unsupported header
upstream_response.remove_header("alt-svc");

Ok(())
}

async fn logging(
&self,
session: &mut Session,
_e: Option<&pingora_core::Error>,
_ctx: &mut Self::CTX,
) {
let response_code = session
.response_written()
.map_or(0, |resp| resp.status.as_u16());
info!(
"Request to {} responded with status code {}",
session.req_header().uri.path(),
response_code
);

self.req_metric.inc();
}
}

fn main() {
env_logger::init();

// Create the server without options
let mut my_server = Server::new(None).unwrap();
my_server.bootstrap();

let req_metric = register_int_counter!("req_counter", "Number of requests").unwrap();

let mut my_proxy = pingora_proxy::http_proxy_service(
&my_server.configuration,
MyGateway {
req_metric,
},
);
my_proxy.add_tcp("0.0.0.0:6191");
my_server.add_service(my_proxy);

let mut prometheus_service_http =
pingora_core::services::listening::Service::prometheus_http_service();
prometheus_service_http.add_tcp("127.0.0.1:6192");
my_server.add_service(prometheus_service_http);

my_server.run_forever();
}
76 changes: 76 additions & 0 deletions pingora_tutorial/src/grpc_web_modules.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use async_trait::async_trait;

use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_core::{
modules::http::{
grpc_web::{GrpcWeb, GrpcWebBridge},
HttpModules,
},
};
use pingora_proxy::{ProxyHttp, Session};

/// This example shows how to use the gRPC-web bridge module

pub struct GrpcWebBridgeProxy;

#[async_trait]
impl ProxyHttp for GrpcWebBridgeProxy {
type CTX = ();

fn new_ctx(&self) -> Self::CTX {
()
}

fn init_downstream_modules(&self, modules: &mut HttpModules) {
// Add the gRPC web module
modules.add_module(Box::new(GrpcWeb))
}

async fn early_request_filter(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<()> {
let grpc = session
.downstream_modules_ctx
.get_mut::<GrpcWebBridge>()
.expect("GrpcWebBridge module added");

// Initialize gRPC module for this request
grpc.init();
Ok(())
}

async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
// This needs to be your gRPC server
let grpc_peer = Box::new(HttpPeer::new(
("1.1.1.1", 443),
true,
"one.one.one.one".to_string(),
));
Ok(grpc_peer)
}
}

// RUST_LOG=INFO cargo run --example grpc_web_module

fn main() {
env_logger::init();

// Create server without command line arguments
let mut my_server = Server::new(None).unwrap();
my_server.bootstrap();

let mut my_proxy =
pingora_proxy::http_proxy_service(&my_server.configuration, GrpcWebBridgeProxy);
my_proxy.add_tcp("0.0.0.0:6194");

my_server.add_service(my_proxy);
my_server.run_forever();
}
85 changes: 85 additions & 0 deletions pingora_tutorial/src/load_loadbalancer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use async_trait::async_trait;
use log::info;
use pingora_core::services::background::background_service;
use std::{sync::Arc, time::Duration};

use pingora_core::server::Server;
use pingora_core::upstreams::peer::HttpPeer;
use pingora_core::Result;
use pingora_core::{
listeners::tls::TlsSettings,
};
use pingora_load_balancing::{health_check, selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};
use pingora_http::RequestHeader;

pub struct LB(Arc<LoadBalancer<RoundRobin>>);

#[async_trait]
impl ProxyHttp for LB {
type CTX = ();

fn new_ctx(&self) -> Self::CTX {
()
}

async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
let upstream = self
.0
.select(b"", 256) // hash doesn't matter
.unwrap();

info!("upstream peer is: {:?}", upstream);

let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()));
Ok(peer)
}

async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
upstream_request
.insert_header("Host", "one.one.one.one")
.unwrap();
Ok(())
}
}

// RUST_LOG=INFO cargo run --example load_balancer
fn main() {
env_logger::init();

// Create the server without command line arguments
let mut my_server = Server::new(None).unwrap();
my_server.bootstrap();

// "127.0.0.1:343" is just a bad server
let mut upstreams =
LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();

// We add health check in the background so that the bad server is never selected.
let hc = health_check::TcpHealthCheck::new();
upstreams.set_health_check(hc);
upstreams.health_check_frequency = Some(Duration::from_secs(1));

let background = background_service("health check", upstreams);

let upstreams = background.task();

let mut lb = pingora_proxy::http_proxy_service(&my_server.configuration, LB(upstreams));
lb.add_tcp("0.0.0.0:6188");

let cert_path = format!("{}/tests/keys/server.crt", env!("CARGO_MANIFEST_DIR"));
let key_path = format!("{}/tests/keys/key.pem", env!("CARGO_MANIFEST_DIR"));

let mut tls_settings = TlsSettings::intermediate(&cert_path, &key_path).unwrap();
tls_settings.enable_h2();
lb.add_tls_with_settings("0.0.0.0:6189", None, tls_settings);

my_server.add_service(lb);
my_server.add_service(background);
my_server.run_forever();
}
Loading