diff --git a/README.md b/README.md index e02af4f..32ef89e 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,14 @@ JSONiq queries are invoked with rumble.jsoniq() in a way similar to the way Spar JSONiq variables can be bound to lists of JSON values (str, int, float, True, False, None, dict, list) or to Pyspark DataFrames. A JSONiq query can use as many variables as needed (for example, it can join between different collections). -It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://rumble.readthedocs.io/en/latest/). +It will later also be possible to read tables registered in the Hive metastore, similar to spark.sql(). Alternatively, the JSONiq query can also read many files of many different formats from many places (local drive, HTTP, S3, HDFS, ...) directly with simple builtin function calls such as json-lines(), text-file(), parquet-file(), csv-file(), etc. See [RumbleDB's documentation](https://docs.rumbledb.org/writing-jsoniq-queries-in-python). The resulting sequence of items can be retrieved as a list of JSON values, as a Pyspark DataFrame, or, for advanced users, as an RDD or with a streaming iteration over the items using the [RumbleDB Item API](https://github.com/RumbleDB/rumble/blob/master/src/main/java/org/rumbledb/api/Item.java). It is also possible to write the sequence of items to the local disk, to HDFS, to S3, etc in a way similar to how DataFrames are written back by Pyspark. +The library also contains a jsoniq magic that allows you to directly write JSONiq queries in a Jupyter notebook and see the results automatically output on the screen. 
+ The design goal is that it is possible to chain DataFrames between JSONiq and Spark SQL queries seamlessly. For example, JSONiq can be used to clean up very messy data and turn it into a clean DataFrame, which can then be processed with Spark SQL, spark.ml, etc. Any feedback or error reports are very welcome. @@ -345,7 +347,12 @@ seq.write().mode("overwrite").text("outputtext"); Even more queries can be found [here](https://colab.research.google.com/github/RumbleDB/rumble/blob/master/RumbleSandbox.ipynb) and you can look at the [JSONiq documentation](https://www.jsoniq.org) and tutorials. -# Last updates +# Latest updates + +## Version 0.2.0 alpha 2 +- You can change the result size cap through the now-accessible Rumble configuration (for example rumble.getRumbleConf().setResultSizeCap(10)). This controls how many items can be retrieved at most with a json() call. You can increase it to whichever number you would like if you reach the cap. +- Add the JSONiq magic to execute JSONiq queries directly in a notebook cell, using the RumbleDB instance shipped with the library. +- RumbleSession.builder.getOrCreate() now correctly reuses an existing session instead of creating a new object. It preserves the configuration. 
## Version 0.2.0 alpha 1 - Allow to bind JSONiq variables to pandas dataframes diff --git a/pyproject.toml b/pyproject.toml index 857b96d..b99a64e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "jsoniq" -version = "0.2.0a1" +version = "0.2.0a2" description = "Python edition of RumbleDB, a JSONiq engine" requires-python = ">=3.11" dependencies = [ diff --git a/src/jsoniq/__init__.py b/src/jsoniq/__init__.py index e30260f..323dbeb 100644 --- a/src/jsoniq/__init__.py +++ b/src/jsoniq/__init__.py @@ -1,3 +1,3 @@ from jsoniq.session import RumbleSession -__all__ = ["RumbleSession"] \ No newline at end of file +__all__ = ["RumbleSession"] diff --git a/src/jsoniq/jars/rumbledb-1.24.0.jar b/src/jsoniq/jars/rumbledb-1.24.0.jar index e7f1ada..f87e7b7 100644 Binary files a/src/jsoniq/jars/rumbledb-1.24.0.jar and b/src/jsoniq/jars/rumbledb-1.24.0.jar differ diff --git a/src/jsoniq/sequence.py b/src/jsoniq/sequence.py index b0bdc5e..2aa5b46 100644 --- a/src/jsoniq/sequence.py +++ b/src/jsoniq/sequence.py @@ -4,16 +4,26 @@ import json class SequenceOfItems: - def __init__(self, sequence, sparksession): + def __init__(self, sequence, rumblesession): self._jsequence = sequence - self._sparkcontext = sparksession.sparkContext - self._sparksession = sparksession + self._rumblesession = rumblesession + self._sparksession = rumblesession._sparksession + self._sparkcontext = self._sparksession.sparkContext + + def items(self): + return self.getAsList() + + def take(self, n): + return tuple(self.getFirstItemsAsList(n)) + + def first(self): + return tuple(self.getFirstItemsAsList(self._rumblesession.getRumbleConf().getResultSizeCap())) def json(self): - return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.items()]) + return tuple([json.loads(l.serializeAsJSON()) for l in self._jsequence.getAsList()]) def rdd(self): - rdd = self._jsequence.getAsPickledStringRDD(); + rdd = 
self._jsequence.getAsPickledStringRDD() rdd = RDD(rdd, self._sparkcontext) return rdd.map(lambda l: json.loads(l)) @@ -22,7 +32,10 @@ def df(self): def pdf(self): return self.df().toPandas() - + + def count(self): + return self._jsequence.count() + def nextJSON(self): return self._jsequence.next().serializeAsJSON() diff --git a/src/jsoniq/session.py b/src/jsoniq/session.py index b1e8568..f4c03fe 100644 --- a/src/jsoniq/session.py +++ b/src/jsoniq/session.py @@ -22,6 +22,9 @@ def __init__(self, spark_session: SparkSession): self._sparksession = spark_session self._jrumblesession = spark_session._jvm.org.rumbledb.api.Rumble(spark_session._jsparkSession) + def getRumbleConf(self): + return self._jrumblesession.getConfiguration() + class Builder: def __init__(self): @@ -60,8 +63,18 @@ def __init__(self): self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str) def getOrCreate(self): - return RumbleSession(self._sparkbuilder.getOrCreate()) + if RumbleSession._rumbleSession is None: + RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.getOrCreate()) + return RumbleSession._rumbleSession + def create(self): + RumbleSession._rumbleSession = RumbleSession(self._sparkbuilder.create()) + return RumbleSession._rumbleSession + + def remote(self, spark_url): + self._sparkbuilder = self._sparkbuilder.remote(spark_url) + return self + def appName(self, name): self._sparkbuilder = self._sparkbuilder.appName(name); return self; @@ -83,6 +96,7 @@ def __getattr__(self, name): return res; _builder = Builder() + _rumbleSession = None def convert(self, value): if isinstance(value, tuple): @@ -155,7 +169,7 @@ def bindDataFrameAsVariable(self, name: str, df): def jsoniq(self, str): sequence = self._jrumblesession.runQuery(str); - return SequenceOfItems(sequence, self._sparksession); + return SequenceOfItems(sequence, self); def __getattr__(self, item): return getattr(self._sparksession, item) \ No newline at end of file diff --git 
a/src/jsoniqmagic/__init__.py b/src/jsoniqmagic/__init__.py new file mode 100644 index 0000000..2c4ccce --- /dev/null +++ b/src/jsoniqmagic/__init__.py @@ -0,0 +1,9 @@ +from jsoniq.session import RumbleSession +from jsoniqmagic.magic import JSONiqMagic + +__all__ = ["JSONiqMagic"] + +def load_ipython_extension(ipython): + rumble = RumbleSession.builder.getOrCreate(); + rumble.getRumbleConf().setResultSizeCap(10); + ipython.register_magics(JSONiqMagic) \ No newline at end of file diff --git a/src/jsoniqmagic/magic.py b/src/jsoniqmagic/magic.py new file mode 100644 index 0000000..ef02ee7 --- /dev/null +++ b/src/jsoniqmagic/magic.py @@ -0,0 +1,55 @@ +from IPython.core.magic import Magics, cell_magic, magics_class +import time, json +from jsoniq.session import RumbleSession +from py4j.protocol import Py4JJavaError + +@magics_class +class JSONiqMagic(Magics): + def run(self, line, cell=None, timed=False): + if cell is None: + data = line + else: + data = cell + + start = time.time() + try: + rumble = RumbleSession.builder.getOrCreate(); + response = rumble.jsoniq(data); + except Py4JJavaError as e: + print(e.java_exception.getMessage()) + return + except Exception as e: + print("Query unsuccessful.") + print("Usual reasons: firewall, misconfigured proxy.") + print("Error message:") + print(e.args[0]) + return + except: + print("Query unsuccessful.") + print("Usual reasons: firewall, misconfigured proxy.") + return + end = time.time() + if(timed): + print("Response time: %s ms" % (end - start)) + + if ("DataFrame" in response.availableOutputs()): + print(response.pdf()) + elif ("Local" in response.availableOutputs()): + capplusone = response.take(rumble.getRumbleConf().getResultSizeCap() + 1) + if len(capplusone) > rumble.getRumbleConf().getResultSizeCap(): + count = response.count() + print("The query output %s items, which is too many to display. 
Displaying the first %s items:" % (count, rumble.getRumbleConf().getResultSizeCap())) + for e in capplusone[:rumble.getRumbleConf().getResultSizeCap()]: + print(json.dumps(json.loads(e.serializeAsJSON()), indent=2)) + elif ("PUL" in response.availableOutputs()): + print("The query output a Pending Update List.") + else: + print("No output available.") + + @cell_magic + def jsoniq(self, line, cell=None): + return self.run(line, cell, False) + + @cell_magic + def timedjsoniq(self, line, cell=None): + return self.run(line, cell, True) \ No newline at end of file diff --git a/tests/test_iterator_bug.py b/tests/test_iterator_bug.py new file mode 100644 index 0000000..24cb4df --- /dev/null +++ b/tests/test_iterator_bug.py @@ -0,0 +1,22 @@ +from jsoniq import RumbleSession +from unittest import TestCase +import json +class TryTesting(TestCase): + def test1(self): + # The syntax to start a session is similar to that of Spark. + # A RumbleSession is a SparkSession that additionally knows about RumbleDB. + # All attributes and methods of SparkSession are also available on RumbleSession. + rumble = RumbleSession.builder.appName("PyRumbleExample").getOrCreate(); + # A more complex, standalone query + + seq = rumble.jsoniq(""" + max( + let $path := "http://www.rumbledb.org/samples/git-archive-small.json" + for $event in json-lines($path) + return 1 + ) + """); + + expected = [1] + + self.assertTrue(json.dumps(seq.json()) == json.dumps(expected)) diff --git a/tests/test_sample.py b/tests/test_sample.py new file mode 100644 index 0000000..3acb22e --- /dev/null +++ b/tests/test_sample.py @@ -0,0 +1,268 @@ +from jsoniq import RumbleSession +from unittest import TestCase +import json +import pandas as pd + +class TryTesting(TestCase): + def test1(self): + + # The syntax to start a session is similar to that of Spark. + # A RumbleSession is a SparkSession that additionally knows about RumbleDB. + # All attributes and methods of SparkSession are also available on RumbleSession. 
+ + rumble = RumbleSession.builder.getOrCreate(); + + # Just to improve readability when invoking Spark methods + # (such as spark.sql() or spark.createDataFrame()). + spark = rumble + + ############################## + ###### Your first query ###### + ############################## + + # Even though RumbleDB uses Spark internally, it can be used without any knowledge of Spark. + + # Executing a query is done with rumble.jsoniq() like so. A query returns a sequence + # of items, here the sequence with just the integer item 2. + items = rumble.jsoniq('1+1') + + # A sequence of items can simply be converted to a list of Python/JSON values with json(). + # Since there is only one value in the sequence output by this query, + # we get a singleton list with the integer 2. + # Generally though, the results may contain zero, one, two, or more items. + python_list = items.json() + print(python_list) + + ############################################ + ##### More complex, standalone queries ##### + ############################################ + + # JSONiq is very powerful and expressive. You will find tutorials as well as a reference on JSONiq.org. 
+ + seq = rumble.jsoniq(""" + + let $stores := + [ + { "store number" : 1, "state" : "MA" }, + { "store number" : 2, "state" : "MA" }, + { "store number" : 3, "state" : "CA" }, + { "store number" : 4, "state" : "CA" } + ] + let $sales := [ + { "product" : "broiler", "store number" : 1, "quantity" : 20 }, + { "product" : "toaster", "store number" : 2, "quantity" : 100 }, + { "product" : "toaster", "store number" : 2, "quantity" : 50 }, + { "product" : "toaster", "store number" : 3, "quantity" : 50 }, + { "product" : "blender", "store number" : 3, "quantity" : 100 }, + { "product" : "blender", "store number" : 3, "quantity" : 150 }, + { "product" : "socks", "store number" : 1, "quantity" : 500 }, + { "product" : "socks", "store number" : 2, "quantity" : 10 }, + { "product" : "shirt", "store number" : 3, "quantity" : 10 } + ] + let $join := + for $store in $stores[], $sale in $sales[] + where $store."store number" = $sale."store number" + return { + "nb" : $store."store number", + "state" : $store.state, + "sold" : $sale.product + } + return [$join] + """); + + print(seq.json()); + + seq = rumble.jsoniq(""" + for $product in json-lines("http://rumbledb.org/samples/products-small.json", 10) + group by $store-number := $product.store-number + order by $store-number ascending + return { + "store" : $store-number, + "products" : [ distinct-values($product.product) ] + } + """); + print(seq.json()); + + ############################################################ + ###### Binding JSONiq variables to Python values ########### + ############################################################ + + # It is possible to bind a JSONiq variable to a tuple of native Python values + # and then use it in a query. + # JSONiq, variables are bound to sequences of items, just like the results of JSONiq + # queries are sequence of items. + # A Python tuple will be seamlessly converted to a sequence of items by the library. 
+ # Currently we only support strs, ints, floats, booleans, None, lists, and dicts. + # But if you need more (like date, bytes, etc) we will add them without any problem. + # JSONiq has a rich type system. + + rumble.bind('$c', (1,2,3,4, 5, 6)) + print(rumble.jsoniq(""" + for $v in $c + let $parity := $v mod 2 + group by $parity + return { switch($parity) + case 0 return "even" + case 1 return "odd" + default return "?" : $v + } + """).json()) + + rumble.bind('$c', ([1,2,3],[4,5,6])) + print(rumble.jsoniq(""" + for $i in $c + return [ + for $j in $i + return { "foo" : $j } + ] + """).json()) + + rumble.bind('$c', ({"foo":[1,2,3]},{"foo":[4,{"bar":[1,False, None]},6]})) + print(rumble.jsoniq('{ "results" : $c.foo[[2]] }').json()) + + # It is possible to bind only one value. Then it must be provided as a singleton tuple. + # This is because in JSONiq, an item is the same as a sequence of one item. + rumble.bind('$c', (42,)) + print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json()) + + # For convenience and code readability, you can also use bindOne(). + rumble.bindOne('$c', 42) + print(rumble.jsoniq('for $i in 1 to $c return $i*$i').json()) + + ########################################################## + ##### Binding JSONiq variables to pandas DataFrames ###### + ##### Getting the output as a Pandas DataFrame ###### + ########################################################## + + # Creating a dummy pandas dataframe + data = {'Name': ['Alice', 'Bob', 'Charlie'], + 'Age': [30,25,35]}; + pdf = pd.DataFrame(data); + + # Binding a pandas dataframe + rumble.bind('$a',pdf); + seq = rumble.jsoniq('$a.Name') + # Getting the output as a pandas dataframe + print(seq.pdf()) + + + ################################################ + ##### Using Pyspark DataFrames with JSONiq ##### + ################################################ + + # The power users can also interface our library with pyspark DataFrames. 
+ # JSONiq sequences of items can have billions of items, and our library supports this + # out of the box: it can also run on clusters on AWS Elastic MapReduce for example. + # But your laptop is just fine, too: it will spread the computations on your cores. + # You can bind a DataFrame to a JSONiq variable. JSONiq will recognize this + # DataFrame as a sequence of object items. + + # Create a data frame also similar to Spark (but using the rumble object). + data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]; + columns = ["Name", "Age"]; + df = spark.createDataFrame(data, columns); + + # This is how to bind a JSONiq variable to a dataframe. You can bind as many variables as you want. + rumble.bind('$a', df); + + # This is how to run a query. This is similar to spark.sql(). + # Since variable $a was bound to a DataFrame, it is automatically declared as an external variable + # and can be used in the query. In JSONiq, it is logically a sequence of objects. + res = rumble.jsoniq('$a.Name'); + + # There are several ways to collect the outputs, depending on the user needs but also + # on the query supplied. + # This returns a list containing one or several of "DataFrame", "RDD", "PUL", "Local" + # If DataFrame is in the list, df() can be invoked. + # If RDD is in the list, rdd() can be invoked. + # If Local is in the list, items() or json() can be invoked, as well as the local iterator API. + modes = res.availableOutputs(); + for mode in modes: + print(mode) + + ######################################################### + ###### Manipulating DataFrames with SQL and JSONiq ###### + ######################################################### + + # If the output of the JSONiq query is structured (i.e., RumbleDB was able to detect a schema), + # then we can extract a regular data frame that can be further processed with spark.sql() or rumble.jsoniq(). 
+ df = res.df(); + df.show(); + + # We are continuously working on the detection of schemas and RumbleDB will get better at it with them. + # JSONiq is a very powerful language and can also produce heterogeneous output "by design". Then you need + # to use rdd() instead of df(), or to collect the list of JSON values (see further down). Remember + # that availableOutputs() tells you what is at your disposal. + + # A DataFrame output by JSONiq can be reused as input to a Spark SQL query. + # (Remember that rumble is a wrapper around a SparkSession object, so you can use rumble.sql() just like spark.sql()) + df.createTempView("myview") + df2 = spark.sql("SELECT * FROM myview").toDF("name"); + df2.show(); + + # A DataFrame output by Spark SQL can be reused as input to a JSONiq query. + rumble.bind('$b', df2); + seq2 = rumble.jsoniq("for $i in 1 to 5 return $b"); + df3 = seq2.df(); + df3.show(); + + # And a DataFrame output by JSONiq can be reused as input to another JSONiq query. + rumble.bind('$b', df3); + seq3 = rumble.jsoniq("$b[position() lt 3]"); + df4 = seq3.df(); + df4.show(); + + ######################### + ##### Local access ###### + ######################### + + # This materializes the rows as items. + # The items are accessed with the RumbleDB Item API. + list = res.items(); + for result in list: + print(result.getStringValue()) + + # This streams through the items one by one + res.open(); + while (res.hasNext()): + print(res.next().getStringValue()); + res.close(); + + ################################################################################################################ + ###### Native Python/JSON Access for bypassing the Item API (but losing on the richer JSONiq type system) ###### + ################################################################################################################ + + # This method directly gets the result as JSON (dict, list, strings, ints, etc). 
+ jlist = res.json(); + for str in jlist: + print(str); + + # This streams through the JSON values one by one. + res.open(); + while(res.hasNext()): + print(res.nextJSON()); + res.close(); + + # This gets an RDD of JSON values that can be processed by Python + rdd = res.rdd(); + print(rdd.count()); + for str in rdd.take(10): + print(str); + + ################################################### + ###### Write back to the disk (or data lake) ###### + ################################################### + + # It is also possible to write the output to a file locally or on a cluster. The API is similar to that of Spark dataframes. + # Note that it creates a directory and stores the (potentially very large) output in a sharded directory. + # RumbleDB was already tested with up to 64 AWS machines and 100s of TBs of data. + # Of course the examples below are so small that it makes more sense to process the results locally with Python, + # but this shows how GBs or TBs of data obtained from JSONiq can be written back to disk. + seq = rumble.jsoniq("$a.Name"); + seq.write().mode("overwrite").json("outputjson"); + seq.write().mode("overwrite").parquet("outputparquet"); + + seq = rumble.jsoniq("1+1"); + seq.write().mode("overwrite").text("outputtext"); + + self.assertTrue(True)