|
| 1 | +use crate::crud_table::{ |
| 2 | + AdminCrudError, AdminCrudTable, ItemWithId, ListQueryParams, ListableTrait, PaginatedResult, |
| 3 | +}; |
| 4 | +use crate::entity; |
| 5 | +use futures::FutureExt; |
| 6 | +use metrics::{counter, gauge, Counter, Gauge}; |
| 7 | +use moka::future::Cache; |
| 8 | +use moka::notification::ListenerFuture; |
| 9 | +use sea_orm::ActiveValue::Unchanged; |
| 10 | +use sea_orm::{ |
| 11 | + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, NotSet, PaginatorTrait, |
| 12 | + QueryFilter, Set, |
| 13 | +}; |
| 14 | +use serde::{Deserialize, Serialize}; |
| 15 | +use std::time::Duration; |
| 16 | +use utoipa::ToSchema; |
| 17 | +use uuid::Uuid; |
| 18 | + |
| 19 | +#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] |
| 20 | +pub struct ApiKey { |
| 21 | + pub key: Uuid, |
| 22 | + pub description: String, |
| 23 | + pub project_id: Uuid, |
| 24 | + pub created_at: Option<chrono::NaiveDateTime>, |
| 25 | +} |
| 26 | + |
| 27 | +pub struct ApiKeyController { |
| 28 | + db: DatabaseConnection, |
| 29 | + authorization_cache: Cache<Uuid, Uuid>, |
| 30 | + authorization_cache_gauge: Gauge, |
| 31 | + authorization_cache_hit: Counter, |
| 32 | + authorization_cache_miss: Counter, |
| 33 | + authorization_invalid_key: Counter, |
| 34 | +} |
| 35 | + |
| 36 | +impl ApiKeyController { |
| 37 | + pub fn new(db: DatabaseConnection) -> Self { |
| 38 | + let authorization_cache_capacity = 100; |
| 39 | + gauge!("ingest_api_key_cache_capacity").set(authorization_cache_capacity as f64); |
| 40 | + |
| 41 | + let authorization_cache_gauge = gauge!("ingest_api_key_cache_total"); |
| 42 | + let authorization_cache_gauge_for_fut = authorization_cache_gauge.clone(); |
| 43 | + |
| 44 | + let authorization_cache = Cache::builder() |
| 45 | + .max_capacity(authorization_cache_capacity) |
| 46 | + .time_to_live(Duration::from_secs(1)) |
| 47 | + .async_eviction_listener(move |_, _, _| -> ListenerFuture { |
| 48 | + authorization_cache_gauge_for_fut.decrement(1); |
| 49 | + async {}.boxed() |
| 50 | + }) |
| 51 | + .build(); |
| 52 | + |
| 53 | + Self { |
| 54 | + db, |
| 55 | + authorization_cache, |
| 56 | + authorization_cache_gauge, |
| 57 | + authorization_cache_hit: counter!("ingest_api_key_cache_hit"), |
| 58 | + authorization_cache_miss: counter!("ingest_api_key_cache_miss"), |
| 59 | + authorization_invalid_key: counter!("ingest_api_key_invalid"), |
| 60 | + } |
| 61 | + } |
| 62 | +} |
| 63 | + |
| 64 | +impl From<entity::api_key::Model> for ApiKey { |
| 65 | + fn from(value: entity::api_key::Model) -> Self { |
| 66 | + ApiKey { |
| 67 | + key: value.key, |
| 68 | + description: value.description, |
| 69 | + project_id: value.project_id, |
| 70 | + created_at: Some(value.created_at), |
| 71 | + } |
| 72 | + } |
| 73 | +} |
| 74 | + |
| 75 | +impl AdminCrudTable for ApiKeyController { |
| 76 | + type Item = ApiKey; |
| 77 | + |
| 78 | + async fn get_by_id(&self, id: Uuid) -> Result<Option<Self::Item>, AdminCrudError> { |
| 79 | + let query = entity::api_key::Entity::find_by_id(id) |
| 80 | + .one(&self.db) |
| 81 | + .await?; |
| 82 | + Ok(query.map(ApiKey::from)) |
| 83 | + } |
| 84 | + |
| 85 | + async fn list( |
| 86 | + &self, |
| 87 | + params: ListQueryParams, |
| 88 | + ) -> Result<PaginatedResult<ItemWithId<Self::Item>>, AdminCrudError> { |
| 89 | + let params = params.try_into()?; |
| 90 | + let query = entity::api_key::Entity::find() |
| 91 | + .apply_params(¶ms) |
| 92 | + .unwrap() |
| 93 | + .all(&self.db) |
| 94 | + .await?; |
| 95 | + |
| 96 | + Ok(PaginatedResult { |
| 97 | + items: query |
| 98 | + .into_iter() |
| 99 | + .map(|m| ItemWithId { |
| 100 | + id: m.id, |
| 101 | + item: ApiKey::from(m), |
| 102 | + }) |
| 103 | + .collect(), |
| 104 | + total: entity::api_key::Entity::find() |
| 105 | + .apply_filter(¶ms)? |
| 106 | + .count(&self.db) |
| 107 | + .await?, |
| 108 | + }) |
| 109 | + } |
| 110 | + |
| 111 | + async fn create(&self, item: Self::Item) -> Result<ItemWithId<Self::Item>, AdminCrudError> { |
| 112 | + let id = Uuid::now_v7(); |
| 113 | + let key = Uuid::new_v4(); |
| 114 | + |
| 115 | + entity::api_key::ActiveModel { |
| 116 | + id: Set(id), |
| 117 | + key: Set(key), |
| 118 | + project_id: Set(item.project_id), |
| 119 | + description: Set(item.description.to_string()), |
| 120 | + created_at: NotSet, |
| 121 | + } |
| 122 | + .insert(&self.db) |
| 123 | + .await?; |
| 124 | + |
| 125 | + Ok(ItemWithId { |
| 126 | + id, |
| 127 | + item: ApiKey { |
| 128 | + key, |
| 129 | + description: item.description.to_string(), |
| 130 | + project_id: item.project_id, |
| 131 | + created_at: Some(chrono::Utc::now().naive_utc()), |
| 132 | + }, |
| 133 | + }) |
| 134 | + } |
| 135 | + |
| 136 | + async fn update( |
| 137 | + &self, |
| 138 | + id: Uuid, |
| 139 | + item: Self::Item, |
| 140 | + ) -> Result<ItemWithId<Self::Item>, AdminCrudError> { |
| 141 | + entity::api_key::ActiveModel { |
| 142 | + id: Unchanged(id), |
| 143 | + key: NotSet, |
| 144 | + project_id: NotSet, |
| 145 | + description: Set(item.description.to_string()), |
| 146 | + created_at: NotSet, |
| 147 | + } |
| 148 | + .update(&self.db) |
| 149 | + .await?; |
| 150 | + Ok(ItemWithId { id, item }) |
| 151 | + } |
| 152 | + |
| 153 | + async fn delete(&self, id: Uuid) -> Result<(), AdminCrudError> { |
| 154 | + entity::api_key::Entity::delete_by_id(id) |
| 155 | + .exec(&self.db) |
| 156 | + .await?; |
| 157 | + Ok(()) |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +pub enum ApiKeyError { |
| 162 | + InvalidApiKey, |
| 163 | + InternalError, |
| 164 | +} |
| 165 | + |
| 166 | +impl ApiKeyController { |
| 167 | + pub async fn authorize_api_key(&self, key: &str) -> Result<Uuid, ApiKeyError> { |
| 168 | + let Ok(key_uuid) = Uuid::try_parse(key) else { |
| 169 | + self.authorization_invalid_key.increment(1); |
| 170 | + return Err(ApiKeyError::InvalidApiKey); |
| 171 | + }; |
| 172 | + |
| 173 | + let cached_project_id = self.authorization_cache.get(&key_uuid).await; |
| 174 | + |
| 175 | + if let Some(project_id) = cached_project_id { |
| 176 | + // Cache Hit |
| 177 | + self.authorization_cache_hit.increment(1); |
| 178 | + Ok(project_id) |
| 179 | + } else { |
| 180 | + // Cache Miss |
| 181 | + self.authorization_cache_miss.increment(1); |
| 182 | + |
| 183 | + let Some(api_key_entry) = entity::api_key::Entity::find() |
| 184 | + .filter(entity::api_key::Column::Key.eq(key_uuid)) |
| 185 | + .one(&self.db) |
| 186 | + .await |
| 187 | + .map_err(|_| ApiKeyError::InternalError)? |
| 188 | + else { |
| 189 | + self.authorization_invalid_key.increment(1); |
| 190 | + return Err(ApiKeyError::InvalidApiKey); |
| 191 | + }; |
| 192 | + |
| 193 | + let project_id = api_key_entry.project_id; |
| 194 | + |
| 195 | + self.authorization_cache.insert(key_uuid, project_id).await; |
| 196 | + self.authorization_cache_gauge.increment(1); |
| 197 | + |
| 198 | + Ok(project_id) |
| 199 | + } |
| 200 | + } |
| 201 | +} |
0 commit comments