diff --git a/README.md b/README.md index f921d9c..fd2d3e9 100644 --- a/README.md +++ b/README.md @@ -95,9 +95,14 @@ part of your geospatial project. ## 3.0.9.dev ### Bug fixes - - BREAKING CHANGE. Reader now raises TypeError instead of failing silently (e.g. in anticipation - of a subsequent call to Reader.load), when passed an unsupported truthy Python object (not a str - or a Path) as its first arg (the positional-only, non-keyword arg). + - Remove ambiguity in API (BREAKING CHANGE). Reader and Writer now both raise TypeError + when passed a supported Shapefile target + as the positional-only, non-keyword arg, but also passed kwargs for shp, dbf or shx (instead + of silently dropping or overwriting those kwargs, with the user being none the wiser). + - Bug fix (BREAKING CHANGE). Reader and Writer now raise TypeError instead of continuing silently + as if given no args (e.g. in anticipation of a subsequent call to Reader.load), when passed an unsupported + truthy Python object (not a str or a Path) as its first arg (the positional-only, non-keyword arg). + - BREAKING CHANGE. Correct NODATA lower threshold to -1e38 (as per [spec](https://www.esri.com/content/dam/esrisites/sitecore-archive/Files/Pdfs/library/whitepapers/pdfs/shapefile.pdf) 2, Numeric Types). This is 10% of PyShp's previous threshold -10e38, so it is possible large negative values x (-1e39 < x < -1e38 will now be NODATA. diff --git a/changelog.txt b/changelog.txt index 2be231c..f674263 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,10 +1,15 @@ VERSION 3.0.9.dev +2026-05-26 + Remove ambiguity in API (BREAKING CHANGE): + * Reader and Writer now both raise TypeError when passed a supported Shapefile target + as the positional-only, non-keyword arg, and also kwargs for shp, dbf or shx (instead + of silently dropping or overwriting those kwargs, with the user being none the wiser). 2026-05-26 Bug fix (BREAKING CHANGE): - * Reader now raises TypeError instead of failing silently (e.g. in anticipation of a subsequent call - to Reader.load), when passed an unsupported truthy Python object (not a str or a Path) as its first - arg (the positional-only, non-keyword arg). + * Reader and Writer now raise TypeError instead of continuing silently, as if given no args + (e.g. in anticipation of a subsequent call to Reader.load), when passed an unsupported + truthy Python object (not a str or a Path) as its first arg (the positional-only, non-keyword arg). 2026-05-23 Bug fix (BREAKING CHANGE): diff --git a/src/shapefile.py b/src/shapefile.py index 50d498e..91509c8 100644 --- a/src/shapefile.py +++ b/src/shapefile.py @@ -15,6 +15,7 @@ import doctest import functools import io +import itertools import logging import os import sys @@ -26,6 +27,7 @@ from contextlib import AbstractContextManager, ExitStack from datetime import date from os import PathLike +from pathlib import Path from struct import Struct, calcsize, error, pack, unpack from types import TracebackType from typing import ( @@ -46,7 +48,7 @@ runtime_checkable, ) from urllib.error import HTTPError -from urllib.parse import ParseResult, urlparse, urlunparse +from urllib.parse import SplitResult, urlsplit, urlunsplit from urllib.request import Request, urlopen # Preserve error in namespace in case a user imported it from PyShp @@ -63,6 +65,8 @@ os.getenv("REPLACE_REMOTE_URLS_WITH_LOCALHOST", "").lower() == "true" ) +IS_WINDOWS = sys.platform == "win32" + # Constants for shape types NULL = 0 POINT = 1 @@ -384,17 +388,6 @@ class GeoJSONFeatureCollectionWithBBox(GeoJSONFeatureCollection): ) # value to encode m=None as. Must be < ISDATA_LOWER_BOUND -@overload -def fsdecode_if_pathlike(path: PathLike[Any]) -> str: ... -@overload -def fsdecode_if_pathlike(path: T) -> T: ... -def fsdecode_if_pathlike(path: Any) -> Any: - if isinstance(path, PathLike): - return os.fsdecode(path) # str - - return path - - # Begin ARR_TYPE = TypeVar("ARR_TYPE", int, float) @@ -2277,29 +2270,31 @@ class UnsuccessfulFileDownload(Warning): ... class PossiblyCorruptFileHeader(Warning): ... -SUPPORTED_URL_SCHEMES = ("http", "https") # must be lower case +# Must be tuple of lower case strings (for str.lower.startswith()), +# and must not include Windows drive letters. +SUPPORTED_URL_SCHEMES = ("http", "https") DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36" @overload def _try_to_download_binary_file( - urlinfo: ParseResult, + urlinfo: SplitResult, ) -> tuple[bytes, ReadableBinStream]: ... @overload def _try_to_download_binary_file( - urlinfo: ParseResult, + urlinfo: SplitResult, ext: str | None, suppress_http_errors: bool, user_agent: str, ) -> tuple[bytes, ReadableBinStream | None]: ... @overload def _try_to_download_binary_file( - urlinfo: ParseResult, + urlinfo: SplitResult, ext: str | None, suppress_http_errors: bool, ) -> tuple[bytes, ReadableBinStream | None]: ... def _try_to_download_binary_file( - urlinfo: ParseResult, + urlinfo: SplitResult, ext: str | None = None, suppress_http_errors: bool = False, user_agent: str = DEFAULT_USER_AGENT, @@ -2309,12 +2304,9 @@ def _try_to_download_binary_file( """ if ext is not None: - urlpath, _ = os.path.splitext( - urlinfo.path - ) # Removes e.g. ".shp", including the "." - urlinfo = urlinfo._replace(path=f"{urlpath}.{ext}") + urlinfo = urlinfo._replace(path=Path(urlinfo.path).with_suffix(ext).as_posix()) - url = urlunparse(urlinfo) + url = urlunsplit(urlinfo) req = Request( url, @@ -2336,7 +2328,7 @@ def _try_to_download_binary_file( raise e elif ext != ".shx": # Technically the .shx is required for an ESRI Shapefile, - # but it's not needed for PyShp, it only contains indices of shapes. + # but it's not needed for PyShp. It only contains indices of shapes. warnings.warn(msg, category=UnsuccessfulFileDownload) return b"", None @@ -2371,23 +2363,19 @@ def _try_to_download_binary_file( def _try_get_open_constituent_file( - shapefile_name: str, - ext: Literal["shp", "shx", "dbf"], + file: Path, + ext: Literal[".shp", ".shx", ".dbf"], ) -> IO[bytes] | None: """ Attempts to open a .shp, .dbf or .shx file, with both lower case and upper case file extensions, and return it. If it was not possible to open the file, None is returned. """ - # typing.LiteralString is only available from Python 3.11 onwards. - # https://docs.python.org/3/library/typing.html#typing.LiteralString - # assert ext in {'shp', 'dbf', 'shx'} - exts = {ext, ext.upper(), ext.lower()} for candidate_ext in exts: try: - return open(f"{shapefile_name}.{candidate_ext}", "rb") + return file.with_suffix(candidate_ext).open("rb") except OSError: pass return None @@ -2438,6 +2426,8 @@ def close(self) -> None: class _FileObjChecker(_HasExitStack, Generic[FileProtoT]): + """Base class that verifies its file-like object.""" + @property @abc.abstractmethod def FileProto(self) -> type[FileProtoT]: ... @@ -2448,52 +2438,37 @@ def new_file_obj_mode(self) -> Literal["rb", "w+b"]: ... @property @abc.abstractmethod - def ext(self) -> Literal[".shp", ".shx", ".dbf"]: ... + def ext(self) -> Literal[".shp", ".shx", ".dbf", None]: ... ExceptionClass = ShapefileException def __init__( self, - file: str | PathLike[Any] | FileProtoT, + file: str | os.PathLike[Any] | FileProtoT | None, ): super().__init__() - - file = fsdecode_if_pathlike(file) - self._file: str | FileProtoT - if not file: - raise TypeError( - f"file must be set to a str, Path or file-like object. Got: {file}" - ) - elif isinstance(file, str): - self._file = f"{os.path.splitext(file)[0]}{self.ext}" + if self.ext is None: + return + self._file: Path | FileProtoT | None + if isinstance(file, (str, os.PathLike)): + self._file = Path(file).with_suffix(self.ext) else: self._file = file - @functools.cached_property - def file(self) -> FileProtoT: - return self._ensure_file_obj() - # f=self._file, - # FileProto=self.FileProto, - # exit_stack=self.exit_stack, - # new_file_obj_mode="rb", - # ExceptionClass=dbfFileException, - # ) - def _ensure_file_obj( self, - f: str | FileProtoT | None = None, + f: str | os.PathLike[Any] | FileProtoT | None = None, ) -> FileProtoT: - """Safety handler to verify file-like objects""" + # Also used by Writer on constituent files self._shp & self._shx f = f or self._file - if not f: - raise self.ExceptionClass(f"No file-like object received. Got: {f}") - if isinstance(f, str): - dir_ = os.path.dirname(f) - if dir_ and not os.path.exists(dir_): - os.makedirs(dir_) - fp = open(f, self.new_file_obj_mode) + if isinstance(f, (str, os.PathLike)): + path = Path(f) + dir_ = path.parent + if dir_: + dir_.mkdir(exist_ok=True, parents=True) + fp = path.open(self.new_file_obj_mode) # Only push files created here to the exit stack. # The user must close their own file objects. @@ -2538,6 +2513,10 @@ def __init__(self, file: str | PathLike[Any] | ReadSeekableBinStream): self.file_size_B = self.file.tell() self.file.seek(0) + @functools.cached_property + def file(self) -> ReadSeekableBinStream: + return self._ensure_file_obj() + class DbfReader(_HasCheckedReadableFile): """Reads a dbf file's fields and records.""" @@ -2564,22 +2543,14 @@ def __init__( self._dbfHeader() - # @functools.cached_property - # def dbf(self) -> ReadSeekableBinStream: - # return self._ensure_file_obj( - # # f=self._file, - # # FileProto=self.FileProto, - # # exit_stack=self.exit_stack, - # # new_file_obj_mode="rb", - # # ExceptionClass=dbfFileException, - # ) - def __len__(self) -> int: """Returns the number of records in the .dbf file.""" return self.numRecords def _dbfHeader(self) -> None: - """Reads a dbf header. Xbase-related code borrows heavily from ActiveState Python Cookbook Recipe 362715 by Raymond Hettinger""" + """Reads a dbf header. Xbase-related code borrows heavily from + ActiveState Python Cookbook Recipe 362715 by Raymond Hettinger + """ # read relevant header parts self.file.seek(0) @@ -3072,7 +3043,6 @@ def _shape( try: shapeType = unpack(" None | IO[bytes]: - if file_ is None: + if file is None: return None - if isinstance(file_, (str, PathLike)): - baseName, __ = os.path.splitext(file_) - file_obj = _try_get_open_constituent_file(baseName, ext) + if isinstance(file, (str, PathLike)): + file_obj = _try_get_open_constituent_file(Path(file), ext) if file_obj is not None: self.exit_stack.enter_context(file_obj) return file_obj - if hasattr(file_, "read"): + if hasattr(file, "read"): # Copy if required try: - file_.seek(0) - return file_ + file.seek(0) + return file except (NameError, io.UnsupportedOperation): # Read the whole file into a seekable wrapper. - return io.BytesIO(file_.read()) + return io.BytesIO(file.read()) raise ShapefileException( - f"Could not load shapefile constituent file from: {file_}" + f"Could not load shapefile constituent file from: {file}" ) - def _load_shp_shx_or_dbf_from_url( + def _download_binary_file_from_url( self, - urlinfo: ParseResult, - ext: Literal["shp", "shx", "dbf"], + urlinfo: SplitResult, + ext: Literal[".shp", ".shx", ".dbf", ".zip"], + suppress_http_errors: bool = True, ) -> tempfile._TemporaryFileWrapper[bytes] | None: sniffed_bytes, resp = _try_to_download_binary_file( urlinfo=urlinfo, ext=ext, - suppress_http_errors=True, + suppress_http_errors=suppress_http_errors, ) if resp is None: return None # Use tempfile as source for url data. fileobj = _save_to_named_tmp_file(resp, initial_bytes=sniffed_bytes) - self.exit_stack.enter_context(fileobj) return fileobj - def _load_from_url(self, url: str) -> None: + def _load_from_url(self, urlinfo: SplitResult) -> None: # Shapefile is from a url # Download each file to temporary path and treat as normal shapefile path - urlinfo = urlparse(url) - self._shp = self._load_shp_shx_or_dbf_from_url(urlinfo, "shp") - self._shx = self._load_shp_shx_or_dbf_from_url(urlinfo, "shx") - self._dbf = self._load_shp_shx_or_dbf_from_url(urlinfo, "dbf") - if self._shp is None and self._dbf is None: - raise ShapefileException(f"Failed to download .shp or .dbf from: {url}") - - def _load_shp_shx_or_dbf_from_zip( + self._shp = self._download_binary_file_from_url(urlinfo, ".shp") + self._shx = self._download_binary_file_from_url(urlinfo, ".shx") + self._dbf = self._download_binary_file_from_url(urlinfo, ".dbf") + + shp_or_dbf_loaded = False + if self._shx is not None: + self.exit_stack.enter_context(self._shx) + if self._shp is not None: + self.exit_stack.enter_context(self._shp) + shp_or_dbf_loaded = True + if self._dbf is not None: + self.exit_stack.enter_context(self._dbf) + shp_or_dbf_loaded = True + + if not shp_or_dbf_loaded: + raise ShapefileException( + f"Failed to download .shp or .dbf from: {urlunsplit(urlinfo)}" + ) + + def _load_file_from_zip_to_tmp_file( self, archive: zipfile.ZipFile, - shapefile: str, - ext: Literal["shp", "shx", "dbf"], + file: Path, + ext: Literal[".shp", ".shx", ".dbf"], ) -> tempfile._TemporaryFileWrapper[bytes] | None: for cased_ext in {ext.lower(), ext.upper(), ext}: try: - member = archive.open(f"{shapefile}.{cased_ext}") - # Use read+write tempfile as source for member data. - fileobj = _save_to_named_tmp_file(member) - self.exit_stack.enter_context(fileobj) - return fileobj - except (OSError, AttributeError, KeyError): - pass + member = archive.open(file.with_suffix(cased_ext).as_posix()) + except (OSError, KeyError): + continue + # Use read+write tempfile as source for member data. + fileobj = _save_to_named_tmp_file(member) + self.exit_stack.enter_context(fileobj) + return fileobj return None - def _load_from_zip(self, path: str) -> None: - # Shapefile is inside a zipfile - if path.count(".zip") > 1: - # Multiple nested zipfiles - raise ShapefileException( - f"Reading from multiple nested zipfiles is not supported: {path}" - ) - # Split into zipfile and shapefile paths - if path.endswith(".zip"): - zpath = path - shapefile = None - else: - N = path.find(".zip") + 4 - zpath = path[:N] # endswith(".zip") - shapefile = path[N + 1 :] # skip "." or ":" ? Support archive.zip:shapes - - # Declare a zip file handle - zipfileobj: tempfile._TemporaryFileWrapper[bytes] | io.BufferedReader - urlinfo = urlparse(zpath) - - resp: ReadableBinStream | None - if urlinfo.scheme in SUPPORTED_URL_SCHEMES: - # Zipfile is from a url - # Download to a temporary file and treat as normal zipfile - sniffed_bytes, resp = _try_to_download_binary_file(urlinfo=urlinfo) - - # Use named tmp file as source for zip file data. - zipfileobj = _save_to_named_tmp_file( - resp, - initial_bytes=sniffed_bytes, - suffix=".zip", - ) + def _load_from_local_zip_file( + self, + path: Path, + zippath: Path, + ) -> None: + shapefile: Path | None + + if path != zippath: + more_zipfile_parents = [ + parent + for parent in path.relative_to(zippath).parents + if parent.suffix.lower() == ".zip" + ] + if more_zipfile_parents: + warnings.warn( + f"Reading from multiple nested zipfiles is not supported. " + "Multiple path segments requested within zip file, that " + f"also look like zip files: {more_zipfile_parents}. " + "Trying to open Shapefile at: {path} anyway " + ) + shapefile = path.relative_to(zippath) else: - # Zipfile is from a file - zipfileobj = open(zpath, mode="rb") + shapefile = None + + with open(zippath, mode="rb") as zipfileobj: + self._load_from_zipfileobj(zipfileobj, shapefile) + def _load_from_zipfileobj( + self, + zipfileobj: tempfile._TemporaryFileWrapper[bytes] | io.BufferedReader, + shapefile: Path | None = None, + ) -> None: # Open the zipfile archive with zipfile.ZipFile(zipfileobj, "r") as archive: - if not shapefile: + if shapefile is None: # Only the zipfile path is given # Inspect zipfile contents to find the full shapefile path - shapefiles = [ - name + constituent_files = ( + Path(name) for name in archive.namelist() - if name.endswith((".SHP", ".shp")) - ] + if name.lower().endswith((".shp", ".dbf", ".shx")) + ) + + def without_ext(path: Path) -> Path: + return path.with_suffix("") + + shapefiles = list(itertools.groupby(constituent_files, without_ext)) # The zipfile must contain exactly one shapefile - if len(shapefiles) == 0: + if not shapefiles: raise ShapefileException("Zipfile does not contain any shapefiles") - if len(shapefiles) == 1: - shapefile = shapefiles[0] - else: + if len(shapefiles) >= 2: raise ShapefileException( f"Zipfile contains more than one shapefile: {shapefiles}. " - "Please specify the full path to the shapefile you would like to open." + "Please ensure the zip file is on the file system " + "(download it if necessary) " + "and specify the full path to the shapefile in it " + "that you would like to open, " + "e.g. 'my_zip_file.zip/this_particular_shape_file.shp'. " ) + shapefile, _files = shapefiles[0] # Try to extract file-like objects from zipfile - name = os.path.splitext(shapefile)[0] # root shapefile name - self._shp = self._load_shp_shx_or_dbf_from_zip(archive, name, "shp") - self._shx = self._load_shp_shx_or_dbf_from_zip(archive, name, "shx") - self._dbf = self._load_shp_shx_or_dbf_from_zip(archive, name, "dbf") + self._shp = self._load_file_from_zip_to_tmp_file(archive, shapefile, ".shp") + self._shx = self._load_file_from_zip_to_tmp_file(archive, shapefile, ".shx") + self._dbf = self._load_file_from_zip_to_tmp_file(archive, shapefile, ".dbf") - # Close and delete the temporary zipfile - try: - zipfileobj.close() - # TODO Does catching all possible exceptions really increase - # the chances of closing the zipfile successully, or does it - # just mean .close() failures will still fail, but fail - # silently? - except: # noqa: E722 - pass - - def load(self, shapefile: str | None = None) -> None: + def load(self, file: str | os.PathLike[Any]) -> None: """Opens a shapefile from a filename or file-like object. Normally this method would be called by the constructor with the file name as an argument.""" - if shapefile: - (shapeName, __ext) = os.path.splitext(shapefile) - self.shapeName = shapeName - self.load_shp(shapeName) - self.load_shx(shapeName) - self.load_dbf(shapeName) + if not file: + raise ShapefileException(f"No Shapefile to load. Got: {file=}") + self.path = file + file = Path(file) + self.shapeName = file.stem + self.load_shp(file) + self.load_shx(file) + self.load_dbf(file) if not (self._shp or self._dbf): raise ShapefileException( - f"Unable to open {shapeName}.dbf or {shapeName}.shp." + f"Neither {self.shapeName}.dbf nor {self.shapeName}.shp could be opened " ) - def load_shp(self, shapefile_name: str) -> None: + def load_shp(self, file: Path) -> None: """ Attempts to load file with .shp extension as both lower and upper case """ - self._shp = _try_get_open_constituent_file(shapefile_name, "shp") + self._shp = _try_get_open_constituent_file(file, ".shp") if self._shp: self.exit_stack.enter_context(self._shp) self._get_shp_reader() - def load_shx(self, shapefile_name: str) -> None: + def load_shx(self, file: Path) -> None: """ Attempts to load file with .shx extension as both lower and upper case """ - self._shx = _try_get_open_constituent_file(shapefile_name, "shx") + self._shx = _try_get_open_constituent_file(file, ".shx") if self._shx: self.exit_stack.enter_context(self._shx) self._get_shx_reader() - def load_dbf(self, shapefile_name: str) -> None: + def load_dbf(self, file: Path) -> None: """ Attempts to load file with .dbf extension as both lower and upper case """ - self._dbf = _try_get_open_constituent_file(shapefile_name, "dbf") + self._dbf = _try_get_open_constituent_file(file, ".dbf") if self._dbf: self.exit_stack.enter_context(self._dbf) self._get_dbf_reader() @@ -3730,11 +3761,21 @@ def _try_to_flush_file_obj(f: WriteSeekableBinStream | str | None) -> None: pass -class DbfWriter(_FileObjChecker[WriteSeekableBinStream]): - """Writes .dbf files (dBASE database files), in particular those of Shapefiles.""" - +class _HasCheckedWriteableFile(_FileObjChecker[WriteSeekableBinStream]): FileProto = WriteSeekableBinStream new_file_obj_mode = "w+b" + + def __init__(self, file: str | PathLike[Any] | WriteSeekableBinStream): + super().__init__(file) + + @functools.cached_property + def file(self) -> WriteSeekableBinStream: + return self._ensure_file_obj() + + +class DbfWriter(_HasCheckedWriteableFile): + """Writes .dbf files (dBASE database files), in particular those of Shapefiles.""" + ext = ".dbf" def __init__( @@ -3757,26 +3798,16 @@ def __init__( self.recNum = 0 self.deletionFlag = 0 - @functools.cached_property - def dbf(self) -> WriteSeekableBinStream: - return self._ensure_file_obj( - # f=self._dbf, - # FileProto=WriteSeekableBinStream, - # exit_stack=self.exit_stack, - # new_file_obj_mode="w+b", - # ExceptionClass=dbfFileException, - ) - def close(self) -> None: """ Write final dbf header, close opened files. """ # Update the dbf header with final length etc - if _is_file_obj_open(self.dbf): + if _is_file_obj_open(self.file): self._dbfHeader() - _try_to_flush_file_obj(self.dbf) + _try_to_flush_file_obj(self.file) super().close() @@ -3798,7 +3829,7 @@ def field( def _dbfHeader(self) -> None: """Writes the dbf header and field descriptors.""" - f = self.dbf + f = self.file f.seek(0) version = 3 year, month, day = time.localtime()[:3] @@ -3884,7 +3915,7 @@ def record( def __dbfRecord(self, record: list[RecordValue]) -> None: """Writes the dbf records.""" - f = self.dbf + f = self.file if self.recNum == 0: # first records, so all fields should be set # allowing us to write the dbf header @@ -3983,7 +4014,7 @@ class Writer(_FileObjChecker[WriteSeekableBinStream]): FileProto = WriteSeekableBinStream new_file_obj_mode = "w+b" - ext = ".shp" + ext = None ExceptionClass = ShapefileException def __init__( @@ -4000,8 +4031,15 @@ def __init__( # Keep kwargs even though unused, to preserve PyShp 2.4 API **kwargs: Any, ): - target = fsdecode_if_pathlike(target) - self.target = target + # Don't call super().__init + if target is not None: + try: + target = Path(target) + except (ValueError, TypeError): + raise TypeError( + f"The target filepath {target!r} must be a str, Path, or os.PathLike, not {type(target)}." + ) + self.target: Path | None = target # User settable - see ### Geometry and Record Balancing in README.md self.autoBalance = autoBalance @@ -4009,26 +4047,27 @@ def __init__( # User settable - see #### Setting the Shape Type in README.md self.shapeType = shapeType - self._shp: str | WriteSeekableBinStream | None = shp - self._shx: str | WriteSeekableBinStream | None = shx - self._dbf: str | WriteSeekableBinStream | None = dbf + self._shp: Path | WriteSeekableBinStream | None = shp + self._shx: Path | WriteSeekableBinStream | None = shx + self._dbf: Path | WriteSeekableBinStream | None = dbf self._dbf_writer: DbfWriter | None = None self.exit_stack = ExitStack() - if target: - if not isinstance(target, str): + if target is not None: + if shp or shx or dbf: raise TypeError( - f"The target filepath {target!r} must be of type str/unicode or path-like, not {type(target)}." + "Unused kwargs were silently ignored by previous versions of PyShp. " + "Either specify target (first positional only arg), " + "or shp and/or dbf, possible plus shx" ) - self._shp = os.path.splitext(target)[0] + ".shp" - self._shx = os.path.splitext(target)[0] + ".shx" - self._dbf = os.path.splitext(target)[0] + ".dbf" + self._shp = target.with_suffix(".shp") + self._shx = target.with_suffix(".shx") + self._dbf = target.with_suffix(".dbf") elif not (shp or shx or dbf): raise TypeError( "Either the target filepath, or any of shp, shx, or dbf must be set to create a shapefile." ) if self._dbf: self._dbf_writer = DbfWriter( - target=target, dbf=self._dbf, encoding=encoding, encodingErrors=encodingErrors, @@ -4050,18 +4089,12 @@ def __init__( def shp(self) -> WriteSeekableBinStream: return self._ensure_file_obj( f=self._shp, - # FileProto=WriteSeekableBinStream, - # exit_stack=self.exit_stack, - # new_file_obj_mode="w+b", ) @functools.cached_property def shx(self) -> WriteSeekableBinStream: return self._ensure_file_obj( f=self._shx, - # FileProto=WriteSeekableBinStream, - # exit_stack=self.exit_stack, - # new_file_obj_mode="w+b", ) @functools.cached_property @@ -4075,7 +4108,7 @@ def dbf_writer(self) -> DbfWriter: @property def dbf(self) -> WriteSeekableBinStream: - return self.dbf_writer.dbf + return self.dbf_writer.file @property def fields(self) -> list[Field]: @@ -4130,7 +4163,7 @@ def close(self) -> None: dbf_open = ( False if self._dbf_writer is None - else _is_file_obj_open(self.dbf_writer.dbf) + else _is_file_obj_open(self.dbf_writer.file) ) # Balance if already not balanced @@ -4598,26 +4631,25 @@ def _replace_remote_url( query: str = "", fragment: str = "", ) -> str: - old_parsed = urlparse(old_url) + old_split = urlsplit(old_url) # Strip subpaths, so an artefacts # repo or file tree can be simpler and flat if path is None: - path = old_parsed.path.rpartition("/")[2] + path = old_split.path.rpartition("/")[2] if port not in (None, ""): # type: ignore[comparison-overlap] netloc = f"{netloc}:{port}" - new_parsed = old_parsed._replace( + new_split = old_split._replace( scheme=scheme, netloc=netloc, path=path, - params=params, query=query, fragment=fragment, ) - new_url = urlunparse(new_parsed) + new_url = urlunsplit(new_split) return new_url diff --git a/test_shapefile.py b/test_shapefile.py index bc2086f..a865385 100644 --- a/test_shapefile.py +++ b/test_shapefile.py @@ -1311,14 +1311,16 @@ def test_reader_corrupt_files(): w.record("value") w.line([[(1, 1), (1, 2), (2, 2)]]) # add junk byte data to end of dbf and shp files - w.dbf.write(b"12345") + w.dbf_writer.file.write(b"12345") w.shp.write(b"12345") # read the corrupt shapefile and assert that it reads correctly with pytest.warns(shapefile.PossiblyCorruptFileHeader): with shapefile.Reader(basename) as sf: # assert correct shapefile length metadata - assert len(sf) == sf.numRecords == sf.numShapes == 10 + assert len(sf) == 10 + assert sf.numRecords == 10 + assert sf.numShapes == 10 # assert that records are read without error assert len(sf.records()) == 10 # assert that didn't read the extra junk data @@ -1526,7 +1528,7 @@ def test_shaperecord_record(): def test_reader_zip_polyylinez_no_m_itershaperecords(): """ Make sure the M field is initialised to None (so the - fix from the bgu in 3.0.2 isn't regressed)! + fix from the bug in 3.0.2 isn't regressed)! Test Polygonz Shapes can be read, even if the m field is missing (all the points in this file are 2D only, so this could also be @@ -1538,7 +1540,7 @@ def test_reader_zip_polyylinez_no_m_itershaperecords(): Original source: https://github.com/OpenNHM/AvaFrameData/blob/main/avaPopeletzbach/ License CC-BY-4.0 """ - with shapefile.Reader("shapefiles/test/REL.zip") as sf: + with shapefile.Reader("shapefiles/test/REL.zip/REL/releaseArea20090407") as sf: for _shaperec in sf.iterShapeRecords(): pass