groupByInterval not working [Spark: 2.4.4] #5

@drahnreb

Description

Two joined DataFrames should be grouped by intervals defined by a third (clock) DataFrame, but the following error is thrown:
java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
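In short, the reproduction below joins two Flint DataFrames with futureLeftJoin and then calls groupByInterval with a third frame as the clock; only these two calls are involved in the failure (names as defined under "Steps to reproduce"):

data_joined = flint_df1.futureLeftJoin(flint_states, tolerance='100ms')
data_binned = data_joined.groupByInterval(flint_stops.select('time'), inclusion='begin')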

    724         with traceback_utils.SCCallSiteSync(self._sc) as css:
    725             tsrdd = self.timeSeriesRDD.groupByInterval(clock.timeSeriesRDD, scala_key,
--> 726                                                        inclusion, rounding)
    727         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    728 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3361.groupByInterval.
: java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
	at com.twosigma.flint.rdd.OrderedRDD.intervalize(OrderedRDD.scala:560)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.summarizeIntervals(TimeSeriesRDD.scala:1605)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.groupByInterval(TimeSeriesRDD.scala:1493)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

from datetime import datetime
import numpy as np
import pandas as pd

import ts.flint
from ts.flint import FlintContext
flintContext = FlintContext(sqlContext)

values = {'time': [1543672800000, 1543672800100, 1543672800260],
         'F_1': [22, 38, 26],
         'F.2': [7.3, 71.3, 7.9]}

states = {'time': [1543672800100, 1543672800200, 1543672800300],
         'types': ["0", "24", "42"],
         'state': ["False", "True", "True"]}

# 'stops' acts as the clock frame whose timestamps define the interval boundaries
stops = {'time': [1543672800150, 1543672800200, 1543672800300, 1543672800360]}

states_pd = pd.DataFrame(states, columns=states.keys())
values_pd = pd.DataFrame(values, columns=values.keys())
stops_pd = pd.DataFrame(stops, columns=stops.keys())
states_pd['time'] = pd.to_datetime(states_pd['time'], unit='ms', origin='unix')
values_pd['time'] = pd.to_datetime(values_pd['time'], unit='ms', origin='unix')
stops_pd['time'] = pd.to_datetime(stops_pd['time'], unit='ms', origin='unix')

states_df = spark.createDataFrame(states_pd)
values_df = spark.createDataFrame(values_pd)
stops_df = spark.createDataFrame(stops_pd)

# Convert to Flint DataFrame
flint_df1, flint_states, flint_stops = [flintContext.read \
              .option("isSorted", False) \
              .option("timeColumn", 'time') \
              .option("timeUnit", 'ms') \
              .dataframe(
                # https://github.com/twosigma/flint:
                # 'To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType'
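                # the timestamp column created by pandas is cast back to epoch
                # milliseconds (LongType) here, matching the timeUnit='ms' option above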
                df.withColumn("time", (df.time.cast('double')*1000).cast("long"))
              ) for df in [values_df, states_df, stops_df]]

### combine data
tolerance = '100ms'  # tried both an exact join and a '100ms' tolerance
data_joined = flint_df1.futureLeftJoin(flint_states, tolerance=tolerance)

data_joined.show()
+-----------------------+---+----+-----+-----+
|                   time|F_1| F.2|types|state|
+-----------------------+---+----+-----+-----+
|2018-12-01 14:00:00.000| 22| 7.3|    0|False|
|2018-12-01 14:00:00.100| 38|71.3|    0|False|
|2018-12-01 14:00:00.260| 26| 7.9|   42| True|
+-----------------------+---+----+-----+-----+

### cut into bins
data_binned = data_joined.groupByInterval(flint_stops.select('time'),
                                          inclusion='begin')  # automatically uses col 'time'
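# expected result (per the ts.flint docs): one row per interval defined by the
# clock frame, with the grouped input rows collected into a single 'rows' column;
# instead the call fails with the NoClassDefFoundError shown above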
