Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/cdp_mode/playwright/raw_browserscan_nested.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from playwright.sync_api import sync_playwright
from seleniumbase import SB

with SB(uc=True, locale="en") as sb:
sb.activate_cdp_mode()
endpoint_url = sb.cdp.get_endpoint_url()

with sync_playwright() as p:
browser = p.chromium.connect_over_cdp(endpoint_url)
page = browser.contexts[0].pages[0]
page.goto("https://www.browserscan.net/bot-detection")
page.wait_for_timeout(1000)
sb.cdp.flash("Test Results", duration=4)
page.wait_for_timeout(1000)
sb.assert_element('strong:contains("Normal")')
sb.cdp.flash('strong:contains("Normal")', duration=4, pause=4)
15 changes: 15 additions & 0 deletions examples/cdp_mode/playwright/raw_browserscan_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from playwright.sync_api import sync_playwright
from seleniumbase import sb_cdp

sb = sb_cdp.Chrome(locale="en")
endpoint_url = sb.get_endpoint_url()

with sync_playwright() as p:
browser = p.chromium.connect_over_cdp(endpoint_url)
page = browser.contexts[0].pages[0]
page.goto("https://www.browserscan.net/bot-detection")
page.wait_for_timeout(1000)
sb.flash("Test Results", duration=4)
page.wait_for_timeout(1000)
sb.assert_element('strong:contains("Normal")')
sb.flash('strong:contains("Normal")', duration=4, pause=4)
11 changes: 11 additions & 0 deletions examples/cdp_mode/raw_muse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""(Bypasses Friendly Captcha)"""
from seleniumbase import SB

with SB(uc=True, test=True, guest=True) as sb:
url = "https://muse.jhu.edu/verify"
sb.activate_cdp_mode(url)
sb.sleep(1.5)
sb.solve_captcha()
sb.sleep(4)
sb.assert_element('#search_input')
sb.sleep(3)
18 changes: 18 additions & 0 deletions examples/cdp_mode/raw_muse_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""(Bypasses Friendly Captcha)"""
import asyncio
from seleniumbase import cdp_driver


async def main():
url = "https://muse.jhu.edu/verify"
driver = await cdp_driver.start_async(guest=True)
page = await driver.get(url)
await page.sleep(2.5)
await page.solve_captcha()
await page.sleep(4)
await page.find('#search_input')
await page.sleep(3)

if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
2 changes: 1 addition & 1 deletion mkdocs_build/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

regex>=2026.3.32
pymdown-extensions>=10.21.2
pipdeptree>=2.34.0
pipdeptree>=2.35.1
python-dateutil>=2.8.2
Markdown==3.10.2
click==8.3.1
Expand Down
9 changes: 4 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fasteners>=0.20
mycdp>=1.3.7
pynose>=1.5.5
platformdirs~=4.4.0;python_version<"3.10"
platformdirs>=4.9.4;python_version>="3.10"
platformdirs>=4.9.6;python_version>="3.10"
typing-extensions>=4.15.0
sbvirtualdisplay>=1.4.0
MarkupSafe>=3.0.3
Expand All @@ -29,7 +29,7 @@ pyreadline3>=3.5.4;platform_system=="Windows"
tabcompleter>=1.4.0
pdbp>=1.8.2
idna>=3.11
charset-normalizer>=3.4.6,<4
charset-normalizer>=3.4.7,<4
urllib3>=1.26.20,<2;python_version<"3.10"
urllib3>=1.26.20,<3;python_version>="3.10"
requests~=2.32.5;python_version<"3.10"
Expand All @@ -44,18 +44,17 @@ wsproto==1.2.0;python_version<"3.10"
wsproto~=1.3.2;python_version>="3.10"
websocket-client~=1.9.0
selenium==4.32.0;python_version<"3.10"
selenium==4.41.0;python_version>="3.10"
selenium==4.43.0;python_version>="3.10"
cssselect==1.3.0;python_version<"3.10"
cssselect>=1.4.0,<2;python_version>="3.10"
nest-asyncio==1.6.0
sortedcontainers==2.4.0
execnet==2.1.1;python_version<"3.10"
execnet==2.1.2;python_version>="3.10"
iniconfig==2.1.0;python_version<"3.10"
iniconfig==2.3.0;python_version>="3.10"
pluggy==1.6.0
pytest==8.4.2;python_version<"3.11"
pytest==9.0.2;python_version>="3.11"
pytest==9.0.3;python_version>="3.11"
pytest-html==4.0.2
pytest-metadata==3.1.1
pytest-ordering==0.6
Expand Down
2 changes: 1 addition & 1 deletion seleniumbase/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# seleniumbase package
__version__ = "4.47.9"
__version__ = "4.48.0"
282 changes: 282 additions & 0 deletions seleniumbase/core/nest_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
"""Patch asyncio to allow nested event loops."""
import asyncio
import asyncio.events as events
import os
import sys
import threading
from contextlib import contextmanager, suppress
from heapq import heappop

_run_close_loop = True


class PatchedNestAsyncio:
pass


def apply(loop=None, *, run_close_loop=False, error_on_mispatched=False):
global _run_close_loop
_patch_asyncio(error_on_mispatched=error_on_mispatched)
_patch_policy()
_patch_tornado()
loop = loop or _get_event_loop()
if loop is not None:
_patch_loop(loop)
_run_close_loop &= run_close_loop


if sys.version_info < (3, 14, 0):
def _get_event_loop():
return asyncio.get_event_loop()
else:
def _get_event_loop():
try:
return asyncio.get_event_loop()
except RuntimeError:
return None


if sys.version_info < (3, 12, 0):
def run(main, *, debug=False):
loop = asyncio.get_event_loop()
loop.set_debug(debug)
task = asyncio.ensure_future(main)
try:
return loop.run_until_complete(task)
finally:
if not task.done():
task.cancel()
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
else:
def run(main, *, debug=False, loop_factory=None):
new_event_loop = False
set_event_loop = None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
if not _run_close_loop:
loop = _get_event_loop()
if loop is None:
if loop_factory is None:
loop_factory = asyncio.new_event_loop
loop = loop_factory()
asyncio.set_event_loop(loop)
else:
if loop_factory is None:
loop = asyncio.new_event_loop()
set_event_loop = _get_event_loop()
asyncio.set_event_loop(loop)
else:
loop = loop_factory()
new_event_loop = True
_patch_loop(loop)

loop.set_debug(debug)
task = asyncio.ensure_future(main, loop=loop)
try:
return loop.run_until_complete(task)
finally:
if not task.done():
task.cancel()
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
if set_event_loop:
asyncio.set_event_loop(set_event_loop)
if new_event_loop:
# Avoid ResourceWarning: unclosed event loop
loop.close()


def _patch_asyncio(*, error_on_mispatched=False):
"""Patch asyncio module to use pure Python tasks and futures."""

def _get_event_loop(stacklevel=3):
loop = events._get_running_loop()
if loop is None:
if sys.version_info < (3, 14, 0):
policy = events.get_event_loop_policy()
else:
policy = events._get_event_loop_policy()
loop = policy.get_event_loop()
return loop

if hasattr(asyncio, "_nest_patched"):
if not hasattr(asyncio, "_nest_asyncio"):
if error_on_mispatched:
raise RuntimeError("asyncio was already patched!")
elif sys.version_info >= (3, 12, 0):
import warnings
warnings.warn("asyncio was already patched!")
return

asyncio.tasks.Task = asyncio.tasks._PyTask
asyncio.Task = asyncio.tasks._CTask = asyncio.tasks.Task
asyncio.Future = asyncio.futures._CFuture = asyncio.futures.Future = (
asyncio.futures._PyFuture
)
asyncio.get_event_loop = _get_event_loop
events._get_event_loop = events.get_event_loop = asyncio.get_event_loop
asyncio.run = run
asyncio._nest_patched = True
asyncio._nest_asyncio = PatchedNestAsyncio()


def _patch_policy():
"""Patch the policy to always return a patched loop."""

def get_event_loop(self):
if self._local._loop is None:
loop = self.new_event_loop()
_patch_loop(loop)
self.set_event_loop(loop)
return self._local._loop

if sys.version_info < (3, 14, 0):
policy = events.get_event_loop_policy()
else:
policy = events._get_event_loop_policy()
policy.__class__.get_event_loop = get_event_loop


def _patch_loop(loop):
"""Patch loop to make it reentrant."""

def run_forever(self):
with manage_run(self), manage_asyncgens(self):
while True:
self._run_once()
if self._stopping:
break
self._stopping = False

def run_until_complete(self, future):
with manage_run(self):
f = asyncio.ensure_future(future, loop=self)
if f is not future:
f._log_destroy_pending = False
while not f.done():
self._run_once()
if self._stopping:
break
if not f.done():
raise RuntimeError("Loop stopped before Future completed!")
return f.result()

def _run_once(self):
"""Simplified re-implementation of asyncio's _run_once."""
ready = self._ready
scheduled = self._scheduled
while scheduled and scheduled[0]._cancelled:
heappop(scheduled)
timeout = (
0
if ready or self._stopping
else min(max(scheduled[0]._when - self.time(), 0), 86400)
if scheduled
else None
)
event_list = self._selector.select(timeout)
self._process_events(event_list)
end_time = self.time() + self._clock_resolution
while scheduled and scheduled[0]._when < end_time:
handle = heappop(scheduled)
ready.append(handle)
for _ in range(len(ready)):
if not ready:
break
handle = ready.popleft()
if not handle._cancelled:
if sys.version_info < (3, 14, 0):
curr_task = curr_tasks.pop(self, None)
else:
try:
curr_task = asyncio.tasks._swap_current_task(
self, None
)
except KeyError:
curr_task = None
try:
handle._run()
finally:
if curr_task is not None:
if sys.version_info < (3, 14, 0):
curr_tasks[self] = curr_task
else:
asyncio.tasks._swap_current_task(self, curr_task)
handle = None

@contextmanager
def manage_run(self):
self._check_closed()
old_thread_id = self._thread_id
old_running_loop = events._get_running_loop()
try:
self._thread_id = threading.get_ident()
events._set_running_loop(self)
self._num_runs_pending += 1
if self._is_proactorloop:
if self._self_reading_future is None:
self.call_soon(self._loop_self_reading)
yield
finally:
self._thread_id = old_thread_id
events._set_running_loop(old_running_loop)
self._num_runs_pending -= 1
if self._is_proactorloop:
if (
self._num_runs_pending == 0
and self._self_reading_future is not None
):
ov = self._self_reading_future._ov
self._self_reading_future.cancel()
if ov is not None:
self._proactor._unregister(ov)
self._self_reading_future = None

@contextmanager
def manage_asyncgens(self):
old_agen_hooks = sys.get_asyncgen_hooks()
try:
self._set_coroutine_origin_tracking(self._debug)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(
firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook,
)
yield
finally:
self._set_coroutine_origin_tracking(False)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(*old_agen_hooks)

def _check_running(self):
"""Do not throw exception if loop is already running."""
pass

if hasattr(loop, "_nest_patched"):
return
if not isinstance(loop, asyncio.BaseEventLoop):
raise ValueError("Can't patch loop of type %s" % type(loop))
cls = loop.__class__
cls.run_forever = run_forever
cls.run_until_complete = run_until_complete
cls._run_once = _run_once
cls._check_running = _check_running
cls._num_runs_pending = 1 if loop.is_running() else 0
cls._is_proactorloop = os.name == "nt" and issubclass(
cls, asyncio.ProactorEventLoop
)
curr_tasks = asyncio.tasks._current_tasks
cls._nest_patched = True
cls._nest_asyncio = PatchedNestAsyncio()


def _patch_tornado():
"""If tornado is imported before nest_asyncio,
make tornado aware of the pure-Python asyncio Future."""
if "tornado" in sys.modules:
import tornado.concurrent as tc # type: ignore
tc.Future = asyncio.Future
if asyncio.Future not in tc.FUTURES:
tc.FUTURES += (asyncio.Future,)
Loading