The Asynchronous I/O API allows you to perform I/O operations that invoke a callback when they are complete, rather than synchronously calling a function that returns a value (like cl:read-line). This allows many operations to run in a single thread. When using this API, you have to hold all of the application's state in data structures so that the callback can determine how to proceed.
There are two parts to the API:
A wait-state-collection is an object that controls asynchronous I/O via an event loop. Each I/O channel is associated with a wait-state in the collection (see the 25.7.2 The Async-I/O-State API for how to add channels to a collection).
Make a wait-state-collection using make-wait-state-collection, wait for I/O to occur using wait-for-wait-state-collection, process the I/O using call-wait-state-collection and close the collection using close-wait-state-collection.
The function loop-processing-wait-state-collection simplifies processing I/O by repeatedly calling wait-for-wait-state-collection and call-wait-state-collection. It can be stopped by wait-state-collection-stop-loop. The function create-and-run-wait-state-collection makes a wait-state-collection and a process that runs it (using loop-processing-wait-state-collection). In many cases, create-and-run-wait-state-collection is the only function that you need to use.
To call a function in the process associated with a wait-state-collection you can use apply-in-wait-state-collection-process (but see also 25.7.3 Writing callbacks in Asynchronous I/O operations).
For the wait-state-collection to actually do anything, it must have some "wait-states" associated with it. The primary way of associating "wait-states" with a wait-state-collection is to create an async-io-state associated with it, see 25.7.2 The Async-I/O-State API below. The function accept-tcp-connections-creating-async-io-states also creates an associated "wait-state", which itself creates an async-io-state associated with the wait-state-collection. Note that new async-io-states can be added (and removed) dynamically to the wait-state-collection from any process while it is working.
See:
(example-edit-file "async-io/driver")
The Async-I/O-State API contains functions to create and close various kinds of asynchronous I/O channels and perform input and output operations on them. Currently "I/O channel" means a socket or a socket-stream.
Each channel has an associated async-io-state object, which is used to retain information about the channel between calls to the input and output functions. You can store your own information using the async-io-state-user-info accessor and give the object a name for debugging purposes using the async-io-state-name accessor.
An async-io-state is created by any of these functions:
Takes a socket (an integer) or a socket-stream and allows I/O on the socket. | |
Takes a socket address to connect to, creates a TCP socket and connects it, and allows I/O on it. You must not start any I/O operations on the async-io-state returned by create-async-io-state-and-connected-tcp-socket until its callback argument has been called. create-async-io-state-and-connected-tcp-socket takes a callback argument that is called when the connection has been made. You must not start any I/O operations on the async-io-state before this callback has been called. | |
Takes a service and creates a listening socket that accepts connection and create states which allow I/O on the accepted connections. | |
Creates a UDP socket and allows I/O on it. | |
Takes a socket address, creates a UDP socket and connect it, and allows I/O on it. |
Once an async-io-state is created for an object, the object itself should not be used directly for I/O in the same direction (read or write). The async-io-state can then be made active by one of async-io-state-read-buffer, async-io-state-write-buffer, async-io-state-read-with-checking, async-io-state-receive-message, async-io-state-send-message and async-io-state-send-message-to-address.
Each async-io-state is associated with a wait-state-collection when it is created. For the async-io-state to be active, the wait-state-collection must be active, which means there must be a process calling wait-for-wait-state-collection and call-wait-state-collection, possibly via loop-processing-wait-state-collection.
The functions async-io-state-read-buffer and async-io-state-write-buffer create an I/O operation that reads or writes a fixed amount of data in a buffer. The operation finishes when the callback is called, or when when an abort-callback is called (after being set up by async-io-state-abort).
The function async-io-state-read-with-checking creates an input operation that periodically invokes a callback to determine whether enough data has been received, by examining the internal buffer. You can call async-io-state-discard to indicate that part of the internal buffer has been processed (for example parsed and converted to some data structure). The operation finishes when async-io-state-finish is called inside the callback, or when an abort-callback is called (after being set up by async-io-state-abort).
The function async-io-state-receive-message creates an input operation that receives a message (using recv
or recvfrom
). The functions async-io-state-send-message and async-io-state-send-message-to-address create an I/O operation that sends a message (using send
or sendto
). These three functions are intended to be used with states created with UDP sockets.
While an input operation is ongoing, you cannot start another input operation with the same direction. While a write operation is ongoing, whether you can start another write operation depends on the keyword argument :queue-output
which is used when the async-io-state is created. If queue-input was nil
(the default for TCP), then you cannot start another write operation while one is ongoing. If queue-output was supplied as non-nil (the default for UDP), you can start another write operation, and the operation gets queued and actually starts after all previously queued operations have finished.
When you no longer need the async-io-state you must close it by close-async-io-state. Normally, that would close the object of the async-io-state too. close-async-io-state can be told to leave the object alive, so you can do further I/O with it. However, if you have read using async-io-state, it may have buffered data which you will need to deal with by async-io-state-buffered-data-length and async-io-state-get-buffered-data (unless you can just ignore it).
An async-io-state can have a name, to help identifying it, mainly for debugging. The default names that different functions give help to identify the kind of object that the state has.
See:
(example-edit-file "async-io/multiplication-table")
(example-edit-file "async-io/print-connection-delay")
(example-edit-file "async-io/udp")
All of the Asynchronous I/O operations take a callback, which is called when the operation finished. The callbacks are called inside the same process that processes the wait-state-collection (specifically, the process that called call-wait-state-collection, potentially via loop-processing-wait-state-collection). That means that until the callback returns, no further processing happens on the wait-state-collection, and hence on any of the other async-io-states that are associated with it. Therefore callbacks need to be reasonably fast and not hang.
In general, the callbacks should be creating the next I/O operations, to ensure that that operations on each state are sequential (see 25.7.4 Asynchronous I/O and multiprocessing). If this is a reasonably simple operation you just do it, but if the data for the next operation make take a long time to prepare you probably want to avoid doing it in the context of the callback. Things that may cause it to take a long time include heavy computation or access to external resources that may cause delays.
A general solution is to send the work to another process, which will do the work and on completion will do the next I/O operation by calling the read/write async-io-state function.
Another possible solution is to perform operations that can be fast using one wait-state-collection, and perform slow operations on (an)other wait-state-collection(s). This way a slow callback will only impede other slow callbacks. For example you may be accepting connections on the "fast" wait-state-collection, but communicate with the accepted connection on a slow wait-state-collection (pass :create-state nil
to accept-tcp-connections-creating-async-io-states, and in the callback use create-async-io-state with another wait-state-collection). You may also decide to do the communication using streams and synchronous I/O (pass :create-state nil
and in the callback use (make-instance 'socket-stream ...)
and send the result to another process).
Processing of the wait-state-collection is not thread-safe, and for each collection there must be only one process at any one time calling any of these functions:
wait-state-collection-stop-loop is thread-safe, and can be called on any thread at any time.
Adding and removing states to/from the collection is thread-safe with respect to the collection, which means that the creation functions like create-async-io-state can be called in parallel with any function that access the same collection, including themselves and the processing functions above. The same applies to functions that remove the state from the collection (close-async-io-state), though these are not thread-safe with respect to the state (see below).
Note that the functions that create states use other resources which may have their own limitations. Most notably, local ports can be used only once at any time with the same protocol and family, so if you try to bind to a specific local port (by passing local-port to any of the functions or non-zero service in accept-tcp-connections-creating-async-io-states), you have to make sure that you do not do it with a port that is currently in use. (Note that accept-tcp-connections-creating-async-io-states may try several times).
The functions that actually do the I/O are not thread-safe with respect to the state argument, but are thread-safe with respect to the collection that the state is associated with. That means that they can be called in parallel to any function that accesses the collection that the state is associated with, but cannot be called in parallel to another function that tries to do I/O on the same state and direction. Moreover, the read functions cannot be called while there is an ongoing read operation, and the write function can be called while another write operation is ongoing only if queue-output is non-nil when creating the state. The function close-async-io-state also cannot be called in parallel to any of the I/O functions.
Explicitly:
The reading functions async-io-state-read-buffer, async-io-state-read-with-checking and async-io-state-receive-message must not be called on the same state in parallel to any of themselves, or in the period between any call to any of themselves and the call to the callback in the case of async-io-state-read-with-checking or async-io-state-receive-message, or the call to async-io-state-finish in the case of async-io-state-read-with-checking, or abort-callback.
If queue-output was nil
when the state was created (TCP default), the writing functions async-io-state-write-buffer, async-io-state-send-message, and async-io-state-send-message-to-address must not be called on the same state in parallel to any of themselves, or in the period between any call to any of themselves and the call to the callback, or abort-callback. If queue-output was non-nil when the state was created (UDP default), the writing functions can be called in parallel.
close-async-io-state must not be called on the same state in parallel to any of the reading or writing functions, or between a call to any of them at the end of their operation (the callback, async-io-state-finish, or the abort-callback).
The reading and writing functions are mutually thread-safe, that is any of the reading functions can be called in parallel to any of the writing functions.
The functions async-io-state-abort and async-io-state-abort-and-close are thread-safe, and be called at any time in parallel to any function.
async-io-state-get-buffered-data is not thread-safe, and must not be called in parallel to any other function that may modify the state.
async-io-state-finish and async-io-state-discard are not thread-safe, but can only be called inside the callback of async-io-state-read-with-checking, which will be always in the same process. The accessors of async-io-state are thread-safe.
In general, it is intended that you will cope with these thread-safe restrictions of I/O functions by calling them from the callbacks of the previous I/O operation, thus guaranteeing that the previous I/O operation finished. For example, if you need to write several buffers to a socket, you can call async-io-state-write-buffer with the first buffer, and with a callback that calls async-io-state-write-buffer with the next buffer. A natural place to put the information where to get the next buffer is the user-info of the async-io-state, which can be accessed using async-io-state-user-info. For example, assume you have an async-io-state, a list of buffers to send, and also on completion you want to call a function finished
on some object:
(defun my-send-buffers (state buffers object) (setf (async-io-state-user-info state) (cons buffers object)) (my-state-send-next-buffer state)) (defun my-state-send-next-buffer (state) (let ((info (async-io-state-user-info state))) (if-let (buffer (pop (car info))) (async-io-state-write-buffer state buffer #'(lambda (state buffer length) (declare (ignore buffer length)) (my-state-send-next-buffer state))) (finished (cdr info)))))
In a real application the user-info is likely to be a more complex object.
If you make the state with queue-output t
, you can simply write all the buffers in one go:
(defun my-send-buffers (state buffers object) (setf (async-io-state-user-info state) object) (loop for cons on buffers do (async-io-state-write-buffer state (car cons) :callback (if (cdr cons) ; if there are more buffers #'true ; do nothing #'(lambda (state buffer length) (declare (ignore buffer length)) (finished (async-io-state-user-info state)))))))
LispWorks® User Guide and Reference Manual - 01 Dec 2021 19:30:24