jax.experimental.host_callback module

Primitives for calling Python functions on the host from JAX accelerator code.

Experimental: please give feedback, and expect changes.

This module introduces the host callback functions call(), id_tap(), and id_print(), which send their arguments from the device to the host and invoke user-defined Python functions on the host, optionally returning results back to the device computation.

We show below how these functions can be used. We start with call(), and we discuss examples of calling from JAX to arbitrary Python functions on the CPU, e.g., to use NumPy CPU custom kernels. Then we show uses of id_tap() and id_print(), which have the restriction that they cannot return values from the host to the device. These primitives are generally faster because they are executed asynchronously with the device code. In particular, they can be used to tap into and to debug JAX code.

Using call() to call a host function and return results to device

Use call() to invoke a computation on the host and return NumPy arrays to the device computation. Host computation is useful, e.g., when a device computation needs some data that requires I/O on the host, or when it needs a library that is available on the host and that you do not want to code in JAX. For example, eigendecomposition for general matrices in JAX does not work on TPU. We can call the NumPy implementation from any JAX accelerator computation, using a host computation:

import numpy as np
import jax
from jax.experimental import host_callback as hcb

# This function runs on the host
def host_eig(m: np.ndarray) -> np.ndarray:
  return np.linalg.eigvals(m)

# This function is used in JAX
def device_fun(m):
  # We send "m" to the host, asking it to call "host_eig" and return the result.
  # We have to specify the result shape and dtype, either in the form of an
  # example return value or any object that has `shape` and `dtype` attributes,
  # e.g., a NumPy array or a `jax.ShapeDtypeStruct`.
  return hcb.call(host_eig, m,
                  # Given an input of shape (..., d, d), eig output has shape (..., d)
                  result_shape=jax.ShapeDtypeStruct(m.shape[:-1], m.dtype))

The call() function and the Python host function both take a single argument and return a single result, but those can be pytrees. Note that we must tell call() what shape and dtype to expect from the host invocation, using the result_shape keyword argument. This is important because the device code is compiled with that expectation, and an error will be raised at runtime if the actual invocation produces a different result shape. In general, such errors, and also exceptions raised by the host computation, may be difficult to debug. See the Debugging section below. This is a problem for call() but not for id_tap() because for the latter the device code does not expect a returned value.
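To illustrate the pytree case, here is a small sketch (not from the original examples; host_stats is a hypothetical helper) in which both the argument and the result of the host function are dicts, and result_shape mirrors the structure of the result:

import numpy as np
import jax
from jax.experimental import host_callback as hcb

# Hypothetical host function that takes and returns a dict (a pytree).
def host_stats(d):
  x = d["x"]
  return {"mean": np.mean(x), "std": np.std(x)}

def device_fun(x):
  return hcb.call(host_stats, {"x": x},
                  # result_shape must mirror the pytree structure of the result.
                  result_shape={"mean": jax.ShapeDtypeStruct((), x.dtype),
                                "std": jax.ShapeDtypeStruct((), x.dtype)})

stats = jax.jit(device_fun)(np.arange(4., dtype=np.float32))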

The call() API can be used inside a jit or pmap computation or inside cond/scan/while control flow. When used inside jax.pmap(), there will be separate calls to the host from each of the participating devices:

def host_sin(x, *, device):
  # The ``device`` argument is passed due to ``call_with_device=True`` below.
  print(f"Invoking host_sin with {x.shape} on {device}")
  return np.sin(x)

# Use pmap to run the computation on two devices
jax.pmap(lambda x: hcb.call(host_sin, x,
                            result_shape=x,
                            # Ask that the `host_sin` function be passed `device=dev`
                            call_with_device=True))(
         np.ones((2, 4), dtype=np.float32))

# prints (in arbitrary order)
# Invoking host_sin with (4,) on cpu:0
# Invoking host_sin with (4,) on cpu:1

Note that call() does not support any JAX transformations, but as we show below one can make use of the existing support for Custom differentiation in JAX.
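As a sketch of that approach (the names device_square, host_square, and host_square_grad are hypothetical, not part of this module), you can wrap call() in a jax.custom_vjp whose backward rule is itself a host call:

import numpy as np
import jax
from jax.experimental import host_callback as hcb

def host_square(x):          # hypothetical host computation
  return np.square(x)

def host_square_grad(args):  # hypothetical host computation of the VJP
  x, ct = args
  return np.float32(2.) * x * ct

@jax.custom_vjp
def device_square(x):
  return hcb.call(host_square, x,
                  result_shape=jax.ShapeDtypeStruct((), np.float32))

def device_square_fwd(x):
  return device_square(x), x  # save the primal value as the residual

def device_square_bwd(x, ct):
  # The backward pass is also a host computation, via another call().
  return (hcb.call(host_square_grad, (x, ct),
                   result_shape=jax.ShapeDtypeStruct((), np.float32)),)

device_square.defvjp(device_square_fwd, device_square_bwd)

jax.grad(device_square)(np.float32(3.))  # 6.0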

Using id_tap() to call a Python function on the host, with no returned values

The id_tap() and id_print() functions are special cases of call(), for when you just want the side effects of your Python callback. These functions have the advantage that once the arguments have been sent to the host, the device computation can proceed without waiting for the Python callback to return. For id_tap() you can specify your own Python callback to be called, while id_print() uses a built-in callback that prints the arguments to stdout on the host. The Python function passed to id_tap() takes two positional arguments (the value tapped from the device computation along with a transforms tuple, described below). Optionally, the function may be passed a keyword argument device with the Device from which the value was tapped.

A few examples:

def host_func(arg, transforms):
   ...do something with arg...

# calls host_func(2x, []) on host
id_tap(host_func, 2 * x)

# calls host_func((2x, 3x), [])
id_tap(host_func, (2 * x, 3 * x))  # The argument can be a pytree

# calls host_func(2x, [], device=jax.devices()[0])
id_tap(host_func, 2 * x, tap_with_device=True)  # Pass the device to the tap

# calls host_func(2x, [], what='activation')
id_tap(functools.partial(host_func, what='activation'), 2 * x)

# calls host_func(dict(x=x, y=y), what='data')
id_tap(lambda tap, transforms: host_func(tap, what='data'), dict(x=x, y=y))

The above examples can all be adapted to use id_print() instead, with the difference that id_print() prints on the host the positional argument, along with any additional kwargs and the automatic kwarg transforms.
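For example (a minimal sketch), the first tap above can be written with id_print() as:

import numpy as np
import jax
from jax.experimental import host_callback as hcb

# Prints on the host, roughly:  what: activation  followed by the value 6.00
jax.jit(lambda x: hcb.id_print(2 * x, what="activation"))(np.float32(3.))
hcb.barrier_wait()  # make sure the print has been delivered (see next section)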

Using barrier_wait() to wait until all callbacks have executed

If your Python callbacks have side-effects you may need to wait until the computation has finished to ensure that the side-effects have been observed. You can use the barrier_wait() function for that purpose:

accumulator = []
def host_log(arg, transforms):
  # We just record the arguments in a list
  accumulator.append(arg)


def device_fun(x):
  id_tap(host_log, x)
  id_tap(host_log, 2. * x)

jax.jit(device_fun)(1.)
jax.jit(device_fun)(1.)

# At this point, we have started two computations, each with two
# taps, but they may not have yet executed.
barrier_wait()
# Now we know that all the computations started before `barrier_wait`
# on all devices, have finished, and all the callbacks have finished
# executing.

Note that barrier_wait() will start one tiny computation with one tap on each of the jax.local_devices() and will wait for all these taps to be received.

An alternative to using barrier_wait() is to just wait for the end of the computation, if all the callbacks are call():

accumulator = []
def host_log(arg):
  # We just record the arguments in a list
  accumulator.append(arg)
  return 0.  #  return something


def device_fun(x):
  y = call(host_log, x, result_shape=jax.ShapeDtypeStruct((), np.float32))
  z = call(host_log, 2. * x, result_shape=jax.ShapeDtypeStruct((), np.float32))
  return y + z  # return something that uses both results

res1 = jax.jit(device_fun)(1.)
res2 = jax.jit(device_fun)(1.)
res1.block_until_ready()
res2.block_until_ready()

Behavior under parallelization transformations

In the presence of jax.pmap() the code will run on multiple devices and each device will tap its values independently. It may be helpful to use the tap_with_device option for id_print() or id_tap(), so that you see which device is sending which data. (The examples below reuse the power3 function defined in the autodiff section further down, here tapped with tap_with_device=True.)

jax.pmap(power3, devices=jax.local_devices()[:2])(np.array([3., 4.]))
# device=cpu:0 what=x,x^2: (3., 9.)  # from the first device
# device=cpu:1 what=x,x^2: (4., 16.)  # from the second device

When using jax.pmap() with multiple devices on multiple hosts, every host will receive callbacks from all of its local devices, with an operand that corresponds to each device slice. For a call(), the callback must return to each device only the slice of the result that pertains to the corresponding device.

When using the experimental pjit.pjit() the code will run on multiple devices on different shards of the input. The current implementation of host callbacks will ensure that a single device will collect and outfeed the entire operand, in a single callback. The callback function is supposed to return the entire array, which will then be sent in a single infeed to the same device that issued the outfeed. This device is then responsible for sending the required shards to the other devices:

with maps.mesh(jax.local_devices()[:2], ["d"]):
  pjit.pjit(power3, in_axis_resources=(P("d"),),
            out_axis_resources=(P("d"),))(np.array([3., 4.]))

# device=TPU:0 what=x,x^2: ( [3., 4.],
#                            [9., 16.] )

Note that the collection of the operand on one device may result in OOM if the operand was sharded across devices.

When using pjit.pjit() with multiple devices on multiple hosts, only the host of device 0 (with respect to the mesh) will receive the callback, with the operand collected from all participating devices on all hosts. For a call(), the callback must return the entire array for all devices on all hosts.

Behavior under JAX autodiff transformations

When used under a JAX autodiff transformation, the host callback functions operate on the primal values only. Consider the following example:

def power3(x):
  y = x * x
  # Print both 'x' and 'x^2'. Must pack as a tuple.
  hcb.id_print((x, y), what="x,x^2")
  return y * x

power3(3.)
# what: x,x^2 : (3., 9.)

(You can see these examples tested in host_callback_test.HostCallbackTapTest.test_tap_transforms.)

When used under jax.jvp() there will be one callback with the primal values only:

jax.jvp(power3, (3.,), (0.1,))
# what: x,x^2 : (3., 9.)

Similarly for jax.grad(), we get a callback from the forward computation only:

jax.grad(power3)(3.)
# what: x,x^2 : (3., 9.)

If you want to invoke the callback on the tangents during a jax.jvp(), you can use a custom_jvp. For example, you can define a function that does nothing interesting except that its custom_jvp will print the tangents:

@jax.custom_jvp
def print_tangents(arg):
  return None

@print_tangents.defjvp
def print_tangents_jvp(primals, tangents):
  arg_dot, = tangents
  hcb.id_print(arg_dot, what="tangents")
  return primals, tangents

Then you use this function in the places where you want to tap the tangents:

def power3_with_tangents(x):
  y = x * x
  # Print both 'x' and 'x^2'. Must pack as a tuple.
  hcb.id_print((x, y), what="x,x^2")
  print_tangents((x, y))
  return y * x

jax.jvp(power3_with_tangents, (3.,), (0.1,))
# what: x,x^2 : (3., 9.)
# what: tangents : (0.1, 0.6)

You can do a similar thing for the cotangents during jax.grad(). This time you must be careful to use, in the rest of the computation, the values whose cotangents you want to tap. Hence we make print_cotangents return its argument:

@jax.custom_vjp
def print_cotangents(arg):
  # Must return the argument for which we want the cotangent.
  return arg

# f_fwd: a -> (b, residual)
def print_cotangents_fwd(arg):
  return print_cotangents(arg), None
# f_bwd: (residual, CT b) -> [CT a]
def print_cotangents_bwd(residual, ct_b):
  hcb.id_print(ct_b, what="cotangents")
  return ct_b,

print_cotangents.defvjp(print_cotangents_fwd, print_cotangents_bwd)

def power3_with_cotangents(x):
  y = x * x
  # Print both 'x' and 'x^2'. Must pack as a tuple.
  hcb.id_print((x, y), what="x,x^2")
  (x1, y1) = print_cotangents((x, y))
  # Must use the output of print_cotangents
  return y1 * x1

jax.grad(power3_with_cotangents)(3.)
# what: x,x^2 : (3., 9.)
# what: cotangents : (9., 3.)

If you use ad_checkpoint.checkpoint() to rematerialize the residuals for the backward pass, then the callbacks from the primal computation will be called twice:

jax.grad(lambda x: power3(ad_checkpoint.checkpoint(power3)(x)))(3.)
# what: x,x^2 : (3., 9.)
# what: x,x^2 : (27., 729.)
# what: x,x^2 : (3., 9.)

The callbacks come, in order, from: the primal computation of the inner power3, the primal computation of the outer power3, and the rematerialization of the residuals for the inner power3.

Behavior under jax.vmap

The host callback functions id_print() and id_tap() support the vectorization transformation jax.vmap().

For jax.vmap() the arguments to the callback are batched, and the callback function is passed an additional special transforms argument containing a list of transformation descriptors of the form ("batch", {"batch_dims": ...}), where ... denotes the batched dimensions for the tapped values (one entry per argument; None denotes an argument that was broadcast).

jax.vmap(power3)(np.array([2., 3.]))
# transforms: [('batch', {'batch_dims': (0, 0)})] what: x,x^2 : ([2., 3.], [4., 9.])

See documentation for id_tap(), id_print(), and call().

For more usage examples, see tests/host_callback_test.py.

Using call() to call a TensorFlow function, with reverse-mode autodiff support

Another possible use for host computation is to invoke a library written for another framework, such as TensorFlow. In this case it becomes interesting to support JAX autodiff for host callbacks by deferring to the autodiff mechanism in TensorFlow, using the jax.custom_vjp() mechanism.

This is relatively easy to do, once one understands both the JAX custom VJP and the TensorFlow autodiff mechanisms. The code for how this can be done is shown in the call_tf_full_ad function in host_callback_to_tf_test.py. This example supports arbitrary higher-order differentiation as well.
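For the simpler forward-only case (without autodiff), a minimal sketch could look as follows; the helper name call_tf_simple is hypothetical, and TensorFlow is assumed to be installed on the host:

import numpy as np
import jax
import tensorflow as tf
from jax.experimental import host_callback as hcb

def call_tf_simple(tf_fun, arg, *, result_shape):
  # Run the TensorFlow function eagerly on the host and return NumPy arrays.
  def tf_on_host(x):
    return tf_fun(tf.constant(x)).numpy()
  return hcb.call(tf_on_host, arg, result_shape=result_shape)

# Example: call tf.math.sin from a jitted JAX computation.
x = np.arange(4., dtype=np.float32)
y = jax.jit(lambda x: call_tf_simple(tf.math.sin, x, result_shape=x))(x)

Adding reverse-mode autodiff on top of this amounts to pairing it with jax.custom_vjp and computing the VJP on the host with TensorFlow, which is essentially what the call_tf_full_ad example does.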

Note that if you just want to call TensorFlow functions from JAX, you can also use the jax2tf.call_tf function.

Using call() to call a JAX function on another device, with reverse-mode autodiff support

It should not be surprising that we can use host computation to invoke a JAX computation on another device. The arguments are sent from the accelerator to the host, and then to the outside device on which the JAX host computation will run, and then the results are sent back to the original accelerator.

The code for how this can be done is shown in the call_jax_other_device function in host_callback_test.py.
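A minimal sketch of the forward pass (the helper name call_jax_on_cpu is hypothetical) could look like this:

import numpy as np
import jax
from jax.experimental import host_callback as hcb

def call_jax_on_cpu(jax_fun, arg, *, result_shape):
  def run_on_host(x):
    cpu = jax.devices("cpu")[0]
    # Committing the input to the CPU device makes the jitted computation run there.
    return np.asarray(jax.jit(jax_fun)(jax.device_put(x, cpu)))
  return hcb.call(run_on_host, arg, result_shape=result_shape)

x = np.arange(4., dtype=np.float32)
y = jax.jit(lambda x: call_jax_on_cpu(jax.numpy.sin, x, result_shape=x))(x)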

Low-level details and debugging

The host callback functions will be executed for each device in the order in which the send operations were performed on the device.

The host callback functions for multiple devices may be interleaved. The data from the devices is received by separate threads managed by the JAX runtime (one thread per device). The runtime maintains a buffer of configurable size (see the flag --jax_host_callback_max_queue_byte_size). When the buffer is full, all the receiving threads are paused which eventually pauses the computation on devices. The runtime has one additional thread for each device to invoke the Python user functions with the received data. If the processing of the callbacks is slow, it may actually lead to the runtime buffer filling up, and eventually pausing the computation on the devices when they need to send something. For more details on the outfeed receiver runtime mechanism see runtime code.

In order to pause the execution until all data from computations already started on devices has arrived and has been processed, use barrier_wait().

Exceptions from the user-defined callback functions are logged along with their stack traces, but the receiving threads are not stopped. Instead the last exception is recorded and the subsequent barrier_wait() will raise CallbackException if any exception had occurred in one of the tap functions. This exception will include the text and the stack trace of the last exception encountered.
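For example (a small sketch; the logging name is arbitrary), you can surface such errors in the driving program by catching CallbackException around barrier_wait():

from jax.experimental import host_callback as hcb

try:
  hcb.barrier_wait("after_step")
except hcb.CallbackException as e:
  # Carries the text and stack trace of the last exception raised in a callback.
  print("A host callback failed:", e)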

One further complication arises for callback functions that must return results to the call origin device, such as call(). This is handled differently on CPU/GPU devices compared to TPU devices.

On CPU/GPU devices, in order to avoid the device computation getting stuck waiting for a result that will never arrive, in case of any error during the processing of the callback (whether raised by the user code itself or due to a mismatch of the returned value and the expected result_shape), we send the device a "fake" result of shape int8[12345]. This will make the device computation abort because the received data is different from what it expects. On CPU the runtime will crash with a distinctive error message:

Check failed: buffer->length() == buffer_length (12345 vs. ...)

On GPU, the failure is more user-friendly and will be surfaced to the Python program as:

RET_CHECK failure ... Mismatch between infeed source buffer shape s8[12345] ...

To debug the underlying cause for these messages, see the Debugging section.

On TPU devices, there is currently no shape check for infeed, so we take the safer route of not sending this fake result in case of errors. This means that the computation will hang, and no exception will be raised (but any exceptions in the callback functions will still appear in the logs).

The current implementation uses the outfeed mechanism provided by XLA. The mechanism itself is quite primitive in the sense that a receiver must know exactly the shape of each incoming packet, and how many packets are expected. This makes it hard to use for multiple kinds of data in the same computation, and it is practically impossible to use it under conditionals or in loops of non-constant iteration count. Furthermore, code that uses the outfeed mechanism directly cannot be transformed by JAX. All these limitations are addressed by the host callback functions. The tapping API introduced here makes it easy to share the outfeed mechanism for multiple purposes, while supporting all transformations.

Note that after you have used the host callback functions, you cannot use lax.outfeed directly. You may want to call stop_outfeed_receiver() if you later need to use lax.outfeed.

Since the actual calls to your callback functions are made from the C++ receiver, it may be hard to debug the calls. In particular, the stack trace will not include the calling code. You can use the flag jax_host_callback_inline (or the environment variable JAX_HOST_CALLBACK_INLINE) to ensure that the calls to the callbacks are inlined. This works only if the calls are outside a staging context (jit or a control-flow primitive).
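For example (assuming the flag is exposed through jax.config, like other JAX flags), you can enable inlining at the start of your program:

import jax

# Equivalent to setting the environment variable JAX_HOST_CALLBACK_INLINE=1.
jax.config.update("jax_host_callback_inline", True)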

The C++ receiver is started automatically on the first call to id_tap(). In order to stop it properly, an atexit handler is registered at startup to call barrier_wait() with the logging name "at_exit".

There are a few environment variables that you can use to turn on logging for the C++ outfeed receiver backend.

  • TF_CPP_MIN_LOG_LEVEL=0: will turn on INFO logging, needed for all below.

  • TF_CPP_MIN_VLOG_LEVEL=3: will make all VLOG logging up to level 3 behave like INFO logs. This may be too much, but you will see which modules are logging relevant info, and then you can select which modules to log from.

  • TF_CPP_VMODULE=<module_name>=3 (the module name can be either C++ or Python, without the extension).

You should also use the --verbosity=2 flag so that you see the logs from Python.

For example, you can try to enable logging in the host_callback module: TF_CPP_MIN_LOG_LEVEL=0 TF_CPP_VMODULE=host_callback=3 python tests/host_callback_test.py --verbosity=2 HostCallbackIdTapTest.test_tap_jit_simple

If you want to enable logging in lower-level implementation modules try: TF_CPP_MIN_LOG_LEVEL=0 TF_CPP_VMODULE=outfeed_receiver=3,host_callback=3,outfeed_receiver_py=3,outfeed_thunk=3,infeed_thunk=3,cpu_transfer_manager=3,cpu_runtime=3,xfeed_manager=3,pjrt_client=3 python tests/host_callback_test.py --verbosity=2 HostCallbackIdTapTest.test_tap_jit_simple

(For Bazel tests use --test_arg=--vmodule=…)

Still to do:
  • More performance tests.

  • Explore implementation with outside compilation for TPU.

  • Explore implementation with XLA CustomCall for CPU and GPU.

API

jax.experimental.host_callback.id_tap(tap_func, arg, *, result=None, tap_with_device=False, **kwargs)[source]

Host-callback tap primitive, like identity function with a call to tap_func.

Experimental: please give feedback, and expect changes!

id_tap behaves semantically like the identity function but has the side-effect that a user-defined Python function is called with the runtime value of the argument.

Parameters
  • tap_func – tap function to call like tap_func(arg, transforms), with arg as described below and where transforms is the sequence of applied JAX transformations in the form (name, params). If the tap_with_device optional argument is True, then the invocation also includes the device from which the value is tapped as a keyword argument: tap_func(arg, transforms, device=dev).

  • arg – the argument passed to the tap function, can be a pytree of JAX types.

  • result – if given, specifies the return value of id_tap. This value is not passed to the tap function, and in fact is not sent from the device to the host. If the result parameter is not specified then the return value of id_tap is arg.

  • tap_with_device – if True then the tap function is invoked with the device from which the tap originates as a keyword argument.

Returns

arg, or result if given.

The order of execution is determined by data dependency: the tap is executed after all the arguments (and the value of result, if present) have been computed, and before the returned value is used. At least one of the returned values of id_tap must be used in the rest of the computation, or else this operation has no effect.

Tapping works even for code executed on accelerators and even for code under JAX transformations.

For more details see the module documentation.
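A short usage sketch (host_func and x are placeholders) of the result parameter and of the requirement that the returned values be used:

def device_fun(x):
  y = hcb.id_tap(host_func, x)                 # y is x; x is tapped
  z = hcb.id_tap(host_func, x, result=2. * x)  # z is 2. * x; only x is tapped
  # The taps are kept only because y and z feed into the returned value.
  return y + z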

jax.experimental.host_callback.id_print(arg, *, result=None, tap_with_device=False, output_stream=None, threshold=None, **kwargs)[source]

Like id_tap() with a printing tap function.

Experimental: please give feedback, and expect changes!

On each invocation of the printing tap, the kwargs, if present, will be printed first (sorted by key). Then arg will be printed, with the arrays stringified with numpy.array2string.

See the id_tap() documentation.

Additional keyword arguments:

  • tap_with_device if True, will also print the device from which the value originates.

  • output_stream if given then it will be used instead of the built-in print. The string will be passed as output_stream.write(s).

  • threshold is passed to numpy.array2string.
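For example (a small sketch), the printed output can be captured in a StringIO buffer:

import io
import numpy as np
import jax
from jax.experimental import host_callback as hcb

buf = io.StringIO()
jax.jit(lambda x: hcb.id_print(x, what="x", output_stream=buf, threshold=100))(
    np.arange(4., dtype=np.float32))
hcb.barrier_wait()  # make sure the print callback has run
print(buf.getvalue())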

jax.experimental.host_callback.call(callback_func, arg, *, result_shape=None, call_with_device=False)[source]

Make a call to the host, and expect a result.

Experimental: please give feedback, and expect changes!

Parameters
  • callback_func (Callable) – The Python function to invoke on the host as callback_func(arg). If the call_with_device optional argument is True, then the invocation also includes the device kwarg with the device from which the call originates: callback_func(arg, device=dev). This function must return a pytree of numpy ndarrays.

  • arg – the argument passed to the callback function, can be a pytree of JAX types.

  • result_shape – a value that describes the expected shape and dtype of the result. This can be a numeric scalar, from which a shape and dtype are obtained, or an object that has .shape and .dtype attributes. If the result of the callback is a pytree, then result_shape should also be a pytree with the same structure. In particular, result_shape can be () or None if the function does not have any results. The device code containing call is compiled with the expected result shape and dtype, and an error will be raised at runtime if the actual callback_func invocation returns a different kind of result.

  • call_with_device – if True then the callback function is invoked with the device from which the call originates as a keyword argument.

Returns

the result of the callback_func invocation.

For more details see the module documentation.

jax.experimental.host_callback.barrier_wait(logging_name=None)[source]

Blocks the calling thread until all current outfeed is processed.

Waits until all callbacks from computations already running on all devices have been received and processed by the Python callbacks. Raises CallbackException if there were exceptions while processing the callbacks.

This works by enqueueing a special tap computation to all devices to which we are listening for outfeed. Once all those tap computations are done, we return from barrier_wait.

Note: If any of the devices are busy and cannot accept new computations, this will deadlock.

Parameters

logging_name (Optional[str]) – an optional string that will be used in the logging statements for this invocation. See Debugging in the module documentation.

exception jax.experimental.host_callback.CallbackException[source]

Signals that some callback function had exceptions.

Raised by barrier_wait(). See module documentation for details.