# Skyward

> Distributed compute orchestration for ML/AI. Write a normal Python function, decorate it with `@sky.function`, and Skyward provisions cloud GPUs, ships your code over SSH, runs it remotely, and returns the result - behind a synchronous, operator-based API.

This is the complete, single-file reference for using Skyward, written for LLMs. Everything below is the public `import skyward as sky` surface, with exact signatures and runnable examples. Where library docstrings disagree with runtime behavior, this document follows runtime behavior and flags the difference in **Gotchas & constraints** (read that section before generating code).

Project docs: https://gabfssilva.github.io/skyward/ | Repo: https://github.com/gabfssilva/skyward

---

## What Skyward is & the mental model

Skyward turns ordinary Python functions into remote GPU jobs. The model has three steps:

1. **Define lazily.** `@sky.function` wraps a function. Calling it does **not** run anything - it returns a `PendingFunction[T]`, a frozen snapshot of `(fn, args, kwargs)`.
2. **Provision a pool.** `with sky.Compute(...) as pool:` provisions cloud machines on enter and tears them down on exit.
3. **Dispatch with an operator.** `pending >> pool` runs the snapshot on the pool and returns the result. The operator chooses the placement (one node, all nodes, parallel group, async).

```python
import skyward as sky

@sky.function
def train(data):
    return model.fit(data)

with sky.Compute(provider=sky.AWS(), accelerator=sky.accelerators.A100(), nodes=4) as pool:
    result = train(data) >> pool  # runs on one node, returns the result
```

Nothing executes until an operator dispatches a `PendingFunction`. This is the single most important fact about the API.

### How it works under the hood (short)

You do not need this to use Skyward; it explains why the API behaves synchronously.

- `Compute`/`Session` runs an **asyncio event loop in a background daemon thread**. The public API is synchronous and bridges to that loop with `asyncio.run_coroutine_threadsafe`.
- Provisioning, SSH, bootstrap, and the worker bridge are an **actor system** (Casty). You never touch actors directly.
- Your function is serialized with **cloudpickle + lz4**, shipped over SSH, and executed by a worker process on each node. The return value is serialized back the same way.
- That serialization boundary is the source of most constraints (see **Gotchas & constraints**).

---

## Cheat sheet

```python
import skyward as sky

@sky.function
def work(x):
    return x * 2

with sky.Compute(provider=sky.RunPod(), accelerator=sky.accelerators.A100(), nodes=4) as pool:
    a = work(10) >> pool                     # one node (round-robin) -> T
    everyone = work(10) @ pool               # broadcast to ALL nodes -> list[T]
    x, y = (work(1) & work(2)) >> pool       # parallel group -> tuple
    fut = work(10) > pool                    # async, non-blocking -> Future[T]
    val = fut.result()
    results = pool.map(work, [1, 2, 3, 4])   # one task per item, ordered -> list[R]
    n = pool.current_nodes()                 # ready node count
```

**Operators** (on `PendingFunction`; `pool` may be a pool object, the `sky` singleton, or the `skyward` module):

| Expression | Meaning | Returns |
|---|---|---|
| `task() >> pool` | run on **one** node (round-robin) | `T` |
| `task() @ pool` | **broadcast** to every node | `list[T]` (one per node, by node index) |
| `(task1() & task2()) >> pool` | run group in **parallel** | `tuple[...]` |
| `task() > pool` | **async**, non-blocking | `Future[T]` |
| `sky.gather(*tasks) >> pool` | explicit parallel group (optional streaming) | `tuple` or generator |

`&` builds a `PendingFunctionGroup`; only `>>` dispatches a group (no `@`/`>` on groups). Always parenthesize the group: in Python `>>` binds tighter than `&`, so `task1() & task2() >> pool` would dispatch `task2()` alone and then `&` its result with `task1()`.

**Node spec** (`nodes=`): `4` fixed | `(2, 8)` elastic (start 2, scale to 8) | `sky.Nodes(desired=8, min=4)` partial readiness (start at 4) | `sky.Nodes(desired=4, min=2, max=16)` elastic + early start.

**Accelerators** (Python code): always an object - `sky.accelerators.A100()`, `sky.accelerators.H100(count=8)`. A bare string `"A100"` works **only** in `skyward.toml`, not in Python.

**Allocation** (`allocation=`): `"spot-if-available"` (default) | `"spot"` | `"on-demand"` | `"cheapest"`.

**Running it so an agent sees live progress** (don't block in the foreground, don't `tail -f`): set `console="log"`, run the script in the background with output redirected to a file, and poll that file by offset.

```python
options=sky.Options(console="log")  # in the script; logging stays on by default
```

```bash
PYTHONUNBUFFERED=1 python -u train.py > run.log 2>&1 &
# then read run.log incrementally (new lines only)
```

`run.log` is your live feed; Skyward also keeps a full rotating DEBUG log at `~/.skyward/logs/skyward.log` (default). For the full pattern and the CLI/server alternative, see **Running from an agent (live feedback, no blind spots)**.

---

## Install & the single import

```bash
pip install skyward  # or: uv add skyward
```

```python
import skyward as sky
```

Requires Python 3.12+ (3.12, 3.13, 3.14). Always use the `sky` alias; everything public hangs off it (`sky.function`, `sky.Compute`, `sky.AWS`, `sky.accelerators.*`, `sky.plugins.*`, `sky.dict`, ...). Do not import from internal modules like `skyward.actors.*` or `skyward.core.*`.

---

## Defining work: `@sky.function`

```python
def function[**P, T](fn=None, *, timeout: float | None = None)
```

Marks a function for remote execution. Calling the decorated function returns a `PendingFunction[T]` (a frozen `dataclass` of `fn`, `args`, `kwargs`, `timeout`) - it does **not** execute locally.

```python
@sky.function
def train(data):
    return model.fit(data)

@sky.function(timeout=600)  # per-call wall-clock timeout in seconds
def long_train(data):
    return model.fit(data)
```

**Per-call timeout override** - `PendingFunction.with_timeout(seconds)` returns a new pending with the timeout applied:

```python
result = train(data).with_timeout(600) >> pool
```

Use `sky.time` helpers for readable durations (they return seconds as floats): `sky.time.minutes(10)`, `sky.time.hours(2)`, `sky.time.days(1)`.
What crosses to the remote worker: the function plus its captured args/kwargs and closure, serialized with cloudpickle+lz4. Keep arguments picklable and reasonably sized (see **Gotchas & constraints**).

---

## Provisioning a pool: `sky.Compute`

```python
@contextmanager
def Compute(*specs: Spec, name: str | None = None, options: Options = ..., **kwargs) -> Pool
```

`Compute` is the single-pool convenience context manager. It has **three mutually exclusive** calling conventions:

**1. Flat kwargs** (most common - `Spec` fields passed directly):

```python
with sky.Compute(provider=sky.AWS(), accelerator=sky.accelerators.A100(), nodes=4) as pool:
    result = train(data) >> pool
```

**2. Explicit specs** (multi-provider fallback - pass one or more `Spec` objects positionally):

```python
with sky.Compute(
    sky.Spec(provider=sky.VastAI(), accelerator=sky.accelerators.A100()),
    sky.Spec(provider=sky.AWS(), accelerator=sky.accelerators.A100()),
    options=sky.Options(selection="cheapest"),
) as pool:
    result = train(data) >> pool
```

**3. Named pool** (load `[pools.]` from `skyward.toml`):

```python
with sky.Compute(name="train") as pool:
    result = train(data) >> pool
```

Rules: you may **not** mix flat kwargs with positional `Spec` objects, and `name=` cannot be combined with specs or kwargs. `selection` and other operational knobs live on `Options`, never as direct `Compute` arguments (see **Pool options**).

The pool provisions on `__enter__` and is destroyed on `__exit__`. After exit, instances linger for `Spec.ttl` seconds (default 600) before auto-shutdown, then are torn down; `ttl=0` disables the grace period.

There is also a lower-level `sky.Session` if you need to manage multiple pools or pool lifetime manually; `Compute` is the right default.

---

## Dispatch operators

Operators are defined on `PendingFunction` and `PendingFunctionGroup`. The right-hand side is the dispatch **target**: a pool object, the `sky` singleton, or the `skyward` module. Inside a `with sky.Compute(...) as pool:` block, `>> pool` and `>> sky` are equivalent - `sky` resolves to the active pool via a `ContextVar`.

```python
result = train(data) >> pool     # __rshift__ -> T                      run on one node (round-robin)
future = train(data) > pool      # __gt__     -> Future[T]              async; future.result() blocks
results = train(data) @ pool     # __matmul__ -> list[T]                broadcast; one result per node
group = task1() & task2()        # __and__    -> PendingFunctionGroup
a, b = group >> pool             # group __rshift__ -> tuple            run all in parallel
```

**Pinning placement** - `pool.run` and `pool.run_async` (the methods behind `>>` and `>`) accept a `target`:

```python
result = pool.run(train(data), target="head")  # run on the head node specifically
result = pool.run(train(data), target=2)       # run on rank 2
```

**Pool method equivalents** (use when you need parameters the operators don't expose):

| Method | Operator equivalent | Returns |
|---|---|---|
| `pool.run(pending, *, target=None)` | `pending >> pool` | `T` |
| `pool.run_async(pending, *, target=None)` | `pending > pool` | `Future[T]` |
| `pool.broadcast(pending)` | `pending @ pool` | `list[T]` |
| `pool.run_parallel(group)` | `group >> pool` | `tuple` or generator |
| `pool.map(fn, items)` | - | `list[R]` (input order) |
| `pool.current_nodes()` | - | `int` |
| `pool.resize(n)` / `pool.resize(sky.Nodes(...))` | - | `None` |

`pool.map(fn, items)` wraps `fn` in `@sky.function` automatically, submits one task per item round-robin, and returns results in input order. `fn` does not need to be pre-decorated.

**Explicit grouping with `gather`** - equivalent to chaining `&`, with optional streaming:

```python
def gather(*pendings, stream: bool = False, ordered: bool = True) -> PendingFunctionGroup
```

```python
results = sky.gather(a(), b(), c()) >> pool              # tuple, all at once
for r in sky.gather(a(), b(), stream=True) >> pool:      # generator, yields results as they become available
    print(r)
```

`ordered=True` (default) preserves submission order while streaming; pass `ordered=False` to receive results in completion order. `ordered` is ignored when `stream=False`.
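The three `gather` modes map onto a familiar local pattern. The sketch below is a single-machine analogue built on the standard library's `concurrent.futures`, not Skyward code: `gather_local` and `slow` are hypothetical names, threads stand in for pool nodes, and the assumption is that `ordered=False` means completion order. It shows the difference between collecting a tuple, streaming in submission order, and streaming in completion order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator

Call = tuple[Callable[..., Any], tuple[Any, ...]]


def slow(label: str, delay: float) -> str:
    """Hypothetical task: sleeps for `delay` seconds, then returns its label."""
    time.sleep(delay)
    return label


def gather_local(calls: list[Call], *, stream: bool = False, ordered: bool = True):
    """Local analogue of `gather(...) >> pool` semantics (not Skyward code)."""
    executor = ThreadPoolExecutor(max_workers=max(1, len(calls)))
    futures = [executor.submit(fn, *args) for fn, args in calls]

    if not stream:
        # All at once: wait for every result and return a tuple in submission order.
        try:
            return tuple(f.result() for f in futures)
        finally:
            executor.shutdown()

    def results() -> Iterator[Any]:
        try:
            # ordered=True: submission order, each yielded as soon as it is ready.
            # ordered=False: whichever task finishes first is yielded first.
            source = futures if ordered else as_completed(futures)
            for f in source:
                yield f.result()
        finally:
            executor.shutdown()

    return results()


calls = [(slow, ("a", 0.6)), (slow, ("b", 0.1)), (slow, ("c", 0.3))]
print(gather_local(calls))                                    # ('a', 'b', 'c')
print(list(gather_local(calls, stream=True)))                 # ['a', 'b', 'c']
print(list(gather_local(calls, stream=True, ordered=False)))  # completion order, typically ['b', 'c', 'a']
```

With `ordered=True`, a slow first task holds back faster later ones; `ordered=False` trades ordering for the earliest possible delivery of each result.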

---

## Hardware spec

A `Spec` describes what to provision. In the flat-kwargs `Compute(...)` form you pass these fields directly; in multi-spec form you build `Spec` objects.

```python
@dataclass(frozen=True, slots=True)
class Spec:
    provider: ProviderConfig                   # required: sky.AWS(), sky.RunPod(), ...
    accelerator: Accelerator | None = None     # sky.accelerators.A100() - NOT a string
    nodes: NodeSpec = 1                        # int | (low, high) | sky.Nodes(...)
    vcpus: float | None = None                 # minimum vCPUs per node
    memory_gb: float | None = None             # minimum RAM per node
    disk_gb: int | None = None
    architecture: Architecture | None = None   # "x86_64" | "arm64"
    allocation: AllocationStrategy = "spot-if-available"
    image: Image = Image()                     # environment (python, pip, apt, env, ...)
    region: str | None = None
    max_hourly_cost: float | None = None       # USD/node/hour cap
    ttl: int = 600                             # auto-shutdown seconds after pool exits; 0 disables
    volumes: tuple[Volume, ...] = ()
    plugins: tuple[Plugin, ...] = ()
```

**Accelerators are callable factories**, accessed as attributes of `sky.accelerators` (e.g. `sky.accelerators.A100()`). The named factories take keyword-only arguments and all accept `count: float = 1`; some also accept a `memory` literal and/or `form_factor`. `Custom` additionally takes the accelerator name as its first positional argument.

```python
sky.accelerators.A100()                                    # 80GB, count=1
sky.accelerators.A100(memory="40GB")                       # 40GB variant
sky.accelerators.H100(count=8)                             # 8x H100 per node
sky.accelerators.H100(form_factor="SXM")
sky.accelerators.Custom("My-GPU", memory="48GB", count=2)
```

Common names: `H100`, `H200`, `B200`, `A100`, `A10G`, `A40`, `L4`, `L40S`, `T4`, `V100`, `RTX_4090`, `RTX_A6000`, `MI300X`. Many more exist (full NVIDIA/AMD/AWS/TPU catalog); see https://gabfssilva.github.io/skyward/accelerators/. For CPU-only pools, leave `accelerator=None`.

> **Do not pass `accelerator="A100"` (a string) in Python.** The constructor accepts it without error, but provisioning then fails with `'str' object has no attribute 'name'`. Use `sky.accelerators.A100()`. The string form is valid **only** inside `skyward.toml`.

**Allocation strategies** (`allocation=`):

| Value | Behavior |
|---|---|
| `"spot-if-available"` | default - spot when available, else on-demand |
| `"spot"` | spot/preemptible only (cheapest, can be reclaimed) |
| `"on-demand"` | on-demand only (stable, pricier) |
| `"cheapest"` | whichever is cheaper per offer |

Cap cost with `max_hourly_cost=2.5` (USD per node per hour).

---

## Node topology & scaling

`type NodeSpec = int | tuple[int, int] | Nodes`

```python
@dataclass(frozen=True, slots=True)
class Nodes:
    desired: int               # target count (>= 1)
    max: int | None = None     # ceiling; enables autoscaling when set
    min: int | None = None     # count needed before the pool is usable (partial readiness)
```

| `nodes=` | Meaning |
|---|---|
| `4` | exactly 4 nodes; pool is ready when all 4 are up |
| `(2, 8)` | elastic: start at 2, autoscale up to 8 (tuple maps to `Nodes(desired=2, max=8)`) |
| `sky.Nodes(desired=8, min=4)` | partial readiness: usable at 4, keeps growing to 8 |
| `sky.Nodes(desired=4, min=2, max=16)` | elastic **and** early start |

**Partial readiness** lets work begin before every node is provisioned (`min` ready nodes are enough). **Elastic** pools autoscale between the floor and `max` based on pending-task pressure. `pool.current_nodes()` returns how many nodes are ready right now; `pool.resize(...)` changes the target at runtime.

```python
with sky.Compute(provider=sky.AWS(), accelerator=sky.accelerators.A100(),
                 nodes=sky.Nodes(desired=8, min=4)) as pool:
    results = train(data) @ pool  # starts on 4 nodes, broadcasts as more join
```

---

## Multi-provider fallback

Pass multiple `Spec` objects to provision from whichever provider wins under the `selection` strategy. This is useful for finding capacity or a better price across clouds.
```python
with sky.Compute(
    sky.Spec(provider=sky.VastAI(), accelerator=sky.accelerators.A100()),
    sky.Spec(provider=sky.RunPod(), accelerator=sky.accelerators.A100()),
    sky.Spec(provider=sky.AWS(), accelerator=sky.accelerators.A100()),
    options=sky.Options(selection="cheapest"),  # or "first"
) as pool:
    result = train(data) >> pool
```

`selection="cheapest"` (default) picks the lowest-priced viable spec; `"first"` tries them in order. To preview prices before provisioning, see **Querying offers & pricing**.

---

## Runtime API (inside functions)

These are called **inside** a `@sky.function` while it runs on a remote worker.

### `sky.instance_info() -> InstanceInfo`

Returns this worker's place in the cluster. `InstanceInfo` is a Pydantic model:

```python
info = sky.instance_info()
info.node                  # int - this node's index (0..total_nodes-1)
info.worker                # int - worker index within the node (default 0)
info.total_nodes           # int
info.workers_per_node      # int - e.g. 2 with a MIG 3g.40gb profile
info.accelerators          # float - accelerators on this node
info.total_accelerators    # float - across the pool
info.head_addr             # str - head node IP
info.head_port             # int - head coordination port
info.job_id                # str - unique id for this pool execution
info.peers                 # list[PeerInfo] -> {node, private_ip, public_ip}
info.accelerator           # AcceleratorInfo | None -> {type, count, memory_gb, is_trainium}
info.network               # NetworkInfo -> {interface, bandwidth_gbps}

# computed properties:
info.global_worker_index   # int - node*workers_per_node + worker
info.total_workers         # int - total_nodes*workers_per_node
info.is_head               # bool - global_worker_index == 0
info.hostname              # str
```

`sky.is_head(info)` is a free function equivalent to `info.is_head` - true on the single head worker. Use it to do head-only work (logging, saving a checkpoint, seeding a dataset):

```python
@sky.function
def step(batch):
    info = sky.instance_info()
    if sky.is_head(info):
        print(f"node {info.node}/{info.total_nodes}")
    return compute(batch)
```

### Output control

Per-worker stdout/stderr filtering, so broadcast jobs don't flood your terminal.

```python
@sky.stdout(only="head")  # decorator FACTORY - call with only=
@sky.function
def train(data):
    print("only the head node prints this")  # other nodes' stdout is suppressed
    return model.fit(data)
```

- `sky.stdout(only=...)` / `sky.stderr(only=...)` - decorator factories. `only=` is `"head"` or a predicate `Callable[[InstanceInfo], bool]`.
- `sky.silent` - a **bare** decorator (no call) that silences both streams.
- `sky.redirect_output(callback)` - a context manager that routes stdout+stderr to `callback(str)`; yields the `(stdout, stderr)` `CallbackWriter` pair.

```python
@sky.stdout(only=lambda i: i.node < 2)  # only nodes 0 and 1 print
@sky.function
def task(): ...

lines = []
with sky.redirect_output(lines.append):
    print("captured")
```

---

## Data sharding & explicit parallelism

`sky.shard(...)` splits data so each node gets a disjoint slice, **preserving input type** (`list`, `tuple`, `np.ndarray`, `torch.Tensor`, any `Sequence`).

```python
def shard(*data, shuffle=False, seed=0, drop_last=False, node: int | None = None, total_nodes: int | None = None)
```

```python
@sky.function
def train_shard(dataset):
    my_part = sky.shard(dataset, shuffle=True, seed=42)  # same type as dataset, this node's slice
    return model.fit(my_part)

results = train_shard(full_dataset) @ pool  # broadcast: each node trains its own shard
```

- `shuffle=True` uses a seed synchronized across nodes so slices stay disjoint.
- `drop_last=True` trims the tail so every node gets an equal count.
- Multiple arrays share the same indices (returns a tuple of shards):

```python
x_shard, y_shard = sky.shard(x_train, y_train)  # aligned slices
```

`sky.gather(...)` (see operators) is the explicit-parallelism counterpart for running a set of pendings concurrently.

---

## Image & worker environment

`Image` declares the remote environment. Defaults (`sky.DEFAULT_IMAGE`) match your local Python and ship Skyward automatically.

```python
@dataclass(frozen=True, slots=True)
class Image:
    python: str = "auto"                     # "auto" detects local interpreter, e.g. "3.13"
    pip: tuple[str, ...] = ()                # packages installed with uv
    pip_indexes: tuple[PipIndex, ...] = ()   # scoped custom indexes
    apt: tuple[str, ...] = ()                # system packages (apt-get)
    env: dict[str, str] = {}                 # env vars exported on workers
    shell_vars: dict[str, str] = {}          # shell snippets evaluated at bootstrap -> env
    includes: tuple[str, ...] = ()           # local dirs/.py files to sync to workers
    excludes: tuple[str, ...] = ()           # glob patterns to skip within includes
    skyward_source: str = "auto"             # "auto"|"local"|"github"|"pypi"
    metrics: ... = Default()                 # telemetry to collect; None disables
    bootstrap_timeout: int = 300
```

```python
image = sky.Image(
    python="3.13",
    pip=["torch", "transformers", "datasets"],
    apt=["git", "ffmpeg"],
    env={"HF_HOME": "/data/hf"},
    includes=["./src"],  # ship your local package alongside the function
)

with sky.Compute(provider=sky.AWS(), accelerator=sky.accelerators.A100(), image=image) as pool:
    ...
```

**Scoped package index** (`PipIndex`) - route specific packages to a custom index (e.g. PyTorch CUDA wheels):

```python
sky.Image(
    pip=["torch", "torchvision"],
    pip_indexes=[sky.PipIndex(url="https://download.pytorch.org/whl/cu121", packages=["torch", "torchvision"])],
)
```

> `Image` has **no** base-Docker-image field. Custom Docker base images are set per-provider (e.g. `RunPod(container_image=...)`, `VastAI(docker_image=...)`, `Novita(docker_image=...)`) using `sky.containers`.
### Custom Docker base images (`sky.containers`)

```python
sky.containers.cuda("12.9", variant="devel", ubuntu="24.04")  # nvidia/cuda:...
sky.containers.ubuntu("24.04")
sky.containers.pytorch("2.8", cuda="12.8")                     # NGC pytorch image
sky.containers.runpod_pytorch("2.8")

img = sky.containers.cuda("12.9")
with sky.Compute(provider=sky.RunPod(container_image=img), accelerator=sky.accelerators.A100()) as pool:
    ...
```

These return a `DockerImage` (also exported as `sky.DockerImage`). `sky.containers.DockerImage.of("repo/tag", cuda="12.8", ubuntu="22.04")` wraps an arbitrary tag.

### Worker concurrency & executor

`Worker` (set via `Options(worker=...)`) controls how many tasks run per node and which execution backend runs them.

```python
@dataclass(frozen=True, slots=True)
class Worker:
    concurrency: int = 1            # concurrent task slots per node
    buffer: int = 0                 # extra prefetched slots (throughput for short tasks)
    executor: str = "auto"          # "auto"->"thread" | "thread" | "process"
    reuse_processes: bool = True    # only for executor="process"
```

```python
opts = sky.Options(worker=sky.Worker(concurrency=4, executor="thread"))
```

- `"thread"` (what the default `"auto"` resolves to): lowest overhead; distributed collections work via shared memory; supports streaming. Best for I/O-bound work or GPU work that releases the GIL.
- `"process"`: bypasses the GIL for CPU-bound pure-Python work; distributed collections work via an IPC bridge. Set `reuse_processes=False` for a fresh subprocess per task (useful for workloads that leak CUDA/GPU state).

---

## Storage & volumes

Two distinct mechanisms - pick by access pattern.

### `Volume` - mount a bucket as a filesystem (FUSE)

Objects appear as files on the worker. Best for reading datasets or writing checkpoints with normal file I/O.

```python
@dataclass(frozen=True, slots=True)
class Volume:
    bucket: str
    mount: str                        # absolute path; NOT /, /opt, /opt/skyward, /root, /tmp
    prefix: str = ""                  # subfolder within the bucket
    read_only: bool = True
    storage: Storage | None = None    # backend override; auto-detected from provider if None
```

```python
with sky.Compute(provider=sky.AWS(), accelerator=sky.accelerators.A100(),
                 volumes=[sky.Volume(bucket="my-data", mount="/data")]) as pool:
    # inside @sky.function, read /data/... like a local path
    ...
```

### `Storage` - programmatic object-store client

A small S3-compatible client used as a **context manager**. Best for explicit uploads/downloads from your own code.

```python
@dataclass(frozen=True, slots=True)
class Storage:
    endpoint: str
    access_key: Credential | None = None   # str or () -> str | Awaitable[str]
    secret_key: Credential | None = None
    path_style: bool = False
```

Methods (inside `with storage:`): `upload(bucket, local_path, *, key=None)`, `download(bucket, key, local_path)`, `ls(bucket, prefix="") -> list[str]`, `exists(bucket, key) -> bool`, `rm(bucket, key)`. The method names are `upload`/`download`/`ls`/`exists`/`rm` - **not** put/get/list/delete.

```python
with sky.storage.S3(region="us-east-1") as s3:
    s3.upload("my-bucket", "model.pt", key="checkpoints/model.pt")
    keys = s3.ls("my-bucket", prefix="checkpoints/")
    s3.download("my-bucket", "checkpoints/model.pt", "local/model.pt")
```

Presets (all keyword-only, return `Storage`): `sky.storage.S3(region=...)`, `sky.storage.R2(account_id=, access_key=, secret_key=)`, `sky.storage.GCS(access_key=, secret_key=)`, `sky.storage.Wasabi(region=, access_key=, secret_key=)`, `sky.storage.Backblaze(region=, key_id=, app_key=)`, `sky.storage.Hyperstack(api_key=, region=)`.

---

## Distributed collections

Cluster-wide shared state, accessed by name from inside a `@sky.function`. Backed by the actor system; returned objects are synchronous proxies. Call factories **inside** running tasks (off-pool they raise "No active pool").

| Factory | Kind | Proxy methods |
|---|---|---|
| `sky.dict(name, *, consistency=None)` | distributed dict | `d[k]`, `d[k]=v`, `del d[k]`, `k in d`, `get(k, default=None)`, `update(items)`, `pop(k, default=None)` |
| `sky.set(name, *, consistency=None)` | distributed set | `add(v)`, `discard(v)`, `v in s`, `len(s)` |
| `sky.counter(name, *, consistency=None)` | distributed counter | `.value`, `increment(n=1)`, `decrement(n=1)`, `reset(value=0)`, `int(c)` |
| `sky.queue(name)` | FIFO queue | `put(v)`, `get(timeout=None)`, `empty()`, `len(q)` |
| `sky.barrier(name, n)` | barrier for `n` participants | `wait()` |
| `sky.lock(name, timeout=30)` | mutex | `acquire()`, `release()`, or `with lock:` |

```python
@sky.function
def worker(item):
    processed = sky.counter("processed")
    seen = sky.set("seen")
    # check-and-add under a lock so two workers cannot both claim the same item
    with sky.lock("seen-guard"):
        if item in seen:
            return None
        seen.add(item)
    processed.increment()
    with sky.lock("checkpoint"):
        save_progress()
    return item

# coordinate a synchronized step across all 4 nodes:
@sky.function
def phased():
    b = sky.barrier("sync", n=4)
    do_part_one()
    b.wait()  # blocks until all 4 nodes arrive
    do_part_two()
```

**Method-name gotchas** (these differ from the stdlib): set removal is `discard` (no `remove`); queue uses `empty()` and `len(q)` (no `qsize()`); `dict` has no `items()`/`keys()`/`len()`. Each method also has an `_async` variant (`get_async`, `increment_async`, ...).

**Consistency** (`type Consistency = Literal["strong", "eventual"]`):

- `"strong"` - linearizable; every op round-trips to the head node and is applied in total order. Highest correctness, higher latency.
- `"eventual"` - reads may be stale; writes are batched and propagated asynchronously. Lower latency.

Collections work under both the thread and process executors (process uses an IPC bridge to the parent worker); your code is identical either way.
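What `b.wait()` guarantees in the `phased` example can be seen with a single-machine analogue. The sketch below uses the standard library's `threading.Barrier` and `threading.Lock`, with threads standing in for nodes; it is not how Skyward implements `sky.barrier` (which is actor-backed across machines), and `run_phased` is a hypothetical name. The property it checks is the one the text relies on: no participant starts part two until all `n` have finished part one.

```python
import threading


def run_phased(n: int) -> list[str]:
    """Run n threads through two phases separated by a barrier; return the event log."""
    barrier = threading.Barrier(n)   # local analogue of sky.barrier("sync", n=n)
    lock = threading.Lock()          # local analogue of sky.lock(...): guards the shared log
    log: list[str] = []

    def phased(rank: int) -> None:
        with lock:
            log.append(f"part1:{rank}")
        barrier.wait()               # no thread starts part two until all n have arrived
        with lock:
            log.append(f"part2:{rank}")

    threads = [threading.Thread(target=phased, args=(r,)) for r in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


events = run_phased(4)
print(events[:4])  # four "part1:*" entries, in whatever order the threads ran
```

The order within each phase is nondeterministic; only the phase boundary is guaranteed, which is exactly what a barrier provides.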

---

## Distributed training patterns

Multi-node training uses three pieces together: a framework **plugin** (sets up env vars / launcher), **broadcast** (`@`) to run the training function on every node, and `instance_info()` for rank/world/rendezvous.

**PyTorch DDP** - the `torch` plugin configures `torch.distributed` env vars (master addr/port, rank, world size) per node:

```python
import skyward as sky

@sky.function
def train():
    import torch.distributed as dist
    dist.init_process_group(backend="nccl")  # reads env set by the torch plugin
    info = sky.instance_info()
    # ... build DDP model, train on this rank ...
    dist.destroy_process_group()
    return "done" if sky.is_head(info) else None

with sky.Compute(
    provider=sky.RunPod(),
    accelerator=sky.accelerators.A100(count=8),
    nodes=4,
    plugins=[sky.plugins.torch(backend="nccl", cuda="cu128")],
) as pool:
    train() @ pool  # broadcast: every rank participates
```

**FSDP / DeepSpeed via Accelerate** - `sky.plugins.accelerate(...)` wraps the worker with `accelerate launch`; `num_processes` is the total across the cluster (defaults to the node count).

**JAX**: `sky.plugins.jax(cuda=...)` configures multi-host. **Keras 3**: `sky.plugins.keras(backend="jax"|"torch"|"tensorflow")`.

Worked examples: https://gabfssilva.github.io/skyward/distributed-training/ and the guides under https://gabfssilva.github.io/skyward/guides/pytorch-distributed/.

---

## Plugin system

Plugins compose environment setup and execution wrappers. Pass them at pool construction via `plugins=[...]`.

**Built-ins** (`sky.plugins.*`):

| Plugin | Signature |
|---|---|
| `torch` | `torch(*, backend=None, cuda="cu128", version="latest", vision=None, audio=None)` |
| `jax` | `jax(*, cuda="cu124")` |
| `keras` | `keras(*, backend="jax")` (`"jax"`/`"torch"`/`"tensorflow"`) |
| `cuml` | `cuml(*, cuda="cu12")` |
| `joblib` | `joblib(*, version=None)` |
| `sklearn` | `sklearn(*, version=None)` |
| `mig` | `mig(profile)` (positional, e.g. `"3g.40gb"`) |
| `mps` | `mps(*, active_thread_percentage=None, pinned_memory_limit=None)` |
| `accelerate` | `accelerate(config=None)` (config mirrors `accelerate config` YAML) |

```python
with sky.Compute(
    provider=sky.AWS(),
    accelerator=sky.accelerators.A100(),
    nodes=2,
    plugins=[sky.plugins.torch(backend="nccl"), sky.plugins.accelerate()],
) as pool:
    ...
```

(`mig` partitions a GPU into multiple workers per node; `mps` enables GPU sharing between concurrent tasks; `accelerate` wraps the worker with `accelerate launch` for FSDP/DeepSpeed.)

There is **no** dedicated `huggingface` plugin - for Transformers/Datasets work, install them via the image (`sky.Image(pip=["transformers", "datasets", "accelerate"])`) and use the `accelerate` plugin for distributed runs.

**Custom plugins** - a `Plugin` is a frozen dataclass with seven optional hooks; build one with `Plugin.create(name)` and the `with_*` builders:

```python
from skyward.plugins import Plugin

my_plugin = (
    Plugin.create("my-plugin")
    .with_image_transform(lambda image, cluster: image)  # (Image, Cluster) -> Image
    .with_bootstrap(lambda cluster: ("echo hi",))        # Cluster -> tuple[Op, ...] (shell ops)
    .with_decorator(lambda fn: fn)                       # wrap each task on the worker
)
```

| Builder | Hook field | When it runs |
|---|---|---|
| `with_image_transform(fn)` | `transform: (Image, Cluster) -> Image` | before bootstrap, edits the Image |
| `with_bootstrap(fn)` | `bootstrap: Cluster -> tuple[Op, ...]` | injects shell ops after bootstrap |
| `with_decorator(fn)` | `decorate: (fn) -> fn` | per-task wrapper on the remote worker |
| `with_around_app(fn)` | `around_app: InstanceInfo -> ContextManager` | worker process lifecycle (once) |
| `with_around_process(fn)` | `around_process: InstanceInfo -> ContextManager` | per executor subprocess |
| `with_around_client(fn)` | `around_client: (Pool, Cluster) -> ContextManager` | client side, at pool enter |
| `with_launcher(fn)` | `launcher: (LaunchCommand, LaunchContext) -> LaunchCommand` | rewrites the worker launch command |

---

## Pool options & reliability

`Options` groups all operational tuning. Defaults are sensible; override only what you need. Pass via `Compute(..., options=sky.Options(...))`.
```python
@dataclass(frozen=True, slots=True)
class Options:
    selection: SelectionStrategy = "cheapest"      # multi-spec: "cheapest" | "first"
    worker: Worker | None = None                   # concurrency / executor
    provision_timeout: int | None = None           # seconds to wait for an instance
    ssh_timeout: int | None = None
    bootstrap_timeout: int | None = None
    provision_retry_delay: float | None = None
    max_provision_attempts: int | None = None
    ssh_retry_interval: int | None = None
    default_compute_timeout: float = 300.0         # default per-task timeout
    autoscale_cooldown: float = 30.0
    autoscale_idle_timeout: float = 60.0           # idle seconds before scaling down
    reconcile_tick_interval: float = 15.0
    shutdown_timeout: float = 120.0
    console: bool | ConsoleMode = True             # "rich"|"minimal"|"log"|"silent"
    logging: LogConfig | bool = True
    cluster: bool | None = None                    # form a Casty cluster (enables collections)
    retry_on_interruption: int = 3                 # retries per task on spot preemption
    health_checker: HealthChecker | None = None    # periodic remote probe -> node replacement
```

```python
opts = sky.Options(
    selection="cheapest",
    worker=sky.Worker(concurrency=4),
    provision_timeout=600,
    retry_on_interruption=5,
    console="minimal",
)

with sky.Compute(provider=sky.VastAI(), accelerator=sky.accelerators.A100(), nodes=4, options=opts) as pool:
    ...
```

**Spot resilience**: when a spot instance is preempted, Skyward replaces the node and retries affected tasks up to `retry_on_interruption` times. Set `cluster=False` to run nodes independently (this disables distributed collections); leave it `None`/`True` for coordinated work. `health_checker` runs a predicate on each node and replaces nodes that fail repeatedly.

---

## Observability: events, metrics, logging

**Events** - the pool emits a stream of frozen-dataclass events you can observe (the Rich console is built on this). Event types are re-exported on `sky` and include lifecycle (`ClusterRequested`, `ClusterProvisioned`, `ClusterReady`, `ClusterDestroyed`, `InstanceProvisioned`, `InstanceBootstrapped`, `InstancePreempted`, `InstanceReplaced`, `InstanceDestroyed`), task events (`TaskStarted`, `TaskCompleted`), and telemetry (`Metric`, `Log`, `Error`). The console is controlled by `Options.console` (`True`/`"rich"`/`"minimal"`/`"log"`/`"silent"`).

**Metrics** - `sky.metrics` provides telemetry collectors attached to `Image.metrics` (collected on workers, streamed back as `Metric` events):

```python
img = sky.Image(metrics=[
    sky.metrics.CPU(interval=1.0),
    sky.metrics.GPU(interval=2.0),
    sky.metrics.GPUMemory(),
    sky.metrics.Disk("/data"),
])

sky.Image(metrics=sky.metrics.Default())  # the default set
sky.Image(metrics=None)                   # disable
```

Factories: `CPU`, `Memory`, `MemoryUsed`, `MemoryTotal`, `GPU`, `GPUMemory`, `GPUMemoryTotal`, `GPUTemp`, `Disk`, `NetworkRx`, `NetworkTx`, `Custom(name, command, interval=3)`, `Default(...)`.

**Logging** - `Options.logging` accepts `True` (defaults), `False` (off), or a `LogConfig`:

```python
@dataclass(frozen=True, slots=True)
class LogConfig:
    level: str = "INFO"                          # "DEBUG"|"INFO"|"WARNING"|"ERROR"|"TRACE"
    file: str = "~/.skyward/logs/skyward.log"
    console: bool = True
    rotation: str = "50 MB"                      # size ("50 MB") or time ("1 day")
    retention: int = 10                          # rotated files kept

opts = sky.Options(logging=sky.LogConfig(level="DEBUG", console=False))
```

---

## Running from an agent (live feedback, no blind spots)

Skyward streams events and remote stdout/stderr continuously. Two defaults work against an automated agent: the **Rich console** uses a live-redrawing layout (cursor moves) that is unreadable once captured to a file or pipe, and running a script in the **foreground from a single command blocks until it finishes and dumps all output at once**. To follow progress while a run is in flight, use the log console and poll a file.
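"Poll a file by offset" means remembering how many bytes you have already consumed and reading only what was appended since. The helper below is a minimal sketch of that idea using only the standard library; `read_new_lines` is a hypothetical name, not part of Skyward, and the choice to hold back a trailing partial line (one with no newline yet) is a design assumption so that a line still being written is never split across two polls.

```python
import os
import tempfile


def read_new_lines(path: str, offset: int) -> tuple[list[str], int]:
    """Return the complete lines appended to `path` since byte `offset`, plus the new offset.

    Hypothetical helper, not part of Skyward. A trailing partial line is left
    unread so the next poll picks it up whole.
    """
    if not os.path.exists(path):
        return [], offset  # the background run has not created the file yet
    with open(path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    end = chunk.rfind(b"\n")
    if end == -1:
        return [], offset  # nothing complete since the last poll
    complete = chunk[: end + 1]
    lines = complete.decode("utf-8", errors="replace").splitlines()
    return lines, offset + len(complete)


# Demo against a temporary file standing in for run.log.
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "run.log")
    with open(log_path, "ab") as f:
        f.write(b"step 1\nstep 2\npart")
    lines, offset = read_new_lines(log_path, 0)
    print(lines, offset)  # ['step 1', 'step 2'] 14
    with open(log_path, "ab") as f:
        f.write(b"ial\n")
    lines, offset = read_new_lines(log_path, offset)
    print(lines, offset)  # ['partial'] 22
```

In an agent loop you would keep `offset` between calls, sleep briefly between polls, and stop once the background process has exited and a final poll returns no new lines.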
When stderr is not a TTY (piped or redirected), Skyward automatically falls back to the **log** console: plain append-only lines `HH:MM:SS