From 33bb9a298c1f115aa8e604bce14eb9261e4bdc22 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 9 Apr 2021 12:37:41 +0200 Subject: [PATCH 01/12] Removed collect_content from PySparkS3Dataset Downloading files via the SparkContext was much slower than downloading via boto (which is what S3Dataset does. So now both classes use the same method, as PySparkS3Dataset inherits from S3Dataset --- openwpm_utils/s3.py | 102 +++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 62 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index b533e03..5654ba8 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -8,68 +8,7 @@ from pyarrow.filesystem import S3FSWrapper # noqa from pyspark.sql import SQLContext - -class PySparkS3Dataset(object): - def __init__(self, spark_context, s3_directory, - s3_bucket='openwpm-crawls'): - """Helper class to load OpenWPM datasets from S3 using PySpark - - Parameters - ---------- - spark_context - Spark context. In databricks, this is available via the `sc` - variable. - s3_directory : string - Directory within the S3 bucket in which the dataset is saved. - s3_bucket : string, optional - The bucket name on S3. Defaults to `openwpm-crawls`. - """ - self._s3_bucket = s3_bucket - self._s3_directory = s3_directory - self._spark_context = spark_context - self._sql_context = SQLContext(spark_context) - self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % ( - s3_bucket, s3_directory) - self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % ( - s3_bucket, s3_directory) - - def read_table(self, table_name, columns=None): - """Read `table_name` from OpenWPM dataset into a pyspark dataframe. - - Parameters - ---------- - table_name : string - OpenWPM table to read - columns : list of strings - The set of columns to filter the parquet dataset by - """ - table = self._sql_context.read.parquet(self._s3_table_loc % table_name) - if columns is not None: - return table.select(columns) - return table - - def read_content(self, content_hash): - """Read the content corresponding to `content_hash`. - - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - return self._spark_context.textFile( - self._s3_content_loc % content_hash) - - def collect_content(self, content_hash, beautify=False): - """Collect content for `content_hash` to driver - - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - content = ''.join(self.read_content(content_hash).collect()) - if beautify: - return jsbeautifier.beautify(content) - return content - - -class S3Dataset(object): +class S3Dataset: def __init__(self, s3_directory, s3_bucket='openwpm-crawls'): """Helper class to load OpenWPM datasets from S3 using pandas @@ -134,3 +73,42 @@ def collect_content(self, content_hash, beautify=False): except IndexError: pass return content + +class PySparkS3Dataset(S3Dataset): + def __init__(self, spark_context, s3_directory, + s3_bucket='openwpm-crawls'): + """Helper class to load OpenWPM datasets from S3 using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + s3_directory : string + Directory within the S3 bucket in which the dataset is saved. + s3_bucket : string, optional + The bucket name on S3. Defaults to `openwpm-crawls`. 
+ """ + self._s3_bucket = s3_bucket + self._s3_directory = s3_directory + self._spark_context = spark_context + self._sql_context = SQLContext(spark_context) + self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % ( + s3_bucket, s3_directory) + self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % ( + s3_bucket, s3_directory) + + def read_table(self, table_name, columns=None): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + """ + table = self._sql_context.read.parquet(self._s3_table_loc % table_name) + if columns is not None: + return table.select(columns) + return table From cb8a25ff1a1876bc42cc4b21a6abcaec94124cb0 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 11:41:24 +0200 Subject: [PATCH 02/12] Added mode parameter to PySparkS3Dataset This parameter allows for filtering out VisitIds that are part of `incompleted_visits` or that had a command with a command_status other than "ok" since users probably shouldn't consider them for analysis This filtering functionality is extracted into the TableFilter class to be reused by other Datasets. --- openwpm_utils/crawlhistory.py | 28 +++++++++++++ openwpm_utils/dataquality.py | 26 +++++++++++- openwpm_utils/s3.py | 77 ++++++++++++++++++++++++----------- setup.py | 2 +- 4 files changed, 108 insertions(+), 25 deletions(-) create mode 100644 openwpm_utils/crawlhistory.py diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py new file mode 100644 index 0000000..3799f47 --- /dev/null +++ b/openwpm_utils/crawlhistory.py @@ -0,0 +1,28 @@ +import pyspark.sql.functions as F +from pyspark.sql.types import StringType + +reduce_to_worst_command_status = ( + F.when(F.array_contains("command_status", "critical"), "critical") + .when(F.array_contains("command_status", "error"), "error") + .when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "timeout"), "timeout") + .otherwise("ok") + .alias("worst_status") +) + + +reduce_to_best_command_status = ( + F.when(F.array_contains("command_status", "ok"), "ok") + .when(F.array_contains("command_status", "timeout"), "timeout") + .when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "error"), "error") + .otherwise("critical") + .alias("best_status") +) + + +def get_worst_status_per_visit_id(crawl_history): + """Adds column `worst_status`""" + return (crawl_history.groupBy("visit_id") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("worst_status",reduce_to_worst_command_status)) diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py index d4a36f9..e7a2ce8 100644 --- a/openwpm_utils/dataquality.py +++ b/openwpm_utils/dataquality.py @@ -1,5 +1,9 @@ -from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when +import pyspark.sql.functions as F from pyspark.mllib.stat import Statistics +from pyspark.sql.dataframe import DataFrame +from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when + +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id def count_not_null(c, nan_as_null=False): @@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True): "\nNumber of records with visit_id == -1: %d" % df.where(df.visit_id == -1).count() ) + + +class TableFilter: + def __init__(self, incomplete_visits: 
DataFrame, crawl_history: DataFrame) -> None: + self._incomplete_visit_ids = incomplete_visits.select("visit_id") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def clean_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + + def dirty_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 5654ba8..3e8581c 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -1,15 +1,22 @@ import gzip +from typing import List import boto3 import jsbeautifier import pyarrow.parquet as pq +import pyspark.sql.functions as F import s3fs from botocore.exceptions import ClientError from pyarrow.filesystem import S3FSWrapper # noqa -from pyspark.sql import SQLContext +from pyspark import SparkContext +from pyspark.sql import DataFrame, SQLContext -class S3Dataset: - def __init__(self, s3_directory, s3_bucket='openwpm-crawls'): +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id +from openwpm_utils.dataquality import TableFilter + + +class S3Dataset(object): + def __init__(self, s3_directory, s3_bucket="openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -38,30 +45,33 @@ def read_table(self, table_name, columns=None): columns : list of strings The set of columns to filter the parquet dataset by """ - return pq.ParquetDataset( - self._s3_table_loc % table_name, - filesystem=self._s3fs, - metadata_nthreads=4 - ).read(use_pandas_metadata=True, columns=columns).to_pandas() + return ( + pq.ParquetDataset( + self._s3_table_loc % table_name, + filesystem=self._s3fs, + metadata_nthreads=4, + ) + .read(use_pandas_metadata=True, columns=columns) + .to_pandas() + ) def collect_content(self, content_hash, beautify=False): """Collect content by directly connecting to S3 via boto3""" - s3 = boto3.client('s3') + s3 = boto3.client("s3") try: obj = s3.get_object( - Bucket=self._s3_bucket, - Key=self._content_key % content_hash + Bucket=self._s3_bucket, Key=self._content_key % content_hash ) body = obj["Body"] compressed_content = body.read() body.close() except ClientError as e: - if e.response['Error']['Code'] != 'NoSuchKey': + if e.response["Error"]["Code"] != "NoSuchKey": raise else: return None - with gzip.GzipFile(fileobj=compressed_content, mode='r') as f: + with gzip.GzipFile(fileobj=compressed_content, mode="r") as f: content = f.read() if content is None or content == "": @@ -74,9 +84,11 @@ def collect_content(self, content_hash, beautify=False): pass return content + class PySparkS3Dataset(S3Dataset): - def __init__(self, spark_context, s3_directory, - s3_bucket='openwpm-crawls'): + def __init__( + self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls" + ): """Helper class to load OpenWPM datasets from S3 using PySpark Parameters @@ -89,16 +101,17 @@ def __init__(self, spark_context, s3_directory, s3_bucket : string, optional The bucket name on S3. Defaults to `openwpm-crawls`. 
""" - self._s3_bucket = s3_bucket - self._s3_directory = s3_directory + super().__init__(s3_directory, s3_bucket) self._spark_context = spark_context self._sql_context = SQLContext(spark_context) - self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % ( - s3_bucket, s3_directory) - self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % ( - s3_bucket, s3_directory) + self._s3_table_loc = f"s3a://{self._s3_table_loc}" + incomplete_visits = self.read_table("incomplete_visits", mode="all") + crawl_history = self.read_table("crawl_history", mode="all") + self._filter = TableFilter(incomplete_visits, crawl_history) - def read_table(self, table_name, columns=None): + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ): """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters @@ -107,8 +120,26 @@ def read_table(self, table_name, columns=None): OpenWPM table to read columns : list of strings The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. A visit_id is failed + if one of it's commands failed or if it's in the interrupted table """ table = self._sql_context.read.parquet(self._s3_table_loc % table_name) + + if mode == "all": + table = table + elif mode == "failed": + table = self._filter.dirty_table(table) + elif mode == "successful": + table = self._filter.clean_table(table) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) + if columns is not None: - return table.select(columns) + table = table.select(columns) + return table diff --git a/setup.py b/setup.py index 8e7bbee..9ba38d6 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ name='openwpm-utils', license='MPL 2.0', url='https://github.com/mozilla/openwpm-utils', - version='0.2.0', + version='0.3.0', packages=['openwpm_utils'], # Dependencies From c43cee8c40de8506245a00f0411f8a8d11458269 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 11:57:44 +0200 Subject: [PATCH 03/12] Added display_crawl_results --- openwpm_utils/crawlhistory.py | 85 +++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 3799f47..34eca5b 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,5 +1,7 @@ import pyspark.sql.functions as F from pyspark.sql.types import StringType +import json + reduce_to_worst_command_status = ( F.when(F.array_contains("command_status", "critical"), "critical") @@ -26,3 +28,86 @@ def get_worst_status_per_visit_id(crawl_history): return (crawl_history.groupBy("visit_id") .agg(F.collect_list("command_status").alias("command_status")) .withColumn("worst_status",reduce_to_worst_command_status)) + + +# TODO: This needs a name that expresses that we are giving general stats about the +# way the crawl ran and not it's specific data +def display_crawl_results(crawl_history, interrupted_visits): + """ + Analyze crawl_history and interrupted_visits to display general + success statistics + This function should be given the all entries in the crawl_history and + interrupted_visits tableq + """ + crawl_history.groupBy("command").count().show() + + total_num_command_sequences = crawl_history.groupBy("visit_id").count() + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + print( + "Percentage of command_sequence that didn't complete successfully %0.2f%%" + % 
( + visit_id_and_worst_status.where(F.col("worst_status") != "ok").count() + / float(total_num_command_sequences) + * 100 + ) + ) + net_error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors(%0.2f%% of the all command_sequences)" + % (net_error_count, net_error_count / float(total_num_command_sequences) * 100) + ) + timeout_count = visit_id_and_worst_status.where( + F.col("worst_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts(%0.2f%% of the all command_sequences)" + % (timeout_count, timeout_count / float(total_num_command_sequences) * 100) + ) + + error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "error" + ).count() + print( + "There were a total of %d errors(%0.2f%% of the all command_sequences)" + % (error_count, error_count / float(total_num_command_sequences) * 100) + ) + + print( + f"A total of ${interrupted_visits.count()} were interrupted." + f"This represents ${interrupted_visits.count()/ float(total_num_command_sequences)* 100} % of the entire crawl" + ) + + def extract_website_from_arguments(arguments): + """Given the arguments of a get_command this function returns which website was visited""" + return json.loads(arguments)["url"] + + udf_extract_website_from_arguments = F.udf( + extract_website_from_arguments, StringType() + ) + + visit_id_to_website = crawl_history.where( + F.col("command") == "GetCommand" + ).withColumn("website", udf_extract_website_from_arguments("arguments")) + visit_id_to_website = visit_id_to_website[["visit_id", "website"]] + + visit_id_website_status = visit_id_and_worst_status.join( + visit_id_to_website, "visit_id" + ) + multiple_successes = ( + visit_id_website_status.where(F.col("worst_status") == "ok") + .join(interrupted_visits, "visit_id", how="leftanti") + .groupBy("website") + .count() + .filter("count > 1") + .orderBy(F.desc("count")) + ) + + print( + f"There were {multiple_successes.count()} websites that were successfully visited multiple times" + ) + multiple_successes.groupBy( + F.col("count").alias("Number of successes") + ).count().show() + multiple_successes.filter("count > 2").show() From 5925ac9f9e48411d1377e75f9020c4db216d5719 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Jun 2020 15:02:54 +0200 Subject: [PATCH 04/12] Rewrote crawlhistory.py --- openwpm_utils/crawlhistory.py | 59 ++++++++++++++++++++++++++++++----- setup.cfg | 3 ++ 2 files changed, 54 insertions(+), 8 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 34eca5b..f445a4d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -1,7 +1,7 @@ -import pyspark.sql.functions as F -from pyspark.sql.types import StringType import json +import pyspark.sql.functions as F +from pyspark.sql.types import StringType reduce_to_worst_command_status = ( F.when(F.array_contains("command_status", "critical"), "critical") @@ -30,9 +30,7 @@ def get_worst_status_per_visit_id(crawl_history): .withColumn("worst_status",reduce_to_worst_command_status)) -# TODO: This needs a name that expresses that we are giving general stats about the -# way the crawl ran and not it's specific data -def display_crawl_results(crawl_history, interrupted_visits): +def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits): """ Analyze crawl_history and interrupted_visits to display general success statistics @@ -41,7 +39,8 @@ def display_crawl_results(crawl_history, 
interrupted_visits): """ crawl_history.groupBy("command").count().show() - total_num_command_sequences = crawl_history.groupBy("visit_id").count() + # Analyzing status per command_sequence + total_num_command_sequences = crawl_history.groupBy("visit_id").count().count() visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) print( "Percentage of command_sequence that didn't complete successfully %0.2f%%" @@ -75,10 +74,15 @@ def display_crawl_results(crawl_history, interrupted_visits): ) print( - f"A total of ${interrupted_visits.count()} were interrupted." - f"This represents ${interrupted_visits.count()/ float(total_num_command_sequences)* 100} % of the entire crawl" + f"A total of {interrupted_visits.count()} command_sequences were interrupted." + f"This represents {interrupted_visits.count()/ float(total_num_command_sequences)* 100:.2f} % of the entire crawl" ) + +def display_crawl_history_per_website(crawl_history, interrupted_visits): + # Analyzing status per website + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + def extract_website_from_arguments(arguments): """Given the arguments of a get_command this function returns which website was visited""" return json.loads(arguments)["url"] @@ -90,11 +94,50 @@ def extract_website_from_arguments(arguments): visit_id_to_website = crawl_history.where( F.col("command") == "GetCommand" ).withColumn("website", udf_extract_website_from_arguments("arguments")) + visit_id_to_website = visit_id_to_website[["visit_id", "website"]] visit_id_website_status = visit_id_and_worst_status.join( visit_id_to_website, "visit_id" ) + best_status_per_website = visit_id_website_status.groupBy("website").agg( + udf_reduce_to_best_command_status(F.collect_list("worst_status")).alias( + "best_status" + ) + ) + total_number_websites = best_status_per_website.count() + print(f"There was an attempt to visit a total of {total_number_websites} websites") + + print( + "Percentage of websites that didn't complete successfully %0.2f%%" + % ( + best_status_per_website.where(F.col("best_status") != "ok").count() + / float(total_number_websites) + * 100 + ) + ) + net_error_count = best_status_per_website.where( + F.col("best_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors (%0.2f%% of the all websites)" + % (net_error_count, net_error_count / float(total_number_websites) * 100) + ) + timeout_count = best_status_per_website.where( + F.col("best_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts (%0.2f%% of the all websites)" + % (timeout_count, timeout_count / float(total_number_websites) * 100) + ) + + error_count = best_status_per_website.where(F.col("best_status") == "error").count() + + print( + "There were a total of %d errors (%0.2f%% of the websites)" + % (error_count, error_count / float(total_number_websites) * 100) + ) + multiple_successes = ( visit_id_website_status.where(F.col("worst_status") == "ok") .join(interrupted_visits, "visit_id", how="leftanti") diff --git a/setup.cfg b/setup.cfg index d93c4f5..3162fbc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,3 +4,6 @@ test=pytest [tool:pytest] addopts = --flake8 -rw testpaths = tests + +[flake8] +max-line-length = 88 \ No newline at end of file From 5639239a38e070461118e7ceefa940cb1367603f Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 13 Jul 2020 16:36:41 +0200 Subject: [PATCH 05/12] Used typeannotations --- openwpm_utils/crawlhistory.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) 
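The diff below keeps adjusting the status-reduction helpers in crawlhistory.py, so a short illustration of how they behave may help while reviewing: `reduce_to_worst_command_status` and `reduce_to_best_command_status` are plain Column expressions over an array column named `command_status`, which is why callers first run a `collect_list` aggregation. A minimal sketch, assuming a local SparkSession; the sample rows are made up purely for illustration and are not part of any commit in this series:

    # Illustrative sketch only: tiny, made-up crawl_history rows.
    from pyspark.sql import SparkSession

    from openwpm_utils.crawlhistory import get_worst_status_per_visit_id

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    crawl_history = spark.createDataFrame(
        [(1, "ok"), (1, "error"), (2, "ok")],
        ["visit_id", "command_status"],
    )

    # collect_list() gathers each visit's statuses into an array column, and the
    # when()/array_contains() chain then picks the most severe entry it finds.
    get_worst_status_per_visit_id(crawl_history).show()
    # visit_id 1 reduces to worst_status "error", visit_id 2 stays "ok".
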
diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index f445a4d..68bae8d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -35,7 +35,7 @@ def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits Analyze crawl_history and interrupted_visits to display general success statistics This function should be given the all entries in the crawl_history and - interrupted_visits tableq + interrupted_visits table """ crawl_history.groupBy("command").count().show() @@ -100,10 +100,10 @@ def extract_website_from_arguments(arguments): visit_id_website_status = visit_id_and_worst_status.join( visit_id_to_website, "visit_id" ) - best_status_per_website = visit_id_website_status.groupBy("website").agg( - udf_reduce_to_best_command_status(F.collect_list("worst_status")).alias( - "best_status" - ) + best_status_per_website = ( + visit_id_website_status.groupBy("website") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("best_status",reduce_to_best_command_status) ) total_number_websites = best_status_per_website.count() print(f"There was an attempt to visit a total of {total_number_websites} websites") From 2312e0eff4c604cd86d5971fa8e48d5ce45a4e98 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 15:38:42 +0200 Subject: [PATCH 06/12] Fixing display_crawl_history --- openwpm_utils/crawlhistory.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 68bae8d..3538b3d 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -102,10 +102,12 @@ def extract_website_from_arguments(arguments): ) best_status_per_website = ( visit_id_website_status.groupBy("website") - .agg(F.collect_list("command_status").alias("command_status")) + .agg(F.collect_list("worst_status").alias("command_status")) .withColumn("best_status",reduce_to_best_command_status) ) + total_number_websites = best_status_per_website.count() + print(f"There was an attempt to visit a total of {total_number_websites} websites") print( @@ -153,4 +155,6 @@ def extract_website_from_arguments(arguments): multiple_successes.groupBy( F.col("count").alias("Number of successes") ).count().show() + + print("A list of all websites that where successfully visited more than twice:") multiple_successes.filter("count > 2").show() From 3deea98bd36ad22337138a46259907f9ea32fff6 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 27 Jul 2020 16:29:06 +0200 Subject: [PATCH 07/12] Added docstrings --- openwpm_utils/crawlhistory.py | 43 ++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py index 3538b3d..1f8eca1 100644 --- a/openwpm_utils/crawlhistory.py +++ b/openwpm_utils/crawlhistory.py @@ -31,11 +31,24 @@ def get_worst_status_per_visit_id(crawl_history): def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits): - """ - Analyze crawl_history and interrupted_visits to display general - success statistics - This function should be given the all entries in the crawl_history and - interrupted_visits table + """ Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by command_sequence + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- 
+ >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_command_sequence(crawl_history, incomplete) + """ crawl_history.groupBy("command").count().show() @@ -80,7 +93,25 @@ def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits def display_crawl_history_per_website(crawl_history, interrupted_visits): - # Analyzing status per website + """ Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by website + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_website(crawl_history, incomplete) + + """ visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) def extract_website_from_arguments(arguments): From cb195113f45d94959d22c960dbe89131329f5ea9 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 19 Mar 2021 12:05:28 +0100 Subject: [PATCH 08/12] Added demo file --- tests/test_crawlhistory.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 tests/test_crawlhistory.py diff --git a/tests/test_crawlhistory.py b/tests/test_crawlhistory.py new file mode 100644 index 0000000..bd10f58 --- /dev/null +++ b/tests/test_crawlhistory.py @@ -0,0 +1,26 @@ +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id, reduce_to_worst_command_status + +from collections import namedtuple +import pyspark as spark + +srow = namedtuple('simple_row', 'visit_id command_status'.split()) +data = [ + srow('1', "critical"), + srow('1', "ok"), + srow('2', "ok"), + srow('3', "neterror"), + srow('3', "timeout") +] +data2 = [ + srow('1', ["ok", "critical"]), + srow('2', ["ok"]), + srow('3', ["timeout", "neterror"]), +] + +test_df = spark.createDataFrame(data) +test_df.printSchema() +test_df2 = spark.createDataFrame(data2) +test_df2.printSchema() + + +display(get_worst_status_per_visit_id(test_df)) \ No newline at end of file From 247adea45916af778ecb482a5eb760aa1b5399f4 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 9 Apr 2021 09:45:58 +0200 Subject: [PATCH 09/12] Backporting from next --- openwpm_utils/s3.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index 3e8581c..2242a9e 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -15,8 +15,8 @@ from openwpm_utils.dataquality import TableFilter -class S3Dataset(object): - def __init__(self, s3_directory, s3_bucket="openwpm-crawls"): +class S3Dataset: + def __init__(self, s3_directory: str, s3_bucket: str = "openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -87,8 +87,11 @@ def collect_content(self, content_hash, beautify=False): class PySparkS3Dataset(S3Dataset): def __init__( - self, spark_context, s3_directory: str, s3_bucket: str = "openwpm-crawls" - ): + self, + spark_context: 
SparkContext, + s3_directory: str, + s3_bucket: str = "openwpm-crawls", + ) -> None: """Helper class to load OpenWPM datasets from S3 using PySpark Parameters @@ -111,7 +114,7 @@ def __init__( def read_table( self, table_name: str, columns: List[str] = None, mode: str = "successful" - ): + ) -> DataFrame: """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters @@ -126,7 +129,6 @@ def read_table( if one of it's commands failed or if it's in the interrupted table """ table = self._sql_context.read.parquet(self._s3_table_loc % table_name) - if mode == "all": table = table elif mode == "failed": From 2a3b94c52a5f6a6d4760171d214833a3396fca1d Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 15 Mar 2021 15:35:03 +0100 Subject: [PATCH 10/12] Updated get_option_dict (#22) * updated get_option_dict * Verified mapping * Getting blockers to work without files * Wrapped get_matching rules to bind blockers in scope --- openwpm_utils/blocklist.py | 1 - 1 file changed, 1 deletion(-) diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 3324342..47dbc03 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -86,7 +86,6 @@ def get_option_dict(url, top_level_url, resource_type=None): options["third-party"] = True return options - def prepare_get_matching_rules(blockers: List[BlockListParser]): def get_matching_rules(url, top_level_url, resource_type): # skip top-level requests From d7c3dec922d97feebf55141150c9cb3f25cc42fe Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Fri, 19 Mar 2021 16:21:06 +0100 Subject: [PATCH 11/12] GcsDataset implementation --- openwpm_utils/gcs.py | 136 +++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 6 +- 2 files changed, 140 insertions(+), 2 deletions(-) create mode 100644 openwpm_utils/gcs.py diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py new file mode 100644 index 0000000..0d7f6d5 --- /dev/null +++ b/openwpm_utils/gcs.py @@ -0,0 +1,136 @@ +from typing import List, Optional, Union + +import gcsfs +from google.api_core.exceptions import NotFound +import jsbeautifier +import pyarrow.parquet as pq +import pyspark.sql.functions as F +from google.cloud import storage +from pyspark.context import SparkContext +from pyspark.sql import SQLContext + +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id + + +class GCSDataset(object): + def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) -> None: + """Helper class to load OpenWPM datasets from GCS using pandas + + This dataset wrapper is safe to use by spark worker processes, as it + does not require the spark context. + + Parameters + ---------- + base_dir + Directory within the GCS bucket in which the dataset is saved. + bucket + The bucket name on GCS. + **kwargs + Passed on to GCSFS so you can customize it to your needs + """ + self._kwargs = kwargs + self._bucket = bucket + self._base_dir = base_dir + self._table_location_format_string = f"{bucket}/{base_dir}/visits/%s" + self._content_key = f"{base_dir}/content/%s.gz" + self._gcsfs = gcsfs.GCSFileSystem(**kwargs) + + def read_table(self, table_name, columns=None): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. 
+ + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + """ + return ( + pq.ParquetDataset( + self._table_location_format_string % table_name, + filesystem= self._gcsfs, + metadata_nthreads=4, + ) + .read(use_pandas_metadata=True, columns=columns) + .to_pandas() + ) + + def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[Union[bytes, str]]: + """Collect content by directly connecting to GCS via google.cloud.storage""" + storage_client = storage.Client() + bucket = storage_client.bucket(self._bucket) + + + blob = bucket.blob(self._content_key % content_hash) + content: Union[bytes, str] = blob.download_as_bytes() + + if beautify: + try: + content = jsbeautifier.beautify(content) + except IndexError: + pass + return content + +class PySparkGCSDataset(GCSDataset): + def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openwpm-data", **kwargs) -> None: + """Helper class to load OpenWPM datasets from GCS using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + base_dir : string + Directory within the bucket in which the dataset is saved. + bucket : string, optional + The bucket name on GCS. Defaults to `openwpm-data`. + """ + super().__init__(base_dir, bucket, **kwargs) + self._spark_context = spark_context + self._sql_context = SQLContext(spark_context) + self._table_location_format_string = f"gcs://{self._table_location_format_string}" + self._incomplete_visit_ids = self.read_table( + "incomplete_visits", mode="all" + ).select("visit_id") + crawl_history = self.read_table("crawl_history", mode="all") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ): + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. 
A visit_id is failed + if one of it's commands failed or if it's in the interrupted table + """ + table = self._sql_context.read.parquet(self._table_location_format_string % table_name) + if columns is not None: + table = table.select(columns) + if mode == "all": + return table + if mode == "failed": + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) + if mode == "successful": + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) + return table diff --git a/requirements.txt b/requirements.txt index 432baf6..b6fcdb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,11 @@ +abp-blocklist-parser boto3 domain_utils +gcsfs +google-cloud-storage jsbeautifier pandas plyvel pyarrow pyspark -s3fs -abp-blocklist-parser +s3fs \ No newline at end of file From 5f635b487432838677a77b0ef84e2499e89b48a1 Mon Sep 17 00:00:00 2001 From: Stefan Zabka Date: Mon, 22 Mar 2021 18:21:06 +0100 Subject: [PATCH 12/12] Changing scheme from gcs to gs --- openwpm_utils/gcs.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py index 0d7f6d5..9d407a5 100644 --- a/openwpm_utils/gcs.py +++ b/openwpm_utils/gcs.py @@ -7,13 +7,16 @@ import pyspark.sql.functions as F from google.cloud import storage from pyspark.context import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SQLContext, DataFrame +from pandas import DataFrame as PandasDataFrame from openwpm_utils.crawlhistory import get_worst_status_per_visit_id class GCSDataset(object): - def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) -> None: + def __init__( + self, base_dir: str, bucket: Optional[str] = "openwpm-data", **kwargs + ) -> None: """Helper class to load OpenWPM datasets from GCS using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -35,8 +38,8 @@ def __init__(self, base_dir: str, bucket:Optional[str]="openwpm-data", **kwargs) self._content_key = f"{base_dir}/content/%s.gz" self._gcsfs = gcsfs.GCSFileSystem(**kwargs) - def read_table(self, table_name, columns=None): - """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + def read_table(self, table_name: str, columns: List[str]=None) -> PandasDataFrame: + """Read `table_name` from OpenWPM dataset into a pandas dataframe. 
Parameters ---------- @@ -48,19 +51,20 @@ def read_table(self, table_name, columns=None): return ( pq.ParquetDataset( self._table_location_format_string % table_name, - filesystem= self._gcsfs, + filesystem=self._gcsfs, metadata_nthreads=4, ) .read(use_pandas_metadata=True, columns=columns) .to_pandas() ) - def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[Union[bytes, str]]: + def collect_content( + self, content_hash: str, beautify: bool = False + ) -> Optional[Union[bytes, str]]: """Collect content by directly connecting to GCS via google.cloud.storage""" storage_client = storage.Client() bucket = storage_client.bucket(self._bucket) - blob = bucket.blob(self._content_key % content_hash) content: Union[bytes, str] = blob.download_as_bytes() @@ -71,8 +75,15 @@ def collect_content(self, content_hash: str, beautify: bool=False) -> Optional[U pass return content + class PySparkGCSDataset(GCSDataset): - def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openwpm-data", **kwargs) -> None: + def __init__( + self, + spark_context: SparkContext, + base_dir: str, + bucket: str = "openwpm-data", + **kwargs, + ) -> None: """Helper class to load OpenWPM datasets from GCS using PySpark Parameters @@ -88,7 +99,9 @@ def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openw super().__init__(base_dir, bucket, **kwargs) self._spark_context = spark_context self._sql_context = SQLContext(spark_context) - self._table_location_format_string = f"gcs://{self._table_location_format_string}" + self._table_location_format_string = ( + f"gs://{self._table_location_format_string}" + ) self._incomplete_visit_ids = self.read_table( "incomplete_visits", mode="all" ).select("visit_id") @@ -100,8 +113,8 @@ def __init__(self, spark_context: SparkContext, base_dir: str, bucket:str="openw ) def read_table( - self, table_name: str, columns: List[str] = None, mode: str = "successful" - ): + self, table_name: str, columns: Optional[List[str]] = None, mode: str = "successful" + ) -> DataFrame: """Read `table_name` from OpenWPM dataset into a pyspark dataframe. Parameters @@ -115,7 +128,9 @@ def read_table( Success is determined per visit_id. A visit_id is failed if one of it's commands failed or if it's in the interrupted table """ - table = self._sql_context.read.parquet(self._table_location_format_string % table_name) + table = self._sql_context.read.parquet( + self._table_location_format_string % table_name + ) if columns is not None: table = table.select(columns) if mode == "all":