
Bug / Race condition when adding assets via the python API #15354

Open
1 of 3 tasks
twentworth opened this issue Jan 15, 2025 · 4 comments

Comments


twentworth commented Jan 15, 2025

The bug

I ran into what I suspect is a race condition involving the Postgres database in Immich when adding assets too rapidly.

What I was doing:
My wife and I had separate accounts, but we wanted to share facial recognition, so I decided to have us both use my account. To migrate her photos into my account, I wrote the Python code below. It first downloads all of her assets, then uploads them into my account using 3 threads.

What went wrong:
All of the files made it into my account, but some were not viewable from the Immich web page or app. I checked some of the images/videos that wouldn't show up (I could find them because I still had a copy of everything under my wife's account) and verified that the files were indeed on disk where they should be. Trying to upload such a file again would give the duplicate-file error.

How I fixed it:
In the end, I wrote code to delete and re-upload all of the files, this time without using threads. This resulted in a different set of photos going missing, which is what led me to believe there is a race-condition-type error. I then deleted and re-uploaded them a second time, this time with a sleep in between the steps. That finally resulted in a successful upload.

Python code:
Base functions/etc:

import os
import requests
import shutil
import json
import concurrent.futures

BASE_URL = "http://<my-internal-ip>:<my-port>"
SOURCE_KEY = '<wife-api-key>'
DEST_KEY = '<my-api-key>'
SOURCE_HEADER = {
  'Content-Type': 'application/json',
  'Accept': 'application/json',
  'x-api-key': SOURCE_KEY
}
DEST_HEADER = {**SOURCE_HEADER, 'x-api-key': DEST_KEY}
FOLDER = 'images/'

def get_all_assets():
    url = f"{BASE_URL}/api/search/metadata"
    page = 1
    assets_list = []
    while True:
        payload = json.dumps({'page': page})
        print('Getting page:', page)
        response = requests.request("POST", url, headers=SOURCE_HEADER, data=payload)

        tj = response.json()
        assets_list += tj['assets']['items']
        if tj['assets'].get('nextPage') is not None:
            page = tj['assets']['nextPage']
        else:
            return assets_list
        
def get_paths(img_dict):
    name_prefix = img_dict['originalPath'].replace('/','_')
    return (
        FOLDER + name_prefix + '.tmp', #temp path for downloading
        FOLDER + name_prefix, # final path
        FOLDER + name_prefix + '.done' #file to create after uploading
    )

def save_image(img_dict, force_redo=False):
    idd = img_dict['id']
    url = f"{BASE_URL}/api/assets/{idd}/original"

    tmp_path, final_path, _ = get_paths(img_dict)
    
    payload = {}
    if force_redo or not os.path.isfile(final_path):
        # print(f'{idd} Downloading')
        response = requests.request("GET", url, headers=SOURCE_HEADER, data=payload, stream=True)
        response.raise_for_status()
        try:
            with open(tmp_path, "wb") as temp_file:
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
            shutil.move(tmp_path, final_path)
        except Exception:
            if os.path.isfile(tmp_path):
                os.remove(tmp_path)
            raise
        # print(f'{idd} Saved')
    else:
        # print(f'{idd} already saved')
        pass
            
        
        
def upload_image(img_dict):
    idd = img_dict['id']
    _, final_path, done_path = get_paths(img_dict)
    url = f"{BASE_URL}/api/assets"


    payload = {
        'deviceAssetId': img_dict['deviceAssetId'],
        'deviceId': img_dict['deviceId'],
        'duration': img_dict['duration'],
        'fileCreatedAt': img_dict['fileCreatedAt'],
        'fileModifiedAt': img_dict['fileModifiedAt'],
        'x-immich-checksum': img_dict['checksum']
    }
    

    header = {k:v for k, v in DEST_HEADER.items()}
    del header['Content-Type']
    if not os.path.isfile(done_path):
        with open(final_path,'rb') as fl:
            files=[
              ('assetData',fl)
            ]
            response = requests.request("POST", url, headers=header, data=payload, files=files)
            
            # payload['assetData'] = fl.read()
            # response = requests.request("POST", url, headers=header, data=payload)
        # print(response.text)
        assert response.ok
        with open(done_path, 'w+') as f:
            f.write('DONE!')
        # print(f'{idd} UPLOADED!')
    else:
        # print(f'{idd} Already UPLOADED!')
        pass

def upload_image_mod(img_dict):
    idd = img_dict['id']
    _, final_path, done_path = get_paths(img_dict)
    url = f"{BASE_URL}/api/assets"


    payload = {
        'deviceAssetId': img_dict['deviceAssetId'],
        'deviceId': img_dict['deviceId'],
        'duration': img_dict['duration'],
        'fileCreatedAt': img_dict['fileCreatedAt'],
        'fileModifiedAt': img_dict['fileModifiedAt'],
        'x-immich-checksum': img_dict['checksum']
    }
    

    header = {k:v for k, v in DEST_HEADER.items()}
    del header['Content-Type']
    
    with open(final_path,'rb') as fl:
        files=[
          ('assetData',fl)
        ]
        response = requests.request("POST", url, headers=header, data=payload, files=files)

        # payload['assetData'] = fl.read()
        # response = requests.request("POST", url, headers=header, data=payload)
    print(response.text)
    assert response.ok
    return response.json()
        
def do_a_file(img_dict):
    save_image(img_dict)
    upload_image(img_dict)
    print(f"{img_dict['id']} Done")
    
def delete_asset(dest_id):
    url = f"{BASE_URL}/api/assets"


    payload = json.dumps({
      "force": True,
      "ids": [
        dest_id
      ]
    })


    response = requests.request("DELETE", url, headers=DEST_HEADER, data=payload)


    print(response.text)
    print(response.ok)
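Since the missing assets existed on disk but were not visible, it might also be worth verifying each download's integrity before re-uploading. A minimal sketch, assuming the `checksum` field returned by the metadata search is a base64-encoded SHA-1 of the original file (this may differ between Immich versions):

```python
import base64
import hashlib

def sha1_b64(path, chunk_size=8192):
    """Stream a file and return its SHA-1 digest, base64-encoded."""
    h = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return base64.b64encode(h.digest()).decode("ascii")

def download_matches_checksum(img_dict, path):
    """Compare a downloaded file against the asset's reported checksum."""
    return sha1_b64(path) == img_dict["checksum"]
```

This could be called right after `shutil.move` in `save_image` to catch truncated downloads early.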

Initial upload (That had the issue):

def process_all_files_concurrently():
    # Cap the thread pool at 3 workers
    max_threads = 3

    # Use ThreadPoolExecutor to parallelize the processing
    with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
        # Use map to apply do_a_file to each item in all_stuff
        executor.map(do_a_file, all_stuff)        
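One caveat with this pattern: `executor.map` returns a lazy iterator, and because its results are never consumed here, any exception raised inside `do_a_file` is stored on the future and silently discarded, so a failed upload would go unnoticed. A sketch of a variant that surfaces failures (`work_fn`, `items`, and `run_all` are placeholder names, not part of the original script):

```python
import concurrent.futures

def run_all(work_fn, items, max_threads=3):
    """Submit every item to a thread pool and collect any exceptions
    instead of dropping them. Returns a list of (item, exception) pairs
    for the items that failed."""
    failures = []
    with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
        futures = {executor.submit(work_fn, item): item for item in items}
        for fut in concurrent.futures.as_completed(futures):
            try:
                fut.result()  # re-raises any exception from the worker
            except Exception as exc:
                failures.append((futures[fut], exc))
    return failures
```

With this, `run_all(do_a_file, all_stuff)` would have reported any uploads that raised rather than finishing silently.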

First fix attempt (that also had the issue):

for i, cur_stuff in enumerate(all_stuff):
    # if cur_stuff['type'] == 'VIDEO':
    #     continue
    resp_json = upload_image_mod(cur_stuff)
    if resp_json['status'] == 'duplicate':
        print(f"REUPLOADING {cur_stuff['id']}")
        delete_asset(resp_json['id'])
        print(f"DELETED TARGET id {resp_json['id']}")
        resp_json = upload_image_mod(cur_stuff)
        if resp_json['status'] == 'created':
            print(f"SUCCESS: {cur_stuff['id']} -> {resp_json['id']}")

Final fix that worked:

import time

for i, cur_stuff in enumerate(all_stuff):
    # if cur_stuff['type'] == 'VIDEO': # Note, I did videos and images separately, as videos were easier to check.  but the issue affected both.
    #     continue
    resp_json = upload_image_mod(cur_stuff)
    if resp_json['status'] == 'duplicate':
        print(f"REUPLOADING {cur_stuff['id']}")
        delete_asset(resp_json['id'])
        time.sleep(1)
        print(f"DELETED TARGET id {resp_json['id']}")
        resp_json = upload_image_mod(cur_stuff)
        if resp_json['status'] == 'created':
            print(f"SUCCESS: {cur_stuff['id']} -> {resp_json['id']}")
        time.sleep(.5) 
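The fixed sleeps work around the timing but are fragile: too short and the race could reappear, too long and the migration crawls. A bounded retry with exponential backoff would be more robust. A generic sketch (`retry_until` is a hypothetical helper, not an Immich API):

```python
import time

def retry_until(check_fn, attempts=5, base_delay=0.5):
    """Call check_fn until it returns a truthy value, sleeping with
    exponential backoff between attempts (base_delay, 2x, 4x, ...).
    Returns the last result: truthy on success, falsy if all attempts
    were exhausted."""
    result = check_fn()
    for i in range(1, attempts):
        if result:
            break
        time.sleep(base_delay * (2 ** (i - 1)))
        result = check_fn()
    return result
```

In the loop above, the post-delete re-upload could be wrapped in `retry_until` so the code waits only as long as the server actually needs, rather than a fixed second per asset.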

The OS that Immich Server is running on

Unraid 7.0.0 via Docker

Version of Immich Server

v1.123.0

Version of Immich Mobile App

v1.124.2

Platform with the issue

  • Server
  • Web
  • Mobile

Your docker-compose.yml content

Not sure how to display this. I added this via the Unraid managed feature.

Your .env content

Not sure how to display this. I added this via the Unraid managed feature.

Reproduction steps

Described above.

Relevant log output

No response

Additional information

No response

@twentworth twentworth changed the title Bug / Race condition when adding assets via the API Bug / Race condition when adding assets via the python API Jan 15, 2025
@alextran1502
Contributor

Thank you for the information. When you cannot view a certain photo, do you know if the jobs have finished running? We use a queue system so regardless of how many photos get uploaded, they should be processed in order
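To confirm programmatically that the queues have drained before checking visibility, one could poll the server's jobs endpoint and inspect the per-queue counts. A minimal sketch; the `/api/jobs` path and the `jobCounts` response shape are assumptions based on recent Immich versions and may differ on yours:

```python
def queues_idle(jobs_status):
    """Return True when every job queue reports zero active and zero
    waiting jobs.

    jobs_status is the parsed JSON from GET {BASE_URL}/api/jobs; the
    endpoint path and the 'jobCounts' shape are assumptions that should
    be checked against your server version.
    """
    for queue in jobs_status.values():
        counts = queue.get("jobCounts", {})
        if counts.get("active", 0) or counts.get("waiting", 0):
            return False
    return True
```

One could then loop, e.g. `while not queues_idle(requests.request("GET", f"{BASE_URL}/api/jobs", headers=DEST_HEADER).json()): time.sleep(2)`, before checking whether uploaded assets are visible.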

@twentworth
Author

Yes, I checked that. All the queues were complete. I also tried re-running whatever queues I could from the UI.

@bo0tzz
Member

bo0tzz commented Jan 15, 2025

Our CLI does a multithreaded upload and we've never had any issues around that.

@twentworth
Author

twentworth commented Jan 17, 2025

Yeah, I unfortunately don't have a good explanation for what happened. I'm pretty new to Immich and don't have a good grasp of the internals. I was only able to detect the issue because I still had both accounts active and could manually check some of the files. Some additional details that I forgot to mention:

  • My main server has an Intel N150 processor running Unraid, with 16 GB of RAM and all-SSD drives. Immich runs on this server via Docker.
  • AI tasks are offloaded to an M2 Mac mini on the same network.
  • The Python code was executed from a Jupyter notebook running on my main server (so round trips were fast).
  • The issue does seem reproducible.
