blob: 02c1c612ce6de1676e1dbe3459b1a22755d66789 [file] [log] [blame]
package com.android.networkstack.tethering.companionproxy.io;
import android.os.ParcelFileDescriptor;
import android.system.StructPollfd;
import android.util.Log;
import com.android.internal.util.IndentingPrintWriter;
import com.android.networkstack.tethering.companionproxy.util.Assertions;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implements EventManager. Starts its own thread and invokes all callbacks
* on that thread.
*
* @hide
*/
public final class EventManagerImpl implements EventManager {
// Never block poll indefinitely. We should always wakeup() poll when necessary,
// however having an absolute maximum adds an extra safety net.
private static final int MAX_POLL_TIMEOUT = 5000;
/** @hide */
public interface Listener {
void onEventManagerFailure();
}
private final OsAccess mOsAccess;
private final Listener mListener;
private final String mLogTag;
private final Object mLock = new Object();
private final AtomicBoolean mIsStopped = new AtomicBoolean(false);
private final FdAsyncFile[] mWakeupPipe = new FdAsyncFile[2];
private final ArrayList<BaseAsyncFileImpl> mFiles = new ArrayList<>();
private final PriorityQueue<AlarmData> mAlarms = new PriorityQueue<>();
private final byte[] mWakeupData = new byte[1];
private final ArrayList<BaseAsyncFileImpl> mTempFileList = new ArrayList<>();
private final short mPollInMask;
private final short mPollOutMask;
private StructPollfd[] mPollSet = new StructPollfd[0];
private volatile Thread mThread;
private volatile boolean mHasAlarmCancellations;
private boolean mHasLocalWakeupCalls;
public EventManagerImpl(OsAccess osAccess, Listener listener, String logTag) {
mOsAccess = osAccess;
mListener = listener;
mLogTag = logTag;
mPollInMask = mOsAccess.getPollInMask();
mPollOutMask = mOsAccess.getPollOutMask();
}
/** Starts EventManager internal thread. */
public void start() throws IOException {
if (mThread != null || mIsStopped.get()) {
throw new IllegalStateException("Cannot start EventManager");
}
final ParcelFileDescriptor[] wakeupPipe = mOsAccess.pipe();
try {
AsyncFile.Listener wakeupListener = new WakeupEventListener();
mWakeupPipe[0] = new FdAsyncFile(wakeupPipe[0], wakeupListener);
mWakeupPipe[1] = new FdAsyncFile(wakeupPipe[1], wakeupListener);
} catch (IOException e) {
if (wakeupPipe != null) {
mOsAccess.close(wakeupPipe[0]);
mOsAccess.close(wakeupPipe[1]);
}
throw e;
}
mFiles.add(mWakeupPipe[0]);
mFiles.add(mWakeupPipe[1]);
mWakeupPipe[0].enableReadEvents(true);
mThread =
new Thread(
() -> {
runLoop();
},
"CompanionProxyEventManager");
mThread.start();
}
/** Stops EventManager internal thread, and closes all managed file descriptors. */
public void shutdown() {
synchronized (mLock) {
mIsStopped.set(true);
if (mThread == null || mThread == Thread.currentThread()) {
return;
}
}
wakeup();
try {
Thread thread = mThread;
if (thread != null) {
thread.join();
}
} catch (InterruptedException e) {
Log.w(mLogTag, logStr("Interrupted while joining EventManager thread"));
}
}
private void cleanupOnShutdown() {
ArrayList<AlarmData> alarms;
ArrayList<BaseAsyncFileImpl> files;
synchronized (mLock) {
mIsStopped.set(true);
mThread = null;
mWakeupPipe[0] = null;
mWakeupPipe[1] = null;
files = new ArrayList<>(mFiles);
mFiles.clear();
alarms = new ArrayList<>(mAlarms);
mAlarms.clear();
}
for (BaseAsyncFileImpl file : files) {
file.doClose();
}
for (AlarmData alarm : alarms) {
try {
alarm.callback.onAlarmCancelled(alarm);
} catch (Throwable e) {
Log.w(mLogTag, logStr("Unexpected error while notifying close event"), e);
}
}
}
@Override
public void assertInThread() {
Thread thread = mThread;
if (!Assertions.IS_USER_BUILD && thread != null && thread != Thread.currentThread()) {
throw new IllegalArgumentException("Not in EventManager thread: "
+ Thread.currentThread().getName());
}
}
@Override
public AsyncFile registerFile(FileHandle fileHandle, AsyncFile.Listener listener)
throws IOException {
BaseAsyncFileImpl file;
synchronized (mLock) {
if (mThread == null || mIsStopped.get()) {
throw new IOException("EventManager has not started or has been shut down");
}
file = createAsyncFileImpl(fileHandle, listener);
mFiles.add(file);
}
wakeup();
return file;
}
private BaseAsyncFileImpl createAsyncFileImpl(FileHandle fileHandle,
AsyncFile.Listener listener) throws IOException {
ParcelFileDescriptor fd = fileHandle.getFileDescriptor();
if (fd != null) {
try {
return new FdAsyncFile(fd, listener);
} catch (IOException e) {
mOsAccess.close(fd);
throw e;
}
}
Closeable closeable = fileHandle.getCloseable();
if (closeable != null) {
closeFile(closeable);
throw new IOException("Blocking sockets not supported");
}
throw new IOException("Invalid FileHandle");
}
private static void closeFile(Closeable file) {
try {
if (file != null) {
file.close();
}
} catch (IOException e) {
// ignore
}
}
@Override
public Alarm scheduleAlarm(long timeout, Alarm.Listener callback) {
AlarmData alarm = new AlarmData(callback, mOsAccess.monotonicTimeMillis(), timeout);
synchronized (mLock) {
if (mThread == null || mIsStopped.get()) {
return null;
}
mAlarms.add(alarm);
}
wakeup();
return alarm;
}
@Override
public void execute(Runnable callback) {
scheduleAlarm(0, new Alarm.Listener() {
@Override
public void onAlarm(Alarm alarm, long elapsedTimeMs) {
callback.run();
}
@Override
public void onAlarmCancelled(Alarm alarm) {}
});
}
private void wakeup() {
if (Thread.currentThread() == mThread) {
mHasLocalWakeupCalls = true;
return;
}
synchronized (mLock) {
if (mThread != null && mWakeupPipe[1] != null) {
try {
mWakeupPipe[1].writeInternal(mWakeupData, 0, mWakeupData.length);
} catch (IOException e) {
Log.w(mLogTag, logStr("Failed to write to wakeup pipe: " + e.toString()));
}
}
}
}
private void runLoop() {
if (Log.isLoggable(mLogTag, Log.VERBOSE)) {
Log.v(mLogTag, logStr("Starting EventManager"));
}
try {
while (!mIsStopped.get()) {
processExpiredAlarms();
if (mIsStopped.get()) {
break;
}
processReceivedEvents();
if (mIsStopped.get()) {
break;
}
updatePollFds();
long pollTimeout = MAX_POLL_TIMEOUT;
synchronized (mLock) {
if (mHasLocalWakeupCalls) {
pollTimeout = 0;
mHasLocalWakeupCalls = false;
} else if (!mAlarms.isEmpty()) {
long now = mOsAccess.monotonicTimeMillis();
long nextAlarmDeadline = mAlarms.peek().expirationTimeMs;
if (nextAlarmDeadline < now) {
nextAlarmDeadline = now;
}
pollTimeout = Math.min(pollTimeout, nextAlarmDeadline - now);
}
}
mOsAccess.poll(mPollSet, (int) pollTimeout);
}
} catch (Throwable e) {
Log.w(mLogTag, logStr("Unexpected error while trying to poll"), e);
} finally {
if (Log.isLoggable(mLogTag, Log.VERBOSE)) {
Log.v(mLogTag, logStr("Exiting EventManager"));
}
if (!mIsStopped.get()) {
Log.e(mLogTag, logStr("Exiting EventManager with an open connection"));
mListener.onEventManagerFailure();
}
}
cleanupOnShutdown();
}
private void processExpiredAlarms() {
ArrayList<AlarmData> alarms = null;
synchronized (mLock) {
if (mHasAlarmCancellations) {
mHasAlarmCancellations = false;
Iterator<AlarmData> it = mAlarms.iterator();
while (it.hasNext()) {
AlarmData alarm = it.next();
if (alarm.isCancelled) {
if (alarms == null) {
alarms = new ArrayList<>();
}
alarms.add(alarm);
it.remove();
}
}
}
long now = mOsAccess.monotonicTimeMillis();
while (!mAlarms.isEmpty() && !mIsStopped.get()) {
if (mAlarms.peek().expirationTimeMs > now) {
break;
}
if (alarms == null) {
alarms = new ArrayList<>();
}
alarms.add(mAlarms.poll());
}
}
if (alarms == null) {
return;
}
for (AlarmData alarm : alarms) {
if (alarm.isCancelled) {
try {
alarm.callback.onAlarmCancelled(alarm);
} catch (Throwable e) {
Log.w(mLogTag, logStr("Unexpected error while cancelling alarm"), e);
}
continue;
}
try {
long elapsedTimeMs = mOsAccess.monotonicTimeMillis() - alarm.creationTimeMs;
alarm.callback.onAlarm(alarm, elapsedTimeMs);
} catch (Throwable e) {
Log.w(mLogTag, logStr("Unexpected error while executing alarm"), e);
}
}
}
private void removeClosedFiles() {
int idx = 0;
while (idx < mFiles.size()) {
BaseAsyncFileImpl file = mFiles.get(idx);
if (file.wantsClose()) {
mFiles.remove(idx);
file.doClose();
} else {
idx++;
}
}
}
private void processReceivedEvents() {
synchronized (mLock) {
removeClosedFiles();
mTempFileList.clear();
for (int i = 0; i < mFiles.size(); i++) {
mTempFileList.add(mFiles.get(i));
}
}
for (int i = 0; i < mTempFileList.size(); i++) {
if (!mIsStopped.get()) {
mTempFileList.get(i).notifyEvents();
}
}
}
private void updatePollFds() {
synchronized (mLock) {
// Perform all requested close() operations.
removeClosedFiles();
// Perform all requested event registration operations.
int pollSetSize = 0;
for (int i = 0; i < mFiles.size(); i++) {
BaseAsyncFileImpl file = mFiles.get(i);
file.applyEventRequests();
if (file.getStructPollfd() != null) {
pollSetSize++;
}
}
// Re-compose the mPollSet.
if (pollSetSize != mPollSet.length) {
mPollSet = new StructPollfd[pollSetSize];
}
int idx = 0;
for (int i = 0; i < mFiles.size(); i++) {
BaseAsyncFileImpl file = mFiles.get(i);
StructPollfd structPollFd = file.getStructPollfd();
if (structPollFd != null) {
mPollSet[idx++] = structPollFd;
}
}
}
}
private String logStr(String message) {
return "[EventManager] " + message;
}
public void dump(IndentingPrintWriter ipw) {
assertInThread();
synchronized (mLock) {
ipw.print("EventManager [");
ipw.printPair("stopped", mIsStopped);
ipw.printPair("thread", mThread != null ? mThread.isAlive() : false);
ipw.println();
ipw.print("Files");
ipw.increaseIndent();
for (BaseAsyncFileImpl file : mFiles) {
ipw.println(file);
}
ipw.decreaseIndent();
ipw.println();
ipw.print("Alarms");
ipw.increaseIndent();
for (AlarmData alarm : mAlarms) {
ipw.println(alarm);
}
ipw.decreaseIndent();
ipw.println("]");
}
}
private class FdAsyncFile extends BaseAsyncFileImpl {
private final ParcelFileDescriptor mParcelFd;
private final FileDescriptor mRawFd;
private final StructPollfd mStructPollFd = new StructPollfd();
FdAsyncFile(ParcelFileDescriptor fd, AsyncFile.Listener listener) throws IOException {
super(listener, mLogTag);
if (fd == null) {
throw new NullPointerException();
}
mParcelFd = fd;
mRawFd = mOsAccess.getInnerFileDescriptor(mParcelFd);
mStructPollFd.fd = mRawFd;
mStructPollFd.userData = this;
mOsAccess.setNonBlocking(mRawFd);
}
@Override
protected void closeInternal() {
mStructPollFd.events = 0;
mStructPollFd.revents = 0;
mOsAccess.close(mParcelFd);
}
@Override
protected void applyEventRequests() {
// Runs in the EventManager thread.
mStructPollFd.revents = 0;
mStructPollFd.events = (short) (
(wantsReadEvents() ? mPollInMask : 0) |
(wantsWriteEvents() ? mPollOutMask : 0));
}
@Override
protected StructPollfd getStructPollfd() {
return (mStructPollFd.events != 0 ? mStructPollFd : null);
}
@Override
protected void notifyEvents() {
// Runs in the EventManager thread.
final boolean hasReadEvent =
(mStructPollFd.revents & mPollInMask) == mPollInMask;
final boolean hasWriteEvent =
(mStructPollFd.revents & mPollOutMask) == mPollOutMask;
mStructPollFd.revents = 0;
callEventListener(hasReadEvent, hasWriteEvent);
}
@Override
protected int readInternal(byte[] buffer, int pos, int len) throws IOException {
return mOsAccess.read(mRawFd, buffer, pos, len);
}
@Override
protected int writeInternal(byte[] buffer, int pos, int len) throws IOException {
return mOsAccess.write(mRawFd, buffer, pos, len);
}
@Override
protected void wakeup() {
EventManagerImpl.this.wakeup();
}
@Override
protected void assertInThread() {
EventManagerImpl.this.assertInThread();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("AsyncFile{fd=");
sb.append(mOsAccess.getFileDebugName(mParcelFd));
toString(sb);
sb.append(",events=");
sb.append(mStructPollFd.events);
sb.append(",revents=");
sb.append(mStructPollFd.revents);
sb.append('}');
return sb.toString();
}
}
private static class WakeupEventListener implements AsyncFile.Listener {
private final byte[] mDiscardBuffer = new byte[12];
@Override
public void onClosed(AsyncFile file) {}
@Override
public void onReadReady(AsyncFile file) {
try {
while (file.read(mDiscardBuffer, 0, mDiscardBuffer.length) > 0) {
// keep reading
}
} catch (IOException e) {
// ignore
}
}
@Override
public void onWriteReady(AsyncFile file) {}
};
private class AlarmData implements Alarm, Comparable<AlarmData> {
final Alarm.Listener callback;
final long creationTimeMs;
final long expirationTimeMs;
volatile boolean isCancelled;
AlarmData(Alarm.Listener callback, long creationTimeMs, long durationMs) {
this.callback = callback;
this.creationTimeMs = creationTimeMs;
this.expirationTimeMs = creationTimeMs + durationMs;
}
@Override
public void cancel() {
isCancelled = true;
mHasAlarmCancellations = true;
wakeup();
}
@Override
public int compareTo(AlarmData other) {
long distance = expirationTimeMs - other.expirationTimeMs;
return (distance > 0 ? 1 : (distance < 0) ? -1 : 0);
}
@Override
public String toString() {
assertInThread();
StringBuilder sb = new StringBuilder();
sb.append("Alarm{cancelled=");
sb.append(isCancelled);
sb.append(",duration=");
sb.append(expirationTimeMs - creationTimeMs);
sb.append(",remaining=");
sb.append(expirationTimeMs - mOsAccess.monotonicTimeMillis());
sb.append('}');
return sb.toString();
}
}
}