QuokkaContext

Creates a QuokkaContext object, which is needed to do basically anything in Quokka. It plays a role similar to SparkContext.

Parameters:

cluster (Cluster, default None): Cluster object. If None, a LocalCluster will be created.

io_per_node (int, default 2): Number of IO nodes to launch per machine; this controls the number of IO thread pools used per machine.

exec_per_node (int, default 1): Number of compute nodes to launch per machine; this controls the number of compute thread pools used per machine.

Returns:

A QuokkaContext object.

Examples:

>>> from pyquokka.df import QuokkaContext

This will create a QuokkaContext object with a LocalCluster to execute locally. Useful for testing and exploration.

>>> qc = QuokkaContext()
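
You can also tune how many IO and compute thread pools Quokka launches on each machine through io_per_node and exec_per_node (illustrative values; the defaults are 2 and 1):

>>> qc = QuokkaContext(io_per_node = 4, exec_per_node = 2)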

If you want to connect to a remote Ray cluster, you can do this:

>>> from pyquokka.utils import QuokkaClusterManager
>>> manager = QuokkaClusterManager()
>>> cluster = manager.get_cluster_from_ray("my_cluster.yaml", aws_access_key, aws_access_id, requirements = ["numpy", "pandas"], spill_dir = "/data")
>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext(cluster)

Or you can spin up a new cluster:

>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
>>> qc = QuokkaContext(cluster)

Or you can save a cluster to JSON, so that you can later reconnect to it (even after it has been stopped):

>>> from pyquokka.utils import *
>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
>>> cluster.to_json("my_cluster.json")

You can now close this Python session. In a new Python session you can connect to this cluster by doing:

>>> from pyquokka.utils import *
>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.from_json("my_cluster.json")
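
As in the earlier examples, you can then build a QuokkaContext on top of the reconnected cluster:

>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext(cluster)
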
Source code in pyquokka/df.py
def __init__(self, cluster = None, io_per_node = 2, exec_per_node = 1) -> None:

    """
    Creates a QuokkaContext object. Needed to do basically anything. Similar to SparkContext.

    Args:
        cluster (Cluster): Cluster object. If None, a LocalCluster will be created.
        io_per_node (int): Number of IO nodes to launch per machine. Default is 2. This controls the number of IO thread pools to use per machine.
        exec_per_node (int): Number of compute nodes to launch per machine. Default is 1. This controls the number of compute thread pools to use per machine.

    Returns:
        QuokkaContext object

    Examples:

        >>> from pyquokka.df import QuokkaContext

        This will create a QuokkaContext object with a LocalCluster to execute locally. Useful for testing and exploration.

        >>> qc = QuokkaContext()

        If you want to connect to a remote Ray cluster, you can do this.

        >>> manager = QuokkaClusterManager()
        >>> cluster = manager.get_cluster_from_ray("my_cluster.yaml", aws_access_key, aws_access_id, requirements = ["numpy", "pandas"], spill_dir = "/data")
        >>> from pyquokka.df import QuokkaContext
        >>> qc = QuokkaContext(cluster)

        Or you can spin up a new cluster:

        >>> manager =  QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
        >>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
        >>> qc = QuokkaContext(cluster)

        Or you can connect to a stopped cluster that was saved to json.

        >>> from pyquokka.utils import *
        >>> manager =  QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
        >>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
        >>> cluster.to_json("my_cluster.json")

        You can now close this Python session. In a new Python session you can connect to this cluster by doing:

        >>> from pyquokka.utils import *
        >>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
        >>> cluster = manager.from_json("my_cluster.json")

    """

    self.sql_config= {"optimize_joins" : True, "s3_csv_materialize_threshold" : 10 * 1048576, "disk_csv_materialize_threshold" : 1048576,
                  "s3_parquet_materialize_threshold" : 10 * 1048576, "disk_parquet_materialize_threshold" : 1048576}
    self.exec_config = {"hbq_path": "/data/", "fault_tolerance": False, "memory_limit": 0.25, "max_pipeline_batches": 30, 
                    "checkpoint_interval": None, "checkpoint_bucket": "quokka-checkpoint", "batch_attempt": 20, "max_pipeline": 3}

    self.latest_node_id = 0
    self.nodes = {}
    self.cluster = LocalCluster() if cluster is None else cluster
    if type(self.cluster) == LocalCluster:
        if self.exec_config["fault_tolerance"]:
            print("Fault tolerance is not supported in local mode, turning it off")
            self.exec_config["fault_tolerance"] = False
    self.io_per_node = io_per_node
    self.exec_per_node = exec_per_node

    self.coordinator = Coordinator.options(num_cpus=0.001, max_concurrency = 2,resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()
    self.catalog = Catalog.options(num_cpus=0.001,resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()
    self.dataset_manager = ArrowDataset.options(num_cpus = 0.001, resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()

    self.task_managers = {}
    self.node_locs= {}
    self.io_nodes = set()
    self.compute_nodes = set()
    self.replay_nodes = set()
    count = 0

    self.tag_io_nodes = {k: set() for k in self.cluster.tags}
    self.tag_compute_nodes = {k: set() for k in self.cluster.tags}

    self.leader_compute_nodes = []
    self.leader_io_nodes = []

    # default strategy launches two IO nodes and one compute node per machine
    private_ips = list(self.cluster.private_ips.values())
    for ip in private_ips:

        for k in range(1):
            self.task_managers[count] = ReplayTaskManager.options(num_cpus = 0.001, max_concurrency = 2, resources={"node:" + ip : 0.001}).remote(count, self.cluster.leader_public_ip, list(self.cluster.private_ips.values()), self.exec_config)
            self.replay_nodes.add(count)
            self.node_locs[count] = ip
            count += 1
        for k in range(io_per_node):
            self.task_managers[count] = IOTaskManager.options(num_cpus = 0.001, max_concurrency = 2, resources={"node:" + ip : 0.001}).remote(count, self.cluster.leader_public_ip, list(self.cluster.private_ips.values()), self.exec_config)
            self.io_nodes.add(count)
            if ip == self.cluster.leader_private_ip:
                self.leader_io_nodes.append(count)
            for tag in self.cluster.tags:
                if ip in self.cluster.tags[tag]:
                    self.tag_io_nodes[tag].add(count)
            self.node_locs[count] = ip
            count += 1

        for k in range(exec_per_node):
            if type(self.cluster) == LocalCluster:
                self.task_managers[count] = ExecTaskManager.options(num_cpus = 0.001,  max_concurrency = 2, resources={"node:" + ip : 0.001}).remote(count, self.cluster.leader_public_ip, list(self.cluster.private_ips.values()), self.exec_config)
            elif type(self.cluster) == EC2Cluster:
                self.task_managers[count] = ExecTaskManager.options(num_cpus = 0.001,  max_concurrency = 2, resources={"node:" + ip : 0.001}).remote(count, self.cluster.leader_public_ip, list(self.cluster.private_ips.values()), self.exec_config) 
            else:
                raise Exception

            if ip == self.cluster.leader_private_ip:
                self.leader_compute_nodes.append(count)
            for tag in self.cluster.tags:
                if ip in self.cluster.tags[tag]:
                    self.tag_compute_nodes[tag].add(count)

            self.compute_nodes.add(count)
            self.node_locs[count] = ip
            count += 1        

    ray.get(self.coordinator.register_nodes.remote(replay_nodes = {k: self.task_managers[k] for k in self.replay_nodes}, io_nodes = {k: self.task_managers[k] for k in self.io_nodes}, compute_nodes = {k: self.task_managers[k] for k in self.compute_nodes}))
    ray.get(self.coordinator.register_node_ips.remote( self.node_locs ))