Skip to content

Commit

Permalink
Merge pull request #1 from go-bazzinga/call_offchain
Browse files Browse the repository at this point in the history
ml feed cache
  • Loading branch information
komal-sai-yral authored Sep 3, 2024
2 parents b489d7c + 62936ea commit 555980c
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 116 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/deploy-on-merge-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ jobs:
flyctl secrets set --stage --app "yral-ml-feed-server" PROJECT="hot-or-not-feed-intelligence"
flyctl secrets set --stage --app "yral-ml-feed-server" "SERVICE_CRED=$SERVICE_CRED"
flyctl secrets set --stage --app "yral-ml-feed-server" GS_VIDEO_BUCKET="yral-videos"
flyctl secrets set --stage --app "yral-ml-feed-server" "GRPC_OFF_CHAIN_JWT_TOKEN=$GRPC_OFF_CHAIN_JWT_TOKEN"
env:
FLY_API_TOKEN: ${{ secrets.YRAL_GPU_COMPUTE_TASKS_GITHUB_ACTION_FLY_IO_DEPLOYMENT_TOKEN }}
UPSTASH_URL: ${{ secrets.UPSTASH_URL }}
UPSTASH_TOKEN: ${{ secrets.UPSTASH_TOKEN }}
SERVICE_CRED: ${{ secrets.SERVICE_CRED }}
GRPC_OFF_CHAIN_JWT_TOKEN: ${{ secrets.ENCODED_JWT_TOKEN_FOR_CALLING_ML_FEED_SERVER_FROM_OFFCHAIN_AGENT_SERVER }}
- name: Deploy a docker container to fly.io
run: flyctl deploy --remote-only --yes
env:
Expand Down
10 changes: 10 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.out_dir(out_dir)
.compile(&[proto_file], &["proto"])?;

// offchain client
let proto_file = "contracts/projects/off_chain/offchain_canister.proto";
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());

tonic_build::configure()
.build_client(true)
.build_server(false)
.out_dir(out_dir)
.compile(&[proto_file], &["proto"])?;

Ok(())
}
2 changes: 1 addition & 1 deletion contracts
2 changes: 2 additions & 0 deletions rust_src/consts.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
pub const ML_FEED_PY_SERVER: &str =
"python_proc.process.yral-ml-feed-server-staging-v2.internal:50059";

pub const OFF_CHAIN_AGENT: &str = "https://icp-off-chain-agent.fly.dev:443"; //"http://localhost:50051";
176 changes: 61 additions & 115 deletions rust_src/ml_feed_impl.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::env;
use std::sync::Arc;

use candid::Principal;
Expand All @@ -7,9 +8,12 @@ use ml_feed::{FeedRequest, FeedResponse, PostItem, PostItemResponse};
use ml_feed::ml_feed_server::MlFeed;
use ml_feed_py::ml_feed_client::MlFeedClient;
use ml_feed_py::MlFeedRequest;
use off_chain::off_chain_canister_client::OffChainCanisterClient;
use tonic::metadata::MetadataValue;
use tonic::transport::Channel;
use tonic::{Request, Response, Status};

use crate::consts::ML_FEED_PY_SERVER;
use crate::consts::{ML_FEED_PY_SERVER, OFF_CHAIN_AGENT};
use crate::utils::{to_rfc3339, to_rfc3339_did_systemtime};
use crate::{canister, AppState};

Expand All @@ -25,6 +29,10 @@ pub mod ml_feed_py {
tonic::include_file_descriptor_set!("ml_feed_py_descriptor");
}

pub mod off_chain {
tonic::include_proto!("offchain_canister");
}

pub struct MLFeedService {
pub shared_state: Arc<AppState>,
}
Expand Down Expand Up @@ -54,7 +62,7 @@ impl MlFeed for MLFeedService {
.map_or(vec![], |x| x);

let mut client = match MlFeedClient::connect(
"http://python_proc.process.yral-ml-feed-server.internal:50059",
"http://python_proc.process.yral-ml-feed-server.internal:50059", // http://python_proc.process.yral-ml-feed-server.internal:50059"
)
.await
{
Expand Down Expand Up @@ -133,122 +141,60 @@ impl MlFeed for MLFeedService {
.filter(|e| seen.insert((e.canister_id.clone(), e.post_id)))
.collect::<Vec<PostItemResponse>>();

let response_items1 = response_items.clone();
tokio::spawn(async move {
send_to_offchain(canister_id, response_items1).await;
});

return Ok(Response::new(FeedResponse {
feed: response_items,
}));
}
}

pub async fn send_to_offchain(canister_id_principal_str: String, items: Vec<PostItemResponse>) {
let channel = Channel::from_static(OFF_CHAIN_AGENT)
.connect()
.await
.expect("channel creation failed");

let grpc_offchain_token =
env::var("GRPC_OFF_CHAIN_JWT_TOKEN").expect("GRPC_OFF_CHAIN_JWT_TOKEN must be set");

let token: MetadataValue<_> = format!("Bearer {}", grpc_offchain_token)
.parse()
.expect("invalid metadata value");

let mut client =
OffChainCanisterClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
});

// let res = vec![
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 125,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 124,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 123,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 122,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 121,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 120,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 119,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 118,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 117,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 116,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 115,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 114,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 113,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 112,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 111,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 110,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 109,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 108,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 107,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 106,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 105,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 104,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 103,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 102,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 101,
// },
// PostItemResponse {
// canister_id: "76qol-iiaaa-aaaak-qelkq-cai".to_string(),
// post_id: 100,
// },
// ];

// let res_limited = res
// .into_iter()
// .take(limit as usize)
// .collect::<Vec<PostItemResponse>>();

// Ok(Response::new(FeedResponse { feed: res_limited }))
let offchain_items = items
.into_iter()
.map(|x| off_chain::MlFeedCacheItem {
post_id: x.post_id as u64,
canister_id: x.canister_id,
video_id: "".to_string(),
creator_principal_id: "".to_string(),
})
.collect::<Vec<off_chain::MlFeedCacheItem>>();

let request = tonic::Request::new(off_chain::UpdateMlFeedCacheRequest {
user_canister_id: canister_id_principal_str,
items: offchain_items,
});

let response = client.update_ml_feed_cache(request).await.map_err(|e| {
Status::internal(format!(
"Failed to get update_ml_feed_cache response: {}",
e
))
});

match response {
Ok(_) => (),
Err(e) => println!("Failed to get update_ml_feed_cache response: {}", e),
}
}

0 comments on commit 555980c

Please sign in to comment.