An introduction to Python distributed parallel processing with Ray

What is Ray

Ray is a framework for writing distributed parallel processing in Python quickly and simply, and it is designed to make it easy to parallelize existing code. With Ray, you can write process-level parallelism more easily than with multiprocessing.

This article is based on the content of the Ray Tutorial. The code has been confirmed to work with Python 3.8.2 and Ray 0.8.4.

Installation

You can install it with pip (or a similar tool) from the terminal.

$ pip install ray
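
To check which version was installed, you can print it from the command line; with the setup used in this article this prints 0.8.4:

$ python -c "import ray; print(ray.__version__)"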

How to use

For basic use, there are only three APIs to remember: `ray.init`, `ray.remote`, and `ray.get`. This article will also introduce `ray.wait` and `ray.put`.
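
As a quick preview, here is a minimal, self-contained sketch of how the three fit together (the function square here is just an illustrative example; each piece is explained step by step in the following sections):

import ray

ray.init()  # Start the Ray processes

@ray.remote  # Turn square into a remote function
def square(x):
    return x * x

obj_id = square.remote(3)  # Returns an Object ID immediately
print(ray.get(obj_id))     # Blocks until the result is ready. output: 9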

Basics of parallelization by Ray

Consider parallelizing the execution of func in the following code, where the function func, which takes 3 seconds to execute, is called twice, so the whole script takes about 6 seconds.

import time

def func(x):
    time.sleep(3)
    return x

begin_time = time.time() # Record the start time

res1, res2 = func(1), func(2) # Call func twice
print(res1, res2)  # output: 1 2

end_time = time.time() # Record the end time

print(end_time - begin_time) # About 6 seconds

When using Ray, you must **always first** start the Ray processes with ray.init, where you can also specify the number of resources to use.

import ray

# If you call ray.init() without specifying anything, the number of resources is determined automatically
ray.init(num_cpus=4)

# Wait a moment for Ray to start, for more accurate time measurement
time.sleep(1)
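
If you want to check which resources Ray has actually registered, you can print them, for example as in the following sketch using ray.cluster_resources():

# Show the resources Ray has registered, e.g. {'CPU': 4.0, ...}
print(ray.cluster_resources())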

If you want to execute a function in parallel, you need to turn it into a **remote function** that Ray can handle. That said, it is easy to do: just add the @ray.remote decorator to the function. A remote function is called as (function name).remote(arguments) and is sent to one of Ray's parallel workers for execution. .remote(arguments) returns an **Object ID** immediately, without waiting for the function to finish.

@ray.remote
def func(x):
    time.sleep(3)
    return x

begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) # Output example: ObjectID(45b9....)

If you want to get the result, you can pass the Object ID returned from the remote function to ray.get. ray.get blocks until all results corresponding to the Object ID are available.

print(ray.get(res1), ray.get(res2)) # output: 1 2

# ray.get can also receive a list
print(ray.get([res1, res2])) # output: [1, 2]

end_time = time.time()
print(end_time - begin_time) # About 3 seconds

When the above code is executed as a single script, it takes only about 3 seconds, so you can see that the execution of func has been parallelized. That's all there is to the basics.

Parallelization with dependencies

Ray can handle dependencies between remote functions: you simply pass the Object ID as it is. The passed Object ID is resolved back to a normal Python object when the function is actually executed. In the example below, func1 and func2 are applied in sequence to each of the four elements of vec, and processing one element takes 2 seconds in total.

@ray.remote
def func1(x):
    time.sleep(1)
    return x * 2

@ray.remote
def func2(x):
    time.sleep(1)
    return x + 1

vec = [1, 2, 3, 4]
results = []

for x in vec:
    res = func1.remote(x)   # res holds an ObjectID
    res = func2.remote(res) # Pass the ObjectID as is to the next remote function
    results.append(res)

# results is a list of ObjectIDs
print(ray.get(results)) # output: [3, 5, 7, 9]

Ray analyzes the dependencies, first runs func1 (which has no dependencies), and then runs func2 in parallel on the results of func1. This process, which would take 8 seconds sequentially, finishes in about 2 seconds when parallelized.
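
To see the speed-up for yourself, you can repeat the same pipeline with a simple timer, as in the following sketch (this measurement is my addition, not part of the original tutorial):

begin_time = time.time()

# Same dependency chain as above, written as a list comprehension
results = [func2.remote(func1.remote(x)) for x in vec]
print(ray.get(results))         # output: [3, 5, 7, 9]

print(time.time() - begin_time) # About 2 seconds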

Ray also supports nested calls, so rewriting func2 as follows works fine. The only condition for a nested call is that the function you want to call has already been defined.

@ray.remote
def func2(x):
    x = func1.remote(x)  # An ObjectID is returned
    time.sleep(1)
    return ray.get(x) + 1 # An ObjectID cannot be added to directly, so call ray.get first and then compute

print(ray.get([func2.remote(x) for x in vec])) # output: [3, 5, 7, 9]

The measured time in my environment is slightly more than 2 seconds, but the parallel execution is still far faster than the 8 seconds of sequential execution.

Actor

A remote function simply returns after being executed and cannot hold state. In Ray, stateful processing is realized by decorating a class with @ray.remote. Classes decorated with @ray.remote are called **Actors**.

For example, consider the following counter, which takes 1 second per increment. When creating an instance of an Actor, add .remote() just as with a function call.

@ray.remote
class Counter:
    def __init__(self, init_val, sleep=True):
        # Initialize the counter with init_val
        self.count = init_val
        self.sleep = sleep

    def increment(self):
        if self.sleep:
            time.sleep(1)
        self.count += 1
        return self.count

# Create counters with initial values of 0 and 100
counter1, counter2 = Counter.remote(0), Counter.remote(100)

Let's record the value at each stage in results while incrementing each counter three times.

results = []
for _ in range(3):
    results.append(counter1.increment.remote())
    results.append(counter2.increment.remote())

print(ray.get(results)) # output: [1, 101, 2, 102, 3, 103]

The counters are incremented a total of 6 times, but since the two counters run in parallel, all the values are obtained in only about 3 seconds.
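
If you want to confirm the timing yourself, here is a minimal sketch that repeats the loop above with fresh counters and measures the elapsed time (the names c1, c2, and ids are my own):

c1, c2 = Counter.remote(0), Counter.remote(100)

begin_time = time.time()
ids = []
for _ in range(3):
    ids.append(c1.increment.remote())
    ids.append(c2.increment.remote())

print(ray.get(ids))             # output: [1, 101, 2, 102, 3, 103]
print(time.time() - begin_time) # About 3 seconds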

Also, if you want to call methods of the same Actor instance in parallel, you can define a remote function that takes the Actor instance as an argument. For example, let's run a function called `incrementer` that calls `increment` every second, starting the two calls 0.5 seconds apart, as follows. Note that here we create a Counter whose `increment` itself finishes instantly.

@ray.remote
def incrementer(counter, id, times):
    # Increment the counter once per second, `times` times in total
    for _ in range(times):
        cnt = counter.increment.remote()
        print(f'id= {id} : count = {ray.get(cnt)}')
        time.sleep(1)

counter = Counter.remote(0, sleep=False) # A counter whose increment finishes in an instant

incrementer.remote(counter, id=1, times=5)
time.sleep(0.5)  # Offset the start by 0.5 seconds
inc = incrementer.remote(counter, id=2, times=5)

ray.wait([inc]) # ray.wait (explained next) waits for the task to finish

When you run it, you can see that the two `incrementer` calls update the value of `counter` alternately every 0.5 seconds, as follows.

(0.0 seconds later) id = 1 : count = 1
(0.5 seconds later) id = 2 : count = 2
(1.0 seconds later) id = 1 : count = 3
(1.5 seconds later) id = 2 : count = 4
(2.0 seconds later) ......

ray.wait

If you pass a list of Object IDs for tasks running in parallel to ray.get, you cannot get any values until all of them have finished. With ray.wait, you wait only until the specified number of parallel tasks have completed, and it returns the IDs that have finished by that point and the IDs that have not.

@ray.remote
def sleep(x):
    # Sleep for x seconds, then return x
    time.sleep(x)
    return x

ids = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
finished_ids, running_ids = ray.wait(ids, num_returns=2, timeout=None)

print(ray.get(finished_ids)) # output (after 3 seconds): [3, 2]
print(ray.get(running_ids))  # output (after 5 seconds): [5]
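
A common pattern with ray.wait is to process results as soon as each task finishes, rather than waiting for all of them. The following sketch (reusing the sleep function defined above) takes out one finished task at a time:

remaining = [sleep.remote(3), sleep.remote(5), sleep.remote(2)]
while remaining:
    # Wait for exactly one task to finish, then handle it immediately
    finished, remaining = ray.wait(remaining, num_returns=1)
    print(ray.get(finished[0])) # Printed in the order 2, 3, 5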

ray.put

In fact, every object passed to a remote function is implicitly serialized and copied into Ray's shared memory. So if you pass a huge object as an argument to remote multiple times, you pay the copy cost each time and waste space in shared memory.

In such cases, you can avoid this waste by explicitly copying the object only once in advance using ray.put. Like remote, ray.put returns an Object ID, which you then pass to the remote function. Once the object has been copied, it is shared so that any worker running in parallel can read it.

@ray.remote
def func4(obj, idx):
    time.sleep(1)
    return idx

# Let big_object be a large object (None is just a placeholder here)
big_object = None

big_obj_id = ray.put(big_object)

# func4.remote() is called four times, but since we pass the Object ID, big_object is not copied again
results = [func4.remote(big_obj_id, i) for i in range(4)]

print(ray.get(results)) # output: [0, 1, 2, 3]

Note that Ray's ray.get deserialization seems to be much faster than pickle.load.
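
If you want to check this claim yourself, a rough sketch of such a comparison is shown below (it assumes NumPy is installed and uses an array of about 80 MB as the test object; this is not a rigorous benchmark):

import pickle
import numpy as np

data = np.zeros(10 ** 7)  # A reasonably large object (about 80 MB of float64)

# Round trip through pickle
blob = pickle.dumps(data)
begin_time = time.time()
pickle.loads(blob)
print('pickle.loads:', time.time() - begin_time)

# Round trip through Ray's object store
data_id = ray.put(data)
begin_time = time.time()
ray.get(data_id)
print('ray.get     :', time.time() - begin_time)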

In conclusion

The official documentation describes more detailed usage. In particular, the Examples section contains concrete examples such as a parameter server and reinforcement learning in a distributed environment, which are helpful. There are also higher-level frameworks built on Ray, such as RLlib for reinforcement learning and Tune for hyperparameter tuning. Enjoy a comfortable parallel processing life with Ray.

Reference site

- Ray Tutorial
