Ray: A Distributed Execution Framework for AI

Unified compute layer for AI/ML workloads — tasks for stateless parallelism, actors for stateful services, Plasma for zero-copy object sharing. Covers Q43–Q45.

Ray was developed at UC Berkeley to support reinforcement learning workloads, which require running many environments in parallel, sharing model weights, and coordinating training updates. It has since become a general-purpose distributed computing framework used across ML training, data processing, and serving.

💡 Ray's unified task + actor model

Ray provides two primitives: tasks (stateless remote functions) and actors (stateful remote objects). Together, they can express a wide range of distributed computations, from simple map-reduce to complex RL training loops, without building a custom distributed system for each one.

Ray Architecture

Ray has a two-tier architecture: an application layer (user code, tasks, actors) and a system layer (GCS for global state, schedulers for placement, Plasma for object sharing).

[Figure: Ray System Architecture. An application layer sits on top of a system layer (control plane). Node 1 runs two workers (W) and one actor (A); Node 2 runs two workers.]

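In more detail, the application layer consists of the driver (the process running the user program), worker processes that execute tasks, and actors (stateful worker processes). The system layer consists of the GCS (Global Control Store) for global state, a global scheduler for cross-node placement, a local scheduler on each node for per-node execution, and a Plasma object store on each node for data sharing.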
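The split between a global scheduler and per-node local schedulers can be illustrated with a toy, pure-Python model of bottom-up scheduling. A task is first offered to the local scheduler of the node where it was submitted, and it only escalates to the global scheduler when that node lacks capacity. The class names and the placement rule (pick the node with the most free CPUs) are assumptions for illustration, not Ray's actual scheduling policy.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LocalScheduler:
    """Per-node scheduler tracking free CPUs (toy model)."""
    node: str
    free_cpus: int

    def try_run_locally(self, cpus: int) -> bool:
        # Accept the task if this node has enough spare CPUs
        if self.free_cpus >= cpus:
            self.free_cpus -= cpus
            return True
        return False


@dataclass
class GlobalScheduler:
    """Cluster-wide scheduler that sees every node's free CPUs (toy model)."""
    nodes: List[LocalScheduler]

    def place(self, cpus: int) -> Optional[str]:
        # Assumed rule: pick the node with the most free CPUs
        candidates = [n for n in self.nodes if n.free_cpus >= cpus]
        if not candidates:
            return None  # a real scheduler would queue the task instead
        best = max(candidates, key=lambda n: n.free_cpus)
        best.free_cpus -= cpus
        return best.node


def submit(origin: LocalScheduler, global_sched: GlobalScheduler, cpus: int = 1) -> Optional[str]:
    # Bottom-up: try the submitting node first, then escalate to the global scheduler
    if origin.try_run_locally(cpus):
        return origin.node
    return global_sched.place(cpus)


node1 = LocalScheduler("node1", free_cpus=2)
node2 = LocalScheduler("node2", free_cpus=4)
gs = GlobalScheduler([node1, node2])
placements = [submit(node1, gs) for _ in range(4)]
print(placements)  # ['node1', 'node1', 'node2', 'node2']
```

In this model, the first two tasks stay on node1 because it has spare capacity. Once node1 is full, later tasks spill over to node2 through the global scheduler.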

Tasks vs Actors


When to use tasks
  • Pure functions (same input → same output)
  • Embarrassingly parallel workloads
  • Data transformations, preprocessing
  • Hyperparameter search
  • Batch inference
Task properties
  • Decorated with @ray.remote
  • Called with .remote() → returns ObjectRef
  • Results stored in Plasma object store
  • Fault tolerant: Ray re-executes the task on failure (configurable via max_retries)
  • Can take ObjectRefs as arguments (dependency graph)
ray_tasks.py
import ray
ray.init()

# Tasks: stateless remote functions
@ray.remote
def square(x: int) -> int:
    return x * x

# Submit tasks asynchronously: .remote() returns an ObjectRef immediately
futures = [square.remote(i) for i in range(10)]

# ray.get() blocks until results are ready
results = ray.get(futures)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Tasks can depend on each other (pipelining)
@ray.remote
def add(a: int, b: int) -> int:
    return a + b

# Pass ObjectRefs directly: Ray handles dependency resolution
ref1 = square.remote(5)
ref2 = square.remote(3)
result_ref = add.remote(ref1, ref2)  # executes after both squares complete
print(ray.get(result_ref))  # 34
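Tasks share no state between calls, but actors do. A Ray actor is a class decorated with @ray.remote, created with Counter.remote() and called with counter.increment.remote(). By default, its methods run one at a time against the same instance. The pure-Python sketch below imitates those semantics without Ray. ToyActorHandle is a hypothetical helper, not a Ray API. It uses a single-threaded executor that runs calls in submission order and returns a future for each one.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


class ToyActorHandle:
    """Runs all method calls of one object on a single dedicated thread (toy model)."""

    def __init__(self, cls: type, *args: Any):
        # One worker thread means calls execute one at a time, in submission order
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._instance = self._executor.submit(cls, *args).result()

    def call(self, method: str, *args: Any) -> Future:
        # Like actor.method.remote(): returns a future immediately
        return self._executor.submit(getattr(self._instance, method), *args)

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)


class Counter:
    """Stateful object: its value persists across calls."""

    def __init__(self) -> None:
        self.value = 0

    def increment(self) -> int:
        self.value += 1
        return self.value


counter = ToyActorHandle(Counter)
futures = [counter.call("increment") for _ in range(5)]
results = [f.result() for f in futures]
print(results)  # [1, 2, 3, 4, 5]
counter.shutdown()
```

With a stateless task, five calls to increment would each start from zero. The actor instead accumulates state, which is exactly what a parameter server or a simulator needs.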

Remote Task Execution: 8-Step Data Path

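When you call f.remote(x), the steps below happen end to end. They follow the original Ray design, in which the GCS keeps a global object table. Newer Ray releases track object locations at each object's owner instead.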

1. Submit task. The driver calls f.remote(x), which returns an ObjectRef immediately (non-blocking).
2. Task submitted to local scheduler. The driver's local scheduler receives the task along with its dependencies.
3. Object lookup in GCS. The scheduler checks whether all input objects are available locally; if not, it queries the GCS Object Table for their locations.
4. Fetch missing objects. Input objects on remote nodes are fetched via gRPC and stored in the local Plasma store.
5. Schedule to worker. The local scheduler assigns the task to a free worker process, which loads the function code (from the GCS Function Table if needed).
6. Execute task. The worker reads input objects from the local Plasma store (zero-copy via shared memory) and executes the function.
7. Store result in Plasma. The return value is serialized and written to the local Plasma object store; the ObjectRef now resolves to this value.
8. Update GCS Object Table. The object's location is registered in the GCS, so any task waiting on this ObjectRef can now be scheduled.
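The eight steps can be traced in a synchronous, pure-Python toy model. Each node's "plasma" is a dict, and the GCS object and function tables are dicts. ToyCluster, Node and this ObjectRef class are hypothetical stand-ins, not Ray classes, and real Ray runs these steps asynchronously across processes. The model shows the key behavior: an input that lives on another node is looked up in the object table, copied into the local store, and its new location is recorded.

```python
import itertools
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Set


@dataclass(frozen=True)
class ObjectRef:
    id: int


@dataclass
class Node:
    name: str
    plasma: Dict[int, Any] = field(default_factory=dict)  # object id -> value


class ToyCluster:
    """Synchronous toy model of the 8-step path; real Ray runs these steps asynchronously."""

    def __init__(self, node_names: List[str]):
        self.nodes = {n: Node(n) for n in node_names}
        self.object_table: Dict[int, Set[str]] = {}    # GCS: object id -> nodes holding it
        self.function_table: Dict[str, Callable] = {}  # GCS: function name -> code
        self.log: List[str] = []
        self._ids = itertools.count()

    def _store(self, node_name: str, value: Any) -> ObjectRef:
        # Steps 7-8: write to the node's store, then register the location in the GCS
        ref = ObjectRef(next(self._ids))
        self.nodes[node_name].plasma[ref.id] = value
        self.object_table.setdefault(ref.id, set()).add(node_name)
        return ref

    def submit(self, node_name: str, fn: Callable, *args: Any) -> ObjectRef:
        # Steps 1-2: the task (function + args) arrives at this node's local scheduler
        self.function_table.setdefault(fn.__name__, fn)
        node = self.nodes[node_name]
        inputs = []
        for arg in args:
            if isinstance(arg, ObjectRef):
                if arg.id not in node.plasma:
                    # Step 3: ask the GCS object table where the input lives
                    source = sorted(self.object_table[arg.id])[0]
                    # Step 4: copy it into the local store and record the new location
                    node.plasma[arg.id] = self.nodes[source].plasma[arg.id]
                    self.object_table[arg.id].add(node_name)
                    self.log.append(f"fetch obj {arg.id}: {source} -> {node_name}")
                inputs.append(node.plasma[arg.id])
            else:
                inputs.append(arg)
        # Steps 5-6: load the function code and execute it on local inputs
        result = self.function_table[fn.__name__](*inputs)
        return self._store(node_name, result)

    def get(self, ref: ObjectRef) -> Any:
        holder = sorted(self.object_table[ref.id])[0]
        return self.nodes[holder].plasma[ref.id]


def square(x: int) -> int:
    return x * x


def add(a: int, b: int) -> int:
    return a + b


cluster = ToyCluster(["node1", "node2"])
r1 = cluster.submit("node1", square, 5)
r2 = cluster.submit("node2", square, 3)
r3 = cluster.submit("node1", add, r1, r2)
print(cluster.get(r3))  # 34
print(cluster.log)      # ['fetch obj 1: node2 -> node1']
```

Only the input produced on node2 needs a fetch. After that, the object table lists both nodes as holders, so later tasks on node1 reuse the local copy.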

Reinforcement Learning Use Case

RL was Ray's original motivation. The pattern: many parallel rollout actors collect experience from their environments; a parameter server actor aggregates gradients; the policy is updated and redistributed. Each of these roles maps directly onto a Ray actor.

[Figure: Reinforcement Learning with Ray. A Parameter Server actor holding model weights θ sends θ down to four rollout actors (Rollout 1 to Rollout 4), each running its own Env, and receives gradients ∇θ back.]
Rollout workers run environments in parallel, collecting (s, a, r) tuples. The parameter server aggregates the resulting gradients, updates the policy, and broadcasts new weights; then the loop repeats.
ray_rl.py
# Reinforcement learning with Ray (conceptual sketch)
# policy_act, init_model_weights, compute_gradients and optimizer_step are
# placeholders for your own model code; they are not part of Ray.
import ray
import gymnasium as gym

ray.init()

@ray.remote
class RolloutWorker:
    """Collects experience from one environment instance."""
    def __init__(self, env_name: str):
        self.env = gym.make(env_name)
        self.policy = None  # weights pushed from the parameter server

    def set_weights(self, weights):
        self.policy = weights

    def rollout(self, num_steps: int):
        trajectories = []
        obs, _ = self.env.reset()  # Gymnasium API: reset() -> (obs, info)
        for _ in range(num_steps):
            action = policy_act(self.policy, obs)  # placeholder
            next_obs, reward, terminated, truncated, _ = self.env.step(action)
            # Record the observation the action was taken in, not the next one
            trajectories.append((obs, action, reward))
            obs = next_obs
            if terminated or truncated:
                obs, _ = self.env.reset()
        return trajectories

@ray.remote
class ParameterServer:
    """Holds and updates model weights."""
    def __init__(self):
        self.weights = init_model_weights()  # placeholder

    def get_weights(self):
        return self.weights

    def apply_gradients(self, gradients):
        self.weights = optimizer_step(self.weights, gradients)  # placeholder

# Training loop
ps = ParameterServer.remote()
workers = [RolloutWorker.remote("CartPole-v1") for _ in range(4)]

for iteration in range(1000):
    # Pass the ObjectRef (not the value): Ray ships the weights from the
    # parameter server to each worker without a round-trip through the driver
    weights_ref = ps.get_weights.remote()
    for w in workers:
        w.set_weights.remote(weights_ref)

    # Parallel rollouts; calls from this driver to a given actor run in
    # submission order, so each worker applies the new weights first
    trajectories = ray.get([w.rollout.remote(200) for w in workers])
    gradients = compute_gradients(trajectories)  # placeholder
    ps.apply_gradients.remote(gradients)
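The training loop calls optimizer_step without defining it. One common choice, assumed here, is synchronous SGD. In that scheme, the parameter server averages the N workers' gradients and takes a single step, θ ← θ − η · (1/N) Σᵢ gᵢ. The pure-Python sketch below implements just that update. ToyParameterServer is hypothetical, not a Ray or RLlib class, and real code would use tensors rather than lists.

```python
from typing import List


class ToyParameterServer:
    """Holds weights and applies averaged worker gradients with plain SGD (an assumption)."""

    def __init__(self, weights: List[float], lr: float):
        self.weights = list(weights)
        self.lr = lr

    def apply_gradients(self, worker_grads: List[List[float]]) -> List[float]:
        # Average each coordinate across workers: g = (1/N) * sum_i g_i
        n = len(worker_grads)
        avg = [sum(column) / n for column in zip(*worker_grads)]
        # SGD step: theta <- theta - lr * g
        self.weights = [w - self.lr * g for w, g in zip(self.weights, avg)]
        return self.weights


ps = ToyParameterServer([1.0, -2.0], lr=0.5)
print(ps.apply_gradients([[0.25, 0.5], [0.75, 0.0]]))  # [0.75, -2.125]
```

Averaging, rather than summing, keeps the effective step size independent of how many rollout workers contribute.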

Plasma Object Store

How Plasma enables zero-copy sharing
  • Shared memory: Plasma stores objects in shared, memory-mapped files. Workers on the same node map the same region, so reading an object requires no copy between processes.
  • Serialization format: Early versions of Ray serialized objects with Apache Arrow (columnar, zero-copy reads). Current versions use pickle protocol 5 with out-of-band buffers, which keeps large buffers such as NumPy arrays zero-copy: workers deserialize by pointing into the memory-mapped region.
  • Cross-node transfer: If a task on Node A needs an object held on Node B, Ray copies the serialized object via gRPC into Node A's Plasma store. Later tasks on Node A reuse the local copy.
  • Object lifecycle: Objects are reference-counted. When every ObjectRef to an object has gone out of scope, the object is freed from Plasma. If the store fills up, objects that are still referenced are spilled to disk and restored on demand.
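Python's standard library exposes the same operating-system mechanism that Plasma builds on, namely named shared memory. In the sketch below, one handle creates a segment and writes the bytes once. A second handle attaches by name and edits the data in place, and the first handle sees the change because both map the same pages. This is not Plasma's API. Both handles live in one process for brevity; a separate process attaching by name would behave the same way.

```python
from multiprocessing import shared_memory


def shared_memory_demo() -> bytes:
    payload = b"weights-v1"
    # Producer: create a named segment and write the object's bytes once
    producer = shared_memory.SharedMemory(create=True, size=len(payload))
    try:
        producer.buf[: len(payload)] = payload
        # Consumer: attach to the same segment by name (the data is not copied)
        consumer = shared_memory.SharedMemory(name=producer.name)
        try:
            consumer.buf[len(payload) - 1] = ord("2")  # edit in place via the consumer
        finally:
            consumer.close()
        # The producer observes the consumer's write: both map the same memory
        with producer.buf[: len(payload)] as view:
            return bytes(view)
    finally:
        producer.close()
        producer.unlink()  # free the segment once no handle needs it


print(shared_memory_demo())  # b'weights-v2'
```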

Fault Tolerance: Tasks vs Actors

Fault tolerance comparison between Ray tasks and actors
Scenario | Tasks | Actors
Worker failure | Ray re-executes the task (safe when the task is idempotent) | The actor process dies and its in-memory state is lost
Node failure | The task is rescheduled on another node | The actor must be restarted elsewhere; state is lost unless checkpointed
Recovery mechanism | Transparent retry (configurable via max_retries) | Automatic restart via max_restarts (re-runs the constructor) plus explicit checkpoint/restore, or a supervision strategy
Ease of recovery | Easy: pure function, just retry | Hard: state must be reconstructed
Recommendation | Use tasks for pure compute | Checkpoint actor state periodically; use ray.util.actor_pool for pooling
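The asymmetry in the table can be made concrete with a toy, pure-Python model that involves no Ray at all. WorkerCrashed, run_with_retries, make_flaky and CheckpointedCounter are hypothetical names. Retrying a pure function is always safe, which is why tasks can be retried transparently. An actor's constructor re-runs on restart, so any update made after the last checkpoint is lost. In Ray itself, the corresponding settings are max_retries for tasks and max_restarts for actors.

```python
from typing import Any, Callable, Dict, Optional


class WorkerCrashed(Exception):
    """Hypothetical stand-in for a worker process dying mid-task."""


def run_with_retries(fn: Callable, *args: Any, max_retries: int = 3) -> Any:
    # Task recovery: a pure function can simply be executed again
    for attempt in range(max_retries + 1):
        try:
            return fn(*args)
        except WorkerCrashed:
            if attempt == max_retries:
                raise


def make_flaky(fn: Callable, failures: int) -> Callable:
    # Wrap fn so that its first `failures` calls crash
    remaining = [failures]

    def wrapper(*args: Any) -> Any:
        if remaining[0] > 0:
            remaining[0] -= 1
            raise WorkerCrashed()
        return fn(*args)

    return wrapper


class CheckpointedCounter:
    """Actor-style state that survives a restart only up to its last checkpoint."""

    def __init__(self, checkpoint: Optional[Dict[str, int]] = None):
        # A restart re-runs the constructor, so restore saved state if there is any
        self.value = checkpoint["value"] if checkpoint is not None else 0

    def increment(self) -> None:
        self.value += 1

    def save(self) -> Dict[str, int]:
        return {"value": self.value}


# Task: two simulated crashes, then success on the third attempt
print(run_with_retries(make_flaky(lambda x: x * x, failures=2), 7))  # 49

# Actor: the increment made after the last checkpoint is lost on restart
counter = CheckpointedCounter()
for _ in range(3):
    counter.increment()
saved = counter.save()
counter.increment()  # value is now 4, but not checkpointed
restarted = CheckpointedCounter(saved)
print(restarted.value)  # 3
```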

Ray Exam Questions (Q43–Q45)