diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py
index 3324342..47dbc03 100644
--- a/openwpm_utils/blocklist.py
+++ b/openwpm_utils/blocklist.py
@@ -86,7 +86,6 @@ def get_option_dict(url, top_level_url, resource_type=None):
         options["third-party"] = True
     return options
 
-
 def prepare_get_matching_rules(blockers: List[BlockListParser]):
     def get_matching_rules(url, top_level_url, resource_type):
         # skip top-level requests
diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py
new file mode 100644
index 0000000..1f8eca1
--- /dev/null
+++ b/openwpm_utils/crawlhistory.py
@@ -0,0 +1,191 @@
+import json
+
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType
+
+reduce_to_worst_command_status = (
+    F.when(F.array_contains("command_status", "critical"), "critical")
+    .when(F.array_contains("command_status", "error"), "error")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .otherwise("ok")
+    .alias("worst_status")
+)
+
+
+reduce_to_best_command_status = (
+    F.when(F.array_contains("command_status", "ok"), "ok")
+    .when(F.array_contains("command_status", "timeout"), "timeout")
+    .when(F.array_contains("command_status", "neterror"), "neterror")
+    .when(F.array_contains("command_status", "error"), "error")
+    .otherwise("critical")
+    .alias("best_status")
+)
+
+
+def get_worst_status_per_visit_id(crawl_history):
+    """Adds a ``worst_status`` column with the worst command status per visit_id"""
+    return (crawl_history.groupBy("visit_id")
+            .agg(F.collect_list("command_status").alias("command_status"))
+            .withColumn("worst_status", reduce_to_worst_command_status))
+
+
+def display_crawl_history_per_command_sequence(crawl_history, interrupted_visits):
+    """Analyzes crawl_history and interrupted_visits to display general
+    success statistics grouped by command_sequence
+
+    Parameters
+    ----------
+    crawl_history: dataframe
+        The full ``crawl_history`` dataframe
+    interrupted_visits: dataframe
+        The full ``interrupted_visits`` dataframe
+
+    Examples
+    --------
+    >>> from openwpm_utils.s3 import PySparkS3Dataset
+    >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET)
+    >>> crawl_history = dataset.read_table('crawl_history', mode="all")
+    >>> incomplete = dataset.read_table('incomplete_visits', mode="all")
+    >>> display_crawl_history_per_command_sequence(crawl_history, incomplete)
+
+    """
+    crawl_history.groupBy("command").count().show()
+
+    # Analyzing status per command_sequence
+    total_num_command_sequences = crawl_history.groupBy("visit_id").count().count()
+    visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)
+    print(
+        "Percentage of command_sequences that didn't complete successfully: %0.2f%%"
+        % (
+            visit_id_and_worst_status.where(F.col("worst_status") != "ok").count()
+            / float(total_num_command_sequences)
+            * 100
+        )
+    )
+    net_error_count = visit_id_and_worst_status.where(
+        F.col("worst_status") == "neterror"
+    ).count()
+    print(
+        "There were a total of %d neterrors (%0.2f%% of all command_sequences)"
+        % (net_error_count, net_error_count / float(total_num_command_sequences) * 100)
+    )
+    timeout_count = visit_id_and_worst_status.where(
+        F.col("worst_status") == "timeout"
+    ).count()
+    print(
+        "There were a total of %d timeouts (%0.2f%% of all command_sequences)"
+        % (timeout_count, timeout_count / float(total_num_command_sequences) * 100)
+    )
"There were a total of %d errors(%0.2f%% of the all command_sequences)" + % (error_count, error_count / float(total_num_command_sequences) * 100) + ) + + print( + f"A total of {interrupted_visits.count()} command_sequences were interrupted." + f"This represents {interrupted_visits.count()/ float(total_num_command_sequences)* 100:.2f} % of the entire crawl" + ) + + +def display_crawl_history_per_website(crawl_history, interrupted_visits): + """ Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by website + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_website(crawl_history, incomplete) + + """ + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + + def extract_website_from_arguments(arguments): + """Given the arguments of a get_command this function returns which website was visited""" + return json.loads(arguments)["url"] + + udf_extract_website_from_arguments = F.udf( + extract_website_from_arguments, StringType() + ) + + visit_id_to_website = crawl_history.where( + F.col("command") == "GetCommand" + ).withColumn("website", udf_extract_website_from_arguments("arguments")) + + visit_id_to_website = visit_id_to_website[["visit_id", "website"]] + + visit_id_website_status = visit_id_and_worst_status.join( + visit_id_to_website, "visit_id" + ) + best_status_per_website = ( + visit_id_website_status.groupBy("website") + .agg(F.collect_list("worst_status").alias("command_status")) + .withColumn("best_status",reduce_to_best_command_status) + ) + + total_number_websites = best_status_per_website.count() + + print(f"There was an attempt to visit a total of {total_number_websites} websites") + + print( + "Percentage of websites that didn't complete successfully %0.2f%%" + % ( + best_status_per_website.where(F.col("best_status") != "ok").count() + / float(total_number_websites) + * 100 + ) + ) + net_error_count = best_status_per_website.where( + F.col("best_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors (%0.2f%% of the all websites)" + % (net_error_count, net_error_count / float(total_number_websites) * 100) + ) + timeout_count = best_status_per_website.where( + F.col("best_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts (%0.2f%% of the all websites)" + % (timeout_count, timeout_count / float(total_number_websites) * 100) + ) + + error_count = best_status_per_website.where(F.col("best_status") == "error").count() + + print( + "There were a total of %d errors (%0.2f%% of the websites)" + % (error_count, error_count / float(total_number_websites) * 100) + ) + + multiple_successes = ( + visit_id_website_status.where(F.col("worst_status") == "ok") + .join(interrupted_visits, "visit_id", how="leftanti") + .groupBy("website") + .count() + .filter("count > 1") + .orderBy(F.desc("count")) + ) + + print( + f"There were {multiple_successes.count()} websites that were successfully visited multiple times" + ) + multiple_successes.groupBy( + F.col("count").alias("Number of successes") + ).count().show() + + print("A list of all websites that 
diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py
index d4a36f9..e7a2ce8 100644
--- a/openwpm_utils/dataquality.py
+++ b/openwpm_utils/dataquality.py
@@ -1,5 +1,9 @@
-from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when
+import pyspark.sql.functions as F
 from pyspark.mllib.stat import Statistics
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when
+
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
 
 
 def count_not_null(c, nan_as_null=False):
@@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True):
         "\nNumber of records with visit_id == -1: %d"
         % df.where(df.visit_id == -1).count()
     )
+
+
+class TableFilter:
+    """Filters OpenWPM tables by whether a visit completed successfully.
+
+    A visit is considered failed if any of its commands did not return "ok"
+    or if it appears in the incomplete_visits table.
+    """
+
+    def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None:
+        self._incomplete_visit_ids = incomplete_visits.select("visit_id")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
+
+    def clean_table(self, table: DataFrame) -> DataFrame:
+        """Returns only the rows belonging to successful visits."""
+        return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
+            self._incomplete_visit_ids, "visit_id", how="leftanti"
+        )
+
+    def dirty_table(self, table: DataFrame) -> DataFrame:
+        """Returns only the rows belonging to failed or interrupted visits."""
+        return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
+            table.join(self._incomplete_visit_ids, "visit_id", how="inner")
+        )
diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py
new file mode 100644
index 0000000..9d407a5
--- /dev/null
+++ b/openwpm_utils/gcs.py
@@ -0,0 +1,151 @@
+from typing import List, Optional, Union
+
+import gcsfs
+import jsbeautifier
+import pyarrow.parquet as pq
+import pyspark.sql.functions as F
+from google.api_core.exceptions import NotFound
+from google.cloud import storage
+from pandas import DataFrame as PandasDataFrame
+from pyspark.context import SparkContext
+from pyspark.sql import DataFrame, SQLContext
+
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
+
+
+class GCSDataset(object):
+    def __init__(
+        self, base_dir: str, bucket: str = "openwpm-data", **kwargs
+    ) -> None:
+        """Helper class to load OpenWPM datasets from GCS using pandas
+
+        This dataset wrapper is safe to use by spark worker processes, as it
+        does not require the spark context.
+
+        Parameters
+        ----------
+        base_dir
+            Directory within the GCS bucket in which the dataset is saved.
+        bucket
+            The bucket name on GCS.
+        **kwargs
+            Passed on to GCSFS so you can customize it to your needs
+        """
+        self._kwargs = kwargs
+        self._bucket = bucket
+        self._base_dir = base_dir
+        self._table_location_format_string = f"{bucket}/{base_dir}/visits/%s"
+        self._content_key = f"{base_dir}/content/%s.gz"
+        self._gcsfs = gcsfs.GCSFileSystem(**kwargs)
+
+    def read_table(
+        self, table_name: str, columns: Optional[List[str]] = None
+    ) -> PandasDataFrame:
+        """Read `table_name` from OpenWPM dataset into a pandas dataframe.
+
+        Parameters
+        ----------
+        table_name : string
+            OpenWPM table to read
+        columns : list of strings
+            The set of columns to filter the parquet dataset by
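+
+        Examples
+        --------
+        Illustrative usage; the bucket and crawl directory names below are
+        placeholders, not a real dataset.
+
+        >>> dataset = GCSDataset(base_dir="example-crawl", bucket="example-bucket")
+        >>> js = dataset.read_table("javascript", columns=["visit_id", "script_url"])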
+        """
+        return (
+            pq.ParquetDataset(
+                self._table_location_format_string % table_name,
+                filesystem=self._gcsfs,
+                metadata_nthreads=4,
+            )
+            .read(use_pandas_metadata=True, columns=columns)
+            .to_pandas()
+        )
+
+    def collect_content(
+        self, content_hash: str, beautify: bool = False
+    ) -> Optional[Union[bytes, str]]:
+        """Collect content by directly connecting to GCS via google.cloud.storage"""
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(self._bucket)
+
+        blob = bucket.blob(self._content_key % content_hash)
+        try:
+            content: Union[bytes, str] = blob.download_as_bytes()
+        except NotFound:
+            # Mirror the S3 helper's behaviour for missing content
+            return None
+
+        if beautify:
+            try:
+                content = jsbeautifier.beautify(content)
+            except IndexError:
+                pass
+        return content
+
+
+class PySparkGCSDataset(GCSDataset):
+    def __init__(
+        self,
+        spark_context: SparkContext,
+        base_dir: str,
+        bucket: str = "openwpm-data",
+        **kwargs,
+    ) -> None:
+        """Helper class to load OpenWPM datasets from GCS using PySpark
+
+        Parameters
+        ----------
+        spark_context
+            Spark context. In databricks, this is available via the `sc`
+            variable.
+        base_dir : string
+            Directory within the bucket in which the dataset is saved.
+        bucket : string, optional
+            The bucket name on GCS. Defaults to `openwpm-data`.
+        """
+        super().__init__(base_dir, bucket, **kwargs)
+        self._spark_context = spark_context
+        self._sql_context = SQLContext(spark_context)
+        self._table_location_format_string = (
+            f"gs://{self._table_location_format_string}"
+        )
+        self._incomplete_visit_ids = self.read_table(
+            "incomplete_visits", mode="all"
+        ).select("visit_id")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._failed_visit_ids = (
+            get_worst_status_per_visit_id(crawl_history)
+            .where(F.col("worst_status") != "ok")
+            .select("visit_id")
+        )
+
+    def read_table(
+        self,
+        table_name: str,
+        columns: Optional[List[str]] = None,
+        mode: str = "successful",
+    ) -> DataFrame:
+        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
+
+        Parameters
+        ----------
+        table_name : string
+            OpenWPM table to read
+        columns : list of strings
+            The set of columns to filter the parquet dataset by
+        mode : string
+            One of "successful", "failed" or "all".
+            Success is determined per visit_id: a visit_id is considered failed
+            if one of its commands failed or if it appears in the
+            incomplete_visits table.
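+
+        Examples
+        --------
+        Illustrative usage; `sc` is the Spark context and the bucket and crawl
+        directory names are placeholders.
+
+        >>> dataset = PySparkGCSDataset(sc, "example-crawl", bucket="example-bucket")
+        >>> requests = dataset.read_table("http_requests", mode="successful")
+        >>> failed = dataset.read_table("http_requests", mode="failed")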
+        """
+        table = self._sql_context.read.parquet(
+            self._table_location_format_string % table_name
+        )
+        if columns is not None:
+            table = table.select(columns)
+        if mode == "all":
+            return table
+        if mode == "failed":
+            return table.join(self._failed_visit_ids, "visit_id", how="inner").union(
+                table.join(self._incomplete_visit_ids, "visit_id", how="inner")
+            )
+        if mode == "successful":
+            return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join(
+                self._incomplete_visit_ids, "visit_id", how="leftanti"
+            )
+        raise AssertionError(
+            f"Mode was {mode}, "
+            "allowed modes are 'all', 'failed' and 'successful'"
+        )
diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py
index b533e03..2242a9e 100644
--- a/openwpm_utils/s3.py
+++ b/openwpm_utils/s3.py
@@ -1,76 +1,22 @@
 import gzip
+from typing import List, Optional
+
 import boto3
 import jsbeautifier
 import pyarrow.parquet as pq
+import pyspark.sql.functions as F
 import s3fs
 from botocore.exceptions import ClientError
 from pyarrow.filesystem import S3FSWrapper  # noqa
-from pyspark.sql import SQLContext
+from pyspark import SparkContext
+from pyspark.sql import DataFrame, SQLContext
 
+from openwpm_utils.crawlhistory import get_worst_status_per_visit_id
+from openwpm_utils.dataquality import TableFilter
 
-class PySparkS3Dataset(object):
-    def __init__(self, spark_context, s3_directory,
-                 s3_bucket='openwpm-crawls'):
-        """Helper class to load OpenWPM datasets from S3 using PySpark
-
-        Parameters
-        ----------
-        spark_context
-            Spark context. In databricks, this is available via the `sc`
-            variable.
-        s3_directory : string
-            Directory within the S3 bucket in which the dataset is saved.
-        s3_bucket : string, optional
-            The bucket name on S3. Defaults to `openwpm-crawls`.
-        """
-        self._s3_bucket = s3_bucket
-        self._s3_directory = s3_directory
-        self._spark_context = spark_context
-        self._sql_context = SQLContext(spark_context)
-        self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % (
-            s3_bucket, s3_directory)
-        self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % (
-            s3_bucket, s3_directory)
-
-    def read_table(self, table_name, columns=None):
-        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
-
-        Parameters
-        ----------
-        table_name : string
-            OpenWPM table to read
-        columns : list of strings
-            The set of columns to filter the parquet dataset by
-        """
-        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
-        if columns is not None:
-            return table.select(columns)
-        return table
-
-    def read_content(self, content_hash):
-        """Read the content corresponding to `content_hash`.
-
-        NOTE: This can only be run in the driver process since it requires
-        access to the spark context
-        """
-        return self._spark_context.textFile(
-            self._s3_content_loc % content_hash)
-
-    def collect_content(self, content_hash, beautify=False):
-        """Collect content for `content_hash` to driver
-
-        NOTE: This can only be run in the driver process since it requires
-        access to the spark context
-        """
-        content = ''.join(self.read_content(content_hash).collect())
-        if beautify:
-            return jsbeautifier.beautify(content)
-        return content
-
-class S3Dataset(object):
-    def __init__(self, s3_directory, s3_bucket='openwpm-crawls'):
+class S3Dataset:
+    def __init__(self, s3_directory: str, s3_bucket: str = "openwpm-crawls"):
         """Helper class to load OpenWPM datasets from S3 using pandas
 
         This dataset wrapper is safe to use by spark worker processes, as it
@@ -99,30 +45,33 @@ def read_table(self, table_name, columns=None):
         columns : list of strings
             The set of columns to filter the parquet dataset by
         """
-        return pq.ParquetDataset(
-            self._s3_table_loc % table_name,
-            filesystem=self._s3fs,
-            metadata_nthreads=4
-        ).read(use_pandas_metadata=True, columns=columns).to_pandas()
+        return (
+            pq.ParquetDataset(
+                self._s3_table_loc % table_name,
+                filesystem=self._s3fs,
+                metadata_nthreads=4,
+            )
+            .read(use_pandas_metadata=True, columns=columns)
+            .to_pandas()
+        )
 
     def collect_content(self, content_hash, beautify=False):
         """Collect content by directly connecting to S3 via boto3"""
-        s3 = boto3.client('s3')
+        s3 = boto3.client("s3")
         try:
             obj = s3.get_object(
-                Bucket=self._s3_bucket,
-                Key=self._content_key % content_hash
+                Bucket=self._s3_bucket, Key=self._content_key % content_hash
             )
             body = obj["Body"]
             compressed_content = body.read()
             body.close()
         except ClientError as e:
-            if e.response['Error']['Code'] != 'NoSuchKey':
+            if e.response["Error"]["Code"] != "NoSuchKey":
                 raise
             else:
                 return None
 
-        with gzip.GzipFile(fileobj=compressed_content, mode='r') as f:
+        with gzip.GzipFile(fileobj=compressed_content, mode="r") as f:
             content = f.read()
 
         if content is None or content == "":
@@ -134,3 +83,65 @@ def collect_content(self, content_hash, beautify=False):
         except IndexError:
             pass
         return content
+
+
+class PySparkS3Dataset(S3Dataset):
+    def __init__(
+        self,
+        spark_context: SparkContext,
+        s3_directory: str,
+        s3_bucket: str = "openwpm-crawls",
+    ) -> None:
+        """Helper class to load OpenWPM datasets from S3 using PySpark
+
+        Parameters
+        ----------
+        spark_context
+            Spark context. In databricks, this is available via the `sc`
+            variable.
+        s3_directory : string
+            Directory within the S3 bucket in which the dataset is saved.
+        s3_bucket : string, optional
+            The bucket name on S3. Defaults to `openwpm-crawls`.
+        """
+        super().__init__(s3_directory, s3_bucket)
+        self._spark_context = spark_context
+        self._sql_context = SQLContext(spark_context)
+        self._s3_table_loc = f"s3a://{self._s3_table_loc}"
+        incomplete_visits = self.read_table("incomplete_visits", mode="all")
+        crawl_history = self.read_table("crawl_history", mode="all")
+        self._filter = TableFilter(incomplete_visits, crawl_history)
+
+    def read_table(
+        self,
+        table_name: str,
+        columns: Optional[List[str]] = None,
+        mode: str = "successful",
+    ) -> DataFrame:
+        """Read `table_name` from OpenWPM dataset into a pyspark dataframe.
+
+        Parameters
+        ----------
+        table_name : string
+            OpenWPM table to read
+        columns : list of strings
+            The set of columns to filter the parquet dataset by
+        mode : string
+            One of "successful", "failed" or "all".
+            Success is determined per visit_id: a visit_id is considered failed
+            if one of its commands failed or if it appears in the
+            incomplete_visits table.
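+
+        Examples
+        --------
+        Illustrative usage; `sc` is the Spark context and the directory name
+        is a placeholder.
+
+        >>> dataset = PySparkS3Dataset(sc, s3_directory="example-crawl")
+        >>> requests = dataset.read_table("http_requests", mode="successful")
+        >>> failed = dataset.read_table("http_requests", mode="failed")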
+        """
+        table = self._sql_context.read.parquet(self._s3_table_loc % table_name)
+        if mode == "failed":
+            table = self._filter.dirty_table(table)
+        elif mode == "successful":
+            table = self._filter.clean_table(table)
+        elif mode != "all":
+            raise AssertionError(
+                f"Mode was {mode}, "
+                "allowed modes are 'all', 'failed' and 'successful'"
+            )
+
+        if columns is not None:
+            table = table.select(columns)
+
+        return table
diff --git a/requirements.txt b/requirements.txt
index 432baf6..b6fcdb0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,11 @@
+abp-blocklist-parser
 boto3
 domain_utils
+gcsfs
+google-cloud-storage
 jsbeautifier
 pandas
 plyvel
 pyarrow
 pyspark
-s3fs
-abp-blocklist-parser
+s3fs
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
index d93c4f5..3162fbc 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -4,3 +4,6 @@ test=pytest
 [tool:pytest]
 addopts = --flake8 -rw
 testpaths = tests
+
+[flake8]
+max-line-length = 88
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 8e7bbee..9ba38d6 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
     name='openwpm-utils',
     license='MPL 2.0',
     url='https://github.com/mozilla/openwpm-utils',
-    version='0.2.0',
+    version='0.3.0',
     packages=['openwpm_utils'],
 
     # Dependencies
diff --git a/tests/test_crawlhistory.py b/tests/test_crawlhistory.py
new file mode 100644
index 0000000..bd10f58
--- /dev/null
+++ b/tests/test_crawlhistory.py
@@ -0,0 +1,26 @@
+from collections import namedtuple
+
+import pytest
+from pyspark.sql import SparkSession
+
+from openwpm_utils.crawlhistory import (
+    get_worst_status_per_visit_id,
+    reduce_to_worst_command_status,
+)
+
+srow = namedtuple("simple_row", ["visit_id", "command_status"])
+
+# Per-command status rows, as they appear in crawl_history.
+data = [
+    srow("1", "critical"),
+    srow("1", "ok"),
+    srow("2", "ok"),
+    srow("3", "neterror"),
+    srow("3", "timeout"),
+]
+# Pre-aggregated status lists, as produced by the groupBy/collect_list step.
+data2 = [
+    srow("1", ["ok", "critical"]),
+    srow("2", ["ok"]),
+    srow("3", ["timeout", "neterror"]),
+]
+
+
+@pytest.fixture(scope="module")
+def spark():
+    return SparkSession.builder.master("local[1]").appName("tests").getOrCreate()
+
+
+def test_get_worst_status_per_visit_id(spark):
+    test_df = spark.createDataFrame(data)
+    reduced = get_worst_status_per_visit_id(test_df)
+    result = {row["visit_id"]: row["worst_status"] for row in reduced.collect()}
+    assert result == {"1": "critical", "2": "ok", "3": "neterror"}
+
+
+def test_reduce_to_worst_command_status(spark):
+    test_df2 = spark.createDataFrame(data2)
+    reduced = test_df2.select("visit_id", reduce_to_worst_command_status)
+    result = {row["visit_id"]: row["worst_status"] for row in reduced.collect()}
+    assert result == {"1": "critical", "2": "ok", "3": "neterror"}
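+
+
+# A sketch of one more check, for reduce_to_best_command_status, reusing the
+# local SparkSession fixture defined above.
+def test_reduce_to_best_command_status(spark):
+    from openwpm_utils.crawlhistory import reduce_to_best_command_status
+
+    test_df2 = spark.createDataFrame(data2)
+    reduced = test_df2.select("visit_id", reduce_to_best_command_status)
+    result = {row["visit_id"]: row["best_status"] for row in reduced.collect()}
+    assert result == {"1": "ok", "2": "ok", "3": "timeout"}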