Uploading files to S3 with Python

Saurab Dahal
5 min read

In the previous article we created a bucket in S3 using boto3. Now it's time to upload some files that we will analyze later.

There are many ways to upload files to S3. The easy way is to use the console, but it isn't automated, meaning you have to do everything manually. Now imagine you have logs generated daily; it would be impractical to go to the console and upload each log file one at a time.

That’s why in this guide, we will go through the process of automating this tedious task.

Uploading a single file

Create a new Python file in the s3 folder.

We will also create a data directory in our root directory to keep things tidy. This is not strictly necessary, since the files can live anywhere we can access them, but doing it this way makes the guide easier to follow.
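For orientation, here is one possible layout; apart from the data directory and main.py, the file and folder names are just assumptions for this guide, not something fixed by it.

  project-root/
  ├── data/
  │   └── <your data files>
  ├── s3/
  │   └── file_manager.py    (name assumed; holds the FileManager class below)
  └── main.py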

Paste this code.

import os
import boto3

class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"  # the bucket we created in the previous article
        self.s3 = boto3.client("s3")
        self.data_dir = "<path to your data dir>"

    def upload_single_file(self, filename):
        # upload_file streams the local file to the bucket under the given key
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename,
            )
        except Exception as e:
            print(f"Error: {e.args}")

Now run the code from your main.py.

I have used the iris dataset because it is freely available, small, and easy to work with, but feel free to upload any file you want.
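Here is a minimal main.py sketch; it assumes the class above lives in a module named file_manager.py (an assumed name) and that iris.csv sits in your data directory.

# main.py -- minimal sketch; the module name file_manager is an assumption
from file_manager import FileManager

if __name__ == "__main__":
    manager = FileManager()
    manager.upload_single_file("iris.csv")  # any file inside data_dir works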

Now if you open your AWS console, you should see the uploaded file listed in the bucket.

The good thing about using upload_file() is that it gives us a Callback parameter that we can use to track the upload progress:

def upload_single_file(self, filename=None):
    if filename is None:
        filename = self.filename
    self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
    try:
        self.s3.upload_file(
            Filename=os.path.join(self.data_dir, filename),
            Bucket=self.bucket_name,
            Key=filename,
            Callback=lambda bytes_transferred: self.__call__(
                bytes_transferred, filename
            ),
        )
    except Exception as e:
        print(f"Error: {e.args}")

We can write our callback like this:

def __call__(self, bytes_amount, filename):
    # boto3 may invoke this callback from multiple threads, so guard the counter with a lock
    with self._lock:
        self.uploaded += bytes_amount
        upload_percentage = (self.uploaded / self.filesize) * 100
        print(
            f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% "
        )

We will need to add a few attributes to our __init__ method (and import threading):

self.filesize = 0
self.uploaded = 0
self._lock = threading.Lock()

The final code looks like this:

import os
import threading

import boto3


class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"
        self.s3 = boto3.client('s3')
        self.data_dir = "<path to your data dir>"
        self.filesize = 0
        self.uploaded = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount, filename):
        with self._lock:
            self.uploaded += bytes_amount
            upload_percentage = (self.uploaded / self.filesize) * 100
            print(f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% ")

    def upload_single_file(self, filename=None):
        if filename is None:
            filename = self.filename
        self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename,
                Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename)
            )
        except Exception as e:
            print(f"Error: {e.args}")

The output should look something like this:

  file : iris.csv : 5107 / 5107.0 ----- 100.0%

For a somewhat larger file you should see something like:

  file : annual-enterprise.csv : 1048576 / 8065547.0 ----- 13.000680549006782% 
  file : annual-enterprise.csv : 2097152 / 8065547.0 ----- 26.001361098013565% 
  file : annual-enterprise.csv : 3145728 / 8065547.0 ----- 39.00204164702035% 
  file : annual-enterprise.csv : 4194304 / 8065547.0 ----- 52.00272219602713% 
  file : annual-enterprise.csv : 5242880 / 8065547.0 ----- 65.00340274503391% 
  file : annual-enterprise.csv : 6291456 / 8065547.0 ----- 78.0040832940407% 
  file : annual-enterprise.csv : 7340032 / 8065547.0 ----- 91.00476384304747% 
  file : annual-enterprise.csv : 8065547 / 8065547.0 ----- 100.0%

Here is the fun part: we know what a multipart upload in S3 is, right? For larger files, S3 recommends using multipart upload, where the file is split into chunks of a defined size and uploaded in parallel rather than as a whole in a single thread. To use multipart upload with boto3, we need to modify our code slightly.

# Add these near the top of the file:
#   from boto3.s3.transfer import TransferConfig
#   KB = 1024

def upload_single_file(self, filename=None):
    if filename is None:
        filename = self.filename
    self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))

    # TransferConfig lets us define the parameters for multipart uploads.
    # The default threshold (8 MB) is larger than our file size, so we use a smaller
    # threshold to force boto3 into a multipart upload, with a 100 KB chunk size.
    transfer_config = TransferConfig(
        multipart_threshold=100 * KB, multipart_chunksize=100 * KB
    )
    try:
        self.s3.upload_file(
            Filename=os.path.join(self.data_dir, filename),
            Bucket=self.bucket_name,
            Key=filename,
            Callback=lambda bytes_transferred: self.__call__(
                bytes_transferred, filename
            ),
            Config=transfer_config,
        )

        # We can verify the upload completion by checking ETag
        # Note: For multipart uploads, response ETag will contain a dash and part count
        response = self.s3.head_object(Bucket=self.bucket_name, Key=filename)
        etag = response["ETag"]
        print(etag)
    except Exception as e:
        print(f"Error: {e.args}")

The printed ETag would now be something like this:

  "924e834fb0c3946321bc1f716f163a8e-1"

Now let's increase the multipart_threshold

  multipart_threshold=100 * MB

and the printed ETag would be something like this:

  "0a972b95e3216112c716228740cce66a"

Notice there is no dash (-). This is because the threshold we used is now larger than the file size, so boto3 performed a regular single-part upload; the absence of the dash confirms it was not a multipart upload.

Uploading a directory

Now, to upload all the files in a directory, we simply walk the directory and upload each file.

# Add this import near the top of the file:
#   from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_directory(self, max_workers: int = 4):
    files_list = []
    for root, _, files in os.walk(self.data_dir):
        for f in files:
            files_list.append(os.path.join(root, f))

    if not files_list:
        print(f"No files in {self.data_dir}?")
        return

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                self.upload_single_file, os.path.relpath(path, self.data_dir)
            ): path
            for path in files_list
        }

        # as_completed() takes an iterable of Future objects and returns an iterator
        # that yields each future as soon as it finishes, in no particular order. [Doc]
        for future in as_completed(futures):
            src_path = futures[future]
            try:
                future.result()
                print(f"Uploaded: {src_path}")
            except Exception as e:
                print(f"Failed to upload {src_path}: {e}")

Calling this function should give you output like the following. Note that because filesize and uploaded are shared instance attributes, the progress percentages get scrambled once several files upload in parallel; the Uploaded: lines (and the printed ETags) are what actually confirm each file made it to S3.

file : iris.csv : 5107 / 5107.0 ----- 100.0% 
file : annual-enterprise.csv : 1053683 / 5107.0 ----- 20632.132367338945% 
"717820ef0af287ff346c5cabfb4c612c"
Uploaded: <path to data>/iris.csv
file : annual-enterprise.csv : 2102259 / 5107.0 ----- 41164.26473467789% 
file : annual-enterprise.csv : 3150835 / 5107.0 ----- 61696.39710201684% 
file : annual-enterprise.csv : 4062711 / 5107.0 ----- 79551.81123947522% 
file : annual-enterprise.csv : 5111287 / 5107.0 ----- 100083.94360681418% 
file : annual-enterprise.csv : 6159863 / 5107.0 ----- 120616.07597415311% 
file : annual-enterprise.csv : 7208439 / 5107.0 ----- 141148.20834149208% 
file : annual-enterprise.csv : 8257015 / 5107.0 ----- 161680.34070883103% 
"e24b18ef5a0bf1d0f5bec32bf0cef99a-2"
Uploaded: <path to data>/annual-enterprise.csv

As always, the full code is on my GitHub.
