From a37a9ec7431d35c5f0fa9729f7bf8cf8f985c9f3 Mon Sep 17 00:00:00 2001 From: Huevos Date: Sun, 17 Nov 2024 20:21:54 +0100 Subject: [PATCH] PEP --- TerrestrialScan/src/MakeBouquet.py | 42 +++++++-------- TerrestrialScan/src/TerrestrialScan.py | 71 ++++++++++++-------------- TerrestrialScan/src/__init__.py | 3 +- TerrestrialScan/src/plugin.py | 14 +++-- 4 files changed, 60 insertions(+), 70 deletions(-) diff --git a/TerrestrialScan/src/MakeBouquet.py b/TerrestrialScan/src/MakeBouquet.py index e92471f..07984e3 100644 --- a/TerrestrialScan/src/MakeBouquet.py +++ b/TerrestrialScan/src/MakeBouquet.py @@ -14,9 +14,6 @@ from enigma import eDVBResourceManager, eTimer, eDVBDB, eDVBFrontendParametersTerrestrial -import os -import sys - import datetime import time @@ -40,16 +37,16 @@ def __init__(self, session, args=0): self.path = "/etc/enigma2" self.services_dict = {} self.tmp_services_dict = {} - self.namespace_dict = {} # to store namespace when sub network is enabled + self.namespace_dict = {} # to store namespace when sub network is enabled self.logical_channel_number_dict = {} - self.ignore_visible_service_flag = False # make this a user override later if found necessary + self.ignore_visible_service_flag = False # make this a user override later if found necessary self.VIDEO_ALLOWED_TYPES = [1, 4, 5, 17, 22, 24, 25, 27, 135] self.AUDIO_ALLOWED_TYPES = [2, 10] self.BOUQUET_PREFIX = "userbouquet.TerrestrialScan." self.bouquetsIndexFilename = "bouquets.tv" self.bouquetFilename = self.BOUQUET_PREFIX + "tv" self.bouquetName = _('Terrestrial') - self.namespace_complete_terrestrial = not (config.usage.subnetwork_terrestrial.value if hasattr(config.usage, "subnetwork_terrestrial") else True) # config.usage.subnetwork not available in all images + self.namespace_complete_terrestrial = not (config.usage.subnetwork_terrestrial.value if hasattr(config.usage, "subnetwork_terrestrial") else True) # config.usage.subnetwork not available in all images self.terrestrialXmlFilename = "terrestrial.xml" @@ -149,7 +146,7 @@ def getFrontend(self): return if self.rawchannel: - del(self.rawchannel) + del self.rawchannel self.frontend = None self.rawchannel = None @@ -187,12 +184,12 @@ def checkTunerLock(self): self.dict = {} self.frontend.getFrontendStatus(self.dict) if self.dict["tuner_state"] == "TUNING": - if self.lockcounter < 1: # only show this once in the log per retune event + if self.lockcounter < 1: # only show this once in the log per retune event print("[MakeBouquet][checkTunerLock] TUNING") elif self.dict["tuner_state"] == "LOCKED": print("[MakeBouquet][checkTunerLock] TUNER LOCKED") self["action"].setText(_("Reading SI tables on %s MHz") % str(self.transponder["frequency"] // 1000000)) - #self["status"].setText(_("???")) + # self["status"].setText(_("???")) self.readTransponderCounter = 0 self.readTranspondertimer = eTimer() @@ -225,7 +222,7 @@ def readSDT(self): sdt_pid = 0x11 sdt_current_table_id = 0x42 mask = 0xff - sdtTimeout = 5 # maximum time allowed to read the service descriptor table (seconds) + sdtTimeout = 5 # maximum time allowed to read the service descriptor table (seconds) sdt_current_version_number = -1 sdt_current_sections_read = [] @@ -248,7 +245,7 @@ def readSDT(self): section = dvbreader.read_sdt(fd, sdt_current_table_id, 0x00) if section is None: - time.sleep(0.1) # no data.. so we wait a bit + time.sleep(0.1) # no data.. so we wait a bit continue if section["header"]["table_id"] == sdt_current_table_id and not sdt_current_completed: @@ -261,7 +258,7 @@ def readSDT(self): if section["header"]["section_number"] not in sdt_current_sections_read: sdt_current_sections_read.append(section["header"]["section_number"]) sdt_current_content += section["content"] - if self.tsid is None or self.onid is None: # save first read of tsid and onid, although data in self.transponder should already be correct. + if self.tsid is None or self.onid is None: # save first read of tsid and onid, although data in self.transponder should already be correct. self.tsid = self.transponder["tsid"] = section["header"]["transport_stream_id"] self.onid = self.transponder["onid"] = section["header"]["original_network_id"] @@ -287,7 +284,7 @@ def readSDT(self): continue servicekey = "%x:%x:%x" % (service["transport_stream_id"], service["original_network_id"], service["service_id"]) - service["signalQuality"] = self.transponder["signalQuality"] # Used for sorting of duplicate LCNs + service["signalQuality"] = self.transponder["signalQuality"] # Used for sorting of duplicate LCNs self.tmp_services_dict[servicekey] = service def readNIT(self): @@ -296,12 +293,12 @@ def readNIT(self): nit_current_pid = 0x10 nit_current_table_id = 0x40 - nit_other_table_id = 0x00 # don't read other table + nit_other_table_id = 0x00 # don't read other table if nit_other_table_id == 0x00: mask = 0xff else: mask = nit_current_table_id ^ nit_other_table_id ^ 0xff - nit_current_timeout = 20 # maximum time allowed to read the network information table (seconds) + nit_current_timeout = 20 # maximum time allowed to read the network information table (seconds) nit_current_version_number = -1 nit_current_sections_read = [] @@ -324,7 +321,7 @@ def readNIT(self): section = dvbreader.read_nit(fd, nit_current_table_id, nit_other_table_id) if section is None: - time.sleep(0.1) # no data.. so we wait a bit + time.sleep(0.1) # no data.. so we wait a bit continue if section["header"]["table_id"] == nit_current_table_id and not nit_current_completed: @@ -351,11 +348,10 @@ def readNIT(self): return # descriptor_tag 0x5A is DVB-T, descriptor_tag 0x7f is DVB-T - transponders = [t for t in nit_current_content if "descriptor_tag" in t and t["descriptor_tag"] in (0x5A, 0x7f) and t["original_network_id"] == self.transponder["onid"] and t["transport_stream_id"] == self.transponder["tsid"]] # this should only ever have a length of one transponder + transponders = [t for t in nit_current_content if "descriptor_tag" in t and t["descriptor_tag"] in (0x5A, 0x7f) and t["original_network_id"] == self.transponder["onid"] and t["transport_stream_id"] == self.transponder["tsid"]] # this should only ever have a length of one transponder print("[MakeBouquet][readNIT] transponders", transponders) if transponders: - - if transponders[0]["descriptor_tag"] == 0x5A: # DVB-T + if transponders[0]["descriptor_tag"] == 0x5A: # DVB-T self.transponder["system"] = eDVBFrontendParametersTerrestrial.System_DVB_T else: # must be DVB-T2 self.transponder["system"] = eDVBFrontendParametersTerrestrial.System_DVB_T2 @@ -392,7 +388,7 @@ def createBouquet(self): break self.tv_radio = tv_radio bouquetIndexContent = self.readBouquetIndex() - if '"' + self.bouquetFilename[:-2] + tv_radio + '"' not in bouquetIndexContent: # only edit the index if bouquet file is not present + if '"' + self.bouquetFilename[:-2] + tv_radio + '"' not in bouquetIndexContent: # only edit the index if bouquet file is not present self.writeBouquetIndex(bouquetIndexContent) self.writeBouquet() @@ -413,7 +409,7 @@ def solveDuplicates(self): if config.plugins.TerrestrialScan.uhf_vhf.value == "australia": vacant = [i for i in range(350, 400) if i not in self.services_dict] for duplicate in self.duplicates: - if not vacant: # not slots available + if not vacant: # not slots available break self.services_dict[vacant.pop(0)] = duplicate @@ -425,7 +421,7 @@ def iterateServicesBySNR(self, servicesDict): def readBouquetIndex(self): try: bouquets = open(self.path + "/%s%s" % (self.bouquetsIndexFilename[:-2], self.tv_radio), "r") - except Exception as e: + except Exception: return "" content = bouquets.read() bouquets.close() @@ -512,4 +508,4 @@ def keyCancel(self): def __onClose(self): if self.frontend: self.frontend = None - del(self.rawchannel) + del self.rawchannel diff --git a/TerrestrialScan/src/TerrestrialScan.py b/TerrestrialScan/src/TerrestrialScan.py index 072318b..8dc7b3c 100644 --- a/TerrestrialScan/src/TerrestrialScan.py +++ b/TerrestrialScan/src/TerrestrialScan.py @@ -13,9 +13,6 @@ from Components.NimManager import nimmanager from enigma import eDVBFrontendParameters, eDVBFrontendParametersTerrestrial, eDVBResourceManager, eTimer, iFrontendInformation -import os -import sys - import datetime import time @@ -23,7 +20,7 @@ from .TerrestrialScanSkin import downloadBar -def setParams(frequency, system, bandwidth): # freq is nine digits (474000000), bandwidth in Hz (8000000) +def setParams(frequency, system, bandwidth): # freq is nine digits (474000000), bandwidth in Hz (8000000) params = eDVBFrontendParametersTerrestrial() params.frequency = frequency params.bandwidth = bandwidth @@ -43,30 +40,30 @@ def setParamsFe(params): params_fe.setDVBT(params) return params_fe -def channel2freq(channel, bandwidth=8): # Europe channels - if 4 < channel < 13: # Band III +def channel2freq(channel, bandwidth=8): # Europe channels + if 4 < channel < 13: # Band III return (((177 + (bandwidth * (channel - 5))) * 1000000) + 500000) - elif 20 < channel < 70: # Bands IV,V + elif 20 < channel < 70: # Bands IV,V return ((474 + (bandwidth * (channel - 21))) * 1000000) # returns nine digits def getChannelNumber(frequency, descr): f = (frequency + 50000) / 100000 / 10. if descr in ("uhf", "uhf_vhf"): - if 174 < f < 230: # III + if 174 < f < 230: # III d = (f + 1) % 7 return str(int(f - 174) // 7 + 5) + (d < 3 and "-" or d > 4 and "+" or "") - elif 470 <= f < 863: # IV,V + elif 470 <= f < 863: # IV,V d = (f + 2) % 8 return str(int(f - 470) // 8 + 21) + (d < 3.5 and "-" or d > 4.5 and "+" or "") elif descr == "australia": - if 174 < f < 202: # III: CH6-CH9 + if 174 < f < 202: # III: CH6-CH9 return str(int(f - 174) // 7 + 6) - elif 202 <= f < 209: # III: CH9A + elif 202 <= f < 209: # III: CH9A return "9A" - elif 209 <= f < 230: # III: CH10-CH12 + elif 209 <= f < 230: # III: CH10-CH12 return str(int(f - 209) // 7 + 10) - elif 526 < f < 820: # IV, V: CH28-CH69 + elif 526 < f < 820: # IV, V: CH28-CH69 return str(int(f - 526) // 7 + 28) return "" @@ -97,7 +94,7 @@ def __init__(self, session, args=0): self.uhf_vhf = "uhf" self.networkid = 0 self.restrict_to_networkid = False - self.stabliseTime = 2 # time in seconds for tuner to stablise on tune before taking a signal quality reading + self.stabliseTime = 2 # time in seconds for tuner to stablise on tune before taking a signal quality reading self.region = None self.country = None self.skipT2 = False @@ -126,21 +123,21 @@ def __init__(self, session, args=0): self.index = 0 self.frequency = 0 self.system = eDVBFrontendParametersTerrestrial.System_DVB_T - self.lockTimeout = 50 # 100ms for tick - 5 sec - self.tsidOnidTimeout = 100 # 100ms for tick - 10 sec - self.snrTimeout = 100 # 100ms for tick - 10 sec - self.bandwidth = 8 # MHz + self.lockTimeout = 50 # 100ms for tick - 5 sec + self.tsidOnidTimeout = 100 # 100ms for tick - 10 sec + self.snrTimeout = 100 # 100ms for tick - 10 sec + self.bandwidth = 8 # MHz self.scanTransponders = [] systems = (eDVBFrontendParametersTerrestrial.System_DVB_T,) if self.skipT2 else (eDVBFrontendParametersTerrestrial.System_DVB_T, eDVBFrontendParametersTerrestrial.System_DVB_T2) if self.uhf_vhf == "uhf_vhf": bandwidth = 7 for a in range(5, 13): - for b in systems: # system + for b in systems: # system self.scanTransponders.append({"frequency": channel2freq(a, bandwidth), "system": b, "bandwidth": bandwidth * 1000000}) if self.uhf_vhf in ("uhf", "uhf_short", "uhf_vhf"): bandwidth = 8 for a in range(21, 50 if self.uhf_vhf == "uhf_short" else 70): - for b in systems: # system + for b in systems: # system self.scanTransponders.append({"frequency": channel2freq(a, bandwidth), "system": b, "bandwidth": bandwidth * 1000000}) if self.uhf_vhf == "australia": bandwidth = 7 @@ -152,9 +149,9 @@ def __init__(self, session, args=0): # frequency 1, inversion 9, bandwidth 2, fechigh 4, feclow 5, modulation 3, transmission 7, guard 6, hierarchy 8, system 10, plp_id 1 for tp in nimmanager.getTranspondersTerrestrial(self.region): # system contains "-1" when both DVB-T and DVB-T2 are to be scanned - if tp[10] < 1: # DVB-T + if tp[10] < 1: # DVB-T self.scanTransponders.append({"frequency": tp[1], "system": eDVBFrontendParametersTerrestrial.System_DVB_T, "bandwidth": tp[2]}) - if tp[10] != 0: # DVB-T2 + if tp[10] != 0: # DVB-T2 self.scanTransponders.append({"frequency": tp[1], "system": eDVBFrontendParametersTerrestrial.System_DVB_T2, "bandwidth": tp[2]}) self.transponders_found = [] self.transponders_unique = {} @@ -191,7 +188,7 @@ def search(self): self.system = self.scanTransponders[self.index]["system"] self.bandwidth = self.scanTransponders[self.index]["bandwidth"] self.frequency = self.scanTransponders[self.index]["frequency"] - channelNumber = getChannelNumber(self.frequency, self.uhf_vhf == "xml" and ("australia" if self.country == "AUS" else "uhf") or self.uhf_vhf) + channelNumber = getChannelNumber(self.frequency, self.uhf_vhf == "xml" and ("australia" if self.country == "AUS" else "uhf") or self.uhf_vhf) self.channelNumberText = (_("(ch %s)") % channelNumber) if channelNumber else "" print("[TerrestrialScan][Search] Scan frequency %d %s" % (self.frequency, self.channelNumberText)) print("[TerrestrialScan][Search] Scan system %d" % self.system) @@ -202,7 +199,7 @@ def search(self): self["action"].setText(_("Tuning %s MHz %s") % (str(self.frequency // 1000000), self.channelNumberText)) self["status"].setText((len(self.transponders_unique) == 1 and _("Found %d unique transponder") or _("Found %d unique transponders")) % len(self.transponders_unique)) self.index += 1 - if self.frequency in self.transponders_found or self.system == eDVBFrontendParametersTerrestrial.System_DVB_T2 and self.isT2tuner == False: + if self.frequency in self.transponders_found or self.system == eDVBFrontendParametersTerrestrial.System_DVB_T2 and self.isT2tuner is False: print("[TerrestrialScan][Search] Skipping T2 search of %s MHz %s" % (str(self.frequency // 1000000), self.channelNumberText)) self.search() return @@ -216,7 +213,7 @@ def search(self): answer = None self.close(answer) - def config_mode(self, nim): # Workaround for OpenATV > 5.3 + def config_mode(self, nim): # Workaround for OpenATV > 5.3 try: return nim.config_mode except AttributeError: @@ -225,7 +222,7 @@ def config_mode(self, nim): # Workaround for OpenATV > 5.3 def getFrontend(self): print("[TerrestrialScan][getFrontend] searching for available tuner") nimList = [] - if self.selectedNIM < 0: # automatic tuner selection + if self.selectedNIM < 0: # automatic tuner selection for nim in nimmanager.nim_slots: if self.config_mode(nim) not in ("nothing",) and (nim.isCompatible("DVB-T2") or (nim.isCompatible("DVB-S") and nim.canBeCompatible("DVB-T2"))): nimList.append(nim.slot) @@ -239,7 +236,7 @@ def getFrontend(self): print("[TerrestrialScan][getFrontend] No terrestrial tuner found") self.showError(_('No terrestrial tuner found')) return - else: # manual tuner selection, and subsequent iterations + else: # manual tuner selection, and subsequent iterations nim = nimmanager.nim_slots[self.selectedNIM] if self.config_mode(nim) not in ("nothing",) and (nim.isCompatible("DVB-T2") or (nim.isCompatible("DVB-S") and nim.canBeCompatible("DVB-T2"))): nimList.append(nim.slot) @@ -264,7 +261,7 @@ def getFrontend(self): self.showError(_('Cannot retrieve Resource Manager instance')) return - if self.selectedNIM < 0: # automatic tuner selection + if self.selectedNIM < 0: # automatic tuner selection print("[TerrestrialScan][getFrontend] Choosing NIM") # stop pip if running @@ -285,14 +282,14 @@ def getFrontend(self): current_slotid = -1 if self.rawchannel: - del(self.rawchannel) + del self.rawchannel self.frontend = None self.rawchannel = None - nimList.reverse() # start from the last + nimList.reverse() # start from the last for slotid in nimList: - if current_slotid == -1: # mark the first valid slotid in case of no other one is free + if current_slotid == -1: # mark the first valid slotid in case of no other one is free current_slotid = slotid self.rawchannel = resmanager.allocateRawChannel(slotid) if self.rawchannel: @@ -329,7 +326,7 @@ def getFrontend(self): print("[TerrestrialScan][getFrontend] Will wait up to %i seconds for tuner lock." % (self.lockTimeout // 10)) - self.selectedNIM = current_slotid # Remember for next iteration + self.selectedNIM = current_slotid # Remember for next iteration self["tuner_text"].setText(chr(ord('A') + current_slotid)) @@ -361,7 +358,7 @@ def checkTunerLock(self): self.dict = {} self.frontend.getFrontendStatus(self.dict) if self.dict["tuner_state"] == "TUNING": - if self.lockcounter < 1: # only show this once in the log per retune event + if self.lockcounter < 1: # only show this once in the log per retune event print("[TerrestrialScan][checkTunerLock] TUNING") elif self.dict["tuner_state"] == "LOCKED": print("[TerrestrialScan][checkTunerLock] LOCKED") @@ -399,12 +396,12 @@ def tsidOnidWait(self): def getCurrentTsidOnid(self, from_retune=False): adapter = 0 demuxer_device = "/dev/dvb/adapter%d/demux%d" % (adapter, self.demuxer_id) - start = time.time() # for debug info + start = time.time() # for debug info sdt_pid = 0x11 sdt_current_table_id = 0x42 mask = 0xff - tsidOnidTimeout = 5 # maximum time allowed to read the service descriptor table (seconds) + tsidOnidTimeout = 5 # maximum time allowed to read the service descriptor table (seconds) self.tsid = None self.onid = None @@ -423,7 +420,7 @@ def getCurrentTsidOnid(self, from_retune=False): section = dvbreader.read_sdt(fd, sdt_current_table_id, 0x00) if section is None: - time.sleep(0.1) # no data.. so we wait a bit + time.sleep(0.1) # no data.. so we wait a bit continue if section["header"]["table_id"] == sdt_current_table_id: @@ -459,4 +456,4 @@ def signalQualityWait(self): def __onClose(self): if self.frontend: self.frontend = None - del(self.rawchannel) + del self.rawchannel diff --git a/TerrestrialScan/src/__init__.py b/TerrestrialScan/src/__init__.py index 26a138b..9b81595 100644 --- a/TerrestrialScan/src/__init__.py +++ b/TerrestrialScan/src/__init__.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- from Components.Language import language -from Tools.Directories import resolveFilename, SCOPE_PLUGINS, SCOPE_LANGUAGE -import os +from Tools.Directories import resolveFilename, SCOPE_PLUGINS import gettext PluginLanguageDomain = "TerrestrialScan" diff --git a/TerrestrialScan/src/plugin.py b/TerrestrialScan/src/plugin.py index aeccd03..337b21a 100644 --- a/TerrestrialScan/src/plugin.py +++ b/TerrestrialScan/src/plugin.py @@ -20,8 +20,6 @@ from .TerrestrialScan import TerrestrialScan, setParams from .MakeBouquet import MakeBouquet -import os - config.plugins.TerrestrialScan = ConfigSubsection() config.plugins.TerrestrialScan.networkid_bool = ConfigYesNo(default=False) config.plugins.TerrestrialScan.networkid = ConfigInteger(default=0, limits=(0, 65535)) @@ -33,7 +31,7 @@ ('uhf_short', _("UHF Europe channels 21-49")), ('uhf_vhf', _("UHF/VHF Europe")), ('australia', _("Australia generic"))] -if nimmanager.getTerrestrialsList(): # check transponders are available from terrestrial.xml +if nimmanager.getTerrestrialsList(): # check transponders are available from terrestrial.xml uhf_vhf_choices.append(('xml', _("From XML"))) config.plugins.TerrestrialScan.uhf_vhf = ConfigSelection(default='uhf', choices=uhf_vhf_choices) config.plugins.TerrestrialScan.makebouquet = ConfigYesNo(default=True) @@ -85,7 +83,7 @@ def __init__(self, session): self.createSetup() - if not self.selectionChanged in self["config"].onSelectionChanged: + if self.selectionChanged not in self["config"].onSelectionChanged: self["config"].onSelectionChanged.append(self.selectionChanged) self.selectionChanged() @@ -124,13 +122,13 @@ def createSetup(self): self["config"].l.setList(setup_list) def setTerrestrialLocationEntries(self): - slotid = self.dvbt_capable_nims[0] if self.scan_nims.value < 0 else self.scan_nims.value # number of first enabled terrestrial tuner if automatic is selected. + slotid = self.dvbt_capable_nims[0] if self.scan_nims.value < 0 else self.scan_nims.value # number of first enabled terrestrial tuner if automatic is selected. nimConfig = nimmanager.nim_slots[slotid].config # country if not hasattr(self, "terrestrialCountriesEntry"): terrestrialcountrycodelist = nimmanager.getTerrestrialsCountrycodeList() - terrestrialcountrycode = nimmanager.getTerrestrialCountrycode(slotid) # number of first enabled terrestrial tuner if automatic is selected. + terrestrialcountrycode = nimmanager.getTerrestrialCountrycode(slotid) # number of first enabled terrestrial tuner if automatic is selected. default = terrestrialcountrycode in terrestrialcountrycodelist and terrestrialcountrycode or None choices = [("all", _("All"))] + sorted([(x, self.countrycodeToCountry(x)) for x in terrestrialcountrycodelist], key=lambda listItem: listItem[1]) self.terrestrialCountries = ConfigSelection(default=default, choices=choices) @@ -184,7 +182,7 @@ def createSummary(self): return SetupSummary def keyGo(self): -# config.plugins.TerrestrialScan.save() + # config.plugins.TerrestrialScan.save() for x in self["config"].list: x[1].save() configfile.save() @@ -270,7 +268,7 @@ def startServiceSearchCallback(self, answer=None): if answer: self.close(True) - def config_mode(self, nim): # Workaround for OpenATV > 5.3 + def config_mode(self, nim): # Workaround for OpenATV > 5.3 try: return nim.config_mode except AttributeError: