def __init__(self, cluster=None, io_per_node=2, exec_per_node=1) -> None:
"""
Creates a QuokkaContext object. Needed to do basically anything. Similar to SparkContext.
Args:
cluster (Cluster): Cluster object. If None, a LocalCluster will be created.
io_per_node (int): Number of IO nodes to launch per machine. Default is 2. This controls the number of IO thread pools to use per machine.
exec_per_node (int): Number of compute nodes to launch per machine. Default is 1. This controls the number of compute thread pools to use per machine.
Returns:
QuokkaContext object
Examples:
>>> from pyquokka.df import QuokkaContext
This will create a QuokkaContext object with a LocalCluster to execute locally. Useful for testing and exploration.
>>> qc = QuokkaContext()
If you want to connect to a remote Ray cluster, you can do this.
>>> manager = QuokkaClusterManager()
>>> cluster = manager.get_cluster_from_ray("my_cluster.yaml", aws_access_key, aws_access_id, requirements = ["numpy", "pandas"], spill_dir = "/data")
>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext(cluster)
Or you can spin up a new cluster:
>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
>>> qc = QuokkaContext(cluster)
Or you can connect to a stopped cluster that was saved to json.
>>> from pyquokka.utils import *
>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.create_cluster(aws_access_key, aws_access_id, num_instances = 2, instance_type = "i3.2xlarge", ami="ami-0530ca8899fac469f", requirements = ["numpy", "pandas"])
>>> cluster.to_json("my_cluster.json")
You can now close this Python session. In a new Python session you can connect to this cluster by doing:
>>> from pyquokka.utils import *
>>> manager = QuokkaClusterManager(key_name = "my_key", key_location = "/home/ubuntu/.ssh/my_key.pem", security_group = "my_security_group")
>>> cluster = manager.from_json("my_cluster.json")
"""
    self.sql_config = {"optimize_joins": True, "s3_csv_materialize_threshold": 10 * 1048576, "disk_csv_materialize_threshold": 1048576,
                       "s3_parquet_materialize_threshold": 10 * 1048576, "disk_parquet_materialize_threshold": 1048576}
    self.exec_config = {"hbq_path": "/data/", "fault_tolerance": False, "memory_limit": 0.25, "max_pipeline_batches": 30,
                        "checkpoint_interval": None, "checkpoint_bucket": "quokka-checkpoint", "batch_attempt": 20, "max_pipeline": 3}
    self.latest_node_id = 0
    self.nodes = {}

    self.cluster = LocalCluster() if cluster is None else cluster
    if type(self.cluster) == LocalCluster and self.exec_config["fault_tolerance"]:
        print("Fault tolerance is not supported in local mode, turning it off")
        self.exec_config["fault_tolerance"] = False

    self.io_per_node = io_per_node
    self.exec_per_node = exec_per_node
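    # The Coordinator, Catalog and dataset manager are singleton actors pinned to the
    # leader machine by requesting a sliver of the "node:<ip>" custom resource that Ray
    # automatically defines for every node in the cluster.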
    self.coordinator = Coordinator.options(num_cpus=0.001, max_concurrency=2, resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()
    self.catalog = Catalog.options(num_cpus=0.001, resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()
    self.dataset_manager = ArrowDataset.options(num_cpus=0.001, resources={"node:" + str(self.cluster.leader_private_ip): 0.001}).remote()
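    # Bookkeeping for the task managers launched below: actor handles keyed by a global
    # node id (count), the IP each id lives on, and which ids are IO / compute / replay.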
    self.task_managers = {}
    self.node_locs = {}
    self.io_nodes = set()
    self.compute_nodes = set()
    self.replay_nodes = set()
    count = 0

    self.tag_io_nodes = {k: set() for k in self.cluster.tags}
    self.tag_compute_nodes = {k: set() for k in self.cluster.tags}
    self.leader_compute_nodes = []
    self.leader_io_nodes = []

    # default strategy launches one replay node, io_per_node IO nodes (default 2) and exec_per_node compute nodes (default 1) per machine
    private_ips = list(self.cluster.private_ips.values())
    for ip in private_ips:
        # one replay task manager per machine
        self.task_managers[count] = ReplayTaskManager.options(num_cpus=0.001, max_concurrency=2, resources={"node:" + ip: 0.001}).remote(count, self.cluster.leader_public_ip, private_ips, self.exec_config)
        self.replay_nodes.add(count)
        self.node_locs[count] = ip
        count += 1
        for _ in range(io_per_node):
            self.task_managers[count] = IOTaskManager.options(num_cpus=0.001, max_concurrency=2, resources={"node:" + ip: 0.001}).remote(count, self.cluster.leader_public_ip, private_ips, self.exec_config)
            self.io_nodes.add(count)
            if ip == self.cluster.leader_private_ip:
                self.leader_io_nodes.append(count)
            for tag in self.cluster.tags:
                if ip in self.cluster.tags[tag]:
                    self.tag_io_nodes[tag].add(count)
            self.node_locs[count] = ip
            count += 1
        for _ in range(exec_per_node):
            # LocalCluster and EC2Cluster currently launch the same ExecTaskManager
            if type(self.cluster) not in (LocalCluster, EC2Cluster):
                raise Exception("Unsupported cluster type: " + str(type(self.cluster)))
            self.task_managers[count] = ExecTaskManager.options(num_cpus=0.001, max_concurrency=2, resources={"node:" + ip: 0.001}).remote(count, self.cluster.leader_public_ip, private_ips, self.exec_config)
            if ip == self.cluster.leader_private_ip:
                self.leader_compute_nodes.append(count)
            for tag in self.cluster.tags:
                if ip in self.cluster.tags[tag]:
                    self.tag_compute_nodes[tag].add(count)
            self.compute_nodes.add(count)
            self.node_locs[count] = ip
            count += 1
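    # Hand the actor handles to the coordinator so it can dispatch work to them.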
    ray.get(self.coordinator.register_nodes.remote(replay_nodes={k: self.task_managers[k] for k in self.replay_nodes}, io_nodes={k: self.task_managers[k] for k in self.io_nodes}, compute_nodes={k: self.task_managers[k] for k in self.compute_nodes}))
    ray.get(self.coordinator.register_node_ips.remote(self.node_locs))
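    # With the defaults (io_per_node = 2, exec_per_node = 1), a two-machine cluster ends
    # up with node ids 0-7: the first machine gets 0 (replay), 1-2 (IO), 3 (exec); the
    # second gets 4 (replay), 5-6 (IO), 7 (exec).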