Skip to content

LocalCluster

Source code in pyquokka/utils.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class LocalCluster:
    def __init__(self) -> None:

        """
        Creates a local cluster on your machine. This is useful for testing purposes. This should not be necessary because `QuokkaContext` will automatically
        make one for you.

        Starts (or reuses, via ``ignore_reinit_error``) a local Ray instance, then launches the
        flight server (``flight.py`` from the installed pyquokka package) and a ``redis-server``
        on port 6800 as child processes. Both children are killed when this object is finalized.

        Return:
            LocalCluster: A LocalCluster object.

        Raises:
            Exception: If ``lsof`` reports a python process on port 5005, or if the flight
                server process could not be spawned.

        Examples:

            >>> from pyquokka.utils import *
            >>> cluster = LocalCluster()
            >>> from pyquokka.df import QuokkaContext
            >>> qc = QuokkaContext(cluster)

            But the following is equivalent:

            >>> from pyquokka.df import QuokkaContext
            >>> qc = QuokkaContext()

        """

        print("Initializing local Quokka cluster.")
        self.num_node = 1
        self.tags = {}
        self.cpu_count = multiprocessing.cpu_count()
        # Directory of the installed pyquokka package; flight.py and redis.conf are resolved from it.
        pyquokka_loc = os.path.dirname(os.path.abspath(pyquokka.__file__))
        # we assume you have pyquokka installed, and we are going to spin up a ray cluster locally
        ray.init(ignore_reinit_error=True)
        flight_file = os.path.join(pyquokka_loc, "flight.py")
        redis_conf = os.path.join(pyquokka_loc, "redis.conf")
        self.flight_process = None
        self.redis_process = None
        # `os.system("export ...")` only changed a short-lived subshell. Setting the variable in
        # this process's environment makes the child processes spawned below inherit it.
        # setdefault keeps any value the user already exported.
        os.environ.setdefault("GLIBC_TUNABLES", "glibc.malloc.trim_threshold=524288")
        # Read lsof's report for port 5005 and close the pipe deterministically.
        with os.popen("lsof -i:5005") as lsof_pipe:
            port5005 = lsof_pipe.read()
        if "python" in port5005:
            raise Exception("Port 5005 is already in use. Kill the process that is using it first.")
        try:
            self.flight_process = subprocess.Popen(["python3", flight_file], preexec_fn = preexec_function)
        except (OSError, ValueError, subprocess.SubprocessError) as err:
            # Popen raises OSError (e.g. missing interpreter), ValueError (bad arguments) or
            # SubprocessError (preexec_fn failed); keep the root cause chained for debugging.
            raise Exception("Could not start flight server properly. Check if there is already something using port 5005, kill it if necessary. Use lsof -i:5005") from err
        self.redis_process = subprocess.Popen(["redis-server" , redis_conf, "--port 6800", "--protected-mode no"], preexec_fn=preexec_function)
        self.leader_public_ip = "localhost"
        # The host part of Ray's GCS address (presumably "host:port") is used as the private IP.
        gcs_host = ray.get_runtime_context().gcs_address.split(":")[0]
        self.leader_private_ip = gcs_host
        self.public_ips = {0:"localhost"}
        self.private_ips = {0: gcs_host}
        print("Finished setting up local Quokka cluster.")

    def __del__(self):
        """Kill and reap the flight and redis child processes, if they were started.

        ``getattr`` is used so that a partially constructed instance (for example one
        whose ``__init__`` raised inside ``ray.init``) does not raise ``AttributeError``
        here. Calling this more than once is harmless.
        """
        for attr in ("flight_process", "redis_process"):
            proc = getattr(self, attr, None)
            if proc is None:
                continue
            try:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                proc.wait(timeout=5)
            except (OSError, subprocess.TimeoutExpired):
                # Best-effort cleanup: a finalizer must not propagate errors.
                pass
            setattr(self, attr, None)

__init__()

Creates a local cluster on your machine. This is useful for testing purposes. This should not be necessary because QuokkaContext will automatically make one for you.

Return

LocalCluster: A LocalCluster object.

Examples:

>>> from pyquokka.utils import *
>>> cluster = LocalCluster()
>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext(cluster)

But the following is equivalent:

>>> from pyquokka.df import QuokkaContext
>>> qc = QuokkaContext()
Source code in pyquokka/utils.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def __init__(self) -> None:

    """
    Creates a local cluster on your machine. This is useful for testing purposes. This should not be necessary because `QuokkaContext` will automatically
    make one for you.

    Starts (or reuses, via ``ignore_reinit_error``) a local Ray instance, then launches the
    flight server (``flight.py`` from the installed pyquokka package) and a ``redis-server``
    on port 6800 as child processes.

    Return:
        LocalCluster: A LocalCluster object.

    Raises:
        Exception: If ``lsof`` reports a python process on port 5005, or if the flight
            server process could not be spawned.

    Examples:

        >>> from pyquokka.utils import *
        >>> cluster = LocalCluster()
        >>> from pyquokka.df import QuokkaContext
        >>> qc = QuokkaContext(cluster)

        But the following is equivalent:

        >>> from pyquokka.df import QuokkaContext
        >>> qc = QuokkaContext()

    """

    print("Initializing local Quokka cluster.")
    self.num_node = 1
    self.tags = {}
    self.cpu_count = multiprocessing.cpu_count()
    # Directory of the installed pyquokka package; flight.py and redis.conf are resolved from it.
    pyquokka_loc = os.path.dirname(os.path.abspath(pyquokka.__file__))
    # we assume you have pyquokka installed, and we are going to spin up a ray cluster locally
    ray.init(ignore_reinit_error=True)
    flight_file = os.path.join(pyquokka_loc, "flight.py")
    redis_conf = os.path.join(pyquokka_loc, "redis.conf")
    self.flight_process = None
    self.redis_process = None
    # `os.system("export ...")` only changed a short-lived subshell. Setting the variable in
    # this process's environment makes the child processes spawned below inherit it.
    # setdefault keeps any value the user already exported.
    os.environ.setdefault("GLIBC_TUNABLES", "glibc.malloc.trim_threshold=524288")
    # Read lsof's report for port 5005 and close the pipe deterministically.
    with os.popen("lsof -i:5005") as lsof_pipe:
        port5005 = lsof_pipe.read()
    if "python" in port5005:
        raise Exception("Port 5005 is already in use. Kill the process that is using it first.")
    try:
        self.flight_process = subprocess.Popen(["python3", flight_file], preexec_fn = preexec_function)
    except (OSError, ValueError, subprocess.SubprocessError) as err:
        # Popen raises OSError (e.g. missing interpreter), ValueError (bad arguments) or
        # SubprocessError (preexec_fn failed); keep the root cause chained for debugging.
        raise Exception("Could not start flight server properly. Check if there is already something using port 5005, kill it if necessary. Use lsof -i:5005") from err
    self.redis_process = subprocess.Popen(["redis-server" , redis_conf, "--port 6800", "--protected-mode no"], preexec_fn=preexec_function)
    self.leader_public_ip = "localhost"
    # The host part of Ray's GCS address (presumably "host:port") is used as the private IP.
    gcs_host = ray.get_runtime_context().gcs_address.split(":")[0]
    self.leader_private_ip = gcs_host
    self.public_ips = {0:"localhost"}
    self.private_ips = {0: gcs_host}
    print("Finished setting up local Quokka cluster.")