Minimal Effort Scaling with Ray.io - Easy Analogies to Get Started

How to scale up complex workloads with minimal effort via Ray.io's modules. Useful for AI/ML and other computationally intensive tasks in particular.

ray.io describes itself as follows:

Effortlessly Scale Your Most Complex Workloads

When I read that the first time, it wasn't obvious what Ray could do for me.

This article is more of a conceptual exploration of what Ray is about.

If you need concrete step-by-step instructions or deeper technical details, head over to the official documentation.

Ray has many moving parts working in tandem to address many distributed engineering challenges.

But ultimately, it all boils down to Effortlessly Scale Your Most Complex Workloads.

So I will try to give a hypothetical example to illuminate that statement a bit.

When Ray Is The Right Tool For the Job

Say - I have a backend server that must perform document OCR. It must be FAST, while also being light on the pocket (obviously).

I will make the case that this problem ticks both of the boxes that make Ray a good fit.

The problem:

  1. Involves a complex workload
  2. Requires several kinds of scaling to work well

First of all, a document consists of many pages. For an average book - it's around 300 pages.

Zooming in, each page consists of many sub-elements: Titles (various levels of them), paragraphs, images, tables, and so on. There could be single column, two column, or three column layouts. There could be hybrid layouts.

To complicate matters - there could be multiple languages, multiple font sizes, typographical styles, colors, and so on. And of course - there is the matter of the order in which we are supposed to process these elements.

Making sense of a single page is no small feat. And once we have understanding at a higher level, we want to get text out of those nicely recognized elements, which in itself is a computationally heavy task.

Processing even a single page is a collection of many tasks, each a significant challenge in its own right.

Moreover, zooming out, a book consists of many pages. It seems natural to process pages in batches, since one computer/node can deal with at least a few pages at a time.

Summing up:

  1. The problem is complex because we need demanding algorithms/methods to crack the layout and detection issues. Solving these problems in practice also means significant CPU and RAM demands.
  2. At the same time, from both a cost and a speed perspective, it helps to scale things. We can process each page in a more sophisticated manner, and we can process the whole document in a cleverer fashion too. So scaling is possible in the following ways:
    1. Process pages in parallel (use all the CPU/GPU cores in a machine)
    2. Distribute processing (use more than one machine to do (1) and more)

.remote() Makes Functions Parallel, While Demanding Minimum Changes

The beautiful thing about Ray is that it doesn't require significant effort to convert a serial program into a parallel one.

In this toy example, you see a sequential loop converted to a parallel one with just 2 changes: the @ray.remote decorator and the .remote(i) invocation. ray.get() then collects the results.

import ray

ray.init()

# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]

.remote() Makes Classes Parallel Too, With Minimal Work From Your Side

One can decorate a class with the same @ray.remote decorator.

import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

Now it becomes possible to instantiate this class as an actor, a stateful worker process that may live on another machine in the Ray cluster, with almost no extra effort:

# Create an actor from this class.
counter = Counter.remote()

Remote? What Exactly Is a Remote?

When I started out with ray, my core question was: How does Ray Object Storage work?

The answer is complicated. Because to understand storage, first we must understand the larger idea of a cluster.

So, the question then is What is a Ray Cluster?

[Figure: Ray cluster]

Highlights of a Ray cluster:

  1. There are two types of nodes - Head node and Worker Node
  2. Common feature - Raylets. And each Raylet consists of
    1. Scheduler
    2. Object Store
  3. Two types of processes:
    1. Driver Process
    2. Worker Process
  4. Two components exclusive to Head
    1. Global Control Store
    2. Autoscaler

As we see in the diagram and analysis above, there is an Object Store in each node.

A remote object can reside in multiple nodes of the cluster.

And remote tasks make use of remote objects to get work done.

And we use object refs to refer to remote objects.

Putting and Retrieving Remote Objects

Object refs can be obtained in two ways:

  1. When we make a remote call, the response is an object ref. The actual object may not have been computed yet, but it can be pointed at via the ref.
  2. The simpler way: we can put an object into the object store directly with ray.put().

import ray

y = 1
object_ref = ray.put(y)

A key property of these objects is that they are immutable. Therefore, they can be stored on multiple nodes and accessed in parallel by multiple processes without any sort of synchronization.

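Getting an Object Out of an Object Ref

ray.get() blocks until the object behind the ref is available and then returns its value. Since objects are immutable, there is only ever one version to fetch. If the object is not in the local node's object store, it is first "downloaded" from the node that holds it.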

Analogy: Object Store Interactions are Like "Promises" in Javascript

Essentially, ray.put() (like any remote call) creates a reference, which is like a Promise in JS.

And when you do ray.get(), you basically await that object until it resolves (or, in this case, is downloaded).

Interestingly, you can wait on a single get:

obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1

Or you can wait on a list of multiple gets:

assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]

Both of these patterns exist for JS Promises too: await on a single Promise, and Promise.all on a list of them.
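
For readers more at home in Python than in JavaScript, the same two patterns can be sketched with the standard library's concurrent.futures. This is purely an analogy and says nothing about how Ray is implemented; the square function is a made-up example.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # Wait on a single future, comparable to ray.get(obj_ref).
    single_future = pool.submit(square, 3)
    single_result = single_future.result()

    # Wait on a list of futures, comparable to ray.get([ref, ref, ...]).
    many_futures = [pool.submit(square, i) for i in range(3)]
    many_results = [future.result() for future in many_futures]

print(single_result, many_results)
```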

Ray Object Refs Can Be Passed Around

Object refs are quite flexible, and can be passed around to:

  1. tasks
  2. actor methods
  3. other objects

However, there's some nuance in the way Ray tasks deal with Objects within parameters.

For example, say we have a remote function echo like this:

@ray.remote
def echo(a: int, b: int, c: int):
    """This function prints its input values to stdout."""
    print(a, b, c)

Now, we could simply call the remote function with literals:

echo.remote(1,2,3)

And it'll work.

You can also pass object refs instead:

echo.remote(ray.put(1), ray.put(2), ray.put(3))

Perhaps surprisingly, the above also works, since Ray automatically de-references object refs that are passed as top-level arguments to a remote task.

But if the refs are nested inside another object, such as a list, there is no automatic de-referencing. You must call ray.get() within the task to use the actual values:

import ray


@ray.remote
def echo_and_get(x_list):  # List[ObjectRef]
    """This function prints its input values to stdout."""
    print("args:", x_list)
    print("values:", ray.get(x_list))


# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)

# Passing an object as a nested argument to `echo_and_get`. Ray does not
# de-reference nested args, so `echo_and_get` sees the references.
echo_and_get.remote([a, b, c])
# -> prints args: [ObjectRef(...), ObjectRef(...), ObjectRef(...)]
#           values: [1, 2, 3]

Analogy: Argument passing can happen via value fn(x) or reference fn([x])

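The analogy here is the usual distinction between "call by value" and "call by reference": a top-level ref is resolved before the task runs, so the task receives the value, while a nested ref arrives as a reference that the task must resolve itself.

Closure capture is also available: a task can use object refs defined in the enclosing scope, so there is no need to pass them as parameters. The catch is that capturing a ref pins the object through reference counting, so its memory is not freed automatically after use; it stays allocated until the job completes.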

import ray

# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)


@ray.remote
def print_via_capture():
    """This function prints the values of (a, b, c) to stdout."""
    print(ray.get([a, b, c]))


# Passing object references via closure-capture. Inside the `print_via_capture`
# function, the global object refs (a, b, c) can be retrieved and printed.
print_via_capture.remote()
# -> prints [1, 2, 3]

Ray Internals: Complex Object Serialization

For doing anything non-trivial, it is these details that make the difference. You may have a relatively complex class object which needs to be sent across CPU/GPU cores, or nodes. How will this be handled?

Under the hood, Ray uses Plasma In-Memory Object Store.

It is designed to share data across process boundaries.

Expensive data serialization/de-serialization is a key performance bottleneck in distributed systems.

The core idea is a "shared memory" region on each node that multiple processes on that node can read directly. Objects needed on another node are copied over to that node's store. This is sort of like Redis data structures, except that it works at the raw memory level.

Analogy - Objects are like redis data structure, but with direct bindings in Python

Despite all that, there is still quite a bit of serializing to do. Ray handles it with pickle protocol 5 together with cloudpickle, a more advanced pickling mechanism that covers a wider range of objects (lambdas, nested functions, dynamic classes).

NumPy arrays can be read with zero copying: multiple processes on the same node can work with the same array through shared memory, without deserializing it each time. The trade-off is that such arrays are read-only.

A Few Best Practices From The Documentation

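  1. A remote task can call another remote task. This is encouraged, since nested tasks let the work fan out further and can increase performance.

  2. A @ray.remote function can yield its return values one at a time rather than return a large list, which lowers peak memory use.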

BAD:

import numpy as np
import ray


@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
        for _ in range(num_returns)
    ]

GOOD:

@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")

Find more Design Patterns and Anti Patterns at the documentation site.

Conclusion

Ray is a modular and wide-ranging library for distributed tasks and computing.

It is easy to get started from a serial program. It takes only a decorator and a small change in function invocation.

Ray has a strong Remote Object story - allowing for painless scaling.

The documentation is extensive, but it can also be a bit daunting to get started with.

Given the scope of the library, I may revisit this topic with additional insights in a future post.

What Ray feature do you like the most? Leave a comment sharing your experiences with Ray.