use std::collections::HashMap; use axum::{extract::State, http::HeaderMap, Json}; use sqlx::PgPool; use crate::{ auth::get_user_from_header, database::model::Member as DbMember, model::{ member::{Groups, Name, Roles}, Member, }, util::convert_vec, AppState, }; pub async fn migrate_request( State(state): State, headers: HeaderMap, body: String, ) -> Result, crate::Error> { let user = get_user_from_header(&state.pool, &headers).await?; user.authorize(&state.pool, Some(Roles::ADMIN), None) .await?; tracing::info!("Migration is requested"); // Convert the input CSV to a vector of members let members_new: Vec = Row::from_csv_many(&body)? .into_iter() .map(|m| m.into()) .collect(); let members_old = convert_vec(DbMember::get_all(&state.pool).await?); let members_diff = generate_diff(members_new, members_old); let count = state .migration_store .lock() .await .insert(members_diff.clone()); Ok(Json(MigrationResponse::from((count, members_diff)))) } pub async fn migrate_confirm( State(state): State, headers: HeaderMap, body: String, ) -> Result<(), crate::Error> { let user = get_user_from_header(&state.pool, &headers).await?; user.authorize(&state.pool, Some(Roles::ADMIN), None) .await?; tracing::info!("Migration is confirmed"); let count = match body.trim().parse::() { Ok(c) => c, Err(_) => { return Err(crate::Error::BadRequest { expected: String::from("u32"), }) } }; let mut store = state.migration_store.lock().await; let members_diff = match store.remove(&count) { Some(m) => m, None => return Err(crate::Error::NotFound), }; let inserted_len = members_diff.insert.len(); let update_len = members_diff.update.len(); let remove_len = members_diff.remove.len(); migrate_transaction(&state.pool, members_diff).await?; tracing::info!( "Migration is successfully executed. Inserted: {}, updated: {}, removed: {}", inserted_len, update_len, remove_len ); Ok(()) } async fn migrate_transaction(pool: &PgPool, members_diff: MembersDiff) -> Result<(), sqlx::Error> { let mut transaction = pool.begin().await?; DbMember::insert_many(&mut transaction, convert_vec(members_diff.insert)).await?; DbMember::update_many(&mut transaction, convert_vec(members_diff.update)).await?; let members_remove_ids: Vec = members_diff.remove.into_iter().map(|m| m.id).collect(); DbMember::remove_many(&mut transaction, &members_remove_ids).await?; transaction.commit().await?; Ok(()) } // Create a row for the csv file #[derive(Debug, serde::Deserialize, Clone)] struct Row { #[serde(rename = "Relatiecode")] id: String, #[serde(rename = "Roepnaam")] first_name: String, // #[serde(rename = "Tussenvoegsel(s)")] // middle_name: String, // #[serde(rename = "Achternaam")] // last_name: String, #[serde(rename = "E-mail")] email: String, #[serde(rename = "Verenigingssporten")] groups: String, #[serde(rename = "Diploma dropdown 1")] diploma: Option, } #[derive(Debug, Clone)] pub struct MembersDiff { insert: Vec, update: Vec, remove: Vec, } #[derive(serde::Serialize)] pub struct MigrationResponse { count: u32, insert: Vec<(String, Name)>, update: Vec<(String, Name)>, remove: Vec<(String, Name)>, } #[derive(Default)] pub struct MigrationStore { pub store: HashMap, pub count: u32, } impl Row { fn from_csv_many(input: &str) -> Result, csv::Error> { let mut rdr = csv::ReaderBuilder::new() .delimiter(b';') .from_reader(input.as_bytes()); let members: Result, csv::Error> = rdr.deserialize().collect(); members } fn groups_parsed(&self) -> Groups { let mut groups: Vec = Vec::new(); let group_parts: Vec<&str> = self.groups.split(", ").collect(); for group in group_parts { let hour_parts: Vec<&str> = group.split(" - ").collect(); if let Some(group) = hour_parts.get(1) { groups.push(group.to_uppercase()) } } let groups_string = groups.join("|"); bitflags::parser::from_str(&groups_string).unwrap_or(Groups::empty()) } } impl From for Name { fn from(val: Row) -> Self { Name { first: val.first_name, full: "Temporarely full name".to_string(), } } } impl From for Member { fn from(val: Row) -> Self { let name: Name = val.clone().into(); Member { id: val.id.clone(), name, registration_token: None, diploma: val.diploma.clone(), groups: val.groups_parsed(), roles: Roles::MEMBER, } } } impl From<(u32, MembersDiff)> for MigrationResponse { fn from(value: (u32, MembersDiff)) -> Self { let members_insert: Vec<(String, Name)> = value.1.insert.into_iter().map(|m| (m.id, m.name)).collect(); let members_update: Vec<(String, Name)> = value.1.update.into_iter().map(|m| (m.id, m.name)).collect(); let members_remove: Vec<(String, Name)> = value.1.remove.into_iter().map(|m| (m.id, m.name)).collect(); Self { count: value.0, insert: members_insert, update: members_update, remove: members_remove, } } } impl MigrationStore { fn insert(&mut self, members_diff: MembersDiff) -> u32 { let count = self.count + 1; self.store.insert(count, members_diff); self.count = count; count } fn get(&self, id: &u32) -> Option<&MembersDiff> { self.store.get(id) } fn remove(&mut self, id: &u32) -> Option { self.store.remove(id) } } fn generate_diff(members_new: Vec, members_old: Vec) -> MembersDiff { let members_old_map: HashMap = members_old .iter() .map(|m| (m.id.clone(), m.clone())) .collect(); let members_new_map: HashMap = members_new .iter() .map(|m| (m.id.clone(), m.clone())) .collect(); let mut members_insert: Vec = Vec::new(); let mut members_update: Vec = Vec::new(); let mut members_remove: Vec = Vec::new(); for old_member in members_old { if let Some(new_member) = members_new_map.get(&old_member.id) { members_update.push(Member { id: old_member.id, name: new_member.name.clone(), registration_token: old_member.registration_token, diploma: new_member.diploma.clone(), groups: new_member.groups, roles: old_member.roles, }) } else { members_remove.push(old_member); } } for new_member in members_new { if !members_old_map.contains_key(&new_member.id) { members_insert.push(new_member); } } MembersDiff { insert: members_insert, update: members_update, remove: members_remove, } }