Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spar

JSONiq variables can be bound to lists of JSON values (str, int, float, True, False, None, dict, list) or to Pyspark DataFrames. A JSONiq query can use as many variables as needed (for example, it can join between different collections).

It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/).
It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://docs.rumbledb.org/writing-jsoniq-queries-in-python).

The resulting sequence of items can be retrieved as a list of JSON values, as a Pyspark DataFrame, or, for advanced users, as an RDD or with a streaming iteration over the items using the [RumbleDB Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java).

It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Pyspark.

The library also contains a JSONiq magic that allows you to write JSONiq queries directly in a Jupyter notebook and see the results automatically displayed on the screen.

The design goal is that it is possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc.

Any feedback or error reports are very welcome.
Expand Down Expand Up @@ -345,7 +347,12 @@ seq.write().mode("overwrite").text("outputtext");

Even more queries can be found [here](https://colab.research.google.com/github/RumbleDB/rumble/blob/master/RumbleSandbox.ipynb) and you can look at the [JSONiq documentation](https://www.jsoniq.org) and tutorials.

# Last updates
# Latest updates

## Version 0.2.0 alpha 2
- You can change the result size cap through the now-accessible Rumble configuration (for example rumble.getRumbleConf().setResultSizeCap(10)). This controls how many items can be retrieved at most with a json() call. You can increase it to any number you would like if you reach the cap.
- Added the JSONiq magic to execute JSONiq queries directly in a notebook cell, using the RumbleDB instance shipped with the library.
- RumbleSession.builder.getOrCreate() now correctly reuses an existing session instead of creating a new object. It preserves the configuration.

## Version 0.2.0 alpha 1
- Allow binding JSONiq variables to pandas dataframes
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "jsoniq"
version = "0.2.0a1"
version = "0.2.0a2"
description = "Python edition of RumbleDB, a JSONiq engine"
requires-python = ">=3.11"
dependencies = [
Expand Down
2 changes: 1 addition & 1 deletion src/jsoniq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from jsoniq.session import RumbleSession

__all__ = ["RumbleSession"]
__all__ = ["RumbleSession"]
Binary file modified src/jsoniq/jars/rumbledb-1.24.0.jar
Binary file not shown.
25 changes: 19 additions & 6 deletions src/jsoniq/sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,26 @@
import json

class SequenceOfItems:
def __init__(self, sequence, sparksession):
def __init__(self, sequence, rumblesession):
self._jsequence = sequence
self._sparkcontext = sparksession.sparkContext
self._sparksession = sparksession
self._rumblesession = rumblesession
self._sparksession = rumblesession._sparksession
self._sparkcontext = self._sparksession.sparkContext

def items(self):
    """Materialize the whole sequence as a list of items.

    Alias for getAsList() — presumably resolved by attribute forwarding to
    the underlying Java sequence; TODO confirm the delegation target.
    """
    return self.getAsList()

def take(self, n):
    """Return at most the first *n* items of the sequence as a tuple.

    Delegates to getFirstItemsAsList(n) — presumably forwarded to the
    underlying Java sequence; TODO confirm.
    """
    return tuple(self.getFirstItemsAsList(n))

def first(self):
    """Return the first items of the sequence, up to the configured result size cap.

    NOTE(review): despite the name, this returns a tuple of up to
    getResultSizeCap() items rather than a single item (Spark's first()
    returns one element) — confirm this is the intended contract.
    """
    return tuple(self.getFirstItemsAsList(self._rumblesession.getRumbleConf().getResultSizeCap()))

def json(self):
return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.items()])
return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.getAsList()])

def rdd(self):
rdd = self._jsequence.getAsPickledStringRDD();
rdd = self._jsequence.getAsPickledStringRDD()
rdd = RDD(rdd, self._sparkcontext)
return rdd.map(lambda l: json.loads(l))

Expand All @@ -22,7 +32,10 @@ def df(self):

def pdf(self):
return self.df().toPandas()


def count(self):
    """Return the number of items in the sequence (delegated to the Java sequence)."""
    return self._jsequence.count()

def nextJSON(self):
    """Advance the underlying Java iterator and return the next item serialized as a JSON string."""
    return self._jsequence.next().serializeAsJSON()

Expand Down
18 changes: 16 additions & 2 deletions src/jsoniq/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def __init__(self, spark_session: SparkSession):
self._sparksession = spark_session
self._jrumblesession = spark_session._jvm.org.rumbledb.api.Rumble(spark_session._jsparkSession)

def getRumbleConf(self):
    """Return the configuration object of the underlying Java Rumble session.

    Callers use this e.g. to adjust the result size cap
    (getRumbleConf().setResultSizeCap(n)).
    """
    return self._jrumblesession.getConfiguration()

class Builder:
def __init__(self):

Expand Down Expand Up @@ -60,8 +63,18 @@ def __init__(self):
self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str)

def getOrCreate(self):
return RumbleSession(self._sparkbuilder.getOrCreate())
if RumbleSession._rumbleSession is None:
RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.getOrCreate())
return RumbleSession._rumbleSession

def create(self):
    """Always build a fresh session, replacing any previously cached one.

    Unlike getOrCreate(), this never reuses the cached RumbleSession; the new
    instance becomes the class-level singleton and is returned.
    """
    RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.create())
    return RumbleSession._rumbleSession

def remote(self, spark_url):
    """Point the underlying Spark builder at a remote (Spark Connect) URL.

    Returns self so calls can be chained, mirroring SparkSession.Builder.
    """
    self._sparkbuilder = self._sparkbuilder.remote(spark_url)
    return self

def appName(self, name):
    """Set the Spark application name on the underlying builder.

    Returns self so calls can be chained, mirroring SparkSession.Builder.
    """
    # Idiom fix: dropped the trailing semicolons of the original.
    self._sparkbuilder = self._sparkbuilder.appName(name)
    return self
Expand All @@ -83,6 +96,7 @@ def __getattr__(self, name):
return res;

_builder = Builder()
_rumbleSession = None

def convert(self, value):
if isinstance(value, tuple):
Expand Down Expand Up @@ -155,7 +169,7 @@ def bindDataFrameAsVariable(self, name: str, df):

def jsoniq(self, str):
sequence = self._jrumblesession.runQuery(str);
return SequenceOfItems(sequence, self._sparksession);
return SequenceOfItems(sequence, self);

def __getattr__(self, item):
return getattr(self._sparksession, item)
9 changes: 9 additions & 0 deletions src/jsoniqmagic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from jsoniq.session import RumbleSession
from jsoniqmagic.magic import JSONiqMagic

__all__ = ["JSONiqMagic"]

def load_ipython_extension(ipython):
    """IPython entry point: register the JSONiq cell magics.

    Reuses (or lazily creates) the shared RumbleSession and caps the number
    of items displayed per query at 10.
    """
    # Idiom fix: dropped the trailing semicolons of the original.
    rumble = RumbleSession.builder.getOrCreate()
    rumble.getRumbleConf().setResultSizeCap(10)
    ipython.register_magics(JSONiqMagic)
55 changes: 55 additions & 0 deletions src/jsoniqmagic/magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from IPython.core.magic import Magics, cell_magic, magics_class
import time, json
from jsoniq.session import RumbleSession
from py4j.protocol import Py4JJavaError

@magics_class
class JSONiqMagic(Magics):
    """Jupyter magics that run JSONiq queries through the shared RumbleSession."""

    def run(self, line, cell=None, timed=False):
        """Execute a JSONiq query and print its result.

        The query text is the cell body when invoked as a cell magic,
        otherwise the magic line itself. When *timed* is true, the time
        spent submitting the query is printed in milliseconds.
        """
        data = line if cell is None else cell

        start = time.time()
        try:
            rumble = RumbleSession.builder.getOrCreate()
            response = rumble.jsoniq(data)
        except Py4JJavaError as e:
            # Errors raised on the JVM side (e.g. JSONiq parsing errors).
            print(e.java_exception.getMessage())
            return
        except Exception as e:
            # Bug fix: the original printed e.args[0], which raises
            # IndexError for exceptions constructed without arguments.
            # The original also had a trailing bare `except:` clause that
            # swallowed KeyboardInterrupt/SystemExit; it was removed so those
            # propagate normally.
            print("Query unsuccessful.")
            print("Usual reasons: firewall, misconfigured proxy.")
            print("Error message:")
            print(e)
            return
        end = time.time()
        if timed:
            # Bug fix: time.time() measures seconds; the original printed the
            # raw seconds value while labeling it "ms". Convert explicitly.
            print("Response time: %s ms" % ((end - start) * 1000))

        if "DataFrame" in response.availableOutputs():
            print(response.pdf())
        elif "Local" in response.availableOutputs():
            # Hoisted the repeated getResultSizeCap() lookups into a local.
            cap = rumble.getRumbleConf().getResultSizeCap()
            # Fetch one extra item to detect overflow without counting the
            # whole (possibly huge) sequence up front.
            capplusone = response.take(cap + 1)
            if len(capplusone) > cap:
                count = response.count()
                print("The query output %s items, which is too many to display. Displaying the first %s items:" % (count, cap))
            # Items are printed whether or not the cap was exceeded.
            # NOTE(review): the pasted diff lost indentation, so it is unclear
            # whether the original only printed items in the overflow branch;
            # printing in both cases is assumed intended — confirm.
            for item in capplusone[:cap]:
                print(json.dumps(json.loads(item.serializeAsJSON()), indent=2))
        elif "PUL" in response.availableOutputs():
            print("The query output a Pending Update List.")
        else:
            print("No output available.")

    @cell_magic
    def jsoniq(self, line, cell=None):
        """Run the cell as a JSONiq query."""
        return self.run(line, cell, False)

    @cell_magic
    def timedjsoniq(self, line, cell=None):
        """Run the cell as a JSONiq query and report the submission time."""
        return self.run(line, cell, True)
22 changes: 22 additions & 0 deletions tests/test_iterator_bug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from jsoniq import RumbleSession
from unittest import TestCase
import json
class TryTesting(TestCase):
    """Regression test: json() must drain the sequence via getAsList()."""

    def test1(self):
        # The syntax to start a session is similar to that of Spark.
        # A RumbleSession is a SparkSession that additionally knows about
        # RumbleDB; all SparkSession attributes/methods are available on it.
        rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate()

        # A more complex, standalone query: a distributed read (json-lines
        # over HTTP) aggregated down to a single item.
        seq = rumble.jsoniq("""
max(
let $path := "http://www.rumbledb.org/samples/git-archive-small.json"
for $event in json-lines($path)
return 1
)
""")

        expected = [1]

        # Improvement: compare values directly instead of comparing
        # json.dumps() strings — assertEqual gives a readable diff on failure.
        self.assertEqual(list(seq.json()), expected)
Loading