// NOTE(review): the first line of this file lies outside the diff context this
// module was recovered from — restore it from the repository.
//
// SPDX-License-Identifier: AGPL-3.0

// NOTE(review): generic type arguments (`ofType<string>`, `ofType<ProcessLogs>`,
// `UnionOf<typeof …>`, `Dispatch<any>`, `PromiseFulfilledResult<LogFragment[]>`)
// appear to have been stripped from the recovered text; the ones written below
// are inferred from how the values are used in this module — confirm them.

import { unionize, ofType, UnionOf } from "common/unionize";
import { ProcessLogs } from './process-logs-panel';
import { LogEventType } from 'models/log';
import { RootState } from 'store/store';
import { ServiceRepository } from 'services/services';
import { Dispatch } from 'redux';
import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
import { Process, getProcess } from 'store/processes/process';
import { navigateTo } from 'store/navigation/navigation-action';
import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
import { CollectionFile, CollectionFileType } from "models/collection-file";
import { ContainerRequestResource, ContainerRequestState } from "models/container-request";

// Marker appended to the last line of a chunk when the middle of a log file was skipped
const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
// Timestamp prefix (nanosecond precision, UTC "Z" suffix) that starts every sortable log line
const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;

// Max size of logs to fetch in bytes
const maxLogFetchSize = 128 * 1000;

// Names of the two synthetic (merged) log views
const ALL_FILTER_TYPE = 'All logs';
const MAIN_FILTER_TYPE = 'Main logs';

// Log types merged into the "Main logs" view
const MAIN_EVENT_TYPES = [
    LogEventType.CRUNCH_RUN,
    LogEventType.STDERR,
    LogEventType.STDOUT,
];

// Log types that are loaded into the panel at all
const PROCESS_PANEL_LOG_EVENT_TYPES = [
    LogEventType.ARV_MOUNT,
    // NOTE(review): the next four entries fall in a gap between diff hunks and
    // are reconstructed, not read from the visible text — verify against the repository.
    LogEventType.CRUNCH_RUN,
    LogEventType.CRUNCHSTAT,
    LogEventType.DISPATCH,
    LogEventType.HOSTSTAT,
    LogEventType.NODE_INFO,
    LogEventType.STDERR,
    LogEventType.STDOUT,
    LogEventType.CONTAINER,
    LogEventType.KEEPSTORE,
];

// Log types whose lines are not merged by timestamp; they are listed ahead of the sorted lines
const NON_SORTED_LOG_TYPES = [
    LogEventType.NODE_INFO,
    LogEventType.CONTAINER,
];

export const processLogsPanelActions = unionize({
    RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
    INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
    SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
    ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
});

// A log file paired with the offset from which the next fetch should start
type FileWithProgress = {
    file: CollectionFile;
    lastByte: number;
}

// A single log line annotated with the keys used to merge lines of different types
type SortableLine = {
    logType: LogEventType,
    timestamp: string;
    contents: string;
}

export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;

// NOTE(review): only the signature of this function is visible in the diff
// (as hunk context); the body is reconstructed from the action declared above.
export const setProcessLogsPanelFilter = (filter: string) =>
    processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);

/**
 * Reads the `status` property of a caught value without assuming its shape.
 * @param error value received by a catch clause (may be undefined or a non-object)
 * @returns the `status` property when the value is an object, otherwise undefined
 */
const getErrorStatus = (error: unknown): unknown => (
    typeof error === 'object' && error !== null
        ? (error as { status?: unknown }).status
        : undefined
);

/**
 * Resets the log panel and loads the initial contents of every log file of the
 * process found in the store under processUuid.
 * On failure an empty state is still dispatched so that polling can start.
 * @param processUuid uuid used to look the process up in the resources store
 */
export const initProcessLogsPanel = (processUuid: string) =>
    async (dispatch: Dispatch<any>, getState: () => RootState, { logService }: ServiceRepository) => {
        let process: Process | undefined;
        try {
            dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
            process = getProcess(processUuid)(getState().resources);
            if (process?.containerRequest?.uuid) {
                // Get log file size info
                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);

                // Populate lastbyte 0 for each file
                const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));

                // Fetch array of LogFragments
                const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);

                // Populate initial state with filters
                const initialState = createInitialLogPanelState(logFiles, logLines);
                dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
            }
        } catch (e) {
            // On error, populate empty state to allow polling to start
            const initialState = createInitialLogPanelState([], []);
            dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));

            // The rejection reason may be undefined or a non-object, so read the status defensively
            const status = getErrorStatus(e);
            if (status !== 404) {
                // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
                dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
            } else if (process?.containerRequest.state === ContainerRequestState.FINAL) {
                dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
            }
        }
    };

/**
 * Fetches whatever was appended to the process log files since the last load
 * and dispatches it to the panel. Does nothing until the initial state is loaded.
 * @param processUuid uuid used to look the process up in the resources store
 * @returns a promise that rejects with the original error when polling fails
 */
export const pollProcessLogs = (processUuid: string) =>
    async (dispatch: Dispatch<any>, getState: () => RootState, { logService }: ServiceRepository) => {
        try {
            // Get log panel state and process from store
            const currentState = getState().processLogsPanel;
            const process = getProcess(processUuid)(getState().resources);
            const knownLogNames = Object.keys(currentState.logs);

            // Check if container request is present and initial logs state loaded
            if (process?.containerRequest?.uuid && knownLogNames.length > 0) {
                const logFiles = await loadContainerLogFileList(process.containerRequest, logService);

                // Determine byte to fetch from while filtering unchanged files
                const filesToUpdateWithProgress: FileWithProgress[] = logFiles
                    // Fetch last byte or 0 for new log files
                    .map((file) => ({ file, lastByte: currentState.logs[logFileToLogType(file)]?.lastByte ?? 0 }))
                    .filter(({ file, lastByte }) => {
                        const isNew = !knownLogNames.some((logName) => file.name.startsWith(logName));
                        return isNew || lastByte < file.size;
                    });

                // Perform range request(s) for each file
                const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);

                if (logFragments.length) {
                    // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
                    const groupedLogs = groupLogs(logFiles, logFragments);
                    await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
                }
            }
        } catch (e) {
            // Remove log when polling error is handled in some way instead of being ignored
            console.error("Error occurred in pollProcessLogs:", e);
            // Propagate the original reason so callers can inspect what failed
            throw e;
        }
    };

/**
 * Lists the log files of a container request that the panel should display.
 * @returns regular, non-empty files whose log type is one of PROCESS_PANEL_LOG_EVENT_TYPES
 */
const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
    const logCollectionContents = await logService.listLogFiles(containerRequest);

    // Filter only root directory files matching log event types which have bytes
    return logCollectionContents.filter((file): file is CollectionFile => (
        file.type === CollectionFileType.FILE &&
        PROCESS_PANEL_LOG_EVENT_TYPES.includes(logFileToLogType(file)) &&
        file.size > 0
    ));
};

/**
 * Requests the unread part of one log file. When that part exceeds
 * maxLogFetchSize only its first and last maxLogFetchSize / 2 bytes are requested.
 * @returns one LogFragment per requested byte range, in file order
 */
const fetchLogFileFragments = (file: CollectionFile, lastByte: number, logService: LogService, process: Process): Promise<LogFragment[]> => {
    const finalByte = file.size - 1;
    const byteRanges: Array<[number, number]> = [];

    if (file.size - lastByte > maxLogFetchSize) {
        const chunkSize = Math.floor(maxLogFetchSize / 2);
        byteRanges.push([lastByte, lastByte + chunkSize - 1], [file.size - chunkSize, finalByte]);
    } else {
        byteRanges.push([lastByte, finalByte]);
    }

    const requests: Promise<LogFragment>[] = byteRanges.map(([firstByte, endByte]) => (
        logService.getLogFileContents(process.containerRequest, file, firstByte, endByte)
    ));
    return Promise.all(requests);
};

/**
 * Joins the fragments fetched for one file into a single fragment, appending a
 * snip line to the last line of every non-final fragment. The snip line is not
 * added as a separate line so that sorting won't reorder it.
 * The input fragments are left untouched.
 * @param fragments non-empty list of fragments, each with at least one line
 */
const joinFragmentsWithSnipline = (fragments: LogFragment[]): LogFragment => {
    const finalIndex = fragments.length - 1;
    const contents = fragments.flatMap((fragment, index) => {
        if (index === finalIndex) {
            return fragment.contents;
        }
        const lastLineIndex = fragment.contents.length - 1;
        return [
            ...fragment.contents.slice(0, lastLineIndex),
            `${fragment.contents[lastLineIndex]}\n${SNIPLINE}`,
        ];
    });
    return { logType: fragments[finalIndex].logType, contents };
};

/**
 * Loads the contents of each file from each file's lastByte simultaneously
 * while respecting the maxLogFetchSize by requesting the start and end
 * of the desired block and inserting a snipline.
 * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
 * @param logService
 * @param process
 * @returns LogFragment[] containing a single LogFragment corresponding to each
 *          input file that could be loaded; rejects with the first failure
 *          reason only when every request failed
 */
const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process): Promise<LogFragment[]> => {
    const settled = await Promise.allSettled(
        logFilesWithProgress
            .filter(({ file }) => file.size > 0)
            .map(({ file, lastByte }) => fetchLogFileFragments(file, lastByte, logService, process))
    );

    if (settled.length && settled.every((result) => result.status === 'rejected')) {
        // Since allSettled does not pass promise rejection we throw the
        // first failure reason if every request failed
        throw settled.find(
            (result): result is PromiseRejectedResult => result.status === 'rejected'
        )?.reason;
    }

    return settled
        .filter((result): result is PromiseFulfilledResult<LogFragment[]> => (
            // Filter out log files with rejected promises
            // (Promise.all rejects on any failure)
            result.status === 'fulfilled' &&
            // Filter out files where any fragment is empty
            // (prevent incorrect snipline generation or an un-resumable situation)
            result.value.every((logFragment) => logFragment.contents.length > 0)
        ))
        .map((result) => joinFragmentsWithSnipline(result.value));
};

/**
 * Builds the payload of INIT_PROCESS_LOGS_PANEL: the grouped logs plus one
 * filter name per log group.
 */
const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
    const logs = groupLogs(logFiles, logFragments);
    const filters = Object.keys(logs);
    return { filters, logs };
}

/**
 * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
 * @param logFiles
 * @param logFragments
 * @returns ProcessLogs for the store
 */
const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
    const sortableLogFragments = mergeMultilineLoglines(logFragments);

    const allLogs = mergeSortLogFragments(sortableLogFragments);
    const mainLogs = mergeSortLogFragments(
        sortableLogFragments.filter((fragment) => MAIN_EVENT_TYPES.includes(fragment.logType))
    );

    // Per-type entries keep the lines exactly as fetched and remember how far the file was read
    const groupedLogs = logFragments.reduce((grouped, fragment) => ({
        ...grouped,
        [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
    }), {});

    return {
        [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
        [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
        ...groupedLogs,
    }
};

/**
 * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
 * If there is no previous line (first line has no timestamp), the line is deleted.
 * Only used for combined logs that need sorting by timestamp after merging
 * @param logFragments
 * @returns New LogFragment[]; the input fragments are not modified
 */
const mergeMultilineLoglines = (logFragments: LogFragment[]): LogFragment[] => (
    logFragments.map((fragment): LogFragment => {
        // Non-sorted log types are copied through unchanged
        if (NON_SORTED_LOG_TYPES.includes(fragment.logType)) {
            return { logType: fragment.logType, contents: [...fragment.contents] };
        }

        const mergedLines: string[] = [];
        for (const line of fragment.contents) {
            if (LOG_TIMESTAMP_PATTERN.test(line)) {
                mergedLines.push(line);
            } else if (mergedLines.length > 0) {
                // Partial line without timestamp: append it to the previous line
                const previousIndex = mergedLines.length - 1;
                mergedLines[previousIndex] = `${mergedLines[previousIndex]}\n${line}`;
            }
            // A non-timestamped line with no previous line is dropped
        }
        return { logType: fragment.logType, contents: mergedLines };
    })
);

/**
 * Merges log lines of different types and sorts types that contain timestamps (are sortable)
 * @param logFragments
 * @returns string[] of merged and sorted log lines, non-sortable types first
 */
const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
    const isNonSortable = (fragment: LogFragment) => NON_SORTED_LOG_TYPES.includes(fragment.logType);

    const nonSortableLines = fragmentsToLines(
        logFragments
            .filter(isNonSortable)
            .sort((a, b) => a.logType.localeCompare(b.logType))
    );
    const sortedLines = sortLogFragments(logFragments.filter((fragment) => !isNonSortable(fragment)));

    return [...nonSortableLines, ...sortedLines];
};

/**
 * Performs merge and sort of input log fragment lines.
 * Implemented as a k-way merge: the lines of each fragment are consumed
 * strictly in their original order and, at every step, the fragment whose
 * next line is smallest according to sortableLineSortFunc is taken.
 * @param logFragments set of sortable log fragments to be merged and sorted
 * @returns A string array containing all lines, sorted by timestamp and
 * preserving line ordering and type grouping when timestamps match
 */
const sortLogFragments = (logFragments: LogFragment[]): string[] => {
    // Map each logFragment into a queue of SortableLine
    const queues: SortableLine[][] = logFragments.map((fragment) => (
        fragment.contents.map((singleLine): SortableLine => ({
            logType: fragment.logType,
            timestamp: singleLine.match(LOG_TIMESTAMP_PATTERN)?.[0] ?? "",
            contents: singleLine,
        }))
    ));
    const cursors = queues.map(() => 0);
    const totalLines = queues.reduce((count, queue) => count + queue.length, 0);

    const merged: string[] = [];
    while (merged.length < totalLines) {
        // Pick the non-exhausted queue with the smallest head; earlier queues win ties
        let best = -1;
        for (let index = 0; index < queues.length; index++) {
            if (cursors[index] >= queues[index].length) {
                continue;
            }
            if (best === -1 || sortableLineSortFunc(queues[index][cursors[index]], queues[best][cursors[best]]) < 0) {
                best = index;
            }
        }
        merged.push(queues[best][cursors[best]].contents);
        cursors[best]++;
    }
    return merged;
};

/**
 * Orders two lines coming from different sources:
 * by timestamp first, then by log type when the timestamps match
 * (produces a block of same-timestamped lines of one type before a block
 * of same timestamped lines of another type for readability).
 * Ordering of lines from the same source is preserved by sortLogFragments
 * itself, which never reorders lines within a fragment.
 */
const sortableLineSortFunc = (a: SortableLine, b: SortableLine): number => {
    if (a.timestamp !== b.timestamp) {
        // Timestamps share one fixed-width format, so plain string order is chronological
        return a.timestamp < b.timestamp ? -1 : 1;
    }
    return a.logType.localeCompare(b.logType);
};

/**
 * Concatenates the lines of all fragments, in fragment order.
 */
const fragmentsToLines = (fragments: LogFragment[]): string[] => (
    fragments.flatMap((fragment) => fragment.contents)
);

/**
 * Looks up the size of the first log file whose name starts with key.
 * @returns the file size, or undefined when no file matches
 */
const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
    return logFiles.find((file) => (file.name.startsWith(key)))?.size
};

/**
 * Navigates to the log collection if it can still be fetched, otherwise shows
 * a warning toast.
 * @param uuid uuid of the log collection
 */
export const navigateToLogCollection = (uuid: string) =>
    async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
        try {
            await services.collectionService.get(uuid);
            dispatch(navigateTo(uuid));
        } catch {
            dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
        }
    };