Uploading files to S3 with Python

Saurab Dahal

In the previous article we created a bucket in S3 using boto3. Now it's time to upload some files that we will analyze later.

There are many ways to upload files to S3. The easy way is to use the console, but it isn't automated, meaning you have to do everything manually. Now imagine you have logs generated daily: it would be impractical to go to the console and upload each log file one at a time.

That’s why in this guide, we will go through the process of automating this tedious task.

Uploading a single file

  1. Create a new file inside the s3 folder.

  2. We will also create a data directory in our project root to make it easier to work. This is not strictly necessary, since files can reside anywhere as long as we can access them, but doing it this way makes the guide easier to follow.

  3. Paste this code.

import os
import boto3


class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"  # the bucket we created in the previous article
        self.s3 = boto3.client('s3')
        self.data_dir = "<path to your data dir>"  # local directory holding the files to upload

    def upload_single_file(self, filename):
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename
            )
        except Exception as e:
            print(f"Error: {e.args}")
  1. Now run the code from your main.py (a minimal example is sketched after the final code below).

  2. I have used the iris dataset as it is easily available, small, and overall easy to work with. Feel free to upload any file you want.

  3. Now if you open your AWS console, you should see the uploaded file listed in your bucket.

  4. The good thing about using upload_file() is that it gives us a Callback option that we can use to track the upload progress.

  5.       def upload_single_file(self, filename=None):
              if filename is None:
                  filename = self.filename
              self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
              try:
                  self.s3.upload_file(
                      Filename=os.path.join(self.data_dir, filename),
                      Bucket=self.bucket_name,
                      Key=filename,
                      Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename)
                  )
              except Exception as e:
                  print(f"Error: {e.args}")
    

    We can write our callback as such

  6.         def __call__(self, bytes_amount, filename):
                 with self._lock:
                     self.uploaded += bytes_amount
                     upload_percentage = (self.uploaded / self.filesize) * 100
                     print(f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% ")
    

    We will need to add a few attributes to our __init__ method

             self.filesize = 0
             self.uploaded = 0
             self._lock = threading.Lock()
    

    The final code looks like this

import os
import threading

import boto3


class FileManager:
    def __init__(self):
        self.bucket_name = "aws-demo-fruit"
        self.s3 = boto3.client('s3')
        self.data_dir = "<path to your data dir>"
        self.filename = "iris.csv"  # default file to upload when no filename is passed in
        self.filesize = 0
        self.uploaded = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount, filename):
        # boto3 invokes this callback with the number of bytes transferred since the last call
        with self._lock:
            self.uploaded += bytes_amount
            upload_percentage = (self.uploaded / self.filesize) * 100
            print(f"file : {filename} : {self.uploaded} / {self.filesize} ----- {upload_percentage}% ")

    def upload_single_file(self, filename=None):
        if filename is None:
            filename = self.filename
        self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))
        try:
            self.s3.upload_file(
                Filename=os.path.join(self.data_dir, filename),
                Bucket=self.bucket_name,
                Key=filename,
                Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename)
            )
        except Exception as e:
            print(f"Error: {e.args}")

Output should be something like this

  1.   file : iris.csv : 5107 / 5107.0 ----- 100.0%
    

    For a slightly larger file you should see something like

  2.   file : annual-enterprise.csv : 1048576 / 8065547.0 ----- 13.000680549006782% 
      file : annual-enterprise.csv : 2097152 / 8065547.0 ----- 26.001361098013565% 
      file : annual-enterprise.csv : 3145728 / 8065547.0 ----- 39.00204164702035% 
      file : annual-enterprise.csv : 4194304 / 8065547.0 ----- 52.00272219602713% 
      file : annual-enterprise.csv : 5242880 / 8065547.0 ----- 65.00340274503391% 
      file : annual-enterprise.csv : 6291456 / 8065547.0 ----- 78.0040832940407% 
      file : annual-enterprise.csv : 7340032 / 8065547.0 ----- 91.00476384304747% 
      file : annual-enterprise.csv : 8065547 / 8065547.0 ----- 100.0%
    

    Here is the fun part: we all know what a multipart upload in S3 is, right? For larger files, S3 recommends multipart upload, where the file is split into defined chunks and uploaded in parallel rather than as a whole in a single thread. To use multipart upload with boto3, we need to modify our code slightly.

  3.      def upload_single_file(self, filename=None):
              if filename is None:
                  filename = self.filename
              self.filesize = float(os.path.getsize(os.path.join(self.data_dir, filename)))

              # TransferConfig lets us define the parameters for a multipart upload.
              # Requires: from boto3.s3.transfer import TransferConfig, KB  (KB is simply 1024 bytes)
              # Since the default threshold (8 MB) is larger than our file size, we use a smaller
              # threshold to force boto3 to use multipart. We will use a 100 KB chunk size.
              transfer_config = TransferConfig(
                  multipart_threshold=100 * KB,
                  multipart_chunksize=100 * KB
              )
              try:
                  self.s3.upload_file(
                      Filename=os.path.join(self.data_dir, filename),
                      Bucket=self.bucket_name,
                      Key=filename,
                      Callback=lambda bytes_transferred: self.__call__(bytes_transferred, filename),
                      Config=transfer_config
                  )
    
                  # We can verify the upload completion by checking ETag
                  # Note: For multipart uploads, response ETag will contain a dash and part count
                  response = self.s3.head_object(Bucket=self.bucket_name, Key=filename)
                  etag = response['ETag']
                  print(etag)
              except Exception as e:
                  print(f"Error: {e.args}")
    

    The response now would be something similar to this

  4.   "924e834fb0c3946321bc1f716f163a8e-1"
    

    Now let's increase the multipart_threshold

  5.   multipart_threshold=100 * MB
    

    and the response would be something similar to this

  6.   "0a972b95e3216112c716228740cce66a"
    

    Notice there is no dash (-). This is because the threshold we used is now larger than the file size, so boto3 performed a regular single-part upload. The absence of the dash confirms it was not a multipart upload.
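
If you want to verify this programmatically rather than by eye, here is a small sketch of the same ETag check (the bucket and key names are just the examples used in this guide):

import boto3

# Sketch: tell a multipart upload from a single-part upload by inspecting the object's ETag.
# A multipart ETag looks like "<hash>-<part count>"; a single-part ETag has no dash.
s3 = boto3.client('s3')
response = s3.head_object(Bucket="aws-demo-fruit", Key="annual-enterprise.csv")
etag = response['ETag'].strip('"')

if '-' in etag:
    print(f"multipart upload with {etag.split('-')[1]} parts")
else:
    print("single-part upload")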

Now, to upload all the files in a directory, we simply walk the directory and upload each file.

    # Requires: from concurrent.futures import ThreadPoolExecutor, as_completed
    def upload_directory(self, max_workers: int = 4):

        # Walk the data directory and collect every file path, including files in subdirectories
        files_list = []
        for root, _, files in os.walk(self.data_dir):
            for f in files:
                files_list.append(os.path.join(root, f))

        if not files_list:
            print(f" No files in {self.data_dir} ?")
            return

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.upload_single_file, os.path.relpath(path, self.data_dir)): path
                for path in files_list
            }

            """
                We use as_completed() here for futures. It is an iterator over the given futures that yields each as it
                completes. [Doc]. as_completed() takes a list (or any iterable) of Future objects and returns an 
                iterator that yields each future as soon as it finishes, in no particular order. 
            """
            for future in as_completed(futures):
                src_path = futures[future]
                try:
                    future.result()
                    print(f"Uploaded: {src_path}")
                except Exception as e:
                    print(f"Failed to upload {src_path}: {e}")

Running this should give you something like the output below (note that the progress percentages look off because self.filesize and self.uploaded are shared across the concurrent uploads):

file : iris.csv : 5107 / 5107.0 ----- 100.0% 
file : annual-enterprise.csv : 1053683 / 5107.0 ----- 20632.132367338945% 
"717820ef0af287ff346c5cabfb4c612c"
Uploaded: <path to data>/iris.csv
file : annual-enterprise.csv : 2102259 / 5107.0 ----- 41164.26473467789% 
file : annual-enterprise.csv : 3150835 / 5107.0 ----- 61696.39710201684% 
file : annual-enterprise.csv : 4062711 / 5107.0 ----- 79551.81123947522% 
file : annual-enterprise.csv : 5111287 / 5107.0 ----- 100083.94360681418% 
file : annual-enterprise.csv : 6159863 / 5107.0 ----- 120616.07597415311% 
file : annual-enterprise.csv : 7208439 / 5107.0 ----- 141148.20834149208% 
file : annual-enterprise.csv : 8257015 / 5107.0 ----- 161680.34070883103% 
"e24b18ef5a0bf1d0f5bec32bf0cef99a-2"
Uploaded: <path to data>/annual-enterprise.csv

As always, the full code is on my GitHub.
