src.plugins.plugin_default
Default commands: ls, cat, cd, cp, mv, rm, grep, history, undo, exit
1"""Default commands: ls, cat, cd, cp, mv, rm, grep, history, undo, exit""" 2import grp 3import inspect 4import os 5import pwd 6import re 7import shutil 8import stat 9import time 10from pathlib import Path 11import src.decorators.commands_register as cmd_register 12import src.decorators.handlers as handlers 13import src.extra.utils as utils 14import src.cmd_types.commands as cmds 15from src.cmd_types.output import CommandOutput 16from src.extra.utils import create_path_obj 17import src.constants as cst 18 19__author__ = "default" 20__version__ = "1.0.0" 21 22def get_resolved_line(args: list[str]) -> str: 23 line = "" 24 for arg in args: 25 line += f" {create_path_obj(arg, must_exist=False).resolve()}" 26 return line 27 28@cmd_register.command("ls", flags = ["-l"]) 29class LsCommand(cmds.ExecutableCommand): 30 def _parse_args(self) -> tuple[list[Path], dict[str, bool]]: 31 flags = self.parse_flags() 32 ret = [] 33 for arg in self.args: 34 ret.append(create_path_obj(arg, must_exist=False)) 35 if not ret: 36 ret.append(create_path_obj(".")) 37 return ret, flags 38 39 def execute(self): 40 paths, flags = self._parse_args() 41 out = CommandOutput() 42 l_flag = flags["-l"] 43 @handlers.handle_all_default 44 def file_info(item: Path): 45 output = "" 46 name = item.name 47 if item.name.startswith("."): 48 return CommandOutput() 49 if str(item).find(" ")!=-1: 50 name = f'"{name}"' 51 52 if l_flag: 53 stat_info = item.stat() 54 55 permissions = stat.filemode(stat_info.st_mode) 56 owner = pwd.getpwuid(stat_info.st_uid).pw_name 57 group = grp.getgrgid(stat_info.st_gid).gr_name 58 output += f"{permissions} {stat_info.st_nlink:>2} {owner} {group} {stat_info.st_size:>8} {time.strftime('%b %d %H:%M', time.gmtime(stat_info.st_mtime))} {name} \n" 59 60 else: 61 output+=f"{name} " 62 63 return CommandOutput( 64 stdout = output 65 ) 66 67 68 @handlers.handle_all_default 69 def list_dir(path: Path): 70 output = CommandOutput() 71 col = utils.get_terminal_dimensions()[0] 72 len_counter = 0 73 if path.is_file(): 74 return file_info(path) 75 next_line = 0 76 if len(paths)>1: 77 output.stdout = str(path) + ":\n" 78 next_line = 1 79 path_iter = path.iterdir() 80 81 for item in path_iter: 82 add = file_info(item) 83 add_stdout = add.stdout 84 if not l_flag: 85 len_counter += len(add_stdout) 86 if len_counter > col: 87 len_counter = len(add_stdout) 88 add.stdout = "\n" + add_stdout 89 output += add 90 91 output.stdout+=next_line*"\n\n" 92 return output 93 94 for arg in paths: 95 add_global = list_dir(arg) 96 out+=add_global 97 98 return out 99 100@cmd_register.command("cd") 101class CdCommand(cmds.ExecutableCommand): 102 103 def _parse_args(self) -> tuple[Path, int]: 104 105 args = self.args 106 if not args: 107 args.insert(0, "~/") 108 109 if len(args)>1: 110 return Path(""), len(args) 111 112 path = args[0] 113 path_obj = utils.create_path_obj(path) 114 if not path_obj.is_absolute(): 115 path_obj = Path.cwd() / path_obj 116 117 return path_obj, len(args) 118 119 def execute(self): 120 path, n_args = self._parse_args() 121 if n_args>1: 122 msg = "too many arguments\n" 123 return CommandOutput(stderr = msg, errcode = 4) 124 if path.is_dir(): 125 os.chdir(path) 126 return CommandOutput() 127 elif path.is_file(): 128 129 msg = f"{path}: it is a file\n" 130 return CommandOutput(stderr = msg, errcode = 2) 131 return CommandOutput(stderr = f"not found: {path}", errcode = 2) 132 133 134@cmd_register.command("cat") 135class CatCommand(cmds.ExecutableCommand): 136 137 def _parse_args(self) -> list[Path]: 138 if len(self.args)==0: 139 return [] 140 ret = [] 141 for arg in self.args: 142 ret.append(create_path_obj(arg, must_exist=False)) 143 return ret 144 145 def execute(self): 146 paths = self._parse_args() 147 output = CommandOutput() 148 if not paths: 149 msg = "too few arguments\n" 150 return CommandOutput(stderr = msg, errcode = 4) 151 152 @handlers.handle_all_default 153 def read_file(path: Path): 154 if path.is_dir(): 155 msg = f"{path}: is is a directory\n" 156 return CommandOutput(stderr = msg, errcode = 2) 157 elif path.is_file(): 158 try: 159 with open(path, "r", encoding='utf-8') as file: 160 return file.read()+"\n" 161 except Exception: 162 return CommandOutput(stderr = f"unable to read {path}\n", errcode = 3) 163 164 raise FileNotFoundError(2, path, str(path)) 165 166 for arg in paths: 167 output += read_file(arg) 168 return output 169 170@cmd_register.command("cp", flags = ["-r"]) 171class CopyCommand(cmds.ExecutableCommand): 172 def _parse_args(self) -> tuple[list[Path], Path, dict[str, bool]]: 173 flags = self.parse_flags() 174 args = self.args 175 source_dirs = [utils.create_path_obj(o, must_exist=False) for o in args[:-1]] 176 to_dir = utils.create_path_obj(args[-1], must_exist=False).expanduser() 177 178 return source_dirs, to_dir, flags 179 180 def execute(self): 181 if len(self.args) < 2: 182 return CommandOutput(stderr = "too few arguments\n", errcode = 4) 183 source_dirs, to_dir, flags = self._parse_args() 184 out = CommandOutput() 185 r = flags["-r"] 186 187 @handlers.handle_all_default 188 def copy(source: Path, to: Path): 189 if source.is_dir(): 190 if not r: 191 msg = f"-r option was not specified: '{source}' is ignored\n" 192 return CommandOutput(stderr = msg, errcode = 1) 193 194 if to_dir.resolve().is_relative_to(source_dir.resolve()): 195 msg = f"Unable to copy '{to}' to itself\n" 196 return CommandOutput(stderr = msg, errcode = 1) 197 try: 198 shutil.copytree(source, to, dirs_exist_ok=True) 199 except shutil.Error as e: 200 for arg in e.args[0]: 201 msg = f"{arg[0]}: {arg[2]}\n" 202 return CommandOutput(stderr = msg, errcode = 1) 203 return None 204 205 shutil.copy2(source, to) 206 return None 207 208 for source_dir in source_dirs: 209 res = copy(source_dir, to_dir) 210 if res: 211 out += res 212 return out 213 214 215 def history(self): 216 line = self.name 217 no_r = utils.remove_arg("-r", self.args) 218 line+=get_resolved_line(no_r) 219 220 utils.write_history(line) 221 222 def undo(self): 223 source = Path(self.args.pop()) 224 for arg in self.args: 225 move_from = source / Path(arg).name 226 RemoveCommand([str(move_from)]).execute() 227 228@cmd_register.command("mv") 229class MoveCommand(cmds.ExecutableCommand): 230 def _parse_args(self) -> tuple[list[Path], Path]: 231 args = self.args 232 233 source_dirs = [utils.create_path_obj(o) for o in args[:-1]] 234 to_dir = utils.create_path_obj(args[-1], must_exist=False) 235 236 return source_dirs, to_dir 237 238 239 def execute(self): 240 source_dirs, to_dir = self._parse_args() 241 out = CommandOutput() 242 for source_dir in source_dirs: 243 if to_dir.resolve().is_relative_to(source_dir.resolve()): 244 out.stderr += f"unable to move '{source_dir}' to itself\n" 245 out.errcode = 2 246 continue 247 shutil.move(source_dir, to_dir) 248 return out 249 250 def history(self): 251 line = self.name 252 line+=get_resolved_line(self.args) 253 254 utils.write_history(line) 255 256 def undo(self): 257 source = Path(self.args.pop()) 258 259 @handlers.handle_all_default 260 def undo_(arg: str): 261 move_from = source / Path(arg).name 262 shutil.move(move_from, arg) 263 264 for p in self.args: 265 undo_(p) 266 267 268@cmd_register.command("rm", flags = ["-r"]) 269class RemoveCommand(cmds.UndoableCommand): 270 def _parse_args(self) -> tuple[list[Path], dict[str, bool]]: 271 ret = [] 272 flags = self.parse_flags() 273 for arg in self.args: 274 try: 275 ret.append(create_path_obj(arg, must_exist=False).resolve()) 276 except FileNotFoundError: 277 self._log_error(f"Cannot remove '{arg}': no such file or directory") 278 279 return ret, flags 280 281 def history(self): 282 line = self.name 283 no_r = utils.remove_arg("-r", self.args) 284 line+=get_resolved_line(no_r) 285 286 utils.write_history(line) 287 288 def execute(self): 289 args, flags = self._parse_args() 290 r_flag = flags["-r"] 291 out = CommandOutput() 292 home = os.getenv("HOME")+"/" 293 @handlers.handle_all_default 294 def remove(path: Path): 295 296 if path == cst.TRASH_PATH: 297 msg = f"unable to remove '{path}' as it is a TRASH\n" 298 return CommandOutput(stderr = msg, errcode = 2) 299 300 if Path.cwd().is_relative_to(path): 301 msg = f"unable to remove '{path}': it is a parent directory\n" 302 return CommandOutput(stderr = msg, errcode = 2) 303 no_home = str(path).replace(home, "") 304 parent = Path(no_home).parent 305 if str(parent)[0]=="/": 306 parent = Path(str(parent)[1:]) 307 to_move = Path(cst.TRASH_PATH) / parent 308 if not to_move.exists(): 309 to_move.mkdir(parents=True) 310 311 if path.is_dir(): 312 if not r_flag and any(path.iterdir()): 313 msg = f"-r option was not specified: '{path}' is ignored\n" 314 return CommandOutput(stderr = msg, errcode = 2) 315 316 perm = input(f"Do you want to remove '{path}'? [y/n] ") 317 if perm.lower() == "y": 318 if not (to_move / path.name).exists(): 319 shutil.move(path, to_move) 320 else: 321 shutil.rmtree(to_move) 322 shutil.move(path, to_move) 323 else: 324 self.logger.warning(f"'{path}' was not removed: user declined operation") 325 return CommandOutput() 326 327 else: 328 if not (to_move / path.name).exists(): 329 shutil.move(path, to_move / path.name) 330 else: 331 (to_move/path.name).unlink() 332 shutil.move(path, to_move / path.name) 333 334 for arg in args: 335 res = remove(arg) 336 if res: 337 out += res 338 return out 339 340 341 def undo(self): 342 home = os.environ.get("HOME")+"/" 343 344 @handlers.handle_all_default 345 def undo_(arg: str): 346 no_home = str(arg).replace(home, "") 347 source = Path(cst.TRASH_PATH) / no_home 348 self.logger.info(f"undoing 'rm {arg}") 349 shutil.move(source, arg) 350 351 for p in self.args: 352 undo_(p) 353 354 355@cmd_register.command("grep", flags = ["-i", "-r", "-ir"]) 356class GrepCommand(cmds.ExecutableCommand): 357 def _parse_args(self): 358 flags = self.parse_flags() 359 f = create_path_obj(self.args[-1], must_exist=False) 360 if flags["-ir"]: 361 flags["-i"] = flags["-r"] = True 362 return f, self.args[0], flags 363 364 def execute(self): 365 if len(self.args) < 2: 366 msg = "too few arguments" 367 return CommandOutput(stderr = msg, errcode = 4) 368 path_arg, regexp, flags = self._parse_args() 369 flags_re = 0 370 if flags["-i"]: 371 flags_re |= re.IGNORECASE 372 try: 373 compiled = re.compile(regexp, flags_re) 374 except re.PatternError: 375 msg = f"invalid regular expression '{regexp}'" 376 return CommandOutput(stderr = msg, errcode = 5) 377 @handlers.handle_all_default 378 def grep(path: Path) -> CommandOutput: 379 out = CommandOutput() 380 if path.is_dir(): 381 if not flags["-r"]: 382 msg = f"'-r' flag was not specified: '{path}' is ignored" 383 return CommandOutput(stderr = msg, errcode = 2) 384 for p in path.iterdir(): 385 out+=grep(p) 386 elif path.is_file(): 387 with open(path, "r", encoding='utf-8') as f: 388 for n, line in enumerate(f.readlines(), start = 1): 389 match_found = bool(compiled.search(line)) 390 if match_found: 391 line = line.rstrip('\n\r') 392 out.stdout+=f"{str(path)}\t {n} {line}\n" 393 return out 394 output = grep(path_arg) 395 return output 396 397@cmd_register.command("history") 398class HistoryCommand(cmds.ExecutableCommand): 399 def _parse_args(self) -> int | None: 400 if self.args: 401 try: 402 return int(self.args[0]) 403 except ValueError: 404 self._log_error(f"{self.args[0]} is not an integer") 405 return None 406 407 def execute(self): 408 n = self._parse_args() 409 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 410 out = "" 411 rev = file.readlines() 412 if not n: 413 n = len(rev) 414 for num, line in enumerate(rev[-n:]): 415 out += f"{num+1} {line}" 416 return CommandOutput(stdout = out) 417 418@cmd_register.command("undo") 419class UndoCommand(cmds.ExecutableCommand): 420 def _parse_args(self) -> None: 421 return None 422 423 def execute(self): 424 cur_frame = inspect.currentframe() 425 f_back = cur_frame.f_back 426 while not getattr(f_back.f_locals["self"], 'cmd_map', None): 427 f_back = f_back.f_back 428 out = CommandOutput() 429 caller = f_back.f_back.f_locals["self"] 430 431 undoable = list( 432 filter( 433 lambda x: bool( 434 getattr(caller.cmd_map[x].cmd, "undo", None) 435 ), caller.cmd_map.keys() 436 ) 437 ) 438 439 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 440 history = file.readlines() 441 history_rev = history[::-1] 442 num_to_delete = None 443 for num, hist in enumerate(history_rev): 444 hist = hist[:-1] 445 name = hist.split(" ")[0] 446 if name in undoable: 447 try: 448 args = caller.shlex_split(hist)[1:] 449 if "--help" in args: 450 continue 451 cmd = caller.cmd_map[name].cmd(args) 452 cmd.undo() 453 num_to_delete = num 454 break 455 except Exception as e: 456 out.stderr +=f"{e}: {hist}\n" 457 continue 458 if num_to_delete: 459 with open(cst.HISTORY_PATH, "w", encoding='utf-8') as file: 460 for line in history_rev[:num_to_delete]+history_rev[num_to_delete+1:]: 461 file.write(line) 462 return out 463 464@cmd_register.command("exit") 465class ExitCommand(cmds.ExecutableCommand): 466 def _parse_args(self) -> int | None: 467 if not self.args: 468 return 0 469 try: 470 return int(self.args[0]) 471 except ValueError: 472 self._log_error(f"{self.args[0]} is not an integer") 473 return None 474 475 def execute(self): 476 code = self._parse_args() 477 if code is not None: 478 shutil.rmtree(cst.TRASH_PATH) 479 cst.TRASH_PATH.mkdir(parents=True, exist_ok=True) 480 return exit(code)
29@cmd_register.command("ls", flags = ["-l"]) 30class LsCommand(cmds.ExecutableCommand): 31 def _parse_args(self) -> tuple[list[Path], dict[str, bool]]: 32 flags = self.parse_flags() 33 ret = [] 34 for arg in self.args: 35 ret.append(create_path_obj(arg, must_exist=False)) 36 if not ret: 37 ret.append(create_path_obj(".")) 38 return ret, flags 39 40 def execute(self): 41 paths, flags = self._parse_args() 42 out = CommandOutput() 43 l_flag = flags["-l"] 44 @handlers.handle_all_default 45 def file_info(item: Path): 46 output = "" 47 name = item.name 48 if item.name.startswith("."): 49 return CommandOutput() 50 if str(item).find(" ")!=-1: 51 name = f'"{name}"' 52 53 if l_flag: 54 stat_info = item.stat() 55 56 permissions = stat.filemode(stat_info.st_mode) 57 owner = pwd.getpwuid(stat_info.st_uid).pw_name 58 group = grp.getgrgid(stat_info.st_gid).gr_name 59 output += f"{permissions} {stat_info.st_nlink:>2} {owner} {group} {stat_info.st_size:>8} {time.strftime('%b %d %H:%M', time.gmtime(stat_info.st_mtime))} {name} \n" 60 61 else: 62 output+=f"{name} " 63 64 return CommandOutput( 65 stdout = output 66 ) 67 68 69 @handlers.handle_all_default 70 def list_dir(path: Path): 71 output = CommandOutput() 72 col = utils.get_terminal_dimensions()[0] 73 len_counter = 0 74 if path.is_file(): 75 return file_info(path) 76 next_line = 0 77 if len(paths)>1: 78 output.stdout = str(path) + ":\n" 79 next_line = 1 80 path_iter = path.iterdir() 81 82 for item in path_iter: 83 add = file_info(item) 84 add_stdout = add.stdout 85 if not l_flag: 86 len_counter += len(add_stdout) 87 if len_counter > col: 88 len_counter = len(add_stdout) 89 add.stdout = "\n" + add_stdout 90 output += add 91 92 output.stdout+=next_line*"\n\n" 93 return output 94 95 for arg in paths: 96 add_global = list_dir(arg) 97 out+=add_global 98 99 return out
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
40 def execute(self): 41 paths, flags = self._parse_args() 42 out = CommandOutput() 43 l_flag = flags["-l"] 44 @handlers.handle_all_default 45 def file_info(item: Path): 46 output = "" 47 name = item.name 48 if item.name.startswith("."): 49 return CommandOutput() 50 if str(item).find(" ")!=-1: 51 name = f'"{name}"' 52 53 if l_flag: 54 stat_info = item.stat() 55 56 permissions = stat.filemode(stat_info.st_mode) 57 owner = pwd.getpwuid(stat_info.st_uid).pw_name 58 group = grp.getgrgid(stat_info.st_gid).gr_name 59 output += f"{permissions} {stat_info.st_nlink:>2} {owner} {group} {stat_info.st_size:>8} {time.strftime('%b %d %H:%M', time.gmtime(stat_info.st_mtime))} {name} \n" 60 61 else: 62 output+=f"{name} " 63 64 return CommandOutput( 65 stdout = output 66 ) 67 68 69 @handlers.handle_all_default 70 def list_dir(path: Path): 71 output = CommandOutput() 72 col = utils.get_terminal_dimensions()[0] 73 len_counter = 0 74 if path.is_file(): 75 return file_info(path) 76 next_line = 0 77 if len(paths)>1: 78 output.stdout = str(path) + ":\n" 79 next_line = 1 80 path_iter = path.iterdir() 81 82 for item in path_iter: 83 add = file_info(item) 84 add_stdout = add.stdout 85 if not l_flag: 86 len_counter += len(add_stdout) 87 if len_counter > col: 88 len_counter = len(add_stdout) 89 add.stdout = "\n" + add_stdout 90 output += add 91 92 output.stdout+=next_line*"\n\n" 93 return output 94 95 for arg in paths: 96 add_global = list_dir(arg) 97 out+=add_global 98 99 return out
Executes command
Inherited Members
101@cmd_register.command("cd") 102class CdCommand(cmds.ExecutableCommand): 103 104 def _parse_args(self) -> tuple[Path, int]: 105 106 args = self.args 107 if not args: 108 args.insert(0, "~/") 109 110 if len(args)>1: 111 return Path(""), len(args) 112 113 path = args[0] 114 path_obj = utils.create_path_obj(path) 115 if not path_obj.is_absolute(): 116 path_obj = Path.cwd() / path_obj 117 118 return path_obj, len(args) 119 120 def execute(self): 121 path, n_args = self._parse_args() 122 if n_args>1: 123 msg = "too many arguments\n" 124 return CommandOutput(stderr = msg, errcode = 4) 125 if path.is_dir(): 126 os.chdir(path) 127 return CommandOutput() 128 elif path.is_file(): 129 130 msg = f"{path}: it is a file\n" 131 return CommandOutput(stderr = msg, errcode = 2) 132 return CommandOutput(stderr = f"not found: {path}", errcode = 2)
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
120 def execute(self): 121 path, n_args = self._parse_args() 122 if n_args>1: 123 msg = "too many arguments\n" 124 return CommandOutput(stderr = msg, errcode = 4) 125 if path.is_dir(): 126 os.chdir(path) 127 return CommandOutput() 128 elif path.is_file(): 129 130 msg = f"{path}: it is a file\n" 131 return CommandOutput(stderr = msg, errcode = 2) 132 return CommandOutput(stderr = f"not found: {path}", errcode = 2)
Executes command
Inherited Members
135@cmd_register.command("cat") 136class CatCommand(cmds.ExecutableCommand): 137 138 def _parse_args(self) -> list[Path]: 139 if len(self.args)==0: 140 return [] 141 ret = [] 142 for arg in self.args: 143 ret.append(create_path_obj(arg, must_exist=False)) 144 return ret 145 146 def execute(self): 147 paths = self._parse_args() 148 output = CommandOutput() 149 if not paths: 150 msg = "too few arguments\n" 151 return CommandOutput(stderr = msg, errcode = 4) 152 153 @handlers.handle_all_default 154 def read_file(path: Path): 155 if path.is_dir(): 156 msg = f"{path}: is is a directory\n" 157 return CommandOutput(stderr = msg, errcode = 2) 158 elif path.is_file(): 159 try: 160 with open(path, "r", encoding='utf-8') as file: 161 return file.read()+"\n" 162 except Exception: 163 return CommandOutput(stderr = f"unable to read {path}\n", errcode = 3) 164 165 raise FileNotFoundError(2, path, str(path)) 166 167 for arg in paths: 168 output += read_file(arg) 169 return output
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
146 def execute(self): 147 paths = self._parse_args() 148 output = CommandOutput() 149 if not paths: 150 msg = "too few arguments\n" 151 return CommandOutput(stderr = msg, errcode = 4) 152 153 @handlers.handle_all_default 154 def read_file(path: Path): 155 if path.is_dir(): 156 msg = f"{path}: is is a directory\n" 157 return CommandOutput(stderr = msg, errcode = 2) 158 elif path.is_file(): 159 try: 160 with open(path, "r", encoding='utf-8') as file: 161 return file.read()+"\n" 162 except Exception: 163 return CommandOutput(stderr = f"unable to read {path}\n", errcode = 3) 164 165 raise FileNotFoundError(2, path, str(path)) 166 167 for arg in paths: 168 output += read_file(arg) 169 return output
Executes command
Inherited Members
171@cmd_register.command("cp", flags = ["-r"]) 172class CopyCommand(cmds.ExecutableCommand): 173 def _parse_args(self) -> tuple[list[Path], Path, dict[str, bool]]: 174 flags = self.parse_flags() 175 args = self.args 176 source_dirs = [utils.create_path_obj(o, must_exist=False) for o in args[:-1]] 177 to_dir = utils.create_path_obj(args[-1], must_exist=False).expanduser() 178 179 return source_dirs, to_dir, flags 180 181 def execute(self): 182 if len(self.args) < 2: 183 return CommandOutput(stderr = "too few arguments\n", errcode = 4) 184 source_dirs, to_dir, flags = self._parse_args() 185 out = CommandOutput() 186 r = flags["-r"] 187 188 @handlers.handle_all_default 189 def copy(source: Path, to: Path): 190 if source.is_dir(): 191 if not r: 192 msg = f"-r option was not specified: '{source}' is ignored\n" 193 return CommandOutput(stderr = msg, errcode = 1) 194 195 if to_dir.resolve().is_relative_to(source_dir.resolve()): 196 msg = f"Unable to copy '{to}' to itself\n" 197 return CommandOutput(stderr = msg, errcode = 1) 198 try: 199 shutil.copytree(source, to, dirs_exist_ok=True) 200 except shutil.Error as e: 201 for arg in e.args[0]: 202 msg = f"{arg[0]}: {arg[2]}\n" 203 return CommandOutput(stderr = msg, errcode = 1) 204 return None 205 206 shutil.copy2(source, to) 207 return None 208 209 for source_dir in source_dirs: 210 res = copy(source_dir, to_dir) 211 if res: 212 out += res 213 return out 214 215 216 def history(self): 217 line = self.name 218 no_r = utils.remove_arg("-r", self.args) 219 line+=get_resolved_line(no_r) 220 221 utils.write_history(line) 222 223 def undo(self): 224 source = Path(self.args.pop()) 225 for arg in self.args: 226 move_from = source / Path(arg).name 227 RemoveCommand([str(move_from)]).execute()
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
181 def execute(self): 182 if len(self.args) < 2: 183 return CommandOutput(stderr = "too few arguments\n", errcode = 4) 184 source_dirs, to_dir, flags = self._parse_args() 185 out = CommandOutput() 186 r = flags["-r"] 187 188 @handlers.handle_all_default 189 def copy(source: Path, to: Path): 190 if source.is_dir(): 191 if not r: 192 msg = f"-r option was not specified: '{source}' is ignored\n" 193 return CommandOutput(stderr = msg, errcode = 1) 194 195 if to_dir.resolve().is_relative_to(source_dir.resolve()): 196 msg = f"Unable to copy '{to}' to itself\n" 197 return CommandOutput(stderr = msg, errcode = 1) 198 try: 199 shutil.copytree(source, to, dirs_exist_ok=True) 200 except shutil.Error as e: 201 for arg in e.args[0]: 202 msg = f"{arg[0]}: {arg[2]}\n" 203 return CommandOutput(stderr = msg, errcode = 1) 204 return None 205 206 shutil.copy2(source, to) 207 return None 208 209 for source_dir in source_dirs: 210 res = copy(source_dir, to_dir) 211 if res: 212 out += res 213 return out
Executes command
216 def history(self): 217 line = self.name 218 no_r = utils.remove_arg("-r", self.args) 219 line+=get_resolved_line(no_r) 220 221 utils.write_history(line)
Writes command to history file
Inherited Members
229@cmd_register.command("mv") 230class MoveCommand(cmds.ExecutableCommand): 231 def _parse_args(self) -> tuple[list[Path], Path]: 232 args = self.args 233 234 source_dirs = [utils.create_path_obj(o) for o in args[:-1]] 235 to_dir = utils.create_path_obj(args[-1], must_exist=False) 236 237 return source_dirs, to_dir 238 239 240 def execute(self): 241 source_dirs, to_dir = self._parse_args() 242 out = CommandOutput() 243 for source_dir in source_dirs: 244 if to_dir.resolve().is_relative_to(source_dir.resolve()): 245 out.stderr += f"unable to move '{source_dir}' to itself\n" 246 out.errcode = 2 247 continue 248 shutil.move(source_dir, to_dir) 249 return out 250 251 def history(self): 252 line = self.name 253 line+=get_resolved_line(self.args) 254 255 utils.write_history(line) 256 257 def undo(self): 258 source = Path(self.args.pop()) 259 260 @handlers.handle_all_default 261 def undo_(arg: str): 262 move_from = source / Path(arg).name 263 shutil.move(move_from, arg) 264 265 for p in self.args: 266 undo_(p)
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
240 def execute(self): 241 source_dirs, to_dir = self._parse_args() 242 out = CommandOutput() 243 for source_dir in source_dirs: 244 if to_dir.resolve().is_relative_to(source_dir.resolve()): 245 out.stderr += f"unable to move '{source_dir}' to itself\n" 246 out.errcode = 2 247 continue 248 shutil.move(source_dir, to_dir) 249 return out
Executes command
251 def history(self): 252 line = self.name 253 line+=get_resolved_line(self.args) 254 255 utils.write_history(line)
Writes command to history file
Inherited Members
269@cmd_register.command("rm", flags = ["-r"]) 270class RemoveCommand(cmds.UndoableCommand): 271 def _parse_args(self) -> tuple[list[Path], dict[str, bool]]: 272 ret = [] 273 flags = self.parse_flags() 274 for arg in self.args: 275 try: 276 ret.append(create_path_obj(arg, must_exist=False).resolve()) 277 except FileNotFoundError: 278 self._log_error(f"Cannot remove '{arg}': no such file or directory") 279 280 return ret, flags 281 282 def history(self): 283 line = self.name 284 no_r = utils.remove_arg("-r", self.args) 285 line+=get_resolved_line(no_r) 286 287 utils.write_history(line) 288 289 def execute(self): 290 args, flags = self._parse_args() 291 r_flag = flags["-r"] 292 out = CommandOutput() 293 home = os.getenv("HOME")+"/" 294 @handlers.handle_all_default 295 def remove(path: Path): 296 297 if path == cst.TRASH_PATH: 298 msg = f"unable to remove '{path}' as it is a TRASH\n" 299 return CommandOutput(stderr = msg, errcode = 2) 300 301 if Path.cwd().is_relative_to(path): 302 msg = f"unable to remove '{path}': it is a parent directory\n" 303 return CommandOutput(stderr = msg, errcode = 2) 304 no_home = str(path).replace(home, "") 305 parent = Path(no_home).parent 306 if str(parent)[0]=="/": 307 parent = Path(str(parent)[1:]) 308 to_move = Path(cst.TRASH_PATH) / parent 309 if not to_move.exists(): 310 to_move.mkdir(parents=True) 311 312 if path.is_dir(): 313 if not r_flag and any(path.iterdir()): 314 msg = f"-r option was not specified: '{path}' is ignored\n" 315 return CommandOutput(stderr = msg, errcode = 2) 316 317 perm = input(f"Do you want to remove '{path}'? [y/n] ") 318 if perm.lower() == "y": 319 if not (to_move / path.name).exists(): 320 shutil.move(path, to_move) 321 else: 322 shutil.rmtree(to_move) 323 shutil.move(path, to_move) 324 else: 325 self.logger.warning(f"'{path}' was not removed: user declined operation") 326 return CommandOutput() 327 328 else: 329 if not (to_move / path.name).exists(): 330 shutil.move(path, to_move / path.name) 331 else: 332 (to_move/path.name).unlink() 333 shutil.move(path, to_move / path.name) 334 335 for arg in args: 336 res = remove(arg) 337 if res: 338 out += res 339 return out 340 341 342 def undo(self): 343 home = os.environ.get("HOME")+"/" 344 345 @handlers.handle_all_default 346 def undo_(arg: str): 347 no_home = str(arg).replace(home, "") 348 source = Path(cst.TRASH_PATH) / no_home 349 self.logger.info(f"undoing 'rm {arg}") 350 shutil.move(source, arg) 351 352 for p in self.args: 353 undo_(p)
Abstract class for undoable commands(fields of ExecutableCommand are included)
282 def history(self): 283 line = self.name 284 no_r = utils.remove_arg("-r", self.args) 285 line+=get_resolved_line(no_r) 286 287 utils.write_history(line)
Writes command to history file
289 def execute(self): 290 args, flags = self._parse_args() 291 r_flag = flags["-r"] 292 out = CommandOutput() 293 home = os.getenv("HOME")+"/" 294 @handlers.handle_all_default 295 def remove(path: Path): 296 297 if path == cst.TRASH_PATH: 298 msg = f"unable to remove '{path}' as it is a TRASH\n" 299 return CommandOutput(stderr = msg, errcode = 2) 300 301 if Path.cwd().is_relative_to(path): 302 msg = f"unable to remove '{path}': it is a parent directory\n" 303 return CommandOutput(stderr = msg, errcode = 2) 304 no_home = str(path).replace(home, "") 305 parent = Path(no_home).parent 306 if str(parent)[0]=="/": 307 parent = Path(str(parent)[1:]) 308 to_move = Path(cst.TRASH_PATH) / parent 309 if not to_move.exists(): 310 to_move.mkdir(parents=True) 311 312 if path.is_dir(): 313 if not r_flag and any(path.iterdir()): 314 msg = f"-r option was not specified: '{path}' is ignored\n" 315 return CommandOutput(stderr = msg, errcode = 2) 316 317 perm = input(f"Do you want to remove '{path}'? [y/n] ") 318 if perm.lower() == "y": 319 if not (to_move / path.name).exists(): 320 shutil.move(path, to_move) 321 else: 322 shutil.rmtree(to_move) 323 shutil.move(path, to_move) 324 else: 325 self.logger.warning(f"'{path}' was not removed: user declined operation") 326 return CommandOutput() 327 328 else: 329 if not (to_move / path.name).exists(): 330 shutil.move(path, to_move / path.name) 331 else: 332 (to_move/path.name).unlink() 333 shutil.move(path, to_move / path.name) 334 335 for arg in args: 336 res = remove(arg) 337 if res: 338 out += res 339 return out
Executes command
342 def undo(self): 343 home = os.environ.get("HOME")+"/" 344 345 @handlers.handle_all_default 346 def undo_(arg: str): 347 no_home = str(arg).replace(home, "") 348 source = Path(cst.TRASH_PATH) / no_home 349 self.logger.info(f"undoing 'rm {arg}") 350 shutil.move(source, arg) 351 352 for p in self.args: 353 undo_(p)
Undoes command
Inherited Members
356@cmd_register.command("grep", flags = ["-i", "-r", "-ir"]) 357class GrepCommand(cmds.ExecutableCommand): 358 def _parse_args(self): 359 flags = self.parse_flags() 360 f = create_path_obj(self.args[-1], must_exist=False) 361 if flags["-ir"]: 362 flags["-i"] = flags["-r"] = True 363 return f, self.args[0], flags 364 365 def execute(self): 366 if len(self.args) < 2: 367 msg = "too few arguments" 368 return CommandOutput(stderr = msg, errcode = 4) 369 path_arg, regexp, flags = self._parse_args() 370 flags_re = 0 371 if flags["-i"]: 372 flags_re |= re.IGNORECASE 373 try: 374 compiled = re.compile(regexp, flags_re) 375 except re.PatternError: 376 msg = f"invalid regular expression '{regexp}'" 377 return CommandOutput(stderr = msg, errcode = 5) 378 @handlers.handle_all_default 379 def grep(path: Path) -> CommandOutput: 380 out = CommandOutput() 381 if path.is_dir(): 382 if not flags["-r"]: 383 msg = f"'-r' flag was not specified: '{path}' is ignored" 384 return CommandOutput(stderr = msg, errcode = 2) 385 for p in path.iterdir(): 386 out+=grep(p) 387 elif path.is_file(): 388 with open(path, "r", encoding='utf-8') as f: 389 for n, line in enumerate(f.readlines(), start = 1): 390 match_found = bool(compiled.search(line)) 391 if match_found: 392 line = line.rstrip('\n\r') 393 out.stdout+=f"{str(path)}\t {n} {line}\n" 394 return out 395 output = grep(path_arg) 396 return output
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
365 def execute(self): 366 if len(self.args) < 2: 367 msg = "too few arguments" 368 return CommandOutput(stderr = msg, errcode = 4) 369 path_arg, regexp, flags = self._parse_args() 370 flags_re = 0 371 if flags["-i"]: 372 flags_re |= re.IGNORECASE 373 try: 374 compiled = re.compile(regexp, flags_re) 375 except re.PatternError: 376 msg = f"invalid regular expression '{regexp}'" 377 return CommandOutput(stderr = msg, errcode = 5) 378 @handlers.handle_all_default 379 def grep(path: Path) -> CommandOutput: 380 out = CommandOutput() 381 if path.is_dir(): 382 if not flags["-r"]: 383 msg = f"'-r' flag was not specified: '{path}' is ignored" 384 return CommandOutput(stderr = msg, errcode = 2) 385 for p in path.iterdir(): 386 out+=grep(p) 387 elif path.is_file(): 388 with open(path, "r", encoding='utf-8') as f: 389 for n, line in enumerate(f.readlines(), start = 1): 390 match_found = bool(compiled.search(line)) 391 if match_found: 392 line = line.rstrip('\n\r') 393 out.stdout+=f"{str(path)}\t {n} {line}\n" 394 return out 395 output = grep(path_arg) 396 return output
Executes command
Inherited Members
398@cmd_register.command("history") 399class HistoryCommand(cmds.ExecutableCommand): 400 def _parse_args(self) -> int | None: 401 if self.args: 402 try: 403 return int(self.args[0]) 404 except ValueError: 405 self._log_error(f"{self.args[0]} is not an integer") 406 return None 407 408 def execute(self): 409 n = self._parse_args() 410 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 411 out = "" 412 rev = file.readlines() 413 if not n: 414 n = len(rev) 415 for num, line in enumerate(rev[-n:]): 416 out += f"{num+1} {line}" 417 return CommandOutput(stdout = out)
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
408 def execute(self): 409 n = self._parse_args() 410 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 411 out = "" 412 rev = file.readlines() 413 if not n: 414 n = len(rev) 415 for num, line in enumerate(rev[-n:]): 416 out += f"{num+1} {line}" 417 return CommandOutput(stdout = out)
Executes command
Inherited Members
419@cmd_register.command("undo") 420class UndoCommand(cmds.ExecutableCommand): 421 def _parse_args(self) -> None: 422 return None 423 424 def execute(self): 425 cur_frame = inspect.currentframe() 426 f_back = cur_frame.f_back 427 while not getattr(f_back.f_locals["self"], 'cmd_map', None): 428 f_back = f_back.f_back 429 out = CommandOutput() 430 caller = f_back.f_back.f_locals["self"] 431 432 undoable = list( 433 filter( 434 lambda x: bool( 435 getattr(caller.cmd_map[x].cmd, "undo", None) 436 ), caller.cmd_map.keys() 437 ) 438 ) 439 440 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 441 history = file.readlines() 442 history_rev = history[::-1] 443 num_to_delete = None 444 for num, hist in enumerate(history_rev): 445 hist = hist[:-1] 446 name = hist.split(" ")[0] 447 if name in undoable: 448 try: 449 args = caller.shlex_split(hist)[1:] 450 if "--help" in args: 451 continue 452 cmd = caller.cmd_map[name].cmd(args) 453 cmd.undo() 454 num_to_delete = num 455 break 456 except Exception as e: 457 out.stderr +=f"{e}: {hist}\n" 458 continue 459 if num_to_delete: 460 with open(cst.HISTORY_PATH, "w", encoding='utf-8') as file: 461 for line in history_rev[:num_to_delete]+history_rev[num_to_delete+1:]: 462 file.write(line) 463 return out
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
424 def execute(self): 425 cur_frame = inspect.currentframe() 426 f_back = cur_frame.f_back 427 while not getattr(f_back.f_locals["self"], 'cmd_map', None): 428 f_back = f_back.f_back 429 out = CommandOutput() 430 caller = f_back.f_back.f_locals["self"] 431 432 undoable = list( 433 filter( 434 lambda x: bool( 435 getattr(caller.cmd_map[x].cmd, "undo", None) 436 ), caller.cmd_map.keys() 437 ) 438 ) 439 440 with open(cst.HISTORY_PATH, "r", encoding='utf-8') as file: 441 history = file.readlines() 442 history_rev = history[::-1] 443 num_to_delete = None 444 for num, hist in enumerate(history_rev): 445 hist = hist[:-1] 446 name = hist.split(" ")[0] 447 if name in undoable: 448 try: 449 args = caller.shlex_split(hist)[1:] 450 if "--help" in args: 451 continue 452 cmd = caller.cmd_map[name].cmd(args) 453 cmd.undo() 454 num_to_delete = num 455 break 456 except Exception as e: 457 out.stderr +=f"{e}: {hist}\n" 458 continue 459 if num_to_delete: 460 with open(cst.HISTORY_PATH, "w", encoding='utf-8') as file: 461 for line in history_rev[:num_to_delete]+history_rev[num_to_delete+1:]: 462 file.write(line) 463 return out
Executes command
Inherited Members
465@cmd_register.command("exit") 466class ExitCommand(cmds.ExecutableCommand): 467 def _parse_args(self) -> int | None: 468 if not self.args: 469 return 0 470 try: 471 return int(self.args[0]) 472 except ValueError: 473 self._log_error(f"{self.args[0]} is not an integer") 474 return None 475 476 def execute(self): 477 code = self._parse_args() 478 if code is not None: 479 shutil.rmtree(cst.TRASH_PATH) 480 cst.TRASH_PATH.mkdir(parents=True, exist_ok=True) 481 return exit(code)
Abstract class for executable commands(has no undo method)
Parameters
- args: list of arguments passed to the command
476 def execute(self): 477 code = self._parse_args() 478 if code is not None: 479 shutil.rmtree(cst.TRASH_PATH) 480 cst.TRASH_PATH.mkdir(parents=True, exist_ok=True) 481 return exit(code)
Executes command