Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c65cd06
common: Replace os.system() mkdir with os.makedirs()
KAMI911 Mar 14, 2026
8074bee
common: Validate provider info field count before unpacking
KAMI911 Mar 14, 2026
899992d
common: Fix downloaded_bytes counter to track actual received bytes
KAMI911 Mar 14, 2026
d4e6206
common: Fix favorites file handling for first-run and missing directory
KAMI911 Mar 14, 2026
d758994
hypnotix: Remove credential-exposing data from log output
KAMI911 Mar 14, 2026
70770e6
hypnotix: Replace bare except clauses with except Exception
KAMI911 Mar 14, 2026
d5132e7
hypnotix: Fix update_ytdlp to use absolute paths instead of os.chdir()
KAMI911 Mar 14, 2026
5bdf013
hypnotix: Fix search debounce timer truncating float to zero
KAMI911 Mar 14, 2026
cf05cf8
hypnotix: Prevent crash in on_search when no provider is selected
KAMI911 Mar 14, 2026
f3f48e5
hypnotix: Escape provider name before inserting into Pango markup
KAMI911 Mar 14, 2026
2db42d2
hypnotix: Use context manager and read() for GPL license file
KAMI911 Mar 14, 2026
ddeee4b
hypnotix: Replace type() == dict with isinstance() checks
KAMI911 Mar 14, 2026
fdd4c14
xtream: Fix cache_path fallback never triggering (== vs =)
KAMI911 Mar 14, 2026
2da41aa
xtream: Fix always-true condition in Group stream type check
KAMI911 Mar 14, 2026
06bb03d
xtream: Move mutable class attributes to instance __init__
KAMI911 Mar 14, 2026
5e433b0
xtream: Fix UnboundLocalError when processing series streams
KAMI911 Mar 14, 2026
22845bd
xtream: Remove duplicate catch_all_group insertion in load_iptv
KAMI911 Mar 14, 2026
e732ce9
xtream: Use compiled regex.match() directly in search_stream
KAMI911 Mar 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions usr/lib/hypnotix/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def slugify(string):
class Provider:
def __init__(self, name, provider_info):
if provider_info is not None:
self.name, self.type_id, self.url, self.username, self.password, self.epg = provider_info.split(":::")
parts = provider_info.split(":::")
if len(parts) != 6:
raise ValueError("Invalid provider info: expected 6 fields, got %d" % len(parts))
self.name, self.type_id, self.url, self.username, self.password, self.epg = parts
else:
self.name = name
self.path = os.path.join(PROVIDERS_PATH, slugify(self.name))
Expand Down Expand Up @@ -133,7 +136,7 @@ def __init__(self, provider, info):

class Manager:
def __init__(self, settings):
os.system("mkdir -p '%s'" % PROVIDERS_PATH)
os.makedirs(PROVIDERS_PATH, exist_ok=True)
self.verbose = False
self.settings = settings

Expand Down Expand Up @@ -183,13 +186,13 @@ def get_playlist(self, provider, refresh=False) -> bool:
with open(provider.path, "w", encoding=response.encoding) as file:
# Grab data by block_bytes
for data in response.iter_content(block_bytes, decode_unicode=True):
downloaded_bytes += block_bytes
print("{} bytes".format(downloaded_bytes))
# if data is still bytes, decode it
if isinstance(data, bytes):
downloaded_bytes += len(data)
data = data.decode('utf-8', errors='ignore')
else:
data = str(data)
downloaded_bytes += len(data.encode('utf-8'))
# Write data to file
file.write(data)
if downloaded_bytes < total_content_size:
Expand Down Expand Up @@ -295,12 +298,15 @@ def load_channels(self, provider):

def load_favorites(self):
favorites = []
if not os.path.exists(FAVORITES_PATH):
return favorites
with open(FAVORITES_PATH, 'r', encoding="utf-8", errors="ignore") as f:
for line in f:
favorites.append(line.strip())
return favorites

def save_favorites(self, favorites):
os.makedirs(os.path.dirname(FAVORITES_PATH), exist_ok=True)
with open(FAVORITES_PATH, "w", encoding="utf-8") as f:
for fav in favorites:
f.write(f"{fav}\n")
45 changes: 22 additions & 23 deletions usr/lib/hypnotix/hypnotix.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,9 @@ def __init__(self, application):
self.ytdlp_local_switch.set_active(self.settings.get_boolean("use-local-ytdlp"))
self.ytdlp_local_switch.connect("notify::active", self.on_ytdlp_local_switch_activated)
self.ytdlp_system_version_label.set_text(subprocess.getoutput("/usr/bin/yt-dlp --version"))
if os.path.exists(os.path.expanduser("~/.cache/hypnotix/yt-dlp/yt-dlp")):
self.ytdlp_local_version_label.set_text(subprocess.getoutput("~/.cache/hypnotix/yt-dlp/yt-dlp --version"))
ytdlp_local_bin = os.path.expanduser("~/.cache/hypnotix/yt-dlp/yt-dlp")
if os.path.exists(ytdlp_local_bin):
self.ytdlp_local_version_label.set_text(subprocess.getoutput([ytdlp_local_bin, "--version"]))
self.ytdlp_update_button.connect("clicked", self.update_ytdlp)

# Dark mode manager
Expand Down Expand Up @@ -647,13 +648,13 @@ def on_ytdlp_local_switch_activated(self, widget, data=None):

def update_ytdlp(self, widget=None):
path = os.path.expanduser("~/.cache/hypnotix/yt-dlp")
os.chdir(path)
if os.path.exists("yt-dlp"):
subprocess.getoutput("./yt-dlp --update")
ytdlp_bin = os.path.join(path, "yt-dlp")
if os.path.exists(ytdlp_bin):
subprocess.getoutput([ytdlp_bin, "--update"])
else:
subprocess.getoutput("wget https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp")
subprocess.getoutput("chmod a+rx ./yt-dlp")
self.ytdlp_local_version_label.set_text(subprocess.getoutput("~/.cache/hypnotix/yt-dlp/yt-dlp --version"))
subprocess.getoutput(["wget", "-P", path, "https://github.com/yt-dlp/yt-dlp/releases/latest/download/yt-dlp"])
os.chmod(ytdlp_bin, 0o755)
self.ytdlp_local_version_label.set_text(subprocess.getoutput([ytdlp_bin, "--version"]))

@async_function
def download_channel_logos(self, logos_to_refresh):
Expand Down Expand Up @@ -712,11 +713,14 @@ def on_search_bar(self, widget):
if search_bar_text != self.latest_search_bar_text:
self.latest_search_bar_text = search_bar_text
self.search_bar.set_sensitive(False)
GLib.timeout_add_seconds(0.1, self.on_search)
GLib.timeout_add(100, self.on_search)

def on_search(self):
self.visible_search_results = 0
channels = []
if self.active_provider is None:
self.search_bar.set_sensitive(True)
return False
for channel in self.active_provider.channels:
if self.latest_search_bar_text in channel.name.lower():
channels.append(channel)
Expand Down Expand Up @@ -877,7 +881,7 @@ def play_async(self, channel):
if self.mpv is not None:
self.mpv.stop()
self.mpv.pause = False
print("CHANNEL: '%s' (%s)" % (channel.name, channel.url))
print("CHANNEL: '%s'" % channel.name)
if channel is not None and channel.url is not None:
# os.system("mpv --wid=%s %s &" % (self.wid, channel.url))
# self.mpv_drawing_area.show()
Expand Down Expand Up @@ -936,7 +940,7 @@ def monitor_playback(self):
self.mpv.unobserve_property("video-bitrate", self.on_bitrate)
self.mpv.unobserve_property("audio-bitrate", self.on_bitrate)
self.mpv.unobserve_property("core-idle", self.on_playback_changed)
except:
except Exception:
pass
self.mpv.observe_property("video-params", self.on_video_params)
self.mpv.observe_property("video-format", self.on_video_format)
Expand Down Expand Up @@ -990,7 +994,7 @@ def on_bitrate(self, prop, bitrate):

@idle_function
def on_video_params(self, property, params):
if not params or not type(params) == dict:
if not params or not isinstance(params, dict):
return
if "w" in params and "h" in params:
self.video_properties[_("General")][_("Dimensions")] = "%sx%s" % (params["w"],params["h"])
Expand All @@ -1012,7 +1016,7 @@ def on_video_format(self, property, vformat):

@idle_function
def on_audio_params(self, property, params):
if not params or not type(params) == dict:
if not params or not isinstance(params, dict):
return
if "channels" in params:
chans = params["channels"]
Expand Down Expand Up @@ -1062,7 +1066,7 @@ def refresh_providers_page(self):
image.set_from_icon_name("xsi-tv-symbolic", Gtk.IconSize.BUTTON)
labels_box.pack_start(image, False, False, 0)
label = Gtk.Label()
label.set_markup("<b>%s</b>" % provider.name)
label.set_markup("<b>%s</b>" % GLib.markup_escape_text(provider.name))
labels_box.pack_start(label, False, False, 0)
num = len(provider.channels)
if num > 0:
Expand Down Expand Up @@ -1424,13 +1428,8 @@ def open_about(self, widget):
dlg.set_program_name(_("Hypnotix"))
dlg.set_comments(_("Watch TV"))
try:
h = open("/usr/share/common-licenses/GPL", encoding="utf-8")
s = h.readlines()
gpl = ""
for line in s:
gpl += line
h.close()
dlg.set_license(gpl)
with open("/usr/share/common-licenses/GPL", encoding="utf-8") as h:
dlg.set_license(h.read())
except Exception as e:
print(e)

Expand Down Expand Up @@ -1489,7 +1488,7 @@ def on_key_press_event(self, widget, event):
elif not event.keyval in [Gdk.KEY_F1, Gdk.KEY_F2]:
try:
self.mpv.command("keypress", Gdk.keyval_name(event.keyval))
except:
except Exception:
pass
return True
# elif event.keyval == Gdk.KEY_Up:
Expand Down Expand Up @@ -1584,7 +1583,7 @@ def reload(self, page=None, refresh=False):
except Exception as e:
print(e)
traceback.print_exc()
print("Couldn't parse provider info: ", provider_info)
print("Couldn't parse provider info (details omitted)")

# If there are more than 1 providers and no Active Provider, set to the first one
if len(self.providers) > 0 and self.active_provider is None:
Expand Down
55 changes: 23 additions & 32 deletions usr/lib/hypnotix/xtream.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __init__(self, group_info: dict, stream_type: str):
self.group_type = MOVIES_GROUP
elif "Series" == stream_type:
self.group_type = SERIES_GROUP
elif "Live":
elif stream_type == "Live":
self.group_type = TV_GROUP
else:
print("Unrecognized stream type `{}` for `{}`".format(
Expand Down Expand Up @@ -262,26 +262,8 @@ class XTream:
vod_type = "VOD"
series_type = "Series"

auth_data = {}
authorization = {}

groups = []
channels = []
series = []
movies = []

state = {"authenticated": False, "loaded": False}

hide_adult_content = False

catch_all_group = Group(
{
"category_id": "9999",
"category_name":"xEverythingElse",
"parent_id":0
},
""
)
# If the cached JSON file is older than threshold_time_sec then load a new
# JSON dictionary from the provider
threshold_time_sec = 60 * 60 * 8
Expand Down Expand Up @@ -321,12 +303,24 @@ def __init__(
self.hide_adult_content = hide_adult_content
self.user_agent = user_agent

self.auth_data = {}
self.authorization = {}
self.groups = []
self.channels = []
self.series = []
self.movies = []
self.state = {"authenticated": False, "loaded": False}
self.catch_all_group = Group(
{"category_id": "9999", "category_name": "xEverythingElse", "parent_id": 0},
"Live"
)

# if the cache_path is specified, test that it is a directory
if self.cache_path != "":
# If the cache_path is not a directory, clear it
if not osp.isdir(self.cache_path):
print(" - Cache Path is not a directory, using default '~/.xtream-cache/'")
self.cache_path == ""
self.cache_path = ""

# If the cache_path is still empty, use default
if self.cache_path == "":
Expand Down Expand Up @@ -357,23 +351,22 @@ def search_stream(self, keyword: str, ignore_case: bool = True, return_type: str

print("Checking {} movies".format(len(self.movies)))
for stream in self.movies:
if re.match(regex, stream.name) is not None:
if regex.match(stream.name) is not None:
search_result.append(stream.export_json())

print("Checking {} channels".format(len(self.channels)))
for stream in self.channels:
if re.match(regex, stream.name) is not None:
if regex.match(stream.name) is not None:
search_result.append(stream.export_json())

print("Checking {} series".format(len(self.series)))
for stream in self.series:
if re.match(regex, stream.name) is not None:
if regex.match(stream.name) is not None:
search_result.append(stream.export_json())

if return_type == "JSON":
if search_result is not None:
print("Found {} results `{}`".format(len(search_result), keyword))
return json.dumps(search_result, ensure_ascii=False)
print("Found {} results `{}`".format(len(search_result), keyword))
return json.dumps(search_result, ensure_ascii=False)
else:
return search_result

Expand Down Expand Up @@ -562,9 +555,6 @@ def load_iptv(self):
))
## Add GROUPS to dictionaries

# Add the catch-all-errors group
self.groups.append(self.catch_all_group)

for cat_obj in all_cat:
# Create Group (Category)
new_group = Group(cat_obj, loading_stream_type)
Expand Down Expand Up @@ -661,13 +651,14 @@ def load_iptv(self):
self, group_title, stream_channel
)

if new_channel.group_id == "9999":
print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name,new_channel.stream_type))

# Save the new channel to the local list of channels
if loading_stream_type == self.live_type:
if new_channel.group_id == "9999":
print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name, new_channel.stream_type))
self.channels.append(new_channel)
elif loading_stream_type == self.vod_type:
if new_channel.group_id == "9999":
print(" - xEverythingElse Channel -> {} - {}".format(new_channel.name, new_channel.stream_type))
self.movies.append(new_channel)
else:
self.series.append(new_series)
Expand Down