102 changes: 40 additions & 62 deletions openwpm_utils/s3.py
@@ -8,68 +8,7 @@
from pyarrow.filesystem import S3FSWrapper # noqa
from pyspark.sql import SQLContext


class PySparkS3Dataset(object):
    def __init__(self, spark_context, s3_directory,
                 s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        self._s3_bucket = s3_bucket
        self._s3_directory = s3_directory
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
            s3_bucket, s3_directory)
        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
            s3_bucket, s3_directory)

    def read_table(self, table_name, columns=None):
        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The set of columns to filter the parquet dataset by
        """
        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
        if columns is not None:
            return table.select(columns)
        return table

    def read_content(self, content_hash):
        """Read the content corresponding to `content_hash`.

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        return self._spark_context.textFile(
            self._s3_content_loc % content_hash)

    def collect_content(self, content_hash, beautify=False):
        """Collect content for `content_hash` to driver

        NOTE: This can only be run in the driver process since it requires
        access to the spark context
        """
        content = ''.join(self.read_content(content_hash).collect())
        if beautify:
            return jsbeautifier.beautify(content)
        return content


class S3Dataset(object):
class S3Dataset:
    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using pandas

@@ -134,3 +73,42 @@ def collect_content(self, content_hash, beautify=False):
        except IndexError:
            pass
        return content
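
For reference, a minimal usage sketch of the pandas-backed S3Dataset (not part of the diff; the crawl directory and content hash below are hypothetical placeholders, and `collect_content`'s signature is taken from the hunk above):

# Assumes a crawl saved under the default 'openwpm-crawls' bucket.
dataset = S3Dataset('2019-01-01_example_crawl')
# Fetch a script body by its content hash; beautify=True runs the
# result through jsbeautifier. The hash value here is hypothetical.
script = dataset.collect_content('0123abcd...', beautify=True)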

class PySparkS3Dataset(S3Dataset):
    def __init__(self, spark_context, s3_directory,
                 s3_bucket='openwpm-crawls'):
        """Helper class to load OpenWPM datasets from S3 using PySpark

        Parameters
        ----------
        spark_context
            Spark context. In Databricks, this is available via the `sc`
            variable.
        s3_directory : string
            Directory within the S3 bucket in which the dataset is saved.
        s3_bucket : string, optional
            The bucket name on S3. Defaults to `openwpm-crawls`.
        """
        self._s3_bucket = s3_bucket
        self._s3_directory = s3_directory
        self._spark_context = spark_context
        self._sql_context = SQLContext(spark_context)
        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
            s3_bucket, s3_directory)
        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
            s3_bucket, s3_directory)

    def read_table(self, table_name, columns=None):
        """Read `table_name` from the OpenWPM dataset into a PySpark DataFrame.

        Parameters
        ----------
        table_name : string
            OpenWPM table to read
        columns : list of strings
            The subset of columns to select from the parquet dataset
        """
        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
        if columns is not None:
            return table.select(columns)
        return table
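
And a corresponding sketch for the Spark-backed subclass, assuming a Databricks notebook where `sc` is the ambient SparkContext (per the docstring above); the table and column names are illustrative OpenWPM examples, not taken from this diff:

# Uses the inherited bucket default ('openwpm-crawls').
dataset = PySparkS3Dataset(sc, '2019-01-01_example_crawl')
# Load only the columns we need from the parquet table (table and
# column names are assumptions for illustration).
requests = dataset.read_table('http_requests', columns=['visit_id', 'url'])
requests.show(5)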