Ray: A Distributed Execution Framework for AI
Unified compute layer for AI/ML workloads — tasks for stateless parallelism, actors for stateful services, Plasma for zero-copy object sharing. Covers Q43–Q45.
Ray was developed at UC Berkeley to support reinforcement learning workloads, which require running many environments in parallel, sharing model weights, and coordinating training updates. It has since become a general-purpose distributed computing framework used across ML training, data processing, and serving.
💡 Ray's unified task + actor model
Ray Architecture
Ray has a two-tier architecture. The application layer consists of the driver (the user program), the workers that execute tasks, and actors. The system layer consists of the Global Control Store (GCS), which holds global state; a global scheduler for placement; local schedulers for per-node execution; and a Plasma object store on every node for sharing data between tasks.
Tasks vs Actors
Tasks are stateless remote functions.
Use cases:
- Pure functions (same input → same output)
- Embarrassingly parallel workloads
- Data transformations and preprocessing
- Hyperparameter search
- Batch inference
Properties:
- Decorated with @ray.remote
- Called with .remote(), which immediately returns an ObjectRef
- Results are stored in the Plasma object store
- Fault tolerant: a failed task is simply re-executed
- Can take ObjectRefs as arguments, forming a dependency graph
```python
import ray
ray.init()

# Tasks: stateless remote functions
@ray.remote
def square(x: int) -> int:
    return x * x

# Submit tasks asynchronously: each call returns an ObjectRef immediately
futures = [square.remote(i) for i in range(10)]

# ray.get() blocks until results are ready
results = ray.get(futures)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Tasks can depend on each other (pipelining)
@ray.remote
def add(a: int, b: int) -> int:
    return a + b

# Pass ObjectRefs directly: Ray handles dependency resolution
ref1 = square.remote(5)
ref2 = square.remote(3)
result_ref = add.remote(ref1, ref2)  # executes after both squares complete
print(ray.get(result_ref))  # 34
```
Remote Task Execution: 8-Step Data Path
When you call f.remote(x), here is what happens end-to-end.
Reinforcement Learning Use Case
RL was Ray's original motivation. The pattern is as follows. Many parallel environment actors collect experience. A Parameter Server actor aggregates gradients. The updated policy is then redistributed to the workers. Each of these roles maps directly onto a Ray actor.
[Diagram: a Parameter Server (Actor) that holds model weights θ, connected to several rollout worker actors]
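```python
# Reinforcement learning with Ray (conceptual sketch).
# init_model_weights, make_policy, compute_gradients and optimizer_step are
# placeholders for your own model code; they are not part of Ray.
import gymnasium as gym  # maintained fork of gym; uses the newer reset/step API
import ray

ray.init()


@ray.remote
class RolloutWorker:
    """Collects experience from one environment instance."""

    def __init__(self, env_name: str):
        self.env = gym.make(env_name)
        self.policy = None  # built from weights fetched from the parameter server

    def set_weights(self, weights):
        self.policy = make_policy(weights)  # placeholder: weights -> object with act()

    def rollout(self, num_steps: int):
        trajectories = []
        obs, _ = self.env.reset()
        for _ in range(num_steps):
            action = self.policy.act(obs)
            next_obs, reward, terminated, truncated, _ = self.env.step(action)
            # Record the observation the action was chosen from, not the next one
            trajectories.append((obs, action, reward))
            obs = next_obs
            if terminated or truncated:
                obs, _ = self.env.reset()
        return trajectories


@ray.remote
class ParameterServer:
    """Holds and updates model weights."""

    def __init__(self):
        self.weights = init_model_weights()  # placeholder

    def get_weights(self):
        return self.weights

    def apply_gradients(self, gradients):
        self.weights = optimizer_step(self.weights, gradients)  # placeholder


# Training loop
ps = ParameterServer.remote()
workers = [RolloutWorker.remote("CartPole-v1") for _ in range(4)]

for iteration in range(1000):
    # Pass the ObjectRef straight to the workers: Ray resolves it before
    # set_weights runs, so the weights never round-trip through the driver
    weights_ref = ps.get_weights.remote()
    for w in workers:
        w.set_weights.remote(weights_ref)

    # Parallel rollouts; calls from one caller to an actor run in submission
    # order, so each rollout sees the weights set just above
    trajectories = ray.get([w.rollout.remote(200) for w in workers])
    gradients = compute_gradients(trajectories)  # placeholder
    # Queued before the next iteration's get_weights on the same actor
    ps.apply_gradients.remote(gradients)
```
Plasma Object Store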
Fault Tolerance: Tasks vs Actors
| Scenario | Tasks | Actors |
|---|---|---|
| Worker failure | Re-execute the task (assumes the task is idempotent) | Actor process dies; its in-memory state is lost |
| Node failure | Reschedule the task on another node; lost outputs can be reconstructed from lineage | Actor must be restarted elsewhere; state is lost unless checkpointed |
| Recovery mechanism | Transparent retry (configurable max_retries) | Automatic restart (configurable max_restarts) re-runs the constructor; state must be restored from a user-managed checkpoint, or handled with a supervision strategy |
| Ease of recovery | Easy: pure function, just retry | Hard: must handle state reconstruction |
| Recommendation | Use tasks for pure compute | Checkpoint actor state periodically; use ray.util.actor_pool.ActorPool to pool actors |