Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release tag and Control-c fixes #12

Merged
merged 6 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"runtime"
"strings"
Expand All @@ -27,7 +26,7 @@ var (
"gemma2:9b-instruct-q8_0",
"gemma2:9b-instruct-fp16",

"llama3.1:latest",
"llama3.1:latest",
"llama3.1:8b-instruct-q8_0",
"llama3.1:8b-instruct-fp16",
"llama3.1:70b-instruct-q4_0",
Expand All @@ -39,9 +38,9 @@ var (
"qwen2.5:7b-instruct-fp16",
"qwen2.5:32b-instruct-fp16",
"qwen2.5-coder:1.5b",

"deepseek-coder:6.7b",

"mixtral:8x7b",
}
// https://github.com/andthattoo/ollama-workflows/edit/main/src/program/models.rs#L76
Expand Down Expand Up @@ -171,7 +170,7 @@ func main() {
}

// get latest dkn_compute binary version
latestVersion, err := utils.GetComputeLatestTag(*use_compute_dev_version)
computeVersion, err := utils.GetComputeVersionTag(!(*use_compute_dev_version), *use_compute_dev_version, false)
if err != nil {
fmt.Println("Couldn't get the latest dkn-compute version")
utils.ExitWithDelay(1)
Expand All @@ -181,24 +180,24 @@ func main() {
// check dkn-compute binary has already installed
if utils.FileExists(utils.ComputeBinaryFileName()) {
// compare current and latest versions
if latestVersion != envvars["DKN_COMPUTE_VERSION"] {
fmt.Printf("New dkn-compute version detected (%s), downloading it...\n", latestVersion)
if err := utils.DownloadLatestComputeBinary(latestVersion, working_dir, dkn_compute_binary); err != nil {
if computeVersion != envvars["DKN_COMPUTE_VERSION"] {
fmt.Printf("New dkn-compute version detected (%s), downloading it...\n", computeVersion)
if err := utils.DownloadLatestComputeBinary(computeVersion, working_dir, dkn_compute_binary); err != nil {
fmt.Printf("Error during downloading the latest dkn-compute binary %s\n", err)
utils.ExitWithDelay(1)
}
envvars["DKN_COMPUTE_VERSION"] = latestVersion
envvars["DKN_COMPUTE_VERSION"] = computeVersion
} else {
fmt.Printf("Current version is up to date (%s)\n", envvars["DKN_COMPUTE_VERSION"])
}
} else {
// couldn't find the dkn-compute binary, download it
fmt.Printf("Downloading the latest dkn-compute binary (%s)\n", latestVersion)
if err := utils.DownloadLatestComputeBinary(latestVersion, working_dir, dkn_compute_binary); err != nil {
fmt.Printf("Downloading the latest dkn-compute binary (%s)\n", computeVersion)
if err := utils.DownloadLatestComputeBinary(computeVersion, working_dir, dkn_compute_binary); err != nil {
fmt.Printf("Error during downloading the latest dkn-compute binary %s\n", err)
utils.ExitWithDelay(1)
}
envvars["DKN_COMPUTE_VERSION"] = latestVersion
envvars["DKN_COMPUTE_VERSION"] = computeVersion
}

// dump the final env
Expand Down Expand Up @@ -245,16 +244,7 @@ func main() {
fmt.Printf("ERROR during running exe, %s", err)
utils.ExitWithDelay(1)
}
fmt.Println("All good! Compute node is up and running.")
fmt.Println("\nUse Control-C to exit")

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig

fmt.Println("\nShutting down...")

fmt.Println("\nbye")
os.Exit(0)
}

Expand Down
17 changes: 11 additions & 6 deletions utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,19 @@ func RunCommand(working_dir string, outputDest string, wait bool, timeout time.D
// If wait is false, handle output asynchronously
if !wait {
go func() {
// Handle logging asynchronously, so it continues even after the main program ends
// Ensure to check if logFile is not nil
if logFile != nil {
stdoutPipe, _ := cmd.StdoutPipe()
stderrPipe, _ := cmd.StderrPipe()

// Start goroutines to copy the command's stdout and stderr to the log file
go io.Copy(logFile, stdoutPipe)
go io.Copy(logFile, stderrPipe)
stdoutPipe, stdoutErr := cmd.StdoutPipe()
stderrPipe, stderrErr := cmd.StderrPipe()

// Check for pipe errors before starting goroutines
if stdoutErr == nil && stdoutPipe != nil {
go io.Copy(logFile, stdoutPipe)
}
if stderrErr == nil && stderrPipe != nil {
go io.Copy(logFile, stderrPipe)
}
}
// Ensure the process runs to completion
cmd.Wait()
Expand Down
173 changes: 135 additions & 38 deletions utils/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,152 @@ import (
"strings"
)

// GetComputeLatestTag fetches the latest tag from the DKN Compute Node repository on GitHub.
// This tag represents the latest version of the compute node.
// GetComputeVersionTag fetches a specific tag from the DKN Compute Node repository on GitHub based on the provided parameters.
// It can return the latest stable release, the latest development version, or the previous stable release.
//
// Parameters:
// - dev: A boolean parameter. If true, it returns the latest tag with the '-dev' suffix.
// If false, it returns the latest tag without the '-dev' suffix.
// - latest: If true, it returns the latest stable release from the repository.
// - dev: If true, it returns the latest tag with the '-dev' suffix.
// - previous_latest: If true, it returns the previous stable release before the latest.
//
// Returns:
// - string: The latest tag (version) as a string, filtered by the '-dev' suffix based on the dev parameter.
// - string: The requested tag (version) as a string, based on the provided parameters.
// - error: An error if the request fails, the response cannot be parsed, or no valid tags are found.
func GetComputeLatestTag(dev bool) (string, error) {
//
// Note:
// - If `latest` is true, the function fetches the latest release from the GitHub API.
// - If `dev` is true, it searches for the latest tag with the '-dev' suffix from the sorted tags.
// - If `previous_latest` is true, it returns the previous stable release tag (ignoring '-dev' tags).
// - The function prioritizes parameters in the following order: latest, dev, previous_latest.
func GetComputeVersionTag(latest bool, dev bool, previous_latest bool) (string, error) {
if latest {
url := "https://api.github.com/repos/firstbatchxyz/dkn-compute-node/releases/latest"

resp, err := http.Get(url)
if err != nil {
return "", fmt.Errorf("failed to make request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get latest release, status code: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response: %w", err)
}

// Create a map to store the response
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return "", fmt.Errorf("failed to parse JSON: %v", err)
}

// Extract the tag_name from the map
tagName, ok := result["tag_name"].(string)
if !ok {
return "", fmt.Errorf("tag_name not found or not a string")
}

return tagName, nil

} else if dev {
tags, err := GetSortedTags()
if err != nil {
return "", err
}
// Iterate through the tags and return the first one based on the 'dev' parameter
for _, tag := range tags {
tagName, ok := tag["name"].(string)
if !ok {
return "", fmt.Errorf("failed to extract tag name")
}

// Return the first tag with '-dev' suffix if dev is true
if strings.HasSuffix(tagName, "-dev") {
return tagName, nil
}
}
} else if previous_latest {
tags, err := GetSortedTags()
if err != nil {
return "", err
}
latest_encountered := false
// Iterate through the tags and return the previous latest (in the order of semantic versioning)
for _, tag := range tags {
tagName, ok := tag["name"].(string)
if !ok {
return "", fmt.Errorf("failed to extract tag name")
}

// skip the latest tag and -dev suffix tags
if !strings.HasSuffix(tagName, "-dev") && !latest_encountered {
latest_encountered = true
continue
}
//
if !strings.HasSuffix(tagName, "-dev") && latest_encountered {
return tagName, nil
}
}
}

return "", fmt.Errorf("no valid tags found")
}

// GetSortedTags retrieves all tags from the DKN Compute Node repository on GitHub and returns them as a sorted list.
//
// It fetches the tags from the GitHub API and parses them into a list of maps. Each map represents a tag with its attributes.
//
// Returns:
// - []map[string]interface{}: A slice of maps representing the tags from the repository, each containing tag attributes (e.g., name, commit).
// - error: An error if the request fails, the response cannot be parsed, or no tags are found.
func GetSortedTags() ([]map[string]interface{}, error) {
url := "https://api.github.com/repos/firstbatchxyz/dkn-compute-node/tags"

// get and parse the tags
// get and parse the all the tags
resp, err := http.Get(url)
if err != nil {
return "", fmt.Errorf("failed to make request: %w", err)
return nil, fmt.Errorf("failed to make request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("request failed with status code: %d", resp.StatusCode)
return nil, fmt.Errorf("request failed with status code: %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response: %w", err)
return nil, fmt.Errorf("failed to read response: %w", err)
}

var tags []map[string]interface{}
if err := json.Unmarshal(body, &tags); err != nil {
return "", fmt.Errorf("failed to parse JSON: %w", err)
return nil, fmt.Errorf("failed to parse JSON: %w", err)
}
if len(tags) == 0 {
return "", fmt.Errorf("no tags found")
return nil, fmt.Errorf("no tags found")
}

// Iterate through the tags and return the first one based on the 'dev' parameter
for _, tag := range tags {
tagName, ok := tag["name"].(string)
if !ok {
return "", fmt.Errorf("failed to extract tag name")
}

if dev {
// Return the first tag with '-dev' suffix if dev is true
if strings.HasSuffix(tagName, "-dev") {
return tagName, nil
}
} else {
// Return the first tag without '-dev' suffix if dev is false
if !strings.HasSuffix(tagName, "-dev") {
return tagName, nil
}
}
}

return "", fmt.Errorf("no valid tags found")
return tags, nil
}

// DownloadLatestComputeBinary downloads the latest compute binary for the current operating system and architecture
// from the DKN Compute Node GitHub repository.
// from the DKN Compute Node GitHub repository, and saves it to the specified directory with the specified file name.
//
// Parameters:
// - version: The version of the binary to download (e.g., v0.2.4).
// - workingDir: The directory where the binary will be saved.
// - file: The name of the file to save the binary as.
//
// Returns:
// - error: An error if the download or file preparation fails.
// - error: An error if the download, file preparation, or version retrieval fails.
//
// Behavior:
// - Constructs the download URL based on the provided version, operating system, and architecture.
// - If the specified version cannot be downloaded (e.g., due to a 404 error), the function attempts to download the previous stable version.
// - If the previous version download also fails, an error is returned.
// - After downloading, the function applies necessary permissions to the binary by calling `PrepareComputeBinary`.
func DownloadLatestComputeBinary(version, workingDir, file string) error {
os, arch := GetOSAndArch()
extension := ""
Expand All @@ -88,8 +166,27 @@ func DownloadLatestComputeBinary(version, workingDir, file string) error {
// releases/download/v0.2.4-dev
url := fmt.Sprintf("https://github.com/firstbatchxyz/dkn-compute-node/releases/download/%s/%s", version, asset_name)
destPath := filepath.Join(workingDir, file)
if err := DownloadFile(url, destPath); err != nil {
return err
status_code, err := DownloadFile(url, destPath)
if err != nil {
if status_code == 404 {
// if the release exists but the downloads responds with 404, it means the build didn't finished yet
// use the previous latest version
fmt.Println("Warning: The latest compute binaries are currently being built. Downloading the previous version. You can restart the launcher in ~20 minutes to run the latest version.")
version, err = GetComputeVersionTag(false, false, true)
if err != nil {
return err
}
asset_name := fmt.Sprintf("dkn-compute-binary-%s-%s%s", os, arch, extension)
url := fmt.Sprintf("https://github.com/firstbatchxyz/dkn-compute-node/releases/download/%s/%s", version, asset_name)
_, err = DownloadFile(url, destPath)
if err != nil {
// if its couldn't download the previous latest version, raise an error
return err
}
} else {
// raise error for any other status code
return err
}
}

// give the executable privledges etc.
Expand Down
22 changes: 14 additions & 8 deletions utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,37 +107,43 @@ func FileExists(parts ...string) bool {
}

// DownloadFile downloads a file from the specified URL and saves it to the specified path.
// It returns the HTTP response status code and an error if any issue occurs during the download or file writing process.
//
// Parameters:
// - url: The URL from which to download the file.
// - path: The local file path where the downloaded file will be saved.
//
// Returns:
// - error: Returns an error if the download or file writing fails, otherwise nil.
func DownloadFile(url, path string) error {
// - int: The HTTP response status code if the download is successful or the specific response code if a failure occurs.
// If the error is unrelated to the HTTP response (e.g., file creation error), it returns -1.
// - error: Returns an error if the download, HTTP response, or file writing fails; otherwise, returns nil.
func DownloadFile(url, path string) (int, error) {
resp, err := http.Get(url)
// use -1 for errors unrelated to http response
response_status_code := -1
if err != nil {
return fmt.Errorf("failed to download file: %v", err)
return response_status_code, fmt.Errorf("failed to download file: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status: %s", resp.Status)
response_status_code = resp.StatusCode
return response_status_code, fmt.Errorf("bad status: %s", resp.Status)
}

// write it as .env
out, err := os.Create(path)
if err != nil {
return fmt.Errorf("failed to create file: %v", err)
return -1, fmt.Errorf("failed to create file: %v", err)
}
defer out.Close()

// write the body to file
_, err = io.Copy(out, resp.Body)
if err != nil {
return fmt.Errorf("failed to write to file: %v", err)
return -1, fmt.Errorf("failed to write to file: %v", err)
}
return nil
return 200, nil
}

// FetchEnvFileFromDknRepo downloads the .env example file from the DKN GitHub repository
Expand All @@ -153,7 +159,7 @@ func FetchEnvFileFromDknRepo(working_dir string) (map[string]string, error) {
// fetch from github
url := "https://raw.githubusercontent.com/firstbatchxyz/dkn-compute-node/master/.env.example"
path := filepath.Join(working_dir, ".env")
if err := DownloadFile(url, path); err != nil {
if _, err := DownloadFile(url, path); err != nil {
return nil, err
}

Expand Down