mirror of
https://github.com/andreili/SBC_builder.git
synced 2025-08-23 19:04:06 +02:00
280 lines
11 KiB
Python
280 lines
11 KiB
Python
from pathlib import Path
|
|
import shutil, git, os, subprocess, glob
|
|
from rich import console, progress
|
|
from . import *
|
|
|
|
class GitRemoteProgress(git.RemoteProgress):
|
|
OP_CODES = [
|
|
"BEGIN",
|
|
"CHECKING_OUT",
|
|
"COMPRESSING",
|
|
"COUNTING",
|
|
"END",
|
|
"FINDING_SOURCES",
|
|
"RECEIVING",
|
|
"RESOLVING",
|
|
"WRITING",
|
|
]
|
|
OP_CODE_MAP = {
|
|
getattr(git.RemoteProgress, _op_code): _op_code for _op_code in OP_CODES
|
|
}
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.progressbar = progress.Progress(
|
|
progress.SpinnerColumn(),
|
|
# *progress.Progress.get_default_columns(),
|
|
progress.TextColumn("[progress.description]{task.description}"),
|
|
progress.BarColumn(),
|
|
progress.TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
|
|
"eta",
|
|
progress.TimeRemainingColumn(),
|
|
progress.TextColumn("{task.fields[message]}"),
|
|
console=console.Console(),
|
|
transient=False,
|
|
)
|
|
self.progressbar.start()
|
|
self.active_task = None
|
|
|
|
def __del__(self) -> None:
|
|
# logger.info("Destroying bar...")
|
|
self.progressbar.stop()
|
|
|
|
@classmethod
|
|
def get_curr_op(cls, op_code: int) -> str:
|
|
"""Get OP name from OP code."""
|
|
# Remove BEGIN- and END-flag and get op name
|
|
op_code_masked = op_code & cls.OP_MASK
|
|
return cls.OP_CODE_MAP.get(op_code_masked, "?").title()
|
|
|
|
def update(
|
|
self,
|
|
op_code: int,
|
|
cur_count: str | float,
|
|
max_count: str | float | None = None,
|
|
message: str | None = "",
|
|
) -> None:
|
|
# Start new bar on each BEGIN-flag
|
|
if op_code & self.BEGIN:
|
|
self.curr_op = self.get_curr_op(op_code)
|
|
# logger.info("Next: %s", self.curr_op)
|
|
self.active_task = self.progressbar.add_task(
|
|
description=self.curr_op,
|
|
total=max_count,
|
|
message=message,
|
|
)
|
|
|
|
self.progressbar.update(
|
|
task_id=self.active_task,
|
|
completed=cur_count,
|
|
message=message,
|
|
)
|
|
|
|
# End progress monitoring on each END-flag
|
|
if op_code & self.END:
|
|
# logger.info("Done: %s", self.curr_op)
|
|
self.progressbar.update(
|
|
task_id=self.active_task,
|
|
message=f"[bright_black]{message}",
|
|
)
|
|
|
|
class Sources:
|
|
def __init__(self, name, url):
|
|
self.name = name
|
|
self.url = url
|
|
self.bare_dir = f"{ROOT_DIR}/bare_git/{name}"
|
|
self.bare_done_marker = f"{self.bare_dir}/.git_done_marker"
|
|
|
|
def init_source_path(self, board_name, is_shared):
|
|
if (is_shared):
|
|
self.board_name = ""
|
|
self.worktree = f"{self.name}"
|
|
self.worktree_dir = f"{self.bare_dir}/.git/worktrees/{self.worktree}"
|
|
self.work_dir = f"{ROOT_DIR}/build/common/{self.worktree}"
|
|
self.work_done_marker = f"{self.work_dir}/.git_done_marker"
|
|
else:
|
|
self.board_name = board_name
|
|
self.worktree = f"{self.name}_{board_name}"
|
|
self.worktree_dir = f"{self.bare_dir}/.git/worktrees/{self.worktree}"
|
|
self.work_dir = f"{ROOT_DIR}/build/{board_name}/{self.worktree}"
|
|
self.work_done_marker = f"{self.work_dir}/.git_done_marker"
|
|
|
|
def set_git_params(self, version, _type):
|
|
self.version = version
|
|
self.type = _type
|
|
|
|
def sync(self):
|
|
self.git_bare_check()
|
|
self.git_work_clone()
|
|
#git_bare_check "${git_url}" "${git_dir}"
|
|
#git_bare_clone "${git_dir}" "${git_work}" "${git_tag}"
|
|
#source_patch "${patch_dir}" ${source_dir}
|
|
|
|
def git_bare_clone(self):
|
|
Logger.download(f"\tClone dedicated repo '{self.url}'")
|
|
git.Repo.clone_from(self.url, self.bare_dir, multi_options=["--tags", "--no-checkout"],
|
|
progress=GitRemoteProgress())
|
|
repo = git.Repo(self.bare_dir)
|
|
cfg_wr = repo.config_writer()
|
|
cfg_wr.add_value("remote \"origin\"", "fetch", "+refs/tags/*:refs/remotes/origin/*").release()
|
|
Path(self.bare_done_marker).touch()
|
|
|
|
def git_bare_check(self):
|
|
Logger.git(f"Check a bare repo for '{self.name}'")
|
|
bare_dir = Path(self.bare_dir)
|
|
bare_marker = Path(self.bare_done_marker)
|
|
if (not bare_dir.is_dir()) or (not bare_marker.is_file()):
|
|
if (bare_dir.is_dir()):
|
|
Logger.git("\tRemove GIT bare directory - don't find 'finish' marker!")
|
|
shutil.rmtree(self.bare_dir)
|
|
self.git_bare_clone()
|
|
Logger.download("\tFetch updates from dedicated repo")
|
|
git.Repo(self.bare_dir).remote("origin").fetch(progress=GitRemoteProgress())
|
|
|
|
def git_work_tree_init(self):
|
|
if (not Path(f"{self.work_dir}/.git").is_file()):
|
|
Logger.git("\tAdd to worktree")
|
|
self.repo_bare.git.worktree("add", self.work_dir, "master", "--no-checkout", "--force")
|
|
git_rel_dir = os.path.relpath(self.worktree_dir, start=self.work_dir)
|
|
with open(f"{self.work_dir}/.git", 'w') as f:
|
|
f.write(f"gitdir: {git_rel_dir}")
|
|
f.close()
|
|
with open(f"{self.worktree_dir}/gitdir", 'w') as f:
|
|
f.write(f"{self.work_dir}/.git")
|
|
f.close()
|
|
|
|
def git_work_get_hash_remote(self):
|
|
if (self.type == "branch"):
|
|
tags = self.repo_bare.git.ls_remote("--branches", "origin", f"{self.version}")
|
|
return tags.split('\t')[0]
|
|
elif (self.type == "head"):
|
|
tags = self.repo_bare.git.ls_remote("origin", "HEAD")
|
|
return tags.split('\t')[0]
|
|
elif (self.type == "commit"):
|
|
Logger.error("Unsupported source type!")
|
|
elif (self.type == "tag"):
|
|
tags = self.repo_bare.git.ls_remote("--tags", "origin", f"tags/{self.version}")
|
|
return tags.split('\t')[0]
|
|
|
|
def __parse_version(self):
|
|
if (self.type == "branch"):
|
|
return f"origin/{self.version}"
|
|
elif (self.type == "head"):
|
|
return f"{self.version}"
|
|
elif (self.type == "commit"):
|
|
return f"{self.version}"
|
|
elif (self.type == "tag"):
|
|
return f"{self.version}"
|
|
|
|
def git_work_reset_state(self):
|
|
hash_local = self.repo.git.rev_parse("@")
|
|
hash_remote = self.git_work_get_hash_remote()
|
|
if (hash_local == "") or (hash_local == "@") or (hash_local != hash_remote):
|
|
Logger.git(f"\tUpdate references: {hash_local}->{hash_remote}")
|
|
self.repo.git.fetch("--no-tags", self.bare_dir, self.__parse_version())
|
|
#git fetch --no-tags "${repo_local}" "${ref_name}"
|
|
Logger.git(f"\tCheckout: {hash_remote}")
|
|
self.repo.git.checkout("-f", "-q", hash_remote)
|
|
Logger.git(f"\tClean-up")
|
|
self.repo.git.clean("-q", "-d", "-f")
|
|
|
|
def git_work_clone(self):
|
|
Logger.git(f"Update and reset work repo '{self.name}' '{self.version}'")
|
|
self.repo_bare = git.Repo(self.bare_dir)
|
|
work_marker = Path(self.work_done_marker)
|
|
if (not work_marker.is_file()):
|
|
Logger.git("\tRemove GIT work directory - don't find 'finish' marker!")
|
|
shutil.rmtree(self.work_dir, ignore_errors=True)
|
|
self.git_work_tree_init()
|
|
self.repo = git.Repo(self.work_dir)
|
|
self.git_work_reset_state()
|
|
Path(self.work_done_marker).touch()
|
|
|
|
def __patch_apply(self, file, work_dir):
|
|
patch_f = Path(file)
|
|
if (patch_f.is_file()):
|
|
patch_bn = os.path.basename(file)
|
|
Logger.build(f"\tApply patch '{patch_bn}'")
|
|
p = subprocess.Popen(["patch", "--batch", "-p1", "-N",
|
|
f"--input={file}", "--quiet"], cwd=work_dir)
|
|
p.wait()
|
|
if (p.returncode != 0):
|
|
Logger.error("Failed to patch!")
|
|
|
|
def do_patch(self, board_name, dir):
|
|
Logger.build(f"Patch...")
|
|
patchs = []
|
|
if (hasattr(dir, '__len__') and (not isinstance(dir, str))):
|
|
dirs_arr = dir
|
|
else:
|
|
dirs_arr = [dir]
|
|
for ddir in dirs_arr:
|
|
dirs = [
|
|
f"{ROOT_DIR}/patch/{ddir}",
|
|
f"{ROOT_DIR}/patch/{ddir}/board_{board_name}"
|
|
]
|
|
for dir_p in dirs:
|
|
conf_fn = f"{dir_p}/series.conf"
|
|
conf_f = Path(conf_fn)
|
|
if (conf_f.is_file()):
|
|
with open(conf_fn, 'r') as f:
|
|
for line in f:
|
|
if (len(line)>10) and (line[0] != "#") and (line[0] != "-"):
|
|
file_n = line.strip()
|
|
patchs.append(f"{dir_p}/{file_n}")
|
|
f.close()
|
|
for patch_file in sorted(Path(dir_p).glob('*.patch')):
|
|
patchs.append(patch_file)
|
|
# create array only with unique files
|
|
patch_uniq = []
|
|
for patch_fn in patchs:
|
|
if patch_fn not in patch_uniq:
|
|
patch_uniq.append(patch_fn)
|
|
for patch_fn in patch_uniq:
|
|
self.__patch_apply(patch_fn, self.work_dir)
|
|
|
|
def configure(self, opts):
|
|
opts.insert(0, "./configure")
|
|
p = subprocess.Popen(opts, cwd=self.work_dir)
|
|
p.wait()
|
|
if (p.returncode != 0):
|
|
Logger.error("Failed to configure!")
|
|
|
|
def compile(self, opts, cfg_name):
|
|
#print(f"opts:{opts} target:{target}")
|
|
Logger.build(f"Compile (opts='{opts}')...")
|
|
if (cfg_name != ""):
|
|
work_cfg_name = f"{self.work_dir}/.config"
|
|
cfg_or = Path(cfg_name)
|
|
cfg_wr = Path(work_cfg_name)
|
|
if (cfg_or.is_file()):
|
|
# copy configuration, if exists
|
|
shutil.copyfile(cfg_name, work_cfg_name)
|
|
opts.insert(0, "make")
|
|
opts.append("-j11")
|
|
p = subprocess.Popen(opts, cwd=self.work_dir)
|
|
p.wait()
|
|
if (p.returncode != 0):
|
|
Logger.error("Failed to compile!")
|
|
if (cfg_name != ""):
|
|
if (cfg_or.is_file()):
|
|
# backup old configuration
|
|
shutil.copyfile(cfg_name, f"{cfg_name}.bak")
|
|
if (cfg_wr.is_file()):
|
|
# copy new configurtion, if exists
|
|
shutil.copyfile(work_cfg_name, cfg_name)
|
|
|
|
def copy_artifacts(self, artifacts, out_dir):
|
|
for art in artifacts:
|
|
file_name = self.work_dir + "/" + art["file"]
|
|
if ("subdir" in art):
|
|
dir_o = out_dir + "/" + art["subdir"] + "/"
|
|
else:
|
|
dir_o = out_dir + "/"
|
|
os.makedirs(dir_o, exist_ok=True)
|
|
if (file_name.find("*") == -1):
|
|
shutil.copy(file_name, dir_o)
|
|
else:
|
|
for file in glob.glob(file_name):
|
|
shutil.copy(file, dir_o)
|