Skip to content

Commit

Permalink
redis docker - draft PR
Browse files Browse the repository at this point in the history
  • Loading branch information
benartuso committed Aug 18, 2023
1 parent 739c453 commit c1a9a16
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 55 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ FROM debian:bullseye-slim AS runtime
WORKDIR /oxen-server
COPY --from=builder /usr/src/oxen-server/target/release/oxen-server /usr/local/bin
ENV SYNC_DIR=/var/oxen/data
ENV REDIS_URL=redis://redis/
EXPOSE 3001
CMD ["oxen-server", "start", "-p", "3001"]
5 changes: 5 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ services:
labels:
- "traefik.enable=true"
- "traefik.http.routers.oxen.rule=Host(`0.0.0.0`)"
redis:
image: redis:latest
ports:
- "6379:6379"

2 changes: 1 addition & 1 deletion src/lib/src/api/remote/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ pub async fn bulk_post_push_complete(
) -> Result<(), OxenError> {
use serde_json::json;

let uri = format!("/commits/complete");
let uri = "/commits/complete".to_string();
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
log::debug!("bulk_post_push_complete: {}", url);
let body = serde_json::to_string(&json!(commits)).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/lib/src/core/index/pusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ async fn push_missing_commit_dbs(
async fn push_missing_commit_entries(
local_repo: &LocalRepository,
remote_repo: &RemoteRepository,
branch: &Branch,
_branch: &Branch,
commits: &Vec<Commit>,
) -> Result<(), OxenError> {
log::debug!("rpush_entries num unsynced {}", commits.len());
Expand Down Expand Up @@ -418,13 +418,13 @@ async fn push_missing_commit_entries(

// Treat this as one giant commit for testing - the commit dbs will know the difference.

if unsynced_entries.len() != 0 {
if !unsynced_entries.is_empty() {
let all_entries = UnsyncedCommitEntries {
commit: commits[0].to_owned(),
entries: unsynced_entries,
};

let bar = Arc::new(ProgressBar::new(total_size as u64));
let bar = Arc::new(ProgressBar::new(total_size));

push_entries(
local_repo,
Expand Down
3 changes: 1 addition & 2 deletions src/server/src/app_data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use redis::Client;
use std::{path::PathBuf, sync::Arc};
use std::path::PathBuf;

#[derive(Debug, Clone)]
pub struct OxenAppData {
Expand Down
42 changes: 9 additions & 33 deletions src/server/src/controllers/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub async fn latest_synced(req: HttpRequest) -> actix_web::Result<HttpResponse,
// Iterate first to last over commits
for commit in commits {
log::debug!("latest_synced checking commit {:?}", commit.id);
let _response = match commit_cacher::get_status(&repository, &commit) {
match commit_cacher::get_status(&repository, &commit) {
Ok(Some(CacherStatusType::Success)) => {
match content_validator::is_valid(&repository, &commit) {
Ok(true) => {
Expand All @@ -203,15 +203,15 @@ pub async fn latest_synced(req: HttpRequest) -> actix_web::Result<HttpResponse,
err => {
// Desired behavior here?
log::error!("latest_synced content_validator::is_valid error {err:?}");
return Ok(
HttpResponse::InternalServerError().json(IsValidStatusMessage {
return Ok(HttpResponse::InternalServerError().json(
IsValidStatusMessage {
status: String::from(STATUS_ERROR),
status_message: String::from(MSG_INTERNAL_SERVER_ERROR),
status_description: format!("Err: {err:?}"),
is_processing: false,
is_valid: false,
}),
);
},
));
}
}
}
Expand Down Expand Up @@ -891,7 +891,7 @@ pub async fn complete_bulk(req: HttpRequest, body: String) -> Result<HttpRespons
// name to the repo, should be in url path so okay to unwrap
let namespace: &str = req.match_info().get("namespace").unwrap();
let repo_name: &str = req.match_info().get("repo_name").unwrap();
let repo = get_repo(&app_data.path, namespace, repo_name)?;
let _repo = get_repo(&app_data.path, namespace, repo_name)?;
// Deserialize the "commits" param into Vec<Commit> with serde
let commits: Vec<Commit> = match serde_json::from_str(&body) {
Ok(commits) => commits,
Expand All @@ -900,8 +900,8 @@ pub async fn complete_bulk(req: HttpRequest, body: String) -> Result<HttpRespons

// Redis connection - TODO, make a globally accessible connection pool

let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost".to_string());
let redis_client = redis::Client::open(redis_url)?;
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost/".to_string());
let redis_client = redis::Client::open(redis_url).expect("Failed to connect to redis");
let mut con = redis_client.get_connection()?;

// Get repo by name
Expand All @@ -920,12 +920,8 @@ pub async fn complete_bulk(req: HttpRequest, body: String) -> Result<HttpRespons
let commit = commit_reader
.get_commit_by_id(&commit_id)?
.ok_or(OxenError::revision_not_found(commit_id.clone().into()))?;
// TODO: don't clone repo

let repo_path_clone = repo_path.clone();
// let repo_clone = repo.clone();
// std::thread::spawn(move || {
// log::debug!("Processing commit {:?} on repo {:?}", commit, &repo_path_clone);
// });

// Append a task to the queue
let task = PostPushComplete {
Expand All @@ -944,26 +940,6 @@ pub async fn complete_bulk(req: HttpRequest, body: String) -> Result<HttpRespons
.arg("commit_queue")
.arg(task_bytes.clone())
.query(&mut con)?;

// let force = false;
// match commit_cacher::run_all(&repo_clone, &commit, force) {
// Ok(_) => {
// log::debug!(
// "Success processing commit {:?} on repo {:?}",
// commit,
// &repo_path_clone
// );
// }
// Err(err) => {
// log::error!(
// "Could not process commit {:?} on repo {:?}: {}",
// commit,
// &repo_path_clone,
// err
// );
// }
// };
log::debug!("continuing on with the controller for now...")
}
Ok(HttpResponse::Ok().json(StatusMessage::resource_created()))
}
Expand Down
15 changes: 7 additions & 8 deletions src/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use actix_http::Request;
use actix_web::web::{Bytes, BytesMut};
use liboxen::config::UserConfig;
use liboxen::error::OxenError;

use liboxen::model::User;

pub mod app_data;
Expand All @@ -18,18 +16,17 @@ pub mod view;
extern crate log;

use actix_web::middleware::{Condition, Logger};
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web::{web, App, HttpServer};
use actix_web_httpauth::middleware::HttpAuthentication;
use bincode;

use clap::{Arg, Command};
use env_logger::Env;
use redis;

use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;

use crate::errors::OxenHttpError;
use crate::tasks::post_push_complete::PostPushComplete;

const VERSION: &str = liboxen::constants::OXEN_VERSION;
Expand Down Expand Up @@ -58,7 +55,9 @@ async fn main() -> std::io::Result<()> {

// Redis polling worker setup
async fn run_redis_poller() {
let redis_client = redis::Client::open("redis://127.0.0.1/").unwrap();
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost/".to_string());
let redis_client = redis::Client::open(redis_url).expect("Failed to connect to redis");

let mut con = redis_client.get_connection().unwrap();
loop {
Expand Down
3 changes: 1 addition & 2 deletions src/server/src/tasks/post_push_complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use liboxen::{
model::{Commit, LocalRepository},
};
use serde::{Deserialize, Serialize};
use std::{thread, time};

#[derive(Serialize, Deserialize, Debug)]
pub struct PostPushComplete {
Expand All @@ -12,7 +11,7 @@ pub struct PostPushComplete {
}

impl PostPushComplete {
pub fn run(self) -> () {
pub fn run(self) {
log::debug!(
"Running cachers for commit {:?} on repo {:?} from redis queue",
self.commit.id,
Expand Down
6 changes: 0 additions & 6 deletions src/server/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ pub fn get_sync_dir() -> Result<PathBuf, OxenError> {
std::fs::create_dir_all(&sync_dir)?;
Ok(sync_dir)
}

pub fn get_redis_client() -> Result<Arc<redis::Client>, OxenError> {
let redis_url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost".to_string());
let client = redis::Client::open(redis_url.as_str())?;
Ok(Arc::new(client))
}
pub fn create_local_repo(
sync_dir: &Path,
namespace: &str,
Expand Down

0 comments on commit c1a9a16

Please sign in to comment.