Skip to content

Commit fc0f202

Browse files
authored
Merge pull request #229 from tencentyun/dev/checkPointUpload
Dev/check point upload
2 parents 83636dd + 1c4f894 commit fc0f202

11 files changed

+323
-9
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.qcloud</groupId>
66
<artifactId>cos_api</artifactId>
7-
<version>5.6.237</version>
7+
<version>5.6.238</version>
88
<packaging>jar</packaging>
99
<name>cos-java-sdk</name>
1010
<description>java sdk for qcloud cos</description>

src/main/java/com/qcloud/cos/COSClient.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,7 @@ public COSClient(COSCredentialsProvider credProvider, ClientConfig clientConfig)
176176
super();
177177
this.credProvider = credProvider;
178178
this.clientConfig = clientConfig;
179-
if (clientConfig.getRequestTimeOutEnable()) {
180-
this.cosHttpClient = new TimeOutCosHttpClient(clientConfig);
181-
} else {
182-
this.cosHttpClient = new DefaultCosHttpClient(clientConfig);
183-
}
179+
this.cosHttpClient = new DefaultCosHttpClient(clientConfig);
184180
}
185181

186182
public void shutdown() {

src/main/java/com/qcloud/cos/ClientConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public class ClientConfig {
131131

132132
private long preflightStatusUpdateInterval = 10 * 1000L;
133133

134+
private boolean isRedirectsEnabled = false;
135+
134136
// 不传入region 用于后续调用List Buckets(获取所有的bucket信息)
135137
public ClientConfig() {
136138
super();
@@ -373,7 +375,7 @@ public void setRequestTimeOutEnable(boolean requestTimeOutEnable) {
373375
}
374376

375377
public boolean getRequestTimeOutEnable() {
376-
return isRequestTimeOutEnable;
378+
return isRequestTimeOutEnable && (requestTimeout > 0);
377379
}
378380

379381
public void setShutdownTimeout(int shutdownTimeout) {
@@ -467,4 +469,12 @@ public void setCheckPreflightStatus(boolean checkPreflightStatus) {
467469
public long getPreflightStatusUpdateInterval() {
468470
return preflightStatusUpdateInterval;
469471
}
472+
473+
public boolean isRedirectsEnabled() {
474+
return isRedirectsEnabled;
475+
}
476+
477+
public void setRedirectsEnabled(boolean redirectsEnabled) {
478+
isRedirectsEnabled = redirectsEnabled;
479+
}
470480
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.qcloud.cos.http;
2+
3+
import com.qcloud.cos.exception.CosClientException;
4+
import com.qcloud.cos.internal.CosClientAbortTask;
5+
import com.qcloud.cos.internal.CosClientAbortTaskMonitor;
6+
import com.qcloud.cos.internal.DefaultClientAbortTaskImpl;
7+
import com.qcloud.cos.internal.CosClientAbortTaskImpl;
8+
import com.qcloud.cos.internal.CosClientAbortTaskMonitorImpl;
9+
10+
import java.util.concurrent.ScheduledFuture;
11+
import java.util.concurrent.ScheduledThreadPoolExecutor;
12+
import java.util.concurrent.ThreadFactory;
13+
import java.util.concurrent.TimeUnit;
14+
15+
public class CosHttpClientTimer {
16+
private volatile ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
17+
18+
private static ThreadFactory getThreadFactory(final String name) {
19+
return new ThreadFactory() {
20+
private int threadCount = 1;
21+
22+
public Thread newThread(Runnable r) {
23+
Thread thread = new Thread(r);
24+
if (name != null) {
25+
thread.setName(name + "-" + threadCount++);
26+
}
27+
thread.setPriority(Thread.MAX_PRIORITY);
28+
return thread;
29+
}
30+
};
31+
}
32+
33+
private synchronized void initializeExecutor() {
34+
if (scheduledThreadPoolExecutor == null) {
35+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5, getThreadFactory("COSClientRequestTimeOutThread"));
36+
try {
37+
executor.getClass().getMethod("setRemoveOnCancelPolicy", boolean.class).invoke(executor, Boolean.TRUE);
38+
} catch (NoSuchMethodException e) {
39+
throw new CosClientException("The request timeout feature is only available for Java 1.7 and above.");
40+
} catch (SecurityException e) {
41+
throw new CosClientException("The request timeout feature needs additional permissions to function.", e);
42+
} catch (Exception e) {
43+
throw new CosClientException(e);
44+
}
45+
46+
executor.setKeepAliveTime(5, TimeUnit.SECONDS);
47+
executor.allowCoreThreadTimeOut(true);
48+
49+
scheduledThreadPoolExecutor = executor;
50+
}
51+
}
52+
53+
public CosClientAbortTaskMonitor startTimer(int requestTimeout) {
54+
if (requestTimeout <= 0) {
55+
return DefaultClientAbortTaskImpl.INSTANCE;
56+
} else if (scheduledThreadPoolExecutor == null) {
57+
initializeExecutor();
58+
}
59+
60+
CosClientAbortTask task = new CosClientAbortTaskImpl(Thread.currentThread());
61+
ScheduledFuture<?> timerTaskFuture = scheduledThreadPoolExecutor.schedule(task, requestTimeout, TimeUnit.MILLISECONDS);
62+
return new CosClientAbortTaskMonitorImpl(task, timerTaskFuture);
63+
}
64+
65+
public synchronized void shutdown() {
66+
if (scheduledThreadPoolExecutor != null) {
67+
scheduledThreadPoolExecutor.shutdown();
68+
}
69+
}
70+
}

src/main/java/com/qcloud/cos/http/CosHttpRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import com.qcloud.cos.auth.COSCredentials;
2828
import com.qcloud.cos.event.ProgressListener;
2929
import com.qcloud.cos.exception.ExceptionLogDetail;
30+
import com.qcloud.cos.internal.CosClientAbortTaskMonitor;
3031
import com.qcloud.cos.internal.CosServiceRequest;
32+
import com.qcloud.cos.internal.DefaultClientAbortTaskImpl;
3133

3234
public class CosHttpRequest<T extends CosServiceRequest> {
3335

@@ -62,6 +64,8 @@ public class CosHttpRequest<T extends CosServiceRequest> {
6264

6365
private List<ExceptionLogDetail> logDetails = new ArrayList<ExceptionLogDetail>();
6466

67+
private CosClientAbortTaskMonitor clientAbortTaskMonitor = DefaultClientAbortTaskImpl.INSTANCE;
68+
6569
public CosHttpRequest(T originRequest) {
6670
this.originRequest = originRequest;
6771
this.ciSpecialEndParameter = originRequest.getCiSpecialEndParameter();
@@ -182,6 +186,14 @@ public List<ExceptionLogDetail> getExceptionsLogDetails() {
182186
return logDetails;
183187
}
184188

189+
public CosClientAbortTaskMonitor getClientAbortTaskMonitor() {
190+
return clientAbortTaskMonitor;
191+
}
192+
193+
public void setClientAbortTaskMonitor(CosClientAbortTaskMonitor clientAbortTaskMonitor) {
194+
this.clientAbortTaskMonitor = clientAbortTaskMonitor;
195+
}
196+
185197
@Override
186198
public String toString() {
187199
StringBuilder strBuilder = new StringBuilder();

src/main/java/com/qcloud/cos/http/DefaultCosHttpClient.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import com.qcloud.cos.region.Region;
5757
import com.qcloud.cos.event.ProgressInputStream;
5858
import com.qcloud.cos.event.ProgressListener;
59+
import com.qcloud.cos.exception.AbortedException;
5960
import com.qcloud.cos.exception.ClientExceptionConstants;
6061
import com.qcloud.cos.exception.CosClientException;
6162
import com.qcloud.cos.exception.CosServiceException;
@@ -70,6 +71,7 @@
7071
import com.qcloud.cos.internal.SdkBufferedInputStream;
7172
import com.qcloud.cos.internal.CIWorkflowServiceRequest;
7273
import com.qcloud.cos.internal.CIServiceRequest;
74+
import com.qcloud.cos.internal.CosClientAbortTaskMonitor;
7375
import com.qcloud.cos.retry.BackoffStrategy;
7476
import com.qcloud.cos.retry.RetryPolicy;
7577
import com.qcloud.cos.utils.CodecUtils;
@@ -124,6 +126,7 @@ public class DefaultCosHttpClient implements CosHttpClient {
124126

125127
private CosErrorResponseHandler errorResponseHandler;
126128
private HandlerAfterProcess handlerAfterProcess;
129+
private final CosHttpClientTimer cosHttpClientTimer;
127130
private static final Logger log = LoggerFactory.getLogger(DefaultCosHttpClient.class);
128131

129132
public DefaultCosHttpClient(ClientConfig clientConfig) {
@@ -179,6 +182,7 @@ public InetAddress[] resolve(String host) throws UnknownHostException {
179182
this.maxErrorRetry = clientConfig.getMaxErrorRetry();
180183
this.retryPolicy = ValidationUtils.assertNotNull(clientConfig.getRetryPolicy(), "retry policy");
181184
this.backoffStrategy = ValidationUtils.assertNotNull(clientConfig.getBackoffStrategy(), "backoff strategy");
185+
this.cosHttpClientTimer = new CosHttpClientTimer();
182186
initHttpClient();
183187
}
184188

@@ -201,7 +205,9 @@ private void initHttpClient() {
201205
.setConnectionRequestTimeout(
202206
this.clientConfig.getConnectionRequestTimeout())
203207
.setConnectTimeout(this.clientConfig.getConnectionTimeout())
204-
.setSocketTimeout(this.clientConfig.getSocketTimeout()).build();
208+
.setSocketTimeout(this.clientConfig.getSocketTimeout())
209+
.setRedirectsEnabled(this.clientConfig.isRedirectsEnabled())
210+
.build();
205211
this.idleConnectionMonitor = new IdleConnectionMonitorThread(this.connectionManager);
206212
this.idleConnectionMonitor.setIdleAliveMS(this.clientConfig.getIdleConnectionAlive());
207213
this.idleConnectionMonitor.setDaemon(true);
@@ -224,6 +230,7 @@ public void shutdown() {
224230
}
225231
log.info(trace.toString());
226232
}
233+
cosHttpClientTimer.shutdown();
227234
this.idleConnectionMonitor.shutdown();
228235
}
229236

@@ -596,7 +603,11 @@ public <X, Y extends CosServiceRequest> X exeute(CosHttpRequest<Y> request,
596603
httpRequest = buildHttpRequest(request);
597604
httpResponse = null;
598605
startTime = System.currentTimeMillis();
599-
httpResponse = executeRequest(context, httpRequest);
606+
if (clientConfig.getRequestTimeOutEnable()) {
607+
httpResponse = executeRequestWithTimeout(context, httpRequest, request);
608+
} else {
609+
httpResponse = executeRequest(context, httpRequest);
610+
}
600611
checkResponse(request, httpRequest, httpResponse);
601612
break;
602613
} catch (CosServiceException cse) {
@@ -737,6 +748,63 @@ private HttpResponse executeRequest(HttpContext context, HttpRequestBase httpReq
737748
return httpResponse;
738749
}
739750

751+
private <Y extends CosServiceRequest> HttpResponse executeRequestWithTimer(HttpContext context, HttpRequestBase httpRequest, CosHttpRequest<Y> originRequest) throws Exception {
752+
CosClientAbortTaskMonitor abortTaskMonitor = cosHttpClientTimer.startTimer(clientConfig.getRequestTimeout());
753+
abortTaskMonitor.setCurrentHttpRequest(httpRequest);
754+
HttpResponse httpResponse = null;
755+
try {
756+
originRequest.setClientAbortTaskMonitor(abortTaskMonitor);
757+
httpResponse = executeOneRequest(context, httpRequest);
758+
} catch (IOException ie) {
759+
if (originRequest.getClientAbortTaskMonitor().hasTimeoutExpired()) {
760+
Thread.interrupted();
761+
String errorMsg = String.format("catch IOException when executing http request[%s], and execution aborted task has been done, exp:", originRequest);
762+
log.error(errorMsg, ie);
763+
throw new InterruptedException();
764+
}
765+
throw ie;
766+
} finally {
767+
originRequest.getClientAbortTaskMonitor().cancelTask();
768+
}
769+
770+
return httpResponse;
771+
}
772+
773+
private <Y extends CosServiceRequest> HttpResponse executeRequestWithTimeout(HttpContext context, HttpRequestBase httpRequest, CosHttpRequest<Y> originRequest) throws Exception {
774+
try {
775+
return executeRequestWithTimer(context, httpRequest, originRequest);
776+
} catch (InterruptedException ie) {
777+
if (originRequest.getClientAbortTaskMonitor().hasTimeoutExpired()) {
778+
Thread.interrupted();
779+
String errorMsg = "InterruptedException: time out after waiting " + this.clientConfig.getRequestTimeout()/1000 + " seconds";
780+
throw new CosClientException(errorMsg, ClientExceptionConstants.REQUEST_TIMEOUT, ie);
781+
}
782+
if (!httpRequest.isAborted()) {
783+
httpRequest.abort();
784+
}
785+
throw ie;
786+
} catch (AbortedException ae) {
787+
if (originRequest.getClientAbortTaskMonitor().hasTimeoutExpired()) {
788+
Thread.interrupted();
789+
String errorMsg = "AbortedException: time out after waiting " + this.clientConfig.getRequestTimeout()/1000 + " seconds";
790+
throw new CosClientException(errorMsg, ClientExceptionConstants.REQUEST_TIMEOUT, ae);
791+
}
792+
if (!httpRequest.isAborted()) {
793+
httpRequest.abort();
794+
}
795+
throw ae;
796+
} catch (IOException ie) {
797+
if (!httpRequest.isAborted()) {
798+
httpRequest.abort();
799+
}
800+
throw ExceptionUtils.createClientException(ie);
801+
} finally {
802+
if (originRequest.getClientAbortTaskMonitor().hasTimeoutExpired()) {
803+
Thread.interrupted();
804+
}
805+
}
806+
}
807+
740808
private <Y extends CosServiceRequest> void handleLog(CosHttpRequest<Y> request) {
741809
for (ExceptionLogDetail logDetail : request.getExceptionsLogDetails()) {
742810
log.error(logDetail.getErrMsg(), logDetail.getException());
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.qcloud.cos.internal;
2+
3+
import org.apache.http.client.methods.HttpRequestBase;
4+
5+
public interface CosClientAbortTask extends Runnable {
6+
void setCurrentHttpRequest(HttpRequestBase newRequest);
7+
8+
boolean hasClientExecutionAborted();
9+
10+
boolean isEnabled();
11+
12+
void cancel();
13+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.qcloud.cos.internal;
2+
3+
import org.apache.http.client.methods.HttpRequestBase;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
public class CosClientAbortTaskImpl implements CosClientAbortTask {
8+
private volatile boolean hasTaskExecuted;
9+
private HttpRequestBase currentHttpRequest;
10+
private final Thread thread;
11+
private volatile boolean isCancelled;
12+
13+
private static final Logger log = LoggerFactory.getLogger(CosClientAbortTaskImpl.class);
14+
15+
private final Object lock = new Object();
16+
17+
public CosClientAbortTaskImpl(Thread thread) {
18+
this.thread = thread;
19+
}
20+
21+
@Override
22+
public void run() {
23+
synchronized (this.lock) {
24+
if (isCancelled) {
25+
return;
26+
}
27+
hasTaskExecuted = true;
28+
if (!thread.isInterrupted()) {
29+
log.debug("request timeout and current thread will be interrupted");
30+
thread.interrupt();
31+
}
32+
if (!currentHttpRequest.isAborted()) {
33+
log.debug("request timeout and current http request will be aborted");
34+
currentHttpRequest.abort();
35+
}
36+
}
37+
}
38+
39+
@Override
40+
public void setCurrentHttpRequest(HttpRequestBase newRequest) {
41+
this.currentHttpRequest = newRequest;
42+
}
43+
44+
@Override
45+
public boolean hasClientExecutionAborted() {
46+
synchronized (this.lock) {
47+
return hasTaskExecuted;
48+
}
49+
}
50+
51+
@Override
52+
public boolean isEnabled() {
53+
return true;
54+
}
55+
56+
@Override
57+
public void cancel() {
58+
synchronized (this.lock) {
59+
isCancelled = true;
60+
}
61+
}
62+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.qcloud.cos.internal;
2+
3+
import org.apache.http.client.methods.HttpRequestBase;
4+
5+
public interface CosClientAbortTaskMonitor {
6+
void setCurrentHttpRequest(HttpRequestBase newRequest);
7+
8+
boolean hasTimeoutExpired();
9+
10+
boolean isEnabled();
11+
12+
void cancelTask();
13+
}

0 commit comments

Comments
 (0)