Skip to content

Commit 082fe70

Browse files
authored
Merge pull request #155 from jayzhenghan/feature_progressCallback
Feature progress callback
2 parents 22debb2 + 974e4a8 commit 082fe70

File tree

3 files changed

+54
-5
lines changed

3 files changed

+54
-5
lines changed

qcloud_cos/cos_client.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import time
1010
import copy
1111
import json
12+
import threading
1213
import xml.dom.minidom
1314
import xml.etree.ElementTree
1415
from requests import Request, Session, ConnectionError, Timeout
@@ -2908,7 +2909,7 @@ def list_buckets(self, **kwargs):
29082909
return data
29092910

29102911
# Advanced interface
2911-
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts, enable_md5, traffic_limit):
2912+
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts, enable_md5, traffic_limit, progress_callback=None):
29122913
"""从本地文件中读取分块, 上传单个分块,将结果记录在md5——list中
29132914
29142915
:param bucket(string): 存储桶名称.
@@ -2933,6 +2934,8 @@ def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid
29332934
data = fp.read(size)
29342935
rt = self.upload_part(bucket, key, data, part_num, uploadid, enable_md5, TrafficLimit=traffic_limit)
29352936
md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']})
2937+
if progress_callback:
2938+
progress_callback.report(size)
29362939
return None
29372940

29382941
def _get_resumable_uploadid(self, bucket, key):
@@ -3045,7 +3048,7 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAXThread=5, Ena
30453048
downloader = ResumableDownLoader(self, Bucket, Key, DestFilePath, object_info, PartSize, MAXThread, EnableCRC, **Kwargs)
30463049
downloader.start()
30473050

3048-
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, **kwargs):
3051+
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, EnableMD5=False, progress_callback=None, **kwargs):
30493052
"""小于等于20MB的文件简单上传,大于20MB的文件使用分块上传
30503053
30513054
:param Bucket(string): 存储桶名称.
@@ -3115,12 +3118,14 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, Enabl
31153118
offset = 0 # 记录文件偏移量
31163119
lst = list() # 记录分块信息
31173120
pool = SimpleThreadPool(MAXThread)
3118-
3121+
callback = None
3122+
if progress_callback:
3123+
callback = ProgressCallback(file_size, progress_callback)
31193124
for i in range(1, parts_num+1):
31203125
if i == parts_num: # 最后一块
3121-
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit)
3126+
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit, callback)
31223127
else:
3123-
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit)
3128+
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts, EnableMD5, traffic_limit, callback)
31243129
offset += part_size
31253130

31263131
pool.wait_completion()

qcloud_cos/cos_comm.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io
99
import re
1010
import sys
11+
import threading
1112
import xml.dom.minidom
1213
import xml.etree.ElementTree
1314
from datetime import datetime
@@ -459,3 +460,16 @@ class CiDetectType():
459460
TERRORIST = 2
460461
POLITICS = 4
461462
ADS = 8
463+
464+
465+
class ProgressCallback():
466+
def __init__(self, file_size, progress_callback):
467+
self.__lock = threading.Lock()
468+
self.__finished_size = 0
469+
self.__file_size = file_size
470+
self.__progress_callback = progress_callback
471+
472+
def report(self, size):
473+
with self.__lock:
474+
self.__finished_size += size
475+
self.__progress_callback(self.__finished_size, self.__file_size)

ut/test.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ def print_error_msg(e):
9292
print (e.get_trace_id())
9393
print (e.get_request_id())
9494

95+
def percentage(consumed_bytes, total_bytes):
96+
"""进度条回调函数,计算当前完成的百分比
97+
98+
:param consumed_bytes: 已经上传/下载的数据量
99+
:param total_bytes: 总数据量
100+
"""
101+
if total_bytes:
102+
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
103+
print('\r{0}% '.format(rate))
104+
sys.stdout.flush()
95105

96106
def setUp():
97107
print ("start test...")
@@ -714,6 +724,25 @@ def test_upload_file_multithreading():
714724
print (ed - st)
715725

716726

727+
def test_upload_file_with_progress_callback():
728+
"""带有进度条功能的并发上传"""
729+
file_name = "test_progress_callback"
730+
file_size = 1024
731+
if TRAVIS_FLAG == 'true':
732+
file_size = 5 # set 5MB beacuse travis too slow
733+
gen_file(file_name, file_size)
734+
response = client.upload_file(
735+
Bucket=test_bucket,
736+
Key=file_name,
737+
LocalFilePath=file_name,
738+
MAXThread=5,
739+
EnableMD5=True,
740+
progress_callback=percentage
741+
)
742+
if os.path.exists(file_name):
743+
os.remove(file_name)
744+
745+
717746
def test_copy_file_automatically():
718747
"""根据拷贝源文件的大小自动选择拷贝策略,不同园区,小于5G直接copy_object,大于5G分块拷贝"""
719748
copy_source = {'Bucket': copy_test_bucket, 'Key': 'test.txt', 'Region': REGION}
@@ -1493,6 +1522,7 @@ def test_live_channel():
14931522
test_put_get_delete_replication()
14941523
test_upload_part_copy()
14951524
test_upload_file_multithreading()
1525+
test_upload_file_with_progress_callback()
14961526
test_copy_file_automatically()
14971527
test_copy_10G_file_in_same_region()
14981528
test_list_objects()

0 commit comments

Comments
 (0)