Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

type hint: add type hint for checksum and xpak #1318

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/portage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,15 @@ def _decode_argv(argv):
return [_unicode_decode(x.encode(fs_encoding, "surrogateescape")) for x in argv]


def _unicode_encode(s, encoding=_encodings["content"], errors="backslashreplace"):
def _unicode_encode(
s, encoding=_encodings["content"], errors="backslashreplace"
) -> bytes:
if isinstance(s, str):
s = s.encode(encoding, errors)
return s


def _unicode_decode(s, encoding=_encodings["content"], errors="replace"):
def _unicode_decode(s, encoding=_encodings["content"], errors="replace") -> str:
if isinstance(s, bytes):
s = str(s, encoding=encoding, errors=errors)
return s
Expand Down
53 changes: 28 additions & 25 deletions lib/portage/xpak.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import array
import errno
from typing import Optional, Any

import portage
from portage import os
Expand All @@ -48,7 +49,7 @@
from portage.util.file_copy import copyfile


def addtolist(mylist, curdir):
def addtolist(mylist: list[str], curdir: str | bytes) -> None:
"""(list, dir) --- Takes an array(list) and appends all files from dir down
the directory tree. Returns nothing. list is modified."""
curdir = normalize_path(
Expand All @@ -73,7 +74,7 @@ def addtolist(mylist, curdir):
mylist.append(os.path.join(parent, x)[len(curdir) + 1 :])


def encodeint(myint):
def encodeint(myint: int) -> bytes:
"""Takes a 4 byte integer and converts it into a string of 4 characters.
Returns the characters in a string."""
a = array.array("B")
Expand All @@ -88,7 +89,7 @@ def encodeint(myint):
return a.tostring()


def decodeint(mystring):
def decodeint(mystring: str | bytes) -> int:
"""Takes a 4 byte string and converts it into a 4 byte integer.
Returns an integer."""
myint = 0
Expand All @@ -99,7 +100,7 @@ def decodeint(mystring):
return myint


def xpak(rootdir, outfile=None):
def xpak(rootdir, outfile=None) -> bytes:
"""(rootdir, outfile) -- creates an xpak segment of the directory 'rootdir'
and under the name 'outfile' if it is specified. Otherwise it returns the
xpak segment."""
Expand Down Expand Up @@ -133,7 +134,7 @@ def xpak(rootdir, outfile=None):
return xpak_segment


def xpak_mem(mydata):
def xpak_mem(mydata: dict) -> bytes:
"""Create an xpack segment from a map object."""

mydata_encoded = {}
Expand Down Expand Up @@ -174,7 +175,7 @@ def xpak_mem(mydata):
)


def xsplit(infile):
def xsplit(infile: str | bytes) -> bool:
"""(infile) -- Splits the infile into two files.
'infile.index' contains the index segment.
'infile.dat' contains the data segment."""
Expand Down Expand Up @@ -204,7 +205,7 @@ def xsplit(infile):
return True


def xsplit_mem(mydat):
def xsplit_mem(mydat: bytes) -> Optional[tuple[bytes, bytes]]:
if mydat[0:8] != b"XPAKPACK":
return None
if mydat[-8:] != b"XPAKSTOP":
Expand All @@ -213,7 +214,7 @@ def xsplit_mem(mydat):
return (mydat[16 : indexsize + 16], mydat[indexsize + 16 : -8])


def getindex(infile):
def getindex(infile: str | bytes) -> None | bytes:
"""(infile) -- grabs the index segment from the infile and returns it."""
myfile = open(
_unicode_encode(infile, encoding=_encodings["fs"], errors="strict"), "rb"
Expand All @@ -228,7 +229,7 @@ def getindex(infile):
return myindex


def getboth(infile):
def getboth(infile: str | bytes):
"""(infile) -- grabs the index and data segments from the infile.
Returns an array [indexSegment, dataSegment]"""
myfile = open(
Expand All @@ -246,13 +247,13 @@ def getboth(infile):
return myindex, mydata


def listindex(myindex):
def listindex(myindex) -> None:
"""Print to the terminal the filenames listed in the indexglob passed in."""
for x in getindex_mem(myindex):
print(x)


def getindex_mem(myindex):
def getindex_mem(myindex) -> list[Any]:
"""Returns the filenames listed in the indexglob passed in."""
myindexlen = len(myindex)
startpos = 0
Expand All @@ -264,7 +265,7 @@ def getindex_mem(myindex):
return myret


def searchindex(myindex, myitem):
def searchindex(myindex, myitem) -> tuple[int, int]:
"""(index, item) -- Finds the offset and length of the file 'item' in the
datasegment via the index 'index' provided."""
myitem = _unicode_encode(
Expand All @@ -288,7 +289,7 @@ def searchindex(myindex, myitem):
startpos = startpos + mytestlen + 12


def getitem(myid, myitem):
def getitem(myid, myitem) -> list[Any]:
myindex = myid[0]
mydata = myid[1]
myloc = searchindex(myindex, myitem)
Expand All @@ -297,7 +298,7 @@ def getitem(myid, myitem):
return mydata[myloc[0] : myloc[0] + myloc[1]]


def xpand(myid, mydest):
def xpand(myid, mydest) -> None:
mydest = normalize_path(mydest) + os.sep
myindex = myid[0]
mydata = myid[1]
Expand Down Expand Up @@ -340,7 +341,7 @@ def __init__(self, myfile):
self.indexpos = None
self.datapos = None

def decompose(self, datadir, cleanup=1):
def decompose(self, datadir, cleanup: int = 1) -> int:
"""Alias for unpackinfo() --- Complement to recompose() but optionally
deletes the destination directory. Extracts the xpak from the tbz2 into
the directory provided. Raises IOError if scan() fails.
Expand All @@ -353,11 +354,13 @@ def decompose(self, datadir, cleanup=1):
os.makedirs(datadir)
return self.unpackinfo(datadir)

def compose(self, datadir, cleanup=0):
def compose(self, datadir, cleanup: int = 0) -> None:
"""Alias for recompose()."""
return self.recompose(datadir, cleanup)

def recompose(self, datadir, cleanup=0, break_hardlinks=True):
def recompose(
self, datadir, cleanup: int = 0, break_hardlinks: bool = True
) -> None:
"""Creates an xpak segment from the datadir provided, truncates the tbz2
to the end of regular data if an xpak segment already exists, and adds
the new segment to the file with terminating info."""
Expand All @@ -366,7 +369,7 @@ def recompose(self, datadir, cleanup=0, break_hardlinks=True):
if cleanup:
self.cleanup(datadir)

def recompose_mem(self, xpdata, break_hardlinks=True):
def recompose_mem(self, xpdata, break_hardlinks: bool = True) -> int:
"""
Update the xpak segment.
@param xpdata: A new xpak segment to be written, like that returned
Expand Down Expand Up @@ -402,7 +405,7 @@ def recompose_mem(self, xpdata, break_hardlinks=True):
myfile.close()
return 1

def cleanup(self, datadir):
def cleanup(self, datadir) -> None:
datadir_split = os.path.split(datadir)
if len(datadir_split) >= 2 and len(datadir_split[1]) > 0:
# This is potentially dangerous,
Expand All @@ -415,7 +418,7 @@ def cleanup(self, datadir):
else:
raise oe

def scan(self):
def scan(self) -> int:
"""Scans the tbz2 to locate the xpak segment and setup internal values.
This function is called by relevant functions already."""
a = None
Expand Down Expand Up @@ -480,7 +483,7 @@ def scan(self):
if a is not None:
a.close()

def filelist(self):
def filelist(self) -> Optional[list[Any]]:
"""Return an array of each file listed in the index."""
if not self.scan():
return None
Expand All @@ -501,14 +504,14 @@ def getfile(self, myfile, mydefault=None):
a.close()
return myreturn

def getelements(self, myfile):
def getelements(self, myfile) -> list:
"""A split/array representation of tbz2.getfile()"""
mydat = self.getfile(myfile)
if not mydat:
return []
return mydat.split()

def unpackinfo(self, mydest):
def unpackinfo(self, mydest) -> int:
"""Unpacks all the files from the dataSegment into 'mydest'."""
if not self.scan():
return 0
Expand Down Expand Up @@ -551,7 +554,7 @@ def unpackinfo(self, mydest):
a.close()
return 1

def get_data(self):
def get_data(self) -> dict[bytes, bytes]:
"""Returns all the files from the dataSegment as a map object."""
if not self.scan():
return {}
Expand All @@ -575,7 +578,7 @@ def get_data(self):
a.close()
return mydata

def getboth(self):
def getboth(self) -> tuple[bytes, bytes]:
"""Returns an array [indexSegment, dataSegment]"""
if not self.scan():
return None
Expand Down