diff --git a/usr/lib/hypnotix/common.py b/usr/lib/hypnotix/common.py
index 0e60b5f..859e442 100755
--- a/usr/lib/hypnotix/common.py
+++ b/usr/lib/hypnotix/common.py
@@ -47,7 +47,10 @@ def slugify(string):
class Provider:
def __init__(self, name, provider_info):
if provider_info is not None:
- self.name, self.type_id, self.url, self.username, self.password, self.epg = provider_info.split(":::")
+ parts = provider_info.split(":::")
+ if len(parts) != 6:
+ raise ValueError("Invalid provider info: expected 6 fields, got %d" % len(parts))
+ self.name, self.type_id, self.url, self.username, self.password, self.epg = parts
else:
self.name = name
self.path = os.path.join(PROVIDERS_PATH, slugify(self.name))
@@ -133,7 +136,7 @@ def __init__(self, provider, info):
class Manager:
def __init__(self, settings):
- os.system("mkdir -p '%s'" % PROVIDERS_PATH)
+ os.makedirs(PROVIDERS_PATH, exist_ok=True)
self.verbose = False
self.settings = settings
@@ -183,13 +186,13 @@ def get_playlist(self, provider, refresh=False) -> bool:
with open(provider.path, "w", encoding=response.encoding) as file:
# Grab data by block_bytes
for data in response.iter_content(block_bytes, decode_unicode=True):
- downloaded_bytes += block_bytes
- print("{} bytes".format(downloaded_bytes))
# if data is still bytes, decode it
if isinstance(data, bytes):
+ downloaded_bytes += len(data)
data = data.decode('utf-8', errors='ignore')
else:
data = str(data)
+ downloaded_bytes += len(data.encode('utf-8'))
# Write data to file
file.write(data)
if downloaded_bytes < total_content_size:
@@ -295,12 +298,15 @@ def load_channels(self, provider):
def load_favorites(self):
favorites = []
+ if not os.path.exists(FAVORITES_PATH):
+ return favorites
with open(FAVORITES_PATH, 'r', encoding="utf-8", errors="ignore") as f:
for line in f:
favorites.append(line.strip())
return favorites
def save_favorites(self, favorites):
+ os.makedirs(os.path.dirname(FAVORITES_PATH), exist_ok=True)
with open(FAVORITES_PATH, "w", encoding="utf-8") as f:
for fav in favorites:
f.write(f"{fav}\n")
diff --git a/usr/lib/hypnotix/hypnotix.py b/usr/lib/hypnotix/hypnotix.py
index a6be99f..4cc1dee 100755
--- a/usr/lib/hypnotix/hypnotix.py
+++ b/usr/lib/hypnotix/hypnotix.py
@@ -331,8 +331,9 @@ def __init__(self, application):
self.ytdlp_local_switch.set_active(self.settings.get_boolean("use-local-ytdlp"))
self.ytdlp_local_switch.connect("notify::active", self.on_ytdlp_local_switch_activated)
self.ytdlp_system_version_label.set_text(subprocess.getoutput("/usr/bin/yt-dlp --version"))
- if os.path.exists(os.path.expanduser("~/.cache/hypnotix/yt-dlp/yt-dlp")):
- self.ytdlp_local_version_label.set_text(subprocess.getoutput("~/.cache/hypnotix/yt-dlp/yt-dlp --version"))
+ ytdlp_local_bin = os.path.expanduser("~/.cache/hypnotix/yt-dlp/yt-dlp")
+ if os.path.exists(ytdlp_local_bin):
+ self.ytdlp_local_version_label.set_text(subprocess.getoutput([ytdlp_local_bin, "--version"]))
self.ytdlp_update_button.connect("clicked", self.update_ytdlp)
# Dark mode manager
@@ -647,13 +648,13 @@ def on_ytdlp_local_switch_activated(self, widget, data=None):
def update_ytdlp(self, widget=None):
path = os.path.expanduser("~/.cache/hypnotix/yt-dlp")
- os.chdir(path)
- if os.path.exists("yt-dlp"):
- subprocess.getoutput("./yt-dlp --update")
+ ytdlp_bin = os.path.join(path, "yt-dlp")
+ if os.path.exists(ytdlp_bin):
+ subprocess.getoutput([ytdlp_bin, "--update"])
else:
- subprocess.getoutput("wget https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp")
- subprocess.getoutput("chmod a+rx ./yt-dlp")
- self.ytdlp_local_version_label.set_text(subprocess.getoutput("~/.cache/hypnotix/yt-dlp/yt-dlp --version"))
+ subprocess.getoutput(["wget", "-P", path, "https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp"])
+ os.chmod(ytdlp_bin, 0o755)
+ self.ytdlp_local_version_label.set_text(subprocess.getoutput([ytdlp_bin, "--version"]))
@async_function
def download_channel_logos(self, logos_to_refresh):
@@ -712,11 +713,14 @@ def on_search_bar(self, widget):
if search_bar_text != self.latest_search_bar_text:
self.latest_search_bar_text = search_bar_text
self.search_bar.set_sensitive(False)
- GLib.timeout_add_seconds(0.1, self.on_search)
+ GLib.timeout_add(100, self.on_search)
def on_search(self):
self.visible_search_results = 0
channels = []
+ if self.active_provider is None:
+ self.search_bar.set_sensitive(True)
+ return False
for channel in self.active_provider.channels:
if self.latest_search_bar_text in channel.name.lower():
channels.append(channel)
@@ -877,7 +881,7 @@ def play_async(self, channel):
if self.mpv is not None:
self.mpv.stop()
self.mpv.pause = False
- print("CHANNEL: '%s' (%s)" % (channel.name, channel.url))
+ print("CHANNEL: '%s'" % channel.name)
if channel is not None and channel.url is not None:
# os.system("mpv --wid=%s %s &" % (self.wid, channel.url))
# self.mpv_drawing_area.show()
@@ -936,7 +940,7 @@ def monitor_playback(self):
self.mpv.unobserve_property("video-bitrate", self.on_bitrate)
self.mpv.unobserve_property("audio-bitrate", self.on_bitrate)
self.mpv.unobserve_property("core-idle", self.on_playback_changed)
- except:
+ except Exception:
pass
self.mpv.observe_property("video-params", self.on_video_params)
self.mpv.observe_property("video-format", self.on_video_format)
@@ -990,7 +994,7 @@ def on_bitrate(self, prop, bitrate):
@idle_function
def on_video_params(self, property, params):
- if not params or not type(params) == dict:
+ if not params or not isinstance(params, dict):
return
if "w" in params and "h" in params:
self.video_properties[_("General")][_("Dimensions")] = "%sx%s" % (params["w"],params["h"])
@@ -1012,7 +1016,7 @@ def on_video_format(self, property, vformat):
@idle_function
def on_audio_params(self, property, params):
- if not params or not type(params) == dict:
+ if not params or not isinstance(params, dict):
return
if "channels" in params:
chans = params["channels"]
@@ -1062,7 +1066,7 @@ def refresh_providers_page(self):
image.set_from_icon_name("xsi-tv-symbolic", Gtk.IconSize.BUTTON)
labels_box.pack_start(image, False, False, 0)
label = Gtk.Label()
- label.set_markup("%s" % provider.name)
+ label.set_markup("%s" % GLib.markup_escape_text(provider.name))
labels_box.pack_start(label, False, False, 0)
num = len(provider.channels)
if num > 0:
@@ -1424,13 +1428,8 @@ def open_about(self, widget):
dlg.set_program_name(_("Hypnotix"))
dlg.set_comments(_("Watch TV"))
try:
- h = open("/usr/share/common-licenses/GPL", encoding="utf-8")
- s = h.readlines()
- gpl = ""
- for line in s:
- gpl += line
- h.close()
- dlg.set_license(gpl)
+ with open("/usr/share/common-licenses/GPL", encoding="utf-8") as h:
+ dlg.set_license(h.read())
except Exception as e:
print(e)
@@ -1489,7 +1488,7 @@ def on_key_press_event(self, widget, event):
elif not event.keyval in [Gdk.KEY_F1, Gdk.KEY_F2]:
try:
self.mpv.command("keypress", Gdk.keyval_name(event.keyval))
- except:
+ except Exception:
pass
return True
# elif event.keyval == Gdk.KEY_Up:
@@ -1584,7 +1583,7 @@ def reload(self, page=None, refresh=False):
except Exception as e:
print(e)
traceback.print_exc()
- print("Couldn't parse provider info: ", provider_info)
+ print("Couldn't parse provider info (details omitted)")
# If there are more than 1 providers and no Active Provider, set to the first one
if len(self.providers) > 0 and self.active_provider is None:
diff --git a/usr/lib/hypnotix/xtream.py b/usr/lib/hypnotix/xtream.py
index 05775ca..ee8931f 100644
--- a/usr/lib/hypnotix/xtream.py
+++ b/usr/lib/hypnotix/xtream.py
@@ -145,7 +145,7 @@ def __init__(self, group_info: dict, stream_type: str):
self.group_type = MOVIES_GROUP
elif "Series" == stream_type:
self.group_type = SERIES_GROUP
- elif "Live":
+ elif stream_type == "Live":
self.group_type = TV_GROUP
else:
print("Unrecognized stream type `{}` for `{}`".format(
@@ -262,26 +262,8 @@ class XTream:
vod_type = "VOD"
series_type = "Series"
- auth_data = {}
- authorization = {}
-
- groups = []
- channels = []
- series = []
- movies = []
-
- state = {"authenticated": False, "loaded": False}
-
hide_adult_content = False
- catch_all_group = Group(
- {
- "category_id": "9999",
- "category_name":"xEverythingElse",
- "parent_id":0
- },
- ""
- )
# If the cached JSON file is older than threshold_time_sec then load a new
# JSON dictionary from the provider
threshold_time_sec = 60 * 60 * 8
@@ -321,12 +303,24 @@ def __init__(
self.hide_adult_content = hide_adult_content
self.user_agent = user_agent
+ self.auth_data = {}
+ self.authorization = {}
+ self.groups = []
+ self.channels = []
+ self.series = []
+ self.movies = []
+ self.state = {"authenticated": False, "loaded": False}
+ self.catch_all_group = Group(
+ {"category_id": "9999", "category_name": "xEverythingElse", "parent_id": 0},
+ "Live"
+ )
+
# if the cache_path is specified, test that it is a directory
if self.cache_path != "":
# If the cache_path is not a directory, clear it
if not osp.isdir(self.cache_path):
print(" - Cache Path is not a directory, using default '~/.xtream-cache/'")
- self.cache_path == ""
+ self.cache_path = ""
# If the cache_path is still empty, use default
if self.cache_path == "":
@@ -357,23 +351,22 @@ def search_stream(self, keyword: str, ignore_case: bool = True, return_type: str
print("Checking {} movies".format(len(self.movies)))
for stream in self.movies:
- if re.match(regex, stream.name) is not None:
+ if regex.match(stream.name) is not None:
search_result.append(stream.export_json())
print("Checking {} channels".format(len(self.channels)))
for stream in self.channels:
- if re.match(regex, stream.name) is not None:
+ if regex.match(stream.name) is not None:
search_result.append(stream.export_json())
print("Checking {} series".format(len(self.series)))
for stream in self.series:
- if re.match(regex, stream.name) is not None:
+ if regex.match(stream.name) is not None:
search_result.append(stream.export_json())
if return_type == "JSON":
- if search_result is not None:
- print("Found {} results `{}`".format(len(search_result), keyword))
- return json.dumps(search_result, ensure_ascii=False)
+ print("Found {} results `{}`".format(len(search_result), keyword))
+ return json.dumps(search_result, ensure_ascii=False)
else:
return search_result
@@ -562,9 +555,6 @@ def load_iptv(self):
))
## Add GROUPS to dictionaries
- # Add the catch-all-errors group
- self.groups.append(self.catch_all_group)
-
for cat_obj in all_cat:
# Create Group (Category)
new_group = Group(cat_obj, loading_stream_type)
@@ -661,13 +651,14 @@ def load_iptv(self):
self, group_title, stream_channel
)
- if new_channel.group_id == "9999":
- print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name,new_channel.stream_type))
-
# Save the new channel to the local list of channels
if loading_stream_type == self.live_type:
+ if new_channel.group_id == "9999":
+ print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name, new_channel.stream_type))
self.channels.append(new_channel)
elif loading_stream_type == self.vod_type:
+ if new_channel.group_id == "9999":
+ print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name, new_channel.stream_type))
self.movies.append(new_channel)
else:
self.series.append(new_series)