Try Ray with $100 credit — Start now

Welcome to Ray

An open-source framework to build and scale your ML and Python applications easily

Scale with Ray

 # Ray Data example: batch text generation with a Hugging Face GPT-2 pipeline.
from typing import Dict

import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
ds = ray.data.from_numpy(
    np.asarray(["Complete this", "for me"]),
)


# Step 2: Define a Predictor class for inference.
class HuggingFacePredictor:
    """Callable that runs a GPT-2 text-generation pipeline over one batch."""

    def __init__(self):
        # Imported here rather than at module level, as in the original.
        from transformers import pipeline

        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        """Generate text for every prompt in ``batch["data"]``.

        The same ``batch`` dict is returned with an added ``"output"`` entry
        holding one generated string per input prompt.
        """
        prompts = list(batch["data"])
        generated = self.model(
            prompts,
            max_length=20,
            num_return_sequences=1,
        )

        # `generated` is a list of length-one lists. For example:
        #   [[{"generated_text": "output_1"}], ..., [{"generated_text": "output_2"}]]
        # Flatten it into the following format instead:
        #   ["output_1", "output_2"]
        texts = []
        for sequences in generated:
            first_sequence = sequences[0]
            texts.append(first_sequence["generated_text"])

        batch["output"] = texts
        return batch


# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(
    HuggingFacePredictor,
    compute=ray.data.ActorPoolStrategy(size=2),
)

# Step 4: Show one prediction output.
predictions.show(limit=1)
 # Ray Train example: distributed PyTorch training on 32 GPU workers.
from ray.train import ScalingConfig, get_dataset_shard
from ray.train.torch import TorchTrainer

# Placeholder for the Ray Dataset to train on. It is defined at module scope
# because the trainer below reads it when building `datasets=`; assigning it
# only inside `train_func` would raise NameError there.
train_dataset = ...

# Number of passes over the training data (illustrative value).
num_epochs = 10


# Step 1: Set up PyTorch model training as you normally would.
def train_func():
    """Per-worker training loop executed by ``TorchTrainer``."""
    model = ...
    # Fetch this worker's portion of the dataset registered under "train".
    train_shard = get_dataset_shard("train")
    for epoch in range(num_epochs):
        ...  # model training logic


# Step 2: Set up Ray's PyTorch Trainer to run on 32 GPUs.
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=32, use_gpu=True),
    datasets={"train": train_dataset},
)

# Step 3: Run distributed model training on 32 GPUs.
result = trainer.fit()
 # Ray Tune example: distributed hyperparameter search over a LightGBM trainer.
from ray import tune
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer

# Placeholders: supply real Ray Datasets and a search space before running.
train_dataset = ...
eval_dataset = ...
hyper_param_space = ...

# Step 1: Set up Ray's LightGBM Trainer to train on 64 CPUs.
trainer = LightGBMTrainer(
    # ... remaining trainer arguments elided in this example ...
    scaling_config=ScalingConfig(num_workers=64),
    datasets={"train": train_dataset, "eval": eval_dataset},
)

# Step 2: Set up Ray Tuner to run 1000 trials.
# The trainer is passed as `trainable`; `Tuner` has no `trainer` keyword.
tuner = tune.Tuner(
    trainable=trainer,
    param_space=hyper_param_space,
    tune_config=tune.TuneConfig(num_samples=1000),
)

# Step 3: Run distributed HPO with 1000 trials; each trial runs on 64 CPUs.
result_grid = tuner.fit()
 # Ray Serve example: serve Stable Diffusion v2 behind a FastAPI ingress.
from io import BytesIO

from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
import torch

from ray import serve
from ray.serve.handle import DeploymentHandle

app = FastAPI()


@serve.deployment(num_replicas=1)
@serve.ingress(app)
class APIIngress:
    """HTTP front end that forwards prompts to the diffusion model deployment."""

    def __init__(self, diffusion_model_handle: DeploymentHandle) -> None:
        self.handle = diffusion_model_handle

    @app.get(
        "/imagine",
        responses={200: {"content": {"image/png": {}}}},
        response_class=Response,
    )
    async def generate(self, prompt: str, img_size: int = 512):
        """Return a PNG image generated for ``prompt``.

        Raises:
            HTTPException: 400 when ``prompt`` is empty.
        """
        # Explicit check instead of `assert`: asserts are stripped under
        # `python -O` and would otherwise surface as an HTTP 500.
        if not prompt:
            raise HTTPException(
                status_code=400, detail="prompt parameter cannot be empty"
            )

        image = await self.handle.generate.remote(prompt, img_size=img_size)

        # Encode the returned image as PNG bytes for the response body.
        file_stream = BytesIO()
        image.save(file_stream, "PNG")
        return Response(content=file_stream.getvalue(), media_type="image/png")


@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={"min_replicas": 0, "max_replicas": 2},
)
class StableDiffusionV2:
    """Deployment wrapping a Stable Diffusion v2 pipeline moved to CUDA."""

    def __init__(self):
        # Imported here rather than at module level, as in the original.
        from diffusers import EulerDiscreteScheduler, StableDiffusionPipeline

        model_id = "stabilityai/stable-diffusion-2"

        scheduler = EulerDiscreteScheduler.from_pretrained(
            model_id, subfolder="scheduler"
        )
        self.pipe = StableDiffusionPipeline.from_pretrained(
            model_id, scheduler=scheduler, revision="fp16", torch_dtype=torch.float16
        )
        self.pipe = self.pipe.to("cuda")

    def generate(self, prompt: str, img_size: int = 512):
        """Generate one square image of side ``img_size`` for ``prompt``.

        Raises:
            ValueError: when ``prompt`` is empty.
        """
        # Validate explicitly; `assert` would be skipped under `python -O`.
        if not prompt:
            raise ValueError("prompt parameter cannot be empty")

        with torch.autocast("cuda"):
            image = self.pipe(prompt, height=img_size, width=img_size).images[0]
            return image


entrypoint = APIIngress.bind(StableDiffusionV2.bind())
 # RLlib example: train and evaluate PPO on Taxi-v3 with 64 rollout workers.
from ray.rllib.algorithms.ppo import PPOConfig

# Model settings passed to `.training(model=...)`.
# NOTE(review): the original referenced an undefined name `rnn_lage`
# (presumably a typo for "rnn_large"). The values below are illustrative
# recurrent-model settings; confirm them against the intended configuration.
rnn_large = {"use_lstm": True, "lstm_cell_size": 512}

# Step 1: Configure PPO to run 64 parallel workers to collect samples from the env.
ppo_config = (
    PPOConfig()
    .environment(env="Taxi-v3")
    .rollouts(num_rollout_workers=64)
    .framework("torch")
    .training(model=rnn_large)
)

# Step 2: Build the PPO algorithm.
ppo_algo = ppo_config.build()

# Step 3: Train and evaluate PPO.
for _ in range(5):
    print(ppo_algo.train())

ppo_algo.evaluate()

Beyond the basics