# Path: /opt/imunify360/venv/lib64/python3.11/site-packages/imav/contracts/
# (file-manager page residue: "Upload File", "HOME")
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import logging import pwd from functools import lru_cache import uuid from pathlib import Path from defence360agent.utils import log_error_and_ignore, safe_fileops IMUNIFY_PATCH_ID_FILE = ".imunify_patch_id" logger = logging.getLogger(__name__) class ImunifyPatchIdError(Exception): pass ImunifyPatchUserId = str async def ensure_id_file(username: str) -> ImunifyPatchUserId: """Ensure the Imunify.Patch ID file exists for the given user. This function checks if the Imunify.Patch ID file exists in the user's home directory. If it does not exist, it generates a new ID, writes it to the file, and returns the ID. If the file already exists, it reads and returns the existing ID. Args: username (str): The username for which to ensure the ID file. Returns: ImunifyPatchUserId: The Imunify.Patch user ID. 
""" id_file = await _get_id_file(username) if _id := _read_id_file(id_file): return _id _id = _generate_id() await _write_id_file(id_file, _id) return _id @log_error_and_ignore() async def get_imunify_patch_id(username: str) -> ImunifyPatchUserId: async with get_lock(username): return await ensure_id_file(username) @lru_cache(maxsize=None) def get_lock(username: str): return asyncio.Lock() async def _get_id_file(username: str) -> Path: """Get a file with Imunify.Patch user id and create it if does not exist""" try: user_pwd = pwd.getpwnam(username) except KeyError as e: logger.error(f"No such user: {username}") raise ImunifyPatchIdError(f"No such user {username}") from e else: id_file = Path(user_pwd.pw_dir) / IMUNIFY_PATCH_ID_FILE if not id_file.exists(): if not id_file.parent.exists(): logger.error(f"No such user homedir: {id_file.parent}") raise ImunifyPatchIdError( f"No such user homedir: {id_file.parent}" ) try: await safe_fileops.touch(str(id_file)) except (PermissionError, OSError) as e: logger.error( "Unable to put %s in user home dir %s", IMUNIFY_PATCH_ID_FILE, e, ) raise ImunifyPatchIdError from e return id_file def _generate_id() -> ImunifyPatchUserId: """Generate Imunify.Patch id""" return uuid.uuid4().hex def _read_id_file(id_file: Path) -> ImunifyPatchUserId | None: """Read Imunify.Patch id from `id_file`. If id is not found, return `None`. 
""" with id_file.open("r") as f: for line in reversed(f.readlines()): if line and not line.startswith("#"): if imunify_patch_id := line.strip(): return imunify_patch_id logger.warning(f"Cannot parse {id_file}, file is corrupted or empty") return None async def _write_id_file(id_file: Path, _id: ImunifyPatchUserId) -> None: """Write Imunify.Patch id to `id_file`.""" text = ( "# DO NOT EDIT\n" "# This file contains Imunify.Patch id unique to this user\n" "\n" f"{_id}\n" ) try: await safe_fileops.write_text(str(id_file), text) except (OSError, PermissionError) as e: logger.error( "Unable to write %s in user home dir: %s", IMUNIFY_PATCH_ID_FILE, e ) raise ImunifyPatchIdError from e