Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions collectoss/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@
logger.info("Checking for repos to collect")


#Get list of enabled phases
enabled_phase_names = get_enabled_phase_names_from_config(engine, logger)


enabled_collection_hooks = []

with DatabaseSession(logger, self.app.engine) as session:
with DatabaseSession(logger, engine) as session:

#Get list of enabled phases
enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger)

# Get config values for collection intervals
config = SystemConfig(logger, session)
Expand Down Expand Up @@ -345,28 +346,55 @@
engine = self.app.engine
logger = logging.getLogger(create_collection_status_records.__name__)

#TODO: Isaac needs to normalize the status's to be abstract in the
#collection_status table once collectoss dev is less unstable dev is less unstable.
query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'"""
f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;"""
f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'"""
f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;"""
f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'"""
f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;"""
f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'"""
f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;"""

with DatabaseSession(logger, engine) as session:

Check warning

Code scanning / Bandit

Possible SQL injection vector through string-based query construction. Warning

Possible SQL injection vector through string-based query construction.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i know this part of the code needs a rewrite


# if we have a lot of new repos to collect, we should skip retrying repos that have errored until collection is caught up.
total_new_repos = 0

#Get list of enabled phases
enabled_phase_names = get_enabled_phase_names_from_config_session(session, logger)

query_limit = 1_000_000

if primary_repo_collect_phase.__name__ in enabled_phase_names:
total_new_repos += len(get_newly_added_repos(session, query_limit, "core"))

f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;"""
f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;;"""
f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;;"""
f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;;"""
)
if secondary_repo_collect_phase.__name__ in enabled_phase_names:
total_new_repos += len(get_newly_added_repos(session, query_limit, "secondary"))

execute_sql(query)
if facade_phase.__name__ in enabled_phase_names:
total_new_repos += len(get_newly_added_repos(session, query_limit, "facade"))

if machine_learning_phase.__name__ in enabled_phase_names:
total_new_repos += len(get_newly_added_repos(session, query_limit, "ml"))

if total_new_repos == 0:

#TODO: Isaac needs to normalize the status's to be abstract in the
#collection_status table once collectoss dev is less unstable dev is less unstable.
query = s.sql.text(f"""UPDATE collection_status SET secondary_status = '{CollectionState.PENDING.value}'"""
f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is NULL;"""
f"""UPDATE collection_status SET core_status = '{CollectionState.PENDING.value}'"""
f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is NULL;"""
f"""UPDATE collection_status SET facade_status = '{CollectionState.PENDING.value}'"""
f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is NULL;"""
f"""UPDATE collection_status SET ml_status = '{CollectionState.PENDING.value}'"""
f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is NULL;"""

f"""UPDATE collection_status SET secondary_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE secondary_status = '{CollectionState.ERROR.value}' and secondary_data_last_collected is not NULL;"""
f"""UPDATE collection_status SET core_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE core_status = '{CollectionState.ERROR.value}' and core_data_last_collected is not NULL;"""
f"""UPDATE collection_status SET facade_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE facade_status = '{CollectionState.ERROR.value}' and facade_data_last_collected is not NULL;"""
f"""UPDATE collection_status SET ml_status = '{CollectionState.SUCCESS.value}'"""
f""" WHERE ml_status = '{CollectionState.ERROR.value}' and ml_data_last_collected is not NULL;"""
)

execute_sql(query)
else:
logger.warning(f"Skipping retry of errored repos so we dont overwhelm collection when there are {total_new_repos} new repos to collect")



Expand Down
7 changes: 6 additions & 1 deletion collectoss/tasks/util/collection_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import random

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[pylint] reported by reviewdog 🐶
W0611: Unused import random (unused-import)

import datetime
import math
#from celery.result import AsyncResult
from celery import chain
import sqlalchemy as s
Expand Down Expand Up @@ -43,7 +44,11 @@ def get_valid_repos(self,session):
if limit <= 0:
return

new_collection_git_list = get_newly_added_repos(session, limit, hook=self.name)
# fill the remaining limit with half new repos and half recollected repos
# favoring recollection if there isnt an even number of repos
new_collection_limit = math.floor(limit / 2)

new_collection_git_list = get_newly_added_repos(session, new_collection_limit, hook=self.name)
collection_list = [(repo_git, True) for repo_git in new_collection_git_list]
self.repo_list.extend(collection_list)
limit -= len(collection_list)
Expand Down
Loading