Skip to content

Commit

Permalink
PEP
Browse files Browse the repository at this point in the history
  • Loading branch information
Huevos committed Nov 17, 2024
1 parent 396cfbc commit a37a9ec
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 70 deletions.
42 changes: 19 additions & 23 deletions TerrestrialScan/src/MakeBouquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@

from enigma import eDVBResourceManager, eTimer, eDVBDB, eDVBFrontendParametersTerrestrial

import os
import sys

import datetime
import time

Expand All @@ -40,16 +37,16 @@ def __init__(self, session, args=0):
self.path = "/etc/enigma2"
self.services_dict = {}
self.tmp_services_dict = {}
self.namespace_dict = {} # to store namespace when sub network is enabled
self.namespace_dict = {} # to store namespace when sub network is enabled
self.logical_channel_number_dict = {}
self.ignore_visible_service_flag = False # make this a user override later if found necessary
self.ignore_visible_service_flag = False # make this a user override later if found necessary
self.VIDEO_ALLOWED_TYPES = [1, 4, 5, 17, 22, 24, 25, 27, 135]
self.AUDIO_ALLOWED_TYPES = [2, 10]
self.BOUQUET_PREFIX = "userbouquet.TerrestrialScan."
self.bouquetsIndexFilename = "bouquets.tv"
self.bouquetFilename = self.BOUQUET_PREFIX + "tv"
self.bouquetName = _('Terrestrial')
self.namespace_complete_terrestrial = not (config.usage.subnetwork_terrestrial.value if hasattr(config.usage, "subnetwork_terrestrial") else True) # config.usage.subnetwork not available in all images
self.namespace_complete_terrestrial = not (config.usage.subnetwork_terrestrial.value if hasattr(config.usage, "subnetwork_terrestrial") else True) # config.usage.subnetwork not available in all images

self.terrestrialXmlFilename = "terrestrial.xml"

Expand Down Expand Up @@ -149,7 +146,7 @@ def getFrontend(self):
return

if self.rawchannel:
del(self.rawchannel)
del self.rawchannel

self.frontend = None
self.rawchannel = None
Expand Down Expand Up @@ -187,12 +184,12 @@ def checkTunerLock(self):
self.dict = {}
self.frontend.getFrontendStatus(self.dict)
if self.dict["tuner_state"] == "TUNING":
if self.lockcounter < 1: # only show this once in the log per retune event
if self.lockcounter < 1: # only show this once in the log per retune event
print("[MakeBouquet][checkTunerLock] TUNING")
elif self.dict["tuner_state"] == "LOCKED":
print("[MakeBouquet][checkTunerLock] TUNER LOCKED")
self["action"].setText(_("Reading SI tables on %s MHz") % str(self.transponder["frequency"] // 1000000))
#self["status"].setText(_("???"))
# self["status"].setText(_("???"))

self.readTransponderCounter = 0
self.readTranspondertimer = eTimer()
Expand Down Expand Up @@ -225,7 +222,7 @@ def readSDT(self):
sdt_pid = 0x11
sdt_current_table_id = 0x42
mask = 0xff
sdtTimeout = 5 # maximum time allowed to read the service descriptor table (seconds)
sdtTimeout = 5 # maximum time allowed to read the service descriptor table (seconds)

sdt_current_version_number = -1
sdt_current_sections_read = []
Expand All @@ -248,7 +245,7 @@ def readSDT(self):

section = dvbreader.read_sdt(fd, sdt_current_table_id, 0x00)
if section is None:
time.sleep(0.1) # no data.. so we wait a bit
time.sleep(0.1) # no data.. so we wait a bit
continue

if section["header"]["table_id"] == sdt_current_table_id and not sdt_current_completed:
Expand All @@ -261,7 +258,7 @@ def readSDT(self):
if section["header"]["section_number"] not in sdt_current_sections_read:
sdt_current_sections_read.append(section["header"]["section_number"])
sdt_current_content += section["content"]
if self.tsid is None or self.onid is None: # save first read of tsid and onid, although data in self.transponder should already be correct.
if self.tsid is None or self.onid is None: # save first read of tsid and onid, although data in self.transponder should already be correct.
self.tsid = self.transponder["tsid"] = section["header"]["transport_stream_id"]
self.onid = self.transponder["onid"] = section["header"]["original_network_id"]

Expand All @@ -287,7 +284,7 @@ def readSDT(self):
continue

servicekey = "%x:%x:%x" % (service["transport_stream_id"], service["original_network_id"], service["service_id"])
service["signalQuality"] = self.transponder["signalQuality"] # Used for sorting of duplicate LCNs
service["signalQuality"] = self.transponder["signalQuality"] # Used for sorting of duplicate LCNs
self.tmp_services_dict[servicekey] = service

def readNIT(self):
Expand All @@ -296,12 +293,12 @@ def readNIT(self):

nit_current_pid = 0x10
nit_current_table_id = 0x40
nit_other_table_id = 0x00 # don't read other table
nit_other_table_id = 0x00 # don't read other table
if nit_other_table_id == 0x00:
mask = 0xff
else:
mask = nit_current_table_id ^ nit_other_table_id ^ 0xff
nit_current_timeout = 20 # maximum time allowed to read the network information table (seconds)
nit_current_timeout = 20 # maximum time allowed to read the network information table (seconds)

nit_current_version_number = -1
nit_current_sections_read = []
Expand All @@ -324,7 +321,7 @@ def readNIT(self):

section = dvbreader.read_nit(fd, nit_current_table_id, nit_other_table_id)
if section is None:
time.sleep(0.1) # no data.. so we wait a bit
time.sleep(0.1) # no data.. so we wait a bit
continue

if section["header"]["table_id"] == nit_current_table_id and not nit_current_completed:
Expand All @@ -351,11 +348,10 @@ def readNIT(self):
return

# descriptor_tag 0x5A is DVB-T, descriptor_tag 0x7f is DVB-T
transponders = [t for t in nit_current_content if "descriptor_tag" in t and t["descriptor_tag"] in (0x5A, 0x7f) and t["original_network_id"] == self.transponder["onid"] and t["transport_stream_id"] == self.transponder["tsid"]] # this should only ever have a length of one transponder
transponders = [t for t in nit_current_content if "descriptor_tag" in t and t["descriptor_tag"] in (0x5A, 0x7f) and t["original_network_id"] == self.transponder["onid"] and t["transport_stream_id"] == self.transponder["tsid"]] # this should only ever have a length of one transponder
print("[MakeBouquet][readNIT] transponders", transponders)
if transponders:

if transponders[0]["descriptor_tag"] == 0x5A: # DVB-T
if transponders[0]["descriptor_tag"] == 0x5A: # DVB-T
self.transponder["system"] = eDVBFrontendParametersTerrestrial.System_DVB_T
else: # must be DVB-T2
self.transponder["system"] = eDVBFrontendParametersTerrestrial.System_DVB_T2
Expand Down Expand Up @@ -392,7 +388,7 @@ def createBouquet(self):
break
self.tv_radio = tv_radio
bouquetIndexContent = self.readBouquetIndex()
if '"' + self.bouquetFilename[:-2] + tv_radio + '"' not in bouquetIndexContent: # only edit the index if bouquet file is not present
if '"' + self.bouquetFilename[:-2] + tv_radio + '"' not in bouquetIndexContent: # only edit the index if bouquet file is not present
self.writeBouquetIndex(bouquetIndexContent)
self.writeBouquet()

Expand All @@ -413,7 +409,7 @@ def solveDuplicates(self):
if config.plugins.TerrestrialScan.uhf_vhf.value == "australia":
vacant = [i for i in range(350, 400) if i not in self.services_dict]
for duplicate in self.duplicates:
if not vacant: # not slots available
if not vacant: # not slots available
break
self.services_dict[vacant.pop(0)] = duplicate

Expand All @@ -425,7 +421,7 @@ def iterateServicesBySNR(self, servicesDict):
def readBouquetIndex(self):
try:
bouquets = open(self.path + "/%s%s" % (self.bouquetsIndexFilename[:-2], self.tv_radio), "r")
except Exception as e:
except Exception:
return ""
content = bouquets.read()
bouquets.close()
Expand Down Expand Up @@ -512,4 +508,4 @@ def keyCancel(self):
def __onClose(self):
if self.frontend:
self.frontend = None
del(self.rawchannel)
del self.rawchannel
71 changes: 34 additions & 37 deletions TerrestrialScan/src/TerrestrialScan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,14 @@
from Components.NimManager import nimmanager
from enigma import eDVBFrontendParameters, eDVBFrontendParametersTerrestrial, eDVBResourceManager, eTimer, iFrontendInformation

import os
import sys

import datetime
import time

from . import dvbreader
from .TerrestrialScanSkin import downloadBar


def setParams(frequency, system, bandwidth): # freq is nine digits (474000000), bandwidth in Hz (8000000)
def setParams(frequency, system, bandwidth): # freq is nine digits (474000000), bandwidth in Hz (8000000)
params = eDVBFrontendParametersTerrestrial()
params.frequency = frequency
params.bandwidth = bandwidth
Expand All @@ -43,30 +40,30 @@ def setParamsFe(params):
params_fe.setDVBT(params)
return params_fe

def channel2freq(channel, bandwidth=8): # Europe channels
if 4 < channel < 13: # Band III
def channel2freq(channel, bandwidth=8): # Europe channels
if 4 < channel < 13: # Band III
return (((177 + (bandwidth * (channel - 5))) * 1000000) + 500000)
elif 20 < channel < 70: # Bands IV,V
elif 20 < channel < 70: # Bands IV,V
return ((474 + (bandwidth * (channel - 21))) * 1000000) # returns nine digits


def getChannelNumber(frequency, descr):
f = (frequency + 50000) / 100000 / 10.
if descr in ("uhf", "uhf_vhf"):
if 174 < f < 230: # III
if 174 < f < 230: # III
d = (f + 1) % 7
return str(int(f - 174) // 7 + 5) + (d < 3 and "-" or d > 4 and "+" or "")
elif 470 <= f < 863: # IV,V
elif 470 <= f < 863: # IV,V
d = (f + 2) % 8
return str(int(f - 470) // 8 + 21) + (d < 3.5 and "-" or d > 4.5 and "+" or "")
elif descr == "australia":
if 174 < f < 202: # III: CH6-CH9
if 174 < f < 202: # III: CH6-CH9
return str(int(f - 174) // 7 + 6)
elif 202 <= f < 209: # III: CH9A
elif 202 <= f < 209: # III: CH9A
return "9A"
elif 209 <= f < 230: # III: CH10-CH12
elif 209 <= f < 230: # III: CH10-CH12
return str(int(f - 209) // 7 + 10)
elif 526 < f < 820: # IV, V: CH28-CH69
elif 526 < f < 820: # IV, V: CH28-CH69
return str(int(f - 526) // 7 + 28)
return ""

Expand Down Expand Up @@ -97,7 +94,7 @@ def __init__(self, session, args=0):
self.uhf_vhf = "uhf"
self.networkid = 0
self.restrict_to_networkid = False
self.stabliseTime = 2 # time in seconds for tuner to stablise on tune before taking a signal quality reading
self.stabliseTime = 2 # time in seconds for tuner to stablise on tune before taking a signal quality reading
self.region = None
self.country = None
self.skipT2 = False
Expand Down Expand Up @@ -126,21 +123,21 @@ def __init__(self, session, args=0):
self.index = 0
self.frequency = 0
self.system = eDVBFrontendParametersTerrestrial.System_DVB_T
self.lockTimeout = 50 # 100ms for tick - 5 sec
self.tsidOnidTimeout = 100 # 100ms for tick - 10 sec
self.snrTimeout = 100 # 100ms for tick - 10 sec
self.bandwidth = 8 # MHz
self.lockTimeout = 50 # 100ms for tick - 5 sec
self.tsidOnidTimeout = 100 # 100ms for tick - 10 sec
self.snrTimeout = 100 # 100ms for tick - 10 sec
self.bandwidth = 8 # MHz
self.scanTransponders = []
systems = (eDVBFrontendParametersTerrestrial.System_DVB_T,) if self.skipT2 else (eDVBFrontendParametersTerrestrial.System_DVB_T, eDVBFrontendParametersTerrestrial.System_DVB_T2)
if self.uhf_vhf == "uhf_vhf":
bandwidth = 7
for a in range(5, 13):
for b in systems: # system
for b in systems: # system
self.scanTransponders.append({"frequency": channel2freq(a, bandwidth), "system": b, "bandwidth": bandwidth * 1000000})
if self.uhf_vhf in ("uhf", "uhf_short", "uhf_vhf"):
bandwidth = 8
for a in range(21, 50 if self.uhf_vhf == "uhf_short" else 70):
for b in systems: # system
for b in systems: # system
self.scanTransponders.append({"frequency": channel2freq(a, bandwidth), "system": b, "bandwidth": bandwidth * 1000000})
if self.uhf_vhf == "australia":
bandwidth = 7
Expand All @@ -152,9 +149,9 @@ def __init__(self, session, args=0):
# frequency 1, inversion 9, bandwidth 2, fechigh 4, feclow 5, modulation 3, transmission 7, guard 6, hierarchy 8, system 10, plp_id 1
for tp in nimmanager.getTranspondersTerrestrial(self.region):
# system contains "-1" when both DVB-T and DVB-T2 are to be scanned
if tp[10] < 1: # DVB-T
if tp[10] < 1: # DVB-T
self.scanTransponders.append({"frequency": tp[1], "system": eDVBFrontendParametersTerrestrial.System_DVB_T, "bandwidth": tp[2]})
if tp[10] != 0: # DVB-T2
if tp[10] != 0: # DVB-T2
self.scanTransponders.append({"frequency": tp[1], "system": eDVBFrontendParametersTerrestrial.System_DVB_T2, "bandwidth": tp[2]})
self.transponders_found = []
self.transponders_unique = {}
Expand Down Expand Up @@ -191,7 +188,7 @@ def search(self):
self.system = self.scanTransponders[self.index]["system"]
self.bandwidth = self.scanTransponders[self.index]["bandwidth"]
self.frequency = self.scanTransponders[self.index]["frequency"]
channelNumber = getChannelNumber(self.frequency, self.uhf_vhf == "xml" and ("australia" if self.country == "AUS" else "uhf") or self.uhf_vhf)
channelNumber = getChannelNumber(self.frequency, self.uhf_vhf == "xml" and ("australia" if self.country == "AUS" else "uhf") or self.uhf_vhf)
self.channelNumberText = (_("(ch %s)") % channelNumber) if channelNumber else ""
print("[TerrestrialScan][Search] Scan frequency %d %s" % (self.frequency, self.channelNumberText))
print("[TerrestrialScan][Search] Scan system %d" % self.system)
Expand All @@ -202,7 +199,7 @@ def search(self):
self["action"].setText(_("Tuning %s MHz %s") % (str(self.frequency // 1000000), self.channelNumberText))
self["status"].setText((len(self.transponders_unique) == 1 and _("Found %d unique transponder") or _("Found %d unique transponders")) % len(self.transponders_unique))
self.index += 1
if self.frequency in self.transponders_found or self.system == eDVBFrontendParametersTerrestrial.System_DVB_T2 and self.isT2tuner == False:
if self.frequency in self.transponders_found or self.system == eDVBFrontendParametersTerrestrial.System_DVB_T2 and self.isT2tuner is False:
print("[TerrestrialScan][Search] Skipping T2 search of %s MHz %s" % (str(self.frequency // 1000000), self.channelNumberText))
self.search()
return
Expand All @@ -216,7 +213,7 @@ def search(self):
answer = None
self.close(answer)

def config_mode(self, nim): # Workaround for OpenATV > 5.3
def config_mode(self, nim): # Workaround for OpenATV > 5.3
try:
return nim.config_mode
except AttributeError:
Expand All @@ -225,7 +222,7 @@ def config_mode(self, nim): # Workaround for OpenATV > 5.3
def getFrontend(self):
print("[TerrestrialScan][getFrontend] searching for available tuner")
nimList = []
if self.selectedNIM < 0: # automatic tuner selection
if self.selectedNIM < 0: # automatic tuner selection
for nim in nimmanager.nim_slots:
if self.config_mode(nim) not in ("nothing",) and (nim.isCompatible("DVB-T2") or (nim.isCompatible("DVB-S") and nim.canBeCompatible("DVB-T2"))):
nimList.append(nim.slot)
Expand All @@ -239,7 +236,7 @@ def getFrontend(self):
print("[TerrestrialScan][getFrontend] No terrestrial tuner found")
self.showError(_('No terrestrial tuner found'))
return
else: # manual tuner selection, and subsequent iterations
else: # manual tuner selection, and subsequent iterations
nim = nimmanager.nim_slots[self.selectedNIM]
if self.config_mode(nim) not in ("nothing",) and (nim.isCompatible("DVB-T2") or (nim.isCompatible("DVB-S") and nim.canBeCompatible("DVB-T2"))):
nimList.append(nim.slot)
Expand All @@ -264,7 +261,7 @@ def getFrontend(self):
self.showError(_('Cannot retrieve Resource Manager instance'))
return

if self.selectedNIM < 0: # automatic tuner selection
if self.selectedNIM < 0: # automatic tuner selection
print("[TerrestrialScan][getFrontend] Choosing NIM")

# stop pip if running
Expand All @@ -285,14 +282,14 @@ def getFrontend(self):

current_slotid = -1
if self.rawchannel:
del(self.rawchannel)
del self.rawchannel

self.frontend = None
self.rawchannel = None

nimList.reverse() # start from the last
nimList.reverse() # start from the last
for slotid in nimList:
if current_slotid == -1: # mark the first valid slotid in case of no other one is free
if current_slotid == -1: # mark the first valid slotid in case of no other one is free
current_slotid = slotid
self.rawchannel = resmanager.allocateRawChannel(slotid)
if self.rawchannel:
Expand Down Expand Up @@ -329,7 +326,7 @@ def getFrontend(self):

print("[TerrestrialScan][getFrontend] Will wait up to %i seconds for tuner lock." % (self.lockTimeout // 10))

self.selectedNIM = current_slotid # Remember for next iteration
self.selectedNIM = current_slotid # Remember for next iteration

self["tuner_text"].setText(chr(ord('A') + current_slotid))

Expand Down Expand Up @@ -361,7 +358,7 @@ def checkTunerLock(self):
self.dict = {}
self.frontend.getFrontendStatus(self.dict)
if self.dict["tuner_state"] == "TUNING":
if self.lockcounter < 1: # only show this once in the log per retune event
if self.lockcounter < 1: # only show this once in the log per retune event
print("[TerrestrialScan][checkTunerLock] TUNING")
elif self.dict["tuner_state"] == "LOCKED":
print("[TerrestrialScan][checkTunerLock] LOCKED")
Expand Down Expand Up @@ -399,12 +396,12 @@ def tsidOnidWait(self):
def getCurrentTsidOnid(self, from_retune=False):
adapter = 0
demuxer_device = "/dev/dvb/adapter%d/demux%d" % (adapter, self.demuxer_id)
start = time.time() # for debug info
start = time.time() # for debug info

sdt_pid = 0x11
sdt_current_table_id = 0x42
mask = 0xff
tsidOnidTimeout = 5 # maximum time allowed to read the service descriptor table (seconds)
tsidOnidTimeout = 5 # maximum time allowed to read the service descriptor table (seconds)
self.tsid = None
self.onid = None

Expand All @@ -423,7 +420,7 @@ def getCurrentTsidOnid(self, from_retune=False):

section = dvbreader.read_sdt(fd, sdt_current_table_id, 0x00)
if section is None:
time.sleep(0.1) # no data.. so we wait a bit
time.sleep(0.1) # no data.. so we wait a bit
continue

if section["header"]["table_id"] == sdt_current_table_id:
Expand Down Expand Up @@ -459,4 +456,4 @@ def signalQualityWait(self):
def __onClose(self):
if self.frontend:
self.frontend = None
del(self.rawchannel)
del self.rawchannel
3 changes: 1 addition & 2 deletions TerrestrialScan/src/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
from Components.Language import language
from Tools.Directories import resolveFilename, SCOPE_PLUGINS, SCOPE_LANGUAGE
import os
from Tools.Directories import resolveFilename, SCOPE_PLUGINS
import gettext

PluginLanguageDomain = "TerrestrialScan"
Expand Down
Loading

0 comments on commit a37a9ec

Please sign in to comment.