Skip to content

Commit af0cbae

Browse files
committed
allow concurrent writes and flushes
1 parent f2d3c73 commit af0cbae

1 file changed

Lines changed: 46 additions & 54 deletions

File tree

google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java

Lines changed: 46 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818

1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020

21+
import com.google.api.gax.core.ApiFuture;
2122
import com.google.api.gax.core.ApiFutureCallback;
2223
import com.google.api.gax.core.ApiFutures;
2324
import com.google.cloud.MonitoredResource;
2425
import com.google.cloud.logging.Logging.WriteOption;
2526
import com.google.common.collect.ImmutableList;
2627
import com.google.common.collect.ImmutableMap;
27-
import com.google.common.util.concurrent.Monitor;
28+
import com.google.common.collect.MapMaker;
29+
import com.google.common.util.concurrent.Uninterruptibles;
2830
import java.util.ArrayList;
2931
import java.util.Collections;
3032
import java.util.List;
33+
import java.util.concurrent.ConcurrentMap;
3134
import java.util.logging.ErrorManager;
3235
import java.util.logging.Filter;
3336
import java.util.logging.Formatter;
@@ -120,45 +123,12 @@ public class LoggingHandler extends Handler {
120123
// https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 .
121124
private final Level baseLevel;
122125

123-
private final Monitor flushMonitor = new Monitor(true);
124-
private int numPendingWrites = 0;
125-
private int numFlushers = 0;
126-
private final Monitor.Guard noFlushers =
127-
new Monitor.Guard(flushMonitor) {
128-
@Override
129-
public boolean isSatisfied() {
130-
return numFlushers == 0;
131-
}
132-
};
133-
private final Monitor.Guard writesCompleted =
134-
new Monitor.Guard(flushMonitor) {
135-
@Override
136-
public boolean isSatisfied() {
137-
return numPendingWrites == 0;
138-
}
139-
};
140-
private final ApiFutureCallback<Void> writeCallback =
141-
new ApiFutureCallback<Void>() {
142-
@Override
143-
public void onSuccess(Void v) {
144-
flushMonitor.enter();
145-
try {
146-
numPendingWrites--;
147-
} finally {
148-
flushMonitor.leave();
149-
}
150-
}
151-
152-
@Override
153-
public void onFailure(Throwable t) {
154-
if (t instanceof Exception) {
155-
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
156-
} else {
157-
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
158-
}
159-
onSuccess(null);
160-
}
161-
};
126+
// A map whose keys are pending write operations. The values of the map are meaningless, but the type is Boolean
127+
// and not Void since the map implementation does not allow null values.
128+
// Since the map has weak keys and we do not hold on to completed futures,
129+
// completed futures are automatically GCed and removed from the map.
130+
private final ConcurrentMap<ApiFuture<Void>, Boolean> pendingWrites =
131+
new MapMaker().weakKeys().makeMap();
162132

163133
/**
164134
* Creates an handler that publishes messages to Stackdriver Logging.
@@ -503,27 +473,49 @@ void write(LogEntry entry, WriteOption... options) {
503473

504474
case ASYNC:
505475
default:
506-
flushMonitor.enterWhenUninterruptibly(noFlushers);
507-
try {
508-
numPendingWrites++;
509-
ApiFutures.addCallback(getLogging().writeAsync(entryList, options), writeCallback);
510-
} finally {
511-
flushMonitor.leave();
512-
}
476+
ApiFuture<Void> writeFuture = getLogging().writeAsync(entryList, options);
477+
pendingWrites.put(writeFuture, Boolean.TRUE);
478+
ApiFutures.addCallback(
479+
writeFuture,
480+
new ApiFutureCallback<Void>() {
481+
@Override
482+
public void onSuccess(Void v) {
483+
// Nothing to do.
484+
}
485+
486+
@Override
487+
public void onFailure(Throwable t) {
488+
if (t instanceof Exception) {
489+
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
490+
} else {
491+
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
492+
}
493+
}
494+
});
513495
break;
514496
}
515497
}
516498

517499
@Override
518500
public void flush() {
519501
// BUG(1795): flush is broken, need support from batching implementation.
520-
flushMonitor.enter();
521-
try {
522-
numFlushers++;
523-
flushMonitor.waitForUninterruptibly(writesCompleted);
524-
numFlushers--;
525-
} finally {
526-
flushMonitor.leave();
502+
503+
// Make a copy of currently-pending writes.
504+
// As new writes are made, they might be reflected in the keySet iterator.
505+
// If we naively iterate through keySet, waiting for each future,
506+
// we might never finish.
507+
ArrayList<ApiFuture<Void>> writes = new ArrayList<>(pendingWrites.size());
508+
for (ApiFuture<Void> write : pendingWrites.keySet()) {
509+
writes.add(write);
510+
}
511+
for (int i = 0; i < writes.size(); i++) {
512+
ApiFuture<Void> write = writes.get(i);
513+
try {
514+
Uninterruptibles.getUninterruptibly(write);
515+
} catch (Exception e) {
516+
// Ignore exceptions, they are propagated to the error manager.
517+
}
518+
writes.set(i, null);
527519
}
528520
}
529521

0 commit comments

Comments
 (0)