
QuokkaClusterManager.get_cluster_from_json

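Get an EC2Cluster object from a json file. The json file must have been created by EC2Cluster.to_json. If all the instances are running, this returns an EC2Cluster for them without setting up the Quokka runtime again, so it will break if you turned the instances on manually. If all the instances are stopped, it does not restart them: it prints a message asking you to run quokkactl start_cluster and returns False. If only some instances are running, or some have been terminated, it reports an inconsistent state and returns False.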
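The three-way state check described above can be sketched as a small pure function. This is a minimal sketch, assuming the inputs are the `State.Name` strings that EC2 `describe_instances` reports for each instance; the function name `classify_cluster_state` and its return labels are hypothetical and not part of pyquokka.

```python
from typing import List


def classify_cluster_state(states: List[str]) -> str:
    """Hypothetical helper: summarize the EC2 states of a cluster's instances.

    Returns "stopped" if every instance is stopped, "running" if every
    instance is running, and "inconsistent" otherwise (mixed states,
    terminated instances, or an empty list).
    """
    # Guard against an empty list: all() is vacuously True for it.
    if states and all(s == "stopped" for s in states):
        return "stopped"
    if states and all(s == "running" for s in states):
        return "running"
    return "inconsistent"


print(classify_cluster_state(["running", "running"]))  # running
print(classify_cluster_state(["stopped", "stopped"]))  # stopped
print(classify_cluster_state(["running", "stopped"]))  # inconsistent
```

Treating an empty list as inconsistent is a design choice of this sketch: without the guard, an empty list would count as both fully stopped and fully running.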

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| json_file | str | Path to the json file, which must have been created by EC2Cluster.to_json. You can also write this json by hand, following the format of a file created by EC2Cluster.to_json, but this is not recommended. | required |
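A minimal sketch of reading such a file, assuming the layout implied by the source code below: top-level keys `cpu_count_per_instance`, `spill_dir`, `instance_ids` (a JSON object keyed by the string indices "0", "1", ..., with index 0 as the leader) and an optional `tags`. The function `load_cluster_spec` is hypothetical, and the exact layout written by `EC2Cluster.to_json` may differ.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_cluster_spec(json_file: str) -> Dict[str, Any]:
    """Hypothetical helper: parse a cluster json file into plain Python values."""
    with open(json_file, "r") as f:
        stuff = json.load(f)

    # JSON object keys are always strings, so convert "0", "1", ... to ints
    # and rebuild a list ordered by index; index 0 is treated as the leader.
    ids_by_index = {int(k): v for k, v in stuff["instance_ids"].items()}
    instance_ids = [ids_by_index[i] for i in range(len(ids_by_index))]

    return {
        "cpu_count": int(stuff["cpu_count_per_instance"]),
        "spill_dir": stuff["spill_dir"],
        "instance_ids": instance_ids,
        "tags": stuff.get("tags", {}),  # tags are optional
    }


# Usage with a made-up file; the keys are deliberately written out of order.
example = {
    "cpu_count_per_instance": 8,
    "spill_dir": "/data",
    "instance_ids": {"1": "i-0bbb", "0": "i-0aaa"},
}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(example, f)
    path = f.name
print(load_cluster_spec(path)["instance_ids"])  # ['i-0aaa', 'i-0bbb']
os.remove(path)
```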
Returns:

EC2Cluster: Cluster object, or False if the instances are not all running.

Examples:

>>> from pyquokka.utils import *
>>> manager = QuokkaClusterManager()
>>> cluster = manager.get_cluster_from_json("my_cluster.json")
>>> from pyquokka.df import QuokkaContext 
>>> qc = QuokkaContext(cluster)
>>> df = qc.read_csv("s3://my_bucket/my_file.csv")
Source code in pyquokka/utils.py
def get_cluster_from_json(self, json_file):

    """
    Get an EC2Cluster object from a json file. The json file must have been created by `EC2Cluster.to_json`.
    If all the instances are running, this returns an EC2Cluster without setting up the Quokka runtime again,
    so it will break if you turned the instances on manually. If all the instances are stopped, it does not
    restart them: it prints a message asking you to run `quokkactl start_cluster` and returns False.
    If only some instances are running, or some have been terminated, it returns False.

    Args:
        json_file (str): Path to json file, must have been created by `EC2Cluster.to_json`. You can also manually create this json based on the 
            format of the json file created by `EC2Cluster.to_json`, but this is not recommended.

    Returns:
        EC2Cluster: Cluster object, or False if the instances are not all running.

    Examples:

        >>> from pyquokka.utils import *
        >>> manager = QuokkaClusterManager()
        >>> cluster = manager.get_cluster_from_json("my_cluster.json")
        >>> from pyquokka.df import QuokkaContext 
        >>> qc = QuokkaContext(cluster)
        >>> df = qc.read_csv("s3://my_bucket/my_file.csv")

    """

    ec2 = boto3.client("ec2")

    # Read the cluster description written by EC2Cluster.to_json; `with` closes the file.
    with open(json_file, "r") as f:
        stuff = json.load(f)
    cpu_count = int(stuff["cpu_count_per_instance"])
    spill_dir = stuff["spill_dir"]
    # JSON keys are strings ("0", "1", ...); convert them to ints and build a list
    # ordered by index, so that instance_ids[0] is the leader.
    instance_ids = self.str_key_to_int(stuff["instance_ids"])
    instance_ids = [instance_ids[i] for i in range(len(instance_ids))]
    a = ec2.describe_instances(InstanceIds = instance_ids)

    instances = [k for reservation in a['Reservations'] for k in reservation['Instances']]
    states = [k['State']['Name'] for k in instances]

    if all(state == "stopped" for state in states):
        print("Cluster is stopped. Run quokkactl start_cluster to start the cluster.")
        return False
    elif all(state == "running" for state in states):
        # describe_instances does not guarantee that instances come back in request
        # order, so look up each instance's IPs by ID and list them in instance_ids
        # order (leader first). This keeps the IPs aligned with instance_ids.
        by_id = {k['InstanceId']: k for k in instances}
        missing = [i for i in instance_ids if i not in by_id]
        if missing:
            raise ValueError("Instances not found in describe_instances response: {}".format(missing))
        public_ips = [by_id[i]['PublicIpAddress'] for i in instance_ids]
        private_ips = [by_id[i]['PrivateIpAddress'] for i in instance_ids]

        return EC2Cluster(public_ips, private_ips, instance_ids, cpu_count, spill_dir, tags = stuff.get('tags', {}))
    else:
        print("Cluster in an inconsistent state. Either only some machines are running or some machines have been terminated.")
        return False
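The IP-ordering step can be tried without AWS credentials. This sketch assumes a response shaped like the boto3 EC2 `describe_instances` output (`Reservations`, each holding `Instances` with `InstanceId`, `PublicIpAddress` and `PrivateIpAddress`); the function `order_ips` and the sample IDs and addresses are made up for illustration.

```python
from typing import Any, Dict, List, Tuple


def order_ips(response: Dict[str, Any], instance_ids: List[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: return (public_ips, private_ips) aligned with instance_ids."""
    # Flatten all reservations into a lookup table keyed by instance ID.
    by_id = {
        inst["InstanceId"]: inst
        for reservation in response["Reservations"]
        for inst in reservation["Instances"]
    }
    missing = [i for i in instance_ids if i not in by_id]
    if missing:
        raise ValueError(f"instances missing from response: {missing}")
    # Emit IPs in instance_ids order, so position 0 is always the leader.
    public_ips = [by_id[i]["PublicIpAddress"] for i in instance_ids]
    private_ips = [by_id[i]["PrivateIpAddress"] for i in instance_ids]
    return public_ips, private_ips


# Made-up response in which the leader "i-0aaa" is returned second.
fake_response = {
    "Reservations": [
        {"Instances": [{"InstanceId": "i-0bbb", "PublicIpAddress": "203.0.113.2", "PrivateIpAddress": "10.0.0.2"}]},
        {"Instances": [{"InstanceId": "i-0aaa", "PublicIpAddress": "203.0.113.1", "PrivateIpAddress": "10.0.0.1"}]},
    ]
}
print(order_ips(fake_response, ["i-0aaa", "i-0bbb"]))
# (['203.0.113.1', '203.0.113.2'], ['10.0.0.1', '10.0.0.2'])
```

Looking each instance up by ID keeps the IP lists aligned with `instance_ids` at every position, not just at the leader, regardless of the order in which EC2 returns the instances.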