Uploading files to S3 with Python

In the previous article we created a bucket in S3 using boto3. Now it's time to upload some files that we will use later for analysis.
There are many ways to upload files to S3. The easy way is to use the console, but it's not automated, meaning you have to do it manually. Now imagine you have logs generated daily. It would be impractical to go to the console and upload each log file one at a time.
That's why in this guide we will go through the process of automating this tedious task.
Uploading a single file
Create a file in the s3 folder.
We will also create a data directory in our project root to make things easier to work with. This is not strictly necessary, since the files can live anywhere as long as we can access them, but doing it this way makes the guide easier to follow along.
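For reference, this is roughly the project layout the guide assumes. The folder and file names here (including file_manager.py) are just illustrative; adjust them to however you organized the code from the previous article.

project/
    data/                    # sample files to upload (e.g. iris.csv)
    s3/
        file_manager.py      # the FileManager class shown below (hypothetical name)
    main.py                  # entry point that calls FileManager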
Paste this code.
import os
import boto3


class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"
        self.s3 = boto3.client('s3')
        self.data_dir = "<path to your data dir>"

    def upload_single_file(self, filename):
        # Upload one file from the data directory, using the filename as the object key.
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename
            )
        except Exception as e:
            print(f"Error: {e.args}")
Now run the code from your main.py.
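A minimal main.py could look like the sketch below. The import path assumes the hypothetical layout mentioned above; change it to wherever you saved the class.

from s3.file_manager import FileManager  # hypothetical module path; adjust to your project

if __name__ == "__main__":
    manager = FileManager()
    manager.upload_single_file("iris.csv")  # any file that exists in your data directory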
I have used the iris dataset, as it is easily available, small, and generally easy to work with. Feel free to upload any file you want.
Now, if you open your AWS console, you should see something similar.
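If you prefer to verify from code instead of the console, a quick check with list_objects_v2 works too. This is just a sketch using the bucket name from this guide:

import boto3

s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket="aws-demo-fruit")
for obj in response.get("Contents", []):
    # Print each object's key and size in bytes
    print(obj["Key"], obj["Size"])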
The good thing about using upload_file() is that it gives us a callback option that we can use to see the file upload progress:

def upload_single_file(self, filename=None):
    if filename is None:
        filename = self.filename
    self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
    try:
        self.s3.upload_file(
            Filename=os.path.join(self.data_dir, filename),
            Bucket=self.bucket_name,
            Key=filename,
            Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename)
        )
    except Exception as e:
        print(f"Error: {e.args}")
We can write our callback as follows:

def __call__(self, bytes_amount, filename):
    with self._lock:
        self.uploaded += bytes_amount
        upload_percentage = (self.uploaded / self.filesize) * 100
        print(f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% ")
We will also need to add a few attributes to our __init__ method (and import threading at the top of the file):

self.filesize = 0
self.uploaded = 0
self._lock = threading.Lock()
The final code looks like this:
import os, sys
import threading
import boto3


class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"
        self.s3 = boto3.client('s3')
        self.data_dir = "<path to your data dir>"
        self.filesize = 0
        self.uploaded = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount, filename):
        # Progress callback: boto3 calls this with the number of bytes transferred in each chunk.
        with self._lock:
            self.uploaded += bytes_amount
            upload_percentage = (self.uploaded / self.filesize) * 100
            print(f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% ")

    def upload_single_file(self, filename=None):
        if filename is None:
            filename = self.filename
        self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename,
                Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename)
            )
        except Exception as e:
            print(f"Error: {e.args}")
The output should look something like this:
file : iris.csv : 5107 / 5107.0 ----- 100.0%
For a slightly larger file, you should see something like this:
file : annual-enterprise.csv : 1048576 / 8065547.0 ----- 13.000680549006782%
file : annual-enterprise.csv : 2097152 / 8065547.0 ----- 26.001361098013565%
file : annual-enterprise.csv : 3145728 / 8065547.0 ----- 39.00204164702035%
file : annual-enterprise.csv : 4194304 / 8065547.0 ----- 52.00272219602713%
file : annual-enterprise.csv : 5242880 / 8065547.0 ----- 65.00340274503391%
file : annual-enterprise.csv : 6291456 / 8065547.0 ----- 78.0040832940407%
file : annual-enterprise.csv : 7340032 / 8065547.0 ----- 91.00476384304747%
file : annual-enterprise.csv : 8065547 / 8065547.0 ----- 100.0%
Here is the fun part: we know what a multipart upload in S3 is, right? For larger files, S3 recommends using multipart upload, meaning the file is split into defined chunks and uploaded in parallel rather than as a whole in a single thread. In order to use multipart upload with boto3, we need to modify our code slightly.
# These two additions go at the top of the file:
#   from boto3.s3.transfer import TransferConfig
#   KB = 1024

def upload_single_file(self, filename=None):
    if filename is None:
        filename = self.filename
    self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
    # TransferConfig allows us to define the parameters for multipart uploads.
    # Since the default threshold is larger than our file size, we use a smaller threshold
    # to force boto3 to use multipart. We will use a 100 KB chunk size.
    transfer_config = TransferConfig(
        multipart_threshold=100 * KB,
        multipart_chunksize=100 * KB
    )
    try:
        self.s3.upload_file(
            Filename=os.path.join(self.data_dir, filename),
            Bucket=self.bucket_name,
            Key=filename,
            Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename),
            Config=transfer_config
        )
        # We can verify the upload completion by checking the ETag.
        # Note: for multipart uploads, the response ETag will contain a dash and the part count.
        response = self.s3.head_object(Bucket=self.bucket_name, Key=filename)
        etag = response['ETag']
        print(etag)
    except Exception as e:
        print(f"Error: {e.args}")
The response now would be something similar to this
"924e834fb0c3946321bc1f716f163a8e-1"
Now let's increase the multipart_threshold:

multipart_threshold=100 * MB

(where MB = 1024 * KB)
and the response would be something similar to this
"0a972b95e3216112c716228740cce66a"
Notice there is no dash (-). This is because the threshold we used is now larger than the file size, so boto3 did not perform a multipart upload; the absence of the dash confirms it.
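If you want to check this programmatically rather than by eye, a small hypothetical helper can inspect the ETag returned by head_object:

def was_multipart_upload(s3_client, bucket, key):
    # A multipart ETag looks like "<hash>-<part count>", so a dash is the giveaway.
    etag = s3_client.head_object(Bucket=bucket, Key=key)['ETag'].strip('"')
    return '-' in etag

# Example usage:
# print(was_multipart_upload(boto3.client('s3'), "aws-demo-fruit", "annual-enterprise.csv"))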
Now, to upload all the files in a directory, we simply walk the directory and upload each file.
# Add this import at the top of the file:
#   from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_directory(self, max_workers: int = 4):
    files_list = []
    for root, _, files in os.walk(self.data_dir):
        for f in files:
            files_list.append(os.path.join(root, f))
    if not files_list:
        print(f" No files in {self.data_dir} ?")
        return
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(self.upload_single_file, os.path.relpath(path, self.data_dir)): path
            for path in files_list
        }
        # as_completed() takes an iterable of Future objects and returns an iterator
        # that yields each future as soon as it finishes, in no particular order.
        for future in as_completed(futures):
            src_path = futures[future]
            try:
                future.result()
                print(f"Uploaded: {src_path}")
            except Exception as e:
                print(f"Failed to upload {src_path}: {e}")
Running it should give you something like this:
file : iris.csv : 5107 / 5107.0 ----- 100.0%
file : annual-enterprise.csv : 1053683 / 5107.0 ----- 20632.132367338945%
"717820ef0af287ff346c5cabfb4c612c"
Uploaded: <path to data>/iris.csv
file : annual-enterprise.csv : 2102259 / 5107.0 ----- 41164.26473467789%
file : annual-enterprise.csv : 3150835 / 5107.0 ----- 61696.39710201684%
file : annual-enterprise.csv : 4062711 / 5107.0 ----- 79551.81123947522%
file : annual-enterprise.csv : 5111287 / 5107.0 ----- 100083.94360681418%
file : annual-enterprise.csv : 6159863 / 5107.0 ----- 120616.07597415311%
file : annual-enterprise.csv : 7208439 / 5107.0 ----- 141148.20834149208%
file : annual-enterprise.csv : 8257015 / 5107.0 ----- 161680.34070883103%
"e24b18ef5a0bf1d0f5bec32bf0cef99a-2"
Uploaded: <path to data>/annual-enterprise.csv
As always, the full code is on my GitHub.