Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ Even more queries can be found [here](https://colab.research.google.com/github/R

# Latest updates

## Version 0.3.0 alpha 3
- Added parameters to the jsoniq magic to select the desired output to print: -j, -df, -pdf
- Added informative error message with a hint on how to fix when trying to get a DataFrame and there is no schema.
- Added parameter -t to the jsoniq magic to measure the response time
- The RumbleSession object now saves the latest result (sequence of items) in a field called lastResult. This is particularly useful in notebooks for post-processing a result in Python after obtaining it through the jsoniq magic.
- Improved static type detection upon binding a pandas or pyspark DataFrame as an input variable to a JSONiq query.

## Version 0.2.0 alpha 2
- You can change the result size cap through the now-accessible Rumble configuration (for example rumble.getRumbleConf().setResultSizeCap(10)). This controls how many items can be retrieved at most with a json() call. You can increase it to whichever number you would like if you reach the cap.
- Add the JSONiq magic to execute JSONiq queries directly in a notebook cell, using the RumbleDB instance shipped with the library.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "jsoniq"
version = "0.2.0a2"
version = "0.2.0a3"
description = "Python edition of RumbleDB, a JSONiq engine"
requires-python = ">=3.11"
dependencies = [
Expand Down
Binary file modified src/jsoniq/jars/rumbledb-1.24.0.jar
Binary file not shown.
28 changes: 28 additions & 0 deletions src/jsoniq/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,30 @@
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import json
import sys

class SequenceOfItems:
schema_str = """
No DataFrame available as no schema was automatically detected. If you still believe the output is structured enough, you could add a schema and validate expression explicitly to your query.

This is an example of how you can simply define a schema and wrap your query in a validate expression:

declare type local:mytype as {
"product" : "string",
"store-number" : "int",
"quantity" : "decimal"
};
validate type local:mytype* {
for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
where $product.quantity ge 995
return $product
}

RumbleDB keeps getting improved and automatic schema detection will improve as new versions get released. But even when RumbleDB fails to detect a schema, you can always declare your own schema as shown above.

For more information, see the documentation at https://docs.rumbledb.org/rumbledb-reference/types
"""

def __init__(self, sequence, rumblesession):
self._jsequence = sequence
self._rumblesession = rumblesession
Expand All @@ -28,9 +50,15 @@ def rdd(self):
return rdd.map(lambda l: json.loads(l))

def df(self):
    """Return the query result as a PySpark DataFrame.

    Returns:
        pyspark.sql.DataFrame, or None when RumbleDB detected no schema
        for the output. In the None case, a hint explaining how to declare
        a schema with a validate expression is written to stderr.
    """
    if "DataFrame" not in self._jsequence.availableOutputs():
        # No schema was detected: print the how-to-fix hint instead of failing.
        sys.stderr.write(self.schema_str)
        return None
    return DataFrame(self._jsequence.getAsDataFrame(), self._sparksession)

def pdf(self):
    """Return the query result as a pandas DataFrame.

    Delegates the schema-availability check to df(), which already writes
    the explanatory hint to stderr and returns None when no schema was
    detected — keeping that guard logic in a single place.

    Returns:
        pandas.DataFrame, or None when no schema is available.
    """
    spark_df = self.df()
    if spark_df is None:
        return None
    return spark_df.toPandas()

def count(self):
Expand Down
3 changes: 2 additions & 1 deletion src/jsoniq/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def bindDataFrameAsVariable(self, name: str, df):

def jsoniq(self, str):
    """Run a JSONiq query and return the resulting sequence of items.

    The result is also stored in self.lastResult so that notebooks can
    post-process the most recent query output in Python.

    NOTE(review): the parameter name shadows the builtin ``str``; it is
    kept unchanged for backward compatibility with keyword callers.

    Returns:
        SequenceOfItems wrapping the query result.
    """
    sequence = self._jrumblesession.runQuery(str)
    self.lastResult = SequenceOfItems(sequence, self)
    return self.lastResult

def __getattr__(self, item):
return getattr(self._sparksession, item)
73 changes: 59 additions & 14 deletions src/jsoniqmagic/magic.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
from IPython.core.magic import Magics, cell_magic, magics_class
from IPython.core.magic_arguments import (
argument, magic_arguments, parse_argstring
)
import time, json
from jsoniq.session import RumbleSession
from py4j.protocol import Py4JJavaError

@magics_class
class JSONiqMagic(Magics):
@magic_arguments()
@argument(
'-t', '--timed', action='store_true', help='Measure execution time.'
)
@argument(
'-df', '--pyspark-data-frame', action='store_true', help='Prints the output as a Pyspark DataFrame (if a schema is available).'
)
@argument(
'-pdf', '--pandas-data-frame', action='store_true', help='Prints the output as a Pandas DataFrame (if a schema is available).'
)
@argument(
'-j', '--json', action='store_true', help='Prints the output as JSON.'
)
@argument(
'-u', '--apply-updates', action='store_true', help='Applies updates if a PUL is output.'
)
def run(self, line, cell=None, timed=False):
if cell is None:
data = line
else:
data = cell

args = parse_argstring(self.run, line)
start = time.time()
try:
rumble = RumbleSession.builder.getOrCreate();
Expand All @@ -28,28 +48,53 @@ def run(self, line, cell=None, timed=False):
print("Query unsuccessful.")
print("Usual reasons: firewall, misconfigured proxy.")
return
end = time.time()
if(timed):
print("Response time: %s ms" % (end - start))

if ("DataFrame" in response.availableOutputs()):
print(response.pdf())
elif ("Local" in response.availableOutputs()):
schema_str = """
No DataFrame available as no schema was detected. If you still believe the output is structured enough, you could add a schema and validate expression explicitly to your query.

This is an example of how you can simply define a schema and wrap your query in a validate expression:

declare type mytype as {
"product" : "string",
"store-number" : "int",
"quantity" : "decimal"
};
validate type mytype* {
for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10)
where $product.quantity ge 995
return $product
}
"""

if(args.pyspark_data_frame):
df = response.df();
if df is not None:
df.show()

if (args.pandas_data_frame):
pdf = response.pdf()
if pdf is not None:
print(pdf)

if (args.apply_updates):
if ("PUL" in response.availableOutputs()):
response.applyPUL()
print("Updates applied successfully.")
else:
print("No Pending Update List (PUL) available to apply.")

if (args.json or (not args.pandas_data_frame and not args.pyspark_data_frame)):
capplusone = response.take(rumble.getRumbleConf().getResultSizeCap() + 1)
if len(capplusone) > rumble.getRumbleConf().getResultSizeCap():
count = response.count()
print("The query output %s items, which is too many to display. Displaying the first %s items:" % (count, rumble.getRumbleConf().getResultSizeCap()))
for e in capplusone[:rumble.getRumbleConf().getResultSizeCap()]:
print(json.dumps(json.loads(e.serializeAsJSON()), indent=2))
elif ("PUL" in response.availableOutputs()):
print("The query output a Pending Update List.")
else:
print("No output available.")

end = time.time()
if(args.timed):
print("Response time: %s ms" % (end - start))

@cell_magic
def jsoniq(self, line, cell=None):
    """Cell magic: execute the cell body (or the line) as a JSONiq query."""
    result = self.run(line, cell, False)
    return result

@cell_magic
def timedjsoniq(self, line, cell=None):
    """Cell magic: execute a JSONiq query and print the response time.

    run() now determines timing from the parsed magic arguments
    (args.timed, set by the -t/--timed flag) rather than from its
    positional ``timed`` parameter, so passing True alone no longer
    enables timing. Append -t to the argument line to preserve this
    magic's documented behavior (-t is a store_true flag, so a
    duplicate occurrence is harmless).
    """
    return self.run(line + " -t", cell, True)
1 change: 1 addition & 0 deletions tests/test_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def test1(self):
# Generally though, the results may contain zero, one, two, or more items.
python_list = items.json()
print(python_list)
self.assertTrue(json.dumps(python_list) == json.dumps((2,)))

############################################
##### More complex, standalone queries #####
Expand Down